Skip to content

Commit

Permalink
fix: sync partitiion stats blocking balance task(milvus-io#33741) (mi…
Browse files Browse the repository at this point in the history
…lvus-io#33742)

related: milvus-io#33741

Signed-off-by: MrPresent-Han <[email protected]>
  • Loading branch information
MrPresent-Han authored Jun 11, 2024
1 parent ecf2bce commit f7af323
Show file tree
Hide file tree
Showing 8 changed files with 36 additions and 53 deletions.
1 change: 1 addition & 0 deletions internal/datacoord/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs .
zap.String("channel", channel.GetName()),
zap.Int("numOfSegments", len(segments)),
zap.Int("indexed segment", len(indexedSegments)),
zap.Int64("currentPartitionStatsVersion", currentPartitionStatsVersion),
)
unIndexedIDs := make(typeutil.UniqueSet)

Expand Down
2 changes: 1 addition & 1 deletion internal/querycoordv2/checkers/balance_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (b *BalanceChecker) replicasToBalance() []int64 {
hasUnbalancedCollection := false
for _, cid := range loadedCollections {
if b.normalBalanceCollectionsCurrentRound.Contain(cid) {
log.Debug("ScoreBasedBalancer has balanced collection, skip balancing in this round",
log.Debug("ScoreBasedBalancer is balancing this collection, skip balancing in this round",
zap.Int64("collectionID", cid))
continue
}
Expand Down
2 changes: 1 addition & 1 deletion internal/querycoordv2/checkers/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func NewCheckerController(
utils.IndexChecker: NewIndexChecker(meta, dist, broker, nodeMgr),
// todo temporary work around must fix
// utils.LeaderChecker: NewLeaderChecker(meta, dist, targetMgr, nodeMgr, true),
utils.LeaderChecker: NewLeaderChecker(meta, dist, targetMgr, nodeMgr, Params.QueryNodeCfg.EnableSyncPartitionStats.GetAsBool()),
utils.LeaderChecker: NewLeaderChecker(meta, dist, targetMgr, nodeMgr),
}

manualCheckChs := map[utils.CheckerType]chan struct{}{
Expand Down
53 changes: 25 additions & 28 deletions internal/querycoordv2/checkers/leader_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,27 +35,24 @@ var _ Checker = (*LeaderChecker)(nil)
// LeaderChecker perform segment index check.
type LeaderChecker struct {
*checkerActivation
meta *meta.Meta
dist *meta.DistributionManager
target *meta.TargetManager
nodeMgr *session.NodeManager
enableSyncPartitionStats bool
meta *meta.Meta
dist *meta.DistributionManager
target *meta.TargetManager
nodeMgr *session.NodeManager
}

func NewLeaderChecker(
meta *meta.Meta,
dist *meta.DistributionManager,
target *meta.TargetManager,
nodeMgr *session.NodeManager,
enableSyncPartitionStats bool,
) *LeaderChecker {
return &LeaderChecker{
checkerActivation: newCheckerActivation(),
meta: meta,
dist: dist,
target: target,
nodeMgr: nodeMgr,
enableSyncPartitionStats: enableSyncPartitionStats,
checkerActivation: newCheckerActivation(),
meta: meta,
dist: dist,
target: target,
nodeMgr: nodeMgr,
}
}

Expand Down Expand Up @@ -100,9 +97,7 @@ func (c *LeaderChecker) Check(ctx context.Context) []task.Task {
dist := c.dist.SegmentDistManager.GetByFilter(meta.WithChannel(leaderView.Channel), meta.WithReplica(replica))
tasks = append(tasks, c.findNeedLoadedSegments(ctx, replica, leaderView, dist)...)
tasks = append(tasks, c.findNeedRemovedSegments(ctx, replica, leaderView, dist)...)
if c.enableSyncPartitionStats {
tasks = append(tasks, c.findNeedSyncPartitionStats(ctx, replica, leaderView, node)...)
}
tasks = append(tasks, c.findNeedSyncPartitionStats(ctx, replica, leaderView, node)...)
}
}
}
Expand All @@ -127,22 +122,24 @@ func (c *LeaderChecker) findNeedSyncPartitionStats(ctx context.Context, replica
partStatsToUpdate[partID] = psVersionInTarget
}
}
if len(partStatsToUpdate) > 0 {
action := task.NewLeaderUpdatePartStatsAction(leaderView.ID, nodeID, task.ActionTypeUpdate, leaderView.Channel, partStatsToUpdate)

action := task.NewLeaderUpdatePartStatsAction(leaderView.ID, nodeID, task.ActionTypeUpdate, leaderView.Channel, partStatsToUpdate)
t := task.NewLeaderPartStatsTask(
ctx,
c.ID(),
leaderView.CollectionID,
replica,
leaderView.ID,
action,
)

t := task.NewLeaderPartStatsTask(
ctx,
c.ID(),
leaderView.CollectionID,
replica,
leaderView.ID,
action,
)
// leader task shouldn't replace executing segment task
t.SetPriority(task.TaskPriorityLow)
t.SetReason("sync partition stats versions")
ret = append(ret, t)
}

// leader task shouldn't replace executing segment task
t.SetPriority(task.TaskPriorityLow)
t.SetReason("sync partition stats versions")
ret = append(ret, t)
return ret
}

