Skip to content

Commit

Permalink
feat(python): expose ADBC 1.1.0 features (#937)
Browse files Browse the repository at this point in the history
  • Loading branch information
lidavidm authored Jul 27, 2023
1 parent 1205cbb commit fe36463
Show file tree
Hide file tree
Showing 7 changed files with 626 additions and 42 deletions.
26 changes: 15 additions & 11 deletions c/driver/postgresql/statement.cc
Original file line number Diff line number Diff line change
Expand Up @@ -626,15 +626,19 @@ int TupleReader::GetNext(struct ArrowArray* out) {

// Check the server-side response
result_ = PQgetResult(conn_);
const int pq_status = PQresultStatus(result_);
const ExecStatusType pq_status = PQresultStatus(result_);
if (pq_status != PGRES_COMMAND_OK) {
StringBuilderAppend(&error_builder_, "[libpq] Query failed [%d]: %s", pq_status,
PQresultErrorMessage(result_));
const char* sqlstate = PQresultErrorField(result_, PG_DIAG_SQLSTATE);
StringBuilderAppend(&error_builder_, "[libpq] Query failed [%s]: %s",
PQresStatus(pq_status), PQresultErrorMessage(result_));

if (tmp.release != nullptr) {
tmp.release(&tmp);
}

if (sqlstate != nullptr && std::strcmp(sqlstate, "57014") == 0) {
return ECANCELED;
}
return EIO;
}

Expand Down Expand Up @@ -1038,7 +1042,7 @@ AdbcStatusCode PostgresStatement::GetOption(const char* key, char* value, size_t
} else if (std::strcmp(key, ADBC_POSTGRESQL_OPTION_BATCH_SIZE_HINT_BYTES) == 0) {
result = std::to_string(reader_.batch_size_hint_bytes_);
} else {
SetError(error, "[libq] Unknown statement option '%s'", key);
SetError(error, "[libpq] Unknown statement option '%s'", key);
return ADBC_STATUS_NOT_FOUND;
}

Expand All @@ -1052,13 +1056,13 @@ AdbcStatusCode PostgresStatement::GetOption(const char* key, char* value, size_t
AdbcStatusCode PostgresStatement::GetOptionBytes(const char* key, uint8_t* value,
size_t* length,
struct AdbcError* error) {
SetError(error, "[libq] Unknown statement option '%s'", key);
SetError(error, "[libpq] Unknown statement option '%s'", key);
return ADBC_STATUS_NOT_FOUND;
}

AdbcStatusCode PostgresStatement::GetOptionDouble(const char* key, double* value,
struct AdbcError* error) {
SetError(error, "[libq] Unknown statement option '%s'", key);
SetError(error, "[libpq] Unknown statement option '%s'", key);
return ADBC_STATUS_NOT_FOUND;
}

Expand All @@ -1069,7 +1073,7 @@ AdbcStatusCode PostgresStatement::GetOptionInt(const char* key, int64_t* value,
*value = reader_.batch_size_hint_bytes_;
return ADBC_STATUS_OK;
}
SetError(error, "[libq] Unknown statement option '%s'", key);
SetError(error, "[libpq] Unknown statement option '%s'", key);
return ADBC_STATUS_NOT_FOUND;
}

Expand Down Expand Up @@ -1133,21 +1137,21 @@ AdbcStatusCode PostgresStatement::SetOption(const char* key, const char* value,

this->reader_.batch_size_hint_bytes_ = int_value;
} else {
SetError(error, "[libq] Unknown statement option '%s'", key);
SetError(error, "[libpq] Unknown statement option '%s'", key);
return ADBC_STATUS_NOT_IMPLEMENTED;
}
return ADBC_STATUS_OK;
}

AdbcStatusCode PostgresStatement::SetOptionBytes(const char* key, const uint8_t* value,
size_t length, struct AdbcError* error) {
SetError(error, "%s%s", "[libpq] Unknown option ", key);
SetError(error, "%s%s", "[libpq] Unknown statement option ", key);
return ADBC_STATUS_NOT_IMPLEMENTED;
}

AdbcStatusCode PostgresStatement::SetOptionDouble(const char* key, double value,
struct AdbcError* error) {
SetError(error, "%s%s", "[libpq] Unknown option ", key);
SetError(error, "%s%s", "[libpq] Unknown statement option ", key);
return ADBC_STATUS_NOT_IMPLEMENTED;
}

Expand All @@ -1162,7 +1166,7 @@ AdbcStatusCode PostgresStatement::SetOptionInt(const char* key, int64_t value,
this->reader_.batch_size_hint_bytes_ = value;
return ADBC_STATUS_OK;
}
SetError(error, "%s%s", "[libpq] Unknown option ", key);
SetError(error, "[libpq] Unknown statement option '%s'", key);
return ADBC_STATUS_NOT_IMPLEMENTED;
}

Expand Down
2 changes: 2 additions & 0 deletions docs/source/python/api/adbc_driver_manager.rst
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@ Constants & Enums

.. autoclass:: adbc_driver_manager.AdbcStatusCode
:members:
:undoc-members:

