Skip to content

Commit

Permalink
Fix C API support to work with desktop framework
Browse files Browse the repository at this point in the history
  • Loading branch information
CurtHagenlocher committed Jul 21, 2023
1 parent f43bfd6 commit a81bdd1
Show file tree
Hide file tree
Showing 18 changed files with 175 additions and 71 deletions.
14 changes: 9 additions & 5 deletions csharp/src/Apache.Arrow/C/CArrowArray.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ public unsafe struct CArrowArray
public byte** buffers;
public CArrowArray** children;
public CArrowArray* dictionary;
internal delegate* unmanaged
#if !NET5_0_OR_GREATER
[Cdecl]
#if NET5_0_OR_GREATER
internal delegate* unmanaged<CArrowArray*, void> release;
#else
internal IntPtr release;
#endif
<CArrowArray*, void> release;
public void* private_data;

/// <summary>
Expand All @@ -68,10 +68,14 @@ internal delegate* unmanaged
/// </remarks>
public static void Free(CArrowArray* array)
{
if (array->release != null)
if (array->release != default)
{
// Call release if not already called.
#if NET5_0_OR_GREATER
array->release(array);
#else
Marshal.GetDelegateForFunctionPointer<CArrowArrayExporter.ReleaseArrowArray>(array->release)(array);
#endif
}
Marshal.FreeHGlobal((IntPtr)array);
}
Expand Down
8 changes: 4 additions & 4 deletions csharp/src/Apache.Arrow/C/CArrowArrayExporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ public static class CArrowArrayExporter
#if NET5_0_OR_GREATER
private static unsafe delegate* unmanaged<CArrowArray*, void> ReleaseArrayPtr => &ReleaseArray;
#else
private unsafe delegate void ReleaseArrowArray(CArrowArray* cArray);
internal unsafe delegate void ReleaseArrowArray(CArrowArray* cArray);
private static unsafe readonly NativeDelegate<ReleaseArrowArray> s_releaseArray = new NativeDelegate<ReleaseArrowArray>(ReleaseArray);
private static unsafe delegate* unmanaged[Cdecl]<CArrowArray*, void> ReleaseArrayPtr => (delegate* unmanaged[Cdecl]<CArrowArray*, void>)s_releaseArray.Pointer;
private static IntPtr ReleaseArrayPtr => s_releaseArray.Pointer;
#endif
/// <summary>
/// Export an <see cref="IArrowArray"/> to a <see cref="CArrowArray"/>. Whether or not the
Expand Down Expand Up @@ -93,7 +93,7 @@ public static unsafe void ExportRecordBatch(RecordBatch batch, CArrowArray* cArr
{
throw new ArgumentNullException(nameof(cArray));
}
if (cArray->release != null)
if (cArray->release != default)
{
throw new ArgumentException("Cannot export array to a struct that is already initialized.", nameof(cArray));
}
Expand Down Expand Up @@ -191,7 +191,7 @@ private unsafe static void ConvertRecordBatch(ExportedAllocationOwner sharedOwne
private unsafe static void ReleaseArray(CArrowArray* cArray)
{
Dispose(&cArray->private_data);
cArray->release = null;
cArray->release = default;
}

private unsafe static void* FromDisposable(IDisposable disposable)
Expand Down
11 changes: 8 additions & 3 deletions csharp/src/Apache.Arrow/C/CArrowArrayImporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

using System;
using System.Collections.Generic;
using System.Runtime.InteropServices;
using Apache.Arrow.Memory;
using Apache.Arrow.Types;

Expand Down Expand Up @@ -104,21 +105,25 @@ public ImportedArrowArray(CArrowArray* cArray)
{
throw new ArgumentNullException(nameof(cArray));
}
if (cArray->release == null)
if (cArray->release == default)
{
throw new ArgumentException("Tried to import an array that has already been released.", nameof(cArray));
}
_cArray = *cArray;
cArray->release = null;
cArray->release = default;
}

protected override void FinalRelease()
{
if (_cArray.release != null)
if (_cArray.release != default)
{
fixed (CArrowArray* cArray = &_cArray)
{
#if NET5_0_OR_GREATER
cArray->release(cArray);
#else
Marshal.GetDelegateForFunctionPointer<CArrowArrayExporter.ReleaseArrowArray>(cArray->release)(cArray);
#endif
}
}
}
Expand Down
39 changes: 22 additions & 17 deletions csharp/src/Apache.Arrow/C/CArrowArrayStream.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,23 +35,23 @@ public unsafe struct CArrowArrayStream
///
/// Return value: 0 if successful, an `errno`-compatible error code otherwise.
///</summary>
internal delegate* unmanaged
#if !NET5_0_OR_GREATER
[Cdecl]
#if NET5_0_OR_GREATER
internal delegate* unmanaged <CArrowArrayStream*, CArrowSchema*, int> get_schema;
#else
internal IntPtr get_schema;
#endif
<CArrowArrayStream*, CArrowSchema*, int> get_schema;

/// <summary>
/// Callback to get the next array. If no error and the array is released, the stream has ended.
/// If successful, the ArrowArray must be released independently from the stream.
///
/// Return value: 0 if successful, an `errno`-compatible error code otherwise.
/// </summary>
internal delegate* unmanaged
#if !NET5_0_OR_GREATER
[Cdecl]
#if NET5_0_OR_GREATER
internal delegate* unmanaged<CArrowArrayStream*, CArrowArray*, int> get_next;
#else
internal IntPtr get_next;
#endif
<CArrowArrayStream*, CArrowArray*, int> get_next;

/// <summary>
/// Callback to get optional detailed error information. This must only
Expand All @@ -62,21 +62,21 @@ internal delegate* unmanaged
/// Return value: pointer to a null-terminated character array describing the last
/// error, or NULL if no description is available.
///</summary>
internal delegate* unmanaged
#if !NET5_0_OR_GREATER
[Cdecl]
#if NET5_0_OR_GREATER
internal delegate* unmanaged<CArrowArrayStream*, byte*> get_last_error;
#else
internal IntPtr get_last_error;
#endif
<CArrowArrayStream*, byte*> get_last_error;

/// <summary>
/// Release callback: release the stream's own resources. Note that arrays returned by
/// get_next must be individually released.
/// </summary>
internal delegate* unmanaged
#if !NET5_0_OR_GREATER
[Cdecl]
#if NET5_0_OR_GREATER
internal delegate* unmanaged <CArrowArrayStream*, void> release;
#else
internal IntPtr release;
#endif
<CArrowArrayStream*, void> release;

public void* private_data;

Expand All @@ -103,10 +103,15 @@ internal delegate* unmanaged
/// </remarks>
public static void Free(CArrowArrayStream* arrayStream)
{
if (arrayStream->release != null)
if (arrayStream->release != default)
{
// Call release if not already called.
#if NET5_0_OR_GREATER

arrayStream->release(arrayStream);
#else
Marshal.GetDelegateForFunctionPointer<CArrowArrayStreamExporter.ReleaseArrayStream>(arrayStream->release)(arrayStream);
#endif
}
Marshal.FreeHGlobal((IntPtr)arrayStream);
}
Expand Down
24 changes: 10 additions & 14 deletions csharp/src/Apache.Arrow/C/CArrowArrayStreamExporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,22 +29,18 @@ public static class CArrowArrayStreamExporter
private static unsafe delegate* unmanaged<CArrowArrayStream*, byte*> GetLastErrorPtr => &GetLastError;
private static unsafe delegate* unmanaged<CArrowArrayStream*, void> ReleasePtr => &Release;
#else
private unsafe delegate int GetSchemaArrayStream(CArrowArrayStream* cArrayStream, CArrowSchema* cSchema);
internal unsafe delegate int GetSchemaArrayStream(CArrowArrayStream* cArrayStream, CArrowSchema* cSchema);
private static unsafe NativeDelegate<GetSchemaArrayStream> s_getSchemaArrayStream = new NativeDelegate<GetSchemaArrayStream>(GetSchema);
private static unsafe delegate* unmanaged[Cdecl]<CArrowArrayStream*, CArrowSchema*, int> GetSchemaPtr =>
(delegate* unmanaged[Cdecl]<CArrowArrayStream*, CArrowSchema*, int>)s_getSchemaArrayStream.Pointer;
private unsafe delegate int GetNextArrayStream(CArrowArrayStream* cArrayStream, CArrowArray* cArray);
private static unsafe IntPtr GetSchemaPtr => s_getSchemaArrayStream.Pointer;
internal unsafe delegate int GetNextArrayStream(CArrowArrayStream* cArrayStream, CArrowArray* cArray);
private static unsafe NativeDelegate<GetNextArrayStream> s_getNextArrayStream = new NativeDelegate<GetNextArrayStream>(GetNext);
private static unsafe delegate* unmanaged[Cdecl]<CArrowArrayStream*, CArrowArray*, int> GetNextPtr =>
(delegate* unmanaged[Cdecl]<CArrowArrayStream*, CArrowArray*, int>)s_getNextArrayStream.Pointer;
private unsafe delegate byte* GetLastErrorArrayStream(CArrowArrayStream* cArrayStream);
private static unsafe IntPtr GetNextPtr => s_getNextArrayStream.Pointer;
internal unsafe delegate byte* GetLastErrorArrayStream(CArrowArrayStream* cArrayStream);
private static unsafe NativeDelegate<GetLastErrorArrayStream> s_getLastErrorArrayStream = new NativeDelegate<GetLastErrorArrayStream>(GetLastError);
private static unsafe delegate* unmanaged[Cdecl]<CArrowArrayStream*, byte*> GetLastErrorPtr =>
(delegate* unmanaged[Cdecl]<CArrowArrayStream*, byte*>)s_getLastErrorArrayStream.Pointer;
private unsafe delegate void ReleaseArrayStream(CArrowArrayStream* cArrayStream);
private static unsafe IntPtr GetLastErrorPtr => s_getLastErrorArrayStream.Pointer;
internal unsafe delegate void ReleaseArrayStream(CArrowArrayStream* cArrayStream);
private static unsafe NativeDelegate<ReleaseArrayStream> s_releaseArrayStream = new NativeDelegate<ReleaseArrayStream>(Release);
private static unsafe delegate* unmanaged[Cdecl]<CArrowArrayStream*, void> ReleasePtr =>
(delegate* unmanaged[Cdecl]<CArrowArrayStream*, void>)s_releaseArrayStream.Pointer;
private static unsafe IntPtr ReleasePtr => s_releaseArrayStream.Pointer;
#endif

/// <summary>
Expand Down Expand Up @@ -103,7 +99,7 @@ private unsafe static int GetNext(CArrowArrayStream* cArrayStream, CArrowArray*
ExportedArrayStream arrayStream = null;
try
{
cArray->release = null;
cArray->release = default;
arrayStream = ExportedArrayStream.FromPointer(cArrayStream->private_data);
RecordBatch recordBatch = arrayStream.ArrowArrayStream.ReadNextRecordBatchAsync().Result;
if (recordBatch != null)
Expand Down Expand Up @@ -140,7 +136,7 @@ private unsafe static int GetNext(CArrowArrayStream* cArrayStream, CArrowArray*
private unsafe static void Release(CArrowArrayStream* cArrayStream)
{
ExportedArrayStream.Free(&cArrayStream->private_data);
cArrayStream->release = null;
cArrayStream->release = default;
}

sealed unsafe class ExportedArrayStream : IDisposable
Expand Down
25 changes: 21 additions & 4 deletions csharp/src/Apache.Arrow/C/CArrowArrayStreamImporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

using System;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using Apache.Arrow.Ipc;
Expand Down Expand Up @@ -57,7 +58,11 @@ private sealed unsafe class ImportedArrowArrayStream : IArrowArrayStream

internal static string GetLastError(CArrowArrayStream* arrayStream, int errno)
{
#if NET5_0_OR_GREATER
byte* error = arrayStream->get_last_error(arrayStream);
#else
byte* error = Marshal.GetDelegateForFunctionPointer<CArrowArrayStreamExporter.GetLastErrorArrayStream>(arrayStream->get_last_error)(arrayStream);
#endif
if (error == null)
{
return $"Array stream operation failed with no message. Error code: {errno}";
Expand All @@ -71,21 +76,25 @@ public ImportedArrowArrayStream(CArrowArrayStream* cArrayStream)
{
throw new ArgumentNullException(nameof(cArrayStream));
}
if (cArrayStream->release == null)
if (cArrayStream->release == default)
{
throw new ArgumentException("Tried to import an array stream that has already been released.", nameof(cArrayStream));
}

CArrowSchema cSchema = new CArrowSchema();
#if NET5_0_OR_GREATER
int errno = cArrayStream->get_schema(cArrayStream, &cSchema);
#else
int errno = Marshal.GetDelegateForFunctionPointer<CArrowArrayStreamExporter.GetSchemaArrayStream>(cArrayStream->get_schema)(cArrayStream, &cSchema);
#endif
if (errno != 0)
{
throw new Exception(GetLastError(cArrayStream, errno));
}
_schema = CArrowSchemaImporter.ImportSchema(&cSchema);

_cArrayStream = *cArrayStream;
cArrayStream->release = null;
cArrayStream->release = default;
}

~ImportedArrowArrayStream()
Expand All @@ -111,12 +120,16 @@ public ValueTask<RecordBatch> ReadNextRecordBatchAsync(CancellationToken cancell
CArrowArray cArray = new CArrowArray();
fixed (CArrowArrayStream* cArrayStream = &_cArrayStream)
{
#if NET5_0_OR_GREATER
int errno = cArrayStream->get_next(cArrayStream, &cArray);
#else
int errno = Marshal.GetDelegateForFunctionPointer<CArrowArrayStreamExporter.GetNextArrayStream>(cArrayStream->get_next)(cArrayStream, &cArray);
#endif
if (errno != 0)
{
return new(Task.FromException<RecordBatch>(new Exception(GetLastError(cArrayStream, errno))));
}
if (cArray.release != null)
if (cArray.release != default)
{
result = CArrowArrayImporter.ImportRecordBatch(&cArray, _schema);
}
Expand All @@ -127,12 +140,16 @@ public ValueTask<RecordBatch> ReadNextRecordBatchAsync(CancellationToken cancell

public void Dispose()
{
if (!_disposed && _cArrayStream.release != null)
if (!_disposed && _cArrayStream.release != default)
{
_disposed = true;
fixed (CArrowArrayStream* cArrayStream = &_cArrayStream)
{
#if NET5_0_OR_GREATER
cArrayStream->release(cArrayStream);
#else
Marshal.GetDelegateForFunctionPointer<CArrowArrayStreamExporter.ReleaseArrayStream>(cArrayStream->release)(cArrayStream);
#endif
}
}
GC.SuppressFinalize(this);
Expand Down
14 changes: 9 additions & 5 deletions csharp/src/Apache.Arrow/C/CArrowSchema.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ public unsafe struct CArrowSchema
public long n_children;
public CArrowSchema** children;
public CArrowSchema* dictionary;
internal delegate* unmanaged
#if !NET5_0_OR_GREATER
[Cdecl]
#if NET5_0_OR_GREATER
internal delegate* unmanaged<CArrowSchema*, void> release;
#else
internal IntPtr release;
#endif
<CArrowSchema*, void> release;
public void* private_data;

/// <summary>
Expand All @@ -69,10 +69,14 @@ internal delegate* unmanaged
/// </remarks>
public static void Free(CArrowSchema* schema)
{
if (schema->release != null)
if (schema->release != default)
{
// Call release if not already called.
#if NET5_0_OR_GREATER
schema->release(schema);
#else
Marshal.GetDelegateForFunctionPointer<CArrowSchemaExporter.ReleaseArrowSchema>(schema->release)(schema);
#endif
}
Marshal.FreeHGlobal((IntPtr)schema);
}
Expand Down
8 changes: 4 additions & 4 deletions csharp/src/Apache.Arrow/C/CArrowSchemaExporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ public static class CArrowSchemaExporter
#if NET5_0_OR_GREATER
private static unsafe delegate* unmanaged<CArrowSchema*, void> ReleaseSchemaPtr => &ReleaseCArrowSchema;
#else
private unsafe delegate void ReleaseArrowSchema(CArrowSchema* cArray);
internal unsafe delegate void ReleaseArrowSchema(CArrowSchema* cArray);
private static unsafe readonly NativeDelegate<ReleaseArrowSchema> s_releaseSchema = new NativeDelegate<ReleaseArrowSchema>(ReleaseCArrowSchema);
private static unsafe delegate* unmanaged[Cdecl]<CArrowSchema*, void> ReleaseSchemaPtr => (delegate* unmanaged[Cdecl]<CArrowSchema*, void>)s_releaseSchema.Pointer;
private static IntPtr ReleaseSchemaPtr => s_releaseSchema.Pointer;
#endif

/// <summary>
Expand Down Expand Up @@ -297,7 +297,7 @@ private unsafe static void WriteMetadataString(ref byte* ptr, int length, string
private static unsafe void ReleaseCArrowSchema(CArrowSchema* schema)
{
if (schema == null) return;
if (schema->release == null) return;
if (schema->release == default) return;

Marshal.FreeHGlobal((IntPtr)schema->format);
Marshal.FreeHGlobal((IntPtr)schema->name);
Expand All @@ -324,7 +324,7 @@ private static unsafe void ReleaseCArrowSchema(CArrowSchema* schema)
schema->n_children = 0;
schema->dictionary = null;
schema->children = null;
schema->release = null;
schema->release = default;
}
}
}
8 changes: 6 additions & 2 deletions csharp/src/Apache.Arrow/C/CArrowSchemaImporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public ImportedArrowSchema(CArrowSchema* cSchema)
throw new ArgumentException("Passed null pointer for cSchema.");
}
_cSchema = cSchema;
if (_cSchema->release == null)
if (_cSchema->release == default)
{
throw new ArgumentException("Tried to import a schema that has already been released.");
}
Expand All @@ -128,9 +128,13 @@ public ImportedArrowSchema(CArrowSchema* handle, bool isRoot) : this(handle)
public void Dispose()
{
// We only call release on a root-level schema, not child ones.
if (_isRoot && _cSchema->release != null)
if (_isRoot && _cSchema->release != default)
{
#if NET5_0_OR_GREATER
_cSchema->release(_cSchema);
#else
Marshal.GetDelegateForFunctionPointer<CArrowSchemaExporter.ReleaseArrowSchema>(_cSchema->release)(_cSchema);
#endif
}
}

Expand Down
Loading

0 comments on commit a81bdd1

Please sign in to comment.