Skip to content

Commit

Permalink
Add user defined table functions
Browse files Browse the repository at this point in the history
There are some limitations due to the nature of the C API.
The current API is fully type-safe, and does not require any special
treatment from the user (no freeing values for example).
  • Loading branch information
JAicewizard committed May 24, 2024
1 parent 7733182 commit 0ff54bd
Show file tree
Hide file tree
Showing 5 changed files with 808 additions and 8 deletions.
21 changes: 13 additions & 8 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,20 @@ func invalidatedAppenderError(err error) error {
return fmt.Errorf("%w: %s", err, invalidatedAppenderMsg)
}

// invalidTableFunctionError returns the error used when duckdb rejects a
// table function without giving a reason.
func invalidTableFunctionError() error {
	// The message is passed as an argument rather than as the format string,
	// so it is never interpreted for formatting verbs.
	return fmt.Errorf("%s", invalidTampleFunctionMsg)
}

// Message fragments used when building errors; for example,
// invalidatedAppenderMsg is appended to the wrapped error by
// invalidatedAppenderError and invalidTampleFunctionMsg is the text of
// invalidTableFunctionError.
const (
driverErrMsg = "database/sql/driver"
duckdbErrMsg = "duckdb error"
castErrMsg = "cast error"
structFieldErrMsg = "invalid STRUCT field"
columnErrMsg = "column index"
columnCountErrMsg = "invalid column count"
unsupportedTypeErrMsg = "unsupported data type"
invalidatedAppenderMsg = "appended data has been invalidated due to corrupt row"
driverErrMsg = "database/sql/driver"
duckdbErrMsg = "duckdb error"
castErrMsg = "cast error"
structFieldErrMsg = "invalid STRUCT field"
columnErrMsg = "column index"
columnCountErrMsg = "invalid column count"
unsupportedTypeErrMsg = "unsupported data type"
invalidatedAppenderMsg = "appended data has been invalidated due to corrupt row"
// NOTE(review): "Tample" looks like a typo for "Table"; renaming it requires
// updating every reference (invalidTableFunctionError uses it) in the same change.
invalidTampleFunctionMsg = "table function was rejected by duckdb for unknown reason"
)

