diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index 64983d7b729b7..74c5b377979f7 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -1681,19 +1681,12 @@ func (s *Server) ImportV2(ctx context.Context, in *internalpb.ImportRequestInter zap.Int64("collection", in.GetCollectionID()), zap.Int64s("partitions", in.GetPartitionIDs()), zap.Strings("channels", in.GetChannelNames())) - log.Info("receive import request", zap.Any("files", in.GetFiles())) + log.Info("receive import request", zap.Any("files", in.GetFiles()), zap.Any("options", in.GetOptions())) - var timeoutTs uint64 = math.MaxUint64 - timeoutStr, err := funcutil.GetAttrByKeyFromRepeatedKV("timeout", in.GetOptions()) - if err == nil { - // Specifies the timeout duration for import, such as "300s", "1.5h" or "1h45m". - dur, err := time.ParseDuration(timeoutStr) - if err != nil { - resp.Status = merr.Status(merr.WrapErrImportFailed(fmt.Sprint("parse import timeout failed, err=%w", err))) - return resp, nil - } - curTs := tsoutil.GetCurrentTime() - timeoutTs = tsoutil.AddPhysicalDurationOnTs(curTs, dur) + timeoutTs, err := importutilv2.GetTimeoutTs(in.GetOptions()) + if err != nil { + resp.Status = merr.Status(merr.WrapErrImportFailed(err.Error())) + return resp, nil } files := in.GetFiles() diff --git a/internal/proxy/impl.go b/internal/proxy/impl.go index 0ab12b12280d0..8c05e2f42f06b 100644 --- a/internal/proxy/impl.go +++ b/internal/proxy/impl.go @@ -6247,6 +6247,7 @@ func (node *Proxy) ImportV2(ctx context.Context, req *internalpb.ImportRequest) zap.String("partition name", req.GetPartitionName()), zap.Any("files", req.GetFiles()), zap.String("role", typeutil.ProxyRole), + zap.Any("options", req.GetOptions()), ) resp := &internalpb.ImportResponse{ diff --git a/internal/util/importutilv2/common/util.go b/internal/util/importutilv2/common/util.go index 62f18491b933e..ba26bd5f91994 100644 --- a/internal/util/importutilv2/common/util.go +++ b/internal/util/importutilv2/common/util.go @@ -78,3 +78,14 @@ func CheckArrayCapacity(arrLength int, maxCapacity int64) error { } return nil } + +func EstimateReadCountPerBatch(bufferSize int, schema *schemapb.CollectionSchema) (int64, error) { + sizePerRecord, err := typeutil.EstimateMaxSizePerRecord(schema) + if err != nil { + return 0, err + } + if 1000*sizePerRecord <= bufferSize { + return 1000, nil + } + return int64(bufferSize) / int64(sizePerRecord), nil +} diff --git a/internal/util/importutilv2/common/util_test.go b/internal/util/importutilv2/common/util_test.go new file mode 100644 index 0000000000000..efbb32cbdb201 --- /dev/null +++ b/internal/util/importutilv2/common/util_test.go @@ -0,0 +1,68 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package common + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/pkg/common" +) + +func TestUtil_EstimateReadCountPerBatch(t *testing.T) { + schema := &schemapb.CollectionSchema{ + Fields: []*schemapb.FieldSchema{ + { + FieldID: 100, + Name: "pk", + IsPrimaryKey: true, + DataType: schemapb.DataType_Int64, + }, + { + FieldID: 101, + Name: "vec", + DataType: schemapb.DataType_FloatVector, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: common.DimKey, + Value: "128", + }, + }, + }, + }, + } + count, err := EstimateReadCountPerBatch(16*1024*1024, schema) + assert.NoError(t, err) + assert.Equal(t, int64(1000), count) + + schema.Fields = append(schema.Fields, &schemapb.FieldSchema{ + FieldID: 102, + Name: "vec2", + DataType: schemapb.DataType_FloatVector, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: common.DimKey, + Value: "invalidDim", + }, + }, + }) + _, err = EstimateReadCountPerBatch(16*1024*1024, schema) + assert.Error(t, err) +} diff --git a/internal/util/importutilv2/csv/reader.go b/internal/util/importutilv2/csv/reader.go index f216f10d9f4f9..375b79bfd41b2 100644 --- a/internal/util/importutilv2/csv/reader.go +++ b/internal/util/importutilv2/csv/reader.go @@ -1,3 +1,19 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package csv import ( @@ -11,6 +27,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/internal/util/importutilv2/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/merr" ) @@ -36,13 +53,10 @@ func NewReader(ctx context.Context, cm storage.ChunkManager, schema *schemapb.Co if err != nil { return nil, merr.WrapErrImportFailed(fmt.Sprintf("read csv file failed, path=%s, err=%s", path, err.Error())) } - // count, err := estimateReadCountPerBatch(bufferSize, schema) - // if err != nil { - // return nil, err - // } - - // set the interval for determining if the buffer is exceeded - var count int64 = 1000 + count, err := common.EstimateReadCountPerBatch(bufferSize, schema) + if err != nil { + return nil, err + } csvReader := csv.NewReader(cmReader) csvReader.Comma = sep @@ -119,14 +133,3 @@ func (r *reader) Size() (int64, error) { r.fileSize.Store(size) return size, nil } - -// func estimateReadCountPerBatch(bufferSize int, schema *schemapb.CollectionSchema) (int64, error) { -// sizePerRecord, err := typeutil.EstimateMaxSizePerRecord(schema) -// if err != nil { -// return 0, err -// } -// if 1000*sizePerRecord <= bufferSize { -// return 1000, nil -// } -// return int64(bufferSize) / int64(sizePerRecord), nil -// } diff --git a/internal/util/importutilv2/csv/reader_test.go b/internal/util/importutilv2/csv/reader_test.go index 1f776773ab9e5..5ae983166110c 100644 --- a/internal/util/importutilv2/csv/reader_test.go +++ b/internal/util/importutilv2/csv/reader_test.go @@ -1,3 +1,19 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package csv import ( @@ -72,6 +88,10 @@ func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.Data Key: common.MaxLengthKey, Value: "128", }, + { + Key: common.MaxCapacityKey, + Value: "256", + }, }, Nullable: nullable, }, diff --git a/internal/util/importutilv2/csv/row_parser.go b/internal/util/importutilv2/csv/row_parser.go index 6605cf4e2e769..50f0f2089e26a 100644 --- a/internal/util/importutilv2/csv/row_parser.go +++ b/internal/util/importutilv2/csv/row_parser.go @@ -1,3 +1,19 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package csv import ( @@ -9,7 +25,9 @@ import ( "github.com/samber/lo" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/util/importutilv2/common" "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/parameterutil" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -224,6 +242,13 @@ func (r *rowParser) parseEntity(field *schemapb.FieldSchema, obj string) (any, e if nullable && obj == r.nullkey { return nil, nil } + maxLength, err := parameterutil.GetMaxLength(field) + if err != nil { + return nil, err + } + if err = common.CheckVarcharLength(obj, maxLength); err != nil { + return nil, err + } return obj, nil case schemapb.DataType_BinaryVector: if nullable && obj == r.nullkey { @@ -323,6 +348,13 @@ func (r *rowParser) parseEntity(field *schemapb.FieldSchema, obj string) (any, e if err != nil { return nil, r.wrapTypeError(obj, field) } + maxCapacity, err := parameterutil.GetMaxCapacity(field) + if err != nil { + return nil, err + } + if err = common.CheckArrayCapacity(len(vec), maxCapacity); err != nil { + return nil, err + } // elements in array not support null value scalarFieldData, err := r.arrayToFieldData(vec, field.GetElementType()) if err != nil { diff --git a/internal/util/importutilv2/csv/row_parser_test.go b/internal/util/importutilv2/csv/row_parser_test.go index 79795831e37f2..7621e1b1a6d0e 100644 --- a/internal/util/importutilv2/csv/row_parser_test.go +++ b/internal/util/importutilv2/csv/row_parser_test.go @@ -1,3 +1,19 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package csv import ( diff --git a/internal/util/importutilv2/json/reader.go b/internal/util/importutilv2/json/reader.go index 49c84ee8b8560..7606600914680 100644 --- a/internal/util/importutilv2/json/reader.go +++ b/internal/util/importutilv2/json/reader.go @@ -27,8 +27,8 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/internal/util/importutilv2/common" "github.com/milvus-io/milvus/pkg/util/merr" - "github.com/milvus-io/milvus/pkg/util/typeutil" ) const ( @@ -58,7 +58,7 @@ func NewReader(ctx context.Context, cm storage.ChunkManager, schema *schemapb.Co if err != nil { return nil, merr.WrapErrImportFailed(fmt.Sprintf("read json file failed, path=%s, err=%s", path, err.Error())) } - count, err := estimateReadCountPerBatch(bufferSize, schema) + count, err := common.EstimateReadCountPerBatch(bufferSize, schema) if err != nil { return nil, err } @@ -181,14 +181,3 @@ func (j *reader) Size() (int64, error) { } func (j *reader) Close() {} - -func estimateReadCountPerBatch(bufferSize int, schema *schemapb.CollectionSchema) (int64, error) { - sizePerRecord, err := typeutil.EstimateMaxSizePerRecord(schema) - if err != nil { - return 0, err - } - if 1000*sizePerRecord <= bufferSize { - return 1000, nil - } - return int64(bufferSize) / int64(sizePerRecord), nil -} diff --git a/internal/util/importutilv2/json/row_parser.go b/internal/util/importutilv2/json/row_parser.go index dde341aa7f418..8c1f0b48957fd 100644 --- a/internal/util/importutilv2/json/row_parser.go +++ b/internal/util/importutilv2/json/row_parser.go @@ -399,7 +399,9 @@ func (r *rowParser) parseEntity(fieldID int64, obj any) (any, error) { } case schemapb.DataType_Array: arr, ok := obj.([]interface{}) - + if !ok { + return nil, r.wrapTypeError(obj, fieldID) + } maxCapacity, err := parameterutil.GetMaxCapacity(r.id2Field[fieldID]) if err != nil { return nil, err @@ -407,9 +409,6 @@ func (r *rowParser) parseEntity(fieldID int64, obj any) (any, error) { if err = common.CheckArrayCapacity(len(arr), maxCapacity); err != nil { return nil, err } - if !ok { - return nil, r.wrapTypeError(obj, fieldID) - } scalarFieldData, err := r.arrayToFieldData(arr, r.id2Field[fieldID].GetElementType()) if err != nil { return nil, err diff --git a/internal/util/importutilv2/numpy/reader.go b/internal/util/importutilv2/numpy/reader.go index 9e3dda144df2f..dd754f03dd708 100644 --- a/internal/util/importutilv2/numpy/reader.go +++ b/internal/util/importutilv2/numpy/reader.go @@ -53,7 +53,7 @@ func NewReader(ctx context.Context, cm storage.ChunkManager, schema *schemapb.Co fields := lo.KeyBy(schema.GetFields(), func(field *schemapb.FieldSchema) int64 { return field.GetFieldID() }) - count, err := calcRowCount(bufferSize, schema) + count, err := common.EstimateReadCountPerBatch(bufferSize, schema) if err != nil { return nil, err } diff --git a/internal/util/importutilv2/numpy/util.go b/internal/util/importutilv2/numpy/util.go index 612596b375e19..e55392f0b4590 100644 --- a/internal/util/importutilv2/numpy/util.go +++ b/internal/util/importutilv2/numpy/util.go @@ -30,7 +30,6 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/pkg/util/merr" - "github.com/milvus-io/milvus/pkg/util/typeutil" ) var ( @@ -241,12 +240,3 @@ func validateHeader(npyReader *npy.Reader, field *schemapb.FieldSchema, dim int) } return nil } - -func calcRowCount(bufferSize int, schema *schemapb.CollectionSchema) (int64, error) { - sizePerRecord, err := typeutil.EstimateMaxSizePerRecord(schema) - if err != nil { - return 0, err - } - rowCount := int64(bufferSize) / int64(sizePerRecord) - return rowCount, nil -} diff --git a/internal/util/importutilv2/option.go b/internal/util/importutilv2/option.go index 1e0d271e01436..f4e72ca6f298d 100644 --- a/internal/util/importutilv2/option.go +++ b/internal/util/importutilv2/option.go @@ -21,6 +21,7 @@ import ( "math" "strconv" "strings" + "time" "github.com/samber/lo" @@ -31,17 +32,51 @@ import ( ) const ( - StartTs = "start_ts" - StartTs2 = "startTs" - EndTs = "end_ts" - EndTs2 = "endTs" + // Timeout specifies the timeout duration for import, such as "300s", "1.5h" or "1h45m". + Timeout = "timeout" + + // SkipDQC indicates whether to bypass the disk quota check, default to false. + SkipDQC = "skip_disk_quota_check" + + // CSVSep specifies the delimiter used for importing CSV files. + CSVSep = "sep" + + // CSVNullKey specifies the null key used when importing CSV files. + CSVNullKey = "nullkey" +) + +// Options for backup-restore mode. +const ( + // BackupFlag indicates whether the import is in backup-restore mode, default to false. BackupFlag = "backup" - L0Import = "l0_import" - SkipDQC = "skip_disk_quota_check" + + // L0Import indicates whether to import l0 segments only. + L0Import = "l0_import" + + // StartTs StartTs2 EndTs EndTs2 are used to filter data during backup-restore import. + StartTs = "start_ts" + StartTs2 = "startTs" + EndTs = "end_ts" + EndTs2 = "endTs" ) type Options []*commonpb.KeyValuePair +func GetTimeoutTs(options Options) (uint64, error) { + var timeoutTs uint64 = math.MaxUint64 + timeoutStr, err := funcutil.GetAttrByKeyFromRepeatedKV(Timeout, options) + if err == nil { + var dur time.Duration + dur, err = time.ParseDuration(timeoutStr) + if err != nil { + return 0, fmt.Errorf("parse timeout failed, err=%w", err) + } + curTs := tsoutil.GetCurrentTime() + timeoutTs = tsoutil.AddPhysicalDurationOnTs(curTs, dur) + } + return timeoutTs, nil +} + func ParseTimeRange(options Options) (uint64, uint64, error) { importOptions := funcutil.KeyValuePair2Map(options) getTimestamp := func(defaultValue uint64, targetKeys ...string) (uint64, error) { @@ -103,7 +138,7 @@ func SkipDiskQuotaCheck(options Options) bool { } func GetCSVSep(options Options) (rune, error) { - sep, err := funcutil.GetAttrByKeyFromRepeatedKV("sep", options) + sep, err := funcutil.GetAttrByKeyFromRepeatedKV(CSVSep, options) unsupportedSep := []rune{0, '\n', '\r', '"', 0xFFFD} defaultSep := ',' if err != nil || len(sep) == 0 { @@ -115,7 +150,7 @@ func GetCSVSep(options Options) (rune, error) { } func GetCSVNullKey(options Options) (string, error) { - nullKey, err := funcutil.GetAttrByKeyFromRepeatedKV("nullkey", options) + nullKey, err := funcutil.GetAttrByKeyFromRepeatedKV(CSVNullKey, options) defaultNullKey := "" if err != nil || len(nullKey) == 0 { return defaultNullKey, nil diff --git a/internal/util/importutilv2/option_test.go b/internal/util/importutilv2/option_test.go new file mode 100644 index 0000000000000..0a5deedad979c --- /dev/null +++ b/internal/util/importutilv2/option_test.go @@ -0,0 +1,53 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package importutilv2 + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus/pkg/util/tsoutil" +) + +func TestOption_GetTimeout(t *testing.T) { + const delta = 3 * time.Second + + options := []*commonpb.KeyValuePair{{Key: Timeout, Value: "300s"}} + ts, err := GetTimeoutTs(options) + assert.NoError(t, err) + pt := tsoutil.PhysicalTime(ts) + assert.WithinDuration(t, time.Now().Add(300*time.Second), pt, delta) + + options = []*commonpb.KeyValuePair{{Key: Timeout, Value: "1.5h"}} + ts, err = GetTimeoutTs(options) + assert.NoError(t, err) + pt = tsoutil.PhysicalTime(ts) + assert.WithinDuration(t, time.Now().Add(90*time.Minute), pt, delta) + + options = []*commonpb.KeyValuePair{{Key: Timeout, Value: "1h45m"}} + ts, err = GetTimeoutTs(options) + assert.NoError(t, err) + pt = tsoutil.PhysicalTime(ts) + assert.WithinDuration(t, time.Now().Add(105*time.Minute), pt, delta) + + options = []*commonpb.KeyValuePair{{Key: Timeout, Value: "invalidTime"}} + _, err = GetTimeoutTs(options) + assert.Error(t, err) +} diff --git a/internal/util/importutilv2/parquet/reader.go b/internal/util/importutilv2/parquet/reader.go index 8038c383ee805..19f3631a9e955 100644 --- a/internal/util/importutilv2/parquet/reader.go +++ b/internal/util/importutilv2/parquet/reader.go @@ -74,7 +74,7 @@ func NewReader(ctx context.Context, cm storage.ChunkManager, schema *schemapb.Co if err != nil { return nil, err } - count, err := estimateReadCountPerBatch(bufferSize, schema) + count, err := common.EstimateReadCountPerBatch(bufferSize, schema) if err != nil { return nil, err } diff --git a/internal/util/importutilv2/parquet/util.go b/internal/util/importutilv2/parquet/util.go index 659f6fd5b5367..f4b252abf4cd2 100644 --- a/internal/util/importutilv2/parquet/util.go +++ b/internal/util/importutilv2/parquet/util.go @@ -264,17 +264,6 @@ func isSchemaEqual(schema *schemapb.CollectionSchema, arrSchema *arrow.Schema) e return nil } -func estimateReadCountPerBatch(bufferSize int, schema *schemapb.CollectionSchema) (int64, error) { - sizePerRecord, err := typeutil.EstimateMaxSizePerRecord(schema) - if err != nil { - return 0, err - } - if 1000*sizePerRecord <= bufferSize { - return 1000, nil - } - return int64(bufferSize) / int64(sizePerRecord), nil -} - // todo(smellthemoon): use byte to store valid_data func bytesToBoolArray(length int, bytes []byte) []bool { bools := make([]bool, 0, length)