Skip to content

Commit

Permalink
enhance: enable channel-level score balance
Browse files Browse the repository at this point in the history
Signed-off-by: chyezh <[email protected]>
  • Loading branch information
chyezh committed Dec 26, 2024
1 parent 0fa436b commit 781ff46
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 21 deletions.
2 changes: 1 addition & 1 deletion internal/coordinator/snmanager/streaming_node_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func newStreamingNodeManager() *StreamingNodeManager {
return snm
}

// StreamingNodeManager is a manager for manage the querynode that embeded into streaming node.
// StreamingNodeManager is a manager for manage the querynode that embedded into streaming node.
// StreamingNodeManager is exclusive with ResourceManager.
type StreamingNodeManager struct {
notifier *syncutil.AsyncTaskNotifier[struct{}]
Expand Down
26 changes: 13 additions & 13 deletions internal/querycoordv2/balance/channel_level_score_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,15 @@ func (b *ChannelLevelScoreBalancer) BalanceReplica(ctx context.Context, replica
}
}()

// Make a plan to rebalance the channel first.
// The Streaming QueryNode doesn't make the channel level score, so just fallback to the ScoreBasedBalancer.
stoppingBalance := paramtable.Get().QueryCoordCfg.EnableStoppingBalance.GetAsBool()
channelPlan := b.ScoreBasedBalancer.balanceChannels(ctx, br, replica, stoppingBalance)
// If the channelPlan is not empty, do it directly, don't do the segment balance.
if len(channelPlan) > 0 {
return nil, channelPlan
}

exclusiveMode := true
channels := b.targetMgr.GetDmChannelsByCollection(ctx, replica.GetCollectionID(), meta.CurrentTarget)
for channelName := range channels {
Expand All @@ -82,7 +91,6 @@ func (b *ChannelLevelScoreBalancer) BalanceReplica(ctx context.Context, replica
}

// TODO: assign by channel
channelPlans = make([]ChannelAssignPlan, 0)
segmentPlans = make([]SegmentAssignPlan, 0)
for channelName := range channels {
if replica.NodesCount() == 0 {
Expand Down Expand Up @@ -113,7 +121,7 @@ func (b *ChannelLevelScoreBalancer) BalanceReplica(ctx context.Context, replica
}

if len(roNodes) != 0 {
if !paramtable.Get().QueryCoordCfg.EnableStoppingBalance.GetAsBool() {
if !stoppingBalance {
log.RatedInfo(10, "stopping balance is disabled!", zap.Int64s("stoppingNode", roNodes))
return nil, nil
}
Expand All @@ -122,26 +130,18 @@ func (b *ChannelLevelScoreBalancer) BalanceReplica(ctx context.Context, replica
zap.Any("stopping nodes", roNodes),
zap.Any("available nodes", rwNodes),
)
// handle stopped nodes here, have to assign segments on stopping nodes to nodes with the smallest score
if b.permitBalanceChannel(replica.GetCollectionID()) {
channelPlans = append(channelPlans, b.genStoppingChannelPlan(ctx, replica, channelName, rwNodes, roNodes)...)
}

if len(channelPlans) == 0 && b.permitBalanceSegment(replica.GetCollectionID()) {
if b.permitBalanceSegment(replica.GetCollectionID()) {
segmentPlans = append(segmentPlans, b.genStoppingSegmentPlan(ctx, replica, channelName, rwNodes, roNodes)...)
}
} else {
if paramtable.Get().QueryCoordCfg.AutoBalanceChannel.GetAsBool() && b.permitBalanceChannel(replica.GetCollectionID()) {
channelPlans = append(channelPlans, b.genChannelPlan(ctx, replica, channelName, rwNodes)...)
}

if len(channelPlans) == 0 && b.permitBalanceSegment(replica.GetCollectionID()) {
if b.permitBalanceSegment(replica.GetCollectionID()) {
segmentPlans = append(segmentPlans, b.genSegmentPlan(ctx, br, replica, channelName, rwNodes)...)
}
}
}

return segmentPlans, channelPlans
return segmentPlans, nil
}

func (b *ChannelLevelScoreBalancer) genStoppingChannelPlan(ctx context.Context, replica *meta.Replica, channelName string, onlineNodes []int64, offlineNodes []int64) []ChannelAssignPlan {
Expand Down
1 change: 0 additions & 1 deletion internal/querycoordv2/observers/replica_observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,6 @@ func (ob *ReplicaObserver) checkStreamingQueryNodesInReplica(sqNodeIDs typeutil.
}
logger.Info("all segment/channel has been removed from ro streaming query node, remove it from replica")
}

}
}

Expand Down
2 changes: 1 addition & 1 deletion internal/querycoordv2/session/node_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func (n *NodeInfo) Labels() map[string]string {
}

func (n *NodeInfo) IsEmbeddedQueryNodeInStreamingNode() bool {
return n.immutableInfo.Labels[sessionutil.LabelStreamingNodeEmbededQueryNode] == "1"
return n.immutableInfo.Labels[sessionutil.LabelStreamingNodeEmbeddedQueryNode] == "1"
}

func (n *NodeInfo) SegmentCnt() int {
Expand Down
6 changes: 3 additions & 3 deletions internal/util/sessionutil/session_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ const (
// DefaultServiceRoot default root path used in kv by Session
DefaultServiceRoot = "session/"
// DefaultIDKey default id key for Session
DefaultIDKey = "id"
SupportedLabelPrefix = "MILVUS_SERVER_LABEL_"
LabelStreamingNodeEmbededQueryNode = "QUERYNODE_STREAMING-EMBEDED"
DefaultIDKey = "id"
SupportedLabelPrefix = "MILVUS_SERVER_LABEL_"
LabelStreamingNodeEmbeddedQueryNode = "QUERYNODE_STREAMING-EMBEDDED"
)

// SessionEventType session event type
Expand Down
4 changes: 2 additions & 2 deletions internal/util/streamingutil/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ func MustEnableStreamingService() {
// EnableEmbededQueryNode set server labels for embedded query node.
func EnableEmbededQueryNode() {
MustEnableStreamingService()
os.Setenv(sessionutil.SupportedLabelPrefix+sessionutil.LabelStreamingNodeEmbededQueryNode, "1")
os.Setenv(sessionutil.SupportedLabelPrefix+sessionutil.LabelStreamingNodeEmbeddedQueryNode, "1")
}

// IsEmbeddedQueryNode returns whether the current node is an embedded query node in streaming node.
func IsEmbeddedQueryNode() bool {
return os.Getenv(sessionutil.SupportedLabelPrefix+sessionutil.LabelStreamingNodeEmbededQueryNode) == "1"
return os.Getenv(sessionutil.SupportedLabelPrefix+sessionutil.LabelStreamingNodeEmbeddedQueryNode) == "1"
}

0 comments on commit 781ff46

Please sign in to comment.