.. autoclass:: adbc_driver_manager.GetObjectsDepth
:members:
:undoc-members:

.. autoclass:: adbc_driver_manager.ConnectionOptions
:members:
Expand Down
10 changes: 10 additions & 0 deletions python/adbc_driver_manager/adbc_driver_manager/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ class DatabaseOptions(enum.Enum):

#: Set the password to use for username-password authentication.
PASSWORD = "password"
#: The URI to connect to.
URI = "uri"
#: Set the username to use for username-password authentication.
USERNAME = "username"

Expand All @@ -100,6 +102,10 @@ class ConnectionOptions(enum.Enum):
Not all drivers support all options.
"""

#: Get/set the current catalog.
CURRENT_CATALOG = "adbc.connection.catalog"
#: Get/set the current schema.
CURRENT_DB_SCHEMA = "adbc.connection.db_schema"
#: Set the transaction isolation level.
ISOLATION_LEVEL = "adbc.connection.transaction.isolation_level"

Expand All @@ -110,7 +116,11 @@ class StatementOptions(enum.Enum):
Not all drivers support all options.
"""

#: Enable incremental execution on ExecutePartitions.
INCREMENTAL = "adbc.statement.exec.incremental"
#: For bulk ingestion, whether to create or append to the table.
INGEST_MODE = INGEST_OPTION_MODE
#: For bulk ingestion, the table to ingest into.
INGEST_TARGET_TABLE = INGEST_OPTION_TARGET_TABLE
#: Get progress of a query.
PROGRESS = "adbc.statement.exec.progress"
23 changes: 20 additions & 3 deletions python/adbc_driver_manager/adbc_driver_manager/_lib.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,22 @@ import typing
INGEST_OPTION_MODE: str
INGEST_OPTION_MODE_APPEND: str
INGEST_OPTION_MODE_CREATE: str
INGEST_OPTION_MODE_CREATE_APPEND: str
INGEST_OPTION_MODE_REPLACE: str
INGEST_OPTION_TARGET_TABLE: str

class AdbcConnection(_AdbcHandle):
def __init__(self, database: "AdbcDatabase", **kwargs: str) -> None: ...
def cancel(self) -> None: ...
def close(self) -> None: ...
def commit(self) -> None: ...
def get_info(
self, info_codes: Optional[List[Union[int, "AdbcInfoCode"]]] = None
) -> "ArrowArrayStreamHandle": ...
def get_option(self, key: str) -> str: ...
def get_option_bytes(self, key: str) -> bytes: ...
def get_option_float(self, key: str) -> float: ...
def get_option_int(self, key: str) -> int: ...
def get_objects(
self,
depth: "GetObjectsDepth",
Expand All @@ -54,12 +61,16 @@ class AdbcConnection(_AdbcHandle):
def read_partition(self, partition: bytes) -> "ArrowArrayStreamHandle": ...
def rollback(self) -> None: ...
def set_autocommit(self, enabled: bool) -> None: ...
def set_options(self, **kwargs: str) -> None: ...
def set_options(self, **kwargs: Union[bytes, float, int, str]) -> None: ...

class AdbcDatabase(_AdbcHandle):
def __init__(self, **kwargs: str) -> None: ...
def close(self) -> None: ...
def set_options(self, **kwargs: str) -> None: ...
def get_option(self, key: str) -> str: ...
def get_option_bytes(self, key: str) -> bytes: ...
def get_option_float(self, key: str) -> float: ...
def get_option_int(self, key: str) -> int: ...
def set_options(self, **kwargs: Union[bytes, float, int, str]) -> None: ...

class AdbcInfoCode(enum.IntEnum):
DRIVER_ARROW_VERSION = ...
Expand All @@ -73,13 +84,19 @@ class AdbcStatement(_AdbcHandle):
def __init__(self, *args, **kwargs) -> None: ...
def bind(self, *args, **kwargs) -> Any: ...
def bind_stream(self, *args, **kwargs) -> Any: ...
def cancel(self) -> None: ...
def close(self) -> None: ...
def execute_partitions(self, *args, **kwargs) -> Any: ...
def execute_query(self, *args, **kwargs) -> Any: ...
def execute_schema(self) -> "ArrowSchemaHandle": ...
def execute_update(self, *args, **kwargs) -> Any: ...
def get_option(self, key: str) -> str: ...
def get_option_bytes(self, key: str) -> bytes: ...
def get_option_float(self, key: str) -> float: ...
def get_option_int(self, key: str) -> int: ...
def get_parameter_schema(self, *args, **kwargs) -> Any: ...
def prepare(self, *args, **kwargs) -> Any: ...
def set_options(self, *args, **kwargs) -> Any: ...
def set_options(self, **kwargs: Union[bytes, float, int, str]) -> None: ...
def set_sql_query(self, *args, **kwargs) -> Any: ...
def set_substrait_plan(self, *args, **kwargs) -> Any: ...
def __reduce__(self) -> Any: ...
Expand Down
Loading

0 comments on commit fe36463

Please sign in to comment.