Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhance: Remove useless logic which block nodeUp progress #33917

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 5 additions & 58 deletions internal/querycoordv2/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,6 @@ type Server struct {
enableActiveStandBy bool
activateFunc func() error

nodeUpEventChan chan int64
notifyNodeUp chan struct{}

// proxy client manager
proxyCreator proxyutil.ProxyCreator
proxyWatcher proxyutil.ProxyWatcherInterface
Expand All @@ -135,11 +132,9 @@ type Server struct {
func NewQueryCoord(ctx context.Context) (*Server, error) {
ctx, cancel := context.WithCancel(ctx)
server := &Server{
ctx: ctx,
cancel: cancel,
nodeUpEventChan: make(chan int64, 10240),
notifyNodeUp: make(chan struct{}),
balancerMap: make(map[string]balance.Balance),
ctx: ctx,
cancel: cancel,
balancerMap: make(map[string]balance.Balance),
}
server.UpdateStateCode(commonpb.StateCode_Abnormal)
server.queryNodeCreator = session.DefaultQueryNodeCreator
Expand Down Expand Up @@ -461,8 +456,7 @@ func (s *Server) startQueryCoord() error {
s.handleNodeUp(node.ServerID)
}

s.wg.Add(2)
go s.handleNodeUpLoop()
s.wg.Add(1)
go s.watchNodes(revision)

// check whether old node exist, if yes suspend auto balance until all old nodes down
Expand Down Expand Up @@ -685,11 +679,7 @@ func (s *Server) watchNodes(revision int64) {
Hostname: event.Session.HostName,
Version: event.Session.Version,
}))
s.nodeUpEventChan <- nodeID
select {
case s.notifyNodeUp <- struct{}{}:
default:
}
s.handleNodeUp(nodeID)

case sessionutil.SessionUpdateEvent:
nodeID := event.Session.ServerID
Expand All @@ -713,49 +703,6 @@ func (s *Server) watchNodes(revision int64) {
}
}

func (s *Server) handleNodeUpLoop() {
defer s.wg.Done()
ticker := time.NewTicker(Params.QueryCoordCfg.CheckHealthInterval.GetAsDuration(time.Millisecond))
defer ticker.Stop()
for {
select {
case <-s.ctx.Done():
log.Info("handle node up loop exit due to context done")
return
case <-s.notifyNodeUp:
s.tryHandleNodeUp()
case <-ticker.C:
s.tryHandleNodeUp()
}
}
}

func (s *Server) tryHandleNodeUp() {
log := log.Ctx(s.ctx).WithRateGroup("qcv2.Server", 1, 60)
ctx, cancel := context.WithTimeout(s.ctx, Params.QueryCoordCfg.CheckHealthRPCTimeout.GetAsDuration(time.Millisecond))
defer cancel()
reasons, err := s.checkNodeHealth(ctx)
if err != nil {
log.RatedWarn(10, "unhealthy node exist, node up will be delayed",
zap.Int("delayedNodeUpEvents", len(s.nodeUpEventChan)),
zap.Int("unhealthyNodeNum", len(reasons)),
zap.Strings("unhealthyReason", reasons))
return
}
for len(s.nodeUpEventChan) > 0 {
nodeID := <-s.nodeUpEventChan
if s.nodeMgr.Get(nodeID) != nil {
// only if all nodes are healthy, node up event will be handled
s.handleNodeUp(nodeID)
s.metricsCacheManager.InvalidateSystemInfoMetrics()
s.checkerController.Check()
} else {
log.Warn("node already down",
zap.Int64("nodeID", nodeID))
}
}
}

func (s *Server) handleNodeUp(node int64) {
s.taskScheduler.AddExecutor(node)
s.distController.StartDistInstance(s.ctx, node)
Expand Down
49 changes: 0 additions & 49 deletions internal/querycoordv2/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,6 @@ func (suite *ServerSuite) TestNodeUp() {
suite.NoError(err)
defer node1.Stop()

suite.server.notifyNodeUp <- struct{}{}
suite.Eventually(func() bool {
node := suite.server.nodeMgr.Get(node1.ID)
if node == nil {
Expand All @@ -207,54 +206,6 @@ func (suite *ServerSuite) TestNodeUp() {
}
return true
}, 5*time.Second, time.Second)

// mock unhealthy node
suite.server.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 1001,
Address: "localhost",
Hostname: "localhost",
}))

node2 := mocks.NewMockQueryNode(suite.T(), suite.server.etcdCli, 101)
node2.EXPECT().GetDataDistribution(mock.Anything, mock.Anything).Return(&querypb.GetDataDistributionResponse{Status: merr.Success()}, nil).Maybe()
err = node2.Start()
suite.NoError(err)
defer node2.Stop()

// expect node2 won't be add to qc, due to unhealthy nodes exist
suite.server.notifyNodeUp <- struct{}{}
suite.Eventually(func() bool {
node := suite.server.nodeMgr.Get(node2.ID)
if node == nil {
return false
}
for _, collection := range suite.collections {
replica := suite.server.meta.ReplicaManager.GetByCollectionAndNode(collection, node2.ID)
if replica == nil {
return true
}
}
return false
}, 5*time.Second, time.Second)

// mock unhealthy node down, so no unhealthy nodes exist
suite.server.nodeMgr.Remove(1001)
suite.server.notifyNodeUp <- struct{}{}

// expect node2 will be add to qc
suite.Eventually(func() bool {
node := suite.server.nodeMgr.Get(node2.ID)
if node == nil {
return false
}
for _, collection := range suite.collections {
replica := suite.server.meta.ReplicaManager.GetByCollectionAndNode(collection, node2.ID)
if replica == nil {
return false
}
}
return true
}, 5*time.Second, time.Second)
}

func (suite *ServerSuite) TestNodeUpdate() {
Expand Down
Loading