Skip to content

Commit

Permalink
enhance: Remove useless logic which block nodeUp progress
Browse files Browse the repository at this point in the history
before we supported balance node between replicas, to avoid assign node
to unexpected replica due to dirty node exist, we block the nodeUp
progress until all dirty node down.

This PR remove useless logic which block nodeUp progress

Signed-off-by: Wei Liu <[email protected]>
  • Loading branch information
weiliu1031 committed Jul 24, 2024
1 parent c5da4fa commit 2ae1094
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 107 deletions.
63 changes: 5 additions & 58 deletions internal/querycoordv2/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,6 @@ type Server struct {
enableActiveStandBy bool
activateFunc func() error

nodeUpEventChan chan int64
notifyNodeUp chan struct{}

// proxy client manager
proxyCreator proxyutil.ProxyCreator
proxyWatcher proxyutil.ProxyWatcherInterface
Expand All @@ -135,11 +132,9 @@ type Server struct {
func NewQueryCoord(ctx context.Context) (*Server, error) {
ctx, cancel := context.WithCancel(ctx)
server := &Server{
ctx: ctx,
cancel: cancel,
nodeUpEventChan: make(chan int64, 10240),
notifyNodeUp: make(chan struct{}),
balancerMap: make(map[string]balance.Balance),
ctx: ctx,
cancel: cancel,
balancerMap: make(map[string]balance.Balance),
}
server.UpdateStateCode(commonpb.StateCode_Abnormal)
server.queryNodeCreator = session.DefaultQueryNodeCreator
Expand Down Expand Up @@ -461,8 +456,7 @@ func (s *Server) startQueryCoord() error {
s.handleNodeUp(node.ServerID)
}

s.wg.Add(2)
go s.handleNodeUpLoop()
s.wg.Add(1)
go s.watchNodes(revision)

// check whether old node exist, if yes suspend auto balance until all old nodes down
Expand Down Expand Up @@ -685,11 +679,7 @@ func (s *Server) watchNodes(revision int64) {
Hostname: event.Session.HostName,
Version: event.Session.Version,
}))
s.nodeUpEventChan <- nodeID
select {
case s.notifyNodeUp <- struct{}{}:
default:
}
s.handleNodeUp(nodeID)

case sessionutil.SessionUpdateEvent:
nodeID := event.Session.ServerID
Expand All @@ -713,49 +703,6 @@ func (s *Server) watchNodes(revision int64) {
}
}

func (s *Server) handleNodeUpLoop() {
defer s.wg.Done()
ticker := time.NewTicker(Params.QueryCoordCfg.CheckHealthInterval.GetAsDuration(time.Millisecond))
defer ticker.Stop()
for {
select {
case <-s.ctx.Done():
log.Info("handle node up loop exit due to context done")
return
case <-s.notifyNodeUp:
s.tryHandleNodeUp()
case <-ticker.C:
s.tryHandleNodeUp()
}
}
}

func (s *Server) tryHandleNodeUp() {
log := log.Ctx(s.ctx).WithRateGroup("qcv2.Server", 1, 60)
ctx, cancel := context.WithTimeout(s.ctx, Params.QueryCoordCfg.CheckHealthRPCTimeout.GetAsDuration(time.Millisecond))
defer cancel()
reasons, err := s.checkNodeHealth(ctx)
if err != nil {
log.RatedWarn(10, "unhealthy node exist, node up will be delayed",
zap.Int("delayedNodeUpEvents", len(s.nodeUpEventChan)),
zap.Int("unhealthyNodeNum", len(reasons)),
zap.Strings("unhealthyReason", reasons))
return
}
for len(s.nodeUpEventChan) > 0 {
nodeID := <-s.nodeUpEventChan
if s.nodeMgr.Get(nodeID) != nil {
// only if all nodes are healthy, node up event will be handled
s.handleNodeUp(nodeID)
s.metricsCacheManager.InvalidateSystemInfoMetrics()
s.checkerController.Check()
} else {
log.Warn("node already down",
zap.Int64("nodeID", nodeID))
}
}
}

func (s *Server) handleNodeUp(node int64) {
s.taskScheduler.AddExecutor(node)
s.distController.StartDistInstance(s.ctx, node)
Expand Down
49 changes: 0 additions & 49 deletions internal/querycoordv2/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,6 @@ func (suite *ServerSuite) TestNodeUp() {
suite.NoError(err)
defer node1.Stop()

suite.server.notifyNodeUp <- struct{}{}
suite.Eventually(func() bool {
node := suite.server.nodeMgr.Get(node1.ID)
if node == nil {
Expand All @@ -207,54 +206,6 @@ func (suite *ServerSuite) TestNodeUp() {
}
return true
}, 5*time.Second, time.Second)

// mock unhealthy node
suite.server.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 1001,
Address: "localhost",
Hostname: "localhost",
}))

node2 := mocks.NewMockQueryNode(suite.T(), suite.server.etcdCli, 101)
node2.EXPECT().GetDataDistribution(mock.Anything, mock.Anything).Return(&querypb.GetDataDistributionResponse{Status: merr.Success()}, nil).Maybe()
err = node2.Start()
suite.NoError(err)
defer node2.Stop()

// expect node2 won't be add to qc, due to unhealthy nodes exist
suite.server.notifyNodeUp <- struct{}{}
suite.Eventually(func() bool {
node := suite.server.nodeMgr.Get(node2.ID)
if node == nil {
return false
}
for _, collection := range suite.collections {
replica := suite.server.meta.ReplicaManager.GetByCollectionAndNode(collection, node2.ID)
if replica == nil {
return true
}
}
return false
}, 5*time.Second, time.Second)

// mock unhealthy node down, so no unhealthy nodes exist
suite.server.nodeMgr.Remove(1001)
suite.server.notifyNodeUp <- struct{}{}

// expect node2 will be add to qc
suite.Eventually(func() bool {
node := suite.server.nodeMgr.Get(node2.ID)
if node == nil {
return false
}
for _, collection := range suite.collections {
replica := suite.server.meta.ReplicaManager.GetByCollectionAndNode(collection, node2.ID)
if replica == nil {
return false
}
}
return true
}, 5*time.Second, time.Second)
}

func (suite *ServerSuite) TestNodeUpdate() {
Expand Down

0 comments on commit 2ae1094

Please sign in to comment.