diff --git a/appender.go b/appender.go index 3f328112..51b23af1 100644 --- a/appender.go +++ b/appender.go @@ -13,13 +13,20 @@ import ( "unsafe" ) -// Appender holds the duckdb appender. It allows to load bulk data into a DuckDB database. +// Appender holds the DuckDB appender. It allows to load bulk data into a DuckDB database. type Appender struct { c *conn schema string table string appender *C.duckdb_appender closed bool + + currentRow C.idx_t + chunks []C.duckdb_data_chunk + chunkVectors []C.duckdb_vector + currentChunkIdx int + currentChunkSize C.idx_t + chunkTypes *C.duckdb_logical_type } // NewAppenderFromConn returns a new Appender from a DuckDB driver connection. @@ -47,10 +54,12 @@ func NewAppenderFromConn(driverConn driver.Conn, schema string, table string) (* return nil, fmt.Errorf("can't create appender") } - return &Appender{c: dbConn, schema: schema, table: table, appender: &a}, nil + var chunkSize = C.duckdb_vector_size() + + return &Appender{c: dbConn, schema: schema, table: table, appender: &a, currentRow: 0, currentChunkIdx: 0, currentChunkSize: chunkSize}, nil } -// Error returns the last duckdb appender error. +// Error returns the last DuckDB appender error. func (a *Appender) Error() error { dbErr := C.GoString(C.duckdb_appender_error(*a.appender)) return errors.New(dbErr) @@ -58,10 +67,25 @@ func (a *Appender) Error() error { // Flush the appender to the underlying table and clear the internal cache. func (a *Appender) Flush() error { + // set the size of the current chunk to the current row + C.duckdb_data_chunk_set_size(a.chunks[a.currentChunkIdx], C.uint64_t(a.currentRow)) + + // append all chunks to the appender and destroy them + var state C.duckdb_state + for i, chunk := range a.chunks { + state = C.duckdb_append_data_chunk(*a.appender, chunk) + if state == C.DuckDBError { + dbErr := C.GoString(C.duckdb_appender_error(*a.appender)) + return fmt.Errorf("duckdb error appending chunk %d of %d: %s", i+1, a.currentChunkIdx+1, dbErr) + } + C.duckdb_destroy_data_chunk(&chunk) + } + if state := C.duckdb_appender_flush(*a.appender); state == C.DuckDBError { dbErr := C.GoString(C.duckdb_appender_error(*a.appender)) return errors.New(dbErr) } + return nil } @@ -82,76 +106,150 @@ func (a *Appender) Close() error { // AppendRow loads a row of values into the appender. The values are provided as separate arguments. func (a *Appender) AppendRow(args ...driver.Value) error { - return a.AppendRowArray(args) -} - -// AppendRowArray loads a row of values into the appender. The values are provided as an array. -func (a *Appender) AppendRowArray(args []driver.Value) error { if a.closed { panic("database/sql/driver: misuse of duckdb driver: use of closed Appender") } + var err error + // Initialize the chunk on the first call + if len(a.chunks) == 0 { + a.initializeChunkTypes(args) + err = a.addChunk(len(args)) + // If the current chunk is full, create a new one + } else if a.currentRow == C.duckdb_vector_size() { + a.currentChunkIdx++ + err = a.addChunk(len(args)) + } + + if err != nil { + return err + } + + return a.appendRowArray(args) +} + +// Create an array of duckdb types from a list of go types +func (a *Appender) initializeChunkTypes(args []driver.Value) { + defaultLogicalType := C.duckdb_create_logical_type(0) + rowTypes := C.malloc(C.size_t(len(args)) * C.size_t(unsafe.Sizeof(defaultLogicalType))) + + tmpChunkTypes := (*[1<<30 - 1]C.duckdb_logical_type)(rowTypes) + + for i, val := range args { + switch v := val.(type) { + case uint8: + tmpChunkTypes[i] = C.duckdb_create_logical_type(C.DUCKDB_TYPE_UTINYINT) + case int8: + tmpChunkTypes[i] = C.duckdb_create_logical_type(C.DUCKDB_TYPE_TINYINT) + case uint16: + tmpChunkTypes[i] = C.duckdb_create_logical_type(C.DUCKDB_TYPE_USMALLINT) + case int16: + tmpChunkTypes[i] = C.duckdb_create_logical_type(C.DUCKDB_TYPE_SMALLINT) + case uint32: + tmpChunkTypes[i] = C.duckdb_create_logical_type(C.DUCKDB_TYPE_UINTEGER) + case int32: + tmpChunkTypes[i] = C.duckdb_create_logical_type(C.DUCKDB_TYPE_INTEGER) + case uint64, uint: + tmpChunkTypes[i] = C.duckdb_create_logical_type(C.DUCKDB_TYPE_UBIGINT) + case int64, int: + tmpChunkTypes[i] = C.duckdb_create_logical_type(C.DUCKDB_TYPE_BIGINT) + case float32: + tmpChunkTypes[i] = C.duckdb_create_logical_type(C.DUCKDB_TYPE_FLOAT) + case float64: + tmpChunkTypes[i] = C.duckdb_create_logical_type(C.DUCKDB_TYPE_DOUBLE) + case bool: + tmpChunkTypes[i] = C.duckdb_create_logical_type(C.DUCKDB_TYPE_BOOLEAN) + case []byte: + tmpChunkTypes[i] = C.duckdb_create_logical_type(C.DUCKDB_TYPE_BLOB) + case string: + tmpChunkTypes[i] = C.duckdb_create_logical_type(C.DUCKDB_TYPE_VARCHAR) + case time.Time: + tmpChunkTypes[i] = C.duckdb_create_logical_type(C.DUCKDB_TYPE_TIMESTAMP) + default: + panic(fmt.Sprintf("couldn't append unsupported parameter %T", v)) + } + } + + a.chunkTypes = (*C.duckdb_logical_type)(rowTypes) +} + +func (a *Appender) addChunk(colCount int) error { + a.currentRow = 0 + + // duckdb_create_data_chunk takes an array of duckdb_logical_type and a column count + dataChunk := C.duckdb_create_data_chunk(a.chunkTypes, C.uint64_t(colCount)) + C.duckdb_data_chunk_set_size(dataChunk, C.uint64_t(a.currentChunkSize)) + + // reset the chunkVectors array if they've been previously set + if a.chunkVectors != nil { + a.chunkVectors = nil + } + + for i := 0; i < colCount; i++ { + vector := C.duckdb_data_chunk_get_vector(dataChunk, C.uint64_t(i)) + if vector == nil { + return fmt.Errorf("error while appending column %d", i) + } + a.chunkVectors = append(a.chunkVectors, vector) + } + + a.chunks = append(a.chunks, dataChunk) + return nil +} + +// appendRowArray loads a row of values into the appender. The values are provided as an array. +func (a *Appender) appendRowArray(args []driver.Value) error { for i, v := range args { if v == nil { - if rv := C.duckdb_append_null(*a.appender); rv == C.DuckDBError { - return fmt.Errorf("couldn't append parameter %d", i) + if state := C.duckdb_append_null(*a.appender); state == C.DuckDBError { + return fmt.Errorf("couldn't append parameter %d", v) } continue } - var rv C.duckdb_state switch v := v.(type) { case uint8: - rv = C.duckdb_append_uint8(*a.appender, C.uint8_t(v)) + set[uint8](a.chunkVectors[i], a.currentRow, v) case int8: - rv = C.duckdb_append_int8(*a.appender, C.int8_t(v)) + set[int8](a.chunkVectors[i], a.currentRow, v) case uint16: - rv = C.duckdb_append_uint16(*a.appender, C.uint16_t(v)) + set[uint16](a.chunkVectors[i], a.currentRow, v) case int16: - rv = C.duckdb_append_int16(*a.appender, C.int16_t(v)) + set[int16](a.chunkVectors[i], a.currentRow, v) case uint32: - rv = C.duckdb_append_uint32(*a.appender, C.uint32_t(v)) + set[uint32](a.chunkVectors[i], a.currentRow, v) case int32: - rv = C.duckdb_append_int32(*a.appender, C.int32_t(v)) + set[int32](a.chunkVectors[i], a.currentRow, v) case uint64: - rv = C.duckdb_append_uint64(*a.appender, C.uint64_t(v)) + set[uint64](a.chunkVectors[i], a.currentRow, v) case int64: - rv = C.duckdb_append_int64(*a.appender, C.int64_t(v)) + set[int64](a.chunkVectors[i], a.currentRow, v) case uint: - rv = C.duckdb_append_uint64(*a.appender, C.uint64_t(v)) + set[uint](a.chunkVectors[i], a.currentRow, v) case int: - rv = C.duckdb_append_int64(*a.appender, C.int64_t(v)) + set[int](a.chunkVectors[i], a.currentRow, v) case float32: - rv = C.duckdb_append_float(*a.appender, C.float(v)) + set[float32](a.chunkVectors[i], a.currentRow, v) case float64: - rv = C.duckdb_append_double(*a.appender, C.double(v)) + set[float64](a.chunkVectors[i], a.currentRow, v) case bool: - rv = C.duckdb_append_bool(*a.appender, C.bool(v)) + set[bool](a.chunkVectors[i], a.currentRow, v) case []byte: - rv = C.duckdb_append_blob(*a.appender, unsafe.Pointer(&v[0]), C.uint64_t(len(v))) + set[[]byte](a.chunkVectors[i], a.currentRow, v) case string: str := C.CString(v) - rv = C.duckdb_append_varchar(*a.appender, str) + C.duckdb_vector_assign_string_element(a.chunkVectors[i], C.uint64_t(a.currentRow), str) C.free(unsafe.Pointer(str)) case time.Time: var dt C.duckdb_timestamp dt.micros = C.int64_t(v.UTC().UnixMicro()) - rv = C.duckdb_append_timestamp(*a.appender, dt) - + set[C.duckdb_timestamp](a.chunkVectors[i], a.currentRow, dt) default: return fmt.Errorf("couldn't append unsupported parameter %d (type %T)", i, v) } - if rv == C.DuckDBError { - dbErr := C.GoString(C.duckdb_appender_error(*a.appender)) - return fmt.Errorf("couldn't append parameter %d (type %T): %s", i, v, dbErr) - } - } - - if state := C.duckdb_appender_end_row(*a.appender); state == C.DuckDBError { - dbErr := C.GoString(C.duckdb_appender_error(*a.appender)) - return errors.New(dbErr) } + a.currentRow++ return nil } diff --git a/rows.go b/rows.go index 55275a33..ca6658b2 100644 --- a/rows.go +++ b/rows.go @@ -277,6 +277,12 @@ func (r *rows) Close() error { return err } +func set[T any](vector C.duckdb_vector, rowIdx C.idx_t, value T) { + ptr := C.duckdb_vector_get_data(vector) + xs := (*[1 << 31]T)(ptr) + xs[rowIdx] = value +} + func get[T any](vector C.duckdb_vector, rowIdx C.idx_t) T { ptr := C.duckdb_vector_get_data(vector) xs := (*[1 << 31]T)(ptr)