enhance: Remove load task limit in one round (#38436) (#38497)
pr: #38436 #38454
The task limit in assignSegment/assignChannel applies to both load
tasks and balance tasks.

This PR removes the load task limit and limits only the number of
balance tasks in one round.

---------

Signed-off-by: Wei Liu <[email protected]>
weiliu1031 authored Dec 17, 2024
1 parent aad59b8 commit e03d3d5
Showing 6 changed files with 29 additions and 37 deletions.
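
Every balancer change below follows the same pattern: the per-round batch cap now starts out unlimited and is lowered only when the call is not a forced assignment, so checker-generated load tasks are never capped while balance-generated tasks still are. A minimal, self-contained Go sketch of that control flow (simplified names, not the actual Milvus code):

package main

import (
    "fmt"
    "math"
)

// assignBatch mirrors the reworked assignSegment/assignChannel flow in spirit:
// the batch cap starts unlimited and is lowered only for balance passes.
func assignBatch(items []string, configuredBatchSize int, forceAssign bool) []string {
    batchSize := math.MaxInt64
    if !forceAssign {
        // balance pass: (node filtering omitted) cap the number of plans per round
        batchSize = configuredBatchSize
    }

    plans := make([]string, 0, len(items))
    for _, item := range items {
        if len(plans) >= batchSize {
            break // only balance passes can hit this cap now
        }
        plans = append(plans, item)
    }
    return plans
}

func main() {
    segments := []string{"seg-1", "seg-2", "seg-3"}
    fmt.Println(assignBatch(segments, 2, false)) // balance: capped at 2 plans
    fmt.Println(assignBatch(segments, 2, true))  // forced load: all 3 assigned
}
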
12 changes: 6 additions & 6 deletions internal/querycoordv2/balance/balance.go
@@ -57,8 +57,8 @@ func (chanPlan *ChannelAssignPlan) String() string {
 }
 
 type Balance interface {
-    AssignSegment(collectionID int64, segments []*meta.Segment, nodes []int64, manualBalance bool) []SegmentAssignPlan
-    AssignChannel(collectionID int64, channels []*meta.DmChannel, nodes []int64, manualBalance bool) []ChannelAssignPlan
+    AssignSegment(collectionID int64, segments []*meta.Segment, nodes []int64, forceAssign bool) []SegmentAssignPlan
+    AssignChannel(collectionID int64, channels []*meta.DmChannel, nodes []int64, forceAssign bool) []ChannelAssignPlan
     BalanceReplica(replica *meta.Replica) ([]SegmentAssignPlan, []ChannelAssignPlan)
 }
@@ -67,9 +67,9 @@ type RoundRobinBalancer struct {
     nodeManager *session.NodeManager
 }
 
-func (b *RoundRobinBalancer) AssignSegment(collectionID int64, segments []*meta.Segment, nodes []int64, manualBalance bool) []SegmentAssignPlan {
+func (b *RoundRobinBalancer) AssignSegment(collectionID int64, segments []*meta.Segment, nodes []int64, forceAssign bool) []SegmentAssignPlan {
     // skip out suspend node and stopping node during assignment, but skip this check for manual balance
-    if !manualBalance {
+    if !forceAssign {
         nodes = lo.Filter(nodes, func(node int64, _ int) bool {
             info := b.nodeManager.Get(node)
             return info != nil && info.GetState() == session.NodeStateNormal
@@ -103,9 +103,9 @@ func (b *RoundRobinBalancer) AssignSegment(collectionID int64, segments []*meta.
     return ret
 }
 
-func (b *RoundRobinBalancer) AssignChannel(collectionID int64, channels []*meta.DmChannel, nodes []int64, manualBalance bool) []ChannelAssignPlan {
+func (b *RoundRobinBalancer) AssignChannel(collectionID int64, channels []*meta.DmChannel, nodes []int64, forceAssign bool) []ChannelAssignPlan {
     // skip out suspend node and stopping node during assignment, but skip this check for manual balance
-    if !manualBalance {
+    if !forceAssign {
         versionRangeFilter := semver.MustParseRange(">2.3.x")
         nodes = lo.Filter(nodes, func(node int64, _ int) bool {
             info := b.nodeManager.Get(node)

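For context on the file above: the RoundRobinBalancer simply spreads assignments across the surviving nodes in turn. A toy illustration of round-robin assignment over healthy nodes (hypothetical, simplified types; the real AssignSegment does more bookkeeping before picking a target):

package main

import "fmt"

type segment struct{ id int64 }

// roundRobinAssign hands out segments to nodes in turn; node filtering and
// plan bookkeeping from the real balancer are omitted.
func roundRobinAssign(segments []segment, nodes []int64) map[int64][]int64 {
    plans := make(map[int64][]int64)
    if len(nodes) == 0 {
        return plans
    }
    for i, s := range segments {
        target := nodes[i%len(nodes)]
        plans[target] = append(plans[target], s.id)
    }
    return plans
}

func main() {
    segs := []segment{{id: 1}, {id: 2}, {id: 3}, {id: 4}, {id: 5}}
    fmt.Println(roundRobinAssign(segs, []int64{101, 102})) // map[101:[1 3 5] 102:[2 4]]
}
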
9 changes: 4 additions & 5 deletions internal/querycoordv2/balance/rowcount_based_balancer.go
@@ -42,9 +42,8 @@ type RowCountBasedBalancer struct {
 
 // AssignSegment, when row count based balancer assign segments, it will assign segment to node with least global row count.
 // try to make every query node has same row count.
-func (b *RowCountBasedBalancer) AssignSegment(collectionID int64, segments []*meta.Segment, nodes []int64, manualBalance bool) []SegmentAssignPlan {
-    // skip out suspend node and stopping node during assignment, but skip this check for manual balance
-    if !manualBalance {
+func (b *RowCountBasedBalancer) AssignSegment(collectionID int64, segments []*meta.Segment, nodes []int64, forceAssign bool) []SegmentAssignPlan {
+    if !forceAssign {
         nodes = lo.Filter(nodes, func(node int64, _ int) bool {
             info := b.nodeManager.Get(node)
             return info != nil && info.GetState() == session.NodeStateNormal
@@ -87,9 +86,9 @@ func (b *RowCountBasedBalancer) AssignSegment(collectionID int64, segments []*me
 
 // AssignSegment, when row count based balancer assign segments, it will assign channel to node with least global channel count.
 // try to make every query node has channel count
-func (b *RowCountBasedBalancer) AssignChannel(collectionID int64, channels []*meta.DmChannel, nodes []int64, manualBalance bool) []ChannelAssignPlan {
+func (b *RowCountBasedBalancer) AssignChannel(collectionID int64, channels []*meta.DmChannel, nodes []int64, forceAssign bool) []ChannelAssignPlan {
     // skip out suspend node and stopping node during assignment, but skip this check for manual balance
-    if !manualBalance {
+    if !forceAssign {
         versionRangeFilter := semver.MustParseRange(">2.3.x")
         nodes = lo.Filter(nodes, func(node int64, _ int) bool {
             info := b.nodeManager.Get(node)

39 changes: 16 additions & 23 deletions internal/querycoordv2/balance/score_based_balancer.go
@@ -50,14 +50,14 @@ func NewScoreBasedBalancer(scheduler task.Scheduler,
 }
 
 // AssignSegment got a segment list, and try to assign each segment to node's with lowest score
-func (b *ScoreBasedBalancer) AssignSegment(collectionID int64, segments []*meta.Segment, nodes []int64, manualBalance bool) []SegmentAssignPlan {
+func (b *ScoreBasedBalancer) AssignSegment(collectionID int64, segments []*meta.Segment, nodes []int64, forceAssign bool) []SegmentAssignPlan {
     br := NewBalanceReport()
-    return b.assignSegment(br, collectionID, segments, nodes, manualBalance)
+    return b.assignSegment(br, collectionID, segments, nodes, forceAssign)
 }
 
-func (b *ScoreBasedBalancer) assignSegment(br *balanceReport, collectionID int64, segments []*meta.Segment, nodes []int64, manualBalance bool) []SegmentAssignPlan {
-    // skip out suspend node and stopping node during assignment, but skip this check for manual balance
-    if !manualBalance {
+func (b *ScoreBasedBalancer) assignSegment(br *balanceReport, collectionID int64, segments []*meta.Segment, nodes []int64, forceAssign bool) []SegmentAssignPlan {
+    balanceBatchSize := math.MaxInt64
+    if !forceAssign {
         nodes = lo.Filter(nodes, func(node int64, _ int) bool {
             info := b.nodeManager.Get(node)
             normalNode := info != nil && info.GetState() == session.NodeStateNormal
@@ -66,6 +66,7 @@ func (b *ScoreBasedBalancer) assignSegment(br *balanceReport, collectionID int64
             }
             return normalNode
         })
+        balanceBatchSize = paramtable.Get().QueryCoordCfg.CollectionBalanceSegmentBatchSize.GetAsInt()
     }
 
     // calculate each node's score
@@ -92,10 +93,6 @@ func (b *ScoreBasedBalancer) assignSegment(br *balanceReport, collectionID int64
         return segments[i].GetNumOfRows() > segments[j].GetNumOfRows()
     })
 
-    balanceBatchSize := paramtable.Get().QueryCoordCfg.CollectionBalanceSegmentBatchSize.GetAsInt()
-    if manualBalance {
-        balanceBatchSize = math.MaxInt64
-    }
     plans := make([]SegmentAssignPlan, 0, len(segments))
     for _, s := range segments {
         func(s *meta.Segment) {
@@ -108,8 +105,8 @@ func (b *ScoreBasedBalancer) assignSegment(br *balanceReport, collectionID int64
             sourceNode := nodeItemsMap[s.Node]
             // if segment's node exist, which means this segment comes from balancer. we should consider the benefit
             // if the segment reassignment doesn't got enough benefit, we should skip this reassignment
-            // notice: we should skip benefit check for manual balance
-            if !manualBalance && sourceNode != nil && !b.hasEnoughBenefit(sourceNode, targetNode, scoreChanges) {
+            // notice: we should skip benefit check for forceAssign
+            if !forceAssign && sourceNode != nil && !b.hasEnoughBenefit(sourceNode, targetNode, scoreChanges) {
                 br.AddRecord(StrRecordf("skip generate balance plan for segment %d since no enough benefit", s.ID))
                 return
             }
@@ -146,14 +143,14 @@ func (b *ScoreBasedBalancer) assignSegment(br *balanceReport, collectionID int64
     return plans
 }
 
-func (b *ScoreBasedBalancer) AssignChannel(collectionID int64, channels []*meta.DmChannel, nodes []int64, manualBalance bool) []ChannelAssignPlan {
+func (b *ScoreBasedBalancer) AssignChannel(collectionID int64, channels []*meta.DmChannel, nodes []int64, forceAssign bool) []ChannelAssignPlan {
     br := NewBalanceReport()
-    return b.assignChannel(br, collectionID, channels, nodes, manualBalance)
+    return b.assignChannel(br, collectionID, channels, nodes, forceAssign)
 }
 
-func (b *ScoreBasedBalancer) assignChannel(br *balanceReport, collectionID int64, channels []*meta.DmChannel, nodes []int64, manualBalance bool) []ChannelAssignPlan {
-    // skip out suspend node and stopping node during assignment, but skip this check for manual balance
-    if !manualBalance {
+func (b *ScoreBasedBalancer) assignChannel(br *balanceReport, collectionID int64, channels []*meta.DmChannel, nodes []int64, forceAssign bool) []ChannelAssignPlan {
+    balanceBatchSize := math.MaxInt64
+    if !forceAssign {
         nodes = lo.Filter(nodes, func(node int64, _ int) bool {
             info := b.nodeManager.Get(node)
             normalNode := info != nil && info.GetState() == session.NodeStateNormal
@@ -162,6 +159,7 @@ func (b *ScoreBasedBalancer) assignChannel(br *balanceReport, collectionID int64
             }
             return normalNode
         })
+        balanceBatchSize = paramtable.Get().QueryCoordCfg.CollectionBalanceChannelBatchSize.GetAsInt()
     }
 
     // calculate each node's score
@@ -174,11 +172,6 @@ func (b *ScoreBasedBalancer) assignChannel(br *balanceReport, collectionID int64
     for _, item := range nodeItemsMap {
         queue.push(item)
     }
-
-    balanceBatchSize := paramtable.Get().QueryCoordCfg.CollectionBalanceChannelBatchSize.GetAsInt()
-    if manualBalance {
-        balanceBatchSize = math.MaxInt64
-    }
     plans := make([]ChannelAssignPlan, 0, len(channels))
     for _, ch := range channels {
         func(ch *meta.DmChannel) {
@@ -191,8 +184,8 @@ func (b *ScoreBasedBalancer) assignChannel(br *balanceReport, collectionID int64
             sourceNode := nodeItemsMap[ch.Node]
             // if segment's node exist, which means this segment comes from balancer. we should consider the benefit
             // if the segment reassignment doesn't got enough benefit, we should skip this reassignment
-            // notice: we should skip benefit check for manual balance
-            if !manualBalance && sourceNode != nil && !b.hasEnoughBenefit(sourceNode, targetNode, scoreChanges) {
+            // notice: we should skip benefit check for forceAssign
+            if !forceAssign && sourceNode != nil && !b.hasEnoughBenefit(sourceNode, targetNode, scoreChanges) {
                 br.AddRecord(StrRecordf("skip generate balance plan for channel %s since no enough benefit", ch.GetChannelName()))
                 return
             }

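For readers unfamiliar with the score-based balancer touched above: each segment (or channel) is matched with the node that currently has the lowest score, the node's score is bumped by the new load, and the node goes back into the priority queue. A self-contained toy version of that pick-lowest/update/push-back loop (simplified scoring; the real code also applies the benefit check and, after this change, the batch cap only on balance passes):

package main

import (
    "container/heap"
    "fmt"
)

// node carries a score (for example, assigned row count); a lower score
// makes the node the preferred target.
type node struct {
    id    int64
    score int64
}

// nodeQueue is a min-heap over node scores.
type nodeQueue []*node

func (q nodeQueue) Len() int           { return len(q) }
func (q nodeQueue) Less(i, j int) bool { return q[i].score < q[j].score }
func (q nodeQueue) Swap(i, j int)      { q[i], q[j] = q[j], q[i] }
func (q *nodeQueue) Push(x any)        { *q = append(*q, x.(*node)) }
func (q *nodeQueue) Pop() any {
    old := *q
    n := old[len(old)-1]
    *q = old[:len(old)-1]
    return n
}

func main() {
    q := &nodeQueue{{id: 1, score: 100}, {id: 2, score: 10}, {id: 3, score: 50}}
    heap.Init(q)

    segmentRows := []int64{30, 30, 30}
    for _, rows := range segmentRows {
        target := heap.Pop(q).(*node) // node with the lowest score gets the segment
        fmt.Printf("assign %d rows to node %d\n", rows, target.id)
        target.score += rows
        heap.Push(q, target) // push back with the updated score
    }
}
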
2 changes: 1 addition & 1 deletion internal/querycoordv2/checkers/channel_checker.go
@@ -232,7 +232,7 @@ func (c *ChannelChecker) createChannelLoadTask(ctx context.Context, channels []*
         if len(rwNodes) == 0 {
             rwNodes = replica.GetRWNodes()
         }
-        plan := c.getBalancerFunc().AssignChannel(replica.GetCollectionID(), []*meta.DmChannel{ch}, rwNodes, false)
+        plan := c.getBalancerFunc().AssignChannel(replica.GetCollectionID(), []*meta.DmChannel{ch}, rwNodes, true)
         plans = append(plans, plan...)
     }
 
2 changes: 1 addition & 1 deletion internal/querycoordv2/checkers/segment_checker.go
@@ -435,7 +435,7 @@ func (c *SegmentChecker) createSegmentLoadTasks(ctx context.Context, segments []
                 SegmentInfo: s,
             }
         })
-        shardPlans := c.getBalancerFunc().AssignSegment(replica.GetCollectionID(), segmentInfos, rwNodes, false)
+        shardPlans := c.getBalancerFunc().AssignSegment(replica.GetCollectionID(), segmentInfos, rwNodes, true)
         for i := range shardPlans {
             shardPlans[i].Replica = replica
         }
2 changes: 1 addition & 1 deletion internal/querycoordv2/observers/target_observer.go
@@ -476,7 +476,7 @@ func (ob *TargetObserver) sync(ctx context.Context, replica *meta.Replica, leade
         Version: time.Now().UnixNano(),
         IndexInfoList: indexInfo,
     }
-    ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond))
+    ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.BrokerTimeout.GetAsDuration(time.Millisecond))
     defer cancel()
     resp, err := ob.cluster.SyncDistribution(ctx, leaderView.ID, req)
     if err != nil {

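The last change swaps the timeout used for the SyncDistribution call from the segment-task timeout to the broker timeout. The surrounding pattern is the standard context.WithTimeout idiom; a stand-alone sketch with a placeholder duration (in the real code the value comes from paramtable's QueryCoordCfg.BrokerTimeout, in milliseconds):

package main

import (
    "context"
    "fmt"
    "time"
)

// syncDistribution stands in for the real RPC; it just reports whether the
// caller's deadline is still alive when the simulated work finishes.
func syncDistribution(ctx context.Context) error {
    select {
    case <-time.After(50 * time.Millisecond): // pretend the call takes 50ms
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
}

func main() {
    // Placeholder for the configured broker timeout.
    brokerTimeout := 200 * time.Millisecond

    ctx, cancel := context.WithTimeout(context.Background(), brokerTimeout)
    defer cancel()

    if err := syncDistribution(ctx); err != nil {
        fmt.Println("sync failed:", err)
        return
    }
    fmt.Println("sync succeeded within the broker timeout")
}
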
