Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Data Chunks to the Appender #139

Merged
merged 7 commits into from
Dec 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 135 additions & 37 deletions appender.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,20 @@ import (
"unsafe"
)

// Appender holds the duckdb appender. It allows to load bulk data into a DuckDB database.
// Appender holds the DuckDB appender. It allows to load bulk data into a DuckDB database.
type Appender struct {
c *conn
schema string
table string
appender *C.duckdb_appender
closed bool

currentRow C.idx_t
chunks []C.duckdb_data_chunk
chunkVectors []C.duckdb_vector
currentChunkIdx int
currentChunkSize C.idx_t
chunkTypes *C.duckdb_logical_type
}

// NewAppenderFromConn returns a new Appender from a DuckDB driver connection.
Expand Down Expand Up @@ -47,21 +54,38 @@ func NewAppenderFromConn(driverConn driver.Conn, schema string, table string) (*
return nil, fmt.Errorf("can't create appender")
}

return &Appender{c: dbConn, schema: schema, table: table, appender: &a}, nil
var chunkSize = C.duckdb_vector_size()

return &Appender{c: dbConn, schema: schema, table: table, appender: &a, currentRow: 0, currentChunkIdx: 0, currentChunkSize: chunkSize}, nil
}

// Error returns the last duckdb appender error.
// Error returns the last DuckDB appender error.
func (a *Appender) Error() error {
dbErr := C.GoString(C.duckdb_appender_error(*a.appender))
return errors.New(dbErr)
}

// Flush the appender to the underlying table and clear the internal cache.
func (a *Appender) Flush() error {
// set the size of the current chunk to the current row
C.duckdb_data_chunk_set_size(a.chunks[a.currentChunkIdx], C.uint64_t(a.currentRow))

// append all chunks to the appender and destroy them
var state C.duckdb_state
for i, chunk := range a.chunks {
state = C.duckdb_append_data_chunk(*a.appender, chunk)
if state == C.DuckDBError {
dbErr := C.GoString(C.duckdb_appender_error(*a.appender))
return fmt.Errorf("duckdb error appending chunk %d of %d: %s", i+1, a.currentChunkIdx+1, dbErr)
}
C.duckdb_destroy_data_chunk(&chunk)
}

if state := C.duckdb_appender_flush(*a.appender); state == C.DuckDBError {
dbErr := C.GoString(C.duckdb_appender_error(*a.appender))
return errors.New(dbErr)
}

return nil
}

