diff --git a/internal/coordinator/snmanager/streaming_node_manager.go b/internal/coordinator/snmanager/streaming_node_manager.go index e0e09a9d59069..efed218e5d452 100644 --- a/internal/coordinator/snmanager/streaming_node_manager.go +++ b/internal/coordinator/snmanager/streaming_node_manager.go @@ -30,7 +30,7 @@ func newStreamingNodeManager() *StreamingNodeManager { return snm } -// StreamingNodeManager is a manager for manage the querynode that embeded into streaming node. +// StreamingNodeManager is a manager for manage the querynode that embedded into streaming node. // StreamingNodeManager is exclusive with ResourceManager. type StreamingNodeManager struct { notifier *syncutil.AsyncTaskNotifier[struct{}] diff --git a/internal/querycoordv2/balance/channel_level_score_balancer.go b/internal/querycoordv2/balance/channel_level_score_balancer.go index ddc1d95981337..716d0c379ebd1 100644 --- a/internal/querycoordv2/balance/channel_level_score_balancer.go +++ b/internal/querycoordv2/balance/channel_level_score_balancer.go @@ -67,6 +67,15 @@ func (b *ChannelLevelScoreBalancer) BalanceReplica(ctx context.Context, replica } }() + // Make a plan to rebalance the channel first. + // The Streaming QueryNode doesn't make the channel level score, so just fallback to the ScoreBasedBalancer. + stoppingBalance := paramtable.Get().QueryCoordCfg.EnableStoppingBalance.GetAsBool() + channelPlan := b.ScoreBasedBalancer.balanceChannels(ctx, br, replica, stoppingBalance) + // If the channelPlan is not empty, do it directly, don't do the segment balance. + if len(channelPlan) > 0 { + return nil, channelPlan + } + exclusiveMode := true channels := b.targetMgr.GetDmChannelsByCollection(ctx, replica.GetCollectionID(), meta.CurrentTarget) for channelName := range channels { @@ -82,7 +91,6 @@ func (b *ChannelLevelScoreBalancer) BalanceReplica(ctx context.Context, replica } // TODO: assign by channel - channelPlans = make([]ChannelAssignPlan, 0) segmentPlans = make([]SegmentAssignPlan, 0) for channelName := range channels { if replica.NodesCount() == 0 { @@ -113,7 +121,7 @@ func (b *ChannelLevelScoreBalancer) BalanceReplica(ctx context.Context, replica } if len(roNodes) != 0 { - if !paramtable.Get().QueryCoordCfg.EnableStoppingBalance.GetAsBool() { + if !stoppingBalance { log.RatedInfo(10, "stopping balance is disabled!", zap.Int64s("stoppingNode", roNodes)) return nil, nil } @@ -122,26 +130,18 @@ func (b *ChannelLevelScoreBalancer) BalanceReplica(ctx context.Context, replica zap.Any("stopping nodes", roNodes), zap.Any("available nodes", rwNodes), ) - // handle stopped nodes here, have to assign segments on stopping nodes to nodes with the smallest score - if b.permitBalanceChannel(replica.GetCollectionID()) { - channelPlans = append(channelPlans, b.genStoppingChannelPlan(ctx, replica, channelName, rwNodes, roNodes)...) - } - if len(channelPlans) == 0 && b.permitBalanceSegment(replica.GetCollectionID()) { + if b.permitBalanceSegment(replica.GetCollectionID()) { segmentPlans = append(segmentPlans, b.genStoppingSegmentPlan(ctx, replica, channelName, rwNodes, roNodes)...) } } else { - if paramtable.Get().QueryCoordCfg.AutoBalanceChannel.GetAsBool() && b.permitBalanceChannel(replica.GetCollectionID()) { - channelPlans = append(channelPlans, b.genChannelPlan(ctx, replica, channelName, rwNodes)...) - } - - if len(channelPlans) == 0 && b.permitBalanceSegment(replica.GetCollectionID()) { + if b.permitBalanceSegment(replica.GetCollectionID()) { segmentPlans = append(segmentPlans, b.genSegmentPlan(ctx, br, replica, channelName, rwNodes)...) } } } - return segmentPlans, channelPlans + return segmentPlans, nil } func (b *ChannelLevelScoreBalancer) genStoppingChannelPlan(ctx context.Context, replica *meta.Replica, channelName string, onlineNodes []int64, offlineNodes []int64) []ChannelAssignPlan { diff --git a/internal/querycoordv2/observers/replica_observer.go b/internal/querycoordv2/observers/replica_observer.go index e14a9c872264e..1a9ffcf598864 100644 --- a/internal/querycoordv2/observers/replica_observer.go +++ b/internal/querycoordv2/observers/replica_observer.go @@ -153,7 +153,6 @@ func (ob *ReplicaObserver) checkStreamingQueryNodesInReplica(sqNodeIDs typeutil. } logger.Info("all segment/channel has been removed from ro streaming query node, remove it from replica") } - } } diff --git a/internal/querycoordv2/session/node_manager.go b/internal/querycoordv2/session/node_manager.go index 8704087bf5a01..8d5bf6e0239f1 100644 --- a/internal/querycoordv2/session/node_manager.go +++ b/internal/querycoordv2/session/node_manager.go @@ -154,7 +154,7 @@ func (n *NodeInfo) Labels() map[string]string { } func (n *NodeInfo) IsEmbeddedQueryNodeInStreamingNode() bool { - return n.immutableInfo.Labels[sessionutil.LabelStreamingNodeEmbededQueryNode] == "1" + return n.immutableInfo.Labels[sessionutil.LabelStreamingNodeEmbeddedQueryNode] == "1" } func (n *NodeInfo) SegmentCnt() int { diff --git a/internal/util/sessionutil/session_util.go b/internal/util/sessionutil/session_util.go index 53e03f792fcb3..dca9dad9f3a80 100644 --- a/internal/util/sessionutil/session_util.go +++ b/internal/util/sessionutil/session_util.go @@ -48,9 +48,9 @@ const ( // DefaultServiceRoot default root path used in kv by Session DefaultServiceRoot = "session/" // DefaultIDKey default id key for Session - DefaultIDKey = "id" - SupportedLabelPrefix = "MILVUS_SERVER_LABEL_" - LabelStreamingNodeEmbededQueryNode = "QUERYNODE_STREAMING-EMBEDED" + DefaultIDKey = "id" + SupportedLabelPrefix = "MILVUS_SERVER_LABEL_" + LabelStreamingNodeEmbeddedQueryNode = "QUERYNODE_STREAMING-EMBEDDED" ) // SessionEventType session event type diff --git a/internal/util/streamingutil/env.go b/internal/util/streamingutil/env.go index ecb2e11d7a80e..6a160f42d02ed 100644 --- a/internal/util/streamingutil/env.go +++ b/internal/util/streamingutil/env.go @@ -24,10 +24,10 @@ func MustEnableStreamingService() { // EnableEmbededQueryNode set server labels for embedded query node. func EnableEmbededQueryNode() { MustEnableStreamingService() - os.Setenv(sessionutil.SupportedLabelPrefix+sessionutil.LabelStreamingNodeEmbededQueryNode, "1") + os.Setenv(sessionutil.SupportedLabelPrefix+sessionutil.LabelStreamingNodeEmbeddedQueryNode, "1") } // IsEmbeddedQueryNode returns whether the current node is an embedded query node in streaming node. func IsEmbeddedQueryNode() bool { - return os.Getenv(sessionutil.SupportedLabelPrefix+sessionutil.LabelStreamingNodeEmbededQueryNode) == "1" + return os.Getenv(sessionutil.SupportedLabelPrefix+sessionutil.LabelStreamingNodeEmbeddedQueryNode) == "1" }