Skip to content

Commit

Permalink
Improve the retry of the rpc client (#26797)
Browse files Browse the repository at this point in the history
Signed-off-by: SimFG <[email protected]>
  • Loading branch information
SimFG authored Sep 12, 2023
1 parent 3f425f0 commit 6023bcb
Show file tree
Hide file tree
Showing 21 changed files with 176 additions and 335 deletions.
8 changes: 4 additions & 4 deletions configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -393,10 +393,10 @@ grpc:
dialTimeout: 200
keepAliveTime: 10000
keepAliveTimeout: 20000
maxMaxAttempts: 5
initialBackOff: 1.0
maxBackoff: 10.0
backoffMultiplier: 2.0
maxMaxAttempts: 10
initialBackOff: 0.2 # seconds
maxBackoff: 10 # seconds
backoffMultiplier: 2.0 # deprecated
server:
retryTimes: 5 # retry times when receiving a grpc return value with a failure and retryable state code

Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJn
github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY=
github.com/go-playground/validator/v10 v10.14.1 h1:9c50NUPC30zyuKprjL3vNZ0m5oG+jU0zvx4AqHGnv4k=
github.com/go-playground/validator/v10 v10.14.1/go.mod h1:9iXMNT7sEkjXb0I+enO7QXmzG6QCsPWY4zveKFVRSyU=
github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee/go.mod h1:L0fX3K22YWvt/FAX9NnzrNzcI4wNYi9Yku4O0LKYflo=
github.com/gobwas/pool v0.2.0/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw=
Expand Down
1 change: 0 additions & 1 deletion internal/distributed/datacoord/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ func NewClient(ctx context.Context, metaRoot string, etcdCli *clientv3.Client) (
MaxAttempts: ClientParams.MaxAttempts,
InitialBackoff: ClientParams.InitialBackoff,
MaxBackoff: ClientParams.MaxBackoff,
BackoffMultiplier: ClientParams.BackoffMultiplier,
},
sess: sess,
}
Expand Down
1 change: 0 additions & 1 deletion internal/distributed/datanode/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ func NewClient(ctx context.Context, addr string, nodeID int64) (*Client, error)
MaxAttempts: ClientParams.MaxAttempts,
InitialBackoff: ClientParams.InitialBackoff,
MaxBackoff: ClientParams.MaxBackoff,
BackoffMultiplier: ClientParams.BackoffMultiplier,
},
}
client.grpcClient.SetRole(typeutil.DataNodeRole)
Expand Down
1 change: 0 additions & 1 deletion internal/distributed/indexcoord/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ func NewClient(ctx context.Context, metaRoot string, etcdCli *clientv3.Client) (
MaxAttempts: ClientParams.MaxAttempts,
InitialBackoff: ClientParams.InitialBackoff,
MaxBackoff: ClientParams.MaxBackoff,
BackoffMultiplier: ClientParams.BackoffMultiplier,
},
sess: sess,
}
Expand Down
1 change: 0 additions & 1 deletion internal/distributed/indexnode/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ func NewClient(ctx context.Context, addr string, nodeID int64, encryption bool)
MaxAttempts: ClientParams.MaxAttempts,
InitialBackoff: ClientParams.InitialBackoff,
MaxBackoff: ClientParams.MaxBackoff,
BackoffMultiplier: ClientParams.BackoffMultiplier,
},
}
client.grpcClient.SetRole(typeutil.IndexNodeRole)
Expand Down
1 change: 0 additions & 1 deletion internal/distributed/proxy/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ func NewClient(ctx context.Context, addr string, nodeID int64) (*Client, error)
MaxAttempts: ClientParams.MaxAttempts,
InitialBackoff: ClientParams.InitialBackoff,
MaxBackoff: ClientParams.MaxBackoff,
BackoffMultiplier: ClientParams.BackoffMultiplier,
},
}
client.grpcClient.SetRole(typeutil.ProxyRole)
Expand Down
1 change: 0 additions & 1 deletion internal/distributed/querycoord/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ func NewClient(ctx context.Context, metaRoot string, etcdCli *clientv3.Client) (
MaxAttempts: ClientParams.MaxAttempts,
InitialBackoff: ClientParams.InitialBackoff,
MaxBackoff: ClientParams.MaxBackoff,
BackoffMultiplier: ClientParams.BackoffMultiplier,
},
sess: sess,
}
Expand Down
1 change: 0 additions & 1 deletion internal/distributed/querynode/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ func NewClient(ctx context.Context, addr string, nodeID int64) (*Client, error)
MaxAttempts: ClientParams.MaxAttempts,
InitialBackoff: ClientParams.InitialBackoff,
MaxBackoff: ClientParams.MaxBackoff,
BackoffMultiplier: ClientParams.BackoffMultiplier,
},
}
client.grpcClient.SetRole(typeutil.QueryNodeRole)
Expand Down
1 change: 0 additions & 1 deletion internal/distributed/rootcoord/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ func NewClient(ctx context.Context, metaRoot string, etcdCli *clientv3.Client) (
MaxAttempts: ClientParams.MaxAttempts,
InitialBackoff: ClientParams.InitialBackoff,
MaxBackoff: ClientParams.MaxBackoff,
BackoffMultiplier: ClientParams.BackoffMultiplier,
},
sess: sess,
}
Expand Down
18 changes: 12 additions & 6 deletions internal/querycoordv2/meta/collection_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package meta

