Skip to content

Commit

Permalink
GH-44360: [C#] Fix Flight DoExchange incompatibility with C++ impleme…
Browse files Browse the repository at this point in the history
…ntation (#44424)

### Rationale for this change

See #44360

### What changes are included in this PR?

* Adds a new integration test to allow testing `do_exchange` between C++/Python and .NET.
* Updates the C# Flight stream reader to handle the case where the first message carries only a descriptor and no schema.

### Are these changes tested?

* Yes, using the new integration test.

### Are there any user-facing changes?

No
* GitHub Issue: #44360

Authored-by: Adam Reeve <adreeve@gmail.com>
Signed-off-by: Curt Hagenlocher <curt@hagenlocher.org>
  • Loading branch information
adamreeve authored Oct 16, 2024
1 parent 1dcd145 commit da5a295
Show file tree
Hide file tree
Showing 12 changed files with 430 additions and 70 deletions.
129 changes: 129 additions & 0 deletions cpp/src/arrow/flight/integration_tests/test_integration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include <iostream>
#include <memory>
#include <numeric>
#include <string>
#include <unordered_map>
#include <utility>
Expand Down Expand Up @@ -1026,6 +1027,131 @@ class AppMetadataFlightInfoEndpointScenario : public Scenario {
}
};

/// \brief Flight server used by the do_exchange integration scenarios.
///
/// Requests must carry a command descriptor; the command string selects the
/// exchange behaviour. Only the "echo" command is implemented.
class DoExchangeServer : public FlightServerBase {
 public:
  DoExchangeServer() = default;

  /// \brief Dispatch an exchange call based on the descriptor's command.
  ///
  /// Returns Invalid when the descriptor is not a command descriptor and
  /// NotImplemented for any command other than "echo".
  Status DoExchange(const ServerCallContext& context,
                    std::unique_ptr<FlightMessageReader> reader,
                    std::unique_ptr<FlightMessageWriter> writer) override {
    const auto& descriptor = reader->descriptor();
    if (descriptor.type != FlightDescriptor::DescriptorType::CMD) {
      return Status::Invalid("Must provide a command descriptor");
    }
    if (descriptor.cmd != "echo") {
      return Status::NotImplemented("Command not implemented: ", descriptor.cmd);
    }
    return RunEchoExchange(reader, writer);
  }

 private:
  /// \brief Send every incoming chunk straight back to the client.
  ///
  /// The loop ends at the first chunk that has neither data nor application
  /// metadata. The outgoing stream is begun lazily, using the schema of the
  /// first chunk that carries data.
  static Status RunEchoExchange(std::unique_ptr<FlightMessageReader>& reader,
                                std::unique_ptr<FlightMessageWriter>& writer) {
    bool schema_sent = false;
    while (true) {
      ARROW_ASSIGN_OR_RAISE(FlightStreamChunk incoming, reader->Next());
      const bool has_data = incoming.data != nullptr;
      const bool has_metadata = incoming.app_metadata != nullptr;
      if (!has_data && !has_metadata) {
        break;
      }

      // Begin() needs a schema, which is only known once data arrives.
      if (has_data && !schema_sent) {
        schema_sent = true;
        RETURN_NOT_OK(writer->Begin(incoming.data->schema()));
      }

      if (!has_data) {
        RETURN_NOT_OK(writer->WriteMetadata(incoming.app_metadata));
      } else if (has_metadata) {
        RETURN_NOT_OK(writer->WriteWithMetadata(*incoming.data, incoming.app_metadata));
      } else {
        RETURN_NOT_OK(writer->WriteRecordBatch(*incoming.data));
      }
    }
    return Status::OK();
  }
};

/// \brief The DoExchangeEcho scenario.
///
/// Exercises a two-way DoExchange call: the client streams record batches,
/// attaching application metadata to every other one, and after each write
/// checks that the server sends the same batch (and metadata, if any) back.
class DoExchangeEchoScenario : public Scenario {
  Status MakeServer(std::unique_ptr<FlightServerBase>* server,
                    FlightServerOptions* options) override {
    *server = std::make_unique<DoExchangeServer>();
    return Status::OK();
  }

  Status MakeClient(FlightClientOptions* options) override { return Status::OK(); }

  Status RunClient(std::unique_ptr<FlightClient> client) override {
    constexpr int kNumBatches = 4;
    constexpr int kRowsPerBatch = 10;

    FlightCallOptions call_options;
    auto echo_descriptor = FlightDescriptor::Command("echo");
    ARROW_ASSIGN_OR_RAISE(auto exchange,
                          client->DoExchange(call_options, echo_descriptor));
    std::unique_ptr<FlightStreamWriter> writer = std::move(exchange.writer);
    std::unique_ptr<FlightStreamReader> reader = std::move(exchange.reader);

    auto schema = arrow::schema({field("x", int32(), false)});
    ARROW_RETURN_NOT_OK(writer->Begin(schema));

    ARROW_ASSIGN_OR_RAISE(auto batch_builder,
                          RecordBatchBuilder::Make(schema, arrow::default_memory_pool()));

    for (int batch_idx = 0; batch_idx < kNumBatches; ++batch_idx) {
      // Each batch holds consecutive integers starting at its own index.
      std::vector<int32_t> values(kRowsPerBatch);
      std::iota(values.begin(), values.end(), batch_idx);
      ARROW_RETURN_NOT_OK(
          batch_builder->GetFieldAs<Int32Builder>(0)->AppendValues(values));
      ARROW_ASSIGN_OR_RAISE(auto sent_batch, batch_builder->Flush());

      // Only even-numbered batches are sent with application metadata.
      const bool with_metadata = (batch_idx % 2 == 0);
      const std::string expected_metadata = std::to_string(batch_idx);

      ARROW_RETURN_NOT_OK(
          with_metadata
              ? writer->WriteWithMetadata(*sent_batch,
                                          Buffer::FromString(expected_metadata))
              : writer->WriteRecordBatch(*sent_batch));

      ARROW_ASSIGN_OR_RAISE(auto echoed, reader->Next());
      if (echoed.data == nullptr) {
        return Status::Invalid("Received null data");
      }
      if (!echoed.data->Equals(*sent_batch)) {
        return Status::Invalid("Read data doesn't match expected data for batch ",
                               std::to_string(batch_idx), ".\n", "Expected:\n",
                               sent_batch->ToString(), "Actual:\n",
                               echoed.data->ToString());
      }

      if (!with_metadata) {
        if (echoed.app_metadata != nullptr) {
          return Status::Invalid(
              "Expected no app metadata but received non-null metadata");
        }
        continue;
      }

      if (echoed.app_metadata == nullptr) {
        return Status::Invalid("Received null app metadata");
      }
      const std::string actual_metadata = echoed.app_metadata->ToString();
      if (actual_metadata != expected_metadata) {
        return Status::Invalid("Read metadata doesn't match expected for batch ",
                               std::to_string(batch_idx), ".\n", "Expected:\n",
                               expected_metadata, "\nActual:\n", actual_metadata);
      }
    }

    ARROW_RETURN_NOT_OK(writer->DoneWriting());
    ARROW_RETURN_NOT_OK(writer->Close());

    return Status::OK();
  }
};

/// \brief Schema to be returned for mocking the statement/prepared statement results.
///
/// Must be the same across all languages.
Expand Down Expand Up @@ -2283,6 +2409,9 @@ Status GetScenario(const std::string& scenario_name, std::shared_ptr<Scenario>*
} else if (scenario_name == "app_metadata_flight_info_endpoint") {
*out = std::make_shared<AppMetadataFlightInfoEndpointScenario>();
return Status::OK();
} else if (scenario_name == "do_exchange:echo") {
*out = std::make_shared<DoExchangeEchoScenario>();
return Status::OK();
} else if (scenario_name == "flight_sql") {
*out = std::make_shared<FlightSqlScenario>();
return Status::OK();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,42 +69,43 @@ public override void ReadSchema()

public override async ValueTask ReadSchemaAsync(CancellationToken cancellationToken)
{
if (HasReadSchema)
while (!HasReadSchema)
{
return;
}

var moveNextResult = await _flightDataStream.MoveNext(cancellationToken).ConfigureAwait(false);

if (!moveNextResult)
{
throw new Exception("No records or schema in this flight");
}
var moveNextResult = await _flightDataStream.MoveNext(cancellationToken).ConfigureAwait(false);
if (!moveNextResult)
{
throw new Exception("No records or schema in this flight");
}

//AppMetadata will never be null, but length 0 if empty
//Those are skipped
if(_flightDataStream.Current.AppMetadata.Length > 0)
{
_applicationMetadatas.Add(_flightDataStream.Current.AppMetadata);
}
if (_flightDescriptor == null && _flightDataStream.Current.FlightDescriptor != null)
{
_flightDescriptor = new FlightDescriptor(_flightDataStream.Current.FlightDescriptor);
}

var header = _flightDataStream.Current.DataHeader.Memory;
Message message = Message.GetRootAsMessage(
ArrowReaderImplementation.CreateByteBuffer(header));
// AppMetadata will never be null, but length 0 if empty
// Those are skipped
if(_flightDataStream.Current.AppMetadata.Length > 0)
{
_applicationMetadatas.Add(_flightDataStream.Current.AppMetadata);
}

var header = _flightDataStream.Current.DataHeader.Memory;
if (header.IsEmpty)
{
// Clients may send a first message with a descriptor only and no schema
continue;
}

if(_flightDataStream.Current.FlightDescriptor != null)
{
_flightDescriptor = new FlightDescriptor(_flightDataStream.Current.FlightDescriptor);
}
Message message = Message.GetRootAsMessage(ArrowReaderImplementation.CreateByteBuffer(header));

switch (message.HeaderType)
{
case MessageHeader.Schema:
_schema = FlightMessageSerializer.DecodeSchema(message.ByteBuffer);
break;
default:
throw new Exception($"Expected schema as the first message, but got: {message.HeaderType.ToString()}");
switch (message.HeaderType)
{
case MessageHeader.Schema:
_schema = FlightMessageSerializer.DecodeSchema(message.ByteBuffer);
break;
default:
throw new Exception($"Expected schema as the first message, but got: {message.HeaderType.ToString()}");
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
using System;
using System.IO;
using System.Threading.Tasks;
using Apache.Arrow.Flight.IntegrationTest.Scenarios;

namespace Apache.Arrow.Flight.IntegrationTest;

Expand All @@ -34,18 +35,13 @@ public FlightClientCommand(int port, string scenario, FileInfo jsonFileInfo)

public async Task Execute()
{
if (!string.IsNullOrEmpty(_scenario))
IScenario scenario = _scenario switch
{
// No named scenarios are currently implemented
throw new Exception($"Scenario '{_scenario}' is not supported.");
}
null => new JsonTestScenario(_jsonFileInfo),
"do_exchange:echo" => new DoExchangeEchoScenario(),
_ => throw new NotSupportedException($"Scenario '{_scenario}' is not supported"),
};

if (!(_jsonFileInfo?.Exists ?? false))
{
throw new Exception($"Invalid JSON file path '{_jsonFileInfo?.FullName}'");
}

var scenario = new JsonTestScenario(_port, _jsonFileInfo);
await scenario.RunClient().ConfigureAwait(false);
await scenario.RunClient(_port).ConfigureAwait(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,17 @@
using System;
using System.Net;
using System.Threading.Tasks;
using Apache.Arrow.Flight.IntegrationTest.Scenarios;
using Apache.Arrow.Flight.Server;
using Apache.Arrow.Flight.TestWeb;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Hosting.Server;
using Microsoft.AspNetCore.Hosting.Server.Features;
using Microsoft.AspNetCore.Server.Kestrel.Core;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Console;

namespace Apache.Arrow.Flight.IntegrationTest;

Expand All @@ -37,11 +41,12 @@ public FlightServerCommand(string scenario)

public async Task Execute()
{
if (!string.IsNullOrEmpty(_scenario))
IScenario scenario = _scenario switch
{
// No named scenarios are currently implemented
throw new Exception($"Scenario '{_scenario}' is not supported.");
}
null => null,
"do_exchange:echo" => new DoExchangeEchoScenario(),
_ => throw new NotSupportedException($"Scenario {_scenario} is not supported")
};

var host = Host.CreateDefaultBuilder()
.ConfigureWebHostDefaults(webBuilder =>
Expand All @@ -51,6 +56,26 @@ public async Task Execute()
{
options.Listen(IPEndPoint.Parse("127.0.0.1:0"), l => l.Protocols = HttpProtocols.Http2);
})
.ConfigureServices(services =>
{
if (scenario == null)
{
// Use the TestFlightServer for JSON based integration tests
services.AddGrpc().AddFlightServer<TestFlightServer>();
services.AddSingleton(new FlightStore());
}
else
{
// Use a scenario-specific server implementation
services.AddGrpc().Services.AddScoped<FlightServer>(_ => scenario.MakeServer());
}
// The integration tests rely on the port being written to the first line of stdout,
// so send all logging to stderr.
services.Configure<ConsoleLoggerOptions>(
o => o.LogToStandardErrorThreshold = LogLevel.Debug);
})
.UseStartup<Startup>();
})
.Build();
Expand Down
35 changes: 35 additions & 0 deletions csharp/test/Apache.Arrow.Flight.IntegrationTest/IScenario.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

using System.Threading.Tasks;
using Apache.Arrow.Flight.Server;

namespace Apache.Arrow.Flight.IntegrationTest;

/// <summary>
/// A named Flight integration test scenario, providing both the server
/// implementation and the client logic that exercises it.
/// </summary>
internal interface IScenario
{
/// <summary>
/// Create a FlightServer instance to run the scenario
/// </summary>
/// <returns>The scenario-specific server implementation.</returns>
FlightServer MakeServer();

/// <summary>
/// Run the scenario using a Flight client
/// </summary>
/// <param name="serverPort">
/// Port of the Flight server to run against (presumably on the local
/// host; confirm against the implementations).
/// </param>
/// <returns>A task representing the asynchronous client run.</returns>
Task RunClient(int serverPort);
}
Loading

0 comments on commit da5a295

Please sign in to comment.