From 7454a0f6d8a84db1f488973f97b9d2ec5c551ede Mon Sep 17 00:00:00 2001 From: Jaap Aarts Date: Sun, 12 May 2024 18:19:25 +0200 Subject: [PATCH] Add support for cardinality and projection pushdown --- examples/udf/udf.go | 8 +++++- udf.go | 65 ++++++++++++++++++++++++++++++++++++++++----- udf_test.go | 9 +++++++ 3 files changed, 74 insertions(+), 8 deletions(-) diff --git a/examples/udf/udf.go b/examples/udf/udf.go index 91e8e459..09e126d8 100644 --- a/examples/udf/udf.go +++ b/examples/udf/udf.go @@ -47,7 +47,6 @@ func (d *tableUDF) Init() duckdb.TableFunctionInitData { } func (d *tableUDF) FillRow(row duckdb.Row) bool { - fmt.Println(d.count, d.n) if d.count > d.n { return false } @@ -56,6 +55,13 @@ func (d *tableUDF) FillRow(row duckdb.Row) bool { return true } +func (d * tableUDF) Cardinality() *duckdb.CardinalityData { + return &duckdb.CardinalityData{ + Cardinality: uint(d.n), + IsExact: true, + } +} + func main() { var err error db, err = sql.Open("duckdb", "?access_mode=READ_WRITE") diff --git a/udf.go b/udf.go index 18641490..83612235 100644 --- a/udf.go +++ b/udf.go @@ -30,6 +30,7 @@ type ( vectors []vector r C.idx_t info C.duckdb_function_info + projection []int } ColumnName struct { @@ -37,24 +38,48 @@ type ( V any } + CardinalityData struct { + Cardinality uint + IsExact bool + } + TableFunctionInitData struct { MaxThreads int } + //TODO: tableFunctionData + tableFunctionMetaInstance struct { + fun TableFunctionInstance + projection []int + } + + //TODO: TableFunction TableFunctionInstance interface { Init() TableFunctionInitData FillRow(Row) bool + Cardinality() *CardinalityData } + //TODO: TableFunctionProvider TableFunction interface { GetArguments() []any BindArguments(args ...interface{}) (TableFunctionInstance, []ColumnName) } ) +// Returns whether or not the column is projected +func (r Row) IsProjected(c int) bool { + return r.projection[c] != -1 +} + func SetRowValue[T any](row Row, c int, val T) error { - vec := row.vectors[c] - return 
setVectorVal[T](&vec, row.r, val) + if !row.IsProjected(c) { + // we want to allow setting to columns that are not projected, + // it should just be a nop. + return nil + } + vec := row.vectors[row.projection[c]] + return setVectorVal(&vec, row.r, val) } func (row Row) SetRowValue(c int, val any) { @@ -109,7 +134,23 @@ func udf_bind(info C.duckdb_bind_info) { C.duckdb_bind_add_result_column(info, colName, t) } - handle := cgo.NewHandle(instance) + + + cardinality := instance.Cardinality() + if cardinality != nil { + C.duckdb_bind_set_cardinality(info, C.idx_t(cardinality.Cardinality), C.bool(cardinality.IsExact)) + } + + instanceData := tableFunctionMetaInstance{ + fun: instance, + projection: make([]int, len(returnvalues)), + } + + for i := range returnvalues { + instanceData.projection[i] = -1 + } + + handle := cgo.NewHandle(instanceData) C.duckdb_bind_set_bind_data(info, unsafe.Pointer(&handle), C.duckdb_delete_callback_t(C.udf_destroy_data)) } @@ -117,8 +158,15 @@ func udf_bind(info C.duckdb_bind_info) { func udf_init(info C.duckdb_init_info) { instanceRef := C.duckdb_init_get_bind_data(info) h := *(*cgo.Handle)(instanceRef) - instance := h.Value().(TableFunctionInstance) - initData := instance.Init() + instance := h.Value().(tableFunctionMetaInstance) + initData := instance.fun.Init() + + columnCount := C.duckdb_init_get_column_count(info) + for i := C.idx_t(0); i < columnCount; i++ { + srcPos := int(C.duckdb_init_get_column_index(info, i)) + instance.projection[srcPos] = int(i) + } + C.duckdb_init_set_max_threads(info, C.idx_t(initData.MaxThreads)) } @@ -126,11 +174,13 @@ func udf_init(info C.duckdb_init_info) { func udf_callback(info C.duckdb_function_info, output C.duckdb_data_chunk) { instanceRef := C.duckdb_function_get_bind_data(info) h := *(*cgo.Handle)(instanceRef) - instance := h.Value().(TableFunctionInstance) + instance := h.Value().(tableFunctionMetaInstance) + fun := instance.fun columnCount := C.duckdb_data_chunk_get_column_count(output) var row 
Row row.vectors = make([]vector, columnCount) + row.projection = instance.projection var err error for i := C.idx_t(0); i < columnCount; i++ { duckdbVector := C.duckdb_data_chunk_get_vector(output, i) @@ -151,7 +201,7 @@ func udf_callback(info C.duckdb_function_info, output C.duckdb_data_chunk) { maxSize := C.duckdb_vector_size() // At the end of the loop row.r must be the index one past the last added row for row.r = 0; row.r < maxSize; row.r++ { - nextResults := instance.FillRow(row) + nextResults := fun.FillRow(row) if !nextResults { break } @@ -174,6 +224,7 @@ func RegisterTableUDF(c *sql.Conn, name string, function TableFunction) error { C.duckdb_table_function_set_init(tableFunction, C.init(C.udf_init)) C.duckdb_table_function_set_function(tableFunction, C.callback(C.udf_callback)) C.duckdb_table_function_set_extra_info(tableFunction, unsafe.Pointer(&handle), C.duckdb_delete_callback_t(C.udf_destroy_data)) + C.duckdb_table_function_supports_projection_pushdown(tableFunction, C.bool(true)) argumentvalues := function.GetArguments() diff --git a/udf_test.go b/udf_test.go index f0a9e013..b9e13eae 100644 --- a/udf_test.go +++ b/udf_test.go @@ -89,6 +89,10 @@ func (d *incTableUDF) GetTypes() []any { } } +func (d *incTableUDF) Cardinality() *CardinalityData { + return nil +} + func (d *structTableUDF) GetArguments() []interface{} { return []interface{}{ int64(0), @@ -134,6 +138,11 @@ func (d *structTableUDF) GetValue(r, c int) any { } } +func (d *structTableUDF) Cardinality() *CardinalityData { + return nil +} + + func TestTableUDF(t *testing.T) { for _, fun := range tudfs { _fun := fun