Skip to content

Commit

Permalink
Merge pull request #6 from rilldata/appender_revert
Browse files Browse the repository at this point in the history
Appender revert
  • Loading branch information
k-anshul authored Jan 18, 2024
2 parents cd35c4f + fdc43d4 commit c7771ee
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 22 deletions.
16 changes: 16 additions & 0 deletions duckdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1100,6 +1100,22 @@ func TestParquetExtension(t *testing.T) {
require.NoError(t, err)
}

func TestQueryTimeout(t *testing.T) {
db := openDB(t)
defer db.Close()

ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*250)
defer cancel()

now := time.Now()
_, err := db.ExecContext(ctx, "CREATE TABLE test AS SELECT * FROM range(10000000) t1, range(1000000) t2;")
require.ErrorIs(t, err, context.DeadlineExceeded)

// a very defensive time check, but should be good enough
// the query takes much longer than 10 seconds
require.Less(t, time.Since(now), 10*time.Second)
}

func openDB(t *testing.T) *sql.DB {
db, err := sql.Open("duckdb", "")
require.NoError(t, err)
Expand Down
21 changes: 13 additions & 8 deletions rows.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,8 +355,13 @@ func scanList(vector C.duckdb_vector, rowIdx C.idx_t) ([]any, error) {

func scanStruct(ty C.duckdb_logical_type, vector C.duckdb_vector, rowIdx C.idx_t) (map[string]any, error) {
data := map[string]any{}

for j := C.idx_t(0); j < C.duckdb_struct_type_child_count(ty); j++ {
name := C.GoString(C.duckdb_struct_type_child_name(ty, j))

ptrToChildName := C.duckdb_struct_type_child_name(ty, j)
name := C.GoString(ptrToChildName)
C.duckdb_free(unsafe.Pointer(ptrToChildName))

child := C.duckdb_struct_vector_get_child(vector, j)
value, err := scan(child, rowIdx)
if err != nil {
Expand Down Expand Up @@ -529,19 +534,19 @@ func logicalTypeNameStruct(lt C.duckdb_logical_type) string {
count := int(C.duckdb_struct_type_child_count(lt))
name := "STRUCT("
for i := 0; i < count; i++ {
// Child name
cn := C.duckdb_struct_type_child_name(lt, C.idx_t(i))
defer C.duckdb_free(unsafe.Pointer(cn))

// Child logical type
clt := C.duckdb_struct_type_child_type(lt, C.idx_t(i))
defer C.duckdb_destroy_logical_type(&clt)
ptrToChildName := C.duckdb_struct_type_child_name(lt, C.idx_t(i))
childName := C.GoString(ptrToChildName)
childLogicalType := C.duckdb_struct_type_child_type(lt, C.idx_t(i))

// Add comma if not at end of list
name += escapeStructFieldName(C.GoString(cn)) + " " + logicalTypeName(clt)
name += escapeStructFieldName(childName) + " " + logicalTypeName(childLogicalType)
if i != count-1 {
name += ", "
}

C.duckdb_free(unsafe.Pointer(ptrToChildName))
C.duckdb_destroy_logical_type(&childLogicalType)
}
return name + ")"
}
Expand Down
38 changes: 24 additions & 14 deletions statement.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,30 +203,40 @@ func (s *stmt) execute(ctx context.Context, args []driver.NamedValue) (*C.duckdb
}
defer C.duckdb_destroy_pending(&pendingRes)

for {
done := make(chan struct{})
defer close(done)
queryComplete := false

go func() {
select {
// if context is cancelled or deadline exceeded, don't execute further
case <-ctx.Done():
return nil, ctx.Err()
default:
// continue
}
state := C.duckdb_pending_execute_task(pendingRes)
if state == C.DUCKDB_PENDING_ERROR {
dbErr := C.GoString(C.duckdb_pending_error(pendingRes))
return nil, errors.New(dbErr)
}
if C.duckdb_pending_execution_is_finished(state) {
break
// sometimes this goroutine is not scheduled immediately and by that time if another query is scheduled on this connection
// this can cancel that query so need to handle cases when original query was complete.
if queryComplete {
return
}
// need to interrupt to cancel the query
C.duckdb_interrupt(*s.c.con)
return
case <-done:
return
}
}
}()

var res C.duckdb_result
if state := C.duckdb_execute_pending(pendingRes, &res); state == C.DuckDBError {
state := C.duckdb_execute_pending(pendingRes, &res)
queryComplete = true
if state == C.DuckDBError {
if ctx.Err() != nil {
return nil, ctx.Err()
}

dbErr := C.GoString(C.duckdb_result_error(&res))
C.duckdb_destroy_result(&res)
return nil, errors.New(dbErr)
}

return &res, nil
}

Expand Down

0 comments on commit c7771ee

Please sign in to comment.