From e76031383af3ee844a07cab3b2b63bcb70d31989 Mon Sep 17 00:00:00 2001 From: Chun Han <116052805+MrPresent-Han@users.noreply.github.com> Date: Tue, 11 Jun 2024 02:21:56 -0400 Subject: [PATCH] fix: sync partitiion stats blocking balance task(#33741) (#33742) related: #33741 Signed-off-by: MrPresent-Han Co-authored-by: wayblink Signed-off-by: wayblink --- internal/datacoord/handler.go | 1 + internal/querycoordv2/checkers/balance_checker.go | 2 +- internal/querycoordv2/checkers/leader_checker_test.go | 6 +----- internal/querycoordv2/task/action.go | 4 ++++ tests/integration/balance/balance_test.go | 3 --- 5 files changed, 7 insertions(+), 9 deletions(-) diff --git a/internal/datacoord/handler.go b/internal/datacoord/handler.go index ac4fd0dff358b..199e6ebd68ebd 100644 --- a/internal/datacoord/handler.go +++ b/internal/datacoord/handler.go @@ -134,6 +134,7 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs . zap.String("channel", channel.GetName()), zap.Int("numOfSegments", len(segments)), zap.Int("indexed segment", len(indexedSegments)), + zap.Int64("currentPartitionStatsVersion", currentPartitionStatsVersion), ) unIndexedIDs := make(typeutil.UniqueSet) diff --git a/internal/querycoordv2/checkers/balance_checker.go b/internal/querycoordv2/checkers/balance_checker.go index d300bda51f2bc..86cfb064534c9 100644 --- a/internal/querycoordv2/checkers/balance_checker.go +++ b/internal/querycoordv2/checkers/balance_checker.go @@ -129,7 +129,7 @@ func (b *BalanceChecker) replicasToBalance() []int64 { hasUnbalancedCollection := false for _, cid := range loadedCollections { if b.normalBalanceCollectionsCurrentRound.Contain(cid) { - log.Debug("ScoreBasedBalancer has balanced collection, skip balancing in this round", + log.Debug("ScoreBasedBalancer is balancing this collection, skip balancing in this round", zap.Int64("collectionID", cid)) continue } diff --git a/internal/querycoordv2/checkers/leader_checker_test.go b/internal/querycoordv2/checkers/leader_checker_test.go index f644cda5bc747..0d2249b14ad15 100644 --- a/internal/querycoordv2/checkers/leader_checker_test.go +++ b/internal/querycoordv2/checkers/leader_checker_test.go @@ -75,7 +75,7 @@ func (suite *LeaderCheckerTestSuite) SetupTest() { distManager := meta.NewDistributionManager() targetManager := meta.NewTargetManager(suite.broker, suite.meta) - suite.checker = NewLeaderChecker(suite.meta, distManager, targetManager, suite.nodeMgr, false) + suite.checker = NewLeaderChecker(suite.meta, distManager, targetManager, suite.nodeMgr) } func (suite *LeaderCheckerTestSuite) TearDownTest() { @@ -476,10 +476,6 @@ func (suite *LeaderCheckerTestSuite) TestIgnoreSyncRemovedSegments() { func (suite *LeaderCheckerTestSuite) TestUpdatePartitionStats() { testChannel := "test-insert-channel" - suite.checker.enableSyncPartitionStats = true - defer func() { - suite.checker.enableSyncPartitionStats = false - }() leaderID := int64(2) observer := suite.checker observer.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1)) diff --git a/internal/querycoordv2/task/action.go b/internal/querycoordv2/task/action.go index 3948b2e98ebae..54823a988fa44 100644 --- a/internal/querycoordv2/task/action.go +++ b/internal/querycoordv2/task/action.go @@ -17,6 +17,8 @@ package task import ( + "reflect" + "github.com/samber/lo" "go.uber.org/atomic" @@ -234,6 +236,8 @@ func (action *LeaderAction) IsFinished(distMgr *meta.DistributionManager) bool { return action.rpcReturned.Load() && dist != nil && dist.NodeID == action.Node() case ActionTypeReduce: return action.rpcReturned.Load() && (dist == nil || dist.NodeID != action.Node()) + case ActionTypeUpdate: + return action.rpcReturned.Load() && (dist != nil && reflect.DeepEqual(action.partStatsVersions, view.PartitionStatsVersions)) } return false } diff --git a/tests/integration/balance/balance_test.go b/tests/integration/balance/balance_test.go index 9f0321d6f6389..b0df436e68431 100644 --- a/tests/integration/balance/balance_test.go +++ b/tests/integration/balance/balance_test.go @@ -53,15 +53,12 @@ func (s *BalanceTestSuit) SetupSuite() { // disable compaction paramtable.Get().Save(paramtable.Get().DataCoordCfg.EnableCompaction.Key, "false") - // todo @wayblink repair this test - // paramtable.Get().Save(paramtable.Get().QueryNodeCfg.EnableSyncPartitionStats.Key, "false") s.Require().NoError(s.SetupEmbedEtcd()) } func (s *BalanceTestSuit) TearDownSuite() { defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.EnableCompaction.Key) - // defer paramtable.Get().Reset(paramtable.Get().QueryNodeCfg.EnableSyncPartitionStats.Key) s.MiniClusterSuite.TearDownSuite() }