From 561e21753e3677769d5796039ef3762518551f1f Mon Sep 17 00:00:00 2001 From: Adam Reeve Date: Mon, 14 Oct 2024 14:25:03 +1300 Subject: [PATCH 1/5] Add C++ Flight integration test for do_exchange --- .../integration_tests/test_integration.cc | 129 ++++++++++++++++++ dev/archery/archery/integration/runner.py | 6 + 2 files changed, 135 insertions(+) diff --git a/cpp/src/arrow/flight/integration_tests/test_integration.cc b/cpp/src/arrow/flight/integration_tests/test_integration.cc index da6fcf81eb737..f38076822c778 100644 --- a/cpp/src/arrow/flight/integration_tests/test_integration.cc +++ b/cpp/src/arrow/flight/integration_tests/test_integration.cc @@ -19,6 +19,7 @@ #include #include +#include #include #include #include @@ -1026,6 +1027,131 @@ class AppMetadataFlightInfoEndpointScenario : public Scenario { } }; +/// \brief The server used for testing do_exchange +class DoExchangeServer : public FlightServerBase { + public: + DoExchangeServer() : FlightServerBase() {} + + Status DoExchange(const ServerCallContext& context, + std::unique_ptr reader, + std::unique_ptr writer) override { + if (reader->descriptor().type != FlightDescriptor::DescriptorType::CMD) { + return Status::Invalid("Must provide a command descriptor"); + } + + const std::string& cmd = reader->descriptor().cmd; + if (cmd == "echo") { + return RunEchoExchange(reader, writer); + } else { + return Status::NotImplemented("Command not implemented: ", cmd); + } + } + + private: + static Status RunEchoExchange(std::unique_ptr& reader, + std::unique_ptr& writer) { + FlightStreamChunk chunk; + bool begun = false; + while (true) { + ARROW_ASSIGN_OR_RAISE(chunk, reader->Next()); + if (!chunk.data && !chunk.app_metadata) { + break; + } + if (!begun && chunk.data) { + begun = true; + RETURN_NOT_OK(writer->Begin(chunk.data->schema())); + } + if (chunk.data && chunk.app_metadata) { + RETURN_NOT_OK(writer->WriteWithMetadata(*chunk.data, chunk.app_metadata)); + } else if (chunk.data) { + RETURN_NOT_OK(writer->WriteRecordBatch(*chunk.data)); + } else if (chunk.app_metadata) { + RETURN_NOT_OK(writer->WriteMetadata(chunk.app_metadata)); + } + } + return Status::OK(); + } +}; + +/// \brief The DoExchangeEcho scenario. +/// +/// This tests that the client and server can perform a two-way data exchange. +/// +/// The server should echo back any data sent by the client. +class DoExchangeEchoScenario : public Scenario { + Status MakeServer(std::unique_ptr* server, + FlightServerOptions* options) override { + *server = std::make_unique(); + return Status::OK(); + } + + Status MakeClient(FlightClientOptions* options) override { return Status::OK(); } + + Status RunClient(std::unique_ptr client) override { + auto descriptor = FlightDescriptor::Command("echo"); + FlightCallOptions call_options; + + ARROW_ASSIGN_OR_RAISE(auto do_exchange_result, + client->DoExchange(call_options, descriptor)); + std::unique_ptr writer = std::move(do_exchange_result.writer); + std::unique_ptr reader = std::move(do_exchange_result.reader); + + auto schema = arrow::schema({field("x", int32(), false)}); + ARROW_RETURN_NOT_OK(writer->Begin(schema)); + + ARROW_ASSIGN_OR_RAISE(auto builder, + RecordBatchBuilder::Make(schema, arrow::default_memory_pool())); + + for (int batch_idx = 0; batch_idx < 4; ++batch_idx) { + auto int_builder = builder->GetFieldAs(0); + std::vector batch_data(10); + std::iota(batch_data.begin(), batch_data.end(), batch_idx); + ARROW_RETURN_NOT_OK(int_builder->AppendValues(batch_data)); + ARROW_ASSIGN_OR_RAISE(auto record_batch, builder->Flush()); + + std::string app_metadata = std::to_string(batch_idx); + bool write_metadata = batch_idx % 2 == 0; + + if (write_metadata) { + ARROW_RETURN_NOT_OK( + writer->WriteWithMetadata(*record_batch, Buffer::FromString(app_metadata))); + } else { + ARROW_RETURN_NOT_OK(writer->WriteRecordBatch(*record_batch)); + } + + ARROW_ASSIGN_OR_RAISE(auto read_result, reader->Next()); + if (read_result.data == nullptr) { + return Status::Invalid("Received null data"); + } + if (!read_result.data->Equals(*record_batch)) { + return Status::Invalid("Read data doesn't match expected data for batch ", + std::to_string(batch_idx), ".\n", "Expected:\n", + record_batch->ToString(), "Actual:\n", + read_result.data->ToString()); + } + + if (write_metadata) { + if (read_result.app_metadata == nullptr) { + return Status::Invalid("Received null app metadata"); + } + if (read_result.app_metadata->ToString() != app_metadata) { + return Status::Invalid("Read metadata doesn't match expected for batch ", + std::to_string(batch_idx), ".\n", "Expected:\n", + app_metadata, "\nActual:\n", + read_result.app_metadata->ToString()); + } + } else if (read_result.app_metadata != nullptr) { + return Status::Invalid("Expected no app metadata but received non-null metadata"); + } + } + + ARROW_RETURN_NOT_OK(writer->DoneWriting()); + ARROW_RETURN_NOT_OK(writer->Close()); + + return Status::OK(); + } +}; + /// \brief Schema to be returned for mocking the statement/prepared statement results. /// /// Must be the same across all languages. @@ -2283,6 +2409,9 @@ Status GetScenario(const std::string& scenario_name, std::shared_ptr* } else if (scenario_name == "app_metadata_flight_info_endpoint") { *out = std::make_shared(); return Status::OK(); + } else if (scenario_name == "do_exchange:echo") { + *out = std::make_shared(); + return Status::OK(); } else if (scenario_name == "flight_sql") { *out = std::make_shared(); return Status::OK(); diff --git a/dev/archery/archery/integration/runner.py b/dev/archery/archery/integration/runner.py index 378b17d75fdce..f8db3dfb63770 100644 --- a/dev/archery/archery/integration/runner.py +++ b/dev/archery/archery/integration/runner.py @@ -669,6 +669,12 @@ def append_tester(implementation, tester): "RenewFlightEndpoint are working as expected."), skip_testers={"JS", "C#", "Rust"}, ), + Scenario( + "do_exchange:echo", + description=("Test the do_exchange method by " + "echoing data back to the client."), + skip_testers={"C#", "Go", "Java", "JS", "Rust"}, + ), Scenario( "location:reuse_connection", description="Ensure arrow-flight-reuse-connection is accepted.", From aa6ec7d1f00f16a63c1c22b5bf59ddadaac44434 Mon Sep 17 00:00:00 2001 From: Adam Reeve Date: Mon, 14 Oct 2024 15:59:56 +1300 Subject: [PATCH 2/5] Add C# do_exchange integration test --- .../FlightClientCommand.cs | 18 ++- .../FlightServerCommand.cs | 33 +++++- .../IScenario.cs | 35 ++++++ .../Scenarios/DoExchangeEchoScenario.cs | 107 ++++++++++++++++++ .../{ => Scenarios}/JsonTestScenario.cs | 23 ++-- .../Startup.cs | 46 ++++++++ .../Apache.Arrow.Flight.TestWeb/Startup.cs | 7 -- dev/archery/archery/integration/runner.py | 2 +- 8 files changed, 241 insertions(+), 30 deletions(-) create mode 100644 csharp/test/Apache.Arrow.Flight.IntegrationTest/IScenario.cs create mode 100644 csharp/test/Apache.Arrow.Flight.IntegrationTest/Scenarios/DoExchangeEchoScenario.cs rename csharp/test/Apache.Arrow.Flight.IntegrationTest/{ => Scenarios}/JsonTestScenario.cs (91%) create mode 100644 csharp/test/Apache.Arrow.Flight.IntegrationTest/Startup.cs diff --git a/csharp/test/Apache.Arrow.Flight.IntegrationTest/FlightClientCommand.cs b/csharp/test/Apache.Arrow.Flight.IntegrationTest/FlightClientCommand.cs index d9e0ff5230611..a26bcf07eca49 100644 --- a/csharp/test/Apache.Arrow.Flight.IntegrationTest/FlightClientCommand.cs +++ b/csharp/test/Apache.Arrow.Flight.IntegrationTest/FlightClientCommand.cs @@ -16,6 +16,7 @@ using System; using System.IO; using System.Threading.Tasks; +using Apache.Arrow.Flight.IntegrationTest.Scenarios; namespace Apache.Arrow.Flight.IntegrationTest; @@ -34,18 +35,13 @@ public FlightClientCommand(int port, string scenario, FileInfo jsonFileInfo) public async Task Execute() { - if (!string.IsNullOrEmpty(_scenario)) + IScenario scenario = _scenario switch { - // No named scenarios are currently implemented - throw new Exception($"Scenario '{_scenario}' is not supported."); - } + null => new JsonTestScenario(_jsonFileInfo), + "do_exchange:echo" => new DoExchangeEchoScenario(), + _ => throw new NotSupportedException($"Scenario '{_scenario}' is not supported"), + }; - if (!(_jsonFileInfo?.Exists ?? false)) - { - throw new Exception($"Invalid JSON file path '{_jsonFileInfo?.FullName}'"); - } - - var scenario = new JsonTestScenario(_port, _jsonFileInfo); - await scenario.RunClient().ConfigureAwait(false); + await scenario.RunClient(_port).ConfigureAwait(false); } } diff --git a/csharp/test/Apache.Arrow.Flight.IntegrationTest/FlightServerCommand.cs b/csharp/test/Apache.Arrow.Flight.IntegrationTest/FlightServerCommand.cs index c3a7694485b69..38f14b789974d 100644 --- a/csharp/test/Apache.Arrow.Flight.IntegrationTest/FlightServerCommand.cs +++ b/csharp/test/Apache.Arrow.Flight.IntegrationTest/FlightServerCommand.cs @@ -16,6 +16,8 @@ using System; using System.Net; using System.Threading.Tasks; +using Apache.Arrow.Flight.IntegrationTest.Scenarios; +using Apache.Arrow.Flight.Server; using Apache.Arrow.Flight.TestWeb; using Microsoft.AspNetCore.Hosting; using Microsoft.AspNetCore.Hosting.Server; @@ -23,6 +25,8 @@ using Microsoft.AspNetCore.Server.Kestrel.Core; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Console; namespace Apache.Arrow.Flight.IntegrationTest; @@ -37,11 +41,12 @@ public FlightServerCommand(string scenario) public async Task Execute() { - if (!string.IsNullOrEmpty(_scenario)) + IScenario scenario = _scenario switch { - // No named scenarios are currently implemented - throw new Exception($"Scenario '{_scenario}' is not supported."); - } + null => null, + "do_exchange:echo" => new DoExchangeEchoScenario(), + _ => throw new NotSupportedException($"Scenario {_scenario} is not supported") + }; var host = Host.CreateDefaultBuilder() .ConfigureWebHostDefaults(webBuilder => @@ -51,6 +56,26 @@ public async Task Execute() { options.Listen(IPEndPoint.Parse("127.0.0.1:0"), l => l.Protocols = HttpProtocols.Http2); }) + .ConfigureServices(services => + { + if (scenario == null) + { + // Use the TestFlightServer for JSON based integration tests + services.AddGrpc().AddFlightServer(); + services.AddSingleton(new FlightStore()); + } + else + { + // Use a scenario-specific server implementation + services.AddGrpc().Services.AddScoped(_ => scenario.MakeServer()); + } + + // The integration tests rely on the port being written to the first line of stdout, + // so send all logging to stderr. + services.Configure( + o => o.LogToStandardErrorThreshold = LogLevel.Debug); + + }) .UseStartup(); }) .Build(); diff --git a/csharp/test/Apache.Arrow.Flight.IntegrationTest/IScenario.cs b/csharp/test/Apache.Arrow.Flight.IntegrationTest/IScenario.cs new file mode 100644 index 0000000000000..41ed631f33528 --- /dev/null +++ b/csharp/test/Apache.Arrow.Flight.IntegrationTest/IScenario.cs @@ -0,0 +1,35 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System.Threading.Tasks; +using Apache.Arrow.Flight.Server; + +namespace Apache.Arrow.Flight.IntegrationTest; + +/// +/// A Flight integration test scenario +/// +internal interface IScenario +{ + /// + /// Create a FlightServer instance to run the scenario + /// + FlightServer MakeServer(); + + /// + /// Run the scenario using a Flight client + /// + Task RunClient(int serverPort); +} diff --git a/csharp/test/Apache.Arrow.Flight.IntegrationTest/Scenarios/DoExchangeEchoScenario.cs b/csharp/test/Apache.Arrow.Flight.IntegrationTest/Scenarios/DoExchangeEchoScenario.cs new file mode 100644 index 0000000000000..fd95f4256c9dc --- /dev/null +++ b/csharp/test/Apache.Arrow.Flight.IntegrationTest/Scenarios/DoExchangeEchoScenario.cs @@ -0,0 +1,107 @@ +using System; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Apache.Arrow.Flight.Client; +using Apache.Arrow.Flight.Server; +using Google.Protobuf; +using Grpc.Core; +using Grpc.Net.Client; +using Grpc.Net.Client.Balancer; +using Microsoft.Extensions.DependencyInjection; + +namespace Apache.Arrow.Flight.IntegrationTest.Scenarios; + +internal class DoExchangeServer : FlightServer +{ + public override async Task DoExchange( + FlightServerRecordBatchStreamReader requestStream, + FlightServerRecordBatchStreamWriter responseStream, + ServerCallContext context) + { + var descriptor = await requestStream.FlightDescriptor; + var command = descriptor.Command?.ToStringUtf8(); + if (command != "echo") + { + throw new Exception($"Unsupported command: '{command}'"); + } + + while (await requestStream.MoveNext()) + { + await responseStream.WriteAsync( + requestStream.Current, requestStream.ApplicationMetadata.FirstOrDefault()); + } + } +} + +internal class DoExchangeEchoScenario : IScenario +{ + public FlightServer MakeServer() => new DoExchangeServer(); + + public async Task RunClient(int serverPort) + { + var services = new ServiceCollection(); + services.AddSingleton(new GrpcTcpResolverFactory()); + var serviceProvider = services.BuildServiceProvider(); + + var address = $"grpc+tcp://localhost:{serverPort}"; + using var channel = GrpcChannel.ForAddress( + address, + new GrpcChannelOptions + { + ServiceProvider = serviceProvider, + Credentials = ChannelCredentials.Insecure + }); + + var client = new FlightClient(channel); + var descriptor = FlightDescriptor.CreateCommandDescriptor("echo"); + using var exchange = client.DoExchange(descriptor); + + using var writer = exchange.RequestStream; + using var reader = exchange.ResponseStream; + + for (var batchIdx = 0; batchIdx < 4; batchIdx++) + { + using var batch = new RecordBatch.Builder() + .Append( + "x", + nullable: false, + array: new Int32Array.Builder().AppendRange(Enumerable.Range(batchIdx, 10)).Build()) + .Build(); + + var expectedMetadata = $"{batchIdx}"; + var writeMetadata = batchIdx % 2 == 0; + if (writeMetadata) + { + await writer.WriteAsync(batch, ByteString.CopyFromUtf8(expectedMetadata)); + } + else + { + await writer.WriteAsync(batch); + } + + if (!await reader.MoveNext(CancellationToken.None)) + { + throw new Exception("Unexpected end of read stream"); + } + + var readMetadata = reader.ApplicationMetadata?.FirstOrDefault()?.ToStringUtf8(); + + if (writeMetadata && readMetadata != expectedMetadata) + { + throw new Exception($"Expected metadata '{expectedMetadata}' but received '{readMetadata}'"); + } + if (!writeMetadata && readMetadata != null) + { + throw new Exception($"Unexpected metadata received: '{readMetadata}'"); + } + } + + await writer.CompleteAsync(); + + if (await reader.MoveNext(CancellationToken.None)) + { + throw new Exception("Expected end of read stream"); + } + } +} diff --git a/csharp/test/Apache.Arrow.Flight.IntegrationTest/JsonTestScenario.cs b/csharp/test/Apache.Arrow.Flight.IntegrationTest/Scenarios/JsonTestScenario.cs similarity index 91% rename from csharp/test/Apache.Arrow.Flight.IntegrationTest/JsonTestScenario.cs rename to csharp/test/Apache.Arrow.Flight.IntegrationTest/Scenarios/JsonTestScenario.cs index f4f3ac28bfa1b..4f7fed74352fc 100644 --- a/csharp/test/Apache.Arrow.Flight.IntegrationTest/JsonTestScenario.cs +++ b/csharp/test/Apache.Arrow.Flight.IntegrationTest/Scenarios/JsonTestScenario.cs @@ -18,6 +18,7 @@ using System.Linq; using System.Threading.Tasks; using Apache.Arrow.Flight.Client; +using Apache.Arrow.Flight.Server; using Apache.Arrow.IntegrationTest; using Apache.Arrow.Tests; using Apache.Arrow.Types; @@ -27,20 +28,23 @@ using Grpc.Net.Client.Balancer; using Microsoft.Extensions.DependencyInjection; -namespace Apache.Arrow.Flight.IntegrationTest; +namespace Apache.Arrow.Flight.IntegrationTest.Scenarios; /// /// A test scenario defined using a JSON data file /// -internal class JsonTestScenario +internal class JsonTestScenario : IScenario { - private readonly int _serverPort; private readonly FileInfo _jsonFile; private readonly ServiceProvider _serviceProvider; - public JsonTestScenario(int serverPort, FileInfo jsonFile) + public JsonTestScenario(FileInfo jsonFile) { - _serverPort = serverPort; + if (!(jsonFile?.Exists ?? false)) + { + throw new Exception($"Invalid JSON file path '{jsonFile?.FullName}'"); + } + _jsonFile = jsonFile; var services = new ServiceCollection(); @@ -48,9 +52,14 @@ public JsonTestScenario(int serverPort, FileInfo jsonFile) _serviceProvider = services.BuildServiceProvider(); } - public async Task RunClient() + public FlightServer MakeServer() + { + throw new NotImplementedException(); + } + + public async Task RunClient(int serverPort) { - var address = $"grpc+tcp://localhost:{_serverPort}"; + var address = $"grpc+tcp://localhost:{serverPort}"; using var channel = GrpcChannel.ForAddress( address, new GrpcChannelOptions diff --git a/csharp/test/Apache.Arrow.Flight.IntegrationTest/Startup.cs b/csharp/test/Apache.Arrow.Flight.IntegrationTest/Startup.cs new file mode 100644 index 0000000000000..7e29d1997e63f --- /dev/null +++ b/csharp/test/Apache.Arrow.Flight.IntegrationTest/Startup.cs @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Microsoft.AspNetCore.Builder; +using Microsoft.AspNetCore.Hosting; +using Microsoft.AspNetCore.Http; +using Microsoft.Extensions.Hosting; + +namespace Apache.Arrow.Flight.IntegrationTest +{ + public class Startup + { + // This method gets called by the runtime. Use this method to configure the HTTP request pipeline. + public void Configure(IApplicationBuilder app, IWebHostEnvironment env) + { + if (env.IsDevelopment()) + { + app.UseDeveloperExceptionPage(); + } + + app.UseRouting(); + + app.UseEndpoints(endpoints => + { + endpoints.MapFlightEndpoint(); + + endpoints.MapGet("/", async context => + { + await context.Response.WriteAsync("Communication with gRPC endpoints must be made through a gRPC client. To learn how to create a client, visit: https://go.microsoft.com/fwlink/?linkid=2086909"); + }); + }); + } + } +} diff --git a/csharp/test/Apache.Arrow.Flight.TestWeb/Startup.cs b/csharp/test/Apache.Arrow.Flight.TestWeb/Startup.cs index d1cfe9e445808..68ce378ccd064 100644 --- a/csharp/test/Apache.Arrow.Flight.TestWeb/Startup.cs +++ b/csharp/test/Apache.Arrow.Flight.TestWeb/Startup.cs @@ -18,8 +18,6 @@ using Microsoft.AspNetCore.Http; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Logging.Console; namespace Apache.Arrow.Flight.TestWeb { @@ -33,11 +31,6 @@ public void ConfigureServices(IServiceCollection services) .AddFlightServer(); services.AddSingleton(new FlightStore()); - - // The integration tests rely on the port being written to the first line of stdout, - // so send all logging to stderr. - services.Configure( - o => o.LogToStandardErrorThreshold = LogLevel.Debug); } // This method gets called by the runtime. Use this method to configure the HTTP request pipeline. diff --git a/dev/archery/archery/integration/runner.py b/dev/archery/archery/integration/runner.py index f8db3dfb63770..781b41090d7e5 100644 --- a/dev/archery/archery/integration/runner.py +++ b/dev/archery/archery/integration/runner.py @@ -673,7 +673,7 @@ def append_tester(implementation, tester): "do_exchange:echo", description=("Test the do_exchange method by " "echoing data back to the client."), - skip_testers={"C#", "Go", "Java", "JS", "Rust"}, + skip_testers={"Go", "Java", "JS", "Rust"}, ), Scenario( "location:reuse_connection", From 734f81e766167d71391b429dc08d5ff89242f120 Mon Sep 17 00:00:00 2001 From: Adam Reeve Date: Thu, 10 Oct 2024 10:59:39 +1300 Subject: [PATCH 3/5] Handle clients sending schema after the first flight data stream message --- .../RecordBatchReaderImplementation.cs | 61 ++++++++++--------- .../Apache.Arrow.Flight.Tests/FlightTests.cs | 1 - 2 files changed, 31 insertions(+), 31 deletions(-) diff --git a/csharp/src/Apache.Arrow.Flight/Internal/RecordBatchReaderImplementation.cs b/csharp/src/Apache.Arrow.Flight/Internal/RecordBatchReaderImplementation.cs index 99876bf769dc7..22d0bd84fef77 100644 --- a/csharp/src/Apache.Arrow.Flight/Internal/RecordBatchReaderImplementation.cs +++ b/csharp/src/Apache.Arrow.Flight/Internal/RecordBatchReaderImplementation.cs @@ -69,42 +69,43 @@ public override void ReadSchema() public override async ValueTask ReadSchemaAsync(CancellationToken cancellationToken) { - if (HasReadSchema) + while (!HasReadSchema) { - return; - } - - var moveNextResult = await _flightDataStream.MoveNext(cancellationToken).ConfigureAwait(false); - - if (!moveNextResult) - { - throw new Exception("No records or schema in this flight"); - } + var moveNextResult = await _flightDataStream.MoveNext(cancellationToken).ConfigureAwait(false); + if (!moveNextResult) + { + throw new Exception("No records or schema in this flight"); + } - //AppMetadata will never be null, but length 0 if empty - //Those are skipped - if(_flightDataStream.Current.AppMetadata.Length > 0) - { - _applicationMetadatas.Add(_flightDataStream.Current.AppMetadata); - } + if (_flightDescriptor == null && _flightDataStream.Current.FlightDescriptor != null) + { + _flightDescriptor = new FlightDescriptor(_flightDataStream.Current.FlightDescriptor); + } - var header = _flightDataStream.Current.DataHeader.Memory; - Message message = Message.GetRootAsMessage( - ArrowReaderImplementation.CreateByteBuffer(header)); + // AppMetadata will never be null, but length 0 if empty + // Those are skipped + if(_flightDataStream.Current.AppMetadata.Length > 0) + { + _applicationMetadatas.Add(_flightDataStream.Current.AppMetadata); + } + var header = _flightDataStream.Current.DataHeader.Memory; + if (header.IsEmpty) + { + // Clients may send a first message with a descriptor only and no schema + continue; + } - if(_flightDataStream.Current.FlightDescriptor != null) - { - _flightDescriptor = new FlightDescriptor(_flightDataStream.Current.FlightDescriptor); - } + Message message = Message.GetRootAsMessage(ArrowReaderImplementation.CreateByteBuffer(header)); - switch (message.HeaderType) - { - case MessageHeader.Schema: - _schema = FlightMessageSerializer.DecodeSchema(message.ByteBuffer); - break; - default: - throw new Exception($"Expected schema as the first message, but got: {message.HeaderType.ToString()}"); + switch (message.HeaderType) + { + case MessageHeader.Schema: + _schema = FlightMessageSerializer.DecodeSchema(message.ByteBuffer); + break; + default: + throw new Exception($"Expected schema as the first message, but got: {message.HeaderType.ToString()}"); + } } } diff --git a/csharp/test/Apache.Arrow.Flight.Tests/FlightTests.cs b/csharp/test/Apache.Arrow.Flight.Tests/FlightTests.cs index 0e82673d02240..350762c992769 100644 --- a/csharp/test/Apache.Arrow.Flight.Tests/FlightTests.cs +++ b/csharp/test/Apache.Arrow.Flight.Tests/FlightTests.cs @@ -24,7 +24,6 @@ using Google.Protobuf; using Grpc.Core; using Grpc.Core.Utils; -using Python.Runtime; using Xunit; namespace Apache.Arrow.Flight.Tests From 82fbd0bac8713708520e635b8ae30d208f93da35 Mon Sep 17 00:00:00 2001 From: Adam Reeve Date: Wed, 16 Oct 2024 10:40:12 +1300 Subject: [PATCH 4/5] Add missing license header --- .../Scenarios/DoExchangeEchoScenario.cs | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/csharp/test/Apache.Arrow.Flight.IntegrationTest/Scenarios/DoExchangeEchoScenario.cs b/csharp/test/Apache.Arrow.Flight.IntegrationTest/Scenarios/DoExchangeEchoScenario.cs index fd95f4256c9dc..6e9b2696bbfb9 100644 --- a/csharp/test/Apache.Arrow.Flight.IntegrationTest/Scenarios/DoExchangeEchoScenario.cs +++ b/csharp/test/Apache.Arrow.Flight.IntegrationTest/Scenarios/DoExchangeEchoScenario.cs @@ -1,3 +1,18 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + using System; using System.Linq; using System.Threading; From 05080da3a3e5eb53ae8bc70d34cddd84cd3401e6 Mon Sep 17 00:00:00 2001 From: Adam Reeve Date: Wed, 16 Oct 2024 10:51:03 +1300 Subject: [PATCH 5/5] Update status doc --- docs/source/status.rst | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/docs/source/status.rst b/docs/source/status.rst index c838604fcaef6..5ab35f7639f56 100644 --- a/docs/source/status.rst +++ b/docs/source/status.rst @@ -208,15 +208,15 @@ Supported features in the gRPC transport: +--------------------------------------------+-------+-------+-------+----+-------+-------+-------+-------+ | Flight RPC Feature | C++ | Java | Go | JS | C# | Rust | Julia | Swift | +============================================+=======+=======+=======+====+=======+=======+=======+=======+ -| All RPC methods | ✓ | ✓ | ✓ | | ✓ (1) | ✓ | | | +| All RPC methods | ✓ | ✓ | ✓ | | ✓ | ✓ | | | +--------------------------------------------+-------+-------+-------+----+-------+-------+-------+-------+ -| Authentication handlers | ✓ | ✓ | ✓ | | ✓ (2) | ✓ | | | +| Authentication handlers | ✓ | ✓ | ✓ | | ✓ (1) | ✓ | | | +--------------------------------------------+-------+-------+-------+----+-------+-------+-------+-------+ | Call timeouts | ✓ | ✓ | ✓ | | | ✓ | | | +--------------------------------------------+-------+-------+-------+----+-------+-------+-------+-------+ | Call cancellation | ✓ | ✓ | ✓ | | | ✓ | | | +--------------------------------------------+-------+-------+-------+----+-------+-------+-------+-------+ -| Concurrent client calls (3) | ✓ | ✓ | ✓ | | ✓ | ✓ | | | +| Concurrent client calls (2) | ✓ | ✓ | ✓ | | ✓ | ✓ | | | +--------------------------------------------+-------+-------+-------+----+-------+-------+-------+-------+ | Custom middleware | ✓ | ✓ | ✓ | | | ✓ | | | +--------------------------------------------+-------+-------+-------+----+-------+-------+-------+-------+ @@ -228,7 +228,7 @@ Supported features in the UCX transport: +--------------------------------------------+-------+-------+-------+----+-------+-------+-------+-------+ | Flight RPC Feature | C++ | Java | Go | JS | C# | Rust | Julia | Swift | +============================================+=======+=======+=======+====+=======+=======+=======+=======+ -| All RPC methods | ✓ (4) | | | | | | | | +| All RPC methods | ✓ (3) | | | | | | | | +--------------------------------------------+-------+-------+-------+----+-------+-------+-------+-------+ | Authentication handlers | | | | | | | | | +--------------------------------------------+-------+-------+-------+----+-------+-------+-------+-------+ @@ -236,7 +236,7 @@ Supported features in the UCX transport: +--------------------------------------------+-------+-------+-------+----+-------+-------+-------+-------+ | Call cancellation | | | | | | | | | +--------------------------------------------+-------+-------+-------+----+-------+-------+-------+-------+ -| Concurrent client calls | ✓ (5) | | | | | | | | +| Concurrent client calls | ✓ (4) | | | | | | | | +--------------------------------------------+-------+-------+-------+----+-------+-------+-------+-------+ | Custom middleware | | | | | | | | | +--------------------------------------------+-------+-------+-------+----+-------+-------+-------+-------+ @@ -245,11 +245,10 @@ Supported features in the UCX transport: Notes: -* \(1) No support for Handshake or DoExchange. -* \(2) Support using AspNetCore authentication handlers. -* \(3) Whether a single client can support multiple concurrent calls. -* \(4) Only support for DoExchange, DoGet, DoPut, and GetFlightInfo. -* \(5) Each concurrent call is a separate connection to the server +* \(1) Support using AspNetCore authentication handlers. +* \(2) Whether a single client can support multiple concurrent calls. +* \(3) Only support for DoExchange, DoGet, DoPut, and GetFlightInfo. +* \(4) Each concurrent call is a separate connection to the server (unlike gRPC where concurrent calls are multiplexed over a single connection). This will generally provide better throughput but consumes more resources both on the server and the client.