diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index 58c26f54b2d52..91a254c9de3d9 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -204,7 +204,6 @@ func (s *Server) AssignSegmentID(ctx context.Context, req *datapb.AssignSegmentI zap.Int64("partitionID", r.GetPartitionID()), zap.String("channelName", r.GetChannelName()), zap.Uint32("count", r.GetCount()), - zap.String("segment level", r.GetLevel().String()), ) // Load the collection info from Root Coordinator, if it is not found in server meta. diff --git a/internal/proto/data_coord.proto b/internal/proto/data_coord.proto index 7294914f03e64..0c920a9379994 100644 --- a/internal/proto/data_coord.proto +++ b/internal/proto/data_coord.proto @@ -167,7 +167,7 @@ message SegmentIDRequest { int64 partitionID = 4; bool isImport = 5; // deprecated int64 importTaskID = 6; // deprecated - SegmentLevel level = 7; + SegmentLevel level = 7; // deprecated } message AssignSegmentIDRequest { diff --git a/internal/proxy/search_util.go b/internal/proxy/search_util.go index 559a96cfe23de..5aea8de73cd3e 100644 --- a/internal/proxy/search_util.go +++ b/internal/proxy/search_util.go @@ -234,7 +234,7 @@ func getPartitionIDs(ctx context.Context, dbName string, collectionName string, useRegexp := Params.ProxyCfg.PartitionNameRegexp.GetAsBool() - partitionsSet := typeutil.NewSet[int64]() + partitionsSet := typeutil.NewUniqueSet() for _, partitionName := range partitionNames { if useRegexp { // Legacy feature, use partition name as regexp @@ -259,9 +259,7 @@ func getPartitionIDs(ctx context.Context, dbName string, collectionName string, // TODO change after testcase updated: return nil, merr.WrapErrPartitionNotFound(partitionName) return nil, fmt.Errorf("partition name %s not found", partitionName) } - if !partitionsSet.Contain(partitionID) { - partitionsSet.Insert(partitionID) - } + partitionsSet.Insert(partitionID) } } return partitionsSet.Collect(), nil diff --git a/internal/proxy/task_delete.go b/internal/proxy/task_delete.go index df5403aea5b65..281b3f700b590 100644 --- a/internal/proxy/task_delete.go +++ b/internal/proxy/task_delete.go @@ -52,11 +52,9 @@ type deleteTask struct { idAllocator allocator.Interface // delete info - primaryKeys *schemapb.IDs - collectionID UniqueID - partitionID UniqueID - dbID UniqueID - partitionKeyMode bool + primaryKeys *schemapb.IDs + collectionID UniqueID + partitionID UniqueID // set by scheduler ts Timestamp @@ -132,7 +130,6 @@ func (dt *deleteTask) PreExecute(ctx context.Context) error { func (dt *deleteTask) Execute(ctx context.Context) (err error) { ctx, sp := otel.Tracer(typeutil.ProxyRole).Start(ctx, "Proxy-Delete-Execute") defer sp.End() - // log := log.Ctx(ctx) if len(dt.req.GetExpr()) == 0 { return merr.WrapErrParameterInvalid("valid expr", "empty expr", "invalid expression") @@ -224,13 +221,13 @@ func repackDeleteMsgByHash( commonpbutil.WithTimeStamp(ts), commonpbutil.WithSourceID(paramtable.GetNodeID()), ), - CollectionID: collectionID, - PartitionID: partitionID, + ShardName: vchannel, CollectionName: collectionName, PartitionName: partitionName, DbName: dbName, + CollectionID: collectionID, + PartitionID: partitionID, PrimaryKeys: &schemapb.IDs{}, - ShardName: vchannel, }, } } @@ -291,8 +288,9 @@ type deleteRunner struct { schema *schemaInfo dbID UniqueID collectionID UniqueID - partitionID UniqueID partitionKeyMode bool + partitionIDs []UniqueID + plan *planpb.PlanNode // for query msgID int64 @@ -331,29 +329,53 @@ func (dr *deleteRunner) Init(ctx context.Context) error { return ErrWithLog(log, "Failed to get collection schema", err) } + dr.plan, err = planparserv2.CreateRetrievePlan(dr.schema.schemaHelper, dr.req.GetExpr(), dr.req.GetExprTemplateValues()) + if err != nil { + return merr.WrapErrAsInputError(merr.WrapErrParameterInvalidMsg("failed to create delete plan: %v", err)) + } + + if planparserv2.IsAlwaysTruePlan(dr.plan) { + return merr.WrapErrAsInputError(merr.WrapErrParameterInvalidMsg("delete plan can't be empty or always true : %s", dr.req.GetExpr())) + } + + // Set partitionIDs, could be empty if no partition name specified and no partition key dr.partitionKeyMode = dr.schema.IsPartitionKeyCollection() - // get partitionIDs of delete - dr.partitionID = common.AllPartitionsID - if len(dr.req.PartitionName) > 0 { - if dr.partitionKeyMode { + partName := dr.req.GetPartitionName() + if dr.partitionKeyMode { + if len(partName) > 0 { return errors.New("not support manually specifying the partition names if partition key mode is used") } - - partName := dr.req.GetPartitionName() + expr, err := exprutil.ParseExprFromPlan(dr.plan) + if err != nil { + return err + } + partitionKeys := exprutil.ParseKeys(expr, exprutil.PartitionKey) + hashedPartitionNames, err := assignPartitionKeys(ctx, dr.req.GetDbName(), dr.req.GetCollectionName(), partitionKeys) + if err != nil { + return err + } + dr.partitionIDs, err = getPartitionIDs(ctx, dr.req.GetDbName(), dr.req.GetCollectionName(), hashedPartitionNames) + if err != nil { + return err + } + } else if len(partName) > 0 { + // static validation if err := validatePartitionTag(partName, true); err != nil { return ErrWithLog(log, "Invalid partition name", err) } + + // dynamic validation partID, err := globalMetaCache.GetPartitionID(ctx, dr.req.GetDbName(), collName, partName) if err != nil { return ErrWithLog(log, "Failed to get partition id", err) } - dr.partitionID = partID + dr.partitionIDs = []UniqueID{partID} // only one partID } - // hash primary keys to channels + // set vchannels channelNames, err := dr.chMgr.getVChannels(dr.collectionID) if err != nil { - return ErrWithLog(log, "Failed to get primary keys from expr", err) + return ErrWithLog(log, "Failed to get vchannels from collection", err) } dr.vChannels = channelNames @@ -367,26 +389,22 @@ func (dr *deleteRunner) Init(ctx context.Context) error { } func (dr *deleteRunner) Run(ctx context.Context) error { - plan, err := planparserv2.CreateRetrievePlan(dr.schema.schemaHelper, dr.req.GetExpr(), dr.req.GetExprTemplateValues()) - if err != nil { - return merr.WrapErrAsInputError(merr.WrapErrParameterInvalidMsg("failed to create delete plan: %v", err)) - } - - if planparserv2.IsAlwaysTruePlan(plan) { - return merr.WrapErrAsInputError(merr.WrapErrParameterInvalidMsg("delete plan can't be empty or always true : %s", dr.req.GetExpr())) + // Make sure partitionIDs aren't empty + if len(dr.partitionIDs) == 0 { + dr.partitionIDs = []UniqueID{common.AllPartitionsID} } - isSimple, pk, numRow := getPrimaryKeysFromPlan(dr.schema.CollectionSchema, plan) + isSimple, pk, numRow := getPrimaryKeysFromPlan(dr.schema.CollectionSchema, dr.plan) if isSimple { // if could get delete.primaryKeys from delete expr - err := dr.simpleDelete(ctx, pk, numRow) + err := dr.simpleDelete(ctx, pk, numRow, dr.partitionIDs[0]) if err != nil { return err } } else { // if get complex delete expr // need query from querynode before delete - err = dr.complexDelete(ctx, plan) + err := dr.complexDelete(ctx, dr.plan) if err != nil { log.Warn("complex delete failed,but delete some data", zap.Int64("count", dr.result.DeleteCnt), zap.String("expr", dr.req.GetExpr())) return err @@ -395,58 +413,35 @@ func (dr *deleteRunner) Run(ctx context.Context) error { return nil } -func (dr *deleteRunner) produce(ctx context.Context, primaryKeys *schemapb.IDs) (*deleteTask, error) { - task := &deleteTask{ - ctx: ctx, - Condition: NewTaskCondition(ctx), - req: dr.req, - idAllocator: dr.idAllocator, - chMgr: dr.chMgr, - chTicker: dr.chTicker, - collectionID: dr.collectionID, - partitionID: dr.partitionID, - partitionKeyMode: dr.partitionKeyMode, - vChannels: dr.vChannels, - primaryKeys: primaryKeys, - dbID: dr.dbID, - } - - if err := dr.queue.Enqueue(task); err != nil { +func (dr *deleteRunner) produce(ctx context.Context, primaryKeys *schemapb.IDs, partitionID UniqueID) (*deleteTask, error) { + dt := &deleteTask{ + ctx: ctx, + Condition: NewTaskCondition(ctx), + req: dr.req, + idAllocator: dr.idAllocator, + chMgr: dr.chMgr, + chTicker: dr.chTicker, + collectionID: dr.collectionID, + partitionID: partitionID, + vChannels: dr.vChannels, + primaryKeys: primaryKeys, + } + + if err := dr.queue.Enqueue(dt); err != nil { log.Error("Failed to enqueue delete task: " + err.Error()) return nil, err } - return task, nil + return dt, nil } // getStreamingQueryAndDelteFunc return query function used by LBPolicy // make sure it concurrent safe func (dr *deleteRunner) getStreamingQueryAndDelteFunc(plan *planpb.PlanNode) executeFunc { return func(ctx context.Context, nodeID int64, qn types.QueryNodeClient, channel string) error { - var partitionIDs []int64 - - // optimize query when partitionKey on - if dr.partitionKeyMode { - expr, err := exprutil.ParseExprFromPlan(plan) - if err != nil { - return err - } - partitionKeys := exprutil.ParseKeys(expr, exprutil.PartitionKey) - hashedPartitionNames, err := assignPartitionKeys(ctx, dr.req.GetDbName(), dr.req.GetCollectionName(), partitionKeys) - if err != nil { - return err - } - partitionIDs, err = getPartitionIDs(ctx, dr.req.GetDbName(), dr.req.GetCollectionName(), hashedPartitionNames) - if err != nil { - return err - } - } else if dr.partitionID != common.InvalidFieldID { - partitionIDs = []int64{dr.partitionID} - } - log := log.Ctx(ctx).With( zap.Int64("collectionID", dr.collectionID), - zap.Int64s("partitionIDs", partitionIDs), + zap.Int64s("partitionIDs", dr.partitionIDs), zap.String("channel", channel), zap.Int64("nodeID", nodeID)) @@ -472,7 +467,7 @@ func (dr *deleteRunner) getStreamingQueryAndDelteFunc(plan *planpb.PlanNode) exe ReqID: paramtable.GetNodeID(), DbID: 0, // TODO CollectionID: dr.collectionID, - PartitionIDs: partitionIDs, + PartitionIDs: dr.partitionIDs, SerializedExprPlan: serializedPlan, OutputFieldsId: outputFieldIDs, GuaranteeTimestamp: parseGuaranteeTsFromConsistency(dr.ts, dr.ts, dr.req.GetConsistencyLevel()), @@ -493,7 +488,7 @@ func (dr *deleteRunner) getStreamingQueryAndDelteFunc(plan *planpb.PlanNode) exe taskCh := make(chan *deleteTask, 256) var receiveErr error go func() { - receiveErr = dr.receiveQueryResult(ctx, client, taskCh, partitionIDs) + receiveErr = dr.receiveQueryResult(ctx, client, taskCh) close(taskCh) }() var allQueryCnt int64 @@ -516,7 +511,15 @@ func (dr *deleteRunner) getStreamingQueryAndDelteFunc(plan *planpb.PlanNode) exe } } -func (dr *deleteRunner) receiveQueryResult(ctx context.Context, client querypb.QueryNode_QueryStreamClient, taskCh chan *deleteTask, partitionIDs []int64) error { +func (dr *deleteRunner) receiveQueryResult(ctx context.Context, client querypb.QueryNode_QueryStreamClient, taskCh chan *deleteTask) error { + // If a complex delete tries to delete multiple partitions in the filter, use AllPartitionID + // otherwise use the target partitionID, which can come from partition name(UDF) or a partition key expression + // TODO: Get partitionID from Query results + msgPartitionID := common.AllPartitionsID + if len(dr.partitionIDs) == 1 { + msgPartitionID = dr.partitionIDs[0] + } + for { result, err := client.Recv() if err != nil { @@ -534,14 +537,14 @@ func (dr *deleteRunner) receiveQueryResult(ctx context.Context, client querypb.Q } if dr.limiter != nil { - err := dr.limiter.Alloc(ctx, dr.dbID, map[int64][]int64{dr.collectionID: partitionIDs}, internalpb.RateType_DMLDelete, proto.Size(result.GetIds())) + err := dr.limiter.Alloc(ctx, dr.dbID, map[int64][]int64{dr.collectionID: dr.partitionIDs}, internalpb.RateType_DMLDelete, proto.Size(result.GetIds())) if err != nil { log.Warn("query stream for delete failed because rate limiter", zap.Int64("msgID", dr.msgID), zap.Error(err)) return err } } - task, err := dr.produce(ctx, result.GetIds()) + task, err := dr.produce(ctx, result.GetIds(), msgPartitionID) if err != nil { log.Warn("produce delete task failed", zap.Error(err)) return err @@ -586,13 +589,13 @@ func (dr *deleteRunner) complexDelete(ctx context.Context, plan *planpb.PlanNode return nil } -func (dr *deleteRunner) simpleDelete(ctx context.Context, pk *schemapb.IDs, numRow int64) error { - log.Debug("get primary keys from expr", +func (dr *deleteRunner) simpleDelete(ctx context.Context, pk *schemapb.IDs, numRow int64, partitionID UniqueID) error { + log.Ctx.Debug("get primary keys from expr", zap.Int64("len of primary keys", numRow), zap.Int64("collectionID", dr.collectionID), - zap.Int64("partitionID", dr.partitionID)) + zap.Int64("partitionID", partitionID)) - task, err := dr.produce(ctx, pk) + task, err := dr.produce(ctx, pk, partitionID) if err != nil { log.Warn("produce delete task failed") return err @@ -605,70 +608,71 @@ func (dr *deleteRunner) simpleDelete(ctx context.Context, pk *schemapb.IDs, numR return err } -func getPrimaryKeysFromPlan(schema *schemapb.CollectionSchema, plan *planpb.PlanNode) (bool, *schemapb.IDs, int64) { - // simple delete request need expr with "pk in [a, b]" +func getPrimaryKeysFromPlan(schema *schemapb.CollectionSchema, plan *planpb.PlanNode) (isSimpleDelete bool, pks *schemapb.IDs, pkCount int64) { + var err error + // simple delete request with "pk in [a, b]" termExpr, ok := plan.Node.(*planpb.PlanNode_Query).Query.Predicates.Expr.(*planpb.Expr_TermExpr) if ok { if !termExpr.TermExpr.GetColumnInfo().GetIsPrimaryKey() { return false, nil, 0 } - ids, rowNum, err := getPrimaryKeysFromTermExpr(schema, termExpr) + pks, pkCount, err = getPrimaryKeysFromTermExpr(schema, termExpr) if err != nil { return false, nil, 0 } - return true, ids, rowNum + return true, pks, pkCount } - // simple delete if expr with "pk == a" + // simple delete with "pk == a" unaryRangeExpr, ok := plan.Node.(*planpb.PlanNode_Query).Query.Predicates.Expr.(*planpb.Expr_UnaryRangeExpr) if ok { if unaryRangeExpr.UnaryRangeExpr.GetOp() != planpb.OpType_Equal || !unaryRangeExpr.UnaryRangeExpr.GetColumnInfo().GetIsPrimaryKey() { return false, nil, 0 } - ids, err := getPrimaryKeysFromUnaryRangeExpr(schema, unaryRangeExpr) + pks, err = getPrimaryKeysFromUnaryRangeExpr(schema, unaryRangeExpr) if err != nil { return false, nil, 0 } - return true, ids, 1 + return true, pks, 1 } return false, nil, 0 } -func getPrimaryKeysFromUnaryRangeExpr(schema *schemapb.CollectionSchema, unaryRangeExpr *planpb.Expr_UnaryRangeExpr) (res *schemapb.IDs, err error) { - res = &schemapb.IDs{} +func getPrimaryKeysFromUnaryRangeExpr(schema *schemapb.CollectionSchema, unaryRangeExpr *planpb.Expr_UnaryRangeExpr) (pks *schemapb.IDs, err error) { + pks = &schemapb.IDs{} switch unaryRangeExpr.UnaryRangeExpr.GetColumnInfo().GetDataType() { case schemapb.DataType_Int64: - res.IdField = &schemapb.IDs_IntId{ + pks.IdField = &schemapb.IDs_IntId{ IntId: &schemapb.LongArray{ Data: []int64{unaryRangeExpr.UnaryRangeExpr.GetValue().GetInt64Val()}, }, } case schemapb.DataType_VarChar: - res.IdField = &schemapb.IDs_StrId{ + pks.IdField = &schemapb.IDs_StrId{ StrId: &schemapb.StringArray{ Data: []string{unaryRangeExpr.UnaryRangeExpr.GetValue().GetStringVal()}, }, } default: - return res, fmt.Errorf("invalid field data type specifyed in simple delete expr") + return pks, fmt.Errorf("invalid field data type specifyed in simple delete expr") } - return res, nil + return pks, nil } -func getPrimaryKeysFromTermExpr(schema *schemapb.CollectionSchema, termExpr *planpb.Expr_TermExpr) (res *schemapb.IDs, rowNum int64, err error) { - res = &schemapb.IDs{} - rowNum = int64(len(termExpr.TermExpr.Values)) +func getPrimaryKeysFromTermExpr(schema *schemapb.CollectionSchema, termExpr *planpb.Expr_TermExpr) (pks *schemapb.IDs, pkCount int64, err error) { + pks = &schemapb.IDs{} + pkCount = int64(len(termExpr.TermExpr.Values)) switch termExpr.TermExpr.ColumnInfo.GetDataType() { case schemapb.DataType_Int64: ids := make([]int64, 0) for _, v := range termExpr.TermExpr.Values { ids = append(ids, v.GetInt64Val()) } - res.IdField = &schemapb.IDs_IntId{ + pks.IdField = &schemapb.IDs_IntId{ IntId: &schemapb.LongArray{ Data: ids, }, @@ -678,14 +682,14 @@ func getPrimaryKeysFromTermExpr(schema *schemapb.CollectionSchema, termExpr *pla for _, v := range termExpr.TermExpr.Values { ids = append(ids, v.GetStringVal()) } - res.IdField = &schemapb.IDs_StrId{ + pks.IdField = &schemapb.IDs_StrId{ StrId: &schemapb.StringArray{ Data: ids, }, } default: - return res, 0, fmt.Errorf("invalid field data type specifyed in simple delete expr") + return pks, 0, fmt.Errorf("invalid field data type specifyed in simple delete expr") } - return res, rowNum, nil + return pks, pkCount, nil } diff --git a/internal/proxy/task_delete_test.go b/internal/proxy/task_delete_test.go index 99cd2e03e6c8c..8c37ec6a7166f 100644 --- a/internal/proxy/task_delete_test.go +++ b/internal/proxy/task_delete_test.go @@ -9,6 +9,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" "google.golang.org/grpc" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" @@ -232,18 +233,34 @@ func TestDeleteTask_Execute(t *testing.T) { }) } -func TestDeleteRunner_Init(t *testing.T) { - collectionName := "test_delete" - collectionID := int64(111) - partitionName := "default" - partitionID := int64(222) - // channels := []string{"test_channel"} - dbName := "test_1" +func TestDeleteRunnerSuite(t *testing.T) { + suite.Run(t, new(DeleteRunnerSuite)) +} - collSchema := &schemapb.CollectionSchema{ - Name: collectionName, - Description: "", - AutoID: false, +type DeleteRunnerSuite struct { + suite.Suite + + collectionName string + collectionID int64 + partitionName string + partitionIDs []int64 + + schema *schemaInfo + mockCache *MockCache +} + +func (s *DeleteRunnerSuite) SetupSubTest() { + s.SetupSuite() +} + +func (s *DeleteRunnerSuite) SetupSuite() { + s.collectionName = "test_delete" + s.collectionID = int64(111) + s.partitionName = "default" + s.partitionIDs = []int64{222, 333, 444} + + schema := &schemapb.CollectionSchema{ + Name: s.collectionName, Fields: []*schemapb.FieldSchema{ { FieldID: common.StartOfUserFieldID, @@ -252,200 +269,358 @@ func TestDeleteRunner_Init(t *testing.T) { DataType: schemapb.DataType_Int64, }, { - FieldID: common.StartOfUserFieldID + 1, - Name: "non_pk", - IsPrimaryKey: false, - DataType: schemapb.DataType_Int64, + FieldID: common.StartOfUserFieldID + 1, + Name: "non_pk", + DataType: schemapb.DataType_Int64, + IsPartitionKey: true, }, }, } - schema := newSchemaInfo(collSchema) + s.schema = newSchemaInfo(schema) + s.mockCache = NewMockCache(s.T()) +} - t.Run("empty collection name", func(t *testing.T) { - dr := deleteRunner{} - assert.Error(t, dr.Init(context.Background())) +func (s *DeleteRunnerSuite) TestInitSuccess() { + s.Run("non_pk == 1", func() { + mockChMgr := NewMockChannelsMgr(s.T()) + dr := deleteRunner{ + req: &milvuspb.DeleteRequest{ + CollectionName: s.collectionName, + Expr: "non_pk == 1", + }, + chMgr: mockChMgr, + } + s.mockCache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 0}, nil) + s.mockCache.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(s.collectionID, nil) + s.mockCache.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).Return(s.schema, nil).Twice() + s.mockCache.EXPECT().GetPartitionsIndex(mock.Anything, mock.Anything, mock.Anything).Return([]string{"part1", "part2"}, nil) + s.mockCache.EXPECT().GetPartitions(mock.Anything, mock.Anything, mock.Anything).Return(map[string]int64{"part1": 100, "part2": 101}, nil) + mockChMgr.EXPECT().getVChannels(mock.Anything).Return([]string{"vchan1"}, nil) + + globalMetaCache = s.mockCache + s.NoError(dr.Init(context.Background())) + + s.True(dr.partitionKeyMode) + s.Require().Equal(1, len(dr.partitionIDs)) + s.True(typeutil.NewSet[int64](100, 101).Contain(dr.partitionIDs[0])) }) - t.Run("fail to get database info", func(t *testing.T) { + s.Run("non_pk > 1, partition key", func() { + mockChMgr := NewMockChannelsMgr(s.T()) dr := deleteRunner{ req: &milvuspb.DeleteRequest{ - CollectionName: collectionName, + CollectionName: s.collectionName, + Expr: "non_pk > 1", }, + chMgr: mockChMgr, } - cache := NewMockCache(t) - cache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(nil, fmt.Errorf("mock error")) - globalMetaCache = cache - - assert.Error(t, dr.Init(context.Background())) + s.mockCache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 0}, nil) + s.mockCache.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(s.collectionID, nil) + s.mockCache.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).Return(s.schema, nil).Twice() + s.mockCache.EXPECT().GetPartitionsIndex(mock.Anything, mock.Anything, mock.Anything).Return([]string{"part1", "part2"}, nil) + s.mockCache.EXPECT().GetPartitions(mock.Anything, mock.Anything, mock.Anything).Return(map[string]int64{"part1": 100, "part2": 101}, nil) + mockChMgr.EXPECT().getVChannels(mock.Anything).Return([]string{"vchan1"}, nil) + + globalMetaCache = s.mockCache + s.NoError(dr.Init(context.Background())) + + s.True(dr.partitionKeyMode) + s.Require().Equal(0, len(dr.partitionIDs)) }) - t.Run("fail to get collection id", func(t *testing.T) { + s.Run("pk == 1, partition key", func() { + mockChMgr := NewMockChannelsMgr(s.T()) dr := deleteRunner{ req: &milvuspb.DeleteRequest{ - CollectionName: collectionName, + CollectionName: s.collectionName, + Expr: "pk == 1", }, + chMgr: mockChMgr, } - cache := NewMockCache(t) - cache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 0}, nil) - cache.On("GetCollectionID", - mock.Anything, // context.Context - mock.AnythingOfType("string"), - mock.AnythingOfType("string"), - ).Return(int64(0), errors.New("mock GetCollectionID err")) - - globalMetaCache = cache - assert.Error(t, dr.Init(context.Background())) + s.mockCache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 0}, nil) + s.mockCache.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(s.collectionID, nil) + s.mockCache.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).Return(s.schema, nil).Twice() + s.mockCache.EXPECT().GetPartitionsIndex(mock.Anything, mock.Anything, mock.Anything).Return([]string{"part1", "part2"}, nil) + s.mockCache.EXPECT().GetPartitions(mock.Anything, mock.Anything, mock.Anything).Return(map[string]int64{"part1": 100, "part2": 101}, nil) + mockChMgr.EXPECT().getVChannels(mock.Anything).Return([]string{"vchan1"}, nil) + + globalMetaCache = s.mockCache + s.NoError(dr.Init(context.Background())) + + s.True(dr.partitionKeyMode) + s.Require().Equal(0, len(dr.partitionIDs)) }) - t.Run("fail get collection schema", func(t *testing.T) { - dr := deleteRunner{req: &milvuspb.DeleteRequest{ - CollectionName: collectionName, - DbName: dbName, - }} - cache := NewMockCache(t) - cache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 0}, nil) - cache.On("GetCollectionID", - mock.Anything, // context.Context - mock.AnythingOfType("string"), - mock.AnythingOfType("string"), - ).Return(collectionID, nil) - cache.On("GetCollectionSchema", - mock.Anything, // context.Context - mock.AnythingOfType("string"), - mock.AnythingOfType("string"), - ).Return(nil, errors.New("mock GetCollectionSchema err")) - - globalMetaCache = cache - assert.Error(t, dr.Init(context.Background())) + s.Run("pk == 1, no partition name", func() { + mockChMgr := NewMockChannelsMgr(s.T()) + dr := deleteRunner{ + req: &milvuspb.DeleteRequest{ + CollectionName: s.collectionName, + Expr: "pk == 1", + }, + chMgr: mockChMgr, + } + s.mockCache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 0}, nil) + s.mockCache.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(s.collectionID, nil) + // Schema without PartitionKey + schema := &schemapb.CollectionSchema{ + Name: s.collectionName, + Fields: []*schemapb.FieldSchema{ + { + FieldID: common.StartOfUserFieldID, + Name: "pk", + IsPrimaryKey: true, + DataType: schemapb.DataType_Int64, + }, + { + FieldID: common.StartOfUserFieldID + 1, + Name: "non_pk", + DataType: schemapb.DataType_Int64, + IsPartitionKey: false, + }, + }, + } + s.schema = newSchemaInfo(schema) + s.mockCache.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).Return(s.schema, nil).Once() + mockChMgr.EXPECT().getVChannels(mock.Anything).Return([]string{"vchan1"}, nil) + + globalMetaCache = s.mockCache + s.NoError(dr.Init(context.Background())) + + s.False(dr.partitionKeyMode) + s.Equal(0, len(dr.partitionIDs)) }) - t.Run("partition key mode but delete with partition name", func(t *testing.T) { - dr := deleteRunner{req: &milvuspb.DeleteRequest{ - CollectionName: collectionName, - DbName: dbName, - PartitionName: partitionName, - }} - cache := NewMockCache(t) - cache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 0}, nil) - cache.On("GetCollectionID", - mock.Anything, // context.Context - mock.AnythingOfType("string"), - mock.AnythingOfType("string"), - ).Return(collectionID, nil) - cache.On("GetCollectionSchema", - mock.Anything, // context.Context - mock.AnythingOfType("string"), - mock.AnythingOfType("string"), - ).Return(newSchemaInfo(&schemapb.CollectionSchema{ - Name: collectionName, - Description: "", - AutoID: false, + s.Run("pk == 1, with partition name", func() { + mockChMgr := NewMockChannelsMgr(s.T()) + dr := deleteRunner{ + req: &milvuspb.DeleteRequest{ + CollectionName: s.collectionName, + PartitionName: "part1", + Expr: "pk == 1", + }, + chMgr: mockChMgr, + } + s.mockCache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 0}, nil) + s.mockCache.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(s.collectionID, nil) + // Schema without PartitionKey + schema := &schemapb.CollectionSchema{ + Name: s.collectionName, Fields: []*schemapb.FieldSchema{ { - FieldID: common.StartOfUserFieldID, - Name: "pk", - IsPrimaryKey: true, + FieldID: common.StartOfUserFieldID, + Name: "pk", + IsPrimaryKey: true, + DataType: schemapb.DataType_Int64, + }, + { + FieldID: common.StartOfUserFieldID + 1, + Name: "non_pk", DataType: schemapb.DataType_Int64, - IsPartitionKey: true, + IsPartitionKey: false, }, }, - }), nil) + } + s.schema = newSchemaInfo(schema) + s.mockCache.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).Return(s.schema, nil).Once() + mockChMgr.EXPECT().getVChannels(mock.Anything).Return([]string{"vchan1"}, nil) + s.mockCache.EXPECT().GetPartitionID(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(int64(1000), nil) + + globalMetaCache = s.mockCache + s.NoError(dr.Init(context.Background())) - globalMetaCache = cache - assert.Error(t, dr.Init(context.Background())) + s.False(dr.partitionKeyMode) + s.Equal(1, len(dr.partitionIDs)) + s.EqualValues(1000, dr.partitionIDs[0]) }) +} - t.Run("invalid partition name", func(t *testing.T) { +func (s *DeleteRunnerSuite) TestInitFailure() { + s.Run("empty collection name", func() { + dr := deleteRunner{} + s.Error(dr.Init(context.Background())) + }) + + s.Run("fail to get database info", func() { dr := deleteRunner{ req: &milvuspb.DeleteRequest{ - CollectionName: collectionName, - DbName: dbName, + CollectionName: s.collectionName, + }, + } + s.mockCache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(nil, fmt.Errorf("mock error")) + globalMetaCache = s.mockCache + + s.Error(dr.Init(context.Background())) + }) + s.Run("fail to get collection id", func() { + dr := deleteRunner{ + req: &milvuspb.DeleteRequest{ + CollectionName: s.collectionName, + }, + } + s.mockCache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 0}, nil) + s.mockCache.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything). + Return(int64(0), fmt.Errorf("mock get collectionID error")) + + globalMetaCache = s.mockCache + s.Error(dr.Init(context.Background())) + }) + + s.Run("fail get collection schema", func() { + dr := deleteRunner{req: &milvuspb.DeleteRequest{ + CollectionName: s.collectionName, + }} + s.mockCache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 0}, nil) + s.mockCache.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything). + Return(s.collectionID, nil) + s.mockCache.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything). + Return(nil, errors.New("mock GetCollectionSchema err")) + + globalMetaCache = s.mockCache + s.Error(dr.Init(context.Background())) + }) + + s.Run("create plan failed", func() { + dr := deleteRunner{ + req: &milvuspb.DeleteRequest{ + CollectionName: s.collectionName, + Expr: "????", + }, + } + s.mockCache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 0}, nil) + s.mockCache.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything). + Return(s.collectionID, nil) + s.mockCache.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything). + Return(s.schema, nil) + + globalMetaCache = s.mockCache + s.Error(dr.Init(context.Background())) + }) + s.Run("delete with always true expression failed", func() { + alwaysTrueExpr := " " + dr := deleteRunner{ + req: &milvuspb.DeleteRequest{ + CollectionName: s.collectionName, + Expr: alwaysTrueExpr, + }, + } + s.mockCache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 0}, nil) + s.mockCache.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything). + Return(s.collectionID, nil) + s.mockCache.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything). + Return(s.schema, nil) + + globalMetaCache = s.mockCache + s.Error(dr.Init(context.Background())) + }) + + s.Run("partition key mode but delete with partition name", func() { + dr := deleteRunner{req: &milvuspb.DeleteRequest{ + CollectionName: s.collectionName, + PartitionName: s.partitionName, + }} + s.mockCache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 0}, nil) + s.mockCache.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything). + Return(s.collectionID, nil) + s.mockCache.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything). + Return(s.schema, nil) + // The schema enabled partitionKey + + globalMetaCache = s.mockCache + s.Error(dr.Init(context.Background())) + }) + + s.Run("invalid partition name", func() { + dr := deleteRunner{ + req: &milvuspb.DeleteRequest{ + CollectionName: s.collectionName, PartitionName: "???", Expr: "non_pk in [1, 2, 3]", }, } - cache := NewMockCache(t) - cache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 0}, nil) - cache.On("GetCollectionID", - mock.Anything, // context.Context - mock.AnythingOfType("string"), - mock.AnythingOfType("string"), - ).Return(int64(10000), nil) - cache.On("GetCollectionSchema", - mock.Anything, // context.Context - mock.AnythingOfType("string"), - mock.AnythingOfType("string"), - ).Return(schema, nil) - - globalMetaCache = cache - assert.Error(t, dr.Init(context.Background())) + s.mockCache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 0}, nil) + s.mockCache.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(s.collectionID, nil) + + // Schema without PartitionKey + schema := &schemapb.CollectionSchema{ + Name: s.collectionName, + Fields: []*schemapb.FieldSchema{ + { + FieldID: common.StartOfUserFieldID, + Name: "pk", + IsPrimaryKey: true, + DataType: schemapb.DataType_Int64, + }, + { + FieldID: common.StartOfUserFieldID + 1, + Name: "non_pk", + DataType: schemapb.DataType_Int64, + IsPartitionKey: false, + }, + }, + } + s.schema = newSchemaInfo(schema) + s.mockCache.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything). + Return(s.schema, nil) + + globalMetaCache = s.mockCache + s.Error(dr.Init(context.Background())) }) - t.Run("get partition id failed", func(t *testing.T) { + s.Run("get partition id failed", func() { dr := deleteRunner{ req: &milvuspb.DeleteRequest{ - CollectionName: collectionName, - DbName: dbName, - PartitionName: partitionName, + CollectionName: s.collectionName, + PartitionName: s.partitionName, Expr: "non_pk in [1, 2, 3]", }, } - cache := NewMockCache(t) - cache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 0}, nil) - cache.On("GetCollectionID", - mock.Anything, // context.Context - mock.AnythingOfType("string"), - mock.AnythingOfType("string"), - ).Return(collectionID, nil) - cache.On("GetCollectionSchema", - mock.Anything, // context.Context - mock.AnythingOfType("string"), - mock.AnythingOfType("string"), - ).Return(schema, nil) - cache.On("GetPartitionID", - mock.Anything, // context.Context - mock.AnythingOfType("string"), - mock.AnythingOfType("string"), - mock.AnythingOfType("string"), - ).Return(int64(0), errors.New("mock GetPartitionID err")) - - globalMetaCache = cache - assert.Error(t, dr.Init(context.Background())) + // Schema without PartitionKey + schema := &schemapb.CollectionSchema{ + Name: s.collectionName, + Fields: []*schemapb.FieldSchema{ + { + FieldID: common.StartOfUserFieldID, + Name: "pk", + IsPrimaryKey: true, + DataType: schemapb.DataType_Int64, + }, + { + FieldID: common.StartOfUserFieldID + 1, + Name: "non_pk", + DataType: schemapb.DataType_Int64, + IsPartitionKey: false, + }, + }, + } + s.schema = newSchemaInfo(schema) + s.mockCache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 0}, nil) + s.mockCache.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(s.collectionID, nil) + s.mockCache.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).Return(s.schema, nil) + s.mockCache.EXPECT().GetPartitionID(mock.Anything, mock.Anything, mock.Anything, mock.Anything). + Return(int64(0), errors.New("mock GetPartitionID err")) + globalMetaCache = s.mockCache + s.Error(dr.Init(context.Background())) }) - t.Run("get vchannel failed", func(t *testing.T) { - chMgr := NewMockChannelsMgr(t) + s.Run("get vchannel failed", func() { + mockChMgr := NewMockChannelsMgr(s.T()) dr := deleteRunner{ req: &milvuspb.DeleteRequest{ - CollectionName: collectionName, - DbName: dbName, - PartitionName: partitionName, + CollectionName: s.collectionName, Expr: "non_pk in [1, 2, 3]", }, - chMgr: chMgr, + chMgr: mockChMgr, } - cache := NewMockCache(t) - cache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 0}, nil) - cache.On("GetCollectionID", - mock.Anything, // context.Context - mock.AnythingOfType("string"), - mock.AnythingOfType("string"), - ).Return(collectionID, nil) - cache.On("GetCollectionSchema", - mock.Anything, // context.Context - mock.AnythingOfType("string"), - mock.AnythingOfType("string"), - ).Return(schema, nil) - cache.On("GetPartitionID", - mock.Anything, // context.Context - mock.AnythingOfType("string"), - mock.AnythingOfType("string"), - mock.AnythingOfType("string"), - ).Return(partitionID, nil) - chMgr.On("getVChannels", mock.Anything).Return(nil, fmt.Errorf("mock error")) - - globalMetaCache = cache - assert.Error(t, dr.Init(context.Background())) + s.mockCache.EXPECT().GetDatabaseInfo(mock.Anything, mock.Anything).Return(&databaseInfo{dbID: 0}, nil) + s.mockCache.EXPECT().GetCollectionID(mock.Anything, mock.Anything, mock.Anything).Return(s.collectionID, nil) + s.mockCache.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).Return(s.schema, nil).Twice() + s.mockCache.EXPECT().GetPartitionsIndex(mock.Anything, mock.Anything, mock.Anything).Return([]string{"part1", "part2"}, nil) + s.mockCache.EXPECT().GetPartitions(mock.Anything, mock.Anything, mock.Anything).Return(map[string]int64{"part1": 100, "part2": 101}, nil) + mockChMgr.EXPECT().getVChannels(mock.Anything).Return(nil, fmt.Errorf("mock error")) + + globalMetaCache = s.mockCache + s.Error(dr.Init(context.Background())) }) } @@ -495,27 +670,19 @@ func TestDeleteRunner_Run(t *testing.T) { globalMetaCache = nil }() - t.Run("create plan failed", func(t *testing.T) { - mockMgr := NewMockChannelsMgr(t) - dr := deleteRunner{ - chMgr: mockMgr, - req: &milvuspb.DeleteRequest{ - Expr: "????", - }, - schema: schema, - } - assert.Error(t, dr.Run(context.Background())) - }) - t.Run("simple delete task failed", func(t *testing.T) { mockMgr := NewMockChannelsMgr(t) lb := NewMockLBPolicy(t) + expr := "pk in [1,2,3]" + plan, err := planparserv2.CreateRetrievePlan(schema.schemaHelper, expr, nil) + require.NoError(t, err) + dr := deleteRunner{ chMgr: mockMgr, schema: schema, collectionID: collectionID, - partitionID: partitionID, + partitionIDs: []int64{partitionID}, vChannels: channels, tsoAllocatorIns: tsoAllocator, idAllocator: idAllocator, @@ -531,8 +698,9 @@ func TestDeleteRunner_Run(t *testing.T) { CollectionName: collectionName, PartitionName: partitionName, DbName: dbName, - Expr: "pk in [1,2,3]", + Expr: expr, }, + plan: plan, } stream := msgstream.NewMockMsgStream(t) mockMgr.EXPECT().getOrCreateDmlStream(mock.Anything).Return(stream, nil) @@ -543,42 +711,13 @@ func TestDeleteRunner_Run(t *testing.T) { assert.Equal(t, int64(0), dr.result.DeleteCnt) }) - t.Run("delete with always true expression failed", func(t *testing.T) { - mockMgr := NewMockChannelsMgr(t) - lb := NewMockLBPolicy(t) - - dr := deleteRunner{ - chMgr: mockMgr, - schema: schema, - collectionID: collectionID, - partitionID: partitionID, - vChannels: channels, - tsoAllocatorIns: tsoAllocator, - idAllocator: idAllocator, - queue: queue.dmQueue, - lb: lb, - result: &milvuspb.MutationResult{ - Status: merr.Success(), - IDs: &schemapb.IDs{ - IdField: nil, - }, - }, - req: &milvuspb.DeleteRequest{ - CollectionName: collectionName, - PartitionName: partitionName, - DbName: dbName, - Expr: " ", - }, - } - - assert.Error(t, dr.Run(context.Background())) - assert.Equal(t, int64(0), dr.result.DeleteCnt) - }) - t.Run("complex delete query rpc failed", func(t *testing.T) { mockMgr := NewMockChannelsMgr(t) qn := mocks.NewMockQueryNodeClient(t) lb := NewMockLBPolicy(t) + expr := "pk < 3" + plan, err := planparserv2.CreateRetrievePlan(schema.schemaHelper, expr, nil) + require.NoError(t, err) dr := deleteRunner{ idAllocator: idAllocator, @@ -587,21 +726,20 @@ func TestDeleteRunner_Run(t *testing.T) { chMgr: mockMgr, schema: schema, collectionID: collectionID, - partitionID: partitionID, + partitionIDs: []int64{partitionID}, vChannels: channels, lb: lb, result: &milvuspb.MutationResult{ Status: merr.Success(), - IDs: &schemapb.IDs{ - IdField: nil, - }, + IDs: &schemapb.IDs{}, }, req: &milvuspb.DeleteRequest{ CollectionName: collectionName, PartitionName: partitionName, DbName: dbName, - Expr: "pk < 3", + Expr: expr, }, + plan: plan, } lb.EXPECT().Execute(mock.Anything, mock.Anything).Call.Return(func(ctx context.Context, workload CollectionWorkLoad) error { return workload.exec(ctx, 1, qn, "") @@ -619,13 +757,16 @@ func TestDeleteRunner_Run(t *testing.T) { mockMgr := NewMockChannelsMgr(t) qn := mocks.NewMockQueryNodeClient(t) lb := NewMockLBPolicy(t) + expr := "pk < 3" + plan, err := planparserv2.CreateRetrievePlan(schema.schemaHelper, expr, nil) + require.NoError(t, err) dr := deleteRunner{ queue: queue.dmQueue, chMgr: mockMgr, schema: schema, collectionID: collectionID, - partitionID: partitionID, + partitionIDs: []int64{partitionID}, vChannels: channels, tsoAllocatorIns: tsoAllocator, idAllocator: idAllocator, @@ -640,8 +781,9 @@ func TestDeleteRunner_Run(t *testing.T) { CollectionName: collectionName, PartitionName: partitionName, DbName: dbName, - Expr: "pk < 3", + Expr: expr, }, + plan: plan, } stream := msgstream.NewMockMsgStream(t) mockMgr.EXPECT().getOrCreateDmlStream(mock.Anything).Return(stream, nil) @@ -684,13 +826,16 @@ func TestDeleteRunner_Run(t *testing.T) { mockMgr := NewMockChannelsMgr(t) qn := mocks.NewMockQueryNodeClient(t) lb := NewMockLBPolicy(t) + expr := "pk < 3" + plan, err := planparserv2.CreateRetrievePlan(schema.schemaHelper, expr, nil) + require.NoError(t, err) dr := deleteRunner{ chMgr: mockMgr, queue: queue.dmQueue, schema: schema, collectionID: collectionID, - partitionID: partitionID, + partitionIDs: []int64{partitionID}, vChannels: channels, idAllocator: idAllocator, tsoAllocatorIns: tsoAllocator, @@ -706,8 +851,9 @@ func TestDeleteRunner_Run(t *testing.T) { CollectionName: collectionName, PartitionName: partitionName, DbName: dbName, - Expr: "pk < 3", + Expr: expr, }, + plan: plan, } lb.EXPECT().Execute(mock.Anything, mock.Anything).Call.Return(func(ctx context.Context, workload CollectionWorkLoad) error { return workload.exec(ctx, 1, qn, "") @@ -743,13 +889,16 @@ func TestDeleteRunner_Run(t *testing.T) { mockMgr := NewMockChannelsMgr(t) qn := mocks.NewMockQueryNodeClient(t) lb := NewMockLBPolicy(t) + expr := "pk < 3" + plan, err := planparserv2.CreateRetrievePlan(schema.schemaHelper, expr, nil) + require.NoError(t, err) dr := deleteRunner{ chMgr: mockMgr, queue: queue.dmQueue, schema: schema, collectionID: collectionID, - partitionID: partitionID, + partitionIDs: []int64{partitionID}, vChannels: channels, idAllocator: idAllocator, tsoAllocatorIns: tsoAllocator, @@ -764,8 +913,9 @@ func TestDeleteRunner_Run(t *testing.T) { CollectionName: collectionName, PartitionName: partitionName, DbName: dbName, - Expr: "pk < 3", + Expr: expr, }, + plan: plan, } stream := msgstream.NewMockMsgStream(t) mockMgr.EXPECT().getOrCreateDmlStream(mock.Anything).Return(stream, nil) @@ -805,13 +955,16 @@ func TestDeleteRunner_Run(t *testing.T) { mockMgr := NewMockChannelsMgr(t) qn := mocks.NewMockQueryNodeClient(t) lb := NewMockLBPolicy(t) + expr := "pk < 3" + plan, err := planparserv2.CreateRetrievePlan(schema.schemaHelper, expr, nil) + require.NoError(t, err) dr := deleteRunner{ queue: queue.dmQueue, chMgr: mockMgr, schema: schema, collectionID: collectionID, - partitionID: partitionID, + partitionIDs: []int64{partitionID}, vChannels: channels, idAllocator: idAllocator, tsoAllocatorIns: tsoAllocator, @@ -826,8 +979,9 @@ func TestDeleteRunner_Run(t *testing.T) { CollectionName: collectionName, PartitionName: partitionName, DbName: dbName, - Expr: "pk < 3", + Expr: expr, }, + plan: plan, } stream := msgstream.NewMockMsgStream(t) mockMgr.EXPECT().getOrCreateDmlStream(mock.Anything).Return(stream, nil) @@ -865,7 +1019,6 @@ func TestDeleteRunner_Run(t *testing.T) { partitionMaps["test_0"] = 1 partitionMaps["test_1"] = 2 partitionMaps["test_2"] = 3 - indexedPartitions := []string{"test_0", "test_1", "test_2"} t.Run("complex delete with partitionKey mode success", func(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) @@ -877,21 +1030,18 @@ func TestDeleteRunner_Run(t *testing.T) { mockCache := NewMockCache(t) mockCache.EXPECT().GetCollectionID(mock.Anything, dbName, collectionName).Return(collectionID, nil).Maybe() - mockCache.EXPECT().GetPartitions(mock.Anything, mock.Anything, mock.Anything).Return( - partitionMaps, nil) - mockCache.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).Return( - schema, nil) - mockCache.EXPECT().GetPartitionsIndex(mock.Anything, mock.Anything, mock.Anything). - Return(indexedPartitions, nil) globalMetaCache = mockCache defer func() { globalMetaCache = metaCache }() + expr := "non_pk in [2, 3]" + plan, err := planparserv2.CreateRetrievePlan(schema.schemaHelper, expr, nil) + require.NoError(t, err) dr := deleteRunner{ queue: queue.dmQueue, chMgr: mockMgr, schema: schema, collectionID: collectionID, - partitionID: int64(-1), + partitionIDs: []int64{common.AllPartitionsID}, vChannels: channels, idAllocator: idAllocator, tsoAllocatorIns: tsoAllocator, @@ -905,10 +1055,10 @@ func TestDeleteRunner_Run(t *testing.T) { }, req: &milvuspb.DeleteRequest{ CollectionName: collectionName, - PartitionName: "", DbName: dbName, - Expr: "non_pk in [2, 3]", + Expr: expr, }, + plan: plan, } stream := msgstream.NewMockMsgStream(t) mockMgr.EXPECT().getOrCreateDmlStream(mock.Anything).Return(stream, nil) @@ -941,167 +1091,3 @@ func TestDeleteRunner_Run(t *testing.T) { assert.Equal(t, int64(3), dr.result.DeleteCnt) }) } - -func TestDeleteRunner_StreamingQueryAndDelteFunc(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - collectionName := "test_delete" - collectionID := int64(111) - channels := []string{"test_channel"} - dbName := "test_1" - tsoAllocator := &mockTsoAllocator{} - idAllocator := &mockIDAllocatorInterface{} - - queue, err := newTaskScheduler(ctx, tsoAllocator, nil) - assert.NoError(t, err) - queue.Start() - defer queue.Close() - - collSchema := &schemapb.CollectionSchema{ - Name: "test_delete", - Description: "", - AutoID: false, - Fields: []*schemapb.FieldSchema{ - { - FieldID: common.StartOfUserFieldID, - Name: "pk", - IsPrimaryKey: true, - DataType: schemapb.DataType_Int64, - }, - { - FieldID: common.StartOfUserFieldID + 1, - Name: "non_pk", - IsPrimaryKey: false, - DataType: schemapb.DataType_Int64, - }, - }, - } - - // test partitionKey mode - collSchema.Fields[1].IsPartitionKey = true - - schema := newSchemaInfo(collSchema) - partitionMaps := make(map[string]int64) - partitionMaps["test_0"] = 1 - partitionMaps["test_1"] = 2 - partitionMaps["test_2"] = 3 - indexedPartitions := []string{"test_0", "test_1", "test_2"} - t.Run("partitionKey mode parse plan failed", func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - dr := deleteRunner{ - schema: schema, - queue: queue.dmQueue, - tsoAllocatorIns: tsoAllocator, - idAllocator: idAllocator, - collectionID: collectionID, - partitionID: int64(-1), - vChannels: channels, - partitionKeyMode: true, - result: &milvuspb.MutationResult{ - Status: merr.Success(), - IDs: &schemapb.IDs{ - IdField: nil, - }, - }, - req: &milvuspb.DeleteRequest{ - CollectionName: collectionName, - PartitionName: "", - DbName: dbName, - Expr: "non_pk in [2, 3]", - }, - } - qn := mocks.NewMockQueryNodeClient(t) - // witho out plan - queryFunc := dr.getStreamingQueryAndDelteFunc(nil) - assert.Error(t, queryFunc(ctx, 1, qn, "")) - }) - - t.Run("partitionKey mode get meta failed", func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - dr := deleteRunner{ - schema: schema, - tsoAllocatorIns: tsoAllocator, - idAllocator: idAllocator, - collectionID: collectionID, - partitionID: int64(-1), - vChannels: channels, - partitionKeyMode: true, - result: &milvuspb.MutationResult{ - Status: merr.Success(), - IDs: &schemapb.IDs{ - IdField: nil, - }, - }, - req: &milvuspb.DeleteRequest{ - CollectionName: collectionName, - PartitionName: "", - DbName: dbName, - Expr: "non_pk in [2, 3]", - }, - } - qn := mocks.NewMockQueryNodeClient(t) - - mockCache := NewMockCache(t) - mockCache.EXPECT().GetPartitionsIndex(mock.Anything, mock.Anything, mock.Anything). - Return(nil, fmt.Errorf("mock error")) - globalMetaCache = mockCache - defer func() { globalMetaCache = nil }() - - schemaHelper, err := typeutil.CreateSchemaHelper(dr.schema.CollectionSchema) - require.NoError(t, err) - plan, err := planparserv2.CreateRetrievePlan(schemaHelper, dr.req.Expr, nil) - assert.NoError(t, err) - queryFunc := dr.getStreamingQueryAndDelteFunc(plan) - assert.Error(t, queryFunc(ctx, 1, qn, "")) - }) - - t.Run("partitionKey mode get partition ID failed", func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - dr := deleteRunner{ - schema: schema, - tsoAllocatorIns: tsoAllocator, - idAllocator: idAllocator, - collectionID: collectionID, - partitionID: int64(-1), - vChannels: channels, - partitionKeyMode: true, - result: &milvuspb.MutationResult{ - Status: merr.Success(), - IDs: &schemapb.IDs{ - IdField: nil, - }, - }, - req: &milvuspb.DeleteRequest{ - CollectionName: collectionName, - PartitionName: "", - DbName: dbName, - Expr: "non_pk in [2, 3]", - }, - } - qn := mocks.NewMockQueryNodeClient(t) - - mockCache := NewMockCache(t) - mockCache.EXPECT().GetPartitionsIndex(mock.Anything, mock.Anything, mock.Anything). - Return(indexedPartitions, nil) - mockCache.EXPECT().GetCollectionSchema(mock.Anything, mock.Anything, mock.Anything).Return( - schema, nil) - mockCache.EXPECT().GetPartitions(mock.Anything, mock.Anything, mock.Anything).Return( - nil, fmt.Errorf("mock error")) - globalMetaCache = mockCache - defer func() { globalMetaCache = nil }() - - schemaHelper, err := typeutil.CreateSchemaHelper(dr.schema.CollectionSchema) - require.NoError(t, err) - plan, err := planparserv2.CreateRetrievePlan(schemaHelper, dr.req.Expr, nil) - assert.NoError(t, err) - queryFunc := dr.getStreamingQueryAndDelteFunc(plan) - assert.Error(t, queryFunc(ctx, 1, qn, "")) - }) -} diff --git a/tests/integration/compaction/compaction_test.go b/tests/integration/compaction/compaction_test.go index cda5a6d1ff6fa..f38d926bbea88 100644 --- a/tests/integration/compaction/compaction_test.go +++ b/tests/integration/compaction/compaction_test.go @@ -41,6 +41,7 @@ func (s *CompactionSuite) TearDownSuite() { s.MiniClusterSuite.TearDownSuite() paramtable.Get().Reset(paramtable.Get().DataCoordCfg.L0CompactionTriggerInterval.Key) + paramtable.Get().Reset(paramtable.Get().DataCoordCfg.MixCompactionTriggerInterval.Key) } func TestCompaction(t *testing.T) { diff --git a/tests/integration/levelzero/delete_on_growing_test.go b/tests/integration/levelzero/delete_on_growing_test.go index 283288b68b4ab..e96670b9ae881 100644 --- a/tests/integration/levelzero/delete_on_growing_test.go +++ b/tests/integration/levelzero/delete_on_growing_test.go @@ -21,13 +21,9 @@ import ( "fmt" "time" - "go.uber.org/zap" - "google.golang.org/protobuf/proto" - "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/proto/datapb" - "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/funcutil" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/metric" @@ -35,68 +31,6 @@ import ( "github.com/milvus-io/milvus/tests/integration" ) -func (s *LevelZeroSuite) createCollection(collection string) { - schema := integration.ConstructSchema(collection, s.dim, false) - marshaledSchema, err := proto.Marshal(schema) - s.Require().NoError(err) - - status, err := s.Cluster.Proxy.CreateCollection(context.TODO(), &milvuspb.CreateCollectionRequest{ - CollectionName: collection, - Schema: marshaledSchema, - ShardsNum: 1, - }) - s.Require().NoError(err) - s.Require().True(merr.Ok(status)) - log.Info("CreateCollection result", zap.Any("status", status)) -} - -func (s *LevelZeroSuite) generateSegment(collection string, numRows int, startPk int64, seal bool) { - log.Info("=========================Start generate one segment=========================") - pkColumn := integration.NewInt64FieldDataWithStart(integration.Int64Field, numRows, startPk) - fVecColumn := integration.NewFloatVectorFieldData(integration.FloatVecField, numRows, s.dim) - hashKeys := integration.GenerateHashKeys(numRows) - insertResult, err := s.Cluster.Proxy.Insert(context.TODO(), &milvuspb.InsertRequest{ - CollectionName: collection, - FieldsData: []*schemapb.FieldData{pkColumn, fVecColumn}, - HashKeys: hashKeys, - NumRows: uint32(numRows), - }) - s.Require().NoError(err) - s.True(merr.Ok(insertResult.GetStatus())) - s.Require().EqualValues(numRows, insertResult.GetInsertCnt()) - s.Require().EqualValues(numRows, len(insertResult.GetIDs().GetIntId().GetData())) - - if seal { - log.Info("=========================Start to flush =========================", - zap.String("collection", collection), - zap.Int("numRows", numRows), - zap.Int64("startPK", startPk), - ) - - flushResp, err := s.Cluster.Proxy.Flush(context.TODO(), &milvuspb.FlushRequest{ - CollectionNames: []string{collection}, - }) - s.NoError(err) - segmentLongArr, has := flushResp.GetCollSegIDs()[collection] - s.Require().True(has) - segmentIDs := segmentLongArr.GetData() - s.Require().NotEmpty(segmentLongArr) - s.Require().True(has) - - flushTs, has := flushResp.GetCollFlushTs()[collection] - s.True(has) - - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - s.WaitForFlush(ctx, segmentIDs, flushTs, "", collection) - log.Info("=========================Finish to generate one segment=========================", - zap.String("collection", collection), - zap.Int("numRows", numRows), - zap.Int64("startPK", startPk), - ) - } -} - func (s *LevelZeroSuite) TestDeleteOnGrowing() { ctx, cancel := context.WithTimeout(context.Background(), time.Minute*10) defer cancel() @@ -113,7 +47,9 @@ func (s *LevelZeroSuite) TestDeleteOnGrowing() { ) collectionName := "TestLevelZero_" + funcutil.GenRandomStr() - s.createCollection(collectionName) + s.schema = integration.ConstructSchema(collectionName, s.dim, false) + req := s.buildCreateCollectionRequest(collectionName, s.schema, 0) + s.createCollection(req) // create index createIndexStatus, err := c.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{ @@ -134,9 +70,9 @@ func (s *LevelZeroSuite) TestDeleteOnGrowing() { s.Require().NoError(err) s.WaitForLoad(ctx, collectionName) - s.generateSegment(collectionName, 1, 0, true) - s.generateSegment(collectionName, 2, 1, true) - s.generateSegment(collectionName, 2, 3, false) + s.generateSegment(collectionName, 1, 0, true, -1) + s.generateSegment(collectionName, 2, 1, true, -1) + s.generateSegment(collectionName, 2, 3, false, -1) checkRowCount := func(rowCount int) { // query diff --git a/tests/integration/levelzero/delete_partition_key_test.go b/tests/integration/levelzero/delete_partition_key_test.go new file mode 100644 index 0000000000000..426356c0cbfcc --- /dev/null +++ b/tests/integration/levelzero/delete_partition_key_test.go @@ -0,0 +1,171 @@ +package levelzero + +import ( + "context" + "fmt" + "time" + + "github.com/samber/lo" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/pkg/util/funcutil" + "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/metric" + "github.com/milvus-io/milvus/pkg/util/typeutil" + "github.com/milvus-io/milvus/tests/integration" +) + +func (s *LevelZeroSuite) TestDeletePartitionKeyHint() { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute*10) + defer cancel() + + const ( + indexType = integration.IndexFaissIvfFlat + metricType = metric.L2 + vecType = schemapb.DataType_FloatVector + ) + + collectionName := "TestLevelZero_" + funcutil.GenRandomStr() + + // create a collection with partition key field "partition_key" + s.schema = integration.ConstructSchema(collectionName, s.dim, false) + s.schema.Fields = append(s.schema.Fields, &schemapb.FieldSchema{ + FieldID: 102, + Name: "partition_key", + DataType: schemapb.DataType_Int64, + IsPartitionKey: true, + }) + + req := s.buildCreateCollectionRequest(collectionName, s.schema, 2) + s.createCollection(req) + c := s.Cluster + + // create index and load + createIndexStatus, err := c.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{ + CollectionName: collectionName, + FieldName: integration.FloatVecField, + IndexName: "_default", + ExtraParams: integration.ConstructIndexParam(s.dim, indexType, metricType), + }) + err = merr.CheckRPCCall(createIndexStatus, err) + s.NoError(err) + s.WaitForIndexBuilt(ctx, collectionName, integration.FloatVecField) + loadStatus, err := c.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{ + CollectionName: collectionName, + }) + err = merr.CheckRPCCall(loadStatus, err) + s.Require().NoError(err) + s.WaitForLoad(ctx, collectionName) + + // Generate 2 growing segments with 2 differenct partition key 0, 1001, with exactlly same PK start from 0 + s.generateSegment(collectionName, 1000, 0, false, 0) + s.generateSegment(collectionName, 1001, 0, false, 1001) + segments, err := s.Cluster.MetaWatcher.ShowSegments() + s.Require().NoError(err) + s.Require().EqualValues(len(segments), 2) + for _, segment := range segments { + s.Require().EqualValues(commonpb.SegmentState_Growing, segment.GetState()) + s.Require().EqualValues(commonpb.SegmentLevel_L1, segment.GetLevel()) + } + + L1SegIDs := lo.Map(segments, func(seg *datapb.SegmentInfo, _ int) int64 { + return seg.GetID() + }) + L1SegIDSet := typeutil.NewUniqueSet(L1SegIDs...) + + checkRowCount := func(rowCount int) { + // query + queryResult, err := c.Proxy.Query(ctx, &milvuspb.QueryRequest{ + CollectionName: collectionName, + OutputFields: []string{"count(*)"}, + }) + err = merr.CheckRPCCall(queryResult, err) + s.NoError(err) + s.EqualValues(rowCount, queryResult.GetFieldsData()[0].GetScalars().GetLongData().GetData()[0]) + } + checkRowCount(2001) + + // delete all data belongs to partition_key == 1001 + // expr: partition_key == 1001 && pk >= 0 + // - for previous implementation, the delete pk >= 0 will touch every segments and leave only 1 numRows + // - for latest enhancements, the expr "pk >= 0" will only touch partitions that contains partition key == 1001 + deleteResult, err := c.Proxy.Delete(ctx, &milvuspb.DeleteRequest{ + CollectionName: collectionName, + Expr: fmt.Sprintf("partition_key == 1001 && %s >= 0", integration.Int64Field), + }) + err = merr.CheckRPCCall(deleteResult, err) + s.NoError(err) + + checkRowCount(1000) + + // Flush will generates 2 Flushed L1 segments and 1 Flushed L0 segment + s.Flush(collectionName) + + segments, err = s.Cluster.MetaWatcher.ShowSegments() + s.Require().NoError(err) + s.Require().EqualValues(len(segments), 3) + for _, segment := range segments { + s.Require().EqualValues(commonpb.SegmentState_Flushed, segment.GetState()) + // L1 segments + if L1SegIDSet.Contain(segment.GetID()) { + s.Require().EqualValues(commonpb.SegmentLevel_L1, segment.GetLevel()) + } else { // L0 segment with 1001 delete entries count + s.Require().EqualValues(commonpb.SegmentLevel_L0, segment.GetLevel()) + s.EqualValues(1001, segment.Deltalogs[0].GetBinlogs()[0].GetEntriesNum()) + } + } + + l0Dropped := func() bool { + segments, err := s.Cluster.MetaWatcher.ShowSegments() + s.Require().NoError(err) + s.Require().EqualValues(len(segments), 3) + + for _, segment := range segments { + // Return if L0 segments not compacted + if !L1SegIDSet.Contain(segment.GetID()) && segment.GetState() == commonpb.SegmentState_Flushed { + return false + } + + // If L0 segment compacted + if !L1SegIDSet.Contain(segment.GetID()) && segment.GetState() == commonpb.SegmentState_Dropped { + // find the segment belong to partition_key == 1001 + // check for the deltalog entries count == 1001 + if segment.GetLevel() == datapb.SegmentLevel_L1 && segment.GetNumOfRows() == 1001 { + s.True(L1SegIDSet.Contain(segment.GetID())) + s.EqualValues(1001, segment.Deltalogs[0].GetBinlogs()[0].GetEntriesNum()) + } + + // find segment of another partition_key == 0 + // check compaction doesn't touch it even though delete expression will delete it all + if segment.GetLevel() == datapb.SegmentLevel_L1 && segment.GetNumOfRows() == 1000 { + s.True(L1SegIDSet.Contain(segment.GetID())) + s.Empty(segment.Deltalogs) + } + + return true + } + } + return false + } + + checkL0CompactionTouchOnePartition := func() { + failT := time.NewTimer(3 * time.Minute) + checkT := time.NewTicker(1 * time.Second) + for { + select { + case <-failT.C: + s.FailNow("L0 compaction timeout") + case <-checkT.C: + if l0Dropped() { + failT.Stop() + return + } + } + } + } + + checkL0CompactionTouchOnePartition() +} diff --git a/tests/integration/levelzero/levelzero_test.go b/tests/integration/levelzero/levelzero_test.go index 1249153cf3f1b..9d8c41be23953 100644 --- a/tests/integration/levelzero/levelzero_test.go +++ b/tests/integration/levelzero/levelzero_test.go @@ -17,10 +17,18 @@ package levelzero import ( + "context" "testing" + "time" "github.com/stretchr/testify/suite" + "go.uber.org/zap" + "google.golang.org/protobuf/proto" + "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/tests/integration" ) @@ -28,7 +36,8 @@ import ( type LevelZeroSuite struct { integration.MiniClusterSuite - dim int + schema *schemapb.CollectionSchema + dim int } func (s *LevelZeroSuite) SetupSuite() { @@ -45,3 +54,95 @@ func (s *LevelZeroSuite) TearDownSuite() { func TestLevelZero(t *testing.T) { suite.Run(t, new(LevelZeroSuite)) } + +func (s *LevelZeroSuite) buildCreateCollectionRequest( + collection string, + schema *schemapb.CollectionSchema, + numPartitions int64, +) *milvuspb.CreateCollectionRequest { + marshaledSchema, err := proto.Marshal(schema) + s.Require().NoError(err) + + return &milvuspb.CreateCollectionRequest{ + CollectionName: collection, + Schema: marshaledSchema, + ShardsNum: 1, + NumPartitions: numPartitions, + } +} + +func (s *LevelZeroSuite) createCollection(req *milvuspb.CreateCollectionRequest) { + status, err := s.Cluster.Proxy.CreateCollection(context.TODO(), req) + s.Require().NoError(err) + s.Require().True(merr.Ok(status)) + log.Info("CreateCollection result", zap.Any("status", status)) +} + +// For PrimaryKey field, startPK will be the start PK of this generation +// For PartitionKey field, partitikonKey will be the same in this generation +func (s *LevelZeroSuite) buildFieldDataBySchema(schema *schemapb.CollectionSchema, numRows int, startPK int64, partitionKey int64) []*schemapb.FieldData { + var fieldData []*schemapb.FieldData + for _, field := range schema.Fields { + switch field.DataType { + case schemapb.DataType_Int64: + if field.IsPartitionKey { + fieldData = append(fieldData, integration.NewInt64SameFieldData(field.Name, numRows, partitionKey)) + } else { + fieldData = append(fieldData, integration.NewInt64FieldDataWithStart(field.Name, numRows, startPK)) + } + case schemapb.DataType_FloatVector: + fieldData = append(fieldData, integration.NewFloatVectorFieldData(field.Name, numRows, s.dim)) + default: + s.Fail("not supported yet") + } + } + return fieldData +} + +func (s *LevelZeroSuite) generateSegment(collection string, numRows int, startPk int64, seal bool, partitionKey int64) { + log.Info("=========================Start generate one segment=========================") + fieldData := s.buildFieldDataBySchema(s.schema, numRows, startPk, partitionKey) + hashKeys := integration.GenerateHashKeys(numRows) + insertResult, err := s.Cluster.Proxy.Insert(context.TODO(), &milvuspb.InsertRequest{ + CollectionName: collection, + FieldsData: fieldData, + HashKeys: hashKeys, + NumRows: uint32(numRows), + }) + s.Require().NoError(err) + s.True(merr.Ok(insertResult.GetStatus())) + s.Require().EqualValues(numRows, insertResult.GetInsertCnt()) + s.Require().EqualValues(numRows, len(insertResult.GetIDs().GetIntId().GetData())) + + if seal { + log.Info("=========================Start to flush =========================", + zap.String("collection", collection), + zap.Int("numRows", numRows), + zap.Int64("startPK", startPk), + ) + s.Flush(collection) + log.Info("=========================Finish to generate one segment=========================", + zap.String("collection", collection), + zap.Int("numRows", numRows), + zap.Int64("startPK", startPk), + ) + } +} + +func (s *LevelZeroSuite) Flush(collection string) { + flushResp, err := s.Cluster.Proxy.Flush(context.TODO(), &milvuspb.FlushRequest{ + CollectionNames: []string{collection}, + }) + s.NoError(err) + segmentLongArr, has := flushResp.GetCollSegIDs()[collection] + s.Require().True(has) + segmentIDs := segmentLongArr.GetData() // segmentIDs might be empty + // s.Require().NotEmpty(segmentLongArr) + + flushTs, has := flushResp.GetCollFlushTs()[collection] + s.True(has) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + s.WaitForFlush(ctx, segmentIDs, flushTs, "", collection) +}