Skip to content

Commit

Permalink
Add support for returning structs and lists from tableUDF
Browse files Browse the repository at this point in the history
  • Loading branch information
JAicewizard committed May 4, 2024
1 parent d419d43 commit aec1205
Show file tree
Hide file tree
Showing 3 changed files with 318 additions and 99 deletions.
237 changes: 161 additions & 76 deletions appender_vector.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package duckdb
import "C"

import (
"fmt"
"reflect"
"strconv"
"time"
Expand Down Expand Up @@ -105,57 +104,6 @@ func tryCastInteger[S any, R numericType](val S) (R, error) {

}

/*
func tryCast[T, R any](val T) (R, error) {
var x R
switch any(x).(type) {
case uint8:
r, err := tryCastInteger[T, R](val)
return R(r), err
case int8:
return convertNumericType[T, int8]
case uint16:
return convertNumericType[T, uint16]
case int16:
return convertNumericType[T, int16]
case uint32:
return convertNumericType[T, uint32]
case int32:
return convertNumericType[T, int32]
case uint64:
return convertNumericType[T, uint64]
case int64:
return convertNumericType[T, int64]
case uint:
return convertNumericType[T, uint]
case int:
return convertNumericType[T, int]
case C.DUCKDB_TYPE_BIGINT:
return tryNumericCast[int64](val, reflect.Int64.String())
case C.DUCKDB_TYPE_FLOAT:
return tryNumericCast[float32](val, reflect.Float32.String())
case C.DUCKDB_TYPE_DOUBLE:
return tryNumericCast[float64](val, reflect.Float64.String())
case C.DUCKDB_TYPE_BOOLEAN:
return tryPrimitiveCast[bool](val, reflect.Bool.String())
case C.DUCKDB_TYPE_VARCHAR:
return tryPrimitiveCast[string](val, reflect.String.String())
case C.DUCKDB_TYPE_BLOB:
return tryPrimitiveCast[[]byte](val, reflect.TypeOf([]byte{}).String())
case C.DUCKDB_TYPE_TIMESTAMP, C.DUCKDB_TYPE_TIMESTAMP_S, C.DUCKDB_TYPE_TIMESTAMP_MS,
C.DUCKDB_TYPE_TIMESTAMP_NS, C.DUCKDB_TYPE_TIMESTAMP_TZ:
return tryPrimitiveCast[time.Time](val, reflect.TypeOf(time.Time{}).String())
case C.DUCKDB_TYPE_UUID:
return tryPrimitiveCast[UUID](val, reflect.TypeOf(UUID{}).String())
case C.DUCKDB_TYPE_LIST:
return vec.tryCastList(val)
case C.DUCKDB_TYPE_STRUCT:
return vec.tryCastStruct(val)
}
return nil, getError(errDriver, nil)
}
*/
func (*vector) canNil(val reflect.Value) bool {
switch val.Kind() {
case reflect.Chan, reflect.Func, reflect.Map, reflect.Pointer,
Expand Down Expand Up @@ -362,7 +310,6 @@ func (vec *vector) setCString(rowIdx C.idx_t, val any) {
} else if vec.duckdbType == C.DUCKDB_TYPE_BLOB {
str = string(val.([]byte)[:])
}

// This setter also writes BLOBs.
cStr := C.CString(str)
C.duckdb_vector_assign_string_element_len(vec.duckdbVector, rowIdx, cStr, C.idx_t(len(str)))
Expand Down Expand Up @@ -513,6 +460,12 @@ func (vec *vector) initStruct(logicalType C.duckdb_logical_type) error {
return nil
}

// _setPrimitive stores val at index rowIdx of the vector's data buffer,
// treating that buffer as a sequence of T.
func _setPrimitive[T any](vec *vector, rowIdx C.idx_t, val T) {
	// The oversized array type only gives the raw pointer an indexable shape;
	// the conversion itself allocates nothing.
	data := (*[1 << 31]T)(C.duckdb_vector_get_data(vec.duckdbVector))
	data[rowIdx] = val
}

func _setVectorNumeric[S any, T numericType](vec *vector, rowIdx C.idx_t, val S) error {
var fv T
switch v := any(val).(type) {
Expand Down Expand Up @@ -547,12 +500,9 @@ func _setVectorNumeric[S any, T numericType](vec *vector, rowIdx C.idx_t, val S)
fv = 0
}
default:
return fmt.Errorf("wrong input type")
return castError(reflect.TypeOf(val).String(), reflect.TypeOf(fv).String())
}

ptr := C.duckdb_vector_get_data(vec.duckdbVector)
xs := (*[1 << 31]T)(ptr)
xs[rowIdx] = fv
_setPrimitive(vec, rowIdx, fv)
return nil
}

Expand Down Expand Up @@ -584,14 +534,149 @@ func _setVectorBool[S any](vec *vector, rowIdx C.idx_t, val S) error {
case float64:
fv = v == 0
case bool:
fv = v
fv = v
default:
return fmt.Errorf("wrong input type")
return castError(reflect.TypeOf(val).String(), reflect.TypeOf(fv).String())
}
_setPrimitive(vec, rowIdx, fv)
return nil
}

ptr := C.duckdb_vector_get_data(vec.duckdbVector)
xs := (*[1 << 31]bool)(ptr)
xs[rowIdx] = fv
// _setVectorString writes a string or []byte value into row rowIdx of the
// vector. It is used for both VARCHAR and BLOB vectors.
//
// The bytes are copied into C memory for the duration of the call and that
// temporary copy is freed when the function returns. Any other input type,
// including an untyped nil, yields a cast error rather than a panic.
func _setVectorString[S any](vec *vector, rowIdx C.idx_t, val S) error {
	var cStr *C.char
	var length int
	switch v := any(val).(type) {
	case string:
		cStr = C.CString(v)
		length = len(v)
	case []byte:
		cStr = (*C.char)(C.CBytes(v))
		length = len(v)
	default:
		// reflect.TypeOf returns nil for an untyped nil interface value, so
		// calling String on it directly would panic.
		actual := "<nil>"
		if rt := reflect.TypeOf(val); rt != nil {
			actual = rt.String()
		}
		// Report the Go type the caller should have passed, not the internal
		// cgo pointer type.
		expected := reflect.String.String()
		if vec.duckdbType == C.DUCKDB_TYPE_BLOB {
			expected = reflect.TypeOf([]byte(nil)).String()
		}
		return castError(actual, expected)
	}
	defer C.free(unsafe.Pointer(cStr))

	// The explicit length lets the payload contain embedded NUL bytes.
	C.duckdb_vector_assign_string_element_len(vec.duckdbVector, rowIdx, cStr, C.idx_t(length))
	return nil
}

// _setVectorTS writes a time.Time into row rowIdx of a timestamp vector,
// converting it to the tick unit implied by vec.duckdbType (seconds,
// milliseconds, microseconds or nanoseconds since the Unix epoch).
//
// Any input that is not a time.Time, including an untyped nil, yields a cast
// error rather than a panic.
func _setVectorTS[S any](vec *vector, rowIdx C.idx_t, val S) error {
	t, ok := any(val).(time.Time)
	if !ok {
		// reflect.TypeOf returns nil for an untyped nil interface value, so
		// calling String on it directly would panic.
		actual := "<nil>"
		if rt := reflect.TypeOf(val); rt != nil {
			actual = rt.String()
		}
		return castError(actual, reflect.TypeOf(t).String())
	}

	// The Unix* accessors do not depend on the time's location, so no UTC
	// conversion is needed. Any other vector type leaves ticks at zero.
	var ticks int64
	switch vec.duckdbType {
	case C.DUCKDB_TYPE_TIMESTAMP, C.DUCKDB_TYPE_TIMESTAMP_TZ:
		ticks = t.UnixMicro()
	case C.DUCKDB_TYPE_TIMESTAMP_S:
		ticks = t.Unix()
	case C.DUCKDB_TYPE_TIMESTAMP_MS:
		ticks = t.UnixMilli()
	case C.DUCKDB_TYPE_TIMESTAMP_NS:
		ticks = t.UnixNano()
	}

	// The tick count is stored in the micros field regardless of its unit.
	var ts C.duckdb_timestamp
	ts.micros = C.int64_t(ticks)
	_setPrimitive(vec, rowIdx, ts)
	return nil
}

// _setVectorUUID writes a UUID into row rowIdx of the vector after converting
// it with uuidToHugeInt.
//
// Any input that is not a UUID, including an untyped nil, yields a cast error
// rather than a panic.
func _setVectorUUID[S any](vec *vector, rowIdx C.idx_t, val S) error {
	uuid, ok := any(val).(UUID)
	if !ok {
		// reflect.TypeOf returns nil for an untyped nil interface value, so
		// calling String on it directly would panic.
		actual := "<nil>"
		if rt := reflect.TypeOf(val); rt != nil {
			actual = rt.String()
		}
		return castError(actual, reflect.TypeOf(uuid).String())
	}

	_setPrimitive(vec, rowIdx, uuidToHugeInt(uuid))
	return nil
}

// _setVectorList writes a list value into row rowIdx of a LIST vector.
//
// A []any is used as is. Any other indexable value (slice, array or string)
// is walked with reflection and each non-nil element is passed through the
// child vector's tryCast; nil elements stay nil. Values that cannot be
// indexed, including an untyped nil, yield a cast error rather than a panic.
//
// The elements are appended to the end of the child vector, and the list
// entry stored at rowIdx records their offset and length.
func _setVectorList[S any](vec *vector, rowIdx C.idx_t, val S) error {
	var list []any
	switch v := any(val).(type) {
	case []any:
		list = v
	default:
		rv := reflect.ValueOf(val)
		// reflect.ValueOf yields the zero Value for an untyped nil; Len and
		// Type would panic on it.
		if !rv.IsValid() {
			return castError("<nil>", reflect.Slice.String())
		}
		// Len and Index are both only defined for these kinds; everything else
		// used to panic here.
		switch rv.Kind() {
		case reflect.Slice, reflect.Array, reflect.String:
		default:
			return castError(rv.Type().String(), reflect.Slice.String())
		}

		// Cast every element before touching the vector, so a failing element
		// leaves the vector unmodified.
		childVector := vec.childVectors[0]
		list = make([]any, rv.Len())
		for i := range list {
			elem := rv.Index(i)
			if vec.canNil(elem) && elem.IsNil() {
				// list[i] is already nil.
				continue
			}

			casted, err := childVector.tryCast(elem.Interface())
			if err != nil {
				return err
			}
			list[i] = casted
		}
	}
	childVectorSize := C.duckdb_list_vector_get_size(vec.duckdbVector)

	// Set the offset and length of the list vector using the current size of the child vector.
	listEntry := C.duckdb_list_entry{
		offset: C.idx_t(childVectorSize),
		length: C.idx_t(len(list)),
	}
	_setPrimitive(vec, rowIdx, listEntry)

	newLength := C.idx_t(len(list)) + childVectorSize
	C.duckdb_list_vector_set_size(vec.duckdbVector, newLength)
	C.duckdb_list_vector_reserve(vec.duckdbVector, newLength)

	// Insert the values into the child vector.
	childVector := vec.childVectors[0]
	for i, e := range list {
		offset := C.idx_t(i) + childVectorSize
		childVector.fn(&childVector, offset, e)
	}
	return nil
}

// _setVectorStruct writes a struct value into row rowIdx of a STRUCT vector.
//
// A map[string]any is used as is. A Go struct is flattened into such a map,
// keyed by field name; unexported fields are skipped because reflection cannot
// read them. Any other input, including an untyped nil, yields a cast error
// rather than a panic.
//
// Each child vector then receives the map entry matching its child name; a
// missing entry is passed on as nil.
func _setVectorStruct[S any](vec *vector, rowIdx C.idx_t, val S) error {
	// TODO: cast to map if possible
	var m map[string]any
	switch v := any(val).(type) {
	case map[string]any:
		m = v
	default:
		// Catch mismatching types. reflect.TypeOf returns nil for an untyped
		// nil interface value, so it must be checked before calling Kind.
		goType := reflect.TypeOf(val)
		if goType == nil {
			return castError("<nil>", reflect.Struct.String())
		}
		if goType.Kind() != reflect.Struct {
			return castError(goType.String(), reflect.Struct.String())
		}

		rv := reflect.ValueOf(val)
		m = make(map[string]any, goType.NumField())
		for i := 0; i < goType.NumField(); i++ {
			field := goType.Field(i)
			// Value.Interface panics for values obtained from unexported
			// fields, so leave those out.
			if !field.IsExported() {
				continue
			}
			m[field.Name] = rv.Field(i).Interface()
		}
	}

	for i := 0; i < len(vec.childVectors); i++ {
		childVector := vec.childVectors[i]
		childName := vec.childNames[i]
		childVector.fn(&childVector, rowIdx, m[childName])
	}
	return nil
}

Expand Down Expand Up @@ -619,19 +704,19 @@ func setVectorVal[S any](vec *vector, rowIdx C.idx_t, val S) error {
return _setVectorNumeric[S, float64](vec, rowIdx, val)
case C.DUCKDB_TYPE_BOOLEAN:
return _setVectorBool[S](vec, rowIdx, val)
/* case C.DUCKDB_TYPE_VARCHAR:
return tryPrimitiveCast[string](val, reflect.String.String())
case C.DUCKDB_TYPE_BLOB:
return tryPrimitiveCast[[]byte](val, reflect.TypeOf([]byte{}).String())
case C.DUCKDB_TYPE_TIMESTAMP, C.DUCKDB_TYPE_TIMESTAMP_S, C.DUCKDB_TYPE_TIMESTAMP_MS,
C.DUCKDB_TYPE_TIMESTAMP_NS, C.DUCKDB_TYPE_TIMESTAMP_TZ:
return tryPrimitiveCast[time.Time](val, reflect.TypeOf(time.Time{}).String())
case C.DUCKDB_TYPE_UUID:
return tryPrimitiveCast[UUID](val, reflect.TypeOf(UUID{}).String())
case C.DUCKDB_TYPE_LIST:
return vec.tryCastList(val)
case C.DUCKDB_TYPE_STRUCT:
return vec.tryCastStruct(val)*/
case C.DUCKDB_TYPE_VARCHAR:
return _setVectorString[S](vec, rowIdx, val)
case C.DUCKDB_TYPE_BLOB:
return _setVectorString[S](vec, rowIdx, val)
case C.DUCKDB_TYPE_TIMESTAMP, C.DUCKDB_TYPE_TIMESTAMP_S, C.DUCKDB_TYPE_TIMESTAMP_MS,
C.DUCKDB_TYPE_TIMESTAMP_NS, C.DUCKDB_TYPE_TIMESTAMP_TZ:
return _setVectorTS[S](vec, rowIdx, val)
case C.DUCKDB_TYPE_UUID:
return _setVectorUUID[S](vec, rowIdx, val)
case C.DUCKDB_TYPE_LIST:
return _setVectorList[S](vec, rowIdx, val)
case C.DUCKDB_TYPE_STRUCT:
return _setVectorStruct[S](vec, rowIdx, val)
}
// TODO: error
return nil
Expand Down
36 changes: 33 additions & 3 deletions udf.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ func SetRowValue[T any](row Row, c int, val T) {
setVectorVal[T](&vec, C.ulong(row.r), val)

Check failure on line 44 in udf.go

View workflow job for this annotation

GitHub Actions / lint

Error return value is not checked (errcheck)

Check failure on line 44 in udf.go

View workflow job for this annotation

GitHub Actions / Test (macos-12, 1.21)

cannot use _Ctype_ulong(row.r) (value of type _Ctype_ulong) as _Ctype_ulonglong value in argument to setVectorVal[T]
}


func (row Row) SetRowValue(c int, val any) {
vec := row.vectors[c]

Expand All @@ -56,7 +55,7 @@ func (row Row) SetRowValue(c int, val any) {
defer C.free(unsafe.Pointer(errstr))
C.duckdb_function_set_error(row.info, errstr)
}

vec.fn(&vec, C.ulong(row.r), v)

Check failure on line 59 in udf.go

View workflow job for this annotation

GitHub Actions / Test (macos-12, 1.21)

cannot use _Ctype_ulong(row.r) (value of type _Ctype_ulong) as _Ctype_ulonglong value in argument to vec.fn
}

Expand Down Expand Up @@ -130,7 +129,7 @@ func udf_callback(info C.duckdb_function_info, output C.duckdb_data_chunk) {
break
}
}
// since row.r points to one past the last value, it is also the size
// since row.r points to one past the last value, it is also the size
C.duckdb_data_chunk_set_size(output, C.ulong(row.r))

