Skip to content

Commit

Permalink
Implement Apache Arrow interface and Arrow type structs
Browse files Browse the repository at this point in the history
Added and defined structures for Arrow schema and Arrow array in a new file "arrow.h". This allows Arrow-compatible communication and data sharing, which is memory-efficient. Implemented Arrow interface in the connection.go file. Also updated the 'duckdb_test.go' file to include tests for the Arrow interface.
  • Loading branch information
levakin committed Dec 1, 2023
1 parent 63df37b commit 360dd1d
Show file tree
Hide file tree
Showing 6 changed files with 323 additions and 14 deletions.
43 changes: 43 additions & 0 deletions arrow.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#include <stdint.h>

#ifndef ARROW_C_DATA_INTERFACE
#define ARROW_C_DATA_INTERFACE

#define ARROW_FLAG_DICTIONARY_ORDERED 1
#define ARROW_FLAG_NULLABLE 2
#define ARROW_FLAG_MAP_KEYS_SORTED 4

// ArrowSchema is the type-description half of the Arrow C data interface.
// The Go side allocates sizeof(struct ArrowSchema) bytes and reinterprets them
// as the Arrow Go library's own C schema type, so the field order and field
// types here are an ABI contract and must not be rearranged.
struct ArrowSchema {
// Array type description
// format/name/metadata are producer-owned strings; their exact encodings are
// defined by the Arrow C data interface specification, not by this header.
const char* format;
const char* name;
const char* metadata;
// Presumably a bitwise OR of the ARROW_FLAG_* constants defined above.
int64_t flags;
// n_children is presumably the number of entries in `children`.
int64_t n_children;
struct ArrowSchema** children;
struct ArrowSchema* dictionary;

// Release callback
// Invoked with a pointer to this struct to free producer-owned resources.
// NOTE(review): per the Arrow spec a NULL value marks the struct as already
// released, so the memory must be zero-initialised before first use.
void (*release)(struct ArrowSchema*);
// Opaque producer-specific data
void* private_data;
};

// ArrowArray is the data half of the Arrow C data interface.
// The Go side allocates sizeof(struct ArrowArray) bytes and reinterprets them
// as the Arrow Go library's own C array type, so the field order and field
// types here are an ABI contract and must not be rearranged.
struct ArrowArray {
// Array data description
// The Go caller treats length == 0 as "no more chunks".
int64_t length;
int64_t null_count;
int64_t offset;
// n_buffers / n_children are presumably the entry counts of `buffers` and
// `children` respectively.
int64_t n_buffers;
int64_t n_children;
const void** buffers;
struct ArrowArray** children;
struct ArrowArray* dictionary;

// Release callback
// Invoked with a pointer to this struct to free producer-owned resources.
// NOTE(review): per the Arrow spec a NULL value marks the struct as already
// released, so the memory must be zero-initialised before first use.
void (*release)(struct ArrowArray*);
// Opaque producer-specific data
void* private_data;
};

#endif // ARROW_C_DATA_INTERFACE
139 changes: 139 additions & 0 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package duckdb

/*
#include <duckdb.h>
#include <arrow.h>
*/
import "C"

Expand All @@ -10,8 +11,14 @@ import (
"database/sql"
"database/sql/driver"
"errors"
"fmt"
"io"
"math/big"
"unsafe"

"github.com/apache/arrow/go/v14/arrow"
"github.com/apache/arrow/go/v14/arrow/array"
"github.com/apache/arrow/go/v14/arrow/cdata"
)

type Conn struct {
Expand Down Expand Up @@ -104,6 +111,138 @@ func (c *Conn) QueryContext(ctx context.Context, query string, args []driver.Nam
return rows, err
}

// QueryArrowContext runs every statement contained in query and returns the
// result of the final one as an Apache Arrow array.RecordReader.
//
// All statements but the last are executed without arguments and their results
// are discarded; args are bound to the last statement only. The context is
// checked before each chunk is fetched, and ctx.Err() is returned once it is
// done. The returned reader holds its own references to the records, so the
// caller must Release it.
//
// It panics if the connection has already been closed.
//
// https://duckdb.org/docs/api/c/api#arrow-interface
// NOTE: Experimental interface.
func (c *Conn) QueryArrowContext(ctx context.Context, query string, args ...any) (array.RecordReader, error) {
	if c.closed {
		panic("database/sql/driver: misuse of duckdb driver: QueryArrowContext after Close")
	}

	extracted, count, err := c.extractStmts(query)
	if err != nil {
		return nil, err
	}
	defer C.duckdb_destroy_extracted(&extracted)

	lastIdx := count - 1

	// Every statement before the final one runs for its side effects only.
	for idx := C.idx_t(0); idx < lastIdx; idx++ {
		pre, prepErr := c.prepareExtractedStmt(extracted, idx)
		if prepErr != nil {
			return nil, prepErr
		}
		// ExecContext is used (with nil args) because the result is thrown away.
		_, execErr := pre.ExecContext(ctx, nil)
		pre.Close()
		if execErr != nil {
			return nil, execErr
		}
	}

	// The final statement receives the caller's arguments and yields the result.
	final, err := c.prepareExtractedStmt(extracted, lastIdx)
	if err != nil {
		return nil, err
	}
	defer final.Close()

	result, err := final.executeArrow(args...)
	if err != nil {
		return nil, err
	}
	defer C.duckdb_destroy_arrow(result)

	schema, err := queryArrowSchema(result)
	if err != nil {
		return nil, err
	}

	// Drop our own references on the way out, whether we succeed or fail.
	var batches []arrow.Record
	defer func() {
		for i := range batches {
			batches[i].Release()
		}
	}()

	for {
		// Non-blocking cancellation check between chunks.
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		default:
		}

		batch, fetchErr := queryArrowArray(result, schema)
		switch {
		case fetchErr == nil:
			batches = append(batches, batch)
		case errors.Is(fetchErr, io.EOF):
			// End of result: hand everything collected so far to the reader.
			return array.NewRecordReader(schema, batches)
		default:
			return nil, fetchErr
		}
	}
}

