Skip to content

Commit

Permalink
Merge branch 'apache:main' into dev/snowflakedriver
Browse files Browse the repository at this point in the history
  • Loading branch information
davidhcoe authored Oct 5, 2023
2 parents c3e153f + 79172ea commit 4595678
Show file tree
Hide file tree
Showing 60 changed files with 2,842 additions and 1,380 deletions.
1 change: 1 addition & 0 deletions .asf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ github:
collaborators:
- krlmlr
- nbenn
- ywc88
enabled_merge_buttons:
merge: false
rebase: false
Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/native-windows.yml
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ jobs:
--file ci/conda_env_cpp.txt
- uses: actions/setup-go@v3
with:
go-version: 1.19.13
go-version: 1.20.8
check-latest: true
cache: true
cache-dependency-path: go/adbc/go.sum
Expand All @@ -238,15 +238,15 @@ jobs:
env:
CGO_ENABLED: "1"
run: |
$env:PATH="$($env:RUNNER_TOOL_CACHE)\go\1.19.13\x64\bin;" + $env:PATH
$env:PATH="$($env:RUNNER_TOOL_CACHE)\go\1.20.8\x64\bin;" + $env:PATH
.\ci\scripts\go_build.ps1 $pwd $pwd\build
# TODO(apache/arrow#358): enable these tests on Windows
# - name: Go Test
# shell: pwsh
# env:
# CGO_ENABLED: "1"
# run: |
# $env:PATH="$($env:RUNNER_TOOL_CACHE)\go\1.19.13\x64\bin;" + $env:PATH
# $env:PATH="$($env:RUNNER_TOOL_CACHE)\go\1.20.8\x64\bin;" + $env:PATH
# .\ci\scripts\go_test.ps1 $pwd $pwd\build

# ------------------------------------------------------------
Expand Down
8 changes: 6 additions & 2 deletions .github/workflows/packaging.yml
Original file line number Diff line number Diff line change
Expand Up @@ -675,7 +675,7 @@ jobs:
strategy:
fail-fast: false
matrix:
python_version: ["3.9", "3.10", "3.11"]
python_version: ["3.9", "3.10", "3.11", "3.12"]
env:
PYTHON_VERSION: "${{ matrix.python_version }}"
# Where to install vcpkg
Expand Down Expand Up @@ -727,9 +727,11 @@ jobs:
.\bootstrap-vcpkg.bat -disableMetrics
popd
# Windows needs newer Go than 1.19
# https://github.com/golang/go/issues/51007
- uses: actions/setup-go@v3
with:
go-version: 1.19.13
go-version: 1.20.8
check-latest: true
cache: true
cache-dependency-path: adbc/go/adbc/go.sum
Expand Down Expand Up @@ -763,6 +765,8 @@ jobs:
- name: Test wheel
shell: cmd
# PyArrow wheels not yet available
if: matrix.python_version != "3.12"
run: |
pushd adbc
where python.exe
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ ADBC versions the API standard and the implementing libraries separately.
The API standard (version 1.0.0) is considered stable, but enhancements may be made.

