From f7007adfe2ecfe8b692aab88e4e853a32b7cd53d Mon Sep 17 00:00:00 2001 From: tania <44262898+taniabogatsch@users.noreply.github.com> Date: Thu, 1 Feb 2024 21:33:51 +0100 Subject: [PATCH] Memory leak fixes in the connector (#154) * refactor Connector to expose Close method * destroy config when closing database DuckDB destroys config when closing database. e.g. here https://github.com/duckdb/duckdb/blob/b58f184e1e87df5fc2d574996a6b63409df96f47/src/common/adbc/adbc.cpp#L218-L228 * more refactoring and leak fixes * fix memory leaks, add section to readme, minor code tidying --------- Co-authored-by: Anton Levakin --- README.md | 122 ++++++++++++++++++++++++++++++++----------------- arrow.go | 2 +- arrow_test.go | 2 + duckdb.go | 117 ++++++++++++++++++++++++++++++----------------- duckdb_test.go | 14 +++++- result.go | 4 +- rows.go | 17 ++++--- statement.go | 11 +++-- 8 files changed, 186 insertions(+), 103 deletions(-) diff --git a/README.md b/README.md index ab733d06..bf329431 100644 --- a/README.md +++ b/README.md @@ -18,116 +18,152 @@ go get github.com/marcboeker/go-duckdb ## Usage -`go-duckdb` hooks into the `database/sql` interface provided by the Go stdlib. To open a connection, simply specify the driver type as `duckdb`: +`go-duckdb` hooks into the `database/sql` interface provided by the Go `stdlib`. To open a connection, simply specify the driver type as `duckdb`. ```go db, err := sql.Open("duckdb", "") +if err != nil { + ... +} +defer db.Close() ``` -This creates an in-memory instance of DuckDB. If you would like to store the data on the filesystem, you need to specify the path where to store the database: +This creates an in-memory instance of DuckDB. To open a persistent database, you need to specify a filepath to the database file. If +the file does not exist, then DuckDB creates it. + ```go db, err := sql.Open("duckdb", "/path/to/foo.db") +if err != nil { + ... 
+} +defer db.Close() ``` -If you want to set specific [config options for DuckDB](https://duckdb.org/docs/sql/configuration), you can add them as query style parameters in the form of `name=value` to the DSN, like: +If you want to set specific [config options for DuckDB](https://duckdb.org/docs/sql/configuration), you can add them as query style parameters in the form of `name=value` pairs to the DSN. ```go db, err := sql.Open("duckdb", "/path/to/foo.db?access_mode=read_only&threads=4") +if err != nil { + ... +} +defer db.Close() ``` -Alternatively, you can also use `sql.OpenDB` when you want to perform some initialization before the connection is created and returned from the connection pool on call to `db.Conn`. -Here's an example that installs and loads the JSON extension for each connection: +Alternatively, you can use [sql.OpenDB](https://cs.opensource.google/go/go/+/go1.21.6:src/database/sql/sql.go;l=781). That way, you can perform initialization steps in a callback function before opening the database. +Here's an example that installs and loads the JSON extension when opening a database with `sql.OpenDB(connector)`. ```go connector, err := duckdb.NewConnector("/path/to/foo.db?access_mode=read_only&threads=4", func(execer driver.Execer) error { - bootQueries := []string{ - "INSTALL 'json'", - "LOAD 'json'", - } - - for _, qry := range bootQueries { - _, err = execer.Exec(qry, nil) - if err != nil { - return err + bootQueries := []string{ + "INSTALL 'json'", + "LOAD 'json'", } - } - return nil + + for _, query := range bootQueries { + _, err = execer.Exec(query, nil) + if err != nil { + ... + } + } + return nil }) if err != nil { - return nil, err + ... } db := sql.OpenDB(connector) -db.SetMaxOpenConns(poolsize) -... +defer db.Close() ``` -Please refer to the [database/sql](https://godoc.org/database/sql) GoDoc for further usage instructions. +Please refer to the [database/sql](https://godoc.org/database/sql) documentation for further usage instructions. 
+ +## A Note on Memory Allocation + +DuckDB lives in-process. Therefore, all its memory lives in the driver. All allocations live in the host process, which +is the Go application. Especially for long-running applications, it is crucial to call the corresponding `Close`-functions as specified +in [database/sql](https://godoc.org/database/sql). The following is a list of examples. +```go +db, err := sql.Open("duckdb", "") +defer db.Close() + +conn, err := db.Conn(context.Background()) +defer conn.Close() + +rows, err := conn.QueryContext(context.Background(), "SELECT 42") +// alternatively, rows.Next() has to return false +rows.Close() + +appender, err := NewAppenderFromConn(conn, "", "test") +defer appender.Close() + +// if not passed to sql.OpenDB +connector, err := NewConnector("", nil) +defer connector.Close() +``` ## DuckDB Appender API -If you want to use the [DuckDB Appender API](https://duckdb.org/docs/data/appender.html), you can obtain a new Appender by supplying a DuckDB connection to `NewAppenderFromConn()`. +If you want to use the [DuckDB Appender API](https://duckdb.org/docs/data/appender.html), you can obtain a new `Appender` by passing a DuckDB connection to `NewAppenderFromConn()`. ```go connector, err := duckdb.NewConnector("test.db", nil) if err != nil { - ... + ... } +defer connector.Close() + conn, err := connector.Connect(context.Background()) if err != nil { - ... + ... } defer conn.Close() -// Retrieve appender from connection (note that you have to create the table 'test' beforehand). -appender, err := NewAppenderFromConn(conn, "", "test") +// obtain an appender from the connection +// NOTE: the table 'test_tbl' must exist in test.db +appender, err := NewAppenderFromConn(conn, "", "test_tbl") if err != nil { - ... + ... } defer appender.Close() err = appender.AppendRow(...) if err != nil { - ... -} - -// Optional, if you want to access the appended rows immediately. -err = appender.Flush() -if err != nil { - ... + ... 
} ``` ## DuckDB Apache Arrow Interface -If you want to use the [DuckDB Arrow Interface](https://duckdb.org/docs/api/c/api#arrow-interface), you can obtain a new Arrow by supplying a DuckDB connection to `NewArrowFromConn()`. +If you want to use the [DuckDB Arrow Interface](https://duckdb.org/docs/api/c/api#arrow-interface), you can obtain a new `Arrow` by passing a DuckDB connection to `NewArrowFromConn()`. ```go connector, err := duckdb.NewConnector("", nil) if err != nil { - ... + ... } +defer connector.Close() + conn, err := connector.Connect(context.Background()) if err != nil { - ... + ... } defer conn.Close() -// Retrieve Arrow from connection. -ar, err := duckdb.NewArrowFromConn(conn) -if err != nil { - ... +// obtain the Arrow from the connection +arrow, err := duckdb.NewArrowFromConn(conn) +if err != nil { + ... } -rdr, err := ar.QueryContext(context.Background(), "SELECT * FROM generate_series(1, 10)") +rdr, err := arrow.QueryContext(context.Background(), "SELECT * FROM generate_series(1, 10)") if err != nil { - ... + ... } defer rdr.Release() for rdr.Next() { - // Process records. 
+ // process records } ``` diff --git a/arrow.go b/arrow.go index b7c803fe..d2342f5b 100644 --- a/arrow.go +++ b/arrow.go @@ -210,7 +210,7 @@ func (a *Arrow) execute(s *stmt, args []driver.NamedValue) (*C.duckdb_arrow, err panic("database/sql/driver: misuse of duckdb driver: executeArrow after Close") } - if err := s.start(args); err != nil { + if err := s.bind(args); err != nil { return nil, err } diff --git a/arrow_test.go b/arrow_test.go index 7e6ee4cd..b659d925 100644 --- a/arrow_test.go +++ b/arrow_test.go @@ -22,6 +22,7 @@ func TestArrow(t *testing.T) { t.Run("select series", func(t *testing.T) { c, err := NewConnector("", nil) require.NoError(t, err) + defer c.Close() conn, err := c.Connect(context.Background()) require.NoError(t, err) @@ -46,6 +47,7 @@ func TestArrow(t *testing.T) { t.Run("select long series", func(t *testing.T) { c, err := NewConnector("", nil) require.NoError(t, err) + defer c.Close() conn, err := c.Connect(context.Background()) require.NoError(t, err) diff --git a/duckdb.go b/duckdb.go index ba228c9b..84769c39 100644 --- a/duckdb.go +++ b/duckdb.go @@ -26,29 +26,30 @@ func init() { type Driver struct{} -func (d Driver) Open(dataSourceName string) (driver.Conn, error) { - connector, err := d.OpenConnector(dataSourceName) +func (d Driver) Open(dsn string) (driver.Conn, error) { + connector, err := d.OpenConnector(dsn) if err != nil { return nil, err } return connector.Connect(context.Background()) } -func (Driver) OpenConnector(dataSourceName string) (driver.Connector, error) { - return createConnector(dataSourceName, func(execerContext driver.ExecerContext) error { return nil }) +func (Driver) OpenConnector(dsn string) (driver.Connector, error) { + return NewConnector(dsn, func(execerContext driver.ExecerContext) error { + return nil + }) } -// NewConnector creates a new Connector for the DuckDB database. 
-func NewConnector(dsn string, connInitFn func(execer driver.ExecerContext) error) (driver.Connector, error) { - return createConnector(dsn, connInitFn) -} +// NewConnector opens a new Connector for a DuckDB database. +// The user must close the returned Connector, if it is not passed to the sql.OpenDB function. +// Otherwise, sql.DB closes the Connector when calling sql.DB.Close(). +func NewConnector(dsn string, connInitFn func(execer driver.ExecerContext) error) (*Connector, error) { -func createConnector(dataSourceName string, connInitFn func(execer driver.ExecerContext) error) (driver.Connector, error) { var db C.duckdb_database - parsedDSN, err := url.Parse(dataSourceName) + parsedDSN, err := url.Parse(dsn) if err != nil { - return nil, fmt.Errorf("%w: %s", errParseConfig, err.Error()) + return nil, fmt.Errorf("%w: %s", errParseDSN, err.Error()) } config, err := prepareConfig(parsedDSN) @@ -57,33 +58,38 @@ func createConnector(dataSourceName string, connInitFn func(execer driver.Execer } defer C.duckdb_destroy_config(&config) - connectionString := C.CString(extractConnectionString(dataSourceName)) - defer C.free(unsafe.Pointer(connectionString)) + connStr := C.CString(extractConnectionString(dsn)) + defer C.free(unsafe.Pointer(connStr)) - var errMsg *C.char - defer C.duckdb_free(unsafe.Pointer(errMsg)) + var errOpenMsg *C.char + defer C.duckdb_free(unsafe.Pointer(errOpenMsg)) - if state := C.duckdb_open_ext(connectionString, &db, config, &errMsg); state == C.DuckDBError { - return nil, fmt.Errorf("%w: %s", errOpen, C.GoString(errMsg)) + if state := C.duckdb_open_ext(connStr, &db, config, &errOpenMsg); state == C.DuckDBError { + return nil, fmt.Errorf("%w: %s", errOpen, C.GoString(errOpenMsg)) } - return &connector{db: &db, connInitFn: connInitFn}, nil + return &Connector{ + db: db, + connInitFn: connInitFn, + }, nil } -type connector struct { - db *C.duckdb_database +type Connector struct { + db C.duckdb_database connInitFn func(execer driver.ExecerContext) 
error } -func (c *connector) Driver() driver.Driver { +func (c *Connector) Driver() driver.Driver { return Driver{} } -func (c *connector) Connect(context.Context) (driver.Conn, error) { +func (c *Connector) Connect(context.Context) (driver.Conn, error) { + var con C.duckdb_connection - if state := C.duckdb_connect(*c.db, &con); state == C.DuckDBError { + if state := C.duckdb_connect(c.db, &con); state == C.DuckDBError { return nil, errOpen } + conn := &conn{con: &con} // Call the connection init function if defined @@ -92,49 +98,74 @@ func (c *connector) Connect(context.Context) (driver.Conn, error) { return nil, err } } + return conn, nil } -func (c *connector) Close() error { - C.duckdb_close(c.db) +func (c *Connector) Close() error { + C.duckdb_close(&c.db) c.db = nil return nil } -func extractConnectionString(dataSourceName string) string { - var queryIndex = strings.Index(dataSourceName, "?") +func extractConnectionString(dsn string) string { + var queryIndex = strings.Index(dsn, "?") if queryIndex < 0 { - queryIndex = len(dataSourceName) + queryIndex = len(dsn) } - return dataSourceName[0:queryIndex] + + return dsn[0:queryIndex] } func prepareConfig(parsedDSN *url.URL) (C.duckdb_config, error) { + var config C.duckdb_config if state := C.duckdb_create_config(&config); state == C.DuckDBError { + C.duckdb_destroy_config(&config) return nil, errCreateConfig } - if state := C.duckdb_set_config(config, C.CString("duckdb_api"), C.CString("go")); state == C.DuckDBError { - return nil, fmt.Errorf("%w: failed to set duckdb_api", errPrepareConfig) + + if err := setConfig(config, "duckdb_api", "go"); err != nil { + return nil, err + } + + // early-out + if len(parsedDSN.RawQuery) == 0 { + return config, nil } - if len(parsedDSN.RawQuery) > 0 { - for k, v := range parsedDSN.Query() { - if len(v) > 0 { - state := C.duckdb_set_config(config, C.CString(k), C.CString(v[0])) - if state == C.DuckDBError { - return nil, fmt.Errorf("%w: affected config option %s=%s", 
errPrepareConfig, k, v[0]) - } - } + for k, v := range parsedDSN.Query() { + if len(v) == 0 { + continue + } + if err := setConfig(config, k, v[0]); err != nil { + return nil, err } } return config, nil } +func setConfig(config C.duckdb_config, name string, option string) error { + + cName := C.CString(name) + defer C.free(unsafe.Pointer(cName)) + + cOption := C.CString(option) + defer C.free(unsafe.Pointer(cOption)) + + state := C.duckdb_set_config(config, cName, cOption) + if state == C.DuckDBError { + C.duckdb_destroy_config(&config) + return fmt.Errorf("%w: affected config option %s=%s", errSetConfig, name, option) + } + + return nil +} + var ( - errOpen = errors.New("could not open database") - errParseConfig = errors.New("could not parse config for database") - errCreateConfig = errors.New("could not create config for database") - errPrepareConfig = errors.New("could not set config for database") + errOpen = errors.New("could not open database") + errParseDSN = errors.New("could not parse DSN for database") + errCreateConfig = errors.New("could not create config for database") + errSetConfig = errors.New("could not set config for database") ) diff --git a/duckdb_test.go b/duckdb_test.go index d9303973..c1c6cbd9 100644 --- a/duckdb_test.go +++ b/duckdb_test.go @@ -56,12 +56,23 @@ func TestOpen(t *testing.T) { t.Run("with invalid config", func(t *testing.T) { _, err := sql.Open("duckdb", "?threads=NaN") - if !errors.Is(err, errPrepareConfig) { + if !errors.Is(err, errSetConfig) { t.Fatal("invalid config should not be accepted") } }) } +func TestConnector_Close(t *testing.T) { + t.Parallel() + + connector, err := NewConnector("", nil) + require.NoError(t, err) + + // check that multiple close calls don't cause panics or errors + require.NoError(t, connector.Close()) + require.NoError(t, connector.Close()) +} + func TestConnPool(t *testing.T) { db := openDB(t) db.SetMaxOpenConns(2) // set connection pool size greater than 1 @@ -968,6 +979,7 @@ func 
TestTypeNamesAndScanTypes(t *testing.T) { err = rows.Scan(&val) require.NoError(t, err) require.Equal(t, test.value, val) + require.Equal(t, rows.Next(), false) }) } } diff --git a/result.go b/result.go index aef314cb..f8455f34 100644 --- a/result.go +++ b/result.go @@ -1,7 +1,7 @@ package duckdb type result struct { - ra int64 + rowsAffected int64 } func (r result) LastInsertId() (int64, error) { @@ -9,5 +9,5 @@ func (r result) LastInsertId() (int64, error) { } func (r result) RowsAffected() (int64, error) { - return r.ra, nil + return r.rowsAffected, nil } diff --git a/rows.go b/rows.go index 57ddc862..b73045fd 100644 --- a/rows.go +++ b/rows.go @@ -28,10 +28,6 @@ type rows struct { chunkRowIdx C.idx_t } -func newRows(res C.duckdb_result) *rows { - return newRowsWithStmt(res, nil) -} - func newRowsWithStmt(res C.duckdb_result, stmt *stmt) *rows { n := C.duckdb_column_count(&res) columns := make([]string, 0, n) @@ -331,16 +327,19 @@ func scanString(vector C.duckdb_vector, rowIdx C.idx_t) string { } // duckdb/tools/juliapkg/src/ctypes.jl -// `json`, `varchar`, and `blob` have the same repr +// `json`, `varchar`, and `blob` are C-style char arrays func scanBlob(vector C.duckdb_vector, rowIdx C.idx_t) []byte { + + // we don't have to free s.ptr, as it is part of the data in the vector s := get[duckdb_string_t](vector, rowIdx) + if s.length <= stringInlineLength { - // inline data is stored from byte 4..16 (up to 12 bytes) + // inlined data is stored from byte 4..16 (up to 12 bytes) return C.GoBytes(unsafe.Pointer(&s.prefix), C.int(s.length)) - } else { - // any longer strings are stored as a pointer in `ptr` - return C.GoBytes(unsafe.Pointer(s.ptr), C.int(s.length)) } + + // any longer strings are stored as a pointer in `ptr` + return C.GoBytes(unsafe.Pointer(s.ptr), C.int(s.length)) } func scanList(vector C.duckdb_vector, rowIdx C.idx_t) ([]any, error) { diff --git a/statement.go b/statement.go index d4b8b5d3..d5296974 100644 --- a/statement.go +++ b/statement.go @@ 
-44,11 +44,13 @@ func (s *stmt) NumInput() int { return int(paramCount) } -func (s *stmt) start(args []driver.NamedValue) error { +func (s *stmt) bind(args []driver.NamedValue) error { if s.NumInput() != len(args) { return fmt.Errorf("incorrect argument count for command: have %d want %d", len(args), s.NumInput()) } + // FIXME (feature): we can't pass nested types as parameters (bind_value) yet + for i, v := range args { switch v := v.Value.(type) { case bool: @@ -190,7 +192,7 @@ func (s *stmt) execute(ctx context.Context, args []driver.NamedValue) (*C.duckdb panic("database/sql/driver: misuse of duckdb driver: ExecContext or QueryContext with active Rows") } - if err := s.start(args); err != nil { + if err := s.bind(args); err != nil { return nil, err } @@ -225,12 +227,13 @@ func (s *stmt) execute(ctx context.Context, args []driver.NamedValue) (*C.duckdb <-bgDoneCh if state == C.DuckDBError { if ctx.Err() != nil { + C.duckdb_destroy_result(&res) return nil, ctx.Err() } - dbErr := C.GoString(C.duckdb_result_error(&res)) + err := C.GoString(C.duckdb_result_error(&res)) C.duckdb_destroy_result(&res) - return nil, errors.New(dbErr) + return nil, errors.New(err) } return &res, nil