diff --git a/configs/milvus.yaml b/configs/milvus.yaml
index 7d164267c91e7..a78d7d42d5d1f 100644
--- a/configs/milvus.yaml
+++ b/configs/milvus.yaml
@@ -393,10 +393,10 @@ grpc:
     dialTimeout: 200
     keepAliveTime: 10000
     keepAliveTimeout: 20000
-    maxMaxAttempts: 5
-    initialBackOff: 1.0
-    maxBackoff: 10.0
-    backoffMultiplier: 2.0
+    maxMaxAttempts: 10
+    initialBackOff: 0.2 # seconds
+    maxBackoff: 10 # seconds
+    backoffMultiplier: 2.0 # deprecated

   server:
     retryTimes: 5 # retry times when receiving a grpc return value with a failure and retryable state code
diff --git a/go.sum b/go.sum
index fc731e6784666..a6d7657066dbb 100644
--- a/go.sum
+++ b/go.sum
@@ -293,6 +293,7 @@ github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJn
 github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY=
 github.com/go-playground/validator/v10 v10.14.1 h1:9c50NUPC30zyuKprjL3vNZ0m5oG+jU0zvx4AqHGnv4k=
 github.com/go-playground/validator/v10 v10.14.1/go.mod h1:9iXMNT7sEkjXb0I+enO7QXmzG6QCsPWY4zveKFVRSyU=
+github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
 github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
 github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee/go.mod h1:L0fX3K22YWvt/FAX9NnzrNzcI4wNYi9Yku4O0LKYflo=
 github.com/gobwas/pool v0.2.0/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw=
diff --git a/internal/distributed/datacoord/client/client.go b/internal/distributed/datacoord/client/client.go
index 2795b0e188cb6..bfb738b7cd6d6 100644
--- a/internal/distributed/datacoord/client/client.go
+++ b/internal/distributed/datacoord/client/client.go
@@ -71,7 +71,6 @@ func NewClient(ctx context.Context, metaRoot string, etcdCli *clientv3.Client) (
             MaxAttempts:       ClientParams.MaxAttempts,
             InitialBackoff:    ClientParams.InitialBackoff,
             MaxBackoff:        ClientParams.MaxBackoff,
-            BackoffMultiplier: ClientParams.BackoffMultiplier,
         },
         sess: sess,
     }
diff --git a/internal/distributed/datanode/client/client.go b/internal/distributed/datanode/client/client.go
index 8c468136e0c1a..5289c6ff94e51 100644
--- a/internal/distributed/datanode/client/client.go
+++ b/internal/distributed/datanode/client/client.go
@@ -60,7 +60,6 @@ func NewClient(ctx context.Context, addr string, nodeID int64) (*Client, error)
             MaxAttempts:       ClientParams.MaxAttempts,
             InitialBackoff:    ClientParams.InitialBackoff,
             MaxBackoff:        ClientParams.MaxBackoff,
-            BackoffMultiplier: ClientParams.BackoffMultiplier,
         },
     }
     client.grpcClient.SetRole(typeutil.DataNodeRole)
diff --git a/internal/distributed/indexcoord/client/client.go b/internal/distributed/indexcoord/client/client.go
index 84c8b1384ae95..4398d20bb8881 100644
--- a/internal/distributed/indexcoord/client/client.go
+++ b/internal/distributed/indexcoord/client/client.go
@@ -67,7 +67,6 @@ func NewClient(ctx context.Context, metaRoot string, etcdCli *clientv3.Client) (
             MaxAttempts:       ClientParams.MaxAttempts,
             InitialBackoff:    ClientParams.InitialBackoff,
             MaxBackoff:        ClientParams.MaxBackoff,
-            BackoffMultiplier: ClientParams.BackoffMultiplier,
         },
         sess: sess,
     }
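Reviewer note (not part of the patch): the new YAML defaults shorten the first backoff, raise the attempt count and keep backoffMultiplier only as a deprecated key. A minimal, self-contained sketch of the arithmetic behind the "52.8s if all retry failed" comment that appears later in this patch, assuming the sleep doubles per attempt and is capped at maxBackoff (the doubling factor and the exact number of sleeps are assumptions, so the result is approximate):

package main

import "fmt"

func main() {
    const (
        maxAttempts    = 10
        initialBackoff = 0.2 // grpc.client.initialBackOff, seconds
        maxBackoff     = 10.0 // grpc.client.maxBackoff, seconds
    )
    total, sleep := 0.0, initialBackoff
    for i := 0; i < maxAttempts; i++ {
        total += sleep
        sleep *= 2 // assumed growth factor of the retry helper
        if sleep > maxBackoff {
            sleep = maxBackoff
        }
    }
    fmt.Printf("worst-case backoff ~%.1fs\n", total) // roughly 52-53s with these numbers
}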
diff --git a/internal/distributed/indexnode/client/client.go b/internal/distributed/indexnode/client/client.go
index 20659d58d048a..f8c90cab44cac 100644
--- a/internal/distributed/indexnode/client/client.go
+++ b/internal/distributed/indexnode/client/client.go
@@ -61,7 +61,6 @@ func NewClient(ctx context.Context, addr string, nodeID int64, encryption bool)
             MaxAttempts:       ClientParams.MaxAttempts,
             InitialBackoff:    ClientParams.InitialBackoff,
             MaxBackoff:        ClientParams.MaxBackoff,
-            BackoffMultiplier: ClientParams.BackoffMultiplier,
         },
     }
     client.grpcClient.SetRole(typeutil.IndexNodeRole)
diff --git a/internal/distributed/proxy/client/client.go b/internal/distributed/proxy/client/client.go
index 3080003f137db..9fa20a6a0ee07 100644
--- a/internal/distributed/proxy/client/client.go
+++ b/internal/distributed/proxy/client/client.go
@@ -60,7 +60,6 @@ func NewClient(ctx context.Context, addr string, nodeID int64) (*Client, error)
             MaxAttempts:       ClientParams.MaxAttempts,
             InitialBackoff:    ClientParams.InitialBackoff,
             MaxBackoff:        ClientParams.MaxBackoff,
-            BackoffMultiplier: ClientParams.BackoffMultiplier,
         },
     }
     client.grpcClient.SetRole(typeutil.ProxyRole)
diff --git a/internal/distributed/querycoord/client/client.go b/internal/distributed/querycoord/client/client.go
index e127a336788a6..67104e45be277 100644
--- a/internal/distributed/querycoord/client/client.go
+++ b/internal/distributed/querycoord/client/client.go
@@ -66,7 +66,6 @@ func NewClient(ctx context.Context, metaRoot string, etcdCli *clientv3.Client) (
             MaxAttempts:       ClientParams.MaxAttempts,
             InitialBackoff:    ClientParams.InitialBackoff,
             MaxBackoff:        ClientParams.MaxBackoff,
-            BackoffMultiplier: ClientParams.BackoffMultiplier,
         },
         sess: sess,
     }
diff --git a/internal/distributed/querynode/client/client.go b/internal/distributed/querynode/client/client.go
index 9493be9951d09..8a9ef8c688429 100644
--- a/internal/distributed/querynode/client/client.go
+++ b/internal/distributed/querynode/client/client.go
@@ -61,7 +61,6 @@ func NewClient(ctx context.Context, addr string, nodeID int64) (*Client, error)
             MaxAttempts:       ClientParams.MaxAttempts,
             InitialBackoff:    ClientParams.InitialBackoff,
             MaxBackoff:        ClientParams.MaxBackoff,
-            BackoffMultiplier: ClientParams.BackoffMultiplier,
         },
     }
     client.grpcClient.SetRole(typeutil.QueryNodeRole)
diff --git a/internal/distributed/rootcoord/client/client.go b/internal/distributed/rootcoord/client/client.go
index 9c48b8af953a7..b693c648cf81e 100644
--- a/internal/distributed/rootcoord/client/client.go
+++ b/internal/distributed/rootcoord/client/client.go
@@ -74,7 +74,6 @@ func NewClient(ctx context.Context, metaRoot string, etcdCli *clientv3.Client) (
             MaxAttempts:       ClientParams.MaxAttempts,
             InitialBackoff:    ClientParams.InitialBackoff,
             MaxBackoff:        ClientParams.MaxBackoff,
-            BackoffMultiplier: ClientParams.BackoffMultiplier,
         },
         sess: sess,
     }
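Reviewer note (not part of the patch): every coordinator and node client drops BackoffMultiplier the same way, so the retry configuration that survives on the shared client base is just the attempt count and the backoff bounds. A hedged fragment sketching what remains (type parameter and field names taken from the datacoord hunk; other construction details are elided):

client := &Client{
    grpcClient: &grpcclient.ClientBase[datapb.DataCoordClient]{
        MaxAttempts:    ClientParams.MaxAttempts,
        InitialBackoff: ClientParams.InitialBackoff, // now float64, seconds
        MaxBackoff:     ClientParams.MaxBackoff,     // now float64, seconds
        // BackoffMultiplier is gone; backoff growth is left to the retry helper.
    },
    sess: sess,
}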
diff --git a/internal/querycoordv2/meta/collection_manager.go b/internal/querycoordv2/meta/collection_manager.go
index 4e7f99b3b7bfa..b117965315767 100644
--- a/internal/querycoordv2/meta/collection_manager.go
+++ b/internal/querycoordv2/meta/collection_manager.go
@@ -18,6 +18,7 @@ package meta

 import (
     "context"
+    "strconv"
     "sync"
     "time"

@@ -84,22 +85,26 @@ func (m *CollectionManager) Recover(broker Broker) error {
     if err != nil {
         return err
     }
+    ctx := log.WithTraceID(context.Background(), strconv.FormatInt(time.Now().UnixNano(), 10))
+    ctxLog := log.Ctx(ctx)
+    ctxLog.Info("recover collections and partitions from kv store")
     for _, collection := range collections {
         // Dropped collection should be deprecated
-        _, err = broker.GetCollectionSchema(context.Background(), collection.GetCollectionID())
+        _, err = broker.GetCollectionSchema(ctx, collection.GetCollectionID())
         if common.IsCollectionNotExistError(err) {
-            log.Warn("skip dropped collection during recovery", zap.Int64("collection", collection.GetCollectionID()))
+            ctxLog.Warn("skip dropped collection during recovery", zap.Int64("collection", collection.GetCollectionID()))
             m.store.ReleaseCollection(collection.GetCollectionID())
             continue
         }
         if err != nil {
+            ctxLog.Warn("failed to get collection schema", zap.Error(err))
             return err
         }

         // Collections not loaded done should be deprecated
         if collection.GetStatus() != querypb.LoadStatus_Loaded || collection.GetReplicaNumber() <= 0 {
-            log.Info("skip recovery and release collection",
+            ctxLog.Info("skip recovery and release collection",
                 zap.Int64("collectionID", collection.GetCollectionID()),
                 zap.String("status", collection.GetStatus().String()),
                 zap.Int32("replicaNumber", collection.GetReplicaNumber()),
@@ -114,13 +119,14 @@ func (m *CollectionManager) Recover(broker Broker) error {
     }

     for collection, partitions := range partitions {
-        existPartitions, err := broker.GetPartitions(context.Background(), collection)
+        existPartitions, err := broker.GetPartitions(ctx, collection)
         if common.IsCollectionNotExistError(err) {
-            log.Warn("skip dropped collection during recovery", zap.Int64("collection", collection))
+            ctxLog.Warn("skip dropped collection during recovery", zap.Int64("collection", collection))
             m.store.ReleaseCollection(collection)
             continue
         }
         if err != nil {
+            ctxLog.Warn("failed to get partitions", zap.Error(err))
             return err
         }
         omitPartitions := make([]int64, 0)
@@ -132,7 +138,7 @@ func (m *CollectionManager) Recover(broker Broker) error {
             return true
         })
         if len(omitPartitions) > 0 {
-            log.Warn("skip dropped partitions during recovery",
+            ctxLog.Warn("skip dropped partitions during recovery",
                 zap.Int64("collection", collection), zap.Int64s("partitions", omitPartitions))
             m.store.ReleasePartition(collection, omitPartitions...)
         }
diff --git a/internal/querycoordv2/meta/coordinator_broker.go b/internal/querycoordv2/meta/coordinator_broker.go
index 71f728d9146ca..a57f6c9055c8b 100644
--- a/internal/querycoordv2/meta/coordinator_broker.go
+++ b/internal/querycoordv2/meta/coordinator_broker.go
@@ -85,7 +85,7 @@ func (broker *CoordinatorBroker) GetCollectionSchema(ctx context.Context, collec
         return nil, err
     }
     if resp.GetStatus().GetErrorCode() != commonpb.ErrorCode_Success {
-        log.Warn("failed to get collection schema",
+        log.Ctx(ctx).Warn("failed to get collection schema",
             zap.Int64("collectionID", collectionID),
             zap.String("code", resp.GetStatus().GetErrorCode().String()),
             zap.String("reason", resp.GetStatus().GetReason()))
diff --git a/internal/querycoordv2/server.go b/internal/querycoordv2/server.go
index a29938e741af1..91c79e58eff20 100644
--- a/internal/querycoordv2/server.go
+++ b/internal/querycoordv2/server.go
@@ -310,7 +310,7 @@ func (s *Server) initMeta() error {
     log.Info("recover meta...")
     err := s.meta.CollectionManager.Recover(s.broker)
     if err != nil {
-        log.Error("failed to recover collections")
+        log.Error("failed to recover collections", zap.Error(err))
         return err
     }

@@ -319,13 +319,13 @@

     err = s.meta.ReplicaManager.Recover(s.meta.CollectionManager.GetAll())
     if err != nil {
-        log.Error("failed to recover replicas")
+        log.Error("failed to recover replicas", zap.Error(err))
         return err
     }

     err = s.meta.ResourceManager.Recover()
     if err != nil {
-        log.Error("failed to recover resource groups")
+        log.Error("failed to recover resource groups", zap.Error(err))
         return err
     }
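Reviewer note (not part of the patch): the recovery path now threads a trace ID through the context so every recovery log line, including those emitted downstream in coordinator_broker, can be correlated. A minimal sketch of the pattern, using only the helpers that appear in the hunks above (log.WithTraceID, log.Ctx); the wrapping function is illustrative:

package meta

import (
    "context"
    "strconv"
    "time"

    "github.com/milvus-io/milvus/internal/log"
)

// recoverContext derives a context whose logger carries a traceID, as Recover now does.
func recoverContext() context.Context {
    ctx := log.WithTraceID(context.Background(), strconv.FormatInt(time.Now().UnixNano(), 10))
    log.Ctx(ctx).Info("recover collections and partitions from kv store")
    return ctx // pass this ctx to broker calls so their logs join the same trace
}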
diff --git a/internal/querycoordv2/server_test.go b/internal/querycoordv2/server_test.go
index fdcceb6a46377..2c2062fdaf09a 100644
--- a/internal/querycoordv2/server_test.go
+++ b/internal/querycoordv2/server_test.go
@@ -109,7 +109,9 @@ func (suite *ServerSuite) SetupTest() {
     for i := range suite.nodes {
         suite.nodes[i] = mocks.NewMockQueryNode(suite.T(), suite.server.etcdCli)
-        suite.nodes[i].EXPECT().GetComponentStates(mock.Anything, mock.Anything).Return(&milvuspb.ComponentStates{}, nil).Maybe()
+        suite.nodes[i].EXPECT().GetComponentStates(mock.Anything, mock.Anything).Return(&milvuspb.ComponentStates{
+            Status: &commonpb.Status{},
+        }, nil).Maybe()
         err := suite.nodes[i].Start()
         suite.Require().NoError(err)
         ok := suite.waitNodeUp(suite.nodes[i], 5*time.Second)
@@ -158,7 +160,9 @@ func (suite *ServerSuite) TestNodeUp() {
     }

     node1 := mocks.NewMockQueryNode(suite.T(), suite.server.etcdCli)
-    node1.EXPECT().GetComponentStates(mock.Anything, mock.Anything).Return(&milvuspb.ComponentStates{}, nil).Maybe()
+    node1.EXPECT().GetComponentStates(mock.Anything, mock.Anything).Return(&milvuspb.ComponentStates{
+        Status: &commonpb.Status{},
+    }, nil).Maybe()
     node1.EXPECT().GetDataDistribution(mock.Anything, mock.Anything).Return(&querypb.GetDataDistributionResponse{Status: successStatus}, nil).Maybe()
     err := node1.Start()
     suite.NoError(err)
@@ -176,7 +180,9 @@ func (suite *ServerSuite) TestNodeUp() {
     suite.server.nodeMgr.Add(session.NewNodeInfo(101, "localhost"))

     node2 := mocks.NewMockQueryNode(suite.T(), suite.server.etcdCli)
-    node2.EXPECT().GetComponentStates(mock.Anything, mock.Anything).Return(&milvuspb.ComponentStates{}, nil).Maybe()
+    node2.EXPECT().GetComponentStates(mock.Anything, mock.Anything).Return(&milvuspb.ComponentStates{
+        Status: &commonpb.Status{},
+    }, nil).Maybe()
     node2.EXPECT().GetDataDistribution(mock.Anything, mock.Anything).Return(&querypb.GetDataDistributionResponse{Status: successStatus}, nil).Maybe()
     err = node2.Start()
     suite.NoError(err)
diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go
index dd76b6aadc49d..10eee17f68a64 100644
--- a/internal/rootcoord/root_coord.go
+++ b/internal/rootcoord/root_coord.go
@@ -513,7 +513,7 @@ func (c *Core) Init() error {
         if err != nil {
             return err
         }
-        log.Info("RootCoord startup success")
+        log.Info("RootCoord startup success", zap.String("address", c.session.Address))
         return nil
     }
     c.UpdateStateCode(commonpb.StateCode_StandBy)
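Reviewer note (not part of the patch): the query-node mocks now return a ComponentStates whose Status field is non-nil. A hedged sketch of that expectation as a small test helper; the *mocks.MockQueryNode type name is inferred from the NewMockQueryNode constructor, and the healthy StateCode is an illustrative addition rather than something this patch sets:

// Sketch only (test fragment): GetComponentStates returning a usable, non-nil status.
func expectHealthyStates(node *mocks.MockQueryNode) {
    node.EXPECT().GetComponentStates(mock.Anything, mock.Anything).
        Return(&milvuspb.ComponentStates{
            State:  &milvuspb.ComponentInfo{StateCode: commonpb.StateCode_Healthy}, // illustrative
            Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
        }, nil).
        Maybe()
}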
diff --git a/internal/util/grpcclient/client.go b/internal/util/grpcclient/client.go
index bf89b386427f0..b27294e4ba2e1 100644
--- a/internal/util/grpcclient/client.go
+++ b/internal/util/grpcclient/client.go
@@ -19,21 +19,14 @@ package grpcclient

 import (
     "context"
     "crypto/tls"
-    "fmt"
     "strings"
     "sync"
     "time"

+    "github.com/cockroachdb/errors"
     grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
     grpcopentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
-    "go.uber.org/zap"
-    "golang.org/x/sync/singleflight"
-    "google.golang.org/grpc"
-    "google.golang.org/grpc/backoff"
-    "google.golang.org/grpc/credentials"
-    "google.golang.org/grpc/credentials/insecure"
-    "google.golang.org/grpc/keepalive"
-
+    "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
     "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
     "github.com/milvus-io/milvus/internal/log"
     "github.com/milvus-io/milvus/internal/util"
@@ -44,6 +37,12 @@ import (
     "github.com/milvus-io/milvus/internal/util/retry"
     "github.com/milvus-io/milvus/internal/util/sessionutil"
     "github.com/milvus-io/milvus/internal/util/trace"
+    "go.uber.org/zap"
+    "google.golang.org/grpc"
+    "google.golang.org/grpc/backoff"
+    "google.golang.org/grpc/credentials"
+    "google.golang.org/grpc/credentials/insecure"
+    "google.golang.org/grpc/keepalive"
 )

 // GrpcClient abstracts client of grpc
@@ -84,14 +83,11 @@ type ClientBase[T interface {
     KeepAliveTime    time.Duration
     KeepAliveTimeout time.Duration

-    MaxAttempts       int
-    InitialBackoff    float32
-    MaxBackoff        float32
-    BackoffMultiplier float32
-    NodeID            int64
-    sess              *sessionutil.Session
-
-    sf singleflight.Group
+    MaxAttempts    int
+    InitialBackoff float64
+    MaxBackoff     float64
+    NodeID         int64
+    sess           *sessionutil.Session
 }

 // SetRole sets role of client
@@ -169,19 +165,6 @@ func (c *ClientBase[T]) connect(ctx context.Context) error {
     opts := trace.GetInterceptorOpts()
     dialContext, cancel := context.WithTimeout(ctx, c.DialTimeout)

-    // refer to https://github.com/grpc/grpc-proto/blob/master/grpc/service_config/service_config.proto
-    retryPolicy := fmt.Sprintf(`{
-        "methodConfig": [{
-            "name": [{"service": "%s"}],
-            "retryPolicy": {
-                "MaxAttempts": %d,
-                "InitialBackoff": "%fs",
-                "MaxBackoff": "%fs",
-                "BackoffMultiplier": %f,
-                "RetryableStatusCodes": [ "UNAVAILABLE" ]
-            }
-        }]}`, c.RetryServiceNameConfig, c.MaxAttempts, c.InitialBackoff, c.MaxBackoff, c.BackoffMultiplier)
-
     var conn *grpc.ClientConn
     if c.encryption {
         conn, err = grpc.DialContext(
@@ -204,7 +187,6 @@ func (c *ClientBase[T]) connect(ctx context.Context) error {
                 interceptor.ClusterInjectionStreamClientInterceptor(),
                 interceptor.ServerIDInjectionStreamClientInterceptor(c.GetNodeID()),
             )),
-            grpc.WithDefaultServiceConfig(retryPolicy),
             grpc.WithKeepaliveParams(keepalive.ClientParameters{
                 Time:    c.KeepAliveTime,
                 Timeout: c.KeepAliveTimeout,
@@ -222,6 +204,7 @@ func (c *ClientBase[T]) connect(ctx context.Context) error {
             grpc.WithPerRPCCredentials(&Token{Value: crypto.Base64Encode(util.MemberCredID)}),
             grpc.FailOnNonTempDialError(true),
             grpc.WithReturnConnectionError(),
+            grpc.WithDisableRetry(),
         )
     } else {
         conn, err = grpc.DialContext(
@@ -243,7 +226,6 @@ func (c *ClientBase[T]) connect(ctx context.Context) error {
                 interceptor.ClusterInjectionStreamClientInterceptor(),
                 interceptor.ServerIDInjectionStreamClientInterceptor(c.GetNodeID()),
             )),
-            grpc.WithDefaultServiceConfig(retryPolicy),
             grpc.WithKeepaliveParams(keepalive.ClientParameters{
                 Time:    c.KeepAliveTime,
                 Timeout: c.KeepAliveTimeout,
@@ -261,6 +243,7 @@ func (c *ClientBase[T]) connect(ctx context.Context) error {
             grpc.WithPerRPCCredentials(&Token{Value: crypto.Base64Encode(util.MemberCredID)}),
             grpc.FailOnNonTempDialError(true),
             grpc.WithReturnConnectionError(),
+            grpc.WithDisableRetry(),
         )
     }

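Reviewer note (not part of the patch): with the service-config retry policy removed, the dial path explicitly opts out of gRPC's transparent retries and leaves retrying to the wrapper. A minimal, self-contained sketch of the relevant dial options using plain grpc-go APIs; the address, keepalive values and Milvus-specific interceptors/credentials are placeholders or omitted:

package grpcclientsketch

import (
    "context"
    "time"

    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
    "google.golang.org/grpc/keepalive"
)

// dialWithoutTransparentRetry mirrors the option set after this patch:
// keepalive stays, the default-service-config retry policy is gone, and
// grpc.WithDisableRetry() ensures gRPC itself never retries.
func dialWithoutTransparentRetry(ctx context.Context, addr string) (*grpc.ClientConn, error) {
    return grpc.DialContext(ctx, addr,
        grpc.WithTransportCredentials(insecure.NewCredentials()),
        grpc.WithKeepaliveParams(keepalive.ClientParameters{
            Time:    10 * time.Second, // keepAliveTime, illustrative
            Timeout: 20 * time.Second, // keepAliveTimeout, illustrative
        }),
        grpc.WithDisableRetry(), // retries are now driven by ClientBase.call
    )
}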
@@ -277,62 +260,102 @@ func (c *ClientBase[T]) connect(ctx context.Context) error {
     return nil
 }

-func (c *ClientBase[T]) callOnce(ctx context.Context, caller func(client T) (any, error)) (any, error) {
-    log := log.Ctx(ctx).With(zap.String("role", c.GetRole()))
-    client, err := c.GetGrpcClient(ctx)
-    if err != nil {
-        return generic.Zero[T](), err
-    }
-
-    var ret any
+func (c *ClientBase[T]) call(ctx context.Context, caller func(client T) (any, error)) (any, error) {
+    log := log.Ctx(ctx).With(zap.String("client_role", c.GetRole()))
+    var (
+        ret       any
+        clientErr error
+        callErr   error
+        client    T
+    )

-    _, _ = retry.DoGrpc(ctx, uint(c.MaxAttempts*2), func() (any, error) {
-        ret, err = caller(client)
-        return ret, err
-    })
-    if err == nil {
-        return ret, nil
+    client, clientErr = c.GetGrpcClient(ctx)
+    if clientErr != nil {
+        log.Warn("fail to get grpc client", zap.Error(clientErr))
     }
-    if IsCrossClusterRoutingErr(err) {
-        log.Warn("CrossClusterRoutingErr, start to reset connection", zap.Error(err))
-        c.resetConnection(client)
-        return ret, interceptor.ErrServiceUnavailable // For concealing ErrCrossClusterRouting from the client
-    }
-    if IsServerIDMismatchErr(err) {
-        log.Warn("Server ID mismatch, start to reset connection", zap.Error(err))
+    resetClientFunc := func() {
         c.resetConnection(client)
-        return ret, err
+        client, clientErr = c.GetGrpcClient(ctx)
+        if clientErr != nil {
+            log.Warn("fail to get grpc client in the retry state", zap.Error(clientErr))
+        }
     }

-    if !funcutil.CheckCtxValid(ctx) {
-        // check if server ID matches coord session, if not, reset connection
-        if c.sess != nil {
-            sessions, _, getSessionErr := c.sess.GetSessions(c.GetRole())
-            if getSessionErr != nil {
-                // Only log but not handle this error as it is an auxiliary logic
-                log.Warn("Fail to GetSessions", zap.Error(getSessionErr))
+    innerCtx, cancel := context.WithCancel(ctx)
+    defer cancel()
+    _ = retry.Do(innerCtx, func() error {
+        if generic.IsZero(client) {
+            callErr = errors.Wrap(clientErr, "empty grpc client")
+            log.Warn("grpc client is nil, maybe fail to get client in the retry state")
+            resetClientFunc()
+            return callErr
+        }
+        ret, callErr = caller(client)
+        if callErr != nil {
+            if funcutil.IsGrpcErr(callErr) ||
+                IsCrossClusterRoutingErr(callErr) || IsServerIDMismatchErr(callErr) {
+                log.Warn("start to reset connection because of specific reasons", zap.Error(callErr))
+                resetClientFunc()
+                return callErr
             }
-            if coordSess, exist := sessions[c.GetRole()]; exist {
-                if c.GetNodeID() != coordSess.ServerID {
-                    log.Warn("Server ID mismatch, may connected to a old server, start to reset connection", zap.Error(err))
-                    c.resetConnection(client)
-                    return ret, err
+            if !funcutil.CheckCtxValid(ctx) {
+                // check if server ID matches coord session, if not, reset connection
+                if c.sess != nil {
+                    sessions, _, getSessionErr := c.sess.GetSessions(c.GetRole())
+                    if getSessionErr != nil {
+                        // Only log but not handle this error as it is an auxiliary logic
+                        log.Warn("Fail to GetSessions", zap.Error(getSessionErr))
+                    }
+                    if coordSess, exist := sessions[c.GetRole()]; exist {
+                        if c.GetNodeID() != coordSess.ServerID {
+                            log.Warn("server id mismatch, may connected to a old server, start to reset connection", zap.Error(callErr))
+                            resetClientFunc()
+                            return callErr
+                        }
+                    }
+                }
             }
+
+            log.Warn("fail to grpc call because of unknown error", zap.Error(callErr))
+            // not an RPC error, so stop retrying
+            return retry.Unrecoverable(callErr)
         }
-        // start bg check in case of https://github.com/milvus-io/milvus/issues/22435
-        go c.bgHealthCheck(client)
-        return generic.Zero[T](), err
-    }
-    if !funcutil.IsGrpcErr(err) {
-        log.Warn("ClientBase:isNotGrpcErr", zap.Error(err))
-        return generic.Zero[T](), err
+        var errorCode commonpb.ErrorCode
+        switch res := ret.(type) {
+        case *commonpb.Status:
+            errorCode = res.GetErrorCode()
+        case interface{ GetStatus() *commonpb.Status }:
+            errorCode = res.GetStatus().GetErrorCode()
+        default:
+            // unknown return type, hand the result back to the caller directly
+            log.Warn("unknown return type", zap.Any("return", ret))
+            return nil
+        }
+
+        if errorCode == commonpb.ErrorCode_Success {
+            return nil
+        }
+
+        // Just wait for the service to switch from not-ready to healthy; no reset is needed.
+        if errorCode == commonpb.ErrorCode_NotReadyServe {
+            return errors.New("not ready to server")
+        }
+
+        return nil
+    }, retry.Attempts(uint(c.MaxAttempts)),
+        // The previous InitialBackoff and MaxBackoff were floats with seconds as the unit.
+        // For compatibility, multiply by 1000 to convert to milliseconds.
+        retry.Sleep(time.Duration(c.InitialBackoff*1000)*time.Millisecond),
+        retry.MaxSleepTime(time.Duration(c.MaxBackoff*1000)*time.Millisecond))
+    // default value list: MaxAttempts 10, InitialBackoff 0.2s, MaxBackoff 10s,
+    // which consumes about 52.8s if all retries fail
+
+    if callErr != nil {
+        return generic.Zero[T](), callErr
     }
-    log.Info("ClientBase grpc error, start to reset connection", zap.Error(err))
-    c.resetConnection(client)
-    return ret, err
+    return ret, nil
 }

 // Call does a grpc call
@@ -341,9 +364,9 @@ func (c *ClientBase[T]) Call(ctx context.Context, caller func(client T) (any, er
         return generic.Zero[T](), ctx.Err()
     }

-    ret, err := c.callOnce(ctx, caller)
+    ret, err := c.call(ctx, caller)
     if err != nil {
-        traceErr := fmt.Errorf("err: %w\n, %s", err, trace.StackTrace())
+        traceErr := errors.Wrapf(err, "stack trace: %s", trace.StackTrace())
         log.Warn("ClientBase Call grpc first call get error",
             zap.String("role", c.GetRole()),
             zap.Error(traceErr),
@@ -355,47 +378,8 @@ func (c *ClientBase[T]) Call(ctx context.Context, caller func(client T) (any, er

 // ReCall does the grpc call twice
 func (c *ClientBase[T]) ReCall(ctx context.Context, caller func(client T) (any, error)) (any, error) {
-    if !funcutil.CheckCtxValid(ctx) {
-        return generic.Zero[T](), ctx.Err()
-    }
-
-    ret, err := c.callOnce(ctx, caller)
-    if err == nil {
-        return ret, nil
-    }
-
-    traceErr := fmt.Errorf("err: %w\n, %s", err, trace.StackTrace())
-    log.Warn("ClientBase ReCall grpc first call get error",
-        zap.String("role", c.GetRole()),
-        zap.Error(traceErr),
-    )
-
-    if !funcutil.CheckCtxValid(ctx) {
-        return generic.Zero[T](), ctx.Err()
-    }
-
-    ret, err = c.callOnce(ctx, caller)
-    if err != nil {
-        traceErr = fmt.Errorf("err: %w\n, %s", err, trace.StackTrace())
-        log.Error("ClientBase ReCall grpc second call get error", zap.String("role", c.GetRole()), zap.Error(traceErr))
-        return generic.Zero[T](), traceErr
-    }
-    return ret, err
-}
-
-func (c *ClientBase[T]) bgHealthCheck(client T) {
-    c.sf.Do("healthcheck", func() (any, error) {
-        // v2.2.0 does not has paramtable, use magic nubmer here
-        ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
-        defer cancel()
-
-        _, err := client.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{})
-        if err != nil {
-            c.resetConnection(client)
-        }
-
-        return struct{}{}, nil
-    })
+    // All retry operations are handled inside the `call` function.
+    return c.Call(ctx, caller)
 }

 // Close close the client connection
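Reviewer note (not part of the patch): the rewritten path above funnels every RPC through the internal retry package instead of gRPC's transparent retries. A hedged sketch of the control flow it implements; doRPC and isTransient are placeholders, while the option names, the status-code handling and the backoff numbers follow the hunk:

// Sketch only: retry until success, the service reports ready, or the error is fatal.
err := retry.Do(ctx, func() error {
    resp, rpcErr := doRPC(ctx) // placeholder for caller(client)
    if rpcErr != nil {
        if isTransient(rpcErr) { // e.g. gRPC status errors, cross-cluster routing, server-ID mismatch
            return rpcErr // retry.Do sleeps and tries again
        }
        return retry.Unrecoverable(rpcErr) // unknown errors stop the loop immediately
    }
    switch resp.GetStatus().GetErrorCode() {
    case commonpb.ErrorCode_Success:
        return nil
    case commonpb.ErrorCode_NotReadyServe:
        return errors.New("not ready to serve") // keep waiting for the service to become healthy
    default:
        return nil // other error codes are handed back to the caller unchanged
    }
}, retry.Attempts(10),
    retry.Sleep(200*time.Millisecond),  // InitialBackoff 0.2s
    retry.MaxSleepTime(10*time.Second)) // MaxBackoff 10s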
diff --git a/internal/util/grpcclient/client_test.go b/internal/util/grpcclient/client_test.go
index 84e2ff5dc429d..4eadeaa955aae 100644
--- a/internal/util/grpcclient/client_test.go
+++ b/internal/util/grpcclient/client_test.go
@@ -24,7 +24,6 @@ import (
     "math/rand"
     "net"
     "strings"
-    "sync"
     "testing"
     "time"

@@ -90,11 +89,18 @@ func TestClientBase_connect(t *testing.T) {
 func TestClientBase_Call(t *testing.T) {
     // mock client with nothing
     base := ClientBase[*mockClient]{}
-    base.grpcClientMtx.Lock()
-    base.grpcClient = &mockClient{}
-    base.grpcClientMtx.Unlock()
+    initClient := func() {
+        base.grpcClientMtx.Lock()
+        base.grpcClient = &mockClient{}
+        base.grpcClientMtx.Unlock()
+    }
+    base.MaxAttempts = 1
+    base.SetGetAddrFunc(func() (string, error) {
+        return "", errors.New("mocked address error")
+    })

     t.Run("Call normal return", func(t *testing.T) {
+        initClient()
         _, err := base.Call(context.Background(), func(client *mockClient) (any, error) {
             return struct{}{}, nil
         })
@@ -102,6 +108,7 @@ func TestClientBase_Call(t *testing.T) {
     })

     t.Run("Call with canceled context", func(t *testing.T) {
+        initClient()
         ctx, cancel := context.WithCancel(context.Background())
         cancel()
         _, err := base.Call(ctx, func(client *mockClient) (any, error) {
@@ -112,6 +119,7 @@ func TestClientBase_Call(t *testing.T) {
     })

     t.Run("Call canceled in caller func", func(t *testing.T) {
+        initClient()
         ctx, cancel := context.WithCancel(context.Background())
         errMock := errors.New("mocked")
         _, err := base.Call(ctx, func(client *mockClient) (any, error) {
@@ -128,6 +136,7 @@ func TestClientBase_Call(t *testing.T) {
     })

     t.Run("Call returns non-grpc error", func(t *testing.T) {
+        initClient()
         ctx, cancel := context.WithCancel(context.Background())
         defer cancel()
         errMock := errors.New("mocked")
@@ -144,6 +153,7 @@ func TestClientBase_Call(t *testing.T) {
     })

     t.Run("Call returns grpc error", func(t *testing.T) {
+        initClient()
         ctx, cancel := context.WithCancel(context.Background())
         defer cancel()
         errGrpc := status.Error(codes.Unknown, "mocked")
@@ -179,11 +189,18 @@ func TestClientBase_Call(t *testing.T) {
 func TestClientBase_Recall(t *testing.T) {
     // mock client with nothing
     base := ClientBase[*mockClient]{}
-    base.grpcClientMtx.Lock()
-    base.grpcClient = &mockClient{}
-    base.grpcClientMtx.Unlock()
+    initClient := func() {
+        base.grpcClientMtx.Lock()
+        base.grpcClient = &mockClient{}
+        base.grpcClientMtx.Unlock()
+    }
+    base.MaxAttempts = 1
+    base.SetGetAddrFunc(func() (string, error) {
+        return "", errors.New("mocked address error")
+    })

     t.Run("Recall normal return", func(t *testing.T) {
+        initClient()
         _, err := base.ReCall(context.Background(), func(client *mockClient) (any, error) {
             return struct{}{}, nil
         })
@@ -191,6 +208,7 @@ func TestClientBase_Recall(t *testing.T) {
     })

     t.Run("ReCall with canceled context", func(t *testing.T) {
+        initClient()
         ctx, cancel := context.WithCancel(context.Background())
         cancel()
         _, err := base.ReCall(ctx, func(client *mockClient) (any, error) {
@@ -200,24 +218,8 @@ func TestClientBase_Recall(t *testing.T) {
         assert.True(t, errors.Is(err, context.Canceled))
     })

-    t.Run("ReCall fails first and success second", func(t *testing.T) {
-        ctx, cancel := context.WithCancel(context.Background())
-        defer cancel()
-        flag := false
-        var mut sync.Mutex
-        _, err := base.ReCall(ctx, func(client *mockClient) (any, error) {
-            mut.Lock()
-            defer mut.Unlock()
-            if flag {
-                return struct{}{}, nil
-            }
-            flag = true
-            return nil, errors.New("mock first")
-        })
-        assert.NoError(t, err)
-    })
-
     t.Run("ReCall canceled in caller func", func(t *testing.T) {
+        initClient()
         ctx, cancel := context.WithCancel(context.Background())
         errMock := errors.New("mocked")
         _, err := base.ReCall(ctx, func(client *mockClient) (any, error) {
@@ -226,7 +228,7 @@ func TestClientBase_Recall(t *testing.T) {
         })

         assert.Error(t, err)
-        assert.True(t, errors.Is(err, context.Canceled))
+        assert.True(t, errors.Is(err, errMock))
         base.grpcClientMtx.RLock()
         // client shall not be reset
         assert.NotNil(t, base.grpcClient)
@@ -283,7 +285,7 @@ func TestClientBase_RetryPolicy(t *testing.T) {
         Timeout: 60 * time.Second,
     }

-    maxAttempts := 5
+    maxAttempts := 1
     s := grpc.NewServer(
         grpc.KeepaliveEnforcementPolicy(kaep),
         grpc.KeepaliveParams(kasp),
@@ -307,7 +309,6 @@ func TestClientBase_RetryPolicy(t *testing.T) {
         MaxAttempts:       maxAttempts,
         InitialBackoff:    10.0,
         MaxBackoff:        60.0,
-        BackoffMultiplier: 2.0,
     }
     clientBase.SetRole(typeutil.DataCoordRole)
     clientBase.SetGetAddrFunc(func() (string, error) {
diff --git a/internal/util/paramtable/grpc_param.go b/internal/util/paramtable/grpc_param.go
index 939b4d2bff96b..f9cd32e66ba54 100644
--- a/internal/util/paramtable/grpc_param.go
+++ b/internal/util/paramtable/grpc_param.go
@@ -44,10 +44,9 @@ const (
     DefaultKeepAliveTimeout = 20000 * time.Millisecond

     // Grpc retry policy
-    DefaultMaxAttempts               = 5
-    DefaultInitialBackoff    float32 = 1.0
-    DefaultMaxBackoff        float32 = 10.0
-    DefaultBackoffMultiplier float32 = 2.0
+    DefaultMaxAttempts            = 10
+    DefaultInitialBackoff float64 = 0.2
+    DefaultMaxBackoff     float64 = 10

     ProxyInternalPort = 19529
     ProxyExternalPort = 19530
@@ -187,10 +186,9 @@ type GrpcClientConfig struct {
     KeepAliveTime    time.Duration
     KeepAliveTimeout time.Duration

-    MaxAttempts       int
-    InitialBackoff    float32
-    MaxBackoff        float32
-    BackoffMultiplier float32
+    MaxAttempts    int
+    InitialBackoff float64
+    MaxBackoff     float64
 }

 // InitOnce initialize grpc client config once
@@ -211,7 +209,6 @@ func (p *GrpcClientConfig) init(domain string) {
     p.initMaxAttempts()
     p.initInitialBackoff()
     p.initMaxBackoff()
-    p.initBackoffMultiplier()
 }

 func (p *GrpcClientConfig) ParseConfig(funcDesc string, key string, backKey string, parseValue func(string) (interface{}, error), applyValue func(interface{}, error)) {
@@ -390,7 +387,7 @@ func (p *GrpcClientConfig) initInitialBackoff() {
     key := "grpc.client.initialBackOff"
     p.ParseConfig(funcDesc, key, "",
         func(s string) (interface{}, error) {
-            return strconv.ParseFloat(s, 32)
+            return strconv.ParseFloat(s, 64)
         },
         func(i interface{}, err error) {
             if err != nil {
@@ -404,7 +401,7 @@ func (p *GrpcClientConfig) initInitialBackoff() {
                 p.InitialBackoff = DefaultInitialBackoff
                 return
             }
-            p.InitialBackoff = float32(v)
+            p.InitialBackoff = v
         })
 }

@@ -413,7 +410,7 @@ func (p *GrpcClientConfig) initMaxBackoff() {
     key := "grpc.client.maxBackoff"
     p.ParseConfig(funcDesc, key, "",
         func(s string) (interface{}, error) {
-            return strconv.ParseFloat(s, 32)
+            return strconv.ParseFloat(s, 64)
         },
         func(i interface{}, err error) {
             if err != nil {
@@ -427,29 +424,6 @@ func (p *GrpcClientConfig) initMaxBackoff() {
                 p.MaxBackoff = DefaultMaxBackoff
                 return
             }
-            p.MaxBackoff = float32(v)
-        })
-}
-
-func (p *GrpcClientConfig) initBackoffMultiplier() {
-    funcDesc := "Init back off multiplier"
-    key := "grpc.client.backoffMultiplier"
-    p.ParseConfig(funcDesc, key, "",
-        func(s string) (interface{}, error) {
-            return strconv.ParseFloat(s, 32)
-        },
-        func(i interface{}, err error) {
-            if err != nil {
-                p.BackoffMultiplier = DefaultBackoffMultiplier
-                return
-            }
-            v, ok := i.(float64)
-            if !ok {
-                log.Warn(fmt.Sprintf("Failed to convert float64 when parsing %s, set to default", key),
-                    zap.String("role", p.Domain), zap.Any(key, i))
-                p.BackoffMultiplier = DefaultBackoffMultiplier
-                return
-            }
-            p.BackoffMultiplier = float32(v)
+            p.MaxBackoff = v
         })
 }
diff --git a/internal/util/paramtable/grpc_param_test.go b/internal/util/paramtable/grpc_param_test.go
index fb65640a053ca..a8b471ee38d68 100644
--- a/internal/util/paramtable/grpc_param_test.go
+++ b/internal/util/paramtable/grpc_param_test.go
@@ -155,7 +155,7 @@ func TestGrpcClientParams(t *testing.T) {
     assert.Equal(t, Params.InitialBackoff, DefaultInitialBackoff)
     Params.Save("grpc.client.initialBackOff", "2.0")
     Params.initInitialBackoff()
-    assert.Equal(t, Params.InitialBackoff, float32(2.0))
+    assert.Equal(t, Params.InitialBackoff, 2.0)

     Params.initMaxBackoff()
     assert.Equal(t, Params.MaxBackoff, DefaultMaxBackoff)
@@ -164,16 +164,7 @@ func TestGrpcClientParams(t *testing.T) {
     assert.Equal(t, Params.MaxBackoff, DefaultMaxBackoff)
     Params.Save("grpc.client.maxBackOff", "50.0")
     Params.initMaxBackoff()
-    assert.Equal(t, Params.MaxBackoff, float32(50.0))
-
-    Params.initBackoffMultiplier()
-    assert.Equal(t, Params.BackoffMultiplier, DefaultBackoffMultiplier)
-    Params.Save("grpc.client.backoffMultiplier", "a")
-    Params.initBackoffMultiplier()
-    assert.Equal(t, Params.BackoffMultiplier, DefaultBackoffMultiplier)
-    Params.Save("grpc.client.backoffMultiplier", "3.0")
-    Params.initBackoffMultiplier()
-    assert.Equal(t, Params.BackoffMultiplier, float32(3.0))
+    assert.Equal(t, Params.MaxBackoff, 50.0)

     Params.Save("common.security.tlsMode", "1")
     Params.Save("tls.serverPemPath", "/pem")
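Reviewer note (not part of the patch): the backoff values move from float32 to float64, so the config parser now asks strconv for a full 64-bit parse and can assign the result directly. A small self-contained sketch of that parse-and-fallback pattern; the key name and default come from the hunks above, while the surrounding paramtable plumbing is omitted:

package main

import (
    "fmt"
    "strconv"
)

const defaultInitialBackoff float64 = 0.2 // seconds, the new grpc.client.initialBackOff default

// parseBackoff mirrors the pattern in initInitialBackoff: parse as float64,
// fall back to the default on any error.
func parseBackoff(raw string) float64 {
    v, err := strconv.ParseFloat(raw, 64)
    if err != nil {
        return defaultInitialBackoff
    }
    return v
}

func main() {
    fmt.Println(parseBackoff("2.0"))  // 2
    fmt.Println(parseBackoff("oops")) // 0.2
}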
diff --git a/internal/util/retry/retry.go b/internal/util/retry/retry.go
index 9b7800b2cb3bf..f8874d618459e 100644
--- a/internal/util/retry/retry.go
+++ b/internal/util/retry/retry.go
@@ -13,10 +13,8 @@ package retry

 import (
     "context"
-    "errors"
     "time"

-    "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
     "github.com/milvus-io/milvus/internal/log"
     "github.com/milvus-io/milvus/internal/util/errorutil"
     "go.uber.org/zap"
@@ -40,8 +38,8 @@ func Do(ctx context.Context, fn func() error, opts ...Option) error {

     for i := uint(0); i < c.attempts; i++ {
         if err := fn(); err != nil {
-            if i%10 == 0 {
-                log.Warn("retry func failed", zap.Uint("retry time", i), zap.Error(err))
+            if i%10 == 4 {
+                log.Ctx(ctx).Warn("retry func failed", zap.Uint("retry time", i), zap.Error(err))
             }

             el = append(el, err)
@@ -83,48 +81,3 @@ func IsUnRecoverable(err error) bool {
     _, isUnrecoverable := err.(unrecoverableError)
     return isUnrecoverable
 }
-
-func DoGrpc(ctx context.Context, times uint, rpcFunc func() (any, error)) (any, error) {
-    innerCtx, cancel := context.WithCancel(ctx)
-    defer cancel()
-    var (
-        result     any
-        innerError error
-    )
-
-    err := Do(innerCtx, func() error {
-        result, innerError = rpcFunc()
-        if innerError != nil {
-            result = nil
-            cancel()
-            return innerError
-        }
-
-        var errorCode commonpb.ErrorCode
-        var reason string
-        switch res := result.(type) {
-        case *commonpb.Status:
-            errorCode = res.GetErrorCode()
-            reason = res.GetReason()
-        case interface{ GetStatus() *commonpb.Status }:
-            errorCode = res.GetStatus().GetErrorCode()
-            reason = res.GetStatus().GetReason()
-        default:
-            cancel()
-            return innerError
-        }
-
-        if errorCode == commonpb.ErrorCode_Success {
-            return nil
-        }
-        innerError = errors.New(reason)
-        if errorCode != commonpb.ErrorCode_NotReadyServe {
-            cancel()
-        }
-        return innerError
-    }, Attempts(times), Sleep(50*time.Millisecond), MaxSleepTime(time.Second))
-    if result != nil {
-        return result, nil
-    }
-    return result, err
-}
diff --git a/internal/util/retry/retry_test.go b/internal/util/retry/retry_test.go
index 19d00ae11303c..3e06c0a164a5a 100644
--- a/internal/util/retry/retry_test.go
+++ b/internal/util/retry/retry_test.go
@@ -13,14 +13,10 @@ package retry

 import (
     "context"
-    "errors"
     "fmt"
     "testing"
     "time"

-    "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
-    "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
-
     "github.com/lingdor/stackerror"
     "github.com/stretchr/testify/assert"
 )
@@ -132,66 +128,3 @@ func TestContextCancel(t *testing.T) {
     assert.NotNil(t, err)
     fmt.Println(err)
 }
-
-func TestDoGrpc(t *testing.T) {
-    var (
-        result any
-        err    error
-        times  uint = 5
-        ctx         = context.Background()
-    )
-
-    _, err = DoGrpc(ctx, times, func() (any, error) {
-        return nil, errors.New("foo")
-    })
-    assert.Error(t, err)
-
-    successStatus := &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}
-    result, err = DoGrpc(ctx, times, func() (any, error) {
-        return successStatus, nil
-    })
-    assert.Equal(t, successStatus, result)
-    assert.NoError(t, err)
-    unExpectStatus := &commonpb.Status{ErrorCode: commonpb.ErrorCode_UnexpectedError}
-    result, err = DoGrpc(ctx, times, func() (any, error) {
-        return unExpectStatus, nil
-    })
-    assert.Equal(t, unExpectStatus, result)
-    assert.NoError(t, err)
-    notReadyStatus := &commonpb.Status{ErrorCode: commonpb.ErrorCode_NotReadyServe, Reason: "fo"}
-    result, err = DoGrpc(ctx, times, func() (any, error) {
-        return notReadyStatus, nil
-    })
-    assert.Equal(t, notReadyStatus, result)
-    assert.NoError(t, err)
-
-    showCollectionsResp := &milvuspb.ShowCollectionsResponse{Status: successStatus}
-    result, err = DoGrpc(ctx, times, func() (any, error) {
-        return showCollectionsResp, nil
-    })
-    assert.Equal(t, showCollectionsResp, result)
-    assert.NoError(t, err)
-
-    ctx2, cancel := context.WithCancel(ctx)
-    testDone := make(chan struct{})
-    timer := time.NewTimer(5 * time.Second)
-    defer timer.Stop()
-    go func() {
-        time.Sleep(time.Second)
-        cancel()
-    }()
-    go func() {
-        result, err = DoGrpc(ctx2, 100, func() (any, error) {
-            return notReadyStatus, nil
-        })
-        assert.Equal(t, notReadyStatus, result)
-        assert.NoError(t, err)
-        testDone <- struct{}{}
-    }()
-    select {
-    case <-testDone:
-        return
-    case <-timer.C:
-        t.FailNow()
-    }
-}
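Reviewer note (not part of the patch): with DoGrpc and its tests gone, status-code handling is now exercised through the grpcclient tests, and the retry package only needs its generic behaviour covered. A short, hedged sketch of a test for the unrecoverable-error path; it is illustrative, uses only imports that remain in retry_test.go after this patch, and assumes Do stops immediately on an unrecoverable error:

func TestDoStopsOnUnrecoverable(t *testing.T) {
    attempts := 0
    err := Do(context.Background(), func() error {
        attempts++
        return Unrecoverable(fmt.Errorf("permanent failure"))
    }, Attempts(10), Sleep(10*time.Millisecond))

    assert.Error(t, err)
    assert.Equal(t, 1, attempts) // an unrecoverable error should short-circuit the loop
}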