Libraries are under development.
For more details, see the [documentation](https://arrow.apache.org/adbc/main/driver/status.html).
For more details, see the [documentation](https://arrow.apache.org/adbc/main/driver/status.html), or read the [changelog](CHANGELOG.md).

## Installation

Expand Down
18 changes: 13 additions & 5 deletions c/driver/flightsql/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,16 @@ add_go_lib("${REPOSITORY_ROOT}/go/adbc/pkg/flightsql/"
PKG_CONFIG_NAME
adbc-driver-flightsql
SHARED_LINK_FLAGS
${LDFLAGS})
${LDFLAGS}
OUTPUTS
ADBC_LIBRARIES)

include_directories(SYSTEM ${REPOSITORY_ROOT})
include_directories(SYSTEM ${REPOSITORY_ROOT}/c/)
include_directories(SYSTEM ${REPOSITORY_ROOT}/c/driver)
include_directories(SYSTEM ${REPOSITORY_ROOT}/c/vendor)
foreach(LIB_TARGET ${ADBC_LIBRARIES})
target_include_directories(${LIB_TARGET} SYSTEM
INTERFACE ${REPOSITORY_ROOT} ${REPOSITORY_ROOT}/c/
${REPOSITORY_ROOT}/c/vendor
${REPOSITORY_ROOT}/c/driver)
endforeach()

if(ADBC_TEST_LINKAGE STREQUAL "shared")
set(TEST_LINK_LIBS adbc_driver_flightsql_shared)
Expand All @@ -57,5 +61,9 @@ if(ADBC_BUILD_TESTS)
nanoarrow
${TEST_LINK_LIBS})
target_compile_features(adbc-driver-flightsql-test PRIVATE cxx_std_17)
target_include_directories(adbc-driver-flightsql-test SYSTEM
PRIVATE ${REPOSITORY_ROOT} ${REPOSITORY_ROOT}/c/
${REPOSITORY_ROOT}/c/vendor
${REPOSITORY_ROOT}/c/driver)
adbc_configure_target(adbc-driver-flightsql-test)
endif()
2 changes: 1 addition & 1 deletion c/driver/postgresql/connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,7 @@ AdbcStatusCode PostgresConnection::Commit(struct AdbcError* error) {
return ADBC_STATUS_INVALID_STATE;
}

