diff --git a/internal/proxy/management.go b/internal/proxy/management.go index 9b31d487fabf6..31e7883267776 100644 --- a/internal/proxy/management.go +++ b/internal/proxy/management.go @@ -343,7 +343,7 @@ func (node *Proxy) TransferSegment(w http.ResponseWriter, req *http.Request) { w.Write([]byte(fmt.Sprintf(`{"msg": "failed to transfer segment, %s"}`, err.Error()))) return } - request.TargetNodeID = value + request.SegmentID = value } copyMode := req.FormValue("copy_mode") diff --git a/internal/querycoordv2/balance/channel_level_score_balancer_test.go b/internal/querycoordv2/balance/channel_level_score_balancer_test.go index b09b3f943d340..0e5c94ce63788 100644 --- a/internal/querycoordv2/balance/channel_level_score_balancer_test.go +++ b/internal/querycoordv2/balance/channel_level_score_balancer_test.go @@ -249,61 +249,6 @@ func (suite *ChannelLevelScoreBalancerTestSuite) TestAssignSegment() { } } -func (suite *ChannelLevelScoreBalancerTestSuite) TestSuspendNode() { - cases := []struct { - name string - distributions map[int64][]*meta.Segment - assignments []*meta.Segment - nodes []int64 - segmentCnts []int - states []session.State - expectPlans []SegmentAssignPlan - }{ - { - name: "test suspend node", - distributions: map[int64][]*meta.Segment{ - 2: {{SegmentInfo: &datapb.SegmentInfo{ID: 1, NumOfRows: 20}, Node: 2}}, - 3: {{SegmentInfo: &datapb.SegmentInfo{ID: 2, NumOfRows: 30}, Node: 3}}, - }, - assignments: []*meta.Segment{ - {SegmentInfo: &datapb.SegmentInfo{ID: 3, NumOfRows: 5}}, - {SegmentInfo: &datapb.SegmentInfo{ID: 4, NumOfRows: 10}}, - {SegmentInfo: &datapb.SegmentInfo{ID: 5, NumOfRows: 15}}, - }, - nodes: []int64{1, 2, 3, 4}, - states: []session.State{session.NodeStateSuspend, session.NodeStateSuspend, session.NodeStateSuspend, session.NodeStateSuspend}, - segmentCnts: []int{0, 1, 1, 0}, - expectPlans: []SegmentAssignPlan{}, - }, - } - - for _, c := range cases { - suite.Run(c.name, func() { - // I do not find a better way to do the setup and teardown work for subtests yet. - // If you do, please replace with it. - suite.SetupSuite() - defer suite.TearDownTest() - balancer := suite.balancer - for node, s := range c.distributions { - balancer.dist.SegmentDistManager.Update(node, s...) - } - for i := range c.nodes { - nodeInfo := session.NewNodeInfo(session.ImmutableNodeInfo{ - NodeID: c.nodes[i], - Address: "localhost", - Hostname: "localhost", - }) - nodeInfo.UpdateStats(session.WithSegmentCnt(c.segmentCnts[i])) - nodeInfo.SetState(c.states[i]) - suite.balancer.nodeManager.Add(nodeInfo) - } - plans := balancer.AssignSegment(0, c.assignments, c.nodes, false) - // all node has been suspend, so no node to assign segment - suite.ElementsMatch(c.expectPlans, plans) - }) - } -} - func (suite *ChannelLevelScoreBalancerTestSuite) TestAssignSegmentWithGrowing() { suite.SetupSuite() defer suite.TearDownTest() diff --git a/internal/querycoordv2/balance/rowcount_based_balancer_test.go b/internal/querycoordv2/balance/rowcount_based_balancer_test.go index f6d6300512d10..901ab989e97e6 100644 --- a/internal/querycoordv2/balance/rowcount_based_balancer_test.go +++ b/internal/querycoordv2/balance/rowcount_based_balancer_test.go @@ -146,61 +146,6 @@ func (suite *RowCountBasedBalancerTestSuite) TestAssignSegment() { } } -func (suite *RowCountBasedBalancerTestSuite) TestSuspendNode() { - cases := []struct { - name string - distributions map[int64][]*meta.Segment - assignments []*meta.Segment - nodes []int64 - segmentCnts []int - states []session.State - expectPlans []SegmentAssignPlan - }{ - { - name: "test suspend node", - distributions: map[int64][]*meta.Segment{ - 2: {{SegmentInfo: &datapb.SegmentInfo{ID: 1, NumOfRows: 20}, Node: 2}}, - 3: {{SegmentInfo: &datapb.SegmentInfo{ID: 2, NumOfRows: 30}, Node: 3}}, - }, - assignments: []*meta.Segment{ - {SegmentInfo: &datapb.SegmentInfo{ID: 3, NumOfRows: 5}}, - {SegmentInfo: &datapb.SegmentInfo{ID: 4, NumOfRows: 10}}, - {SegmentInfo: &datapb.SegmentInfo{ID: 5, NumOfRows: 15}}, - }, - nodes: []int64{1, 2, 3, 4}, - states: []session.State{session.NodeStateSuspend, session.NodeStateSuspend, session.NodeStateSuspend, session.NodeStateSuspend}, - segmentCnts: []int{0, 1, 1, 0}, - expectPlans: []SegmentAssignPlan{}, - }, - } - - for _, c := range cases { - suite.Run(c.name, func() { - // I do not find a better way to do the setup and teardown work for subtests yet. - // If you do, please replace with it. - suite.SetupSuite() - defer suite.TearDownTest() - balancer := suite.balancer - for node, s := range c.distributions { - balancer.dist.SegmentDistManager.Update(node, s...) - } - for i := range c.nodes { - nodeInfo := session.NewNodeInfo(session.ImmutableNodeInfo{ - NodeID: c.nodes[i], - Address: "localhost", - Hostname: "localhost", - }) - nodeInfo.UpdateStats(session.WithSegmentCnt(c.segmentCnts[i])) - nodeInfo.SetState(c.states[i]) - suite.balancer.nodeManager.Add(nodeInfo) - } - plans := balancer.AssignSegment(0, c.assignments, c.nodes, false) - // all node has been suspend, so no node to assign segment - suite.ElementsMatch(c.expectPlans, plans) - }) - } -} - func (suite *RowCountBasedBalancerTestSuite) TestBalance() { cases := []struct { name string diff --git a/internal/querycoordv2/balance/score_based_balancer_test.go b/internal/querycoordv2/balance/score_based_balancer_test.go index 3b55d4b7528b1..da342a21a4864 100644 --- a/internal/querycoordv2/balance/score_based_balancer_test.go +++ b/internal/querycoordv2/balance/score_based_balancer_test.go @@ -249,61 +249,6 @@ func (suite *ScoreBasedBalancerTestSuite) TestAssignSegment() { } } -func (suite *ScoreBasedBalancerTestSuite) TestSuspendNode() { - cases := []struct { - name string - distributions map[int64][]*meta.Segment - assignments []*meta.Segment - nodes []int64 - segmentCnts []int - states []session.State - expectPlans []SegmentAssignPlan - }{ - { - name: "test suspend node", - distributions: map[int64][]*meta.Segment{ - 2: {{SegmentInfo: &datapb.SegmentInfo{ID: 1, NumOfRows: 20}, Node: 2}}, - 3: {{SegmentInfo: &datapb.SegmentInfo{ID: 2, NumOfRows: 30}, Node: 3}}, - }, - assignments: []*meta.Segment{ - {SegmentInfo: &datapb.SegmentInfo{ID: 3, NumOfRows: 5}}, - {SegmentInfo: &datapb.SegmentInfo{ID: 4, NumOfRows: 10}}, - {SegmentInfo: &datapb.SegmentInfo{ID: 5, NumOfRows: 15}}, - }, - nodes: []int64{1, 2, 3, 4}, - states: []session.State{session.NodeStateSuspend, session.NodeStateSuspend, session.NodeStateSuspend, session.NodeStateSuspend}, - segmentCnts: []int{0, 1, 1, 0}, - expectPlans: []SegmentAssignPlan{}, - }, - } - - for _, c := range cases { - suite.Run(c.name, func() { - // I do not find a better way to do the setup and teardown work for subtests yet. - // If you do, please replace with it. - suite.SetupSuite() - defer suite.TearDownTest() - balancer := suite.balancer - for node, s := range c.distributions { - balancer.dist.SegmentDistManager.Update(node, s...) - } - for i := range c.nodes { - nodeInfo := session.NewNodeInfo(session.ImmutableNodeInfo{ - NodeID: c.nodes[i], - Address: "localhost", - Hostname: "localhost", - }) - nodeInfo.UpdateStats(session.WithSegmentCnt(c.segmentCnts[i])) - nodeInfo.SetState(c.states[i]) - suite.balancer.nodeManager.Add(nodeInfo) - } - plans := balancer.AssignSegment(0, c.assignments, c.nodes, false) - // all node has been suspend, so no node to assign segment - suite.ElementsMatch(c.expectPlans, plans) - }) - } -} - func (suite *ScoreBasedBalancerTestSuite) TestAssignSegmentWithGrowing() { suite.SetupSuite() defer suite.TearDownTest() diff --git a/internal/querycoordv2/ops_service_test.go b/internal/querycoordv2/ops_service_test.go index c073bdf0f5fd7..82ef1f696df1a 100644 --- a/internal/querycoordv2/ops_service_test.go +++ b/internal/querycoordv2/ops_service_test.go @@ -433,6 +433,10 @@ func (suite *OpsServiceSuite) TestSuspendAndResumeNode() { Address: "localhost", Hostname: "localhost", })) + suite.meta.ResourceManager.HandleNodeUp(1) + nodes, err := suite.meta.ResourceManager.GetNodes(meta.DefaultResourceGroupName) + suite.NoError(err) + suite.Contains(nodes, int64(1)) // test success suite.server.UpdateStateCode(commonpb.StateCode_Healthy) resp, err = suite.server.SuspendNode(ctx, &querypb.SuspendNodeRequest{ @@ -440,16 +444,18 @@ func (suite *OpsServiceSuite) TestSuspendAndResumeNode() { }) suite.NoError(err) suite.True(merr.Ok(resp)) - node := suite.nodeMgr.Get(1) - suite.Equal(session.NodeStateSuspend, node.GetState()) + nodes, err = suite.meta.ResourceManager.GetNodes(meta.DefaultResourceGroupName) + suite.NoError(err) + suite.NotContains(nodes, int64(1)) resp, err = suite.server.ResumeNode(ctx, &querypb.ResumeNodeRequest{ NodeID: 1, }) suite.NoError(err) suite.True(merr.Ok(resp)) - node = suite.nodeMgr.Get(1) - suite.Equal(session.NodeStateNormal, node.GetState()) + nodes, err = suite.meta.ResourceManager.GetNodes(meta.DefaultResourceGroupName) + suite.NoError(err) + suite.Contains(nodes, int64(1)) } func (suite *OpsServiceSuite) TestTransferSegment() { diff --git a/internal/querycoordv2/ops_services.go b/internal/querycoordv2/ops_services.go index 46b3792207706..e9d76feb6635d 100644 --- a/internal/querycoordv2/ops_services.go +++ b/internal/querycoordv2/ops_services.go @@ -212,12 +212,7 @@ func (s *Server) SuspendNode(ctx context.Context, req *querypb.SuspendNodeReques return merr.Status(err), nil } - err := s.nodeMgr.Suspend(req.GetNodeID()) - if err != nil { - log.Warn(errMsg, zap.Error(err)) - return merr.Status(err), nil - } - + s.meta.ResourceManager.HandleNodeDown(req.GetNodeID()) return merr.Success(), nil } @@ -238,11 +233,7 @@ func (s *Server) ResumeNode(ctx context.Context, req *querypb.ResumeNodeRequest) return merr.Status(err), nil } - err := s.nodeMgr.Resume(req.GetNodeID()) - if err != nil { - log.Warn(errMsg, zap.Error(err)) - return merr.Status(errors.Wrap(err, errMsg)), nil - } + s.meta.ResourceManager.HandleNodeUp(req.GetNodeID()) return merr.Success(), nil } @@ -423,38 +414,29 @@ func (s *Server) CheckQueryNodeDistribution(ctx context.Context, req *querypb.Ch return ch.GetChannelName(), ch }) for _, ch := range channelOnSrc { - if _, ok := channelDstMap[ch.GetChannelName()]; !ok { - return merr.Status(merr.WrapErrChannelLack(ch.GetChannelName())), nil + if s.targetMgr.GetDmChannel(ch.GetCollectionID(), ch.GetChannelName(), meta.CurrentTargetFirst) == nil { + continue } - } - channelSrcMap := lo.SliceToMap(channelOnSrc, func(ch *meta.DmChannel) (string, *meta.DmChannel) { - return ch.GetChannelName(), ch - }) - for _, ch := range channelOnDst { - if _, ok := channelSrcMap[ch.GetChannelName()]; !ok { + + if _, ok := channelDstMap[ch.GetChannelName()]; !ok { return merr.Status(merr.WrapErrChannelLack(ch.GetChannelName())), nil } } - // check segment list + // check whether all segment exist in source node has been loaded in target node segmentOnSrc := s.dist.SegmentDistManager.GetByFilter(meta.WithNodeID(req.GetSourceNodeID())) segmentOnDst := s.dist.SegmentDistManager.GetByFilter(meta.WithNodeID(req.GetTargetNodeID())) segmentDstMap := lo.SliceToMap(segmentOnDst, func(s *meta.Segment) (int64, *meta.Segment) { return s.GetID(), s }) - for _, s := range segmentOnSrc { - if _, ok := segmentDstMap[s.GetID()]; !ok { - return merr.Status(merr.WrapErrSegmentLack(s.GetID())), nil + for _, segment := range segmentOnSrc { + if s.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.CurrentTargetFirst) == nil { + continue } - } - segmentSrcMap := lo.SliceToMap(segmentOnSrc, func(s *meta.Segment) (int64, *meta.Segment) { - return s.GetID(), s - }) - for _, s := range segmentOnDst { - if _, ok := segmentSrcMap[s.GetID()]; !ok { - return merr.Status(merr.WrapErrSegmentLack(s.GetID())), nil + + if _, ok := segmentDstMap[segment.GetID()]; !ok { + return merr.Status(merr.WrapErrSegmentLack(segment.GetID())), nil } } - return merr.Success(), nil } diff --git a/internal/querycoordv2/session/node_manager.go b/internal/querycoordv2/session/node_manager.go index 43799ae467b33..a43edabd0f189 100644 --- a/internal/querycoordv2/session/node_manager.go +++ b/internal/querycoordv2/session/node_manager.go @@ -23,11 +23,8 @@ import ( "github.com/blang/semver/v4" "go.uber.org/atomic" - "go.uber.org/zap" - "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" - "github.com/milvus-io/milvus/pkg/util/merr" ) type Manager interface { @@ -68,42 +65,6 @@ func (m *NodeManager) Stopping(nodeID int64) { } } -func (m *NodeManager) Suspend(nodeID int64) error { - m.mu.Lock() - defer m.mu.Unlock() - nodeInfo, ok := m.nodes[nodeID] - if !ok { - return merr.WrapErrNodeNotFound(nodeID) - } - switch nodeInfo.GetState() { - case NodeStateNormal: - nodeInfo.SetState(NodeStateSuspend) - return nil - default: - log.Warn("failed to suspend query node", zap.Int64("nodeID", nodeID), zap.String("state", nodeInfo.GetState().String())) - return merr.WrapErrNodeStateUnexpected(nodeID, nodeInfo.GetState().String(), "failed to suspend a query node") - } -} - -func (m *NodeManager) Resume(nodeID int64) error { - m.mu.Lock() - defer m.mu.Unlock() - nodeInfo, ok := m.nodes[nodeID] - if !ok { - return merr.WrapErrNodeNotFound(nodeID) - } - - switch nodeInfo.GetState() { - case NodeStateSuspend: - nodeInfo.SetState(NodeStateNormal) - return nil - - default: - log.Warn("failed to resume query node", zap.Int64("nodeID", nodeID), zap.String("state", nodeInfo.GetState().String())) - return merr.WrapErrNodeStateUnexpected(nodeID, nodeInfo.GetState().String(), "failed to resume query node") - } -} - func (m *NodeManager) IsStoppingNode(nodeID int64) (bool, error) { m.mu.RLock() defer m.mu.RUnlock() @@ -155,13 +116,11 @@ type ImmutableNodeInfo struct { const ( NodeStateNormal State = iota NodeStateStopping - NodeStateSuspend ) var stateNameMap = map[State]string{ NodeStateNormal: NormalStateName, NodeStateStopping: StoppingStateName, - NodeStateSuspend: SuspendStateName, } func (s State) String() string { diff --git a/internal/querycoordv2/session/node_manager_test.go b/internal/querycoordv2/session/node_manager_test.go index fd49fa051fdbd..2a79f2efe6c37 100644 --- a/internal/querycoordv2/session/node_manager_test.go +++ b/internal/querycoordv2/session/node_manager_test.go @@ -21,8 +21,6 @@ import ( "time" "github.com/stretchr/testify/suite" - - "github.com/milvus-io/milvus/pkg/util/merr" ) type NodeManagerSuite struct { @@ -63,24 +61,9 @@ func (s *NodeManagerSuite) TestNodeOperation() { s.nodeManager.Stopping(2) s.True(s.nodeManager.IsStoppingNode(2)) - err := s.nodeManager.Resume(2) - s.ErrorIs(err, merr.ErrNodeStateUnexpected) - s.True(s.nodeManager.IsStoppingNode(2)) node := s.nodeManager.Get(2) node.SetState(NodeStateNormal) s.False(s.nodeManager.IsStoppingNode(2)) - - err = s.nodeManager.Resume(3) - s.ErrorIs(err, merr.ErrNodeStateUnexpected) - - s.nodeManager.Suspend(3) - node = s.nodeManager.Get(3) - s.NotNil(node) - s.Equal(NodeStateSuspend, node.GetState()) - s.nodeManager.Resume(3) - node = s.nodeManager.Get(3) - s.NotNil(node) - s.Equal(NodeStateNormal, node.GetState()) } func (s *NodeManagerSuite) TestNodeInfo() { diff --git a/tests/integration/ops/suspend_node_test.go b/tests/integration/ops/suspend_node_test.go new file mode 100644 index 0000000000000..02ca0634f7bf1 --- /dev/null +++ b/tests/integration/ops/suspend_node_test.go @@ -0,0 +1,151 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ops + +import ( + "context" + "testing" + + "github.com/stretchr/testify/suite" + + "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" + grpcquerynode "github.com/milvus-io/milvus/internal/distributed/querynode" + "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/milvus-io/milvus/internal/querycoordv2/meta" + "github.com/milvus-io/milvus/internal/querycoordv2/session" + "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/tests/integration" +) + +const ( + dim = 128 + dbName = "" + collectionName = "test_suspend_node" +) + +type SuspendNodeTestSuite struct { + integration.MiniClusterSuite +} + +func (s *SuspendNodeTestSuite) SetupSuite() { + paramtable.Init() + paramtable.Get().Save(paramtable.Get().QueryCoordCfg.BalanceCheckInterval.Key, "1000") + paramtable.Get().Save(paramtable.Get().QueryNodeCfg.GracefulStopTimeout.Key, "1") + + s.Require().NoError(s.SetupEmbedEtcd()) +} + +func (s *SuspendNodeTestSuite) loadCollection(collectionName string, db string, replica int, rgs []string) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // load + loadStatus, err := s.Cluster.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{ + DbName: db, + CollectionName: collectionName, + ReplicaNumber: int32(replica), + ResourceGroups: rgs, + }) + s.NoError(err) + s.True(merr.Ok(loadStatus)) + s.WaitForLoadWithDB(ctx, db, collectionName) +} + +func (s *SuspendNodeTestSuite) releaseCollection(db, collectionName string) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // load + status, err := s.Cluster.Proxy.ReleaseCollection(ctx, &milvuspb.ReleaseCollectionRequest{ + DbName: db, + CollectionName: collectionName, + }) + s.NoError(err) + s.True(merr.Ok(status)) +} + +func (s *SuspendNodeTestSuite) TestSuspendNode() { + ctx := context.Background() + s.CreateCollectionWithConfiguration(ctx, &integration.CreateCollectionConfig{ + DBName: dbName, + Dim: dim, + CollectionName: collectionName, + ChannelNum: 1, + SegmentNum: 3, + RowNumPerSegment: 2000, + }) + + qns := make([]*grpcquerynode.Server, 0) + for i := 1; i < 3; i++ { + qn := s.Cluster.AddQueryNode() + qns = append(qns, qn) + } + + // load collection without specified replica and rgs + s.loadCollection(collectionName, dbName, 1, nil) + resp2, err := s.Cluster.Proxy.GetReplicas(ctx, &milvuspb.GetReplicasRequest{ + DbName: dbName, + CollectionName: collectionName, + }) + s.NoError(err) + s.True(merr.Ok(resp2.Status)) + s.Len(resp2.GetReplicas(), 1) + defer s.releaseCollection(dbName, collectionName) + + resp3, err := s.Cluster.QueryCoord.SuspendNode(ctx, &querypb.SuspendNodeRequest{ + NodeID: qns[0].GetQueryNode().GetNodeID(), + }) + s.NoError(err) + s.True(merr.Ok(resp3)) + + // expect suspend node to be removed from resource group + resp5, err := s.Cluster.QueryCoord.DescribeResourceGroup(ctx, &querypb.DescribeResourceGroupRequest{ + ResourceGroup: meta.DefaultResourceGroupName, + }) + s.NoError(err) + s.True(merr.Ok(resp5.GetStatus())) + s.Equal(2, len(resp5.GetResourceGroup().GetNodes())) + + resp6, err := s.Cluster.QueryCoord.ResumeNode(ctx, &querypb.ResumeNodeRequest{ + NodeID: qns[0].GetQueryNode().GetNodeID(), + }) + s.NoError(err) + s.True(merr.Ok(resp6)) + + // expect node state to be resume + resp7, err := s.Cluster.QueryCoord.ListQueryNode(ctx, &querypb.ListQueryNodeRequest{}) + s.NoError(err) + s.True(merr.Ok(resp7.GetStatus())) + for _, node := range resp7.GetNodeInfos() { + if node.GetID() == qns[0].GetQueryNode().GetNodeID() { + s.Equal(session.NodeStateNormal.String(), node.GetState()) + } + } + + // expect suspend node to be added to resource group + resp8, err := s.Cluster.QueryCoord.DescribeResourceGroup(ctx, &querypb.DescribeResourceGroupRequest{ + ResourceGroup: meta.DefaultResourceGroupName, + }) + s.NoError(err) + s.True(merr.Ok(resp8.GetStatus())) + s.Equal(3, len(resp8.GetResourceGroup().GetNodes())) +} + +func TestSuspendNode(t *testing.T) { + suite.Run(t, new(SuspendNodeTestSuite)) +}