Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve the retry of the rpc client #26797

Merged
merged 1 commit into from
Sep 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -393,10 +393,10 @@ grpc:
dialTimeout: 200
keepAliveTime: 10000
keepAliveTimeout: 20000
maxMaxAttempts: 5
initialBackOff: 1.0
maxBackoff: 10.0
backoffMultiplier: 2.0
maxMaxAttempts: 10
initialBackOff: 0.2 # seconds
maxBackoff: 10 # seconds
backoffMultiplier: 2.0 # deprecated
server:
retryTimes: 5 # retry times when receiving a grpc return value with a failure and retryable state code

Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJn
github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY=
github.com/go-playground/validator/v10 v10.14.1 h1:9c50NUPC30zyuKprjL3vNZ0m5oG+jU0zvx4AqHGnv4k=
github.com/go-playground/validator/v10 v10.14.1/go.mod h1:9iXMNT7sEkjXb0I+enO7QXmzG6QCsPWY4zveKFVRSyU=
github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee/go.mod h1:L0fX3K22YWvt/FAX9NnzrNzcI4wNYi9Yku4O0LKYflo=
github.com/gobwas/pool v0.2.0/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw=
Expand Down
1 change: 0 additions & 1 deletion internal/distributed/datacoord/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ func NewClient(ctx context.Context, metaRoot string, etcdCli *clientv3.Client) (
MaxAttempts: ClientParams.MaxAttempts,
InitialBackoff: ClientParams.InitialBackoff,
MaxBackoff: ClientParams.MaxBackoff,
BackoffMultiplier: ClientParams.BackoffMultiplier,
},
sess: sess,
}
Expand Down
1 change: 0 additions & 1 deletion internal/distributed/datanode/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ func NewClient(ctx context.Context, addr string, nodeID int64) (*Client, error)
MaxAttempts: ClientParams.MaxAttempts,
InitialBackoff: ClientParams.InitialBackoff,
MaxBackoff: ClientParams.MaxBackoff,
BackoffMultiplier: ClientParams.BackoffMultiplier,
},
}
client.grpcClient.SetRole(typeutil.DataNodeRole)
Expand Down
1 change: 0 additions & 1 deletion internal/distributed/indexcoord/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ func NewClient(ctx context.Context, metaRoot string, etcdCli *clientv3.Client) (
MaxAttempts: ClientParams.MaxAttempts,
InitialBackoff: ClientParams.InitialBackoff,
MaxBackoff: ClientParams.MaxBackoff,
BackoffMultiplier: ClientParams.BackoffMultiplier,
},
sess: sess,
}
Expand Down
1 change: 0 additions & 1 deletion internal/distributed/indexnode/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ func NewClient(ctx context.Context, addr string, nodeID int64, encryption bool)
MaxAttempts: ClientParams.MaxAttempts,
InitialBackoff: ClientParams.InitialBackoff,
MaxBackoff: ClientParams.MaxBackoff,
BackoffMultiplier: ClientParams.BackoffMultiplier,
},
}
client.grpcClient.SetRole(typeutil.IndexNodeRole)
Expand Down
1 change: 0 additions & 1 deletion internal/distributed/proxy/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ func NewClient(ctx context.Context, addr string, nodeID int64) (*Client, error)
MaxAttempts: ClientParams.MaxAttempts,
InitialBackoff: ClientParams.InitialBackoff,
MaxBackoff: ClientParams.MaxBackoff,
BackoffMultiplier: ClientParams.BackoffMultiplier,
},
}
client.grpcClient.SetRole(typeutil.ProxyRole)
Expand Down
1 change: 0 additions & 1 deletion internal/distributed/querycoord/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ func NewClient(ctx context.Context, metaRoot string, etcdCli *clientv3.Client) (
MaxAttempts: ClientParams.MaxAttempts,
InitialBackoff: ClientParams.InitialBackoff,
MaxBackoff: ClientParams.MaxBackoff,
BackoffMultiplier: ClientParams.BackoffMultiplier,
},
sess: sess,
}
Expand Down
1 change: 0 additions & 1 deletion internal/distributed/querynode/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ func NewClient(ctx context.Context, addr string, nodeID int64) (*Client, error)
MaxAttempts: ClientParams.MaxAttempts,
InitialBackoff: ClientParams.InitialBackoff,
MaxBackoff: ClientParams.MaxBackoff,
BackoffMultiplier: ClientParams.BackoffMultiplier,
},
}
client.grpcClient.SetRole(typeutil.QueryNodeRole)
Expand Down
1 change: 0 additions & 1 deletion internal/distributed/rootcoord/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ func NewClient(ctx context.Context, metaRoot string, etcdCli *clientv3.Client) (
MaxAttempts: ClientParams.MaxAttempts,
InitialBackoff: ClientParams.InitialBackoff,
MaxBackoff: ClientParams.MaxBackoff,
BackoffMultiplier: ClientParams.BackoffMultiplier,
},
sess: sess,
}
Expand Down
18 changes: 12 additions & 6 deletions internal/querycoordv2/meta/collection_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package meta