PGresult* result = PQexec(conn_, "COMMIT");
PGresult* result = PQexec(conn_, "COMMIT; BEGIN TRANSACTION");
if (PQresultStatus(result) != PGRES_COMMAND_OK) {
AdbcStatusCode code = SetError(error, result, "%s%s",
"[libpq] Failed to commit: ", PQerrorMessage(conn_));
Expand Down
209 changes: 209 additions & 0 deletions c/driver/postgresql/postgres_copy_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,54 @@ ArrowErrorCode ReadChecked(ArrowBufferView* data, T* out, ArrowError* error) {
return NANOARROW_OK;
}

// Copy one value into the buffer with no capacity check; the caller must
// guarantee that at least sizeof(T) bytes are writable at buffer->data.
// The value is passed through SwapNetworkToHost() before being copied
// (presumably a symmetric byte swap, so the stored bytes are in network
// order -- confirm against that helper). On return, buffer->data has moved
// forward by sizeof(T) and buffer->size_bytes has grown by sizeof(T).
template <typename T>
inline void WriteUnsafe(ArrowBuffer* buffer, T in) {
  const T swapped = SwapNetworkToHost(in);
  memcpy(buffer->data, &swapped, sizeof(T));

  // The two cursor updates are independent of each other.
  buffer->size_bytes += sizeof(T);
  buffer->data += sizeof(T);
}

// Single-byte specialization: a lone byte has no byte order, so it is stored
// directly. The data pointer and size_bytes both move forward by one byte.
template <>
inline void WriteUnsafe(ArrowBuffer* buffer, int8_t in) {
  *buffer->data = in;
  buffer->size_bytes += sizeof(int8_t);
  buffer->data += sizeof(int8_t);
}

// Signed 16-bit values are forwarded to the unsigned instantiation of the
// same width.
template <>
inline void WriteUnsafe(ArrowBuffer* buffer, int16_t in) {
  const uint16_t as_unsigned = static_cast<uint16_t>(in);
  WriteUnsafe<uint16_t>(buffer, as_unsigned);
}

// Signed 32-bit values are forwarded to the unsigned instantiation of the
// same width.
template <>
inline void WriteUnsafe(ArrowBuffer* buffer, int32_t in) {
  const uint32_t as_unsigned = static_cast<uint32_t>(in);
  WriteUnsafe<uint32_t>(buffer, as_unsigned);
}

// Signed 64-bit values are forwarded to the unsigned instantiation of the
// same width.
template <>
inline void WriteUnsafe(ArrowBuffer* buffer, int64_t in) {
  const uint64_t as_unsigned = static_cast<uint64_t>(in);
  WriteUnsafe<uint64_t>(buffer, as_unsigned);
}

// Bounds-checked counterpart of WriteUnsafe().
//
// Verifies that the buffer's capacity_bytes can hold size_bytes + sizeof(T)
// before delegating to WriteUnsafe<T>(). The buffer is never grown here.
//
// Returns NANOARROW_OK on success. Returns EINVAL, with a message stored in
// `error`, when the capacity is insufficient; the buffer is left untouched
// in that case.
template <typename T>
ArrowErrorCode WriteChecked(ArrowBuffer* buffer, T in, ArrowError* error) {
  // TODO: beware of overflow here
  const int64_t required_bytes = buffer->size_bytes + static_cast<int64_t>(sizeof(T));
  if (buffer->capacity_bytes < required_bytes) {
    // PRId64 only expands to the length modifier and conversion letter, so
    // the '%' must be spelled out; without it the arguments are never
    // formatted.
    ArrowErrorSet(error,
                  "Insufficient buffer capacity (expected %" PRId64
                  " bytes but found %" PRId64 ")",
                  required_bytes, static_cast<int64_t>(buffer->capacity_bytes));

    return EINVAL;
  }

  WriteUnsafe<T>(buffer, in);
  return NANOARROW_OK;
}

class PostgresCopyFieldReader {
public:
PostgresCopyFieldReader() : validity_(nullptr), offsets_(nullptr), data_(nullptr) {
Expand Down Expand Up @@ -1058,4 +1106,165 @@ class PostgresCopyStreamReader {
int64_t array_size_approx_bytes_;
};

// Base class for objects that serialize one element of an Arrow array into a
// COPY output buffer. Concrete writers override Write(); the default
// implementation reports that the operation is unsupported.
class PostgresCopyFieldWriter {
 public:
  virtual ~PostgresCopyFieldWriter() = default;

  // Remember the array view that Write() reads from. Only the pointer is
  // stored; nothing in this class releases it.
  void Init(struct ArrowArrayView* array_view) { array_view_ = array_view; }

  // Serialize the element at `index` into `buffer`.
  // The base implementation writes nothing and always returns ENOTSUP.
  virtual ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) {
    return ENOTSUP;
  }

 protected:
  // Null until Init() is called, so use-before-Init is a deterministic null
  // dereference rather than a read through an indeterminate pointer.
  struct ArrowArrayView* array_view_ = nullptr;
  std::vector<std::unique_ptr<PostgresCopyFieldWriter>> children_;
};

// Writes one row (tuple): a 16-bit field count followed by the output of
// each child field writer, in the order the children were appended.
class PostgresCopyFieldTupleWriter : public PostgresCopyFieldWriter {
 public:
  // Take ownership of `child` and bind it to the child array view at the
  // same position. Init() must already have been called on this writer,
  // because array_view_ is dereferenced here.
  void AppendChild(std::unique_ptr<PostgresCopyFieldWriter> child) {
    int64_t child_i = static_cast<int64_t>(children_.size());
    children_.push_back(std::move(child));
    children_[child_i]->Init(array_view_->children[child_i]);
  }

  // Serialize row `index` into `buffer`.
  //
  // Returns ENODATA when `index` is at or past the end of the array, the
  // first non-OK code produced by the field-count write or by any child
  // writer, and NANOARROW_OK otherwise. On a child failure the buffer may
  // already contain the fields written before the failing one.
  ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) override {
    if (index >= array_view_->length) {
      return ENODATA;
    }

    const int16_t n_fields = static_cast<int16_t>(children_.size());
    NANOARROW_RETURN_NOT_OK(WriteChecked<int16_t>(buffer, n_fields, error));

    for (int16_t i = 0; i < n_fields; i++) {
      // Propagate child failures instead of silently reporting success for a
      // truncated row.
      NANOARROW_RETURN_NOT_OK(children_[i]->Write(buffer, index, error));
    }

    return NANOARROW_OK;
  }

 private:
  // NOTE(review): this shadows the protected children_ declared in
  // PostgresCopyFieldWriter; only this member is used by the methods above.
  std::vector<std::unique_ptr<PostgresCopyFieldWriter>> children_;
};

// Writes a boolean element as a 32-bit length prefix (-1 when the element is
// null, 1 otherwise) followed, for non-null elements, by a single byte
// holding the value.
class PostgresCopyBooleanFieldWriter : public PostgresCopyFieldWriter {
 public:
  // Returns NANOARROW_OK on success or the error code reported by
  // WriteChecked() when the buffer lacks capacity.
  ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) override {
    const int8_t is_null = ArrowArrayViewIsNull(array_view_, index);
    const int32_t field_size_bytes = is_null ? -1 : 1;
    NANOARROW_RETURN_NOT_OK(WriteChecked<int32_t>(buffer, field_size_bytes, error));
    if (is_null) {
      // A null field consists of the -1 length prefix only.
      return NANOARROW_OK;
    }

    const int8_t value =
        static_cast<int8_t>(ArrowArrayViewGetIntUnsafe(array_view_, index));
    NANOARROW_RETURN_NOT_OK(WriteChecked<int8_t>(buffer, value, error));

    // This function returns ArrowErrorCode, so report success with the
    // nanoarrow constant rather than an ADBC status code.
    return NANOARROW_OK;
  }
};

