Skip to content

Commit

Permalink
Initial DataChunk API support (#233)
Browse files Browse the repository at this point in the history
* moving the appender into the data chunk API

* linter

* remove some functions for now

* implement review suggestions, i.e., more conservative with exposing functions

* add test for exposed function
  • Loading branch information
taniabogatsch authored Jun 15, 2024
1 parent 9eca001 commit 8eaf120
Show file tree
Hide file tree
Showing 8 changed files with 389 additions and 263 deletions.
145 changes: 53 additions & 92 deletions appender.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@ type Appender struct {
duckdbAppender C.duckdb_appender
closed bool

chunks []C.duckdb_data_chunk
currSize C.idx_t
colTypes []C.duckdb_logical_type
colTypesPtr unsafe.Pointer

// The vector storage of each column in the data chunk.
vectors []vector
// The appender storage before flushing any data.
chunks []DataChunk
// The column types of the table to append to.
types []C.duckdb_logical_type
// A pointer to the allocated memory of the column types.
ptr unsafe.Pointer
// The number of appended rows.
rowCount int
}

// NewAppenderFromConn returns a new Appender from a DuckDB driver connection.
Expand Down Expand Up @@ -62,30 +63,25 @@ func NewAppenderFromConn(driverConn driver.Conn, schema, table string) (*Appende
schema: schema,
table: table,
duckdbAppender: duckdbAppender,
currSize: 0,
rowCount: 0,
}

columnCount := int(C.duckdb_appender_column_count(duckdbAppender))
a.colTypesPtr, a.colTypes = a.mallocTypeSlice(columnCount)

// Get the column types.
columnCount := int(C.duckdb_appender_column_count(duckdbAppender))
a.ptr, a.types = mallocTypeSlice(columnCount)
for i := 0; i < columnCount; i++ {
a.colTypes[i] = C.duckdb_appender_column_type(duckdbAppender, C.idx_t(i))
}

// Get the vector storage of each column.
a.vectors = make([]vector, columnCount)
var err error
for i := 0; i < columnCount; i++ {
if err = a.vectors[i].init(a.colTypes[i], i); err != nil {
break
a.types[i] = C.duckdb_appender_column_type(duckdbAppender, C.idx_t(i))

// Ensure that we only create an appender for supported column types.
duckdbType := C.duckdb_get_type_id(a.types[i])
name, found := unsupportedAppenderTypeMap[duckdbType]
if found {
err := columnError(unsupportedTypeError(name), i+1)
destroyTypeSlice(a.ptr, a.types)
C.duckdb_appender_destroy(&duckdbAppender)
return nil, getError(errAppenderCreation, err)
}
}
if err != nil {
a.destroyColumnTypes()
C.duckdb_appender_destroy(&duckdbAppender)
return nil, getError(errAppenderCreation, err)
}

return a, nil
}
Expand All @@ -94,11 +90,6 @@ func NewAppenderFromConn(driverConn driver.Conn, schema, table string) (*Appende
// Does not close the appender, even if it returns an error. Unless you have a good reason to call this,
// call Close when you are done with the appender.
func (a *Appender) Flush() error {
// Nothing to flush.
if len(a.chunks) == 0 && a.currSize == 0 {
return nil
}

if err := a.appendDataChunks(); err != nil {
return getError(errAppenderFlush, invalidatedAppenderError(err))
}
Expand All @@ -120,16 +111,14 @@ func (a *Appender) Close() error {
a.closed = true

// Append all remaining chunks.
var err error
if len(a.chunks) != 0 || a.currSize != 0 {
err = a.appendDataChunks()
}
err := a.appendDataChunks()

a.destroyColumnTypes()
// Destroy all appender data.
destroyTypeSlice(a.ptr, a.types)
state := C.duckdb_appender_destroy(&a.duckdbAppender)

if err != nil || state == C.DuckDBError {
// We destroyed the appender, so we cannot retrieve the duckdb error.
// We destroyed the appender, so we cannot retrieve the duckdb internal error.
return getError(errAppenderClose, invalidatedAppenderError(err))
}
return nil
Expand All @@ -148,92 +137,64 @@ func (a *Appender) AppendRow(args ...driver.Value) error {
return nil
}

func (a *Appender) destroyColumnTypes() {
for i := range a.colTypes {
C.duckdb_destroy_logical_type(&a.colTypes[i])
func (a *Appender) addDataChunk() error {
var chunk DataChunk
if err := chunk.initFromTypes(a.ptr, a.types); err != nil {
return err
}
C.free(a.colTypesPtr)
}

func (*Appender) mallocTypeSlice(count int) (unsafe.Pointer, []C.duckdb_logical_type) {
var dummy C.duckdb_logical_type
size := C.size_t(unsafe.Sizeof(dummy))

ctPtr := unsafe.Pointer(C.malloc(C.size_t(count) * size))
slice := (*[1 << 30]C.duckdb_logical_type)(ctPtr)[:count:count]

return ctPtr, slice
}

func (a *Appender) newDataChunk(colCount int) {
a.currSize = 0

// duckdb_create_data_chunk takes an array of duckdb_logical_type and a column count.
colTypesPtr := (*C.duckdb_logical_type)(a.colTypesPtr)
dataChunk := C.duckdb_create_data_chunk(colTypesPtr, C.idx_t(colCount))
C.duckdb_data_chunk_set_size(dataChunk, C.duckdb_vector_size())

for i := 0; i < colCount; i++ {
duckdbVector := C.duckdb_data_chunk_get_vector(dataChunk, C.idx_t(i))
a.vectors[i].duckdbVector = duckdbVector
a.vectors[i].getChildVectors(duckdbVector)
}

a.chunks = append(a.chunks, dataChunk)
a.chunks = append(a.chunks, chunk)
return nil
}

func (a *Appender) appendRowSlice(args []driver.Value) error {
// early-out, if the number of args does not match the column count
if len(args) != len(a.vectors) {
return columnCountError(len(args), len(a.vectors))
// Early-out, if the number of args does not match the column count.
if len(args) != len(a.types) {
return columnCountError(len(args), len(a.types))
}

// Create a new data chunk if the current chunk is full, or if this is the first row.
if a.currSize == C.duckdb_vector_size() || len(a.chunks) == 0 {
a.newDataChunk(len(args))
// Create a new data chunk if the current chunk is full.
if C.idx_t(a.rowCount) == C.duckdb_vector_size() || len(a.chunks) == 0 {
if err := a.addDataChunk(); err != nil {
return err
}
}

// Set all values.
for i, val := range args {
vec := a.vectors[i]

// Ensure that the types match before attempting to append anything.
v, err := vec.tryCast(val)
chunk := &a.chunks[len(a.chunks)-1]
err := chunk.SetValue(i, a.rowCount, val)
if err != nil {
// Use 1-based indexing for readability, as we're talking about columns.
return columnError(err, i+1)
return err
}

// Append the row to the data chunk.
vec.fn(&vec, a.currSize, v)
}

a.currSize++
a.rowCount++
return nil
}

func (a *Appender) appendDataChunks() error {
// Set the size of the current chunk to the current row count.
C.duckdb_data_chunk_set_size(a.chunks[len(a.chunks)-1], C.idx_t(a.currSize))

// Append all chunks to the appender and destroy them.
var state C.duckdb_state
var err error

for _, chunk := range a.chunks {
state = C.duckdb_append_data_chunk(a.duckdbAppender, chunk)
if err = chunk.setSize(); err != nil {
break
}
state = C.duckdb_append_data_chunk(a.duckdbAppender, chunk.data)
if state == C.DuckDBError {
err = duckdbError(C.duckdb_appender_error(a.duckdbAppender))
break
}
}
a.destroyDataChunks()

a.closeDataChunks()
a.rowCount = 0
return err
}

func (a *Appender) destroyDataChunks() {
func (a *Appender) closeDataChunks() {
for _, chunk := range a.chunks {
C.duckdb_destroy_data_chunk(&chunk)
chunk.close()
}
a.currSize = 0
a.chunks = a.chunks[:0]
}
93 changes: 93 additions & 0 deletions data_chunk.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package duckdb

/*
#include <stdlib.h>
#include <duckdb.h>
*/
import "C"

import (
"unsafe"
)

// DataChunk storage of a DuckDB table.
type DataChunk struct {
// data holds the underlying duckdb data chunk.
data C.duckdb_data_chunk
// columns is a helper slice providing direct access to all columns.
columns []vector
}

// SetValue writes a single value to a column in a data chunk. Note that this requires casting the type for each invocation.
func (chunk *DataChunk) SetValue(colIdx int, rowIdx int, val any) error {
if colIdx >= len(chunk.columns) {
return getError(errAPI, columnCountError(colIdx, len(chunk.columns)))
}
column := &chunk.columns[colIdx]

// Ensure that the types match before attempting to set anything.
v, err := column.tryCast(val)
if err != nil {
return columnError(err, colIdx)
}

// Set the value.
column.setFn(column, C.idx_t(rowIdx), v)
return nil
}

func (chunk *DataChunk) initFromTypes(ptr unsafe.Pointer, types []C.duckdb_logical_type) error {
columnCount := len(types)

// Initialize the callback functions to read and write values.
chunk.columns = make([]vector, columnCount)
var err error
for i := 0; i < columnCount; i++ {
if err = chunk.columns[i].init(types[i], i); err != nil {
break
}
}
if err != nil {
return err
}

logicalTypesPtr := (*C.duckdb_logical_type)(ptr)
chunk.data = C.duckdb_create_data_chunk(logicalTypesPtr, C.idx_t(columnCount))
C.duckdb_data_chunk_set_size(chunk.data, C.duckdb_vector_size())

// Initialize the vectors and their child vectors.
for i := 0; i < columnCount; i++ {
duckdbVector := C.duckdb_data_chunk_get_vector(chunk.data, C.idx_t(i))
chunk.columns[i].duckdbVector = duckdbVector
chunk.columns[i].getChildVectors(duckdbVector)
}
return nil
}

func (chunk *DataChunk) close() {
C.duckdb_destroy_data_chunk(&chunk.data)
}

func (chunk *DataChunk) setSize() error {
if len(chunk.columns) == 0 {
C.duckdb_data_chunk_set_size(chunk.data, C.idx_t(0))
return nil
}

allEqual := true
maxSize := C.idx_t(chunk.columns[0].size)
for i := 0; i < len(chunk.columns); i++ {
if chunk.columns[i].size != maxSize {
allEqual = false
}
if chunk.columns[i].size > maxSize {
maxSize = chunk.columns[i].size
}
}

if !allEqual {
return errDriver
}
C.duckdb_data_chunk_set_size(chunk.data, maxSize)
return nil
}
3 changes: 2 additions & 1 deletion errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ const (
)

var (
errDriver = errors.New("internal driver error, please file a bug report")
errDriver = errors.New("internal driver error: please file a bug report")
errAPI = errors.New("API error")

errParseDSN = errors.New("could not parse DSN for database")
errOpen = errors.New("could not open database")
Expand Down
6 changes: 6 additions & 0 deletions errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,3 +267,9 @@ func TestErrAppendNestedList(t *testing.T) {

cleanupAppender(t, c, con, a)
}

func TestErrAPISetValue(t *testing.T) {
var chunk DataChunk
err := chunk.SetValue(1, 42, "hello")
testError(t, err, errAPI.Error(), columnCountErrMsg)
}
29 changes: 29 additions & 0 deletions helper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package duckdb

/*
#include <stdlib.h>
#include <duckdb.h>
*/
import "C"

import "unsafe"

// secondsPerDay to calculate the days since 1970-01-01.
const secondsPerDay = 24 * 60 * 60

func mallocTypeSlice(count int) (unsafe.Pointer, []C.duckdb_logical_type) {
var dummy C.duckdb_logical_type
size := C.size_t(unsafe.Sizeof(dummy))

ptr := unsafe.Pointer(C.malloc(C.size_t(count) * size))
slice := (*[1 << 30]C.duckdb_logical_type)(ptr)[:count:count]

return ptr, slice
}

func destroyTypeSlice(ptr unsafe.Pointer, slice []C.duckdb_logical_type) {
for _, t := range slice {
C.duckdb_destroy_logical_type(&t)
}
C.free(ptr)
}
2 changes: 1 addition & 1 deletion statement.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (s *stmt) bind(args []driver.NamedValue) error {
C.duckdb_free(unsafe.Pointer(name))

// fallback on index position
var arg = args[i]
arg := args[i]

// override with ordinal if set
for _, v := range args {
Expand Down
Loading

0 comments on commit 8eaf120

Please sign in to comment.