Expand Down
6 changes: 1 addition & 5 deletions internal/querycoordv2/checkers/leader_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (suite *LeaderCheckerTestSuite) SetupTest() {

distManager := meta.NewDistributionManager()
targetManager := meta.NewTargetManager(suite.broker, suite.meta)
suite.checker = NewLeaderChecker(suite.meta, distManager, targetManager, suite.nodeMgr, false)
suite.checker = NewLeaderChecker(suite.meta, distManager, targetManager, suite.nodeMgr)
}

func (suite *LeaderCheckerTestSuite) TearDownTest() {
Expand Down Expand Up @@ -476,10 +476,6 @@ func (suite *LeaderCheckerTestSuite) TestIgnoreSyncRemovedSegments() {

func (suite *LeaderCheckerTestSuite) TestUpdatePartitionStats() {
testChannel := "test-insert-channel"
suite.checker.enableSyncPartitionStats = true
defer func() {
suite.checker.enableSyncPartitionStats = false
}()
leaderID := int64(2)
observer := suite.checker
observer.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1))
Expand Down
4 changes: 4 additions & 0 deletions internal/querycoordv2/task/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package task

import (
"reflect"

"github.com/samber/lo"
"go.uber.org/atomic"

Expand Down Expand Up @@ -225,6 +227,8 @@ func (action *LeaderAction) IsFinished(distMgr *meta.DistributionManager) bool {
return action.rpcReturned.Load() && dist != nil && dist.NodeID == action.Node()
case ActionTypeReduce:
return action.rpcReturned.Load() && (dist == nil || dist.NodeID != action.Node())
case ActionTypeUpdate:
return action.rpcReturned.Load() && (dist != nil && reflect.DeepEqual(action.partStatsVersions, view.PartitionStatsVersions))
}
return false
}
18 changes: 3 additions & 15 deletions pkg/util/paramtable/component_param.go
Original file line number Diff line number Diff line change
Expand Up @@ -2168,11 +2168,9 @@ type queryNodeConfig struct {

MemoryIndexLoadPredictMemoryUsageFactor ParamItem `refreshable:"true"`
EnableSegmentPrune ParamItem `refreshable:"false"`
// todo temporary work around must fix
EnableSyncPartitionStats ParamItem `refreshable:"false"`
DefaultSegmentFilterRatio ParamItem `refreshable:"false"`
UseStreamComputing ParamItem `refreshable:"false"`
QueryStreamBatchSize ParamItem `refreshable:"false"`
DefaultSegmentFilterRatio ParamItem `refreshable:"false"`
UseStreamComputing ParamItem `refreshable:"false"`
QueryStreamBatchSize ParamItem `refreshable:"false"`
}

func (p *queryNodeConfig) init(base *BaseTable) {
Expand Down Expand Up @@ -2741,16 +2739,6 @@ user-task-polling:
Export: true,
}
p.EnableSegmentPrune.Init(base.mgr)

p.EnableSyncPartitionStats = ParamItem{
Key: "queryNode.enableSyncPartitionStats",
Version: "2.4.4",
DefaultValue: "false",
Doc: "enable sync partitionStats",
Export: true,
}
p.EnableSyncPartitionStats.Init(base.mgr)

p.DefaultSegmentFilterRatio = ParamItem{
Key: "queryNode.defaultSegmentFilterRatio",
Version: "2.4.0",
Expand Down
3 changes: 0 additions & 3 deletions tests/integration/balance/balance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,12 @@ func (s *BalanceTestSuit) SetupSuite() {

// disable compaction
paramtable.Get().Save(paramtable.Get().DataCoordCfg.EnableCompaction.Key, "false")
// todo @wayblink repair this test
// paramtable.Get().Save(paramtable.Get().QueryNodeCfg.EnableSyncPartitionStats.Key, "false")

s.Require().NoError(s.SetupEmbedEtcd())
}

func (s *BalanceTestSuit) TearDownSuite() {
defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.EnableCompaction.Key)
// defer paramtable.Get().Reset(paramtable.Get().QueryNodeCfg.EnableSyncPartitionStats.Key)

s.MiniClusterSuite.TearDownSuite()
}
Expand Down

0 comments on commit f7af323

Please sign in to comment.