// Writes an integer element of type T as a 32-bit length prefix (-1 when the
// element is null, sizeof(T) otherwise) followed, for non-null elements, by
// the value minus kOffset, emitted through WriteChecked<T>().
template <typename T, T kOffset = 0>
class PostgresCopyNetworkEndianFieldWriter : public PostgresCopyFieldWriter {
 public:
  // Returns NANOARROW_OK on success or the error code reported by
  // WriteChecked() when the buffer lacks capacity.
  ArrowErrorCode Write(ArrowBuffer* buffer, int64_t index, ArrowError* error) override {
    const int8_t is_null = ArrowArrayViewIsNull(array_view_, index);
    // Cast sizeof(T) so both ternary operands are int32_t; otherwise -1 is
    // first converted to size_t and then narrowed back.
    const int32_t field_size_bytes = is_null ? -1 : static_cast<int32_t>(sizeof(T));
    NANOARROW_RETURN_NOT_OK(WriteChecked<int32_t>(buffer, field_size_bytes, error));
    if (is_null) {
      // A null field consists of the -1 length prefix only.
      return NANOARROW_OK;
    }

    const T value =
        static_cast<T>(ArrowArrayViewGetIntUnsafe(array_view_, index)) - kOffset;
    NANOARROW_RETURN_NOT_OK(WriteChecked<T>(buffer, value, error));

    // This function returns ArrowErrorCode, so report success with the
    // nanoarrow constant rather than an ADBC status code.
    return NANOARROW_OK;
  }
};

// Allocate the field writer that corresponds to `arrow_type`.
//
// On success, *out receives a heap-allocated writer that the caller must
// take ownership of, and NANOARROW_OK is returned. Supported types are BOOL,
// INT16, INT32 and INT64. For any other type, *out is set to nullptr, a
// message is stored in `error`, and EINVAL is returned.
static inline ArrowErrorCode MakeCopyFieldWriter(const enum ArrowType arrow_type,
                                                 PostgresCopyFieldWriter** out,
                                                 ArrowError* error) {
  switch (arrow_type) {
    case NANOARROW_TYPE_BOOL:
      *out = new PostgresCopyBooleanFieldWriter();
      return NANOARROW_OK;
    case NANOARROW_TYPE_INT16:
      *out = new PostgresCopyNetworkEndianFieldWriter<int16_t>();
      return NANOARROW_OK;
    case NANOARROW_TYPE_INT32:
      *out = new PostgresCopyNetworkEndianFieldWriter<int32_t>();
      return NANOARROW_OK;
    case NANOARROW_TYPE_INT64:
      *out = new PostgresCopyNetworkEndianFieldWriter<int64_t>();
      return NANOARROW_OK;
    default:
      // Leave the caller with a well-defined pointer and a diagnosable error
      // rather than a bare EINVAL.
      *out = nullptr;
      ArrowErrorSet(error, "COPY field writer not implemented for Arrow type id %d",
                    static_cast<int>(arrow_type));
      return EINVAL;
  }
}

