Skip to content

Commit

Permalink
arrow: separate DuckDB Arrow interface
Browse files Browse the repository at this point in the history
- conn structure was made unexported
- NewConnector was renamed to OpenConnector to be consistent with sql package and now returns ConnectorCloser interface to expose Close method
- appender: NewAppenderFromConn receives connection of type any to be compatible with sql.Conn.Raw method. Tests and docs were adopted as well.
  • Loading branch information
levakin committed Jan 10, 2024
1 parent fd2c76d commit fffcf07
Show file tree
Hide file tree
Showing 12 changed files with 487 additions and 320 deletions.
142 changes: 120 additions & 22 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,33 +70,131 @@ Please refer to the [database/sql](https://godoc.org/database/sql) GoDoc for fur
If you want to use the [DuckDB Appender API](https://duckdb.org/docs/data/appender.html), you can obtain a new Appender by supplying a DuckDB connection to `NewAppenderFromConn()`.

```go
connector, err := NewConnector("test.db", nil)
if err != {
...
package main

import (
"context"
"database/sql"

"github.com/marcboeker/go-duckdb"
)

func main() {
connector, err := duckdb.OpenConnector("test.db", nil)
if err != nil {
panic(err)
}
driverConn, err := connector.Connect(context.Background())
if err != nil {
panic(err)
}
defer driverConn.Close()

// Retrieve appender from connection (note that you have to create the table 'test' beforehand).
appender, err := duckdb.NewAppenderFromConn(driverConn, "", "test")
if err != nil {
panic(err)
}
defer appender.Close()

err = appender.AppendRow(1, "a")
if err != nil {
panic(err)
}

// Optional, if you want to access the appended rows immediately.
err = appender.Flush()
if err != nil {
panic(err)
}

// Alternatively, you can use Raw method of sql.Conn to obtain the driver connection.
db, err := sql.Open("duckdb", "")
if err != nil {
panic(err)
}
defer db.Close()

conn, err := db.Conn(context.Background())
if err != nil {
panic(err)
}
defer conn.Close()

err = conn.Raw(func(driverConn any) error {
// Notice usage of driverConn
appender, err := duckdb.NewAppenderFromConn(driverConn, "", "test")
if err != nil {
panic(err)
}
defer appender.Close()

err = appender.AppendRow(...)
if err != nil {
panic(err)
}

return nil
})
if err != nil {
panic(err)
}
}
conn, err := connector.Connect(context.Background())
if err != {
...
}
defer conn.Close()

// Retrieve appender from connection (note that you have to create the table 'test' beforehand).
appender, err := NewAppenderFromConn(conn, "", "test")
if err != {
...
}
defer appender.Close()
```

err = appender.AppendRow(...)
if err != {
...
}
## DuckDB Apache Arrow Interface

// Optional, if you want to access the appended rows immediately.
err = appender.Flush()
if err != {
...
If you want to use the [DuckDB Arrow Interface](https://duckdb.org/docs/api/c/api#arrow-interface), you can obtain a new Arrow by supplying a DuckDB connection to `NewArrowFromConn()`.

```go
package main

import (
"context"
"database/sql"

"github.com/marcboeker/go-duckdb"
)

func main() {
db, err := sql.Open("duckdb", "")
if err != nil {
panic(err)
}
defer db.Close()

conn, err := db.Conn(context.Background())
if err != nil {
panic(err)
}
defer conn.Close()

// Use Raw method of sql.Conn to obtain the driver connection.
err = conn.Raw(func(driverConn any) error {
// Create Arrow interface from the connection.
ar, err := duckdb.NewArrowFromConn(driverConn)
if err != nil {
panic(err)
}

rdr, err := ar.Query(context.Background(), "SELECT * FROM generate_series(1, 10)")
if err != nil {
panic(err)
}
defer rdr.Release()

for rdr.Next() {
// Process records.
}

return nil
})
if err != nil {
panic(err)
}
}

```

## Linking DuckDB
Expand Down
6 changes: 3 additions & 3 deletions appender.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (

// Appender holds the DuckDB appender. It allows to load bulk data into a DuckDB database.
type Appender struct {
c *Conn
c *conn
schema string
table string
appender *C.duckdb_appender
Expand All @@ -30,8 +30,8 @@ type Appender struct {
}

// NewAppenderFromConn returns a new Appender from a DuckDB driver connection.
func NewAppenderFromConn(driverConn driver.Conn, schema string, table string) (*Appender, error) {
dbConn, ok := driverConn.(*Conn)
func NewAppenderFromConn(driverConn any, schema, table string) (*Appender, error) {
dbConn, ok := driverConn.(*conn)
if !ok {
return nil, fmt.Errorf("not a duckdb driver connection")
}
Expand Down
74 changes: 47 additions & 27 deletions appender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,27 @@ func randString(n int) string {
}

func TestAppender(t *testing.T) {
c, err := NewConnector("", nil)
connector, err := OpenConnector("", nil)
require.NoError(t, err)
defer connector.Close()

db := sql.OpenDB(c)
db := sql.OpenDB(connector)
createAppenderTable(db, t)
defer db.Close()

// Test that appender can be opened from the connector directly
driverConn, err := connector.Connect(context.Background())
require.NoError(t, err)

appender, err := NewAppenderFromConn(driverConn, "", "test")
require.NoError(t, err)

err = appender.Close()
require.NoError(t, err)

err = driverConn.Close()
require.NoError(t, err)

type dataRow struct {
ID int
UInt8 uint8
Expand Down Expand Up @@ -103,39 +117,45 @@ func TestAppender(t *testing.T) {
Bool: randBool(),
}
}
rows := []dataRow{}
var rows []dataRow
for i := 0; i < numAppenderTestRows; i++ {
rows = append(rows, randRow(i))
}

conn, err := c.Connect(context.Background())
conn, err := db.Conn(context.Background())
require.NoError(t, err)
defer conn.Close()

appender, err := NewAppenderFromConn(conn, "", "test")
require.NoError(t, err)
defer appender.Close()

for _, row := range rows {
err := appender.AppendRow(
row.ID,
row.UInt8,
row.Int8,
row.UInt16,
row.Int16,
row.UInt32,
row.Int32,
row.UInt64,
row.Int64,
row.Timestamp,
row.Float,
row.Double,
row.String,
row.Bool,
)
err = conn.Raw(func(driverConn any) error {
appender, err := NewAppenderFromConn(driverConn, "", "test")
require.NoError(t, err)
}
err = appender.Flush()
defer appender.Close()

for _, row := range rows {
err := appender.AppendRow(
row.ID,
row.UInt8,
row.Int8,
row.UInt16,
row.Int16,
row.UInt32,
row.Int32,
row.UInt64,
row.Int64,
row.Timestamp,
row.Float,
row.Double,
row.String,
row.Bool,
)
require.NoError(t, err)
}

err = appender.Flush()
require.NoError(t, err)

return nil
})
require.NoError(t, err)

res, err := db.QueryContext(
Expand Down
Loading

0 comments on commit fffcf07

Please sign in to comment.