From 493f15123d872ba0ca8f75e9922c72d21d0b84dc Mon Sep 17 00:00:00 2001 From: taniabogatsch <44262898+taniabogatsch@users.noreply.github.com> Date: Thu, 19 Sep 2024 14:07:31 +0200 Subject: [PATCH] tidy tests, add pinner, other nits --- errors.go | 10 +- pinner.go | 16 +++ scalar_udf.go | 15 +- scalar_udf_test.go | 342 +++++++++++++++++++-------------------------- 4 files changed, 174 insertions(+), 209 deletions(-) create mode 100644 pinner.go diff --git a/errors.go b/errors.go index 5aefe309..ca0f8dfd 100644 --- a/errors.go +++ b/errors.go @@ -98,12 +98,10 @@ var ( errInvalidDecimalWidth = fmt.Errorf("the DECIMAL with must be between 1 and %d", MAX_DECIMAL_WIDTH) errInvalidDecimalScale = errors.New("the DECIMAL scale must be less than or equal to the width") - errScalarUDFCreate = errors.New("could not create scalar UDF") - errScalarUDFNoName = fmt.Errorf("%w: missing name", errScalarUDFCreate) - errScalarUDFIsNil = fmt.Errorf("%w: function is nil", errScalarUDFCreate) - errScalarUDFNoExecutor = fmt.Errorf("%w: executor is nil", errScalarUDFCreate) - //errScalarUDFNilInputTypes = fmt.Errorf("%w: input types are nil", errScalarUDFCreate) - //errScalarUDFEmptyInputTypes = fmt.Errorf("%w: empty input types", errScalarUDFCreate) + errScalarUDFCreate = errors.New("could not create scalar UDF") + errScalarUDFNoName = fmt.Errorf("%w: missing name", errScalarUDFCreate) + errScalarUDFIsNil = fmt.Errorf("%w: function is nil", errScalarUDFCreate) + errScalarUDFNoExecutor = fmt.Errorf("%w: executor is nil", errScalarUDFCreate) errScalarUDFInputTypeIsNil = fmt.Errorf("%w: input type is nil", errScalarUDFCreate) errScalarUDFResultTypeIsNil = fmt.Errorf("%w: result type is nil", errScalarUDFCreate) errScalarUDFResultTypeIsANY = fmt.Errorf("%w: result type is ANY, which is not supported", errScalarUDFCreate) diff --git a/pinner.go b/pinner.go new file mode 100644 index 00000000..63e14b92 --- /dev/null +++ b/pinner.go @@ -0,0 +1,16 @@ +package duckdb + +import "runtime" + +type pinnedValue[T any] struct { + pinner *runtime.Pinner + value T +} + +type unpinner interface { + unpin() +} + +func (v pinnedValue[T]) unpin() { + v.pinner.Unpin() +} diff --git a/scalar_udf.go b/scalar_udf.go index 0dff3215..2bb26ea0 100644 --- a/scalar_udf.go +++ b/scalar_udf.go @@ -15,6 +15,7 @@ import "C" import ( "database/sql" "database/sql/driver" + "runtime" "runtime/cgo" "unsafe" ) @@ -134,9 +135,9 @@ func setFuncError(function_info C.duckdb_function_info, msg string) { func scalar_udf_callback(function_info C.duckdb_function_info, input C.duckdb_data_chunk, output C.duckdb_vector) { extraInfo := C.duckdb_scalar_function_get_extra_info(function_info) - // extraInfo is a void* pointer to our ScalarFunc. + // extraInfo is a void* pointer to our pinned ScalarFunc f. h := *(*cgo.Handle)(unsafe.Pointer(extraInfo)) - function := h.Value().(ScalarFunc) + function := h.Value().(pinnedValue[ScalarFunc]).value // Initialize the input chunk. var inputChunk DataChunk @@ -203,6 +204,7 @@ func scalar_udf_callback(function_info C.duckdb_function_info, input C.duckdb_da //export scalar_udf_delete_callback func scalar_udf_delete_callback(extraInfo unsafe.Pointer) { h := (*cgo.Handle)(extraInfo) + h.Value().(unpinner).unpin() h.Delete() } @@ -282,8 +284,15 @@ func createScalarFunc(name string, f ScalarFunc) (C.duckdb_scalar_function, erro // Set the function callback. C.duckdb_scalar_function_set_function(function, C.scalar_udf_callback_t(C.scalar_udf_callback)) + // Pin the ScalarFunc f. + value := pinnedValue[ScalarFunc]{ + pinner: &runtime.Pinner{}, + value: f, + } + h := cgo.NewHandle(value) + value.pinner.Pin(&h) + // Set data available during execution. - h := cgo.NewHandle(f) C.duckdb_scalar_function_set_extra_info( function, unsafe.Pointer(&h), diff --git a/scalar_udf_test.go b/scalar_udf_test.go index bb061397..ce38bf96 100644 --- a/scalar_udf_test.go +++ b/scalar_udf_test.go @@ -13,14 +13,19 @@ import ( var currentInfo TypeInfo -type simpleSUDF struct{} - -func (*simpleSUDF) Config() ScalarFuncConfig { - return ScalarFuncConfig{ - InputTypeInfos: []TypeInfo{currentInfo, currentInfo}, - ResultTypeInfo: currentInfo, - } -} +type ( + simpleSUDF struct{} + constantSUDF struct{} + otherConstantSUDF struct{} + typesSUDF struct{} + variadicSUDF struct{} + anyTypeSUDF struct{} + errExecutorSUDF struct{} + errInputNilSUDF struct{} + errResultNilSUDF struct{} + errResultAnySUDF struct{} + errExecSUDF struct{} +) func simpleSum(args []driver.Value) (any, error) { if args[0] == nil || args[1] == nil { @@ -30,10 +35,137 @@ func simpleSum(args []driver.Value) (any, error) { return val, nil } +func constantOne([]driver.Value) (any, error) { + return int32(1), nil +} + +func identity(args []driver.Value) (any, error) { + return args[0], nil +} + +func variadicSum(args []driver.Value) (any, error) { + sum := int32(0) + for _, val := range args { + if val == nil { + return nil, nil + } + sum += val.(int32) + } + return sum, nil +} + +func nilCount(args []driver.Value) (any, error) { + count := int32(0) + for _, val := range args { + if val == nil { + count++ + } + } + return count, nil +} + +func constantError([]driver.Value) (any, error) { + return nil, errors.New("test invalid execution") +} + +func (*simpleSUDF) Config() ScalarFuncConfig { + return ScalarFuncConfig{[]TypeInfo{currentInfo, currentInfo}, currentInfo, nil, false, false} +} + func (*simpleSUDF) Executor() ScalarFuncExecutor { - return ScalarFuncExecutor{ - RowExecutor: simpleSum, + return ScalarFuncExecutor{simpleSum} +} + +func (*constantSUDF) Config() ScalarFuncConfig { + return ScalarFuncConfig{ResultTypeInfo: currentInfo} +} + +func (*constantSUDF) Executor() ScalarFuncExecutor { + return ScalarFuncExecutor{constantOne} +} + +func (*otherConstantSUDF) Config() ScalarFuncConfig { + return ScalarFuncConfig{[]TypeInfo{}, currentInfo, nil, false, false} +} + +func (*otherConstantSUDF) Executor() ScalarFuncExecutor { + return ScalarFuncExecutor{constantOne} +} + +func (*typesSUDF) Config() ScalarFuncConfig { + return ScalarFuncConfig{[]TypeInfo{currentInfo}, currentInfo, nil, false, false} +} + +func (*typesSUDF) Executor() ScalarFuncExecutor { + return ScalarFuncExecutor{identity} +} + +func (*variadicSUDF) Config() ScalarFuncConfig { + return ScalarFuncConfig{nil, currentInfo, currentInfo, true, true} +} + +func (*variadicSUDF) Executor() ScalarFuncExecutor { + return ScalarFuncExecutor{variadicSum} +} + +func (*anyTypeSUDF) Config() ScalarFuncConfig { + info, err := NewTypeInfo(TYPE_ANY) + if err != nil { + panic(err) + } + + return ScalarFuncConfig{nil, currentInfo, info, false, true} +} + +func (*anyTypeSUDF) Executor() ScalarFuncExecutor { + return ScalarFuncExecutor{nilCount} +} + +func (*errExecutorSUDF) Config() ScalarFuncConfig { + scalarUDF := simpleSUDF{} + return scalarUDF.Config() +} + +func (*errExecutorSUDF) Executor() ScalarFuncExecutor { + return ScalarFuncExecutor{nil} +} + +func (*errInputNilSUDF) Config() ScalarFuncConfig { + return ScalarFuncConfig{[]TypeInfo{nil}, currentInfo, nil, false, false} +} + +func (*errInputNilSUDF) Executor() ScalarFuncExecutor { + return ScalarFuncExecutor{constantOne} +} + +func (*errResultNilSUDF) Config() ScalarFuncConfig { + return ScalarFuncConfig{[]TypeInfo{currentInfo}, nil, nil, false, false} +} + +func (*errResultNilSUDF) Executor() ScalarFuncExecutor { + return ScalarFuncExecutor{constantOne} +} + +func (*errResultAnySUDF) Config() ScalarFuncConfig { + info, err := NewTypeInfo(TYPE_ANY) + if err != nil { + panic(err) } + + return ScalarFuncConfig{[]TypeInfo{currentInfo}, info, nil, false, false} +} + +func (*errResultAnySUDF) Executor() ScalarFuncExecutor { + return ScalarFuncExecutor{constantOne} +} + +func (*errExecSUDF) Config() ScalarFuncConfig { + scalarUDF := simpleSUDF{} + return scalarUDF.Config() +} + +func (*errExecSUDF) Executor() ScalarFuncExecutor { + return ScalarFuncExecutor{constantError} } func TestSimpleScalarUDF(t *testing.T) { @@ -67,38 +199,6 @@ func TestSimpleScalarUDF(t *testing.T) { require.NoError(t, db.Close()) } -type constantSUDF struct{} -type otherConstantSUDF struct{} - -func (*constantSUDF) Config() ScalarFuncConfig { - return ScalarFuncConfig{ - ResultTypeInfo: currentInfo, - } -} - -func (*otherConstantSUDF) Config() ScalarFuncConfig { - return ScalarFuncConfig{ - InputTypeInfos: []TypeInfo{}, - ResultTypeInfo: currentInfo, - } -} - -func constantOne([]driver.Value) (any, error) { - return int32(1), nil -} - -func (*constantSUDF) Executor() ScalarFuncExecutor { - return ScalarFuncExecutor{ - RowExecutor: constantOne, - } -} - -func (*otherConstantSUDF) Executor() ScalarFuncExecutor { - return ScalarFuncExecutor{ - RowExecutor: constantOne, - } -} - func TestConstantScalarUDF(t *testing.T) { db, err := sql.Open("duckdb", "") require.NoError(t, err) @@ -130,25 +230,6 @@ func TestConstantScalarUDF(t *testing.T) { require.NoError(t, db.Close()) } -type typesSUDF struct{} - -func (*typesSUDF) Config() ScalarFuncConfig { - return ScalarFuncConfig{ - InputTypeInfos: []TypeInfo{currentInfo}, - ResultTypeInfo: currentInfo, - } -} - -func identity(args []driver.Value) (any, error) { - return args[0], nil -} - -func (*typesSUDF) Executor() ScalarFuncExecutor { - return ScalarFuncExecutor{ - RowExecutor: identity, - } -} - func TestAllTypesScalarUDF(t *testing.T) { typeInfos := getTypeInfos(t, false) for _, info := range typeInfos { @@ -209,34 +290,6 @@ func TestScalarUDFSet(t *testing.T) { require.NoError(t, db.Close()) } -type variadicSUDF struct{} - -func (*variadicSUDF) Config() ScalarFuncConfig { - return ScalarFuncConfig{ - ResultTypeInfo: currentInfo, - VariadicTypeInfo: currentInfo, - Volatile: true, - SpecialNullHandling: true, - } -} - -func variadicSum(args []driver.Value) (any, error) { - sum := int32(0) - for _, val := range args { - if val == nil { - return nil, nil - } - sum += val.(int32) - } - return sum, nil -} - -func (*variadicSUDF) Executor() ScalarFuncExecutor { - return ScalarFuncExecutor{ - RowExecutor: variadicSum, - } -} - func TestVariadicScalarUDF(t *testing.T) { db, err := sql.Open("duckdb", "") require.NoError(t, err) @@ -276,37 +329,6 @@ func TestVariadicScalarUDF(t *testing.T) { require.NoError(t, db.Close()) } -type anyTypeSUDF struct{} - -func (*anyTypeSUDF) Config() ScalarFuncConfig { - info, err := NewTypeInfo(TYPE_ANY) - if err != nil { - panic(err) - } - - return ScalarFuncConfig{ - ResultTypeInfo: currentInfo, - VariadicTypeInfo: info, - SpecialNullHandling: true, - } -} - -func nilCount(args []driver.Value) (any, error) { - count := int32(0) - for _, val := range args { - if val == nil { - count++ - } - } - return count, nil -} - -func (*anyTypeSUDF) Executor() ScalarFuncExecutor { - return ScalarFuncExecutor{ - RowExecutor: nilCount, - } -} - func TestANYScalarUDF(t *testing.T) { db, err := sql.Open("duckdb", "") require.NoError(t, err) @@ -346,86 +368,6 @@ func TestANYScalarUDF(t *testing.T) { require.NoError(t, db.Close()) } -type errExecutorSUDF struct{} - -func (*errExecutorSUDF) Config() ScalarFuncConfig { - scalarUDF := simpleSUDF{} - return scalarUDF.Config() -} - -func (*errExecutorSUDF) Executor() ScalarFuncExecutor { - return ScalarFuncExecutor{ - RowExecutor: nil, - } -} - -type errInputNilSUDF struct{} - -func (*errInputNilSUDF) Config() ScalarFuncConfig { - return ScalarFuncConfig{ - InputTypeInfos: []TypeInfo{nil}, - ResultTypeInfo: currentInfo, - } -} - -func (*errInputNilSUDF) Executor() ScalarFuncExecutor { - return ScalarFuncExecutor{ - RowExecutor: constantOne, - } -} - -type errResultNilSUDF struct{} - -func (*errResultNilSUDF) Config() ScalarFuncConfig { - return ScalarFuncConfig{ - InputTypeInfos: []TypeInfo{currentInfo}, - ResultTypeInfo: nil, - } -} - -func (*errResultNilSUDF) Executor() ScalarFuncExecutor { - return ScalarFuncExecutor{ - RowExecutor: constantOne, - } -} - -type errResultAnySUDF struct{} - -func (*errResultAnySUDF) Config() ScalarFuncConfig { - info, err := NewTypeInfo(TYPE_ANY) - if err != nil { - panic(err) - } - - return ScalarFuncConfig{ - InputTypeInfos: []TypeInfo{currentInfo}, - ResultTypeInfo: info, - } -} - -func (*errResultAnySUDF) Executor() ScalarFuncExecutor { - return ScalarFuncExecutor{ - RowExecutor: constantOne, - } -} - -type errExecSUDF struct{} - -func (*errExecSUDF) Config() ScalarFuncConfig { - scalarUDF := simpleSUDF{} - return scalarUDF.Config() -} - -func constantError([]driver.Value) (any, error) { - return nil, errors.New("test invalid execution") -} - -func (*errExecSUDF) Executor() ScalarFuncExecutor { - return ScalarFuncExecutor{ - RowExecutor: constantError, - } -} - func TestScalarUDFErrors(t *testing.T) { t.Parallel()