var (
Expand Down
123 changes: 123 additions & 0 deletions examples/udf/udf.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package main

import (
"context"
"database/sql"
"fmt"
"log"
"reflect"
"time"

"github.com/marcboeker/go-duckdb"
)

// db is the package-level database handle; main opens it, queries through it
// and closes it on exit.
var db *sql.DB

// user is a sample record type.
// NOTE(review): nothing in the visible code references user; its bday field
// is the only visible use of the "time" import, so removing the type would
// also require dropping that import.
type user struct {
name string
age int
height float32
awesome bool
bday time.Time
}

// tableUDF is the example table function: it produces a single int64 column
// holding an increasing counter.
type tableUDF struct {
// n is the row count requested by the caller; BindArguments sets it from the
// first positional argument.
n int64
// count is the last value written; BindArguments resets it to 0 and FillRow
// increments it before writing each row.
count int64
}

// Config describes the table function's signature: one int64 positional
// argument, with projection pushdown switched off.
func (d *tableUDF) Config() duckdb.TableFunctionConfig {
	var cfg duckdb.TableFunctionConfig
	cfg.Arguments = []duckdb.Type{duckdb.NewDuckdbType[int64]()}
	cfg.Pushdownprojection = false
	return cfg
}

// BindArguments binds the positional arguments of a call to the table
// function and declares its output schema.
//
// The first positional argument must be an int64 and becomes the requested
// row count; any further positional arguments and all named arguments are
// ignored. The counter is reset so the receiver can be bound again.
//
// It returns the receiver as the bound table function together with a single
// int64 column named "result". An error (instead of a panic) is returned when
// the first argument is missing or is not an int64.
func (d *tableUDF) BindArguments(namedArgs map[string]any, args ...interface{}) (duckdb.TableFunction, []duckdb.ColumnMetaData, error) {
	if len(args) < 1 {
		return nil, nil, fmt.Errorf("tableUDF: expected 1 argument, got %d", len(args))
	}
	n, ok := args[0].(int64)
	if !ok {
		return nil, nil, fmt.Errorf("tableUDF: expected int64 argument, got %T", args[0])
	}

	d.count = 0
	d.n = n

	columns := []duckdb.ColumnMetaData{
		{Name: "result", T: duckdb.NewDuckdbType[int64]()},
	}
	return d, columns, nil
}

// Init returns the init data for the table function, limiting it to a
// single thread.
func (d *tableUDF) Init() duckdb.TableFunctionInitData {
	var initData duckdb.TableFunctionInitData
	initData.MaxThreads = 1
	return initData
}

// FillRow writes the next counter value (1, 2, ..., n) into column 0 of row.
//
// It returns true when a row was produced and false once n rows have been
// emitted. An error from writing the value is returned to the caller instead
// of being dropped.
func (d *tableUDF) FillRow(row duckdb.Row) (bool, error) {
	// Stop after exactly n rows so the output matches the exact cardinality
	// reported by Cardinality; the previous `>` comparison emitted n+1 rows.
	if d.count >= d.n {
		return false, nil
	}
	d.count++
	if err := duckdb.SetRowValue(row, 0, d.count); err != nil {
		return false, err
	}
	return true, nil
}

// Cardinality reports the number of rows the function will produce, marked
// as exact. A negative requested count yields no rows, so it is reported as
// 0 rather than being converted to a huge unsigned value.
func (d *tableUDF) Cardinality() *duckdb.CardinalityData {
	var rows uint
	if d.n > 0 {
		rows = uint(d.n)
	}
	return &duckdb.CardinalityData{
		Cardinality: rows,
		IsExact:     true,
	}
}

// main opens a duckdb database, registers the example table function under
// the name "whoo", runs "SELECT * FROM whoo(100)" and prints every column of
// every returned row together with its Go type.
func main() {
	var err error
	db, err = sql.Open("duckdb", "?access_mode=READ_WRITE")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	ctx := context.Background()

	// The connection is only needed for registering the function, but it must
	// be obtained successfully and released again.
	conn, err := db.Conn(ctx)
	check(err)
	defer conn.Close()

	var fun tableUDF
	// NOTE(review): if RegisterTableUDF returns an error it should be checked
	// here; its signature is not visible from this file.
	duckdb.RegisterTableUDF(conn, "whoo", &fun)

	check(db.Ping())

	rows, err := db.QueryContext(ctx, "SELECT * FROM whoo(100)")
	check(err)
	defer rows.Close()

	// Get column names
	columns, err := rows.Columns()
	check(err)

	// Scan into a slice of empty interfaces so any column type can be received.
	values := make([]interface{}, len(columns))
	scanArgs := make([]interface{}, len(values))
	for i := range values {
		scanArgs[i] = &values[i]
	}

	// Fetch rows
	for rows.Next() {
		check(rows.Scan(scanArgs...))
		for i, value := range values {
			switch v := value.(type) {
			case nil:
				fmt.Print(columns[i], ": NULL")
			case []byte:
				fmt.Print(columns[i], ": ", string(v))
			default:
				fmt.Print(columns[i], ": ", v)
			}
			fmt.Printf("\nType: %s\n", reflect.TypeOf(value))
		}
		fmt.Println("-----------------------------------")
	}
	// Next returning false can mean either end of data or a failure; only
	// Err distinguishes the two.
	check(rows.Err())
}

// check panics with its last argument when that argument is non-nil.
//
// It is meant to wrap calls whose final result is an error, e.g.
// check(db.Ping()). Any leading arguments are ignored. Calling it with no
// arguments is a no-op rather than an index-out-of-range panic.
func check(args ...interface{}) {
	if len(args) == 0 {
		return
	}
	if err := args[len(args)-1]; err != nil {
		panic(err)
	}
}
57 changes: 57 additions & 0 deletions row.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package duckdb

/*
#include <stdlib.h>
#include <duckdb.h>
*/
import "C"
import "unsafe"

type (
// Row is the handle through which values are written into the output
// vectors via SetRowValue.
Row struct {
// vectors are the output vectors; SetRowValue indexes them through
// projection.
vectors []vector
// r is passed to the vector setters alongside the value — presumably the
// index of this row within the vectors; TODO confirm.
r C.idx_t
// info is the duckdb function info used to report errors via
// duckdb_function_set_error.
info C.duckdb_function_info
// projection maps a column index to its index in vectors; -1 marks a
// column that is not projected.
projection []int
}
)

// IsProjected reports whether column c is projected, i.e. whether its entry
// in the projection slice is anything other than -1.
func (r Row) IsProjected(c int) bool {
	slot := r.projection[c]
	return slot != -1
}

// SetRowValue writes val into column c of row and returns any error reported
// by setVectorVal. Writing to a column that is not projected is permitted and
// does nothing.
func SetRowValue[T any](row Row, c int, val T) error {
	if row.IsProjected(c) {
		// Translate the column index into the index of its output vector.
		target := row.vectors[row.projection[c]]
		return setVectorVal(&target, row.r, val)
	}
	return nil
}

// SetRowValue writes val into column c of the row.
//
// Like the generic SetRowValue function, it resolves c through the projection
// mapping and silently ignores columns that are not projected. When val
// cannot be cast to the column's type, the error is reported to duckdb through
// duckdb_function_set_error and nothing is written.
func (row Row) SetRowValue(c int, val any) {
	if !row.IsProjected(c) {
		return
	}
	vec := row.vectors[row.projection[c]]

	// Ensure the types match before adding to the vector
	v, err := vec.tryCast(val)
	if err != nil {
		cerr := columnError(err, c+1)
		errstr := C.CString(cerr.Error())
		defer C.free(unsafe.Pointer(errstr))
		C.duckdb_function_set_error(row.info, errstr)
		// Do not fall through: v is not a valid value for this vector.
		return
	}

	vec.fn(&vec, row.r, v)
}

// initColumn prepares the i-th vector of the row for duckdbVector: it
// initialises the vector from the duckdb column type, stores the duckdb
// vector handle and then resolves its child vectors. An initialisation
// failure is returned unchanged.
func (row *Row) initColumn(i C.idx_t, duckdbVector C.duckdb_vector) error {
	colType := C.duckdb_vector_get_column_type(duckdbVector)

	vec := &row.vectors[i]
	err := vec.init(colType, int(i))
	if err != nil {
		return err
	}

	vec.duckdbVector = duckdbVector
	vec.getChildVectors(duckdbVector)
	return nil
}
Loading

0 comments on commit 0ff54bd

Please sign in to comment.