// Drives serialization of an Arrow array into a COPY output buffer: a fixed
// header followed by one tuple per record, written by a tuple writer that
// has one child field writer per schema child.
//
// Expected call order, as implied by the data dependencies below:
// Init(), then InitFieldWriters(), then WriteHeader(), then WriteRecord()
// once per row.
class PostgresCopyStreamWriter {
 public:
  // Bind the writer to `schema` and `array`. Only the schema pointer is
  // stored; an array view is built over `array` and handed to the root
  // tuple writer. Returns the first non-OK nanoarrow code encountered.
  ArrowErrorCode Init(struct ArrowSchema* schema, struct ArrowArray* array) {
    schema_ = schema;
    NANOARROW_RETURN_NOT_OK(
        ArrowArrayViewInitFromSchema(&array_view_.value, schema, nullptr));
    NANOARROW_RETURN_NOT_OK(ArrowArrayViewSetArray(&array_view_.value, array, nullptr));
    root_writer_.Init(&array_view_.value);
    return NANOARROW_OK;
  }

  // Append the signature (kPgCopyBinarySignature), a zero 32-bit flags field
  // and a zero 32-bit extension length to `buffer`.
  //
  // Returns the first non-OK code reported by ArrowBufferAppend().
  ArrowErrorCode WriteHeader(ArrowBuffer* buffer, ArrowError* error) {
    NANOARROW_RETURN_NOT_OK(ArrowBufferAppend(buffer, kPgCopyBinarySignature,
                                              sizeof(kPgCopyBinarySignature)));

    const uint32_t flag_fields = 0;
    NANOARROW_RETURN_NOT_OK(
        ArrowBufferAppend(buffer, &flag_fields, sizeof(flag_fields)));

    const uint32_t extension_bytes = 0;
    NANOARROW_RETURN_NOT_OK(
        ArrowBufferAppend(buffer, &extension_bytes, sizeof(extension_bytes)));

    // The field writers (via WriteUnsafe) write at buffer->data and advance
    // it themselves, so move the pointer past the header just appended.
    const int64_t header_bytes =
        sizeof(kPgCopyBinarySignature) + sizeof(flag_fields) + sizeof(extension_bytes);
    buffer->data += header_bytes;

    return NANOARROW_OK;
  }

  // Serialize the next unwritten record and advance the record counter.
  // The counter is not advanced when the root writer reports an error.
  ArrowErrorCode WriteRecord(ArrowBuffer* buffer, ArrowError* error) {
    NANOARROW_RETURN_NOT_OK(root_writer_.Write(buffer, records_written_, error));
    records_written_++;
    return NANOARROW_OK;
  }

  // Create one field writer per child of the schema passed to Init() and
  // attach each to the root tuple writer.
  //
  // Returns EINVAL when Init() has not supplied a schema or the schema has
  // been released; otherwise the first non-OK code reported while inspecting
  // a child schema or constructing its writer.
  ArrowErrorCode InitFieldWriters(ArrowError* error) {
    if (schema_ == nullptr || schema_->release == nullptr) {
      return EINVAL;
    }

    for (int64_t i = 0; i < schema_->n_children; i++) {
      struct ArrowSchemaView schema_view;
      // Propagate the nanoarrow code; an ADBC status value would be
      // misread as an unrelated errno by ArrowErrorCode callers.
      NANOARROW_RETURN_NOT_OK(
          ArrowSchemaViewInit(&schema_view, schema_->children[i], error));

      const ArrowType arrow_type = schema_view.type;
      PostgresCopyFieldWriter* child_writer = nullptr;
      NANOARROW_RETURN_NOT_OK(MakeCopyFieldWriter(arrow_type, &child_writer, error));
      root_writer_.AppendChild(std::unique_ptr<PostgresCopyFieldWriter>(child_writer));
    }

    return NANOARROW_OK;
  }

 private:
  PostgresCopyFieldTupleWriter root_writer_;
  // Null until Init() is called; the schema is borrowed, not owned.
  struct ArrowSchema* schema_ = nullptr;
  Handle<struct ArrowArrayView> array_view_;
  int64_t records_written_ = 0;
};

} // namespace adbcpq
Loading

0 comments on commit 4595678

Please sign in to comment.