Skip to content

Commit

Permalink
fix: fix wrong length of arrow array for zero-copy mode
Browse files Browse the repository at this point in the history
Signed-off-by: luzhang <[email protected]>
  • Loading branch information
luzhang committed Dec 13, 2024
1 parent 9be106d commit 760fe6a
Showing 1 changed file with 36 additions and 7 deletions.
43 changes: 36 additions & 7 deletions internal/storage/serde.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,18 @@
package storage

import (
"encoding/binary"
"fmt"
"io"
"math"
"sync"

"github.com/apache/arrow/go/v12/arrow"
"github.com/apache/arrow/go/v12/arrow/array"
"github.com/apache/arrow/go/v12/arrow/memory"
"github.com/apache/arrow/go/v12/parquet"
"github.com/apache/arrow/go/v12/parquet/compress"
"github.com/apache/arrow/go/v12/parquet/pqarrow"
"github.com/cockroachdb/errors"
"github.com/samber/lo"
"google.golang.org/protobuf/proto"

"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
Expand Down Expand Up @@ -616,12 +615,42 @@ func calculateArraySize(a arrow.Array) int {
if a == nil || a.Data() == nil || a.Data().Buffers() == nil {
return 0
}
return lo.SumBy[*memory.Buffer, int](a.Data().Buffers(), func(b *memory.Buffer) int {
if b == nil {
return 0

var totalSize int
offset := a.Data().Offset()
length := a.Len()

for i, buf := range a.Data().Buffers() {
if buf == nil {
continue

Check warning on line 625 in internal/storage/serde.go

View check run for this annotation

Codecov / codecov/patch

internal/storage/serde.go#L625

Added line #L625 was not covered by tests
}
return b.Len()
})

switch i {
case 0:
// Handle bitmap buffer
totalSize += (length + 7) / 8
case 1:
switch a.DataType().ID() {
case arrow.STRING, arrow.BINARY:
// Handle variable-length types like STRING/BINARY
startOffset := int(binary.LittleEndian.Uint32(buf.Bytes()[offset*4:]))
endOffset := int(binary.LittleEndian.Uint32(buf.Bytes()[(offset+length)*4:]))
totalSize += endOffset - startOffset
case arrow.LIST:
// Handle nested types like list
for i := 0; i < length; i++ {
startOffset := int(binary.LittleEndian.Uint32(buf.Bytes()[i*4:]))
endOffset := int(binary.LittleEndian.Uint32(buf.Bytes()[(i+1)*4:]))
totalSize += endOffset - startOffset
}

Check warning on line 645 in internal/storage/serde.go

View check run for this annotation

Codecov / codecov/patch

internal/storage/serde.go#L639-L645

Added lines #L639 - L645 were not covered by tests
default:
// Handle fixed-length types
elementSize := buf.Len() / a.Data().Len()
totalSize += elementSize * length
}
}
}
return totalSize
}

func newSelectiveRecord(r Record, selectedFieldId FieldID) *selectiveRecord {
Expand Down

0 comments on commit 760fe6a

Please sign in to comment.