Expand All @@ -82,76 +106,150 @@ func (a *Appender) Close() error {

// AppendRow loads a row of values into the appender. The values are provided as separate arguments.
func (a *Appender) AppendRow(args ...driver.Value) error {
return a.AppendRowArray(args)
}

// AppendRowArray loads a row of values into the appender. The values are provided as an array.
func (a *Appender) AppendRowArray(args []driver.Value) error {
if a.closed {
panic("database/sql/driver: misuse of duckdb driver: use of closed Appender")
}

var err error
// Initialize the chunk on the first call
if len(a.chunks) == 0 {
a.initializeChunkTypes(args)
err = a.addChunk(len(args))
// If the current chunk is full, create a new one
} else if a.currentRow == C.duckdb_vector_size() {
a.currentChunkIdx++
err = a.addChunk(len(args))
}

if err != nil {
return err
}

return a.appendRowArray(args)
}

// Create an array of duckdb types from a list of go types
func (a *Appender) initializeChunkTypes(args []driver.Value) {
defaultLogicalType := C.duckdb_create_logical_type(0)
rowTypes := C.malloc(C.size_t(len(args)) * C.size_t(unsafe.Sizeof(defaultLogicalType)))

tmpChunkTypes := (*[1<<30 - 1]C.duckdb_logical_type)(rowTypes)

for i, val := range args {
switch v := val.(type) {
case uint8:
tmpChunkTypes[i] = C.duckdb_create_logical_type(C.DUCKDB_TYPE_UTINYINT)
case int8:
tmpChunkTypes[i] = C.duckdb_create_logical_type(C.DUCKDB_TYPE_TINYINT)
case uint16:
tmpChunkTypes[i] = C.duckdb_create_logical_type(C.DUCKDB_TYPE_USMALLINT)
case int16:
tmpChunkTypes[i] = C.duckdb_create_logical_type(C.DUCKDB_TYPE_SMALLINT)
case uint32:
tmpChunkTypes[i] = C.duckdb_create_logical_type(C.DUCKDB_TYPE_UINTEGER)
case int32:
tmpChunkTypes[i] = C.duckdb_create_logical_type(C.DUCKDB_TYPE_INTEGER)
case uint64, uint:
tmpChunkTypes[i] = C.duckdb_create_logical_type(C.DUCKDB_TYPE_UBIGINT)
case int64, int:
tmpChunkTypes[i] = C.duckdb_create_logical_type(C.DUCKDB_TYPE_BIGINT)
case float32:
tmpChunkTypes[i] = C.duckdb_create_logical_type(C.DUCKDB_TYPE_FLOAT)
case float64:
tmpChunkTypes[i] = C.duckdb_create_logical_type(C.DUCKDB_TYPE_DOUBLE)
case bool:
tmpChunkTypes[i] = C.duckdb_create_logical_type(C.DUCKDB_TYPE_BOOLEAN)
case []byte:
tmpChunkTypes[i] = C.duckdb_create_logical_type(C.DUCKDB_TYPE_BLOB)
case string:
tmpChunkTypes[i] = C.duckdb_create_logical_type(C.DUCKDB_TYPE_VARCHAR)
case time.Time:
tmpChunkTypes[i] = C.duckdb_create_logical_type(C.DUCKDB_TYPE_TIMESTAMP)
default:
panic(fmt.Sprintf("couldn't append unsupported parameter %T", v))
}
}

a.chunkTypes = (*C.duckdb_logical_type)(rowTypes)
}

func (a *Appender) addChunk(colCount int) error {
a.currentRow = 0

// duckdb_create_data_chunk takes an array of duckdb_logical_type and a column count
dataChunk := C.duckdb_create_data_chunk(a.chunkTypes, C.uint64_t(colCount))
C.duckdb_data_chunk_set_size(dataChunk, C.uint64_t(a.currentChunkSize))

// reset the chunkVectors array if they've been previously set
if a.chunkVectors != nil {
a.chunkVectors = nil
}

for i := 0; i < colCount; i++ {
vector := C.duckdb_data_chunk_get_vector(dataChunk, C.uint64_t(i))
if vector == nil {
return fmt.Errorf("error while appending column %d", i)
}
a.chunkVectors = append(a.chunkVectors, vector)
}

a.chunks = append(a.chunks, dataChunk)
return nil
}

// appendRowArray loads a row of values into the appender. The values are provided as an array.
func (a *Appender) appendRowArray(args []driver.Value) error {
for i, v := range args {
if v == nil {
if rv := C.duckdb_append_null(*a.appender); rv == C.DuckDBError {
return fmt.Errorf("couldn't append parameter %d", i)
if state := C.duckdb_append_null(*a.appender); state == C.DuckDBError {
return fmt.Errorf("couldn't append parameter %d", v)
}
continue
}

var rv C.duckdb_state
switch v := v.(type) {
case uint8:
rv = C.duckdb_append_uint8(*a.appender, C.uint8_t(v))
set[uint8](a.chunkVectors[i], a.currentRow, v)
case int8:
rv = C.duckdb_append_int8(*a.appender, C.int8_t(v))
set[int8](a.chunkVectors[i], a.currentRow, v)
case uint16:
rv = C.duckdb_append_uint16(*a.appender, C.uint16_t(v))
set[uint16](a.chunkVectors[i], a.currentRow, v)
case int16:
rv = C.duckdb_append_int16(*a.appender, C.int16_t(v))
set[int16](a.chunkVectors[i], a.currentRow, v)
case uint32:
rv = C.duckdb_append_uint32(*a.appender, C.uint32_t(v))
set[uint32](a.chunkVectors[i], a.currentRow, v)
case int32:
rv = C.duckdb_append_int32(*a.appender, C.int32_t(v))
set[int32](a.chunkVectors[i], a.currentRow, v)
case uint64:
rv = C.duckdb_append_uint64(*a.appender, C.uint64_t(v))
set[uint64](a.chunkVectors[i], a.currentRow, v)
case int64:
rv = C.duckdb_append_int64(*a.appender, C.int64_t(v))
set[int64](a.chunkVectors[i], a.currentRow, v)
case uint:
rv = C.duckdb_append_uint64(*a.appender, C.uint64_t(v))
set[uint](a.chunkVectors[i], a.currentRow, v)
case int:
rv = C.duckdb_append_int64(*a.appender, C.int64_t(v))
set[int](a.chunkVectors[i], a.currentRow, v)
case float32:
rv = C.duckdb_append_float(*a.appender, C.float(v))
set[float32](a.chunkVectors[i], a.currentRow, v)
case float64:
rv = C.duckdb_append_double(*a.appender, C.double(v))
set[float64](a.chunkVectors[i], a.currentRow, v)
case bool:
rv = C.duckdb_append_bool(*a.appender, C.bool(v))
set[bool](a.chunkVectors[i], a.currentRow, v)
case []byte:
rv = C.duckdb_append_blob(*a.appender, unsafe.Pointer(&v[0]), C.uint64_t(len(v)))
set[[]byte](a.chunkVectors[i], a.currentRow, v)
case string:
str := C.CString(v)
rv = C.duckdb_append_varchar(*a.appender, str)
C.duckdb_vector_assign_string_element(a.chunkVectors[i], C.uint64_t(a.currentRow), str)
C.free(unsafe.Pointer(str))
case time.Time:
var dt C.duckdb_timestamp
dt.micros = C.int64_t(v.UTC().UnixMicro())
rv = C.duckdb_append_timestamp(*a.appender, dt)

set[C.duckdb_timestamp](a.chunkVectors[i], a.currentRow, dt)
default:
return fmt.Errorf("couldn't append unsupported parameter %d (type %T)", i, v)
}
if rv == C.DuckDBError {
dbErr := C.GoString(C.duckdb_appender_error(*a.appender))
return fmt.Errorf("couldn't append parameter %d (type %T): %s", i, v, dbErr)
}
}

if state := C.duckdb_appender_end_row(*a.appender); state == C.DuckDBError {
dbErr := C.GoString(C.duckdb_appender_error(*a.appender))
return errors.New(dbErr)
}

a.currentRow++
return nil
}

Expand Down
6 changes: 6 additions & 0 deletions rows.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,12 @@ func (r *rows) Close() error {
return err
}

func set[T any](vector C.duckdb_vector, rowIdx C.idx_t, value T) {
ptr := C.duckdb_vector_get_data(vector)
xs := (*[1 << 31]T)(ptr)
xs[rowIdx] = value
}

func get[T any](vector C.duckdb_vector, rowIdx C.idx_t) T {
ptr := C.duckdb_vector_get_data(vector)
xs := (*[1 << 31]T)(ptr)
Expand Down
Loading