From 07535320cef014da5aa03721aeb981ee2920af4c Mon Sep 17 00:00:00 2001 From: wei liu Date: Thu, 27 Jun 2024 19:06:05 +0800 Subject: [PATCH] enhance: Avoid assign too much segment/channels to new querynode (#34096) issue: #34095 When a new query node comes online, the segment_checker, channel_checker, and balance_checker simultaneously attempt to allocate segments to it. If this occurs during the execution of a load task and the distribution of the new query node hasn't been updated, the query coordinator may mistakenly view the new query node as empty. As a result, it assigns segments or channels to it, potentially overloading the new query node with more segments or channels than expected. This PR measures the workload of the executing tasks on the target query node to prevent assigning an excessive number of segments to it. --------- Signed-off-by: Wei Liu --- internal/querycoordv2/balance/balance.go | 4 +- internal/querycoordv2/balance/balance_test.go | 10 +- .../balance/rowcount_based_balancer.go | 8 +- .../balance/rowcount_based_balancer_test.go | 7 +- .../balance/score_based_balancer.go | 15 +- .../balance/score_based_balancer_test.go | 105 +++++++++++++- internal/querycoordv2/ops_service_test.go | 15 ++ internal/querycoordv2/services_test.go | 14 +- internal/querycoordv2/task/mock_scheduler.go | 114 ++++++++------- internal/querycoordv2/task/scheduler.go | 137 +++++++++++------- 10 files changed, 302 insertions(+), 127 deletions(-) diff --git a/internal/querycoordv2/balance/balance.go b/internal/querycoordv2/balance/balance.go index dfb7f6d157f8c..018f40373068a 100644 --- a/internal/querycoordv2/balance/balance.go +++ b/internal/querycoordv2/balance/balance.go @@ -84,7 +84,7 @@ func (b *RoundRobinBalancer) AssignSegment(collectionID int64, segments []*meta. 
sort.Slice(nodesInfo, func(i, j int) bool { cnt1, cnt2 := nodesInfo[i].SegmentCnt(), nodesInfo[j].SegmentCnt() id1, id2 := nodesInfo[i].ID(), nodesInfo[j].ID() - delta1, delta2 := b.scheduler.GetNodeSegmentDelta(id1), b.scheduler.GetNodeSegmentDelta(id2) + delta1, delta2 := b.scheduler.GetSegmentTaskDelta(id1, -1), b.scheduler.GetSegmentTaskDelta(id2, -1) return cnt1+delta1 < cnt2+delta2 }) ret := make([]SegmentAssignPlan, 0, len(segments)) @@ -114,7 +114,7 @@ func (b *RoundRobinBalancer) AssignChannel(channels []*meta.DmChannel, nodes []i sort.Slice(nodesInfo, func(i, j int) bool { cnt1, cnt2 := nodesInfo[i].ChannelCnt(), nodesInfo[j].ChannelCnt() id1, id2 := nodesInfo[i].ID(), nodesInfo[j].ID() - delta1, delta2 := b.scheduler.GetNodeChannelDelta(id1), b.scheduler.GetNodeChannelDelta(id2) + delta1, delta2 := b.scheduler.GetChannelTaskDelta(id1, -1), b.scheduler.GetChannelTaskDelta(id2, -1) return cnt1+delta1 < cnt2+delta2 }) ret := make([]ChannelAssignPlan, 0, len(channels)) diff --git a/internal/querycoordv2/balance/balance_test.go b/internal/querycoordv2/balance/balance_test.go index ea0e4dbb1798d..febc997d8f539 100644 --- a/internal/querycoordv2/balance/balance_test.go +++ b/internal/querycoordv2/balance/balance_test.go @@ -19,6 +19,7 @@ package balance import ( "testing" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" "github.com/milvus-io/milvus/internal/proto/datapb" @@ -37,6 +38,9 @@ func (suite *BalanceTestSuite) SetupTest() { nodeManager := session.NewNodeManager() suite.mockScheduler = task.NewMockScheduler(suite.T()) suite.roundRobinBalancer = NewRoundRobinBalancer(suite.mockScheduler, nodeManager) + + suite.mockScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe() + suite.mockScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe() } func (suite *BalanceTestSuite) TestAssignBalance() { @@ -84,13 +88,14 @@ func (suite *BalanceTestSuite) TestAssignBalance() { for _, c 
:= range cases { suite.Run(c.name, func() { suite.SetupTest() + suite.mockScheduler.ExpectedCalls = nil for i := range c.nodeIDs { nodeInfo := session.NewNodeInfo(c.nodeIDs[i], "127.0.0.1:0") nodeInfo.UpdateStats(session.WithSegmentCnt(c.segmentCnts[i])) nodeInfo.SetState(c.states[i]) suite.roundRobinBalancer.nodeManager.Add(nodeInfo) if !nodeInfo.IsStoppingState() { - suite.mockScheduler.EXPECT().GetNodeSegmentDelta(c.nodeIDs[i]).Return(c.deltaCnts[i]) + suite.mockScheduler.EXPECT().GetSegmentTaskDelta(c.nodeIDs[i], int64(-1)).Return(c.deltaCnts[i]) } } plans := suite.roundRobinBalancer.AssignSegment(0, c.assignments, c.nodeIDs, false) @@ -144,13 +149,14 @@ func (suite *BalanceTestSuite) TestAssignChannel() { for _, c := range cases { suite.Run(c.name, func() { suite.SetupTest() + suite.mockScheduler.ExpectedCalls = nil for i := range c.nodeIDs { nodeInfo := session.NewNodeInfo(c.nodeIDs[i], "127.0.0.1:0") nodeInfo.UpdateStats(session.WithChannelCnt(c.channelCnts[i])) nodeInfo.SetState(c.states[i]) suite.roundRobinBalancer.nodeManager.Add(nodeInfo) if !nodeInfo.IsStoppingState() { - suite.mockScheduler.EXPECT().GetNodeChannelDelta(c.nodeIDs[i]).Return(c.deltaCnts[i]) + suite.mockScheduler.EXPECT().GetChannelTaskDelta(c.nodeIDs[i], int64(-1)).Return(c.deltaCnts[i]) } } plans := suite.roundRobinBalancer.AssignChannel(c.assignments, c.nodeIDs, false) diff --git a/internal/querycoordv2/balance/rowcount_based_balancer.go b/internal/querycoordv2/balance/rowcount_based_balancer.go index be2ecd082a56c..c54c24ee1b886 100644 --- a/internal/querycoordv2/balance/rowcount_based_balancer.go +++ b/internal/querycoordv2/balance/rowcount_based_balancer.go @@ -137,6 +137,9 @@ func (b *RowCountBasedBalancer) convertToNodeItemsBySegment(nodeIDs []int64) []* rowcnt += int(view.NumOfGrowingRows) } + // calculate executing task cost in scheduler + rowcnt += b.scheduler.GetSegmentTaskDelta(node, -1) + // more row count, less priority nodeItem := newNodeItem(rowcnt, node) ret = 
append(ret, &nodeItem) @@ -150,8 +153,11 @@ func (b *RowCountBasedBalancer) convertToNodeItemsByChannel(nodeIDs []int64) []* node := nodeInfo.ID() channels := b.dist.ChannelDistManager.GetByNode(node) + channelCount := len(channels) + // calculate executing task cost in scheduler + channelCount += b.scheduler.GetChannelTaskDelta(node, -1) // more channel num, less priority - nodeItem := newNodeItem(len(channels), node) + nodeItem := newNodeItem(channelCount, node) ret = append(ret, &nodeItem) } return ret diff --git a/internal/querycoordv2/balance/rowcount_based_balancer_test.go b/internal/querycoordv2/balance/rowcount_based_balancer_test.go index 8193508a2e294..3fcc4bdced21d 100644 --- a/internal/querycoordv2/balance/rowcount_based_balancer_test.go +++ b/internal/querycoordv2/balance/rowcount_based_balancer_test.go @@ -76,6 +76,9 @@ func (suite *RowCountBasedBalancerTestSuite) SetupTest() { suite.balancer = NewRowCountBasedBalancer(suite.mockScheduler, nodeManager, distManager, testMeta, testTarget) suite.broker.EXPECT().GetPartitions(mock.Anything, int64(1)).Return([]int64{1}, nil).Maybe() + + suite.mockScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe() + suite.mockScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe() } func (suite *RowCountBasedBalancerTestSuite) TearDownTest() { @@ -451,7 +454,6 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalance() { balancer.targetMgr.UpdateCollectionNextTarget(int64(1)) balancer.targetMgr.UpdateCollectionCurrentTarget(1) balancer.targetMgr.UpdateCollectionNextTarget(int64(1)) - suite.mockScheduler.Mock.On("GetNodeChannelDelta", mock.Anything).Return(0).Maybe() for node, s := range c.distributions { balancer.dist.SegmentDistManager.Update(node, s...) 
} @@ -658,7 +660,6 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalanceOnPartStopping() { suite.broker.ExpectedCalls = nil suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(nil, c.segmentInNext, nil) balancer.targetMgr.UpdateCollectionNextTarget(int64(1)) - suite.mockScheduler.Mock.On("GetNodeChannelDelta", mock.Anything).Return(0).Maybe() for node, s := range c.distributions { balancer.dist.SegmentDistManager.Update(node, s...) } @@ -756,7 +757,6 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalanceOutboundNodes() { }, } - suite.mockScheduler.Mock.On("GetNodeChannelDelta", mock.Anything).Return(0).Maybe() for _, c := range cases { suite.Run(c.name, func() { suite.SetupSuite() @@ -1020,7 +1020,6 @@ func (suite *RowCountBasedBalancerTestSuite) TestDisableBalanceChannel() { balancer.targetMgr.UpdateCollectionNextTarget(int64(1)) balancer.targetMgr.UpdateCollectionCurrentTarget(1) balancer.targetMgr.UpdateCollectionNextTarget(int64(1)) - suite.mockScheduler.Mock.On("GetNodeChannelDelta", mock.Anything).Return(0).Maybe() for node, s := range c.distributions { balancer.dist.SegmentDistManager.Update(node, s...) 
} diff --git a/internal/querycoordv2/balance/score_based_balancer.go b/internal/querycoordv2/balance/score_based_balancer.go index d3f54058b0ff1..874a070fdfc1a 100644 --- a/internal/querycoordv2/balance/score_based_balancer.go +++ b/internal/querycoordv2/balance/score_based_balancer.go @@ -148,19 +148,22 @@ func (b *ScoreBasedBalancer) convertToNodeItems(collectionID int64, nodeIDs []in } func (b *ScoreBasedBalancer) calculateScore(collectionID, nodeID int64) int { - rowCount := 0 + nodeRowCount := 0 // calculate global sealed segment row count globalSegments := b.dist.SegmentDistManager.GetByNode(nodeID) for _, s := range globalSegments { - rowCount += int(s.GetNumOfRows()) + nodeRowCount += int(s.GetNumOfRows()) } // calculate global growing segment row count views := b.dist.GetLeaderView(nodeID) for _, view := range views { - rowCount += int(float64(view.NumOfGrowingRows) * params.Params.QueryCoordCfg.GrowingRowCountWeight.GetAsFloat()) + nodeRowCount += int(float64(view.NumOfGrowingRows) * params.Params.QueryCoordCfg.GrowingRowCountWeight.GetAsFloat()) } + // calculate executing task cost in scheduler + nodeRowCount += b.scheduler.GetSegmentTaskDelta(nodeID, -1) + collectionRowCount := 0 // calculate collection sealed segment row count collectionSegments := b.dist.SegmentDistManager.GetByCollectionAndNode(collectionID, nodeID) @@ -173,7 +176,11 @@ func (b *ScoreBasedBalancer) calculateScore(collectionID, nodeID int64) int { for _, view := range collectionViews { collectionRowCount += int(float64(view.NumOfGrowingRows) * params.Params.QueryCoordCfg.GrowingRowCountWeight.GetAsFloat()) } - return collectionRowCount + int(float64(rowCount)* + + // calculate executing task cost in scheduler + collectionRowCount += b.scheduler.GetSegmentTaskDelta(nodeID, collectionID) + + return collectionRowCount + int(float64(nodeRowCount)* params.Params.QueryCoordCfg.GlobalRowCountFactor.GetAsFloat()) } diff --git a/internal/querycoordv2/balance/score_based_balancer_test.go 
b/internal/querycoordv2/balance/score_based_balancer_test.go index 7a693e71987d8..bc48c5b18fe88 100644 --- a/internal/querycoordv2/balance/score_based_balancer_test.go +++ b/internal/querycoordv2/balance/score_based_balancer_test.go @@ -72,6 +72,9 @@ func (suite *ScoreBasedBalancerTestSuite) SetupTest() { distManager := meta.NewDistributionManager() suite.mockScheduler = task.NewMockScheduler(suite.T()) suite.balancer = NewScoreBasedBalancer(suite.mockScheduler, nodeManager, distManager, testMeta, testTarget) + + suite.mockScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe() + suite.mockScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe() } func (suite *ScoreBasedBalancerTestSuite) TearDownTest() { @@ -431,6 +434,103 @@ func (suite *ScoreBasedBalancerTestSuite) TestBalanceOneRound() { } } +func (suite *ScoreBasedBalancerTestSuite) TestBalanceWithExecutingTask() { + cases := []struct { + name string + nodes []int64 + collectionID int64 + replicaID int64 + collectionsSegments []*datapb.SegmentInfo + states []session.State + shouldMock bool + distributions map[int64][]*meta.Segment + distributionChannels map[int64][]*meta.DmChannel + deltaCounts []int + expectPlans []SegmentAssignPlan + expectChannelPlans []ChannelAssignPlan + }{ + { + name: "normal balance for one collection only", + nodes: []int64{1, 2, 3}, + deltaCounts: []int{30, 0, 0}, + collectionID: 1, + replicaID: 1, + collectionsSegments: []*datapb.SegmentInfo{ + {ID: 1, PartitionID: 1}, {ID: 2, PartitionID: 1}, {ID: 3, PartitionID: 1}, + }, + states: []session.State{session.NodeStateNormal, session.NodeStateNormal, session.NodeStateNormal}, + distributions: map[int64][]*meta.Segment{ + 1: {{SegmentInfo: &datapb.SegmentInfo{ID: 1, CollectionID: 1, NumOfRows: 10}, Node: 1}}, + 2: {{SegmentInfo: &datapb.SegmentInfo{ID: 2, CollectionID: 1, NumOfRows: 10}, Node: 2}}, + 3: { + {SegmentInfo: &datapb.SegmentInfo{ID: 3, CollectionID: 1, NumOfRows: 
20}, Node: 3}, + {SegmentInfo: &datapb.SegmentInfo{ID: 4, CollectionID: 1, NumOfRows: 30}, Node: 3}, + }, + }, + expectPlans: []SegmentAssignPlan{ + {Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 3, CollectionID: 1, NumOfRows: 20}, Node: 3}, From: 3, To: 2, Replica: newReplicaDefaultRG(1)}, + }, + expectChannelPlans: []ChannelAssignPlan{}, + }, + } + + for _, c := range cases { + suite.Run(c.name, func() { + suite.SetupSuite() + defer suite.TearDownTest() + balancer := suite.balancer + + // 1. set up target for multi collections + collection := utils.CreateTestCollection(c.collectionID, int32(c.replicaID)) + suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, c.collectionID).Return( + nil, c.collectionsSegments, nil) + suite.broker.EXPECT().GetPartitions(mock.Anything, c.collectionID).Return([]int64{c.collectionID}, nil).Maybe() + collection.LoadPercentage = 100 + collection.Status = querypb.LoadStatus_Loaded + balancer.meta.CollectionManager.PutCollection(collection) + balancer.meta.CollectionManager.PutPartition(utils.CreateTestPartition(c.collectionID, c.collectionID)) + balancer.meta.ReplicaManager.Put(utils.CreateTestReplica(c.replicaID, c.collectionID, c.nodes)) + balancer.targetMgr.UpdateCollectionNextTarget(c.collectionID) + balancer.targetMgr.UpdateCollectionCurrentTarget(c.collectionID) + balancer.targetMgr.UpdateCollectionNextTarget(c.collectionID) + + // 2. set up target for distribution for multi collections + for node, s := range c.distributions { + balancer.dist.SegmentDistManager.Update(node, s...) + } + for node, v := range c.distributionChannels { + balancer.dist.ChannelDistManager.Update(node, v...) + } + + // 3. 
set up nodes info and resourceManager for balancer + for i := range c.nodes { + nodeInfo := session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: c.nodes[i], + Address: "127.0.0.1:0", + Hostname: "localhost", + }) + nodeInfo.UpdateStats(session.WithChannelCnt(len(c.distributionChannels[c.nodes[i]]))) + nodeInfo.SetState(c.states[i]) + suite.balancer.nodeManager.Add(nodeInfo) + suite.balancer.meta.ResourceManager.HandleNodeUp(c.nodes[i]) + } + utils.RecoverAllCollection(balancer.meta) + + // set node delta count + suite.mockScheduler.ExpectedCalls = nil + for i, node := range c.nodes { + suite.mockScheduler.EXPECT().GetSegmentTaskDelta(node, int64(1)).Return(c.deltaCounts[i]).Maybe() + suite.mockScheduler.EXPECT().GetSegmentTaskDelta(node, int64(-1)).Return(c.deltaCounts[i]).Maybe() + } + + // 4. balance and verify result + segmentPlans, channelPlans := suite.getCollectionBalancePlans(balancer, c.collectionID) + assertChannelAssignPlanElementMatch(&suite.Suite, c.expectChannelPlans, channelPlans) + assertSegmentAssignPlanElementMatch(&suite.Suite, c.expectPlans, segmentPlans) + }) + } +} + func (suite *ScoreBasedBalancerTestSuite) TestBalanceMultiRound() { balanceCase := struct { name string @@ -636,11 +736,8 @@ func (suite *ScoreBasedBalancerTestSuite) TestStoppedBalance() { expectChannelPlans: []ChannelAssignPlan{}, }, } - for i, c := range cases { + for _, c := range cases { suite.Run(c.name, func() { - if i == 0 { - suite.mockScheduler.Mock.On("GetNodeChannelDelta", mock.Anything).Return(0).Maybe() - } suite.SetupSuite() defer suite.TearDownTest() balancer := suite.balancer diff --git a/internal/querycoordv2/ops_service_test.go b/internal/querycoordv2/ops_service_test.go index dd8c048dbf784..16c407c990aa4 100644 --- a/internal/querycoordv2/ops_service_test.go +++ b/internal/querycoordv2/ops_service_test.go @@ -108,6 +108,9 @@ func (suite *OpsServiceSuite) SetupTest() { suite.cluster = session.NewMockCluster(suite.T()) suite.jobScheduler = job.NewScheduler() 
suite.taskScheduler = task.NewMockScheduler(suite.T()) + suite.taskScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe() + suite.taskScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe() + suite.jobScheduler.Start() suite.balancer = balance.NewScoreBasedBalancer( suite.taskScheduler, @@ -539,6 +542,8 @@ func (suite *OpsServiceSuite) TestTransferSegment() { // test copy mode, expect generate 1 load segment task suite.taskScheduler.ExpectedCalls = nil + suite.taskScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe() + suite.taskScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe() suite.taskScheduler.EXPECT().Add(mock.Anything).RunAndReturn(func(t task.Task) error { actions := t.Actions() suite.Equal(len(actions), 1) @@ -556,6 +561,8 @@ func (suite *OpsServiceSuite) TestTransferSegment() { // test transfer all segments, expect generate 4 load segment task suite.taskScheduler.ExpectedCalls = nil + suite.taskScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe() + suite.taskScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe() counter := atomic.NewInt64(0) suite.taskScheduler.EXPECT().Add(mock.Anything).RunAndReturn(func(t task.Task) error { actions := t.Actions() @@ -575,6 +582,8 @@ func (suite *OpsServiceSuite) TestTransferSegment() { // test transfer all segment to all nodes, expect generate 4 load segment task suite.taskScheduler.ExpectedCalls = nil + suite.taskScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe() + suite.taskScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe() counter = atomic.NewInt64(0) nodeSet := typeutil.NewUniqueSet() suite.taskScheduler.EXPECT().Add(mock.Anything).RunAndReturn(func(t task.Task) error { @@ -745,6 +754,8 @@ func (suite *OpsServiceSuite) TestTransferChannel() { // test 
copy mode, expect generate 1 load segment task suite.taskScheduler.ExpectedCalls = nil + suite.taskScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe() + suite.taskScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe() suite.taskScheduler.EXPECT().Add(mock.Anything).RunAndReturn(func(t task.Task) error { actions := t.Actions() suite.Equal(len(actions), 1) @@ -762,6 +773,8 @@ func (suite *OpsServiceSuite) TestTransferChannel() { // test transfer all channels, expect generate 4 load segment task suite.taskScheduler.ExpectedCalls = nil + suite.taskScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe() + suite.taskScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe() counter := atomic.NewInt64(0) suite.taskScheduler.EXPECT().Add(mock.Anything).RunAndReturn(func(t task.Task) error { actions := t.Actions() @@ -781,6 +794,8 @@ func (suite *OpsServiceSuite) TestTransferChannel() { // test transfer all channels to all nodes, expect generate 4 load segment task suite.taskScheduler.ExpectedCalls = nil + suite.taskScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe() + suite.taskScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe() counter = atomic.NewInt64(0) nodeSet := typeutil.NewUniqueSet() suite.taskScheduler.EXPECT().Add(mock.Anything).RunAndReturn(func(t task.Task) error { diff --git a/internal/querycoordv2/services_test.go b/internal/querycoordv2/services_test.go index 61bd8dc2713e6..e257ea7d55e76 100644 --- a/internal/querycoordv2/services_test.go +++ b/internal/querycoordv2/services_test.go @@ -162,6 +162,8 @@ func (suite *ServiceSuite) SetupTest() { suite.cluster.EXPECT().SyncDistribution(mock.Anything, mock.Anything, mock.Anything).Return(merr.Success(), nil).Maybe() suite.jobScheduler = job.NewScheduler() suite.taskScheduler = task.NewMockScheduler(suite.T()) + 
suite.taskScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe() + suite.taskScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe() suite.jobScheduler.Start() suite.balancer = balance.NewRowCountBasedBalancer( suite.taskScheduler, @@ -1096,7 +1098,9 @@ func (suite *ServiceSuite) TestLoadBalance() { DstNodeIDs: []int64{dstNode}, SealedSegmentIDs: segments, } - suite.taskScheduler.ExpectedCalls = make([]*mock.Call, 0) + suite.taskScheduler.ExpectedCalls = nil + suite.taskScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe() + suite.taskScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe() suite.taskScheduler.EXPECT().Add(mock.Anything).Run(func(task task.Task) { actions := task.Actions() suite.Len(actions, 2) @@ -1141,7 +1145,9 @@ func (suite *ServiceSuite) TestLoadBalanceWithNoDstNode() { SourceNodeIDs: []int64{srcNode}, SealedSegmentIDs: segments, } - suite.taskScheduler.ExpectedCalls = make([]*mock.Call, 0) + suite.taskScheduler.ExpectedCalls = nil + suite.taskScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe() + suite.taskScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe() suite.taskScheduler.EXPECT().Add(mock.Anything).Run(func(task task.Task) { actions := task.Actions() suite.Len(actions, 2) @@ -1214,7 +1220,9 @@ func (suite *ServiceSuite) TestLoadBalanceWithEmptySegmentList() { SourceNodeIDs: []int64{srcNode}, DstNodeIDs: []int64{dstNode}, } - suite.taskScheduler.ExpectedCalls = make([]*mock.Call, 0) + suite.taskScheduler.ExpectedCalls = nil + suite.taskScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe() + suite.taskScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe() suite.taskScheduler.EXPECT().Add(mock.Anything).Run(func(t task.Task) { actions := t.Actions() suite.Len(actions, 2) diff --git 
a/internal/querycoordv2/task/mock_scheduler.go b/internal/querycoordv2/task/mock_scheduler.go index 5d6a7cf86a3f3..cf7184874c96d 100644 --- a/internal/querycoordv2/task/mock_scheduler.go +++ b/internal/querycoordv2/task/mock_scheduler.go @@ -125,6 +125,49 @@ func (_c *MockScheduler_Dispatch_Call) RunAndReturn(run func(int64)) *MockSchedu return _c } +// GetChannelTaskDelta provides a mock function with given fields: nodeID, collectionID +func (_m *MockScheduler) GetChannelTaskDelta(nodeID int64, collectionID int64) int { + ret := _m.Called(nodeID, collectionID) + + var r0 int + if rf, ok := ret.Get(0).(func(int64, int64) int); ok { + r0 = rf(nodeID, collectionID) + } else { + r0 = ret.Get(0).(int) + } + + return r0 +} + +// MockScheduler_GetChannelTaskDelta_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetChannelTaskDelta' +type MockScheduler_GetChannelTaskDelta_Call struct { + *mock.Call +} + +// GetChannelTaskDelta is a helper method to define mock.On call +// - nodeID int64 +// - collectionID int64 +func (_e *MockScheduler_Expecter) GetChannelTaskDelta(nodeID interface{}, collectionID interface{}) *MockScheduler_GetChannelTaskDelta_Call { + return &MockScheduler_GetChannelTaskDelta_Call{Call: _e.mock.On("GetChannelTaskDelta", nodeID, collectionID)} +} + +func (_c *MockScheduler_GetChannelTaskDelta_Call) Run(run func(nodeID int64, collectionID int64)) *MockScheduler_GetChannelTaskDelta_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(int64), args[1].(int64)) + }) + return _c +} + +func (_c *MockScheduler_GetChannelTaskDelta_Call) Return(_a0 int) *MockScheduler_GetChannelTaskDelta_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockScheduler_GetChannelTaskDelta_Call) RunAndReturn(run func(int64, int64) int) *MockScheduler_GetChannelTaskDelta_Call { + _c.Call.Return(run) + return _c +} + // GetChannelTaskNum provides a mock function with given fields: func (_m *MockScheduler) GetChannelTaskNum() 
int { ret := _m.Called() @@ -210,55 +253,13 @@ func (_c *MockScheduler_GetExecutedFlag_Call) RunAndReturn(run func(int64) <-cha return _c } -// GetNodeChannelDelta provides a mock function with given fields: nodeID -func (_m *MockScheduler) GetNodeChannelDelta(nodeID int64) int { - ret := _m.Called(nodeID) - - var r0 int - if rf, ok := ret.Get(0).(func(int64) int); ok { - r0 = rf(nodeID) - } else { - r0 = ret.Get(0).(int) - } - - return r0 -} - -// MockScheduler_GetNodeChannelDelta_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetNodeChannelDelta' -type MockScheduler_GetNodeChannelDelta_Call struct { - *mock.Call -} - -// GetNodeChannelDelta is a helper method to define mock.On call -// - nodeID int64 -func (_e *MockScheduler_Expecter) GetNodeChannelDelta(nodeID interface{}) *MockScheduler_GetNodeChannelDelta_Call { - return &MockScheduler_GetNodeChannelDelta_Call{Call: _e.mock.On("GetNodeChannelDelta", nodeID)} -} - -func (_c *MockScheduler_GetNodeChannelDelta_Call) Run(run func(nodeID int64)) *MockScheduler_GetNodeChannelDelta_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(int64)) - }) - return _c -} - -func (_c *MockScheduler_GetNodeChannelDelta_Call) Return(_a0 int) *MockScheduler_GetNodeChannelDelta_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *MockScheduler_GetNodeChannelDelta_Call) RunAndReturn(run func(int64) int) *MockScheduler_GetNodeChannelDelta_Call { - _c.Call.Return(run) - return _c -} - -// GetNodeSegmentDelta provides a mock function with given fields: nodeID -func (_m *MockScheduler) GetNodeSegmentDelta(nodeID int64) int { - ret := _m.Called(nodeID) +// GetSegmentTaskDelta provides a mock function with given fields: nodeID, collectionID +func (_m *MockScheduler) GetSegmentTaskDelta(nodeID int64, collectionID int64) int { + ret := _m.Called(nodeID, collectionID) var r0 int - if rf, ok := ret.Get(0).(func(int64) int); ok { - r0 = rf(nodeID) + if rf, ok := 
ret.Get(0).(func(int64, int64) int); ok { + r0 = rf(nodeID, collectionID) } else { r0 = ret.Get(0).(int) } @@ -266,30 +267,31 @@ func (_m *MockScheduler) GetNodeSegmentDelta(nodeID int64) int { return r0 } -// MockScheduler_GetNodeSegmentDelta_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetNodeSegmentDelta' -type MockScheduler_GetNodeSegmentDelta_Call struct { +// MockScheduler_GetSegmentTaskDelta_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetSegmentTaskDelta' +type MockScheduler_GetSegmentTaskDelta_Call struct { *mock.Call } -// GetNodeSegmentDelta is a helper method to define mock.On call +// GetSegmentTaskDelta is a helper method to define mock.On call // - nodeID int64 -func (_e *MockScheduler_Expecter) GetNodeSegmentDelta(nodeID interface{}) *MockScheduler_GetNodeSegmentDelta_Call { - return &MockScheduler_GetNodeSegmentDelta_Call{Call: _e.mock.On("GetNodeSegmentDelta", nodeID)} +// - collectionID int64 +func (_e *MockScheduler_Expecter) GetSegmentTaskDelta(nodeID interface{}, collectionID interface{}) *MockScheduler_GetSegmentTaskDelta_Call { + return &MockScheduler_GetSegmentTaskDelta_Call{Call: _e.mock.On("GetSegmentTaskDelta", nodeID, collectionID)} } -func (_c *MockScheduler_GetNodeSegmentDelta_Call) Run(run func(nodeID int64)) *MockScheduler_GetNodeSegmentDelta_Call { +func (_c *MockScheduler_GetSegmentTaskDelta_Call) Run(run func(nodeID int64, collectionID int64)) *MockScheduler_GetSegmentTaskDelta_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(int64)) + run(args[0].(int64), args[1].(int64)) }) return _c } -func (_c *MockScheduler_GetNodeSegmentDelta_Call) Return(_a0 int) *MockScheduler_GetNodeSegmentDelta_Call { +func (_c *MockScheduler_GetSegmentTaskDelta_Call) Return(_a0 int) *MockScheduler_GetSegmentTaskDelta_Call { _c.Call.Return(_a0) return _c } -func (_c *MockScheduler_GetNodeSegmentDelta_Call) RunAndReturn(run func(int64) int) 
*MockScheduler_GetNodeSegmentDelta_Call { +func (_c *MockScheduler_GetSegmentTaskDelta_Call) RunAndReturn(run func(int64, int64) int) *MockScheduler_GetSegmentTaskDelta_Call { _c.Call.Return(run) return _c } diff --git a/internal/querycoordv2/task/scheduler.go b/internal/querycoordv2/task/scheduler.go index b69c3e53676ed..b5539e4d1fe27 100644 --- a/internal/querycoordv2/task/scheduler.go +++ b/internal/querycoordv2/task/scheduler.go @@ -133,13 +133,14 @@ type Scheduler interface { Add(task Task) error Dispatch(node int64) RemoveByNode(node int64) - GetNodeSegmentDelta(nodeID int64) int - GetNodeChannelDelta(nodeID int64) int GetExecutedFlag(nodeID int64) <-chan struct{} GetChannelTaskNum() int GetSegmentTaskNum() int Sync(segmentID, replicaID int64) bool RemoveSync(segmentID, replicaID int64) + + GetSegmentTaskDelta(nodeID int64, collectionID int64) int + GetChannelTaskDelta(nodeID int64, collectionID int64) int } type taskScheduler struct { @@ -162,6 +163,9 @@ type taskScheduler struct { waitQueue *taskQueue syncTasks map[replicaSegmentIndex]struct{} + // delta changes measure by segment row count and channel num + segmentExecutingTaskDelta map[int64]map[int64]int + channelExecutingTaskDelta map[int64]map[int64]int } func NewScheduler(ctx context.Context, @@ -188,12 +191,14 @@ func NewScheduler(ctx context.Context, cluster: cluster, nodeMgr: nodeMgr, - tasks: make(UniqueSet), - segmentTasks: make(map[replicaSegmentIndex]Task), - channelTasks: make(map[replicaChannelIndex]Task), - processQueue: newTaskQueue(), - waitQueue: newTaskQueue(), - syncTasks: make(map[replicaSegmentIndex]struct{}), + tasks: make(UniqueSet), + segmentTasks: make(map[replicaSegmentIndex]Task), + channelTasks: make(map[replicaChannelIndex]Task), + processQueue: newTaskQueue(), + waitQueue: newTaskQueue(), + syncTasks: make(map[replicaSegmentIndex]struct{}), + segmentExecutingTaskDelta: make(map[int64]map[int64]int), + channelExecutingTaskDelta: make(map[int64]map[int64]int), } } @@ -206,6 +211,8 @@ func (scheduler *taskScheduler) Stop() { for
nodeID, executor := range scheduler.executors { executor.Stop() delete(scheduler.executors, nodeID) + delete(scheduler.segmentExecutingTaskDelta, nodeID) + delete(scheduler.channelExecutingTaskDelta, nodeID) } for _, task := range scheduler.segmentTasks { @@ -231,6 +238,8 @@ func (scheduler *taskScheduler) AddExecutor(nodeID int64) { scheduler.cluster, scheduler.nodeMgr) + scheduler.segmentExecutingTaskDelta[nodeID] = make(map[int64]int) + scheduler.channelExecutingTaskDelta[nodeID] = make(map[int64]int) scheduler.executors[nodeID] = executor executor.Start(scheduler.ctx) log.Info("add executor for new QueryNode", zap.Int64("nodeID", nodeID)) @@ -244,6 +253,8 @@ func (scheduler *taskScheduler) RemoveExecutor(nodeID int64) { if ok { executor.Stop() delete(scheduler.executors, nodeID) + delete(scheduler.segmentExecutingTaskDelta, nodeID) + delete(scheduler.channelExecutingTaskDelta, nodeID) log.Info("remove executor of offline QueryNode", zap.Int64("nodeID", nodeID)) } } @@ -301,10 +312,51 @@ func (scheduler *taskScheduler) Add(task Task) error { } scheduler.updateTaskMetrics() + scheduler.updateTaskDelta(task) + log.Ctx(task.Context()).Info("task added", zap.String("task", task.String())) return nil } +func (scheduler *taskScheduler) updateTaskDelta(task Task) { + var delta int + var deltaMap map[int64]map[int64]int + switch task := task.(type) { + case *SegmentTask: + // skip growing segment's count, cause doesn't know realtime row number of growing segment + if task.Actions()[0].(*SegmentAction).Scope() == querypb.DataScope_Historical { + segment := scheduler.targetMgr.GetSealedSegment(task.CollectionID(), task.SegmentID(), meta.NextTargetFirst) + if segment != nil { + delta = int(segment.GetNumOfRows()) + } + } + + deltaMap = scheduler.segmentExecutingTaskDelta + + case *ChannelTask: + delta = 1 + deltaMap = scheduler.channelExecutingTaskDelta + } + + // turn delta to negative when try to remove task + if task.Status() == TaskStatusSucceeded || task.Status() == 
TaskStatusFailed || task.Status() == TaskStatusCanceled { + delta = -delta + } + + if delta != 0 { + for _, action := range task.Actions() { + if deltaMap[action.Node()] == nil { + deltaMap[action.Node()] = make(map[int64]int) + } + if action.Type() == ActionTypeGrow { + deltaMap[action.Node()][task.CollectionID()] += delta + } else if action.Type() == ActionTypeReduce { + deltaMap[action.Node()][task.CollectionID()] -= delta + } + } + } +} + func (scheduler *taskScheduler) updateTaskMetrics() { segmentGrowNum, segmentReduceNum, segmentMoveNum := 0, 0, 0 channelGrowNum, channelReduceNum, channelMoveNum := 0, 0, 0 @@ -484,18 +536,39 @@ func (scheduler *taskScheduler) Dispatch(node int64) { } } -func (scheduler *taskScheduler) GetNodeSegmentDelta(nodeID int64) int { +func (scheduler *taskScheduler) GetSegmentTaskDelta(nodeID, collectionID int64) int { scheduler.rwmutex.RLock() defer scheduler.rwmutex.RUnlock() - return calculateNodeDelta(nodeID, scheduler.segmentTasks) + return scheduler.calculateTaskDelta(nodeID, collectionID, scheduler.segmentExecutingTaskDelta) } -func (scheduler *taskScheduler) GetNodeChannelDelta(nodeID int64) int { +func (scheduler *taskScheduler) GetChannelTaskDelta(nodeID, collectionID int64) int { scheduler.rwmutex.RLock() defer scheduler.rwmutex.RUnlock() - return calculateNodeDelta(nodeID, scheduler.channelTasks) + return scheduler.calculateTaskDelta(nodeID, collectionID, scheduler.channelExecutingTaskDelta) +} + +func (scheduler *taskScheduler) calculateTaskDelta(nodeID, collectionID int64, deltaMap map[int64]map[int64]int) int { + if nodeID == -1 && collectionID == -1 { + return 0 + } + + sum := 0 + for nid, nInfo := range deltaMap { + if nid != nodeID && -1 != nodeID { + continue + } + + for cid, cInfo := range nInfo { + if cid == collectionID || -1 == collectionID { + sum += cInfo + } + } + } + + return sum } func (scheduler *taskScheduler) GetExecutedFlag(nodeID int64) <-chan struct{} { @@ -524,45 +597,6 @@ func (scheduler 
*taskScheduler) GetSegmentTaskNum() int { return len(scheduler.segmentTasks) } -func calculateNodeDelta[K comparable, T ~map[K]Task](nodeID int64, tasks T) int { - delta := 0 - for _, task := range tasks { - for _, action := range task.Actions() { - if action.Node() != nodeID { - continue - } - if action.Type() == ActionTypeGrow { - delta++ - } else if action.Type() == ActionTypeReduce { - delta-- - } - } - } - return delta -} - -func (scheduler *taskScheduler) GetNodeSegmentCntDelta(nodeID int64) int { - scheduler.rwmutex.RLock() - defer scheduler.rwmutex.RUnlock() - - delta := 0 - for _, task := range scheduler.segmentTasks { - for _, action := range task.Actions() { - if action.Node() != nodeID { - continue - } - segmentAction := action.(*SegmentAction) - segment := scheduler.targetMgr.GetSealedSegment(task.CollectionID(), segmentAction.SegmentID(), meta.NextTarget) - if action.Type() == ActionTypeGrow { - delta += int(segment.GetNumOfRows()) - } else { - delta -= int(segment.GetNumOfRows()) - } - } - } - return delta -} - // schedule selects some tasks to execute, follow these steps for each started selected tasks: // 1. check whether this task is stale, set status to canceled if stale // 2. step up the task's actions, set status to succeeded if all actions finished @@ -814,6 +848,7 @@ func (scheduler *taskScheduler) remove(task Task) { log = log.With(zap.String("channel", task.Channel())) } + scheduler.updateTaskDelta(task) scheduler.updateTaskMetrics() log.Info("task removed") }