Skip to content

Commit

Permalink
fix: Import data from parquet file in streaming way (#29528)
Browse files Browse the repository at this point in the history
issue: #29292
master pr: #29514

Signed-off-by: Cai Zhang <[email protected]>
  • Loading branch information
xiaocai2333 authored Dec 29, 2023
1 parent 5ec79ab commit ba1d055
Show file tree
Hide file tree
Showing 5 changed files with 12 additions and 6 deletions.
2 changes: 1 addition & 1 deletion internal/util/importutil/import_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -1024,7 +1024,7 @@ func splitFieldsData(collectionInfo *CollectionInfo, fieldsData BlockData, shard
rowIDField, ok := fieldsData[common.RowIDField]
if !ok {
rowIDField = &storage.Int64FieldData{
Data: make([]int64, 0),
Data: make([]int64, 0, rowCount),
}
fieldsData[common.RowIDField] = rowIDField
}
Expand Down
1 change: 1 addition & 0 deletions internal/util/importutil/import_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ func (p *ImportWrapper) Import(filePaths []string, options ImportOptions) error
if err != nil {
return err
}
defer parser.Close()

err = parser.Parse()
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions internal/util/importutil/parquet_column_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ func ReadBoolArrayData(pcr *ParquetColumnReader, count int64) ([][]bool, error)
offsets := listReader.Offsets()
for i := 1; i < len(offsets); i++ {
start, end := offsets[i-1], offsets[i]
elementData := make([]bool, 0)
elementData := make([]bool, 0, end-start)
for j := start; j < end; j++ {
elementData = append(elementData, boolReader.Value(int(j)))
}
Expand Down Expand Up @@ -306,7 +306,7 @@ func ReadStringArrayData(pcr *ParquetColumnReader, count int64) ([][]string, err
offsets := listReader.Offsets()
for i := 1; i < len(offsets); i++ {
start, end := offsets[i-1], offsets[i]
elementData := make([]string, 0)
elementData := make([]string, 0, end-start)
for j := start; j < end; j++ {
elementData = append(elementData, stringReader.Value(int(j)))
}
Expand Down
9 changes: 7 additions & 2 deletions internal/util/importutil/parquet_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/apache/arrow/go/v12/arrow"
"github.com/apache/arrow/go/v12/arrow/memory"
"github.com/apache/arrow/go/v12/parquet"
"github.com/apache/arrow/go/v12/parquet/file"
"github.com/apache/arrow/go/v12/parquet/pqarrow"
"go.uber.org/zap"
Expand Down Expand Up @@ -87,11 +88,15 @@ func NewParquetParser(ctx context.Context,
return nil, err
}

reader, err := file.NewParquetReader(cmReader)
reader, err := file.NewParquetReader(cmReader, file.WithReadProps(&parquet.ReaderProperties{
BufferSize: 32 * 1024 * 1024,
BufferedStreamEnabled: true,
}))
if err != nil {
log.Warn("create parquet reader failed", zap.Error(err))
return nil, err
}
log.Info("create file reader done!", zap.Int("row group num", reader.NumRowGroups()), zap.Int64("num rows", reader.NumRows()))

fileReader, err := pqarrow.NewFileReader(reader, pqarrow.ArrowReadProperties{}, memory.DefaultAllocator)
if err != nil {
Expand Down Expand Up @@ -544,7 +549,7 @@ func (p *ParquetParser) readData(columnReader *ParquetColumnReader, rowCount int
Dim: columnReader.dimension,
}, nil
case schemapb.DataType_Array:
data := make([]*schemapb.ScalarField, 0)
data := make([]*schemapb.ScalarField, 0, rowCount)
switch columnReader.elementType {
case schemapb.DataType_Bool:
boolArray, err := ReadBoolArrayData(columnReader, rowCount)
Expand Down
2 changes: 1 addition & 1 deletion internal/util/importutil/parquet_parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ func buildArrayData(dataType, elementType schemapb.DataType, dim, rows int, isBi

func writeParquet(w io.Writer, milvusSchema *schemapb.CollectionSchema, numRows int) error {
schema := convertMilvusSchemaToArrowSchema(milvusSchema)
fw, err := pqarrow.NewFileWriter(schema, w, parquet.NewWriterProperties(), pqarrow.DefaultWriterProps())
fw, err := pqarrow.NewFileWriter(schema, w, parquet.NewWriterProperties(parquet.WithMaxRowGroupLength(1000)), pqarrow.DefaultWriterProps())
if err != nil {
return err
}
Expand Down

0 comments on commit ba1d055

Please sign in to comment.