From db42905a283be6f109f5db6bce89c4e5df758524 Mon Sep 17 00:00:00 2001 From: taniabogatsch <44262898+taniabogatsch@users.noreply.github.com> Date: Wed, 19 Jun 2024 16:07:38 +0200 Subject: [PATCH 1/5] fix appending multiple chunks --- appender.go | 25 +++++++++++-------------- appender_test.go | 26 +++++++++++--------------- data_chunk.go | 6 +++--- 3 files changed, 25 insertions(+), 32 deletions(-) diff --git a/appender.go b/appender.go index c035d5ea..ced1ebe4 100644 --- a/appender.go +++ b/appender.go @@ -153,10 +153,11 @@ func (a *Appender) appendRowSlice(args []driver.Value) error { } // Create a new data chunk if the current chunk is full. - if C.idx_t(a.rowCount) == C.duckdb_vector_size() || len(a.chunks) == 0 { + if a.rowCount == GetDataChunkCapacity() || len(a.chunks) == 0 { if err := a.addDataChunk(); err != nil { return err } + a.rowCount = 0 } // Set all values. @@ -176,34 +177,30 @@ func (a *Appender) appendDataChunks() error { var state C.duckdb_state var err error - for i, chunk := range a.chunks { - + for i := 0; i < len(a.chunks); i++ { // All data chunks except the last are at maximum capacity. - size := chunk.GetCapacity() + size := GetDataChunkCapacity() if i == len(a.chunks)-1 { size = a.rowCount } - if err = chunk.SetSize(size); err != nil { + if err = a.chunks[i].SetSize(size); err != nil { break } - state = C.duckdb_append_data_chunk(a.duckdbAppender, chunk.data) + state = C.duckdb_append_data_chunk(a.duckdbAppender, a.chunks[i].data) if state == C.DuckDBError { err = duckdbError(C.duckdb_appender_error(a.duckdbAppender)) break } + a.chunks[i].close() } - a.closeDataChunks() - a.rowCount = 0 - return err -} + //for i, chunk := range a.chunks { + //} -func (a *Appender) closeDataChunks() { - for _, chunk := range a.chunks { - chunk.close() - } a.chunks = a.chunks[:0] + a.rowCount = 0 + return err } func mallocTypeSlice(count int) (unsafe.Pointer, []C.duckdb_logical_type) { diff --git a/appender_test.go b/appender_test.go index c8bbe14a..f44339be 100644 --- a/appender_test.go +++ b/appender_test.go @@ -15,8 +15,6 @@ import ( "github.com/stretchr/testify/require" ) -const numAppenderTestRows = 10000 - type simpleStruct struct { A int32 B string @@ -169,6 +167,8 @@ func TestAppenderPrimitive(t *testing.T) { bool BOOLEAN )`) + // Test appending a few data chunks. + rowCount := GetDataChunkCapacity() * 5 type row struct { ID int64 UInt8 uint8 @@ -198,8 +198,8 @@ func TestAppenderPrimitive(t *testing.T) { ts, err := time.ParseInLocation(longForm, "2016-01-17 20:04:05 IST", IST) require.NoError(t, err) - rowsToAppend := make([]row, numAppenderTestRows) - for i := 0; i < numAppenderTestRows; i++ { + rowsToAppend := make([]row, rowCount) + for i := 0; i < rowCount; i++ { u64 := rand.Uint64() // Go SQL does not support uint64 values with their high bit set (see for example https://github.com/lib/pq/issues/72). @@ -248,10 +248,6 @@ func TestAppenderPrimitive(t *testing.T) { rowsToAppend[i].String, rowsToAppend[i].Bool, )) - - if i%1000 == 0 { - require.NoError(t, a.Flush()) - } } require.NoError(t, a.Flush()) @@ -282,16 +278,16 @@ func TestAppenderPrimitive(t *testing.T) { &r.String, &r.Bool, )) - rowsToAppend[i].Timestamp = rowsToAppend[i].Timestamp.UTC() - rowsToAppend[i].TimestampS = rowsToAppend[i].TimestampS.UTC() - rowsToAppend[i].TimestampMS = rowsToAppend[i].TimestampMS.UTC() - rowsToAppend[i].TimestampNS = rowsToAppend[i].TimestampNS.UTC() - rowsToAppend[i].TimestampTZ = rowsToAppend[i].TimestampTZ.UTC() - require.Equal(t, rowsToAppend[i], r) + rowsToAppend[r.ID].Timestamp = rowsToAppend[i].Timestamp.UTC() + rowsToAppend[r.ID].TimestampS = rowsToAppend[i].TimestampS.UTC() + rowsToAppend[r.ID].TimestampMS = rowsToAppend[i].TimestampMS.UTC() + rowsToAppend[r.ID].TimestampNS = rowsToAppend[i].TimestampNS.UTC() + rowsToAppend[r.ID].TimestampTZ = rowsToAppend[i].TimestampTZ.UTC() + require.Equal(t, rowsToAppend[r.ID], r) i++ } - require.Equal(t, numAppenderTestRows, i) + require.Equal(t, rowCount, i) require.NoError(t, res.Close()) cleanupAppender(t, c, con, a) } diff --git a/data_chunk.go b/data_chunk.go index 966ac329..e372c329 100644 --- a/data_chunk.go +++ b/data_chunk.go @@ -18,14 +18,14 @@ type DataChunk struct { columns []vector } -// GetCapacity returns the capacity of a data chunk. -func (chunk *DataChunk) GetCapacity() int { +// GetDataChunkCapacity returns the capacity of a data chunk. +func GetDataChunkCapacity() int { return int(C.duckdb_vector_size()) } // SetSize sets the internal size of the data chunk. Cannot exceed GetCapacity(). func (chunk *DataChunk) SetSize(size int) error { - if size > chunk.GetCapacity() { + if size > GetDataChunkCapacity() { return getError(errAPI, errVectorSize) } C.duckdb_data_chunk_set_size(chunk.data, C.idx_t(size)) From 02cc6a2ece0f7edce7704931df2e408c55cebeb0 Mon Sep 17 00:00:00 2001 From: taniabogatsch <44262898+taniabogatsch@users.noreply.github.com> Date: Wed, 19 Jun 2024 17:13:36 +0200 Subject: [PATCH 2/5] clean up --- appender.go | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/appender.go b/appender.go index ced1ebe4..ccf787e6 100644 --- a/appender.go +++ b/appender.go @@ -177,27 +177,24 @@ func (a *Appender) appendDataChunks() error { var state C.duckdb_state var err error - for i := 0; i < len(a.chunks); i++ { + for i, chunk := range a.chunks { // All data chunks except the last are at maximum capacity. size := GetDataChunkCapacity() if i == len(a.chunks)-1 { size = a.rowCount } - if err = a.chunks[i].SetSize(size); err != nil { + if err = chunk.SetSize(size); err != nil { break } - state = C.duckdb_append_data_chunk(a.duckdbAppender, a.chunks[i].data) + state = C.duckdb_append_data_chunk(a.duckdbAppender, chunk.data) if state == C.DuckDBError { err = duckdbError(C.duckdb_appender_error(a.duckdbAppender)) break } - a.chunks[i].close() + chunk.close() } - //for i, chunk := range a.chunks { - //} - a.chunks = a.chunks[:0] a.rowCount = 0 return err From 3f31b718f655b16f7adfb87f4c340c0d30278124 Mon Sep 17 00:00:00 2001 From: taniabogatsch <44262898+taniabogatsch@users.noreply.github.com> Date: Wed, 19 Jun 2024 17:14:50 +0200 Subject: [PATCH 3/5] ensure we clean up all non-broken chunks --- appender.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/appender.go b/appender.go index ccf787e6..87db8cf7 100644 --- a/appender.go +++ b/appender.go @@ -184,13 +184,13 @@ func (a *Appender) appendDataChunks() error { size = a.rowCount } if err = chunk.SetSize(size); err != nil { - break + continue } state = C.duckdb_append_data_chunk(a.duckdbAppender, chunk.data) if state == C.DuckDBError { err = duckdbError(C.duckdb_appender_error(a.duckdbAppender)) - break + continue } chunk.close() } From 6659f05ca7b263f8628ef16ec2bce6645d72706a Mon Sep 17 00:00:00 2001 From: taniabogatsch <44262898+taniabogatsch@users.noreply.github.com> Date: Wed, 19 Jun 2024 17:18:05 +0200 Subject: [PATCH 4/5] now I am happy with chunk.close() --- appender.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/appender.go b/appender.go index 87db8cf7..f7e8d184 100644 --- a/appender.go +++ b/appender.go @@ -184,14 +184,17 @@ func (a *Appender) appendDataChunks() error { size = a.rowCount } if err = chunk.SetSize(size); err != nil { - continue + break } state = C.duckdb_append_data_chunk(a.duckdbAppender, chunk.data) if state == C.DuckDBError { err = duckdbError(C.duckdb_appender_error(a.duckdbAppender)) - continue + break } + } + + for _, chunk := range a.chunks { chunk.close() } From baa10ffc5668936defdc5259c01d2d61e6d08d7f Mon Sep 17 00:00:00 2001 From: taniabogatsch <44262898+taniabogatsch@users.noreply.github.com> Date: Wed, 19 Jun 2024 17:19:41 +0200 Subject: [PATCH 5/5] revert test changes --- appender_test.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/appender_test.go b/appender_test.go index f44339be..01ae8367 100644 --- a/appender_test.go +++ b/appender_test.go @@ -278,12 +278,12 @@ func TestAppenderPrimitive(t *testing.T) { &r.String, &r.Bool, )) - rowsToAppend[r.ID].Timestamp = rowsToAppend[i].Timestamp.UTC() - rowsToAppend[r.ID].TimestampS = rowsToAppend[i].TimestampS.UTC() - rowsToAppend[r.ID].TimestampMS = rowsToAppend[i].TimestampMS.UTC() - rowsToAppend[r.ID].TimestampNS = rowsToAppend[i].TimestampNS.UTC() - rowsToAppend[r.ID].TimestampTZ = rowsToAppend[i].TimestampTZ.UTC() - require.Equal(t, rowsToAppend[r.ID], r) + rowsToAppend[i].Timestamp = rowsToAppend[i].Timestamp.UTC() + rowsToAppend[i].TimestampS = rowsToAppend[i].TimestampS.UTC() + rowsToAppend[i].TimestampMS = rowsToAppend[i].TimestampMS.UTC() + rowsToAppend[i].TimestampNS = rowsToAppend[i].TimestampNS.UTC() + rowsToAppend[i].TimestampTZ = rowsToAppend[i].TimestampTZ.UTC() + require.Equal(t, rowsToAppend[i], r) i++ }