Check failure on line 133 in udf.go

View workflow job for this annotation

GitHub Actions / Test (macos-12, 1.21)

cannot use _Ctype_ulong(row.r) (value of type _Ctype_ulong) as _Ctype_ulonglong value in variable declaration
}

Expand Down Expand Up @@ -182,6 +181,37 @@ func getDuckdbTypeFromValue(v any) (C.duckdb_logical_type, error) {
case string:
return C.duckdb_create_logical_type(C.DUCKDB_TYPE_VARCHAR), nil
default:
rv := reflect.ValueOf(v)
rt := reflect.TypeOf(v)

switch rt.Kind() {
case reflect.Struct:
var nfields int
for i := rt.NumField()-1; i >= 0; i-- {
if rv.Field(i).CanInterface(){
nfields ++
}
}

types := (*[1 << 31]C.duckdb_logical_type)(C.malloc(C.ulong(uintptr(nfields) * unsafe.Sizeof(C.duckdb_logical_type(nil)))))
names := (*[1 << 31]*C.char)(C.malloc(C.ulong(uintptr(nfields) * unsafe.Sizeof((*C.char)(nil)))))
defer C.free(unsafe.Pointer(types))
defer C.free(unsafe.Pointer(names))
for i := 0; i < nfields; i++ {
if !rv.Field(i).CanInterface(){
continue
}
var err error
(*types)[i], err = getDuckdbTypeFromValue(rv.Field(i).Interface())
if err != nil {
return C.duckdb_logical_type(nil), err
}
(*names)[i] = C.CString(rt.Field(i).Name)
}
ctypes := (*C.duckdb_logical_type)(unsafe.Pointer(types))
cnames := (**C.char)(unsafe.Pointer(names))
return C.duckdb_create_struct_type(ctypes, cnames, C.ulong(nfields)), nil

Check failure on line 213 in udf.go

View workflow job for this annotation

GitHub Actions / Test (macos-12, 1.21)

cannot use _Ctype_ulong(nfields) (value of type _Ctype_ulong) as _Ctype_ulonglong value in variable declaration
}
return C.duckdb_logical_type(nil), unsupportedTypeError(reflect.TypeOf(v).String())
}
}
Expand Down
Loading

0 comments on commit aec1205

Please sign in to comment.