From 099e38e15b9fd6e41fae80e60b82e916c94d683f Mon Sep 17 00:00:00 2001 From: Matt Topol Date: Fri, 25 Oct 2024 08:48:22 -0400 Subject: [PATCH] Updates from feedback --- cpp/src/arrow/c/abi.h | 18 ++++++++----- docs/source/format/CDeviceDataInterface.rst | 30 ++++++++++++++++----- 2 files changed, 34 insertions(+), 14 deletions(-) diff --git a/cpp/src/arrow/c/abi.h b/cpp/src/arrow/c/abi.h index 5a75ecca691d2..0d9f31af5eda4 100644 --- a/cpp/src/arrow/c/abi.h +++ b/cpp/src/arrow/c/abi.h @@ -231,8 +231,8 @@ struct ArrowDeviceArrayStream { #ifndef ARROW_C_ASYNC_STREAM_INTERFACE # define ARROW_C_ASYNC_STREAM_INTERFACE -// ArrowAsyncTask represents available data from a producer that was passed to -// an invocation of `on_next_task` on the ArrowAsyncDeviceStreamHandler. +// EXPERIMENTAL: ArrowAsyncTask represents available data from a producer that was passed +// to an invocation of `on_next_task` on the ArrowAsyncDeviceStreamHandler. // // The reason for this Task approach instead of the Async interface returning // the Array directly is to allow for more complex thread handling and reducing @@ -275,8 +275,8 @@ struct ArrowAsyncTask { void* private_data; }; -// ArrowAsyncProducer represents a 1-to-1 relationship between an async producer -// and consumer. This object allows the consumer to perform backpressure and flow +// EXPERIMENTAL: ArrowAsyncProducer represents a 1-to-1 relationship between an async +// producer and consumer. This object allows the consumer to perform backpressure and flow // control on the asynchronous stream processing. This object must be owned by the // producer who creates it, and thus is responsible for cleaning it up. struct ArrowAsyncProducer { @@ -323,7 +323,7 @@ struct ArrowAsyncProducer { void* private_data; }; -// Similar to ArrowDeviceArrayStream, except designed for an asynchronous +// EXPERIMENTAL: Similar to ArrowDeviceArrayStream, except designed for an asynchronous // style of interaction. 
While ArrowDeviceArrayStream provides producer // defined callbacks, this is intended to be created by the consumer instead. // The consumer passes this handler to the producer, which in turn uses the @@ -356,8 +356,7 @@ struct ArrowAsyncDeviceStreamHandler { // A producer that receives a non-zero return here should stop producing and eventually // call release instead. int (*on_schema)(struct ArrowAsyncDeviceStreamHandler* self, - struct ArrowAsyncProducer* producer, struct ArrowSchema* stream_schema, - const char* addl_metadata); + struct ArrowSchema* stream_schema, const char* addl_metadata); // Handler for receiving data. This is called when data is available providing an // ArrowAsyncTask struct to signify it. The producer indicates the end of the stream @@ -423,6 +422,11 @@ struct ArrowAsyncDeviceStreamHandler { // The release callback must not call any methods of an ArrowAsyncProducer object. void (*release)(struct ArrowAsyncDeviceStreamHandler* self); + // MUST be populated by the producer BEFORE calling any callbacks other than release. + // This provides the connection between a handler and its producer, and must exist until + // the release callback is called. + struct ArrowAsyncProducer* producer; + // Opaque handler-specific data void* private_data; }; diff --git a/docs/source/format/CDeviceDataInterface.rst b/docs/source/format/CDeviceDataInterface.rst index 7b67d86dd9cfe..17ac5cf308ab7 100644 --- a/docs/source/format/CDeviceDataInterface.rst +++ b/docs/source/format/CDeviceDataInterface.rst @@ -655,6 +655,12 @@ serialized. Async Device Stream Interface ============================= +.. warning:: + + Experimental: The Async C Device Stream interface is experimental in its current + form. Based on feedback and usage, the protocol definition may change until + it is fully standardized. + The :ref:`C stream interface ` provides a synchronous API centered around the consumer calling the producer functions to retrieve the next record batch. 
For concurrent communication between producer and consumer, @@ -699,7 +705,6 @@ The C device async stream interface consists of three ``struct`` definitions: struct ArrowAsyncDeviceStreamHandler { // consumer-specific handlers int (*on_schema)(struct ArrowAsyncDeviceStreamHandler* self, - struct ArrowAsyncProducer* producer, struct ArrowSchema* stream_schema, const char* addl_metadata); int (*on_next_task)(struct ArrowAsyncDeviceStreamHandler* self, struct ArrowAsyncTask* task, const char* metadata); @@ -709,6 +714,9 @@ The C device async stream interface consists of three ``struct`` definitions: // release callback void (*release)(struct ArrowAsyncDeviceStreamHandler* self); + // must be populated before calling any callbacks + struct ArrowAsyncProducer* producer; + // opaque handler-specific data void* private_data; }; @@ -727,7 +735,7 @@ The ArrowAsyncDeviceStreamHandler structure The structure has the following fields: -.. c:member:: int (*ArrowAsyncDeviceStreamHandler.on_schema)(struct ArrowAsyncDeviceStreamHandler*, struct ArrowAsyncProducer*, struct ArrowSchema*, const char*) +.. c:member:: int (*ArrowAsyncDeviceStreamHandler.on_schema)(struct ArrowAsyncDeviceStreamHandler*, struct ArrowSchema*, const char*) *Mandatory.* Handler for receiving the schema of the stream. All incoming records should match the provided schema. If successful, the function should return 0, otherwise @@ -740,8 +748,8 @@ The structure has the following fields: the additional metadata beyond the lifetime of this call *MUST* copy the value themselves. Unless the ``on_error`` handler is called, this will always get called exactly once and will be - the first method called on this object. As such the producer *MUST* provide an ``ArrowAsyncProducer`` - object when calling this function to allow the consumer to apply back-pressure and control the flow of data. + the first method called on this object. 
As such the producer *MUST* populate the ``ArrowAsyncProducer`` + member before calling this function to allow the consumer to apply back-pressure and control the flow of data. The producer maintains ownership of the ``ArrowAsyncProducer`` and must clean it up *after* calling the release callback on the ``ArrowAsyncDeviceStreamHandler``. @@ -799,6 +807,14 @@ The structure has the following fields: :c:member:`ArrowAsyncProducer.request`. This must not call any methods of an ``ArrowAsyncProducer`` object. +.. c:member:: struct ArrowAsyncProducer* ArrowAsyncDeviceStreamHandler.producer + + *Mandatory.* The producer object that the consumer will use to request additional data or cancel. + + This object *MUST* be populated before calling the :c:member:`ArrowAsyncDeviceStreamHandler.on_schema` + callback. The producer maintains ownership of this object and must clean it up *after* calling + the release callback on the ``ArrowAsyncDeviceStreamHandler``. + .. c:member:: void* ArrowAsyncDeviceStreamHandler.private_data *Optional.* An opaque pointer to consumer-provided private data. @@ -938,9 +954,9 @@ usage as in :ref:`C data interface `. ArrowAsyncProducer Lifetime ''''''''''''''''''''''''''' -The lifetime of the ``ArrowAsyncProducer`` passed to ``on_schema`` is owned by the producer -itself and should be managed by it. It *MUST* remain valid at least until just before -calling ``release`` on the stream handler object. +The lifetime of the ``ArrowAsyncProducer`` is owned by the producer itself and should +be managed by it. It *MUST* be populated before calling any methods other than ``release`` +and *MUST* remain valid at least until just before calling ``release`` on the stream handler object. Thread safety '''''''''''''