// queryArrowSchema fetches the internal arrow schema from the arrow result.
//
// It allocates a C ArrowSchema, asks DuckDB to fill it in, and imports it into
// a Go *arrow.Schema. The C struct is always released and freed before
// returning. An error is returned if the allocation fails, if DuckDB reports
// DuckDBError, or if the import into Go fails.
func queryArrowSchema(res *C.duckdb_arrow) (*arrow.Schema, error) {
	// calloc, not malloc: the deferred release inspects the struct's release
	// callback, which must read as NULL ("already released") if DuckDB fails
	// before populating it. Uninitialised memory would be called as a function.
	cdSchema := (*cdata.CArrowSchema)(C.calloc(1, C.sizeof_struct_ArrowSchema))
	if cdSchema == nil {
		return nil, errors.New("duckdb_query_arrow_schema: out of memory")
	}
	defer func() {
		cdata.ReleaseCArrowSchema(cdSchema)
		C.free(unsafe.Pointer(cdSchema))
	}()

	// duckdb_arrow_schema is itself a pointer type, so DuckDB expects the
	// address of our pointer and writes through it into the struct.
	if state := C.duckdb_query_arrow_schema(
		*res,
		(*C.duckdb_arrow_schema)(unsafe.Pointer(&cdSchema)),
	); state == C.DuckDBError {
		return nil, errors.New("duckdb_query_arrow_schema")
	}

	sc, err := cdata.ImportCArrowSchema(cdSchema)
	if err != nil {
		return nil, fmt.Errorf("%w: ImportCArrowSchema", err)
	}

	return sc, nil
}

// queryArrowArray fetches the next chunk of the arrow result as an
// arrow.Record that follows schema sc.
//
// This function can be called multiple times to get successive chunks. It
// returns io.EOF when the end of the result is reached (the fetched array has
// length 0). Any other error comes from the allocation, from DuckDB reporting
// DuckDBError, or from importing the C array into Go. On success the caller
// owns the returned record and must Release it.
func queryArrowArray(res *C.duckdb_arrow, sc *arrow.Schema) (arrow.Record, error) {
	// calloc, not malloc: both the length == 0 end-of-stream check and the
	// deferred release read fields of this struct, and DuckDB appears to leave
	// the struct untouched when there is no further chunk. Starting from
	// zeroed memory makes length read as 0 and release as NULL in that case.
	cdArr := (*cdata.CArrowArray)(C.calloc(1, C.sizeof_struct_ArrowArray))
	if cdArr == nil {
		return nil, errors.New("duckdb_query_arrow_array: out of memory")
	}
	defer func() {
		cdata.ReleaseCArrowArray(cdArr)
		C.free(unsafe.Pointer(cdArr))
	}()

	// duckdb_arrow_array is itself a pointer type, so DuckDB expects the
	// address of our pointer and writes through it into the struct.
	if state := C.duckdb_query_arrow_array(
		*res,
		(*C.duckdb_arrow_array)(unsafe.Pointer(&cdArr)),
	); state == C.DuckDBError {
		return nil, errors.New("duckdb_query_arrow_array")
	}

	if (*C.struct_ArrowArray)(unsafe.Pointer(cdArr)).length == 0 {
		return nil, io.EOF
	}

	rec, err := cdata.ImportCRecordBatchWithSchema(cdArr, sc)
	if err != nil {
		// rec is nil on failure; there is nothing to release here and calling
		// Release on it would panic.
		return nil, err
	}

	return rec, nil
}

func (c *Conn) Prepare(cmd string) (driver.Stmt, error) {
if c.closed {
panic("database/sql/driver: misuse of duckdb driver: Prepare after Close")
Expand Down
56 changes: 56 additions & 0 deletions duckdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,62 @@ func TestQuery(t *testing.T) {
})
}

