From 760fe6a7cbb9d0fd65b6003fd792b221985787eb Mon Sep 17 00:00:00 2001 From: luzhang Date: Fri, 13 Dec 2024 17:17:42 +0800 Subject: [PATCH] fix: fix wrong length of arrow array for zero-copy mode Signed-off-by: luzhang --- internal/storage/serde.go | 43 ++++++++++++++++++++++++++++++++------- 1 file changed, 36 insertions(+), 7 deletions(-) diff --git a/internal/storage/serde.go b/internal/storage/serde.go index 2df4e833c6074..5fe8ead242907 100644 --- a/internal/storage/serde.go +++ b/internal/storage/serde.go @@ -17,6 +17,7 @@ package storage import ( + "encoding/binary" "fmt" "io" "math" @@ -24,12 +25,10 @@ import ( "github.com/apache/arrow/go/v12/arrow" "github.com/apache/arrow/go/v12/arrow/array" - "github.com/apache/arrow/go/v12/arrow/memory" "github.com/apache/arrow/go/v12/parquet" "github.com/apache/arrow/go/v12/parquet/compress" "github.com/apache/arrow/go/v12/parquet/pqarrow" "github.com/cockroachdb/errors" - "github.com/samber/lo" "google.golang.org/protobuf/proto" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" @@ -616,12 +615,42 @@ func calculateArraySize(a arrow.Array) int { if a == nil || a.Data() == nil || a.Data().Buffers() == nil { return 0 } - return lo.SumBy[*memory.Buffer, int](a.Data().Buffers(), func(b *memory.Buffer) int { - if b == nil { - return 0 + + var totalSize int + offset := a.Data().Offset() + length := a.Len() + + for i, buf := range a.Data().Buffers() { + if buf == nil { + continue } - return b.Len() - }) + + switch i { + case 0: + // Handle bitmap buffer + totalSize += (length + 7) / 8 + case 1: + switch a.DataType().ID() { + case arrow.STRING, arrow.BINARY: + // Handle variable-length types like STRING/BINARY + startOffset := int(binary.LittleEndian.Uint32(buf.Bytes()[offset*4:])) + endOffset := int(binary.LittleEndian.Uint32(buf.Bytes()[(offset+length)*4:])) + totalSize += endOffset - startOffset + case arrow.LIST: + // Handle nest types like list + for i := 0; i < length; i++ { + startOffset := int(binary.LittleEndian.Uint32(buf.Bytes()[i*4:])) + endOffset := int(binary.LittleEndian.Uint32(buf.Bytes()[(i+1)*4:])) + totalSize += endOffset - startOffset + } + default: + // Handle fixed-length types + elementSize := buf.Len() / a.Data().Len() + totalSize += elementSize * length + } + } + } + return totalSize } func newSelectiveRecord(r Record, selectedFieldId FieldID) *selectiveRecord {