Skip to content

Commit

Permalink
fix: delay the wait other coord logic in query coord after query coor…
Browse files Browse the repository at this point in the history
…d change into standby state (#38259)

issue: #37764

- After removing the RPC layer from mixcoord, a querycoord in standby mode
will be blocked forever during a rolling deployment

---------

Signed-off-by: chyezh <[email protected]>
  • Loading branch information
chyezh authored Dec 11, 2024
1 parent 43e0e2b commit d3ae8e9
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 24 deletions.
16 changes: 0 additions & 16 deletions internal/distributed/querycoord/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
"github.com/milvus-io/milvus/internal/proto/querypb"
qc "github.com/milvus-io/milvus/internal/querycoordv2"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/componentutil"
"github.com/milvus-io/milvus/internal/util/dependency"
_ "github.com/milvus-io/milvus/internal/util/grpcclient"
"github.com/milvus-io/milvus/pkg/log"
Expand Down Expand Up @@ -172,33 +171,18 @@ func (s *Server) init() error {
}

// wait for master init or healthy
log.Info("QueryCoord try to wait for RootCoord ready")
err = componentutil.WaitForComponentHealthy(s.loopCtx, s.rootCoord, "RootCoord", 1000000, time.Millisecond*200)
if err != nil {
log.Error("QueryCoord wait for RootCoord ready failed", zap.Error(err))
panic(err)
}

if err := s.SetRootCoord(s.rootCoord); err != nil {
panic(err)
}
log.Info("QueryCoord report RootCoord ready")

// --- Data service client ---
if s.dataCoord == nil {
s.dataCoord = coordclient.GetDataCoordClient(s.loopCtx)
}

log.Info("QueryCoord try to wait for DataCoord ready")
err = componentutil.WaitForComponentHealthy(s.loopCtx, s.dataCoord, "DataCoord", 1000000, time.Millisecond*200)
if err != nil {
log.Error("QueryCoord wait for DataCoord ready failed", zap.Error(err))
panic(err)
}
if err := s.SetDataCoord(s.dataCoord); err != nil {
panic(err)
}
log.Info("QueryCoord report DataCoord ready")

if err := s.queryCoord.Init(); err != nil {
return err
Expand Down
8 changes: 0 additions & 8 deletions internal/distributed/querycoord/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,16 +59,8 @@ func Test_NewServer(t *testing.T) {
assert.NotNil(t, server)

mdc := mocks.NewMockDataCoordClient(t)
mdc.EXPECT().GetComponentStates(mock.Anything, mock.Anything).Return(&milvuspb.ComponentStates{
State: &milvuspb.ComponentInfo{StateCode: commonpb.StateCode_Healthy},
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
}, nil)

mrc := mocks.NewMockRootCoordClient(t)
mrc.EXPECT().GetComponentStates(mock.Anything, mock.Anything).Return(&milvuspb.ComponentStates{
State: &milvuspb.ComponentInfo{StateCode: commonpb.StateCode_Healthy},
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
}, nil)

mqc := getQueryCoord()
successStatus := merr.Success()
Expand Down
17 changes: 17 additions & 0 deletions internal/querycoordv2/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import (
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/querycoordv2/task"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/componentutil"
"github.com/milvus-io/milvus/internal/util/proxyutil"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/tsoutil"
Expand Down Expand Up @@ -276,6 +277,22 @@ func (s *Server) Init() error {
}

func (s *Server) initQueryCoord() error {
// wait for master init or healthy
log.Info("QueryCoord try to wait for RootCoord ready")
if err := componentutil.WaitForComponentHealthy(s.ctx, s.rootCoord, "RootCoord", 1000000, time.Millisecond*200); err != nil {
log.Error("QueryCoord wait for RootCoord ready failed", zap.Error(err))
return errors.Wrap(err, "RootCoord not ready")
}
log.Info("QueryCoord report RootCoord ready")

// wait for master init or healthy
log.Info("QueryCoord try to wait for DataCoord ready")
if err := componentutil.WaitForComponentHealthy(s.ctx, s.dataCoord, "DataCoord", 1000000, time.Millisecond*200); err != nil {
log.Error("QueryCoord wait for DataCoord ready failed", zap.Error(err))
return errors.Wrap(err, "DataCoord not ready")
}
log.Info("QueryCoord report DataCoord ready")

s.UpdateStateCode(commonpb.StateCode_Initializing)
log.Info("start init querycoord", zap.Any("State", commonpb.StateCode_Initializing))
// Init KV and ID allocator
Expand Down
24 changes: 24 additions & 0 deletions internal/querycoordv2/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,19 @@ func (suite *ServerSuite) TestEnableActiveStandby() {
suite.server, err = suite.newQueryCoord()
suite.NoError(err)
mockRootCoord := coordMocks.NewMockRootCoordClient(suite.T())
mockRootCoord.EXPECT().GetComponentStates(mock.Anything, mock.Anything).Return(&milvuspb.ComponentStates{
State: &milvuspb.ComponentInfo{
StateCode: commonpb.StateCode_Healthy,
},
Status: merr.Success(),
}, nil).Maybe()
mockDataCoord := coordMocks.NewMockDataCoordClient(suite.T())
mockDataCoord.EXPECT().GetComponentStates(mock.Anything, mock.Anything).Return(&milvuspb.ComponentStates{
State: &milvuspb.ComponentInfo{
StateCode: commonpb.StateCode_Healthy,
},
Status: merr.Success(),
}, nil).Maybe()

mockRootCoord.EXPECT().DescribeCollection(mock.Anything, mock.Anything).Return(&milvuspb.DescribeCollectionResponse{
Status: merr.Success(),
Expand Down Expand Up @@ -612,7 +624,19 @@ func (suite *ServerSuite) hackServer() {

func (suite *ServerSuite) hackBroker(server *Server) {
mockRootCoord := coordMocks.NewMockRootCoordClient(suite.T())
mockRootCoord.EXPECT().GetComponentStates(mock.Anything, mock.Anything).Return(&milvuspb.ComponentStates{
State: &milvuspb.ComponentInfo{
StateCode: commonpb.StateCode_Healthy,
},
Status: merr.Success(),
}, nil).Maybe()
mockDataCoord := coordMocks.NewMockDataCoordClient(suite.T())
mockDataCoord.EXPECT().GetComponentStates(mock.Anything, mock.Anything).Return(&milvuspb.ComponentStates{
State: &milvuspb.ComponentInfo{
StateCode: commonpb.StateCode_Healthy,
},
Status: merr.Success(),
}, nil).Maybe()

for _, collection := range suite.collections {
mockRootCoord.EXPECT().DescribeCollection(mock.Anything, mock.Anything).Return(&milvuspb.DescribeCollectionResponse{
Expand Down

0 comments on commit d3ae8e9

Please sign in to comment.