diff --git a/internal/querycoordv2/server.go b/internal/querycoordv2/server.go index 7e39f54a4bb0e..3f214caf6ff0b 100644 --- a/internal/querycoordv2/server.go +++ b/internal/querycoordv2/server.go @@ -123,9 +123,6 @@ type Server struct { enableActiveStandBy bool activateFunc func() error - nodeUpEventChan chan int64 - notifyNodeUp chan struct{} - // proxy client manager proxyCreator proxyutil.ProxyCreator proxyWatcher proxyutil.ProxyWatcherInterface @@ -135,11 +132,9 @@ type Server struct { func NewQueryCoord(ctx context.Context) (*Server, error) { ctx, cancel := context.WithCancel(ctx) server := &Server{ - ctx: ctx, - cancel: cancel, - nodeUpEventChan: make(chan int64, 10240), - notifyNodeUp: make(chan struct{}), - balancerMap: make(map[string]balance.Balance), + ctx: ctx, + cancel: cancel, + balancerMap: make(map[string]balance.Balance), } server.UpdateStateCode(commonpb.StateCode_Abnormal) server.queryNodeCreator = session.DefaultQueryNodeCreator @@ -461,8 +456,7 @@ func (s *Server) startQueryCoord() error { s.handleNodeUp(node.ServerID) } - s.wg.Add(2) - go s.handleNodeUpLoop() + s.wg.Add(1) go s.watchNodes(revision) // check whether old node exist, if yes suspend auto balance until all old nodes down @@ -685,11 +679,7 @@ func (s *Server) watchNodes(revision int64) { Hostname: event.Session.HostName, Version: event.Session.Version, })) - s.nodeUpEventChan <- nodeID - select { - case s.notifyNodeUp <- struct{}{}: - default: - } + s.handleNodeUp(nodeID) case sessionutil.SessionUpdateEvent: nodeID := event.Session.ServerID @@ -713,49 +703,6 @@ func (s *Server) watchNodes(revision int64) { } } -func (s *Server) handleNodeUpLoop() { - defer s.wg.Done() - ticker := time.NewTicker(Params.QueryCoordCfg.CheckHealthInterval.GetAsDuration(time.Millisecond)) - defer ticker.Stop() - for { - select { - case <-s.ctx.Done(): - log.Info("handle node up loop exit due to context done") - return - case <-s.notifyNodeUp: - s.tryHandleNodeUp() - case <-ticker.C: - s.tryHandleNodeUp() - } - } -} - -func (s *Server) tryHandleNodeUp() { - log := log.Ctx(s.ctx).WithRateGroup("qcv2.Server", 1, 60) - ctx, cancel := context.WithTimeout(s.ctx, Params.QueryCoordCfg.CheckHealthRPCTimeout.GetAsDuration(time.Millisecond)) - defer cancel() - reasons, err := s.checkNodeHealth(ctx) - if err != nil { - log.RatedWarn(10, "unhealthy node exist, node up will be delayed", - zap.Int("delayedNodeUpEvents", len(s.nodeUpEventChan)), - zap.Int("unhealthyNodeNum", len(reasons)), - zap.Strings("unhealthyReason", reasons)) - return - } - for len(s.nodeUpEventChan) > 0 { - nodeID := <-s.nodeUpEventChan - if s.nodeMgr.Get(nodeID) != nil { - // only if all nodes are healthy, node up event will be handled - s.handleNodeUp(nodeID) - s.metricsCacheManager.InvalidateSystemInfoMetrics() - s.checkerController.Check() - } else { - log.Warn("node already down", - zap.Int64("nodeID", nodeID)) - } - } -} - func (s *Server) handleNodeUp(node int64) { s.taskScheduler.AddExecutor(node) s.distController.StartDistInstance(s.ctx, node) diff --git a/internal/querycoordv2/server_test.go b/internal/querycoordv2/server_test.go index 78c2fdb89b6f1..82be2bb6e66ac 100644 --- a/internal/querycoordv2/server_test.go +++ b/internal/querycoordv2/server_test.go @@ -193,7 +193,6 @@ func (suite *ServerSuite) TestNodeUp() { suite.NoError(err) defer node1.Stop() - suite.server.notifyNodeUp <- struct{}{} suite.Eventually(func() bool { node := suite.server.nodeMgr.Get(node1.ID) if node == nil { @@ -207,54 +206,6 @@ func (suite *ServerSuite) TestNodeUp() { } return true }, 5*time.Second, time.Second) - - // mock unhealthy node - suite.server.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{ - NodeID: 1001, - Address: "localhost", - Hostname: "localhost", - })) - - node2 := mocks.NewMockQueryNode(suite.T(), suite.server.etcdCli, 101) - node2.EXPECT().GetDataDistribution(mock.Anything, mock.Anything).Return(&querypb.GetDataDistributionResponse{Status: merr.Success()}, nil).Maybe() - err = node2.Start() - suite.NoError(err) - defer node2.Stop() - - // expect node2 won't be add to qc, due to unhealthy nodes exist - suite.server.notifyNodeUp <- struct{}{} - suite.Eventually(func() bool { - node := suite.server.nodeMgr.Get(node2.ID) - if node == nil { - return false - } - for _, collection := range suite.collections { - replica := suite.server.meta.ReplicaManager.GetByCollectionAndNode(collection, node2.ID) - if replica == nil { - return true - } - } - return false - }, 5*time.Second, time.Second) - - // mock unhealthy node down, so no unhealthy nodes exist - suite.server.nodeMgr.Remove(1001) - suite.server.notifyNodeUp <- struct{}{} - - // expect node2 will be add to qc - suite.Eventually(func() bool { - node := suite.server.nodeMgr.Get(node2.ID) - if node == nil { - return false - } - for _, collection := range suite.collections { - replica := suite.server.meta.ReplicaManager.GetByCollectionAndNode(collection, node2.ID) - if replica == nil { - return false - } - } - return true - }, 5*time.Second, time.Second) } func (suite *ServerSuite) TestNodeUpdate() {