From 7abebf81a3c6b5dc57ec216084f5c4892dec6602 Mon Sep 17 00:00:00 2001 From: wei liu Date: Wed, 20 Mar 2024 15:11:05 +0800 Subject: [PATCH] fix: Load segment task promote failed (#31431) issue: #30816 pr: #31430 pr #31319 introduce the logic that segment checker need to load level zero segment which only exist in current target. This PR fix load segment task promote failed when segment only belongs to current target --------- Signed-off-by: Wei Liu --- internal/querycoordv2/task/scheduler.go | 2 +- tests/integration/balance/balance_test.go | 50 +++++++++++++++++------ 2 files changed, 38 insertions(+), 14 deletions(-) diff --git a/internal/querycoordv2/task/scheduler.go b/internal/querycoordv2/task/scheduler.go index 541346b613fae..a67904fe0686a 100644 --- a/internal/querycoordv2/task/scheduler.go +++ b/internal/querycoordv2/task/scheduler.go @@ -855,7 +855,7 @@ func (scheduler *taskScheduler) checkSegmentTaskStale(task *SegmentTask) error { if taskType == TaskTypeMove || taskType == TaskTypeUpdate { segment = scheduler.targetMgr.GetSealedSegment(task.CollectionID(), task.SegmentID(), meta.CurrentTarget) } else { - segment = scheduler.targetMgr.GetSealedSegment(task.CollectionID(), task.SegmentID(), meta.NextTarget) + segment = scheduler.targetMgr.GetSealedSegment(task.CollectionID(), task.SegmentID(), meta.NextTargetFirst) } if segment == nil { log.Warn("task stale due to the segment to load not exists in targets", diff --git a/tests/integration/balance/balance_test.go b/tests/integration/balance/balance_test.go index 893e4644a6e4a..d461b7e2ce10a 100644 --- a/tests/integration/balance/balance_test.go +++ b/tests/integration/balance/balance_test.go @@ -18,10 +18,14 @@ package balance import ( "context" + "fmt" + "strconv" + "strings" "testing" "time" "github.com/golang/protobuf/proto" + "github.com/samber/lo" "github.com/stretchr/testify/suite" "go.uber.org/zap" @@ -51,7 +55,7 @@ func (s *BalanceTestSuit) SetupSuite() { s.Require().NoError(s.SetupEmbedEtcd()) } -func (s *BalanceTestSuit) initCollection(collectionName string, replica int, channelNum int, segmentNum int, segmentRowNum int) { +func (s *BalanceTestSuit) initCollection(collectionName string, replica int, channelNum int, segmentNum int, segmentRowNum int, segmentDeleteNum int) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -92,6 +96,26 @@ func (s *BalanceTestSuit) initCollection(collectionName string, replica int, cha s.NoError(err) s.True(merr.Ok(insertResult.Status)) + if segmentDeleteNum > 0 { + if segmentDeleteNum > segmentRowNum { + segmentDeleteNum = segmentRowNum + } + + pks := insertResult.GetIDs().GetIntId().GetData() + expr := fmt.Sprintf("%s in [%s]", integration.Int64Field, strings.Join(lo.Map(pks, func(pk int64, _ int) string { return strconv.FormatInt(pk, 10) }), ",")) + log.Info("========================delete expr==================", + zap.String("expr", expr), + ) + + deleteResp, err := s.Cluster.Proxy.Delete(ctx, &milvuspb.DeleteRequest{ + CollectionName: collectionName, + Expr: expr, + }) + s.Require().NoError(err) + s.Require().True(merr.Ok(deleteResp.GetStatus())) + s.Require().EqualValues(len(pks), deleteResp.GetDeleteCnt()) + } + // flush flushResp, err := s.Cluster.Proxy.Flush(ctx, &milvuspb.FlushRequest{ DbName: dbName, @@ -137,7 +161,7 @@ func (s *BalanceTestSuit) initCollection(collectionName string, replica int, cha func (s *BalanceTestSuit) TestBalanceOnSingleReplica() { name := "test_balance_" + funcutil.GenRandomStr() - s.initCollection(name, 1, 2, 2, 2000) + s.initCollection(name, 1, 2, 2, 2000, 500) ctx := context.Background() // disable compact @@ -161,7 +185,7 @@ func (s *BalanceTestSuit) TestBalanceOnSingleReplica() { resp, err := qn.GetDataDistribution(ctx, &querypb.GetDataDistributionRequest{}) s.NoError(err) s.True(merr.Ok(resp.GetStatus())) - return len(resp.Channels) == 1 && len(resp.Segments) == 2 + return len(resp.Channels) == 1 && len(resp.Segments) >= 2 }, 30*time.Second, 1*time.Second) // check total segment number @@ -173,7 +197,7 @@ func (s *BalanceTestSuit) TestBalanceOnSingleReplica() { s.True(merr.Ok(resp1.GetStatus())) count += len(resp1.Segments) } - return count == 4 + return count == 8 }, 10*time.Second, 1*time.Second) } @@ -193,11 +217,11 @@ func (s *BalanceTestSuit) TestBalanceOnMultiReplica() { Command: datapb.GcCommand_Resume, }) - // init collection with 2 channel, each channel has 2 segment, each segment has 2000 row + // init collection with 2 channel, each channel has 4 segment, each segment has 2000 row // and load it with 2 replicas on 2 nodes. // then we add 2 query node, after balance happens, expected each node have 1 channel and 2 segments name := "test_balance_" + funcutil.GenRandomStr() - s.initCollection(name, 2, 2, 2, 2000) + s.initCollection(name, 2, 2, 2, 2000, 500) resp, err := s.Cluster.Proxy.GetReplicas(ctx, &milvuspb.GetReplicasRequest{CollectionName: name}) s.NoError(err) @@ -211,13 +235,13 @@ func (s *BalanceTestSuit) TestBalanceOnMultiReplica() { s.Eventually(func() bool { resp, err := qn1.GetDataDistribution(ctx, &querypb.GetDataDistributionRequest{}) s.NoError(err) - return len(resp.Channels) == 1 && len(resp.Segments) == 2 + return len(resp.Channels) == 1 && len(resp.Segments) >= 2 }, 30*time.Second, 1*time.Second) s.Eventually(func() bool { resp, err := qn2.GetDataDistribution(ctx, &querypb.GetDataDistributionRequest{}) s.NoError(err) - return len(resp.Channels) == 1 && len(resp.Segments) == 2 + return len(resp.Channels) == 1 && len(resp.Segments) >= 2 }, 30*time.Second, 1*time.Second) // check total segment num @@ -229,7 +253,7 @@ func (s *BalanceTestSuit) TestBalanceOnMultiReplica() { s.True(merr.Ok(resp1.GetStatus())) count += len(resp1.Segments) } - return count == 8 + return count == 16 }, 10*time.Second, 1*time.Second) } @@ -256,7 +280,7 @@ func (s *BalanceTestSuit) TestNodeDown() { // init collection with 3 channel, each channel has 15 segment, each segment has 2000 row // and load it with 2 replicas on 2 nodes. name := "test_balance_" + funcutil.GenRandomStr() - s.initCollection(name, 1, 2, 15, 2000) + s.initCollection(name, 1, 2, 15, 2000, 500) // then we add 2 query node, after balance happens, expected each node have 1 channel and 2 segments qn1 := s.Cluster.AddQueryNode() @@ -268,7 +292,7 @@ func (s *BalanceTestSuit) TestNodeDown() { s.NoError(err) s.True(merr.Ok(resp.GetStatus())) log.Info("resp", zap.Any("channel", resp.Channels), zap.Any("segments", resp.Segments)) - return len(resp.Channels) == 0 && len(resp.Segments) == 10 + return len(resp.Channels) == 0 && len(resp.Segments) >= 10 }, 30*time.Second, 1*time.Second) s.Eventually(func() bool { @@ -276,7 +300,7 @@ func (s *BalanceTestSuit) TestNodeDown() { s.NoError(err) s.True(merr.Ok(resp.GetStatus())) log.Info("resp", zap.Any("channel", resp.Channels), zap.Any("segments", resp.Segments)) - return len(resp.Channels) == 0 && len(resp.Segments) == 10 + return len(resp.Channels) == 0 && len(resp.Segments) >= 10 }, 30*time.Second, 1*time.Second) // then we force stop qn1 and resume balance channel, let balance channel and load segment happens concurrently on qn2 @@ -298,7 +322,7 @@ func (s *BalanceTestSuit) TestNodeDown() { s.NoError(err) s.True(merr.Ok(resp.GetStatus())) log.Info("resp", zap.Any("channel", resp.Channels), zap.Any("segments", resp.Segments)) - return len(resp.Channels) == 1 && len(resp.Segments) == 15 + return len(resp.Channels) == 1 && len(resp.Segments) >= 15 }, 30*time.Second, 1*time.Second) // expect all delegator will recover to healthy