Skip to content

Commit

Permalink
pqarrow/arrowutils: Add sorting support for Struct, RunEndEncoded, Fi…
Browse files Browse the repository at this point in the history
…xedSizeBinary (#936)

* pqarrow/arrowutils: Add FixedSizeBinaryDictionary support

* pqarrow/arrowutils: Add support for Struct and RunEndEncoded

* pqarrow/arrowutils: Handle empty structs correctly

* pqarrow/arrowutils: Retain record if unmodified

* query/physicalplan: Sampler requires LessOrEqual 1024 bytes allocations

* remove trailing newline

* pqarrow/arrowutils: Limit by using the arrowutils Take helper

This will also allow us to limit all the newly supported column types: timestamp, struct, and run-end encoded.

* query/physicalplan: Guard against s.size==0 panic

Please take a look at this fix, @thorfour. I'm not sure why this started panicking now.

* pqarrow/arrowutils: Release List ValueBuilder
  • Loading branch information
metalmatze authored Nov 26, 2024
1 parent 8ee3beb commit e336b3a
Show file tree
Hide file tree
Showing 5 changed files with 428 additions and 85 deletions.
174 changes: 160 additions & 14 deletions pqarrow/arrowutils/sort.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,10 @@ func Take(ctx context.Context, r arrow.Record, indices *array.Int32) (arrow.Reco
// does not have these columns.
var customTake bool
for i := 0; i < int(r.NumCols()); i++ {
if r.Column(i).DataType().ID() == arrow.DICTIONARY || r.Column(i).DataType().ID() == arrow.LIST {
if r.Column(i).DataType().ID() == arrow.DICTIONARY ||
r.Column(i).DataType().ID() == arrow.RUN_END_ENCODED ||
r.Column(i).DataType().ID() == arrow.LIST ||
r.Column(i).DataType().ID() == arrow.STRUCT {
customTake = true
break
}
Expand Down Expand Up @@ -108,8 +111,12 @@ func Take(ctx context.Context, r arrow.Record, indices *array.Int32) (arrow.Reco
switch arr := r.Column(i).(type) {
case *array.Dictionary:
g.Go(func() error { return TakeDictColumn(ctx, arr, i, resArr, indices) })
case *array.RunEndEncoded:
g.Go(func() error { return TakeRunEndEncodedColumn(ctx, arr, i, resArr, indices) })
case *array.List:
g.Go(func() error { return TakeListColumn(ctx, arr, i, resArr, indices) })
case *array.Struct:
g.Go(func() error { return TakeStructColumn(ctx, arr, i, resArr, indices) })
default:
g.Go(func() error { return TakeColumn(ctx, col, i, resArr, indices) })
}
Expand Down Expand Up @@ -140,22 +147,91 @@ func TakeColumn(ctx context.Context, a arrow.Array, idx int, arr []arrow.Array,
}

func TakeDictColumn(ctx context.Context, a *array.Dictionary, idx int, arr []arrow.Array, indices *array.Int32) error {
r := array.NewDictionaryBuilderWithDict(
compute.GetAllocator(ctx), a.DataType().(*arrow.DictionaryType), a.Dictionary(),
).(*array.BinaryDictionaryBuilder)
defer r.Release()
switch a.Dictionary().(type) {
case *array.String, *array.Binary:
r := array.NewDictionaryBuilderWithDict(
compute.GetAllocator(ctx), a.DataType().(*arrow.DictionaryType), a.Dictionary(),
).(*array.BinaryDictionaryBuilder)
defer r.Release()

r.Reserve(indices.Len())
idxBuilder := r.IndexBuilder()
for _, i := range indices.Int32Values() {
if a.IsNull(int(i)) {
r.AppendNull()
continue
}
idxBuilder.Append(a.GetValueIndex(int(i)))
}

r.Reserve(indices.Len())
idxBuilder := r.IndexBuilder()
for _, i := range indices.Int32Values() {
if a.IsNull(int(i)) {
r.AppendNull()
arr[idx] = r.NewArray()
return nil
case *array.FixedSizeBinary:
r := array.NewDictionaryBuilderWithDict(
compute.GetAllocator(ctx), a.DataType().(*arrow.DictionaryType), a.Dictionary(),
).(*array.FixedSizeBinaryDictionaryBuilder)
defer r.Release()

r.Reserve(indices.Len())
idxBuilder := r.IndexBuilder()
for _, i := range indices.Int32Values() {
if a.IsNull(int(i)) {
r.AppendNull()
continue
}
// TODO: Improve this by not copying actual values.
idxBuilder.Append(a.GetValueIndex(int(i)))
}

arr[idx] = r.NewArray()
return nil
}

return nil
}

// TakeRunEndEncodedColumn reorders the run-end encoded column a according to
// indices and stores the resulting run-end encoded array in arr[idx].
//
// It works in three steps:
//  1. Expand a into one dictionary index per logical row (null where the
//     underlying dictionary entry is null).
//  2. Reorder these expanded indices with TakeColumn.
//  3. Feed the string value of every reordered index into a fresh
//     run-end encoded builder.
//
// Only run-end encoded columns whose values are a dictionary of strings are
// supported; any other layout returns an error instead of panicking.
func TakeRunEndEncodedColumn(ctx context.Context, a *array.RunEndEncoded, idx int, arr []arrow.Array, indices *array.Int32) error {
	dict, ok := a.Values().(*array.Dictionary)
	if !ok {
		return fmt.Errorf("unexpected values type %T for run-end encoded column", a.Values())
	}
	dictValues, ok := dict.Dictionary().(*array.String)
	if !ok {
		return fmt.Errorf("unexpected dictionary type %T for run-end encoded column", dict.Dictionary())
	}

	expandedIndexBuilder := array.NewInt32Builder(compute.GetAllocator(ctx))
	defer expandedIndexBuilder.Release()

	for i := 0; i < a.Len(); i++ {
		// Resolve the logical row to its run only once per row.
		physical := a.GetPhysicalIndex(i)
		if dict.IsNull(physical) {
			expandedIndexBuilder.AppendNull()
			continue
		}
		expandedIndexBuilder.Append(int32(dict.GetValueIndex(physical)))
	}
	expandedIndex := expandedIndexBuilder.NewInt32Array()
	defer expandedIndex.Release()

	// TakeColumn writes its result into a slice slot, so provide a
	// single-element slice to receive the reordered indices.
	expandedReorderedArr := make([]arrow.Array, 1)
	if err := TakeColumn(ctx, expandedIndex, 0, expandedReorderedArr, indices); err != nil {
		return err
	}
	expandedReordered := expandedReorderedArr[0].(*array.Int32)
	defer expandedReordered.Release()

	b := array.NewRunEndEncodedBuilder(
		compute.GetAllocator(ctx), a.RunEndsArr().DataType(), a.Values().DataType(),
	)
	defer b.Release()
	b.Reserve(indices.Len())

	for i := 0; i < expandedReordered.Len(); i++ {
		if expandedReordered.IsNull(i) {
			b.AppendNull()
			continue
		}
		reorderedIndex := expandedReordered.Value(i)
		v := dictValues.Value(int(reorderedIndex))
		if err := b.AppendValueFromString(v); err != nil {
			return err
		}
	}

	arr[idx] = b.NewRunEndEncodedArray()
	return nil
}

Expand All @@ -165,6 +241,7 @@ func TakeListColumn(ctx context.Context, a *array.List, idx int, arr []arrow.Arr
if !ok {
return fmt.Errorf("unexpected value builder type %T for list column", r.ValueBuilder())
}
defer valueBuilder.Release()

listValues := a.ListValues().(*array.Dictionary)
switch dictV := listValues.Dictionary().(type) {
Expand Down Expand Up @@ -200,6 +277,54 @@ func TakeListColumn(ctx context.Context, a *array.List, idx int, arr []arrow.Arr
return nil
}

// TakeStructColumn reorders the struct column a according to indices and
// stores the resulting struct array in arr[idx].
//
// Each field of the struct is reordered on its own: run-end encoded fields go
// through TakeRunEndEncodedColumn, every other field through TakeColumn. The
// reordered fields are then assembled into a new struct array under the
// original field names. A struct without any fields is returned unchanged
// (with an additional reference).
//
// NOTE(review): the result is built from the fields and names only; it looks
// like struct-level nulls of a are not carried over — confirm.
func TakeStructColumn(ctx context.Context, a *array.Struct, idx int, arr []arrow.Array, indices *array.Int32) error {
	structType := a.Data().DataType().(*arrow.StructType)
	numFields := a.NumField()

	if numFields == 0 {
		// Nothing to reorder. The caller releases the original record and the
		// result independently, so hand out an extra reference.
		a.Retain()
		arr[idx] = a
		return nil
	}

	fieldNames := make([]string, numFields)
	taken := make([]arrow.Array, numFields)
	// The struct array built below keeps what it needs; drop our references
	// on every exit path, including early error returns where only some
	// slots have been filled.
	defer func() {
		for _, t := range taken {
			if t == nil {
				continue
			}
			t.Release()
		}
	}()

	for pos := 0; pos < numFields; pos++ {
		fieldNames[pos] = structType.Field(pos).Name

		field := a.Field(pos)
		var err error
		if ree, isREE := field.(*array.RunEndEncoded); isREE {
			err = TakeRunEndEncodedColumn(ctx, ree, pos, taken, indices)
		} else {
			err = TakeColumn(ctx, field, pos, taken, indices)
		}
		if err != nil {
			return err
		}
	}

	result, err := array.NewStructArray(taken, fieldNames)
	if err != nil {
		return err
	}

	arr[idx] = result
	return nil
}

type multiColSorter struct {
indices *builder.OptInt32Builder
comparisons []comparator
Expand Down Expand Up @@ -263,13 +388,21 @@ func newMultiColSorter(
},
bytes.Compare,
)
case *array.FixedSizeBinary:
ms.comparisons[i] = newOrderedSorter[[]byte](
&fixedSizeBinaryDictionary{
dict: e,
elem: elem,
},
bytes.Compare,
)
default:
ms.Release()
return nil, fmt.Errorf("unsupported dictionary column type for sorting %T", e)
return nil, fmt.Errorf("unsupported dictionary column type for sorting %T for column %s", e, r.Schema().Field(col.Index).Name)
}
default:
ms.Release()
return nil, fmt.Errorf("unsupported column type for sorting %T", e)
return nil, fmt.Errorf("unsupported column type for sorting %T for column %s", e, r.Schema().Field(col.Index).Name)
}
}
return ms, nil
Expand Down Expand Up @@ -417,3 +550,16 @@ func (s *binaryDictionary) IsNull(i int) bool {
// Value returns the bytes of the dictionary entry that row i of the
// dictionary-encoded column points to.
func (s *binaryDictionary) Value(i int) []byte {
	entry := s.dict.GetValueIndex(i)
	return s.elem.Value(entry)
}

// fixedSizeBinaryDictionary presents a dictionary-encoded column with
// fixed-size binary values as a row-addressable sequence of byte slices, so
// it can be handed to the ordered sorter together with bytes.Compare.
type fixedSizeBinaryDictionary struct {
	// dict is the dictionary-encoded column holding one index per row.
	dict *array.Dictionary
	// elem holds the fixed-size binary values the indices refer to
	// (presumably dict's own dictionary; verify against the constructor site).
	elem *array.FixedSizeBinary
}

// IsNull reports whether row i of the dictionary-encoded column is null.
func (s *fixedSizeBinaryDictionary) IsNull(i int) bool {
	null := s.dict.IsNull(i)
	return null
}

// Value returns the bytes of the dictionary entry that row i points to.
func (s *fixedSizeBinaryDictionary) Value(i int) []byte {
	entry := s.dict.GetValueIndex(i)
	return s.elem.Value(entry)
}
Loading

0 comments on commit e336b3a

Please sign in to comment.