From 53fe3d1ab5e97e62b87e80714c075da5e4ef25b8 Mon Sep 17 00:00:00 2001 From: Anshul Khandelwal <12948312+k-anshul@users.noreply.github.com> Date: Thu, 11 Jan 2024 18:25:19 -0600 Subject: [PATCH 1/4] interrupt query on ctx cancel/timeout (#143) * interrupt query on ctx cancel/timeout * adding unit test * adding unit test - reduce timeout --- duckdb_test.go | 16 ++++++++++++++++ statement.go | 2 ++ 2 files changed, 18 insertions(+) diff --git a/duckdb_test.go b/duckdb_test.go index bd278d16..d9303973 100644 --- a/duckdb_test.go +++ b/duckdb_test.go @@ -1100,6 +1100,22 @@ func TestParquetExtension(t *testing.T) { require.NoError(t, err) } +func TestQueryTimeout(t *testing.T) { + db := openDB(t) + defer db.Close() + + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*250) + defer cancel() + + now := time.Now() + _, err := db.ExecContext(ctx, "CREATE TABLE test AS SELECT * FROM range(10000000) t1, range(1000000) t2;") + require.ErrorIs(t, err, context.DeadlineExceeded) + + // a very defensive time check, but should be good enough + // the query takes much longer than 10 seconds + require.Less(t, time.Since(now), 10*time.Second) +} + func openDB(t *testing.T) *sql.DB { db, err := sql.Open("duckdb", "") require.NoError(t, err) diff --git a/statement.go b/statement.go index ad777ec2..23e2dad4 100644 --- a/statement.go +++ b/statement.go @@ -207,6 +207,8 @@ func (s *stmt) execute(ctx context.Context, args []driver.NamedValue) (*C.duckdb select { // if context is cancelled or deadline exceeded, don't execute further case <-ctx.Done(): + // also need to interrupt to cancel the query + C.duckdb_interrupt(*s.c.con) return nil, ctx.Err() default: // continue From 6b4e54d0907b8ed93a6096256045f2652da599b6 Mon Sep 17 00:00:00 2001 From: Marc Boeker Date: Fri, 12 Jan 2024 11:51:16 +0100 Subject: [PATCH 2/4] Run in goroutine --- statement.go | 27 +++++++++++++-------------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/statement.go b/statement.go index 23e2dad4..b0357275 100644 --- a/statement.go +++ b/statement.go @@ -203,32 +203,31 @@ func (s *stmt) execute(ctx context.Context, args []driver.NamedValue) (*C.duckdb } defer C.duckdb_destroy_pending(&pendingRes) - for { + done := make(chan bool) + defer close(done) + + go func() { select { - // if context is cancelled or deadline exceeded, don't execute further case <-ctx.Done(): // also need to interrupt to cancel the query C.duckdb_interrupt(*s.c.con) - return nil, ctx.Err() - default: - // continue - } - state := C.duckdb_pending_execute_task(pendingRes) - if state == C.DUCKDB_PENDING_ERROR { - dbErr := C.GoString(C.duckdb_pending_error(pendingRes)) - return nil, errors.New(dbErr) - } - if C.duckdb_pending_execution_is_finished(state) { - break + return + case <-done: + return } - } + }() var res C.duckdb_result if state := C.duckdb_execute_pending(pendingRes, &res); state == C.DuckDBError { + if ctx.Err() != nil { + return nil, ctx.Err() + } + dbErr := C.GoString(C.duckdb_result_error(&res)) C.duckdb_destroy_result(&res) return nil, errors.New(dbErr) } + return &res, nil } From 423d4bc1938d911d3808f1f90951312a34dbaf7a Mon Sep 17 00:00:00 2001 From: Marc Boeker Date: Fri, 12 Jan 2024 12:04:23 +0100 Subject: [PATCH 3/4] Revert to previous version and disable test --- duckdb_test.go | 1 + statement.go | 26 ++++++++++++++------------ 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/duckdb_test.go b/duckdb_test.go index d9303973..1f37c31e 100644 --- a/duckdb_test.go +++ b/duckdb_test.go @@ -1101,6 +1101,7 @@ func TestParquetExtension(t *testing.T) { } func TestQueryTimeout(t *testing.T) { + t.Skip("TODO: fix blocking in duckdb_pending_execute_task first.") db := openDB(t) defer db.Close() diff --git a/statement.go b/statement.go index b0357275..8dcf439e 100644 --- a/statement.go +++ b/statement.go @@ -203,30 +203,32 @@ func (s *stmt) execute(ctx context.Context, args []driver.NamedValue) (*C.duckdb } defer C.duckdb_destroy_pending(&pendingRes) - done := make(chan bool) - defer close(done) - - go func() { + for { select { case <-ctx.Done(): // also need to interrupt to cancel the query C.duckdb_interrupt(*s.c.con) - return - case <-done: - return + return nil, ctx.Err() + default: + // continue + } + state := C.duckdb_pending_execute_task(pendingRes) + if state == C.DUCKDB_PENDING_ERROR { + dbErr := C.GoString(C.duckdb_pending_error(pendingRes)) + return nil, errors.New(dbErr) + } + if C.duckdb_pending_execution_is_finished(state) { + break } - }() + } var res C.duckdb_result if state := C.duckdb_execute_pending(pendingRes, &res); state == C.DuckDBError { - if ctx.Err() != nil { - return nil, ctx.Err() - } - dbErr := C.GoString(C.duckdb_result_error(&res)) C.duckdb_destroy_result(&res) return nil, errors.New(dbErr) } + return &res, nil return &res, nil } From 6b90a947bf49c93d8462c894227ae7a6d4a981f1 Mon Sep 17 00:00:00 2001 From: Marc Boeker Date: Fri, 12 Jan 2024 12:45:28 +0100 Subject: [PATCH 4/4] Switch to duckdb_execute_pending --- duckdb_test.go | 1 - statement.go | 26 ++++++++++++-------------- 2 files changed, 12 insertions(+), 15 deletions(-) diff --git a/duckdb_test.go b/duckdb_test.go index 1f37c31e..d9303973 100644 --- a/duckdb_test.go +++ b/duckdb_test.go @@ -1101,7 +1101,6 @@ func TestParquetExtension(t *testing.T) { } func TestQueryTimeout(t *testing.T) { - t.Skip("TODO: fix blocking in duckdb_pending_execute_task first.") db := openDB(t) defer db.Close() diff --git a/statement.go b/statement.go index 8dcf439e..b0357275 100644 --- a/statement.go +++ b/statement.go @@ -203,32 +203,30 @@ func (s *stmt) execute(ctx context.Context, args []driver.NamedValue) (*C.duckdb } defer C.duckdb_destroy_pending(&pendingRes) - for { + done := make(chan bool) + defer close(done) + + go func() { select { case <-ctx.Done(): // also need to interrupt to cancel the query C.duckdb_interrupt(*s.c.con) - return nil, ctx.Err() - default: - // continue - } - state := C.duckdb_pending_execute_task(pendingRes) - if state == C.DUCKDB_PENDING_ERROR { - dbErr := C.GoString(C.duckdb_pending_error(pendingRes)) - return nil, errors.New(dbErr) - } - if C.duckdb_pending_execution_is_finished(state) { - break + return + case <-done: + return } - } + }() var res C.duckdb_result if state := C.duckdb_execute_pending(pendingRes, &res); state == C.DuckDBError { + if ctx.Err() != nil { + return nil, ctx.Err() + } + dbErr := C.GoString(C.duckdb_result_error(&res)) C.duckdb_destroy_result(&res) return nil, errors.New(dbErr) } - return &res, nil return &res, nil }