diff --git a/internal/querycoordv2/task/scheduler.go b/internal/querycoordv2/task/scheduler.go index 541346b613fae..a67904fe0686a 100644 --- a/internal/querycoordv2/task/scheduler.go +++ b/internal/querycoordv2/task/scheduler.go @@ -855,7 +855,7 @@ func (scheduler *taskScheduler) checkSegmentTaskStale(task *SegmentTask) error { if taskType == TaskTypeMove || taskType == TaskTypeUpdate { segment = scheduler.targetMgr.GetSealedSegment(task.CollectionID(), task.SegmentID(), meta.CurrentTarget) } else { - segment = scheduler.targetMgr.GetSealedSegment(task.CollectionID(), task.SegmentID(), meta.NextTarget) + segment = scheduler.targetMgr.GetSealedSegment(task.CollectionID(), task.SegmentID(), meta.NextTargetFirst) } if segment == nil { log.Warn("task stale due to the segment to load not exists in targets", diff --git a/tests/integration/balance/balance_test.go b/tests/integration/balance/balance_test.go index 893e4644a6e4a..d461b7e2ce10a 100644 --- a/tests/integration/balance/balance_test.go +++ b/tests/integration/balance/balance_test.go @@ -18,10 +18,14 @@ package balance import ( "context" + "fmt" + "strconv" + "strings" "testing" "time" "github.com/golang/protobuf/proto" + "github.com/samber/lo" "github.com/stretchr/testify/suite" "go.uber.org/zap" @@ -51,7 +55,7 @@ func (s *BalanceTestSuit) SetupSuite() { s.Require().NoError(s.SetupEmbedEtcd()) } -func (s *BalanceTestSuit) initCollection(collectionName string, replica int, channelNum int, segmentNum int, segmentRowNum int) { +func (s *BalanceTestSuit) initCollection(collectionName string, replica int, channelNum int, segmentNum int, segmentRowNum int, segmentDeleteNum int) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -92,6 +96,26 @@ func (s *BalanceTestSuit) initCollection(collectionName string, replica int, cha s.NoError(err) s.True(merr.Ok(insertResult.Status)) + if segmentDeleteNum > 0 { + if segmentDeleteNum > segmentRowNum { + segmentDeleteNum = segmentRowNum + } + + pks := insertResult.GetIDs().GetIntId().GetData() + expr := fmt.Sprintf("%s in [%s]", integration.Int64Field, strings.Join(lo.Map(pks, func(pk int64, _ int) string { return strconv.FormatInt(pk, 10) }), ",")) + log.Info("========================delete expr==================", + zap.String("expr", expr), + ) + + deleteResp, err := s.Cluster.Proxy.Delete(ctx, &milvuspb.DeleteRequest{ + CollectionName: collectionName, + Expr: expr, + }) + s.Require().NoError(err) + s.Require().True(merr.Ok(deleteResp.GetStatus())) + s.Require().EqualValues(len(pks), deleteResp.GetDeleteCnt()) + } + // flush flushResp, err := s.Cluster.Proxy.Flush(ctx, &milvuspb.FlushRequest{ DbName: dbName, @@ -137,7 +161,7 @@ func (s *BalanceTestSuit) initCollection(collectionName string, replica int, cha func (s *BalanceTestSuit) TestBalanceOnSingleReplica() { name := "test_balance_" + funcutil.GenRandomStr() - s.initCollection(name, 1, 2, 2, 2000) + s.initCollection(name, 1, 2, 2, 2000, 500) ctx := context.Background() // disable compact @@ -161,7 +185,7 @@ func (s *BalanceTestSuit) TestBalanceOnSingleReplica() { resp, err := qn.GetDataDistribution(ctx, &querypb.GetDataDistributionRequest{}) s.NoError(err) s.True(merr.Ok(resp.GetStatus())) - return len(resp.Channels) == 1 && len(resp.Segments) == 2 + return len(resp.Channels) == 1 && len(resp.Segments) >= 2 }, 30*time.Second, 1*time.Second) // check total segment number @@ -173,7 +197,7 @@ func (s *BalanceTestSuit) TestBalanceOnSingleReplica() { s.True(merr.Ok(resp1.GetStatus())) count += len(resp1.Segments) } - return count == 4 + return count == 8 }, 10*time.Second, 1*time.Second) } @@ -193,11 +217,11 @@ func (s *BalanceTestSuit) TestBalanceOnMultiReplica() { Command: datapb.GcCommand_Resume, }) - // init collection with 2 channel, each channel has 2 segment, each segment has 2000 row + // init collection with 2 channel, each channel has 4 segment, each segment has 2000 row // and load it with 2 replicas on 2 nodes. // then we add 2 query node, after balance happens, expected each node have 1 channel and 2 segments name := "test_balance_" + funcutil.GenRandomStr() - s.initCollection(name, 2, 2, 2, 2000) + s.initCollection(name, 2, 2, 2, 2000, 500) resp, err := s.Cluster.Proxy.GetReplicas(ctx, &milvuspb.GetReplicasRequest{CollectionName: name}) s.NoError(err) @@ -211,13 +235,13 @@ func (s *BalanceTestSuit) TestBalanceOnMultiReplica() { s.Eventually(func() bool { resp, err := qn1.GetDataDistribution(ctx, &querypb.GetDataDistributionRequest{}) s.NoError(err) - return len(resp.Channels) == 1 && len(resp.Segments) == 2 + return len(resp.Channels) == 1 && len(resp.Segments) >= 2 }, 30*time.Second, 1*time.Second) s.Eventually(func() bool { resp, err := qn2.GetDataDistribution(ctx, &querypb.GetDataDistributionRequest{}) s.NoError(err) - return len(resp.Channels) == 1 && len(resp.Segments) == 2 + return len(resp.Channels) == 1 && len(resp.Segments) >= 2 }, 30*time.Second, 1*time.Second) // check total segment num @@ -229,7 +253,7 @@ func (s *BalanceTestSuit) TestBalanceOnMultiReplica() { s.True(merr.Ok(resp1.GetStatus())) count += len(resp1.Segments) } - return count == 8 + return count == 16 }, 10*time.Second, 1*time.Second) } @@ -256,7 +280,7 @@ func (s *BalanceTestSuit) TestNodeDown() { // init collection with 3 channel, each channel has 15 segment, each segment has 2000 row // and load it with 2 replicas on 2 nodes. name := "test_balance_" + funcutil.GenRandomStr() - s.initCollection(name, 1, 2, 15, 2000) + s.initCollection(name, 1, 2, 15, 2000, 500) // then we add 2 query node, after balance happens, expected each node have 1 channel and 2 segments qn1 := s.Cluster.AddQueryNode() @@ -268,7 +292,7 @@ func (s *BalanceTestSuit) TestNodeDown() { s.NoError(err) s.True(merr.Ok(resp.GetStatus())) log.Info("resp", zap.Any("channel", resp.Channels), zap.Any("segments", resp.Segments)) - return len(resp.Channels) == 0 && len(resp.Segments) == 10 + return len(resp.Channels) == 0 && len(resp.Segments) >= 10 }, 30*time.Second, 1*time.Second) s.Eventually(func() bool { @@ -276,7 +300,7 @@ func (s *BalanceTestSuit) TestNodeDown() { s.NoError(err) s.True(merr.Ok(resp.GetStatus())) log.Info("resp", zap.Any("channel", resp.Channels), zap.Any("segments", resp.Segments)) - return len(resp.Channels) == 0 && len(resp.Segments) == 10 + return len(resp.Channels) == 0 && len(resp.Segments) >= 10 }, 30*time.Second, 1*time.Second) // then we force stop qn1 and resume balance channel, let balance channel and load segment happens concurrently on qn2 @@ -298,7 +322,7 @@ func (s *BalanceTestSuit) TestNodeDown() { s.NoError(err) s.True(merr.Ok(resp.GetStatus())) log.Info("resp", zap.Any("channel", resp.Channels), zap.Any("segments", resp.Segments)) - return len(resp.Channels) == 1 && len(resp.Segments) == 15 + return len(resp.Channels) == 1 && len(resp.Segments) >= 15 }, 30*time.Second, 1*time.Second) // expect all delegator will recover to healthy