Skip to content

Commit

Permalink
Add Data Chunks to the Appender (marcboeker#139)
Browse files Browse the repository at this point in the history
* works for single column

* works for chunk

* add row at a time to chunk, only works upto one chunk

* Store chunks in appender struct, and append in flush.

* Chunks Work!

* Add destroy data_chunk

* Some minor refactorings

---------

Co-authored-by: Marc Boeker <[email protected]>
  • Loading branch information
2 people authored and levakin committed Jan 9, 2024
1 parent 23bc93c commit fd3b63f
Show file tree
Hide file tree
Showing 2 changed files with 141 additions and 37 deletions.
172 changes: 135 additions & 37 deletions appender.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,20 @@ import (
"unsafe"
)

// Appender holds the duckdb appender. It allows to load bulk data into a DuckDB database.
// Appender holds the DuckDB appender. It allows to load bulk data into a DuckDB database.
type Appender struct {
c *Conn
schema string
table string
appender *C.duckdb_appender
closed bool

currentRow C.idx_t
chunks []C.duckdb_data_chunk
chunkVectors []C.duckdb_vector
currentChunkIdx int
currentChunkSize C.idx_t
chunkTypes *C.duckdb_logical_type
}

// NewAppenderFromConn returns a new Appender from a DuckDB driver connection.
Expand Down Expand Up @@ -47,21 +54,38 @@ func NewAppenderFromConn(driverConn driver.Conn, schema string, table string) (*
return nil, fmt.Errorf("can't create appender")
}

return &Appender{c: dbConn, schema: schema, table: table, appender: &a}, nil
var chunkSize = C.duckdb_vector_size()

return &Appender{c: dbConn, schema: schema, table: table, appender: &a, currentRow: 0, currentChunkIdx: 0, currentChunkSize: chunkSize}, nil
}

// Error returns the last duckdb appender error.
// Error returns the last DuckDB appender error.
func (a *Appender) Error() error {
dbErr := C.GoString(C.duckdb_appender_error(*a.appender))
return errors.New(dbErr)
}

// Flush the appender to the underlying table and clear the internal cache.
func (a *Appender) Flush() error {
// set the size of the current chunk to the current row
C.duckdb_data_chunk_set_size(a.chunks[a.currentChunkIdx], C.uint64_t(a.currentRow))

// append all chunks to the appender and destroy them
var state C.duckdb_state
for i, chunk := range a.chunks {
state = C.duckdb_append_data_chunk(*a.appender, chunk)
if state == C.DuckDBError {
dbErr := C.GoString(C.duckdb_appender_error(*a.appender))
return fmt.Errorf("duckdb error appending chunk %d of %d: %s", i+1, a.currentChunkIdx+1, dbErr)
}
C.duckdb_destroy_data_chunk(&chunk)
}

if state := C.duckdb_appender_flush(*a.appender); state == C.DuckDBError {
dbErr := C.GoString(C.duckdb_appender_error(*a.appender))
return errors.New(dbErr)
}

return nil
}

