Skip to content

Commit

Permalink
fix(csharp/src/Drivers/Apache): fix to workaround concurrency issue (#…
Browse files Browse the repository at this point in the history
…2282)

Provides an interim work-around for the concurrency issue identified in
#2280.

* Removes the SQL `DELETE` statements from the SQL table scripts.
* Uses the XUnit.Collection to serialize the execution of ClientTests
and DriverTests.
* Fixes the missing application of `HttpRequestTimeout` due to an
incomplete implementation of the `ValidateOptions` in
`SparkDatabricksConnection`.
* Improve table creation syntax to `CREATE OR REPLACE TABLE` to
reduce the probability of inconsistent state.

Note: this is not the final solution. A more robust isolation of table
creation needs to be done to handle concurrent execution safely.
  • Loading branch information
birschick-bq authored Oct 29, 2024
1 parent e93d70f commit 99e7e53
Show file tree
Hide file tree
Showing 5 changed files with 11 additions and 23 deletions.
12 changes: 0 additions & 12 deletions csharp/src/Drivers/Apache/Spark/SparkDatabricksConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@
*/

using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Apache.Arrow.Adbc.Drivers.Apache.Hive2;
using Apache.Arrow.Ipc;
using Apache.Hive.Service.Rpc.Thrift;

Expand All @@ -45,16 +43,6 @@ protected override TOpenSessionReq CreateSessionRequest()
return req;
}

protected override void ValidateOptions()
{
Properties.TryGetValue(SparkParameters.DataTypeConv, out string? dataTypeConv);
// Note: In Databricks, scalar types are provided implicitly.
DataTypeConversion = DataTypeConversionParser.Parse(dataTypeConv);

Properties.TryGetValue(SparkParameters.TLSOptions, out string? tlsOptions);
TlsOptions = TlsOptionsParser.Parse(tlsOptions);
}

protected override Task<TGetResultSetMetadataResp> GetResultSetMetadataAsync(TGetSchemasResp response) =>
Task.FromResult(response.DirectResults.ResultSetMetadata);
protected override Task<TGetResultSetMetadataResp> GetResultSetMetadataAsync(TGetCatalogsResp response) =>
Expand Down
6 changes: 4 additions & 2 deletions csharp/test/Drivers/Apache/Spark/ClientTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,12 @@ namespace Apache.Arrow.Adbc.Tests.Drivers.Apache.Spark
/// <remarks>
/// Tests are ordered to ensure data is created for the other
/// queries to run.
/// <para>Note: This test creates/replaces the table identified in the configuration (metadata/table).
/// It uses the test collection "TableCreateTestCollection" to ensure it does not run
at the same time as any other tests that may create/update the same table.</para>
/// </remarks>
[TestCaseOrderer("Apache.Arrow.Adbc.Tests.Xunit.TestOrderer", "Apache.Arrow.Adbc.Tests")]
[Collection("TableCreateTestCollection")]
public class ClientTests : TestBase<SparkTestConfiguration, SparkTestEnvironment>
{
public ClientTests(ITestOutputHelper? outputHelper) : base(outputHelper, new SparkTestEnvironment.Factory())
Expand All @@ -54,7 +58,6 @@ public void CanClientExecuteUpdate()

List<int> expectedResults = TestEnvironment.ServerType != SparkServerType.Databricks
? [
-1, // DROP TABLE
-1, // CREATE TABLE
affectedRows, // INSERT
affectedRows, // INSERT
Expand All @@ -63,7 +66,6 @@ public void CanClientExecuteUpdate()
//1, // DELETE
]
: [
-1, // DROP TABLE
-1, // CREATE TABLE
affectedRows, // INSERT
affectedRows, // INSERT
Expand Down
6 changes: 4 additions & 2 deletions csharp/test/Drivers/Apache/Spark/DriverTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,12 @@ namespace Apache.Arrow.Adbc.Tests.Drivers.Apache.Spark
/// <remarks>
/// Tests are ordered to ensure data is created for the other
/// queries to run.
/// <para>Note: This test creates/replaces the table identified in the configuration (metadata/table).
/// It uses the test collection "TableCreateTestCollection" to ensure it does not run
at the same time as any other tests that may create/update the same table.</para>
/// </remarks>
[TestCaseOrderer("Apache.Arrow.Adbc.Tests.Xunit.TestOrderer", "Apache.Arrow.Adbc.Tests")]
[Collection("TableCreateTestCollection")]
public class DriverTests : TestBase<SparkTestConfiguration, SparkTestEnvironment>
{
/// <summary>
Expand Down Expand Up @@ -92,7 +96,6 @@ public void CanExecuteUpdate()
List<int> expectedResults = TestEnvironment.ServerType != SparkServerType.Databricks
?
[
-1, // DROP TABLE
-1, // CREATE TABLE
1, // INSERT
1, // INSERT
Expand All @@ -102,7 +105,6 @@ public void CanExecuteUpdate()
]
:
[
-1, // DROP TABLE
-1, // CREATE TABLE
1, // INSERT
1, // INSERT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@
-- See the License for the specific language governing permissions and
-- limitations under the License.

DROP TABLE IF EXISTS {ADBC_CATALOG}.{ADBC_DATASET}.{ADBC_TABLE};

CREATE TABLE IF NOT EXISTS {ADBC_CATALOG}.{ADBC_DATASET}.{ADBC_TABLE} (
CREATE OR REPLACE TABLE {ADBC_CATALOG}.{ADBC_DATASET}.{ADBC_TABLE} (
id LONG,
byte BYTE,
short SHORT,
Expand All @@ -42,7 +40,7 @@ CREATE TABLE IF NOT EXISTS {ADBC_CATALOG}.{ADBC_DATASET}.{ADBC_TABLE} (
>,
varchar VARCHAR(255),
char CHAR(10)
);
) USING DELTA;

INSERT INTO {ADBC_CATALOG}.{ADBC_DATASET}.{ADBC_TABLE} (
id,
Expand Down
4 changes: 1 addition & 3 deletions csharp/test/Drivers/Apache/Spark/Resources/SparkData.sql
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@
-- See the License for the specific language governing permissions and
-- limitations under the License.

DROP TABLE IF EXISTS {ADBC_CATALOG}.{ADBC_DATASET}.{ADBC_TABLE};

CREATE TABLE IF NOT EXISTS {ADBC_CATALOG}.{ADBC_DATASET}.{ADBC_TABLE} (
CREATE OR REPLACE TABLE {ADBC_CATALOG}.{ADBC_DATASET}.{ADBC_TABLE} (
id LONG,
byte BYTE,
short SHORT,
Expand Down

0 comments on commit 99e7e53

Please sign in to comment.