import (
"context"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -84,22 +85,26 @@ func (m *CollectionManager) Recover(broker Broker) error {
if err != nil {
return err
}
ctx := log.WithTraceID(context.Background(), strconv.FormatInt(time.Now().UnixNano(), 10))
ctxLog := log.Ctx(ctx)
ctxLog.Info("recover collections and partitions from kv store")

for _, collection := range collections {
// Dropped collection should be deprecated
_, err = broker.GetCollectionSchema(context.Background(), collection.GetCollectionID())
_, err = broker.GetCollectionSchema(ctx, collection.GetCollectionID())
if common.IsCollectionNotExistError(err) {
log.Warn("skip dropped collection during recovery", zap.Int64("collection", collection.GetCollectionID()))
ctxLog.Warn("skip dropped collection during recovery", zap.Int64("collection", collection.GetCollectionID()))
m.store.ReleaseCollection(collection.GetCollectionID())
continue
}
if err != nil {
ctxLog.Warn("failed to get collection schema", zap.Error(err))
return err
}

// Collections not loaded done should be deprecated
if collection.GetStatus() != querypb.LoadStatus_Loaded || collection.GetReplicaNumber() <= 0 {
log.Info("skip recovery and release collection",
ctxLog.Info("skip recovery and release collection",
zap.Int64("collectionID", collection.GetCollectionID()),
zap.String("status", collection.GetStatus().String()),
zap.Int32("replicaNumber", collection.GetReplicaNumber()),
Expand All @@ -114,13 +119,14 @@ func (m *CollectionManager) Recover(broker Broker) error {
}

for collection, partitions := range partitions {
existPartitions, err := broker.GetPartitions(context.Background(), collection)
existPartitions, err := broker.GetPartitions(ctx, collection)
if common.IsCollectionNotExistError(err) {
log.Warn("skip dropped collection during recovery", zap.Int64("collection", collection))
ctxLog.Warn("skip dropped collection during recovery", zap.Int64("collection", collection))
m.store.ReleaseCollection(collection)
continue
}
if err != nil {
ctxLog.Warn("failed to get partitions", zap.Error(err))
return err
}
omitPartitions := make([]int64, 0)
Expand All @@ -132,7 +138,7 @@ func (m *CollectionManager) Recover(broker Broker) error {
return true
})
if len(omitPartitions) > 0 {
log.Warn("skip dropped partitions during recovery",
ctxLog.Warn("skip dropped partitions during recovery",
zap.Int64("collection", collection), zap.Int64s("partitions", omitPartitions))
m.store.ReleasePartition(collection, omitPartitions...)
}
Expand Down
2 changes: 1 addition & 1 deletion internal/querycoordv2/meta/coordinator_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (broker *CoordinatorBroker) GetCollectionSchema(ctx context.Context, collec
return nil, err
}
if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
log.Warn("failed to get collection schema",
log.Ctx(ctx).Warn("failed to get collection schema",
zap.Int64("collectionID", collectionID),
zap.String("code", resp.GetStatus().GetErrorCode().String()),
zap.String("reason", resp.GetStatus().GetReason()))
Expand Down
6 changes: 3 additions & 3 deletions internal/querycoordv2/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ func (s *Server) initMeta() error {
log.Info("recover meta...")
err := s.meta.CollectionManager.Recover(s.broker)
if err != nil {
log.Error("failed to recover collections")
log.Error("failed to recover collections", zap.Error(err))
return err
}

Expand All @@ -319,13 +319,13 @@ func (s *Server) initMeta() error {

err = s.meta.ReplicaManager.Recover(s.meta.CollectionManager.GetAll())
if err != nil {
log.Error("failed to recover replicas")
log.Error("failed to recover replicas", zap.Error(err))
return err
}

err = s.meta.ResourceManager.Recover()
if err != nil {
log.Error("failed to recover resource groups")
log.Error("failed to recover resource groups", zap.Error(err))
return err
}

Expand Down
12 changes: 9 additions & 3 deletions internal/querycoordv2/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,9 @@ func (suite *ServerSuite) SetupTest() {

for i := range suite.nodes {
suite.nodes[i] = mocks.NewMockQueryNode(suite.T(), suite.server.etcdCli)
suite.nodes[i].EXPECT().GetComponentStates(mock.Anything, mock.Anything).Return(&milvuspb.ComponentStates{}, nil).Maybe()
suite.nodes[i].EXPECT().GetComponentStates(mock.Anything, mock.Anything).Return(&milvuspb.ComponentStates{
Status: &commonpb.Status{},
}, nil).Maybe()
err := suite.nodes[i].Start()
suite.Require().NoError(err)
ok := suite.waitNodeUp(suite.nodes[i], 5*time.Second)
Expand Down Expand Up @@ -158,7 +160,9 @@ func (suite *ServerSuite) TestNodeUp() {
}
node1 := mocks.NewMockQueryNode(suite.T(), suite.server.etcdCli)

node1.EXPECT().GetComponentStates(mock.Anything, mock.Anything).Return(&milvuspb.ComponentStates{}, nil).Maybe()
node1.EXPECT().GetComponentStates(mock.Anything, mock.Anything).Return(&milvuspb.ComponentStates{
Status: &commonpb.Status{},
}, nil).Maybe()
node1.EXPECT().GetDataDistribution(mock.Anything, mock.Anything).Return(&querypb.GetDataDistributionResponse{Status: successStatus}, nil).Maybe()
err := node1.Start()
suite.NoError(err)
Expand All @@ -176,7 +180,9 @@ func (suite *ServerSuite) TestNodeUp() {
suite.server.nodeMgr.Add(session.NewNodeInfo(101, "localhost"))

node2 := mocks.NewMockQueryNode(suite.T(), suite.server.etcdCli)
node2.EXPECT().GetComponentStates(mock.Anything, mock.Anything).Return(&milvuspb.ComponentStates{}, nil).Maybe()
node2.EXPECT().GetComponentStates(mock.Anything, mock.Anything).Return(&milvuspb.ComponentStates{
Status: &commonpb.Status{},
}, nil).Maybe()
node2.EXPECT().GetDataDistribution(mock.Anything, mock.Anything).Return(&querypb.GetDataDistributionResponse{Status: successStatus}, nil).Maybe()
err = node2.Start()
suite.NoError(err)
Expand Down
2 changes: 1 addition & 1 deletion internal/rootcoord/root_coord.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ func (c *Core) Init() error {
if err != nil {
return err
}
log.Info("RootCoord startup success")
log.Info("RootCoord startup success", zap.String("address", c.session.Address))
return nil
}
c.UpdateStateCode(commonpb.StateCode_StandBy)
Expand Down
Loading

0 comments on commit 6023bcb

Please sign in to comment.