// TestQueryArrowContext drives Conn.QueryArrowContext through database/sql's
// Raw hook: once with a parameterised SELECT that must yield single-row
// records, and once with an invalid query that must return an error.
func TestQueryArrowContext(t *testing.T) {
	t.Parallel()
	db := openDB(t)
	defer db.Close()
	createTable(db, t)

	t.Run("simple", func(t *testing.T) {
		// Two rows go in; only one of them has baz > 12344.
		_, insertErr := db.Exec("INSERT INTO foo VALUES ('lala', ?), ('lalo', ?)", 12345, 1234)
		require.NoError(t, insertErr)

		ctx := context.Background()

		sqlConn, connErr := db.Conn(ctx)
		require.NoError(t, connErr)
		defer sqlConn.Close()

		rawErr := sqlConn.Raw(func(driverConn any) error {
			duck, isDuck := driverConn.(*Conn)
			require.True(t, isDuck)

			reader, queryErr := duck.QueryArrowContext(ctx, "SELECT bar, baz FROM foo WHERE baz > ?", 12344)
			require.NoError(t, queryErr, "should query arrow")
			defer reader.Release()

			for reader.Next() {
				record := reader.Record()
				require.Equal(t, int64(1), record.NumRows())

				encoded, jsonErr := record.MarshalJSON()
				require.NoError(t, jsonErr)

				t.Log(string(encoded))
			}

			require.NoError(t, reader.Err())

			return nil
		})
		require.NoError(t, rawErr)
	})

	t.Run("query error", func(t *testing.T) {
		ctx := context.Background()

		sqlConn, connErr := db.Conn(ctx)
		require.NoError(t, connErr)
		defer sqlConn.Close()

		rawErr := sqlConn.Raw(func(driverConn any) error {
			duck, isDuck := driverConn.(*Conn)
			require.True(t, isDuck)

			// "bar" is not resolvable without a FROM clause, so this must fail.
			_, queryErr := duck.QueryArrowContext(ctx, "select bar")
			require.Error(t, queryErr)

			return nil
		})
		require.NoError(t, rawErr)
	})
}

func TestStruct(t *testing.T) {
t.Parallel()
db := openDB(t)
Expand Down
15 changes: 13 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,26 @@ module github.com/marcboeker/go-duckdb
go 1.18

require (
github.com/google/uuid v1.3.0
github.com/apache/arrow/go/v14 v14.0.1
github.com/google/uuid v1.3.1
github.com/mitchellh/mapstructure v1.5.0
github.com/stretchr/testify v1.8.0
github.com/stretchr/testify v1.8.4
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/goccy/go-json v0.10.2 // indirect
github.com/google/flatbuffers v23.5.26+incompatible // indirect
github.com/klauspost/compress v1.16.7 // indirect
github.com/klauspost/cpuid/v2 v2.2.5 // indirect
github.com/kr/pretty v0.1.0 // indirect
github.com/pierrec/lz4/v4 v4.1.18 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
golang.org/x/mod v0.13.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/tools v0.14.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
42 changes: 32 additions & 10 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,25 +1,47 @@
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/apache/arrow/go/v14 v14.0.1 h1:Fr9W9c7hIHN8ESeM8MBgQ0pQGSqZ7y2TgeeRDqxNJWw=
github.com/apache/arrow/go/v14 v14.0.1/go.mod h1:u3fgh3EdgN/YQ8cVQRguVW3R+seMybFg8QBQ5LU+eBY=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/google/flatbuffers v23.5.26+incompatible h1:M9dgRyhJemaM4Sw8+66GHBu8ioaQmyPLg1b8VwK5WJg=
github.com/google/flatbuffers v23.5.26+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8=
github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4=
github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I=
github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/cpuid/v2 v2.2.5 h1:0E5MSMDEoAulmXNFquVs//DdoomxaoTY1kUhbc/qbZg=
github.com/klauspost/cpuid/v2 v2.2.5/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ=
github.com/pierrec/lz4/v4 v4.1.18/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ=
github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0=
github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA=
golang.org/x/exp v0.0.0-20231006140011-7918f672742d h1:jtJma62tbqLibJ5sFQz8bKtEM8rJBtfilJ2qTU199MI=
golang.org/x/mod v0.13.0 h1:I/DsJXRlw/8l/0c24sM9yb0T4z9liZTduXvdAWYiysY=
golang.org/x/mod v0.13.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/sync v0.4.0 h1:zxkM55ReGkDlKSM+Fu41A+zmbZuaPVbGMzvvdUPznYQ=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/tools v0.14.0 h1:jvNa2pY0M4r62jkRQ6RwEZZyPcymeL9XZMLBbV7U2nc=
golang.org/x/tools v0.14.0/go.mod h1:uYBEerGOWcJyEORxN+Ek8+TT266gXkNlHdJBwexUsBg=
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 h1:H2TDz8ibqkAF6YGhCdN3jS9O0/s90v0rJh3X/OLHEUk=
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2/go.mod h1:K8+ghG5WaK9qNqU5K3HdILfMLy1f3aNYFI/wnl100a8=
gonum.org/v1/gonum v0.12.0 h1:xKuo6hzt+gMav00meVPUlXwSdoEJP46BR+wdxQEFK2o=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
Loading

0 comments on commit 360dd1d

Please sign in to comment.