diff --git a/internal/util/importutil/import_util.go b/internal/util/importutil/import_util.go index 56b629c2661b3..58b28494368d1 100644 --- a/internal/util/importutil/import_util.go +++ b/internal/util/importutil/import_util.go @@ -1024,7 +1024,7 @@ func splitFieldsData(collectionInfo *CollectionInfo, fieldsData BlockData, shard rowIDField, ok := fieldsData[common.RowIDField] if !ok { rowIDField = &storage.Int64FieldData{ - Data: make([]int64, 0), + Data: make([]int64, 0, rowCount), } fieldsData[common.RowIDField] = rowIDField } diff --git a/internal/util/importutil/import_wrapper.go b/internal/util/importutil/import_wrapper.go index 730d0a024232c..d9199e334bab4 100644 --- a/internal/util/importutil/import_wrapper.go +++ b/internal/util/importutil/import_wrapper.go @@ -301,6 +301,7 @@ func (p *ImportWrapper) Import(filePaths []string, options ImportOptions) error if err != nil { return err } + defer parser.Close() err = parser.Parse() if err != nil { diff --git a/internal/util/importutil/parquet_column_reader.go b/internal/util/importutil/parquet_column_reader.go index f0ebf41b08ca6..2b45444307284 100644 --- a/internal/util/importutil/parquet_column_reader.go +++ b/internal/util/importutil/parquet_column_reader.go @@ -208,7 +208,7 @@ func ReadBoolArrayData(pcr *ParquetColumnReader, count int64) ([][]bool, error) offsets := listReader.Offsets() for i := 1; i < len(offsets); i++ { start, end := offsets[i-1], offsets[i] - elementData := make([]bool, 0) + elementData := make([]bool, 0, end-start) for j := start; j < end; j++ { elementData = append(elementData, boolReader.Value(int(j))) } @@ -306,7 +306,7 @@ func ReadStringArrayData(pcr *ParquetColumnReader, count int64) ([][]string, err offsets := listReader.Offsets() for i := 1; i < len(offsets); i++ { start, end := offsets[i-1], offsets[i] - elementData := make([]string, 0) + elementData := make([]string, 0, end-start) for j := start; j < end; j++ { elementData = append(elementData, stringReader.Value(int(j))) } diff --git a/internal/util/importutil/parquet_parser.go b/internal/util/importutil/parquet_parser.go index d7865046e289e..ab7356b2aca91 100644 --- a/internal/util/importutil/parquet_parser.go +++ b/internal/util/importutil/parquet_parser.go @@ -23,6 +23,7 @@ import ( "github.com/apache/arrow/go/v12/arrow" "github.com/apache/arrow/go/v12/arrow/memory" + "github.com/apache/arrow/go/v12/parquet" "github.com/apache/arrow/go/v12/parquet/file" "github.com/apache/arrow/go/v12/parquet/pqarrow" "go.uber.org/zap" @@ -87,11 +88,15 @@ func NewParquetParser(ctx context.Context, return nil, err } - reader, err := file.NewParquetReader(cmReader) + reader, err := file.NewParquetReader(cmReader, file.WithReadProps(&parquet.ReaderProperties{ + BufferSize: 32 * 1024 * 1024, + BufferedStreamEnabled: true, + })) if err != nil { log.Warn("create parquet reader failed", zap.Error(err)) return nil, err } + log.Info("create file reader done!", zap.Int("row group num", reader.NumRowGroups()), zap.Int64("num rows", reader.NumRows())) fileReader, err := pqarrow.NewFileReader(reader, pqarrow.ArrowReadProperties{}, memory.DefaultAllocator) if err != nil { @@ -544,7 +549,7 @@ func (p *ParquetParser) readData(columnReader *ParquetColumnReader, rowCount int Dim: columnReader.dimension, }, nil case schemapb.DataType_Array: - data := make([]*schemapb.ScalarField, 0) + data := make([]*schemapb.ScalarField, 0, rowCount) switch columnReader.elementType { case schemapb.DataType_Bool: boolArray, err := ReadBoolArrayData(columnReader, rowCount) diff --git a/internal/util/importutil/parquet_parser_test.go b/internal/util/importutil/parquet_parser_test.go index e14cdef415b6e..ad498444c6ca1 100644 --- a/internal/util/importutil/parquet_parser_test.go +++ b/internal/util/importutil/parquet_parser_test.go @@ -474,7 +474,7 @@ func buildArrayData(dataType, elementType schemapb.DataType, dim, rows int, isBi func writeParquet(w io.Writer, milvusSchema *schemapb.CollectionSchema, numRows int) error { schema := convertMilvusSchemaToArrowSchema(milvusSchema) - fw, err := pqarrow.NewFileWriter(schema, w, parquet.NewWriterProperties(), pqarrow.DefaultWriterProps()) + fw, err := pqarrow.NewFileWriter(schema, w, parquet.NewWriterProperties(parquet.WithMaxRowGroupLength(1000)), pqarrow.DefaultWriterProps()) if err != nil { return err }