Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(c/driver/postgresql): Enable basic connect/query workflow for Redshift #2219

Open
wants to merge 21 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 35 additions & 10 deletions c/driver/postgresql/connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "connection.h"

#include <array>
#include <cassert>
#include <cinttypes>
#include <cmath>
Expand Down Expand Up @@ -478,19 +479,35 @@ AdbcStatusCode PostgresConnection::GetInfo(struct AdbcConnection* connection,
for (size_t i = 0; i < info_codes_length; i++) {
switch (info_codes[i]) {
case ADBC_INFO_VENDOR_NAME:
infos.push_back({info_codes[i], "PostgreSQL"});
if (RedshiftVersion()[0] > 0) {
infos.emplace_back(info_codes[i], "Redshift");
} else {
infos.push_back({info_codes[i], "PostgreSQL"});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I should have been clearer but all of the push_back's here I think are better with emplace_back

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't mind either way, but most advice I read tends to suggest only using emplace back in specific cases (e.g., https://abseil.io/tips/112 ).

Copy link
Contributor

@WillAyd WillAyd Oct 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

C++...what a language.

Well in this case either is likely fine. I am under the impression that emplace_back would avoid any calls to the move constructor of the list element, along with any move constructors that need to be called when the vector is resized. In this particular case it probably doesn't make a difference; maybe something to just look at when performance is more critical

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I couldn't find any existing emplace_back() usage so I changed these back. We can always reevaluate!

}

break;
case ADBC_INFO_VENDOR_VERSION: {
const char* stmt = "SHOW server_version_num";
auto result_helper = PqResultHelper{conn_, std::string(stmt)};
RAISE_STATUS(error, result_helper.Execute());
auto it = result_helper.begin();
if (it == result_helper.end()) {
SetError(error, "[libpq] PostgreSQL returned no rows for '%s'", stmt);
return ADBC_STATUS_INTERNAL;
if (RedshiftVersion()[0] > 0) {
std::array<int, 3> version = RedshiftVersion();
std::string version_string = std::to_string(version[0]) + "." +
std::to_string(version[1]) + "." +
std::to_string(version[2]);
infos.emplace_back(info_codes[i], std::move(version_string));

} else {
// Gives a version in the form 140000 instead of 14.0.0
const char* stmt = "SHOW server_version_num";
auto result_helper = PqResultHelper{conn_, std::string(stmt)};
RAISE_STATUS(error, result_helper.Execute());
auto it = result_helper.begin();
if (it == result_helper.end()) {
SetError(error, "[libpq] PostgreSQL returned no rows for '%s'", stmt);
return ADBC_STATUS_INTERNAL;
}
const char* server_version_num = (*it)[0].data;
infos.push_back({info_codes[i], server_version_num});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here's another spot

}
const char* server_version_num = (*it)[0].data;
infos.push_back({info_codes[i], server_version_num});

break;
}
case ADBC_INFO_DRIVER_NAME:
Expand Down Expand Up @@ -1136,4 +1153,12 @@ AdbcStatusCode PostgresConnection::SetOptionInt(const char* key, int64_t value,
return ADBC_STATUS_NOT_IMPLEMENTED;
}

std::array<int, 3> PostgresConnection::PostgreSQLVersion() {
return database_->PostgreSQLVersion();
}

std::array<int, 3> PostgresConnection::RedshiftVersion() {
return database_->RedshiftVersion();
}

} // namespace adbcpq
3 changes: 3 additions & 0 deletions c/driver/postgresql/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#pragma once

#include <array>
#include <cstdint>
#include <memory>

Expand Down Expand Up @@ -73,6 +74,8 @@ class PostgresConnection {
return type_resolver_;
}
bool autocommit() const { return autocommit_; }
std::array<int, 3> PostgreSQLVersion();
std::array<int, 3> RedshiftVersion();

private:
std::shared_ptr<PostgresDatabase> database_;
Expand Down
146 changes: 126 additions & 20 deletions c/driver/postgresql/database.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#include "database.h"

#include <array>
#include <charconv>
#include <cinttypes>
#include <cstring>
#include <memory>
Expand All @@ -28,6 +30,7 @@
#include <nanoarrow/nanoarrow.h>

#include "driver/common/utils.h"
#include "result_helper.h"

namespace adbcpq {

Expand All @@ -54,8 +57,19 @@ AdbcStatusCode PostgresDatabase::GetOptionDouble(const char* option, double* val
}

AdbcStatusCode PostgresDatabase::Init(struct AdbcError* error) {
// Connect to validate the parameters.
return RebuildTypeResolver(error);
// Connect to initialize the version information and build the type table
PGconn* conn = nullptr;
RAISE_ADBC(Connect(&conn, error));

AdbcStatusCode code = InitVersions(conn, error);
if (code != ADBC_STATUS_OK) {
RAISE_ADBC(Disconnect(&conn, nullptr));
return code;
}

code = RebuildTypeResolver(conn, error);
RAISE_ADBC(Disconnect(&conn, nullptr));
return code;
}

AdbcStatusCode PostgresDatabase::Release(struct AdbcError* error) {
Expand Down Expand Up @@ -123,19 +137,88 @@ AdbcStatusCode PostgresDatabase::Disconnect(PGconn** conn, struct AdbcError* err
return ADBC_STATUS_OK;
}

namespace {

// Parse an individual version in the form of "xxx.xxx.xxx".
// If the version components aren't numeric, they will be zero.
std::array<int, 3> ParseVersion(std::string_view version) {
std::array<int, 3> out{};
size_t component = 0;
size_t component_begin = 0;
size_t component_end = 0;

// While there are remaining version components and we haven't reached the end of the
// string
while (component_begin < version.size() && component < out.size()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe a good use case for str::find here?

// Find the next character that marks a version component separation or the end of the
// string
while (component_end < version.size() && version[component_end] != '.' &&
version[component_end] != '-') {
component_end++;
}

// Try to parse the component as an integer (assigning zero if this fails)
int value = 0;
std::from_chars(version.data() + component_begin, version.data() + component_end,
value);
out[component] = value;

// Move on to the next component
component_begin = component_end + 1;
component_end = component_begin;
component++;
}

return out;
}

// Parse the PostgreSQL version() string that looks like:
// PostgreSQL 8.0.2 on i686-pc-linux-gnu, compiled by GCC gcc (GCC) 3.4.2 20041017 (Red
// Hat 3.4.2-6.fc3), Redshift 1.0.77467
std::array<int, 3> ParsePrefixedVersion(std::string_view version_info,
std::string_view prefix) {
size_t pos = version_info.find(prefix);
if (pos == version_info.npos) {
return {0, 0, 0};
}

// Skip the prefix and any leading whitespace
pos += prefix.size();
while (pos < version_info.size() && version_info[pos] == ' ') {
++pos;
}

return ParseVersion(version_info.substr(pos));
}

} // namespace

AdbcStatusCode PostgresDatabase::InitVersions(PGconn* conn, struct AdbcError* error) {
PqResultHelper helper(conn, "SELECT version();");
RAISE_STATUS(error, helper.Execute());
if (helper.NumRows() != 1 || helper.NumColumns() != 1) {
SetError(error, "Expected 1 row and 1 column for SELECT version(); but got %d/%d",
helper.NumRows(), helper.NumColumns());
return ADBC_STATUS_INTERNAL;
}

std::string_view version_info = helper.Row(0)[0].value();
postgres_server_version_ = ParsePrefixedVersion(version_info, "PostgreSQL");
redshift_server_version_ = ParsePrefixedVersion(version_info, "Redshift");

return ADBC_STATUS_OK;
}

// Helpers for building the type resolver from queries
static inline int32_t InsertPgAttributeResult(
PGresult* result, const std::shared_ptr<PostgresTypeResolver>& resolver);

static inline int32_t InsertPgTypeResult(
PGresult* result, const std::shared_ptr<PostgresTypeResolver>& resolver);

AdbcStatusCode PostgresDatabase::RebuildTypeResolver(struct AdbcError* error) {
PGconn* conn = nullptr;
AdbcStatusCode final_status = Connect(&conn, error);
if (final_status != ADBC_STATUS_OK) {
return final_status;
}
AdbcStatusCode PostgresDatabase::RebuildTypeResolver(PGconn* conn,
struct AdbcError* error) {
AdbcStatusCode final_status = ADBC_STATUS_OK;

// We need a few queries to build the resolver. The current strategy might
// fail for some recursive definitions (e.g., arrays of records of arrays).
Expand All @@ -162,8 +245,8 @@ SELECT
typname,
typreceive,
typbasetype,
typarray,
typrelid
typrelid,
typarray
FROM
pg_catalog.pg_type
WHERE
Expand All @@ -172,6 +255,28 @@ ORDER BY
oid
)";

const std::string kTypeQueryNoArrays = R"(
SELECT
oid,
typname,
typreceive,
typbasetype,
typrelid
FROM
pg_catalog.pg_type
WHERE
(typreceive != 0 OR typname = 'aclitem') AND typtype != 'r'
ORDER BY
oid
)";

std::string type_query;
if (redshift_server_version_[0] == 0) {
type_query = kTypeQuery;
} else {
type_query = kTypeQueryNoArrays;
}

// Create a new type resolver (this instance's type_resolver_ member
// will be updated at the end if this succeeds).
auto resolver = std::make_shared<PostgresTypeResolver>();
Expand All @@ -192,7 +297,7 @@ ORDER BY
// Attempt filling the resolver a few times to handle recursive definitions.
int32_t max_attempts = 3;
for (int32_t i = 0; i < max_attempts; i++) {
result = PQexec(conn, kTypeQuery.c_str());
result = PQexec(conn, type_query.c_str());
ExecStatusType pq_status = PQresultStatus(result);
if (pq_status == PGRES_TUPLES_OK) {
InsertPgTypeResult(result, resolver);
Expand All @@ -208,12 +313,6 @@ ORDER BY
}
}

// Disconnect since PostgreSQL connections can be heavy.
{
AdbcStatusCode status = Disconnect(&conn, error);
if (status != ADBC_STATUS_OK) final_status = status;
}

if (final_status == ADBC_STATUS_OK) {
type_resolver_ = std::move(resolver);
}
Expand Down Expand Up @@ -256,6 +355,7 @@ static inline int32_t InsertPgAttributeResult(
static inline int32_t InsertPgTypeResult(
PGresult* result, const std::shared_ptr<PostgresTypeResolver>& resolver) {
int num_rows = PQntuples(result);
int num_cols = PQnfields(result);
PostgresTypeResolver::Item item;
int32_t n_added = 0;

Expand All @@ -266,10 +366,16 @@ static inline int32_t InsertPgTypeResult(
const char* typreceive = PQgetvalue(result, row, 2);
const uint32_t typbasetype = static_cast<uint32_t>(
std::strtol(PQgetvalue(result, row, 3), /*str_end=*/nullptr, /*base=*/10));
const uint32_t typarray = static_cast<uint32_t>(
std::strtol(PQgetvalue(result, row, 4), /*str_end=*/nullptr, /*base=*/10));
const uint32_t typrelid = static_cast<uint32_t>(
std::strtol(PQgetvalue(result, row, 5), /*str_end=*/nullptr, /*base=*/10));
std::strtol(PQgetvalue(result, row, 4), /*str_end=*/nullptr, /*base=*/10));

uint32_t typarray;
if (num_cols == 6) {
typarray = static_cast<uint32_t>(
std::strtol(PQgetvalue(result, row, 5), /*str_end=*/nullptr, /*base=*/10));
} else {
typarray = 0;
}

// Special case the aclitem because it shows up in a bunch of internal tables
if (strcmp(typname, "aclitem") == 0) {
Expand Down
8 changes: 7 additions & 1 deletion c/driver/postgresql/database.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#pragma once

#include <array>
#include <cstdint>
#include <memory>
#include <string>
Expand Down Expand Up @@ -58,12 +59,17 @@ class PostgresDatabase {
return type_resolver_;
}

AdbcStatusCode RebuildTypeResolver(struct AdbcError* error);
AdbcStatusCode InitVersions(PGconn* conn, struct AdbcError* error);
AdbcStatusCode RebuildTypeResolver(PGconn* conn, struct AdbcError* error);
std::array<int, 3> PostgreSQLVersion() { return postgres_server_version_; }
std::array<int, 3> RedshiftVersion() { return redshift_server_version_; }

private:
int32_t open_connections_;
std::string uri_;
std::shared_ptr<PostgresTypeResolver> type_resolver_;
std::array<int, 3> postgres_server_version_{};
std::array<int, 3> redshift_server_version_{};
};
} // namespace adbcpq

Expand Down
12 changes: 10 additions & 2 deletions c/driver/postgresql/statement.cc
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,7 @@ AdbcStatusCode PostgresStatement::ExecuteQuery(struct ArrowArrayStream* stream,

// If we have been requested to avoid COPY or there is no output requested,
// execute using the PqResultArrayReader.
if (!stream || !use_copy_) {
if (!stream || !UseCopy()) {
PqResultArrayReader reader(connection_->conn(), type_resolver_, query_);
RAISE_STATUS(error, reader.ToArrayStream(rows_affected, stream));
return ADBC_STATUS_OK;
Expand Down Expand Up @@ -666,7 +666,7 @@ AdbcStatusCode PostgresStatement::GetOption(const char* key, char* value, size_t
} else if (std::strcmp(key, ADBC_POSTGRESQL_OPTION_BATCH_SIZE_HINT_BYTES) == 0) {
result = std::to_string(reader_.batch_size_hint_bytes_);
} else if (std::strcmp(key, ADBC_POSTGRESQL_OPTION_USE_COPY) == 0) {
if (use_copy_) {
if (UseCopy()) {
result = "true";
} else {
result = "false";
Expand Down Expand Up @@ -838,4 +838,12 @@ void PostgresStatement::ClearResult() {
reader_.Release();
}

int PostgresStatement::UseCopy() {
if (use_copy_ == -1) {
return connection_->RedshiftVersion()[0] == 0;
} else {
return use_copy_;
}
}

} // namespace adbcpq
6 changes: 4 additions & 2 deletions c/driver/postgresql/statement.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ class PostgresStatement {
: connection_(nullptr),
query_(),
prepared_(false),
use_copy_(true),
use_copy_(-1),
reader_(nullptr) {
std::memset(&bind_, 0, sizeof(bind_));
}
Expand Down Expand Up @@ -161,7 +161,7 @@ class PostgresStatement {
};

// Options
bool use_copy_;
int use_copy_;

struct {
std::string db_schema;
Expand All @@ -171,5 +171,7 @@ class PostgresStatement {
} ingest_;

TupleReader reader_;

int UseCopy();
};
} // namespace adbcpq
Loading