import (
"context"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -84,22 +85,26 @@ func (m *CollectionManager) Recover(broker Broker) error {
if err != nil {
return err
}
ctx := log.WithTraceID(context.Background(), strconv.FormatInt(time.Now().UnixNano(), 10))
ctxLog := log.Ctx(ctx)
ctxLog.Info("recover collections and partitions from kv store")

for _, collection := range collections {
// Dropped collection should be deprecated
_, err = broker.GetCollectionSchema(context.Background(), collection.GetCollectionID())
_, err = broker.GetCollectionSchema(ctx, collection.GetCollectionID())
if common.IsCollectionNotExistError(err) {
log.Warn("skip dropped collection during recovery", zap.Int64("collection", collection.GetCollectionID()))
ctxLog.Warn("skip dropped collection during recovery", zap.Int64("collection", collection.GetCollectionID()))
m.store.ReleaseCollection(collection.GetCollectionID())
continue
}
if err != nil {
ctxLog.Warn("failed to get collection schema", zap.Error(err))
return err
}

// Collections not loaded done should be deprecated
if collection.GetStatus() != querypb.LoadStatus_Loaded || collection.GetReplicaNumber() <= 0 {
log.Info("skip recovery and release collection",
ctxLog.Info("skip recovery and release collection",
zap.Int64("collectionID", collection.GetCollectionID()),
zap.String("status", collection.GetStatus().String()),
zap.Int32("replicaNumber", collection.GetReplicaNumber()),
Expand All @@ -114,13 +119,14 @@ func (m *CollectionManager) Recover(broker Broker) error {
}

for collection, partitions := range partitions {
existPartitions, err := broker.GetPartitions(context.Background(), collection)
existPartitions, err := broker.GetPartitions(ctx, collection)
if common.IsCollectionNotExistError(err) {
log.Warn("skip dropped collection during recovery", zap.Int64("collection", collection))
ctxLog.Warn("skip dropped collection during recovery", zap.Int64("collection", collection))
m.store.ReleaseCollection(collection)
continue
}
if err != nil {
ctxLog.Warn("failed to get partitions", zap.Error(err))
return err
}
omitPartitions := make([]int64, 0)
Expand All @@ -132,7 +138,7 @@ func (m *CollectionManager) Recover(broker Broker) error {
return true
})
if len(omitPartitions) > 0 {
log.Warn("skip dropped partitions during recovery",
ctxLog.Warn("skip dropped partitions during recovery",
zap.Int64("collection", collection), zap.Int64s("partitions", omitPartitions))
m.store.ReleasePartition(collection, omitPartitions...)
}
Expand Down
2 changes: 1 addition & 1 deletion internal/querycoordv2/meta/coordinator_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (broker *CoordinatorBroker) GetCollectionSchema(ctx context.Context, collec
return nil, err
}
if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
log.Warn("failed to get collection schema",
log.Ctx(ctx).Warn("failed to get collection schema",
zap.Int64("collectionID", collectionID),
zap.String("code", resp.GetStatus().GetErrorCode().String()),
zap.String("reason", resp.GetStatus().GetReason()))
Expand Down
6 changes: 3 additions & 3 deletions internal/querycoordv2/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ func (s *Server) initMeta() error {
log.Info("recover meta...")
err := s.meta.CollectionManager.Recover(s.broker)
if err != nil {
log.Error("failed to recover collections")
log.Error("failed to recover collections", zap.Error(err))
return err
}

Expand All @@ -319,13 +319,13 @@ func (s *Server) initMeta() error {

err = s.meta.ReplicaManager.Recover(s.meta.CollectionManager.GetAll())
if err != nil {
log.Error("failed to recover replicas")
log.Error("failed to recover replicas", zap.Error(err))
return err
}

err = s.meta.ResourceManager.Recover()
if err != nil {
log.Error("failed to recover resource groups")
log.Error("failed to recover resource groups", zap.Error(err))
return err
}

Expand Down
12 changes: 9 additions & 3 deletions internal/querycoordv2/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,9 @@ func (suite *ServerSuite) SetupTest() {

for i := range suite.nodes {
suite.nodes[i] = mocks.NewMockQueryNode(suite.T(), suite.server.etcdCli)
suite.nodes[i].EXPECT().GetComponentStates(mock.Anything, mock.Anything).Return(&milvuspb.ComponentStates{}, nil).Maybe()
suite.nodes[i].EXPECT().GetComponentStates(mock.Anything, mock.Anything).Return(&milvuspb.ComponentStates{
Status: &commonpb.Status{},
}, nil).Maybe()
err := suite.nodes[i].Start()
suite.Require().NoError(err)
ok := suite.waitNodeUp(suite.nodes[i], 5*time.Second)
Expand Down Expand Up @@ -158,7 +160,9 @@ func (suite *ServerSuite) TestNodeUp() {
}
node1 := mocks.NewMockQueryNode(suite.T(), suite.server.etcdCli)

node1.EXPECT().GetComponentStates(mock.Anything, mock.Anything).Return(&milvuspb.ComponentStates{}, nil).Maybe()
node1.EXPECT().GetComponentStates(mock.Anything, mock.Anything).Return(&milvuspb.ComponentStates{
Status: &commonpb.Status{},
}, nil).Maybe()
node1.EXPECT().GetDataDistribution(mock.Anything, mock.Anything).Return(&querypb.GetDataDistributionResponse{Status: successStatus}, nil).Maybe()
err := node1.Start()
suite.NoError(err)
Expand All @@ -176,7 +180,9 @@ func (suite *ServerSuite) TestNodeUp() {
suite.server.nodeMgr.Add(session.NewNodeInfo(101, "localhost"))

node2 := mocks.NewMockQueryNode(suite.T(), suite.server.etcdCli)
node2.EXPECT().GetComponentStates(mock.Anything, mock.Anything).Return(&milvuspb.ComponentStates{}, nil).Maybe()
node2.EXPECT().GetComponentStates(mock.Anything, mock.Anything).Return(&milvuspb.ComponentStates{
Status: &commonpb.Status{},
}, nil).Maybe()
node2.EXPECT().GetDataDistribution(mock.Anything, mock.Anything).Return(&querypb.GetDataDistributionResponse{Status: successStatus}, nil).Maybe()
err = node2.Start()
suite.NoError(err)
Expand Down
2 changes: 1 addition & 1 deletion internal/rootcoord/root_coord.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ func (c *Core) Init() error {
if err != nil {
return err
}
log.Info("RootCoord startup success")
log.Info("RootCoord startup success", zap.String("address", c.session.Address))
return nil
}
c.UpdateStateCode(commonpb.StateCode_StandBy)
Expand Down
Loading