Expand All @@ -82,75 +106,149 @@ func (a *Appender) Close() error {

// AppendRow loads a row of values into the appender. The values are provided as separate arguments.
func (a *Appender) AppendRow(args ...driver.Value) error {
return a.AppendRowArray(args)
}

// AppendRowArray loads a row of values into the appender. The values are provided as an array.
func (a *Appender) AppendRowArray(args []driver.Value) error {
if a.closed {
panic("database/sql/driver: misuse of duckdb driver: use of closed Appender")
}

var err error
// Initialize the chunk on the first call
if len(a.chunks) == 0 {
a.initializeChunkTypes(args)
err = a.addChunk(len(args))
// If the current chunk is full, create a new one
} else if a.currentRow == C.duckdb_vector_size() {
a.currentChunkIdx++
err = a.addChunk(len(args))
}

if err != nil {
return err
}

return a.appendRowArray(args)
}

// Create an array of duckdb types from a list of go types
func (a *Appender) initializeChunkTypes(args []driver.Value) {
defaultLogicalType := C.duckdb_create_logical_type(0)
rowTypes := C.malloc(C.size_t(len(args)) * C.size_t(unsafe.Sizeof(defaultLogicalType)))

tmpChunkTypes := (*[1<<30 - 1]C.duckdb_logical_type)(rowTypes)

for i, val := range args {
switch v := val.(type) {
case uint8:
tmpChunkTypes[i] = C.duckdb_create_logical_type(C.DUCKDB_TYPE_UTINYINT)
case int8:
tmpChunkTypes[i] = C.duckdb_create_logical_type(C.DUCKDB_TYPE_TINYINT)
case uint16:
tmpChunkTypes[i] = C.duckdb_create_logical_type(C.DUCKDB_TYPE_USMALLINT)
case int16:
tmpChunkTypes[i] = C.duckdb_create_logical_type(C.DUCKDB_TYPE_SMALLINT)
case uint32:
tmpChunkTypes[i] = C.duckdb_create_logical_type(C.DUCKDB_TYPE_UINTEGER)
case int32:
tmpChunkTypes[i] = C.duckdb_create_logical_type(C.DUCKDB_TYPE_INTEGER)
case uint64, uint:
tmpChunkTypes[i] = C.duckdb_create_logical_type(C.DUCKDB_TYPE_UBIGINT)
case int64, int:
tmpChunkTypes[i] = C.duckdb_create_logical_type(C.DUCKDB_TYPE_BIGINT)
case float32:
tmpChunkTypes[i] = C.duckdb_create_logical_type(C.DUCKDB_TYPE_FLOAT)
case float64:
tmpChunkTypes[i] = C.duckdb_create_logical_type(C.DUCKDB_TYPE_DOUBLE)
case bool:
tmpChunkTypes[i] = C.duckdb_create_logical_type(C.DUCKDB_TYPE_BOOLEAN)
case []byte:
tmpChunkTypes[i] = C.duckdb_create_logical_type(C.DUCKDB_TYPE_BLOB)
case string:
tmpChunkTypes[i] = C.duckdb_create_logical_type(C.DUCKDB_TYPE_VARCHAR)
case time.Time:
tmpChunkTypes[i] = C.duckdb_create_logical_type(C.DUCKDB_TYPE_TIMESTAMP)
default:
panic(fmt.Sprintf("couldn't append unsupported parameter %T", v))
}
}

a.chunkTypes = (*C.duckdb_logical_type)(rowTypes)
}

func (a *Appender) addChunk(colCount int) error {
a.currentRow = 0

// duckdb_create_data_chunk takes an array of duckdb_logical_type and a column count
dataChunk := C.duckdb_create_data_chunk(a.chunkTypes, C.uint64_t(colCount))
C.duckdb_data_chunk_set_size(dataChunk, C.uint64_t(a.currentChunkSize))

// reset the chunkVectors array if they've been previously set
if a.chunkVectors != nil {
a.chunkVectors = nil
}

for i := 0; i < colCount; i++ {
vector := C.duckdb_data_chunk_get_vector(dataChunk, C.uint64_t(i))
if vector == nil {
return fmt.Errorf("error while appending column %d", i)
}
a.chunkVectors = append(a.chunkVectors, vector)
}

a.chunks = append(a.chunks, dataChunk)
return nil
}

// appendRowArray loads a row of values into the appender. The values are provided as an array.
func (a *Appender) appendRowArray(args []driver.Value) error {
for i, v := range args {
if v == nil {
if rv := C.duckdb_append_null(*a.appender); rv == C.DuckDBError {
return fmt.Errorf("couldn't append parameter %d", i)
if state := C.duckdb_append_null(*a.appender); state == C.DuckDBError {
return fmt.Errorf("couldn't append parameter %d", v)
}
continue
}

var rv C.duckdb_state
switch v := v.(type) {
case uint8:
rv = C.duckdb_append_uint8(*a.appender, C.uint8_t(v))
set[uint8](a.chunkVectors[i], a.currentRow, v)
case int8:
rv = C.duckdb_append_int8(*a.appender, C.int8_t(v))
set[int8](a.chunkVectors[i], a.currentRow, v)
case uint16:
rv = C.duckdb_append_uint16(*a.appender, C.uint16_t(v))
set[uint16](a.chunkVectors[i], a.currentRow, v)
case int16:
rv = C.duckdb_append_int16(*a.appender, C.int16_t(v))
set[int16](a.chunkVectors[i], a.currentRow, v)
case uint32:
rv = C.duckdb_append_uint32(*a.appender, C.uint32_t(v))
set[uint32](a.chunkVectors[i], a.currentRow, v)
case int32:
rv = C.duckdb_append_int32(*a.appender, C.int32_t(v))
set[int32](a.chunkVectors[i], a.currentRow, v)
case uint64:
rv = C.duckdb_append_uint64(*a.appender, C.uint64_t(v))
set[uint64](a.chunkVectors[i], a.currentRow, v)
case int64:
rv = C.duckdb_append_int64(*a.appender, C.int64_t(v))
set[int64](a.chunkVectors[i], a.currentRow, v)
case uint:
rv = C.duckdb_append_uint64(*a.appender, C.uint64_t(v))
set[uint](a.chunkVectors[i], a.currentRow, v)
case int:
rv = C.duckdb_append_int64(*a.appender, C.int64_t(v))
set[int](a.chunkVectors[i], a.currentRow, v)
case float32:
rv = C.duckdb_append_float(*a.appender, C.float(v))
set[float32](a.chunkVectors[i], a.currentRow, v)
case float64:
rv = C.duckdb_append_double(*a.appender, C.double(v))
set[float64](a.chunkVectors[i], a.currentRow, v)
case bool:
rv = C.duckdb_append_bool(*a.appender, C.bool(v))
set[bool](a.chunkVectors[i], a.currentRow, v)
case []byte:
rv = C.duckdb_append_blob(*a.appender, unsafe.Pointer(&v[0]), C.uint64_t(len(v)))
set[[]byte](a.chunkVectors[i], a.currentRow, v)
case string:
str := C.CString(v)
rv = C.duckdb_append_varchar(*a.appender, str)
C.duckdb_vector_assign_string_element(a.chunkVectors[i], C.uint64_t(a.currentRow), str)
C.free(unsafe.Pointer(str))
case time.Time:
var dt C.duckdb_timestamp
dt.micros = C.int64_t(v.UTC().UnixMicro())
rv = C.duckdb_append_timestamp(*a.appender, dt)

set[C.duckdb_timestamp](a.chunkVectors[i], a.currentRow, dt)
default:
return fmt.Errorf("couldn't append unsupported parameter %d (type %T)", i, v)
}
if rv == C.DuckDBError {
dbErr := C.GoString(C.duckdb_appender_error(*a.appender))
return fmt.Errorf("couldn't append parameter %d (type %T): %s", i, v, dbErr)
}
}

if state := C.duckdb_appender_end_row(*a.appender); state == C.DuckDBError {
dbErr := C.GoString(C.duckdb_appender_error(*a.appender))
return errors.New(dbErr)
}

a.currentRow++
return nil
}
6 changes: 6 additions & 0 deletions rows.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,12 @@ func (r *rows) Close() error {
return err
}

func set[T any](vector C.duckdb_vector, rowIdx C.idx_t, value T) {
ptr := C.duckdb_vector_get_data(vector)
xs := (*[1 << 31]T)(ptr)
xs[rowIdx] = value
}

func get[T any](vector C.duckdb_vector, rowIdx C.idx_t) T {
ptr := C.duckdb_vector_get_data(vector)
xs := (*[1 << 31]T)(ptr)
Expand Down

0 comments on commit fd3b63f

Please sign in to comment.