enhance: Mark query node as read only after suspend (milvus-io#35492)
issue: milvus-io#34985 milvus-io#35493
After a query node has been suspended, it is not allowed to load
segments/channels on it, which means the node is read only. To stay
compatible with the resource group design, once a query node is
suspended we remove it from its original resource group, making it a
read-only query node in its replica. Two things then happen (see the
sketch below):
1. Its original resource group will be short of query nodes, so query
coord will assign a new node to it.
2. Query coord will try to move all segments/channels off the query
node once it has been suspended.
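Sketch of the resulting suspend/resume flow, reconstructed from the ops_services.go hunk in this commit (request validation and error handling elided; illustrative, not verbatim source):

func (s *Server) SuspendNode(ctx context.Context, req *querypb.SuspendNodeRequest) (*commonpb.Status, error) {
	// Dropping the node from its resource group makes it read only for its replica;
	// query coord then backfills the group and drains the node's segments/channels.
	s.meta.ResourceManager.HandleNodeDown(req.GetNodeID())
	return merr.Success(), nil
}

func (s *Server) ResumeNode(ctx context.Context, req *querypb.ResumeNodeRequest) (*commonpb.Status, error) {
	// Re-adding the node to a resource group makes it schedulable again.
	s.meta.ResourceManager.HandleNodeUp(req.GetNodeID())
	return merr.Success(), nil
}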

Signed-off-by: Wei Liu <[email protected]>
weiliu1031 committed Aug 20, 2024
1 parent cf8494e commit 59d2e58
Showing 9 changed files with 175 additions and 259 deletions.
2 changes: 1 addition & 1 deletion internal/proxy/management.go
@@ -343,7 +343,7 @@ func (node *Proxy) TransferSegment(w http.ResponseWriter, req *http.Request) {
w.Write([]byte(fmt.Sprintf(`{"msg": "failed to transfer segment, %s"}`, err.Error())))
return
}
request.TargetNodeID = value
request.SegmentID = value
}

copyMode := req.FormValue("copy_mode")
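For context, a hypothetical reconstruction of the surrounding handler logic (the "segment_id" form field and the strconv parsing are assumptions, not shown in the hunk); the point of the fix is that the parsed value now populates SegmentID rather than TargetNodeID:

// Hypothetical sketch: assumes the handler reads a "segment_id" form field and
// parses it before the lines shown in the hunk above.
if segmentIDStr := req.FormValue("segment_id"); segmentIDStr != "" {
	value, err := strconv.ParseInt(segmentIDStr, 10, 64)
	if err != nil {
		w.Write([]byte(fmt.Sprintf(`{"msg": "failed to transfer segment, %s"}`, err.Error())))
		return
	}
	request.SegmentID = value // previously assigned to TargetNodeID by mistake
}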
55 changes: 0 additions & 55 deletions internal/querycoordv2/balance/channel_level_score_balancer_test.go
@@ -249,61 +249,6 @@ func (suite *ChannelLevelScoreBalancerTestSuite) TestAssignSegment() {
}
}

func (suite *ChannelLevelScoreBalancerTestSuite) TestSuspendNode() {
cases := []struct {
name string
distributions map[int64][]*meta.Segment
assignments []*meta.Segment
nodes []int64
segmentCnts []int
states []session.State
expectPlans []SegmentAssignPlan
}{
{
name: "test suspend node",
distributions: map[int64][]*meta.Segment{
2: {{SegmentInfo: &datapb.SegmentInfo{ID: 1, NumOfRows: 20}, Node: 2}},
3: {{SegmentInfo: &datapb.SegmentInfo{ID: 2, NumOfRows: 30}, Node: 3}},
},
assignments: []*meta.Segment{
{SegmentInfo: &datapb.SegmentInfo{ID: 3, NumOfRows: 5}},
{SegmentInfo: &datapb.SegmentInfo{ID: 4, NumOfRows: 10}},
{SegmentInfo: &datapb.SegmentInfo{ID: 5, NumOfRows: 15}},
},
nodes: []int64{1, 2, 3, 4},
states: []session.State{session.NodeStateSuspend, session.NodeStateSuspend, session.NodeStateSuspend, session.NodeStateSuspend},
segmentCnts: []int{0, 1, 1, 0},
expectPlans: []SegmentAssignPlan{},
},
}

for _, c := range cases {
suite.Run(c.name, func() {
// I do not find a better way to do the setup and teardown work for subtests yet.
// If you do, please replace with it.
suite.SetupSuite()
defer suite.TearDownTest()
balancer := suite.balancer
for node, s := range c.distributions {
balancer.dist.SegmentDistManager.Update(node, s...)
}
for i := range c.nodes {
nodeInfo := session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: c.nodes[i],
Address: "localhost",
Hostname: "localhost",
})
nodeInfo.UpdateStats(session.WithSegmentCnt(c.segmentCnts[i]))
nodeInfo.SetState(c.states[i])
suite.balancer.nodeManager.Add(nodeInfo)
}
plans := balancer.AssignSegment(0, c.assignments, c.nodes, false)
// all node has been suspend, so no node to assign segment
suite.ElementsMatch(c.expectPlans, plans)
})
}
}

func (suite *ChannelLevelScoreBalancerTestSuite) TestAssignSegmentWithGrowing() {
suite.SetupSuite()
defer suite.TearDownTest()
55 changes: 0 additions & 55 deletions internal/querycoordv2/balance/rowcount_based_balancer_test.go
@@ -146,61 +146,6 @@ func (suite *RowCountBasedBalancerTestSuite) TestAssignSegment() {
}
}

func (suite *RowCountBasedBalancerTestSuite) TestSuspendNode() {
cases := []struct {
name string
distributions map[int64][]*meta.Segment
assignments []*meta.Segment
nodes []int64
segmentCnts []int
states []session.State
expectPlans []SegmentAssignPlan
}{
{
name: "test suspend node",
distributions: map[int64][]*meta.Segment{
2: {{SegmentInfo: &datapb.SegmentInfo{ID: 1, NumOfRows: 20}, Node: 2}},
3: {{SegmentInfo: &datapb.SegmentInfo{ID: 2, NumOfRows: 30}, Node: 3}},
},
assignments: []*meta.Segment{
{SegmentInfo: &datapb.SegmentInfo{ID: 3, NumOfRows: 5}},
{SegmentInfo: &datapb.SegmentInfo{ID: 4, NumOfRows: 10}},
{SegmentInfo: &datapb.SegmentInfo{ID: 5, NumOfRows: 15}},
},
nodes: []int64{1, 2, 3, 4},
states: []session.State{session.NodeStateSuspend, session.NodeStateSuspend, session.NodeStateSuspend, session.NodeStateSuspend},
segmentCnts: []int{0, 1, 1, 0},
expectPlans: []SegmentAssignPlan{},
},
}

for _, c := range cases {
suite.Run(c.name, func() {
// I do not find a better way to do the setup and teardown work for subtests yet.
// If you do, please replace with it.
suite.SetupSuite()
defer suite.TearDownTest()
balancer := suite.balancer
for node, s := range c.distributions {
balancer.dist.SegmentDistManager.Update(node, s...)
}
for i := range c.nodes {
nodeInfo := session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: c.nodes[i],
Address: "localhost",
Hostname: "localhost",
})
nodeInfo.UpdateStats(session.WithSegmentCnt(c.segmentCnts[i]))
nodeInfo.SetState(c.states[i])
suite.balancer.nodeManager.Add(nodeInfo)
}
plans := balancer.AssignSegment(0, c.assignments, c.nodes, false)
// all node has been suspend, so no node to assign segment
suite.ElementsMatch(c.expectPlans, plans)
})
}
}

func (suite *RowCountBasedBalancerTestSuite) TestBalance() {
cases := []struct {
name string
55 changes: 0 additions & 55 deletions internal/querycoordv2/balance/score_based_balancer_test.go
@@ -249,61 +249,6 @@ func (suite *ScoreBasedBalancerTestSuite) TestAssignSegment() {
}
}

func (suite *ScoreBasedBalancerTestSuite) TestSuspendNode() {
cases := []struct {
name string
distributions map[int64][]*meta.Segment
assignments []*meta.Segment
nodes []int64
segmentCnts []int
states []session.State
expectPlans []SegmentAssignPlan
}{
{
name: "test suspend node",
distributions: map[int64][]*meta.Segment{
2: {{SegmentInfo: &datapb.SegmentInfo{ID: 1, NumOfRows: 20}, Node: 2}},
3: {{SegmentInfo: &datapb.SegmentInfo{ID: 2, NumOfRows: 30}, Node: 3}},
},
assignments: []*meta.Segment{
{SegmentInfo: &datapb.SegmentInfo{ID: 3, NumOfRows: 5}},
{SegmentInfo: &datapb.SegmentInfo{ID: 4, NumOfRows: 10}},
{SegmentInfo: &datapb.SegmentInfo{ID: 5, NumOfRows: 15}},
},
nodes: []int64{1, 2, 3, 4},
states: []session.State{session.NodeStateSuspend, session.NodeStateSuspend, session.NodeStateSuspend, session.NodeStateSuspend},
segmentCnts: []int{0, 1, 1, 0},
expectPlans: []SegmentAssignPlan{},
},
}

for _, c := range cases {
suite.Run(c.name, func() {
// I do not find a better way to do the setup and teardown work for subtests yet.
// If you do, please replace with it.
suite.SetupSuite()
defer suite.TearDownTest()
balancer := suite.balancer
for node, s := range c.distributions {
balancer.dist.SegmentDistManager.Update(node, s...)
}
for i := range c.nodes {
nodeInfo := session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: c.nodes[i],
Address: "localhost",
Hostname: "localhost",
})
nodeInfo.UpdateStats(session.WithSegmentCnt(c.segmentCnts[i]))
nodeInfo.SetState(c.states[i])
suite.balancer.nodeManager.Add(nodeInfo)
}
plans := balancer.AssignSegment(0, c.assignments, c.nodes, false)
// all node has been suspend, so no node to assign segment
suite.ElementsMatch(c.expectPlans, plans)
})
}
}

func (suite *ScoreBasedBalancerTestSuite) TestAssignSegmentWithGrowing() {
suite.SetupSuite()
defer suite.TearDownTest()
14 changes: 10 additions & 4 deletions internal/querycoordv2/ops_service_test.go
@@ -433,23 +433,29 @@ func (suite *OpsServiceSuite) TestSuspendAndResumeNode() {
Address: "localhost",
Hostname: "localhost",
}))
suite.meta.ResourceManager.HandleNodeUp(1)
nodes, err := suite.meta.ResourceManager.GetNodes(meta.DefaultResourceGroupName)
suite.NoError(err)
suite.Contains(nodes, int64(1))
// test success
suite.server.UpdateStateCode(commonpb.StateCode_Healthy)
resp, err = suite.server.SuspendNode(ctx, &querypb.SuspendNodeRequest{
NodeID: 1,
})
suite.NoError(err)
suite.True(merr.Ok(resp))
node := suite.nodeMgr.Get(1)
suite.Equal(session.NodeStateSuspend, node.GetState())
nodes, err = suite.meta.ResourceManager.GetNodes(meta.DefaultResourceGroupName)
suite.NoError(err)
suite.NotContains(nodes, int64(1))

resp, err = suite.server.ResumeNode(ctx, &querypb.ResumeNodeRequest{
NodeID: 1,
})
suite.NoError(err)
suite.True(merr.Ok(resp))
node = suite.nodeMgr.Get(1)
suite.Equal(session.NodeStateNormal, node.GetState())
nodes, err = suite.meta.ResourceManager.GetNodes(meta.DefaultResourceGroupName)
suite.NoError(err)
suite.Contains(nodes, int64(1))
}

func (suite *OpsServiceSuite) TestTransferSegment() {
44 changes: 13 additions & 31 deletions internal/querycoordv2/ops_services.go
@@ -212,12 +212,7 @@ func (s *Server) SuspendNode(ctx context.Context, req *querypb.SuspendNodeReques
return merr.Status(err), nil
}

err := s.nodeMgr.Suspend(req.GetNodeID())
if err != nil {
log.Warn(errMsg, zap.Error(err))
return merr.Status(err), nil
}

s.meta.ResourceManager.HandleNodeDown(req.GetNodeID())
return merr.Success(), nil
}

@@ -238,11 +238,7 @@ func (s *Server) ResumeNode(ctx context.Context, req *querypb.ResumeNodeRequest)
return merr.Status(err), nil
}

err := s.nodeMgr.Resume(req.GetNodeID())
if err != nil {
log.Warn(errMsg, zap.Error(err))
return merr.Status(errors.Wrap(err, errMsg)), nil
}
s.meta.ResourceManager.HandleNodeUp(req.GetNodeID())

return merr.Success(), nil
}
@@ -423,38 +423,29 @@ func (s *Server) CheckQueryNodeDistribution(ctx context.Context, req *querypb.Ch
return ch.GetChannelName(), ch
})
for _, ch := range channelOnSrc {
if _, ok := channelDstMap[ch.GetChannelName()]; !ok {
return merr.Status(merr.WrapErrChannelLack(ch.GetChannelName())), nil
if s.targetMgr.GetDmChannel(ch.GetCollectionID(), ch.GetChannelName(), meta.CurrentTargetFirst) == nil {
continue
}
}
channelSrcMap := lo.SliceToMap(channelOnSrc, func(ch *meta.DmChannel) (string, *meta.DmChannel) {
return ch.GetChannelName(), ch
})
for _, ch := range channelOnDst {
if _, ok := channelSrcMap[ch.GetChannelName()]; !ok {

if _, ok := channelDstMap[ch.GetChannelName()]; !ok {
return merr.Status(merr.WrapErrChannelLack(ch.GetChannelName())), nil
}
}

// check segment list
// check whether all segment exist in source node has been loaded in target node
segmentOnSrc := s.dist.SegmentDistManager.GetByFilter(meta.WithNodeID(req.GetSourceNodeID()))
segmentOnDst := s.dist.SegmentDistManager.GetByFilter(meta.WithNodeID(req.GetTargetNodeID()))
segmentDstMap := lo.SliceToMap(segmentOnDst, func(s *meta.Segment) (int64, *meta.Segment) {
return s.GetID(), s
})
for _, s := range segmentOnSrc {
if _, ok := segmentDstMap[s.GetID()]; !ok {
return merr.Status(merr.WrapErrSegmentLack(s.GetID())), nil
for _, segment := range segmentOnSrc {
if s.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.CurrentTargetFirst) == nil {
continue
}
}
segmentSrcMap := lo.SliceToMap(segmentOnSrc, func(s *meta.Segment) (int64, *meta.Segment) {
return s.GetID(), s
})
for _, s := range segmentOnDst {
if _, ok := segmentSrcMap[s.GetID()]; !ok {
return merr.Status(merr.WrapErrSegmentLack(s.GetID())), nil

if _, ok := segmentDstMap[segment.GetID()]; !ok {
return merr.Status(merr.WrapErrSegmentLack(segment.GetID())), nil
}
}

return merr.Success(), nil
}
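Pieced together from the fragments above (a reconstruction, not verbatim source), the segment half of CheckQueryNodeDistribution now skips segments that are no longer in the current target before requiring them on the target node; the channel half follows the same pattern with GetDmChannel:

segmentOnSrc := s.dist.SegmentDistManager.GetByFilter(meta.WithNodeID(req.GetSourceNodeID()))
segmentOnDst := s.dist.SegmentDistManager.GetByFilter(meta.WithNodeID(req.GetTargetNodeID()))
segmentDstMap := lo.SliceToMap(segmentOnDst, func(s *meta.Segment) (int64, *meta.Segment) {
	return s.GetID(), s
})
for _, segment := range segmentOnSrc {
	// Segments dropped from the current target no longer block the check.
	if s.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.CurrentTargetFirst) == nil {
		continue
	}
	if _, ok := segmentDstMap[segment.GetID()]; !ok {
		return merr.Status(merr.WrapErrSegmentLack(segment.GetID())), nil
	}
}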
41 changes: 0 additions & 41 deletions internal/querycoordv2/session/node_manager.go
@@ -23,11 +23,8 @@ import (

"github.com/blang/semver/v4"
"go.uber.org/atomic"
"go.uber.org/zap"

"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/merr"
)

type Manager interface {
@@ -68,42 +65,6 @@ func (m *NodeManager) Stopping(nodeID int64) {
}
}

func (m *NodeManager) Suspend(nodeID int64) error {
m.mu.Lock()
defer m.mu.Unlock()
nodeInfo, ok := m.nodes[nodeID]
if !ok {
return merr.WrapErrNodeNotFound(nodeID)
}
switch nodeInfo.GetState() {
case NodeStateNormal:
nodeInfo.SetState(NodeStateSuspend)
return nil
default:
log.Warn("failed to suspend query node", zap.Int64("nodeID", nodeID), zap.String("state", nodeInfo.GetState().String()))
return merr.WrapErrNodeStateUnexpected(nodeID, nodeInfo.GetState().String(), "failed to suspend a query node")
}
}

func (m *NodeManager) Resume(nodeID int64) error {
m.mu.Lock()
defer m.mu.Unlock()
nodeInfo, ok := m.nodes[nodeID]
if !ok {
return merr.WrapErrNodeNotFound(nodeID)
}

switch nodeInfo.GetState() {
case NodeStateSuspend:
nodeInfo.SetState(NodeStateNormal)
return nil

default:
log.Warn("failed to resume query node", zap.Int64("nodeID", nodeID), zap.String("state", nodeInfo.GetState().String()))
return merr.WrapErrNodeStateUnexpected(nodeID, nodeInfo.GetState().String(), "failed to resume query node")
}
}

func (m *NodeManager) IsStoppingNode(nodeID int64) (bool, error) {
m.mu.RLock()
defer m.mu.RUnlock()
@@ -155,13 +116,11 @@ type ImmutableNodeInfo struct {
const (
NodeStateNormal State = iota
NodeStateStopping
NodeStateSuspend
)

var stateNameMap = map[State]string{
NodeStateNormal: NormalStateName,
NodeStateStopping: StoppingStateName,
NodeStateSuspend: SuspendStateName,
}

func (s State) String() string {
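Put back together from the hunks above, the remaining state set reads roughly as follows; suspension is now expressed through resource-group membership rather than a dedicated node state:

const (
	NodeStateNormal State = iota
	NodeStateStopping
)

var stateNameMap = map[State]string{
	NodeStateNormal:   NormalStateName,
	NodeStateStopping: StoppingStateName,
}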