Skip to content

Commit

Permalink
internal/records: Create generic reader (#805)
Browse files Browse the repository at this point in the history
Similar to the generic builder I've added a reader.
It can read multiple records and returns the passed struct when calling Value(i).

This makes it super easy to read the records back to structs and compare against some expected values.

```go
type struct3 struct {
	Field1 int64 `frostdb:",asc(0)"`
	Field2 int64 `frostdb:",asc(1)"`
	Field3 int64 `frostdb:",asc(1)"`
}
expected := []struct3{
	// record1
	{Field1: 1, Field2: 2, Field3: 0},
	{Field1: 1, Field2: 3, Field3: 0},
	// record2
	{Field1: 1, Field2: 0, Field3: 2},
	{Field1: 1, Field2: 0, Field3: 3},
	// record3
	{Field1: 1, Field2: 1, Field3: 1},
	{Field1: 2, Field2: 2, Field3: 2},
}

reader := records.NewReader[struct3](recs...)
rows := reader.NumRows()
require.Equal(t, int64(len(expected)), rows)

actual := make([]struct3, rows)
for i := 0; i < int(rows); i++ {
	actual[i] = reader.Value(i)
}
require.Equal(t, expected, actual)
```

I didn't add tests as this reader is going to be used in tests a lot.
  • Loading branch information
metalmatze authored Mar 28, 2024
1 parent 2a82257 commit 7a53786
Showing 1 changed file with 188 additions and 0 deletions.
188 changes: 188 additions & 0 deletions internal/records/record_reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
package records

import (
"reflect"

"github.com/apache/arrow/go/v15/arrow"
"github.com/apache/arrow/go/v15/arrow/array"
)

type Reader[T any] struct {
records []arrow.Record
}

func NewReader[T any](records ...arrow.Record) *Reader[T] {
var a T
r := reflect.TypeOf(a)
for r.Kind() == reflect.Ptr {
r = r.Elem()
}
if r.Kind() != reflect.Struct {
panic("frostdb/dynschema: " + r.String() + " is not supported")
}

return &Reader[T]{records: records}
}

func (r *Reader[T]) NumRows() int64 {
var rows int64
for _, record := range r.records {
rows += record.NumRows()
}
return rows
}

func (r *Reader[T]) Value(i int) T {
row := *new(T)
rowType := reflect.TypeOf(row)

// find the record with the value
var record arrow.Record
var previousRows int64
for _, rec := range r.records {
if i < int(previousRows+rec.NumRows()) {
record = rec
i = i - int(previousRows)
break
}
previousRows += rec.NumRows()
}

for j := 0; j < rowType.NumField(); j++ {
f := rowType.Field(j)
name, _ := fieldName(f)

indices := record.Schema().FieldIndices(name)
if len(indices) != 1 {
panic("field " + name + " not found or ambiguous")
}

switch f.Type.Kind() {
case reflect.Bool:
arr, ok := record.Column(indices[0]).(*array.Boolean)
if !ok || arr.IsNull(i) {
continue
}
reflect.ValueOf(&row).Elem().Field(j).Set(
reflect.ValueOf(
arr.Value(i),
),
)
case reflect.Float32:
arr, ok := record.Column(indices[0]).(*array.Float32)
if !ok || arr.IsNull(i) {
continue
}
reflect.ValueOf(&row).Elem().Field(j).Set(
reflect.ValueOf(
arr.Value(i),
),
)
case reflect.Float64:
arr, ok := record.Column(indices[0]).(*array.Float64)
if !ok || arr.IsNull(i) {
continue
}
reflect.ValueOf(&row).Elem().Field(j).Set(
reflect.ValueOf(
arr.Value(i),
),
)
case reflect.Int8:
arr, ok := record.Column(indices[0]).(*array.Int8)
if !ok || arr.IsNull(i) {
continue
}
reflect.ValueOf(&row).Elem().Field(j).Set(
reflect.ValueOf(
arr.Value(i),
),
)
case reflect.Int16:
arr, ok := record.Column(indices[0]).(*array.Int16)
if !ok || arr.IsNull(i) {
continue
}
reflect.ValueOf(&row).Elem().Field(j).Set(
reflect.ValueOf(
arr.Value(i),
),
)
case reflect.Int32:
arr, ok := record.Column(indices[0]).(*array.Int32)
if !ok || arr.IsNull(i) {
continue
}
reflect.ValueOf(&row).Elem().Field(j).Set(
reflect.ValueOf(
arr.Value(i),
),
)
case reflect.Int64:
arr, ok := record.Column(indices[0]).(*array.Int64)
if !ok || arr.IsNull(i) {
continue
}
reflect.ValueOf(&row).Elem().Field(j).Set(
reflect.ValueOf(
arr.Value(i),
),
)
case reflect.Uint8:
arr, ok := record.Column(indices[0]).(*array.Uint8)
if !ok || arr.IsNull(i) {
continue
}
reflect.ValueOf(&row).Elem().Field(j).Set(
reflect.ValueOf(
arr.Value(i),
),
)
case reflect.Uint16:
arr, ok := record.Column(indices[0]).(*array.Uint16)
if !ok || arr.IsNull(i) {
continue
}
reflect.ValueOf(&row).Elem().Field(j).Set(
reflect.ValueOf(
arr.Value(i),
),
)
case reflect.Uint32:
arr, ok := record.Column(indices[0]).(*array.Uint32)
if !ok || arr.IsNull(i) {
continue
}
reflect.ValueOf(&row).Elem().Field(j).Set(
reflect.ValueOf(
arr.Value(i),
),
)
case reflect.Uint64:
arr, ok := record.Column(indices[0]).(*array.Uint64)
if !ok || arr.IsNull(i) {
continue
}
reflect.ValueOf(&row).Elem().Field(j).Set(
reflect.ValueOf(
arr.Value(i),
),
)
case reflect.String:
// We probably need to support array.Binary too
arr, ok := record.Column(indices[0]).(*array.String)
if !ok || arr.IsNull(i) {
continue
}
reflect.ValueOf(&row).Elem().Field(j).Set(
reflect.ValueOf(
arr.Value(i),
),
)
default:
panic("unsupported type " + f.Type.String())
}
}

return row
}

0 comments on commit 7a53786

Please sign in to comment.