From 0c31c7da2b6edd75d7e8403aa7a6f7974ebb92d2 Mon Sep 17 00:00:00 2001 From: jaime Date: Tue, 20 Aug 2024 16:24:35 +0800 Subject: [PATCH] enhance: optimize CPU usage for CheckHealth requests Signed-off-by: jaime --- go.mod | 2 +- internal/datacoord/mock_test.go | 4 ++ internal/datacoord/session_manager.go | 19 ++++- .../datanode/channel/channel_manager_test.go | 4 +- internal/datanode/services.go | 21 ++++++ .../distributed/datanode/client/client.go | 6 ++ internal/distributed/datanode/service.go | 4 ++ .../distributed/querynode/client/client.go | 6 ++ internal/distributed/querynode/service.go | 4 ++ internal/mocks/mock_datanode.go | 55 +++++++++++++++ internal/mocks/mock_datanode_client.go | 70 +++++++++++++++++++ internal/mocks/mock_querynode.go | 55 +++++++++++++++ internal/mocks/mock_querynode_client.go | 70 +++++++++++++++++++ internal/proto/data_coord.proto | 2 + internal/proto/query_coord.proto | 5 +- internal/querycoordv2/mocks/mock_querynode.go | 55 +++++++++++++++ internal/querycoordv2/server.go | 5 +- internal/querycoordv2/services.go | 38 +++++----- internal/querycoordv2/session/cluster.go | 15 ++++ internal/querycoordv2/session/mock_cluster.go | 55 +++++++++++++++ internal/querycoordv2/utils/util.go | 18 ++--- internal/querynodev2/services.go | 20 ++++++ internal/rootcoord/root_coord.go | 11 --- internal/util/mock/grpc_datanode_client.go | 4 ++ internal/util/mock/grpc_querynode_client.go | 4 ++ internal/util/wrappers/qn_wrapper.go | 4 ++ pkg/util/funcutil/func.go | 20 +++--- pkg/util/merr/utils.go | 3 + pkg/util/ratelimitutil/utils.go | 19 ++++- 29 files changed, 538 insertions(+), 60 deletions(-) diff --git a/go.mod b/go.mod index 958aebeefd30e..299284726292b 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/gin-gonic/gin v1.9.1 github.com/go-playground/validator/v10 v10.14.0 github.com/gofrs/flock v0.8.1 - github.com/golang/protobuf v1.5.4 // indirect + github.com/golang/protobuf v1.5.4 github.com/google/btree v1.1.2 github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 github.com/klauspost/compress v1.17.7 diff --git a/internal/datacoord/mock_test.go b/internal/datacoord/mock_test.go index e2b04e9db2829..ec87a9b231765 100644 --- a/internal/datacoord/mock_test.go +++ b/internal/datacoord/mock_test.go @@ -343,6 +343,10 @@ func (c *mockDataNodeClient) Stop() error { return nil } +func (c *mockDataNodeClient) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) { + return &milvuspb.CheckHealthResponse{Status: merr.Success()}, nil +} + type mockRootCoordClient struct { state commonpb.StateCode cnt int64 diff --git a/internal/datacoord/session_manager.go b/internal/datacoord/session_manager.go index d37e4f231fd8a..cd536c7e57be9 100644 --- a/internal/datacoord/session_manager.go +++ b/internal/datacoord/session_manager.go @@ -19,6 +19,7 @@ package datacoord import ( "context" "fmt" + "strings" "time" "github.com/cockroachdb/errors" @@ -520,11 +521,23 @@ func (c *SessionManagerImpl) CheckHealth(ctx context.Context) error { } sta, err := cli.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{}) - if err != nil { + if err = merr.CheckRPCCall(sta, err); err != nil { + return fmt.Errorf("GetComponentStates fails for datanode:%d, %w", nodeID, err) + } + + if err = merr.AnalyzeState("DataNode", nodeID, sta); err != nil { return err } - err = merr.AnalyzeState("DataNode", nodeID, sta) - return err + + checkHealthResp, err := cli.CheckHealth(ctx, &milvuspb.CheckHealthRequest{}) + if err = merr.CheckRPCCall(checkHealthResp, err); err != nil { + return fmt.Errorf("CheckHealth fails for datanode:%d, %w", nodeID, err) + } + + if !checkHealthResp.IsHealthy && len(checkHealthResp.Reasons) > 0 { + return errors.New(strings.Join(checkHealthResp.Reasons, ",")) + } + return nil }) } diff --git a/internal/datanode/channel/channel_manager_test.go b/internal/datanode/channel/channel_manager_test.go index 7ce5218df837e..307b5be0261b3 100644 --- a/internal/datanode/channel/channel_manager_test.go +++ b/internal/datanode/channel/channel_manager_test.go @@ -181,9 +181,7 @@ func (s *ChannelManagerSuite) TearDownTest() { } func (s *ChannelManagerSuite) TestReleaseStuck() { - var ( - channel = "by-dev-rootcoord-dml-2" - ) + channel := "by-dev-rootcoord-dml-2" s.manager.releaseFunc = func(channel string) { time.Sleep(1 * time.Second) } diff --git a/internal/datanode/services.go b/internal/datanode/services.go index 874509233cdcb..22e5d4170b1f8 100644 --- a/internal/datanode/services.go +++ b/internal/datanode/services.go @@ -22,6 +22,7 @@ package datanode import ( "context" "fmt" + "time" "github.com/samber/lo" "go.uber.org/zap" @@ -32,10 +33,12 @@ import ( "github.com/milvus-io/milvus/internal/datanode/importv2" "github.com/milvus-io/milvus/internal/flushcommon/io" "github.com/milvus-io/milvus/internal/flushcommon/metacache" + "github.com/milvus-io/milvus/internal/flushcommon/util" "github.com/milvus-io/milvus/internal/metastore/kv/binlog" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/internal/util/componentutil" "github.com/milvus-io/milvus/internal/util/importutilv2" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" @@ -44,6 +47,8 @@ import ( "github.com/milvus-io/milvus/pkg/util/conc" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/metricsinfo" + "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/ratelimitutil" "github.com/milvus-io/milvus/pkg/util/tsoutil" ) @@ -589,3 +594,19 @@ func (node *DataNode) DropCompactionPlan(ctx context.Context, req *datapb.DropCo log.Ctx(ctx).Info("DropCompactionPlans success", zap.Int64("planID", req.GetPlanID())) return merr.Success(), nil } + +func (node *DataNode) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) { + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { + return &milvuspb.CheckHealthResponse{ + Status: merr.Status(err), + Reasons: []string{err.Error()}, + }, nil + } + + maxDelay := paramtable.Get().QuotaConfig.MaxTimeTickDelay.GetAsDuration(time.Second) + minFGChannel, minFGTt := util.GetRateCollector().GetMinFlowGraphTt() + if err := ratelimitutil.CheckTimeTickDelay(minFGChannel, minFGTt, maxDelay); err != nil { + return componentutil.CheckHealthRespWithErr(err), nil + } + return componentutil.CheckHealthRespWithErr(nil), nil +} diff --git a/internal/distributed/datanode/client/client.go b/internal/distributed/datanode/client/client.go index 67d5081a19e8c..3944d4039a85b 100644 --- a/internal/distributed/datanode/client/client.go +++ b/internal/distributed/datanode/client/client.go @@ -267,3 +267,9 @@ func (c *Client) DropCompactionPlan(ctx context.Context, req *datapb.DropCompact return client.DropCompactionPlan(ctx, req) }) } + +func (c *Client) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) { + return wrapGrpcCall(ctx, c, func(client datapb.DataNodeClient) (*milvuspb.CheckHealthResponse, error) { + return client.CheckHealth(ctx, req) + }) +} diff --git a/internal/distributed/datanode/service.go b/internal/distributed/datanode/service.go index 2e530546d1977..8d09b1f0a1aef 100644 --- a/internal/distributed/datanode/service.go +++ b/internal/distributed/datanode/service.go @@ -410,3 +410,7 @@ func (s *Server) QuerySlot(ctx context.Context, req *datapb.QuerySlotRequest) (* func (s *Server) DropCompactionPlan(ctx context.Context, req *datapb.DropCompactionPlanRequest) (*commonpb.Status, error) { return s.datanode.DropCompactionPlan(ctx, req) } + +func (s *Server) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) { + return s.datanode.CheckHealth(ctx, req) +} diff --git a/internal/distributed/querynode/client/client.go b/internal/distributed/querynode/client/client.go index 5d15fad49f6c1..2285e27a46513 100644 --- a/internal/distributed/querynode/client/client.go +++ b/internal/distributed/querynode/client/client.go @@ -332,3 +332,9 @@ func (c *Client) Delete(ctx context.Context, req *querypb.DeleteRequest, _ ...gr return client.Delete(ctx, req) }) } + +func (c *Client) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) { + return wrapGrpcCall(ctx, c, func(client querypb.QueryNodeClient) (*milvuspb.CheckHealthResponse, error) { + return client.CheckHealth(ctx, req) + }) +} diff --git a/internal/distributed/querynode/service.go b/internal/distributed/querynode/service.go index 445d30fc5288a..55a795b247846 100644 --- a/internal/distributed/querynode/service.go +++ b/internal/distributed/querynode/service.go @@ -387,3 +387,7 @@ func (s *Server) SyncDistribution(ctx context.Context, req *querypb.SyncDistribu func (s *Server) Delete(ctx context.Context, req *querypb.DeleteRequest) (*commonpb.Status, error) { return s.querynode.Delete(ctx, req) } + +func (s *Server) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) { + return s.querynode.CheckHealth(ctx, req) +} diff --git a/internal/mocks/mock_datanode.go b/internal/mocks/mock_datanode.go index 190da75be0885..5e2836fae8af9 100644 --- a/internal/mocks/mock_datanode.go +++ b/internal/mocks/mock_datanode.go @@ -87,6 +87,61 @@ func (_c *MockDataNode_CheckChannelOperationProgress_Call) RunAndReturn(run func return _c } +// CheckHealth provides a mock function with given fields: _a0, _a1 +func (_m *MockDataNode) CheckHealth(_a0 context.Context, _a1 *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) { + ret := _m.Called(_a0, _a1) + + var r0 *milvuspb.CheckHealthResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error)); ok { + return rf(_a0, _a1) + } + if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest) *milvuspb.CheckHealthResponse); ok { + r0 = rf(_a0, _a1) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*milvuspb.CheckHealthResponse) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *milvuspb.CheckHealthRequest) error); ok { + r1 = rf(_a0, _a1) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockDataNode_CheckHealth_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckHealth' +type MockDataNode_CheckHealth_Call struct { + *mock.Call +} + +// CheckHealth is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 *milvuspb.CheckHealthRequest +func (_e *MockDataNode_Expecter) CheckHealth(_a0 interface{}, _a1 interface{}) *MockDataNode_CheckHealth_Call { + return &MockDataNode_CheckHealth_Call{Call: _e.mock.On("CheckHealth", _a0, _a1)} +} + +func (_c *MockDataNode_CheckHealth_Call) Run(run func(_a0 context.Context, _a1 *milvuspb.CheckHealthRequest)) *MockDataNode_CheckHealth_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*milvuspb.CheckHealthRequest)) + }) + return _c +} + +func (_c *MockDataNode_CheckHealth_Call) Return(_a0 *milvuspb.CheckHealthResponse, _a1 error) *MockDataNode_CheckHealth_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockDataNode_CheckHealth_Call) RunAndReturn(run func(context.Context, *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error)) *MockDataNode_CheckHealth_Call { + _c.Call.Return(run) + return _c +} + // CompactionV2 provides a mock function with given fields: _a0, _a1 func (_m *MockDataNode) CompactionV2(_a0 context.Context, _a1 *datapb.CompactionPlan) (*commonpb.Status, error) { ret := _m.Called(_a0, _a1) diff --git a/internal/mocks/mock_datanode_client.go b/internal/mocks/mock_datanode_client.go index 91661051c390b..633cf103d2d53 100644 --- a/internal/mocks/mock_datanode_client.go +++ b/internal/mocks/mock_datanode_client.go @@ -101,6 +101,76 @@ func (_c *MockDataNodeClient_CheckChannelOperationProgress_Call) RunAndReturn(ru return _c } +// CheckHealth provides a mock function with given fields: ctx, in, opts +func (_m *MockDataNodeClient) CheckHealth(ctx context.Context, in *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, in) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 *milvuspb.CheckHealthResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest, ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error)); ok { + return rf(ctx, in, opts...) + } + if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest, ...grpc.CallOption) *milvuspb.CheckHealthResponse); ok { + r0 = rf(ctx, in, opts...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*milvuspb.CheckHealthResponse) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *milvuspb.CheckHealthRequest, ...grpc.CallOption) error); ok { + r1 = rf(ctx, in, opts...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockDataNodeClient_CheckHealth_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckHealth' +type MockDataNodeClient_CheckHealth_Call struct { + *mock.Call +} + +// CheckHealth is a helper method to define mock.On call +// - ctx context.Context +// - in *milvuspb.CheckHealthRequest +// - opts ...grpc.CallOption +func (_e *MockDataNodeClient_Expecter) CheckHealth(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_CheckHealth_Call { + return &MockDataNodeClient_CheckHealth_Call{Call: _e.mock.On("CheckHealth", + append([]interface{}{ctx, in}, opts...)...)} +} + +func (_c *MockDataNodeClient_CheckHealth_Call) Run(run func(ctx context.Context, in *milvuspb.CheckHealthRequest, opts ...grpc.CallOption)) *MockDataNodeClient_CheckHealth_Call { + _c.Call.Run(func(args mock.Arguments) { + variadicArgs := make([]grpc.CallOption, len(args)-2) + for i, a := range args[2:] { + if a != nil { + variadicArgs[i] = a.(grpc.CallOption) + } + } + run(args[0].(context.Context), args[1].(*milvuspb.CheckHealthRequest), variadicArgs...) + }) + return _c +} + +func (_c *MockDataNodeClient_CheckHealth_Call) Return(_a0 *milvuspb.CheckHealthResponse, _a1 error) *MockDataNodeClient_CheckHealth_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockDataNodeClient_CheckHealth_Call) RunAndReturn(run func(context.Context, *milvuspb.CheckHealthRequest, ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error)) *MockDataNodeClient_CheckHealth_Call { + _c.Call.Return(run) + return _c +} + // Close provides a mock function with given fields: func (_m *MockDataNodeClient) Close() error { ret := _m.Called() diff --git a/internal/mocks/mock_querynode.go b/internal/mocks/mock_querynode.go index d84ee0d2a8d26..6ab7015b9e6a1 100644 --- a/internal/mocks/mock_querynode.go +++ b/internal/mocks/mock_querynode.go @@ -30,6 +30,61 @@ func (_m *MockQueryNode) EXPECT() *MockQueryNode_Expecter { return &MockQueryNode_Expecter{mock: &_m.Mock} } +// CheckHealth provides a mock function with given fields: _a0, _a1 +func (_m *MockQueryNode) CheckHealth(_a0 context.Context, _a1 *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) { + ret := _m.Called(_a0, _a1) + + var r0 *milvuspb.CheckHealthResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error)); ok { + return rf(_a0, _a1) + } + if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest) *milvuspb.CheckHealthResponse); ok { + r0 = rf(_a0, _a1) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*milvuspb.CheckHealthResponse) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *milvuspb.CheckHealthRequest) error); ok { + r1 = rf(_a0, _a1) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockQueryNode_CheckHealth_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckHealth' +type MockQueryNode_CheckHealth_Call struct { + *mock.Call +} + +// CheckHealth is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 *milvuspb.CheckHealthRequest +func (_e *MockQueryNode_Expecter) CheckHealth(_a0 interface{}, _a1 interface{}) *MockQueryNode_CheckHealth_Call { + return &MockQueryNode_CheckHealth_Call{Call: _e.mock.On("CheckHealth", _a0, _a1)} +} + +func (_c *MockQueryNode_CheckHealth_Call) Run(run func(_a0 context.Context, _a1 *milvuspb.CheckHealthRequest)) *MockQueryNode_CheckHealth_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*milvuspb.CheckHealthRequest)) + }) + return _c +} + +func (_c *MockQueryNode_CheckHealth_Call) Return(_a0 *milvuspb.CheckHealthResponse, _a1 error) *MockQueryNode_CheckHealth_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockQueryNode_CheckHealth_Call) RunAndReturn(run func(context.Context, *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error)) *MockQueryNode_CheckHealth_Call { + _c.Call.Return(run) + return _c +} + // Delete provides a mock function with given fields: _a0, _a1 func (_m *MockQueryNode) Delete(_a0 context.Context, _a1 *querypb.DeleteRequest) (*commonpb.Status, error) { ret := _m.Called(_a0, _a1) diff --git a/internal/mocks/mock_querynode_client.go b/internal/mocks/mock_querynode_client.go index 3621a87884222..e6f1cc4044e92 100644 --- a/internal/mocks/mock_querynode_client.go +++ b/internal/mocks/mock_querynode_client.go @@ -31,6 +31,76 @@ func (_m *MockQueryNodeClient) EXPECT() *MockQueryNodeClient_Expecter { return &MockQueryNodeClient_Expecter{mock: &_m.Mock} } +// CheckHealth provides a mock function with given fields: ctx, in, opts +func (_m *MockQueryNodeClient) CheckHealth(ctx context.Context, in *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, in) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 *milvuspb.CheckHealthResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest, ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error)); ok { + return rf(ctx, in, opts...) + } + if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest, ...grpc.CallOption) *milvuspb.CheckHealthResponse); ok { + r0 = rf(ctx, in, opts...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*milvuspb.CheckHealthResponse) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *milvuspb.CheckHealthRequest, ...grpc.CallOption) error); ok { + r1 = rf(ctx, in, opts...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockQueryNodeClient_CheckHealth_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckHealth' +type MockQueryNodeClient_CheckHealth_Call struct { + *mock.Call +} + +// CheckHealth is a helper method to define mock.On call +// - ctx context.Context +// - in *milvuspb.CheckHealthRequest +// - opts ...grpc.CallOption +func (_e *MockQueryNodeClient_Expecter) CheckHealth(ctx interface{}, in interface{}, opts ...interface{}) *MockQueryNodeClient_CheckHealth_Call { + return &MockQueryNodeClient_CheckHealth_Call{Call: _e.mock.On("CheckHealth", + append([]interface{}{ctx, in}, opts...)...)} +} + +func (_c *MockQueryNodeClient_CheckHealth_Call) Run(run func(ctx context.Context, in *milvuspb.CheckHealthRequest, opts ...grpc.CallOption)) *MockQueryNodeClient_CheckHealth_Call { + _c.Call.Run(func(args mock.Arguments) { + variadicArgs := make([]grpc.CallOption, len(args)-2) + for i, a := range args[2:] { + if a != nil { + variadicArgs[i] = a.(grpc.CallOption) + } + } + run(args[0].(context.Context), args[1].(*milvuspb.CheckHealthRequest), variadicArgs...) + }) + return _c +} + +func (_c *MockQueryNodeClient_CheckHealth_Call) Return(_a0 *milvuspb.CheckHealthResponse, _a1 error) *MockQueryNodeClient_CheckHealth_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockQueryNodeClient_CheckHealth_Call) RunAndReturn(run func(context.Context, *milvuspb.CheckHealthRequest, ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error)) *MockQueryNodeClient_CheckHealth_Call { + _c.Call.Return(run) + return _c +} + // Close provides a mock function with given fields: func (_m *MockQueryNodeClient) Close() error { ret := _m.Called() diff --git a/internal/proto/data_coord.proto b/internal/proto/data_coord.proto index e3296128125c6..8badd1c1aa6d1 100644 --- a/internal/proto/data_coord.proto +++ b/internal/proto/data_coord.proto @@ -137,6 +137,8 @@ service DataNode { rpc QuerySlot(QuerySlotRequest) returns(QuerySlotResponse) {} rpc DropCompactionPlan(DropCompactionPlanRequest) returns(common.Status) {} + + rpc CheckHealth(milvus.CheckHealthRequest)returns (milvus.CheckHealthResponse) {} } message FlushRequest { diff --git a/internal/proto/query_coord.proto b/internal/proto/query_coord.proto index ed0e152bbd708..bc881d4114511 100644 --- a/internal/proto/query_coord.proto +++ b/internal/proto/query_coord.proto @@ -168,8 +168,9 @@ service QueryNode { } rpc SyncDistribution(SyncDistributionRequest) returns (common.Status) { } - rpc Delete(DeleteRequest) returns (common.Status) { - } + rpc Delete(DeleteRequest) returns (common.Status) {} + + rpc CheckHealth(milvus.CheckHealthRequest)returns (milvus.CheckHealthResponse) {} } // --------------------QueryCoord grpc request and response proto------------------ diff --git a/internal/querycoordv2/mocks/mock_querynode.go b/internal/querycoordv2/mocks/mock_querynode.go index 039d03ee6b5a4..0fd3c9e3a6ef2 100644 --- a/internal/querycoordv2/mocks/mock_querynode.go +++ b/internal/querycoordv2/mocks/mock_querynode.go @@ -29,6 +29,61 @@ func (_m *MockQueryNodeServer) EXPECT() *MockQueryNodeServer_Expecter { return &MockQueryNodeServer_Expecter{mock: &_m.Mock} } +// CheckHealth provides a mock function with given fields: _a0, _a1 +func (_m *MockQueryNodeServer) CheckHealth(_a0 context.Context, _a1 *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) { + ret := _m.Called(_a0, _a1) + + var r0 *milvuspb.CheckHealthResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error)); ok { + return rf(_a0, _a1) + } + if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest) *milvuspb.CheckHealthResponse); ok { + r0 = rf(_a0, _a1) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*milvuspb.CheckHealthResponse) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *milvuspb.CheckHealthRequest) error); ok { + r1 = rf(_a0, _a1) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockQueryNodeServer_CheckHealth_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckHealth' +type MockQueryNodeServer_CheckHealth_Call struct { + *mock.Call +} + +// CheckHealth is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 *milvuspb.CheckHealthRequest +func (_e *MockQueryNodeServer_Expecter) CheckHealth(_a0 interface{}, _a1 interface{}) *MockQueryNodeServer_CheckHealth_Call { + return &MockQueryNodeServer_CheckHealth_Call{Call: _e.mock.On("CheckHealth", _a0, _a1)} +} + +func (_c *MockQueryNodeServer_CheckHealth_Call) Run(run func(_a0 context.Context, _a1 *milvuspb.CheckHealthRequest)) *MockQueryNodeServer_CheckHealth_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*milvuspb.CheckHealthRequest)) + }) + return _c +} + +func (_c *MockQueryNodeServer_CheckHealth_Call) Return(_a0 *milvuspb.CheckHealthResponse, _a1 error) *MockQueryNodeServer_CheckHealth_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockQueryNodeServer_CheckHealth_Call) RunAndReturn(run func(context.Context, *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error)) *MockQueryNodeServer_CheckHealth_Call { + _c.Call.Return(run) + return _c +} + // Delete provides a mock function with given fields: _a0, _a1 func (_m *MockQueryNodeServer) Delete(_a0 context.Context, _a1 *querypb.DeleteRequest) (*commonpb.Status, error) { ret := _m.Called(_a0, _a1) diff --git a/internal/querycoordv2/server.go b/internal/querycoordv2/server.go index d2c997e339c88..d7569e9fe4705 100644 --- a/internal/querycoordv2/server.go +++ b/internal/querycoordv2/server.go @@ -734,12 +734,11 @@ func (s *Server) tryHandleNodeUp() { log := log.Ctx(s.ctx).WithRateGroup("qcv2.Server", 1, 60) ctx, cancel := context.WithTimeout(s.ctx, Params.QueryCoordCfg.CheckHealthRPCTimeout.GetAsDuration(time.Millisecond)) defer cancel() - reasons, err := s.checkNodeHealth(ctx) + err := s.checkNodeHealth(ctx) if err != nil { log.RatedWarn(10, "unhealthy node exist, node up will be delayed", zap.Int("delayedNodeUpEvents", len(s.nodeUpEventChan)), - zap.Int("unhealthyNodeNum", len(reasons)), - zap.Strings("unhealthyReason", reasons)) + zap.String("unhealthyReason", err.Error())) return } for len(s.nodeUpEventChan) > 0 { diff --git a/internal/querycoordv2/services.go b/internal/querycoordv2/services.go index ea132a3a910e2..6be97d694bd2b 100644 --- a/internal/querycoordv2/services.go +++ b/internal/querycoordv2/services.go @@ -19,7 +19,7 @@ package querycoordv2 import ( "context" "fmt" - "sync" + "strings" "github.com/cockroachdb/errors" "github.com/samber/lo" @@ -914,9 +914,9 @@ func (s *Server) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthReque return &milvuspb.CheckHealthResponse{Status: merr.Status(err), IsHealthy: false, Reasons: []string{err.Error()}}, nil } - errReasons, err := s.checkNodeHealth(ctx) - if err != nil || len(errReasons) != 0 { - return componentutil.CheckHealthRespWithErrMsg(errReasons...), nil + err := s.checkNodeHealth(ctx) + if err != nil { + return componentutil.CheckHealthRespWithErr(err), nil } if err := utils.CheckCollectionsQueryable(s.meta, s.targetMgr, s.dist, s.nodeMgr); err != nil { @@ -926,32 +926,32 @@ func (s *Server) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthReque return componentutil.CheckHealthRespWithErr(nil), nil } -func (s *Server) checkNodeHealth(ctx context.Context) ([]string, error) { +func (s *Server) checkNodeHealth(ctx context.Context) error { group, ctx := errgroup.WithContext(ctx) - errReasons := make([]string, 0) - - mu := &sync.Mutex{} for _, node := range s.nodeMgr.GetAll() { node := node group.Go(func() error { - resp, err := s.cluster.GetComponentStates(ctx, node.ID()) - if err != nil { + componentStateResp, err := s.cluster.GetComponentStates(ctx, node.ID()) + if err = merr.CheckRPCCall(componentStateResp, err); err != nil { + return fmt.Errorf("GetComponentStates fails for querynode:%d, %w", node.ID(), err) + } + + if err = merr.AnalyzeState("QueryNode", node.ID(), componentStateResp); err != nil { return err } - err = merr.AnalyzeState("QueryNode", node.ID(), resp) - if err != nil { - mu.Lock() - defer mu.Unlock() - errReasons = append(errReasons, err.Error()) + checkHealthResp, err := s.cluster.CheckHealth(ctx, node.ID()) + if err = merr.CheckRPCCall(checkHealthResp, err); err != nil { + return fmt.Errorf("CheckHealth fails for querynode:%d, %w", node.ID(), err) + } + + if !checkHealthResp.IsHealthy && len(checkHealthResp.Reasons) > 0 { + return errors.New(strings.Join(checkHealthResp.Reasons, ",")) } return nil }) } - - err := group.Wait() - - return errReasons, err + return group.Wait() } func (s *Server) CreateResourceGroup(ctx context.Context, req *milvuspb.CreateResourceGroupRequest) (*commonpb.Status, error) { diff --git a/internal/querycoordv2/session/cluster.go b/internal/querycoordv2/session/cluster.go index 7b6bc316ebe25..569dbb0029469 100644 --- a/internal/querycoordv2/session/cluster.go +++ b/internal/querycoordv2/session/cluster.go @@ -52,6 +52,7 @@ type Cluster interface { GetMetrics(ctx context.Context, nodeID int64, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) SyncDistribution(ctx context.Context, nodeID int64, req *querypb.SyncDistributionRequest) (*commonpb.Status, error) GetComponentStates(ctx context.Context, nodeID int64) (*milvuspb.ComponentStates, error) + CheckHealth(ctx context.Context, nodeID int64) (*milvuspb.CheckHealthResponse, error) Start() Stop() } @@ -272,6 +273,20 @@ func (c *QueryCluster) send(ctx context.Context, nodeID int64, fn func(cli types return nil } +func (c *QueryCluster) CheckHealth(ctx context.Context, nodeID int64) (*milvuspb.CheckHealthResponse, error) { + var ( + resp *milvuspb.CheckHealthResponse + err error + ) + err1 := c.send(ctx, nodeID, func(cli types.QueryNodeClient) { + resp, err = cli.CheckHealth(ctx, &milvuspb.CheckHealthRequest{}) + }) + if err1 != nil { + return nil, err1 + } + return resp, err +} + type clients struct { sync.RWMutex clients map[int64]types.QueryNodeClient // nodeID -> client diff --git a/internal/querycoordv2/session/mock_cluster.go b/internal/querycoordv2/session/mock_cluster.go index dbc14c720ce98..136f6c4e23da0 100644 --- a/internal/querycoordv2/session/mock_cluster.go +++ b/internal/querycoordv2/session/mock_cluster.go @@ -27,6 +27,61 @@ func (_m *MockCluster) EXPECT() *MockCluster_Expecter { return &MockCluster_Expecter{mock: &_m.Mock} } +// CheckHealth provides a mock function with given fields: ctx, nodeID +func (_m *MockCluster) CheckHealth(ctx context.Context, nodeID int64) (*milvuspb.CheckHealthResponse, error) { + ret := _m.Called(ctx, nodeID) + + var r0 *milvuspb.CheckHealthResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, int64) (*milvuspb.CheckHealthResponse, error)); ok { + return rf(ctx, nodeID) + } + if rf, ok := ret.Get(0).(func(context.Context, int64) *milvuspb.CheckHealthResponse); ok { + r0 = rf(ctx, nodeID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*milvuspb.CheckHealthResponse) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, int64) error); ok { + r1 = rf(ctx, nodeID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockCluster_CheckHealth_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckHealth' +type MockCluster_CheckHealth_Call struct { + *mock.Call +} + +// CheckHealth is a helper method to define mock.On call +// - ctx context.Context +// - nodeID int64 +func (_e *MockCluster_Expecter) CheckHealth(ctx interface{}, nodeID interface{}) *MockCluster_CheckHealth_Call { + return &MockCluster_CheckHealth_Call{Call: _e.mock.On("CheckHealth", ctx, nodeID)} +} + +func (_c *MockCluster_CheckHealth_Call) Run(run func(ctx context.Context, nodeID int64)) *MockCluster_CheckHealth_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(int64)) + }) + return _c +} + +func (_c *MockCluster_CheckHealth_Call) Return(_a0 *milvuspb.CheckHealthResponse, _a1 error) *MockCluster_CheckHealth_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockCluster_CheckHealth_Call) RunAndReturn(run func(context.Context, int64) (*milvuspb.CheckHealthResponse, error)) *MockCluster_CheckHealth_Call { + _c.Call.Return(run) + return _c +} + // GetComponentStates provides a mock function with given fields: ctx, nodeID func (_m *MockCluster) GetComponentStates(ctx context.Context, nodeID int64) (*milvuspb.ComponentStates, error) { ret := _m.Called(ctx, nodeID) diff --git a/internal/querycoordv2/utils/util.go b/internal/querycoordv2/utils/util.go index 4ef2f685d368a..e7df24899e1ab 100644 --- a/internal/querycoordv2/utils/util.go +++ b/internal/querycoordv2/utils/util.go @@ -46,15 +46,17 @@ func CheckNodeAvailable(nodeID int64, info *session.NodeInfo) error { // 3. The last heartbeat response time is within HeartbeatAvailableInterval for all QueryNodes(include leader) in the distribution // 4. All segments of the shard in target should be in the distribution func CheckLeaderAvailable(nodeMgr *session.NodeManager, leader *meta.LeaderView, currentTargets map[int64]*datapb.SegmentInfo) error { - log := log.Ctx(context.TODO()). - WithRateGroup("utils.CheckLeaderAvailable", 1, 60). - With(zap.Int64("leaderID", leader.ID)) - info := nodeMgr.Get(leader.ID) + log := func() *log.MLogger { + return log.Ctx(context.TODO()). + WithRateGroup("utils.CheckLeaderAvailable", 1, 60). + With(zap.Int64("leaderID", leader.ID)) + } + info := nodeMgr.Get(leader.ID) // Check whether leader is online err := CheckNodeAvailable(leader.ID, info) if err != nil { - log.Info("leader is not available", zap.Error(err)) + log().Info("leader is not available", zap.Error(err)) return fmt.Errorf("leader not available: %w", err) } @@ -62,7 +64,7 @@ func CheckLeaderAvailable(nodeMgr *session.NodeManager, leader *meta.LeaderView, info := nodeMgr.Get(version.GetNodeID()) err = CheckNodeAvailable(version.GetNodeID(), info) if err != nil { - log.Info("leader is not available due to QueryNode unavailable", + log().Info("leader is not available due to QueryNode unavailable", zap.Int64("segmentID", id), zap.Error(err)) return err @@ -77,7 +79,7 @@ func CheckLeaderAvailable(nodeMgr *session.NodeManager, leader *meta.LeaderView, _, exist := leader.Segments[segmentID] if !exist { - log.RatedInfo(10, "leader is not available due to lack of segment", zap.Int64("segmentID", segmentID)) + log().RatedWarn(10, "leader is not available due to lack of segment", zap.Int64("segmentID", segmentID)) return merr.WrapErrSegmentLack(segmentID) } } @@ -112,8 +114,6 @@ func GetShardLeadersWithChannels(m *meta.Meta, targetMgr meta.TargetManagerInter ret := make([]*querypb.ShardLeadersList, 0) currentTargets := targetMgr.GetSealedSegmentsByCollection(collectionID, meta.CurrentTarget) for _, channel := range channels { - log := log.With(zap.String("channel", channel.GetChannelName())) - var channelErr error leaders := dist.LeaderViewManager.GetByFilter(meta.WithChannelName2LeaderView(channel.GetChannelName())) if len(leaders) == 0 { diff --git a/internal/querynodev2/services.go b/internal/querynodev2/services.go index b367e03c95bfc..f5c68e33a32e3 100644 --- a/internal/querynodev2/services.go +++ b/internal/querynodev2/services.go @@ -43,6 +43,7 @@ import ( "github.com/milvus-io/milvus/internal/querynodev2/segments" "github.com/milvus-io/milvus/internal/querynodev2/tasks" "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/internal/util/componentutil" "github.com/milvus-io/milvus/internal/util/streamrpc" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" @@ -52,6 +53,7 @@ import ( "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/ratelimitutil" "github.com/milvus-io/milvus/pkg/util/timerecord" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -1418,6 +1420,24 @@ func (node *QueryNode) Delete(ctx context.Context, req *querypb.DeleteRequest) ( return merr.Success(), nil } +func (node *QueryNode) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) { + if err := node.lifetime.Add(merr.IsHealthy); err != nil { + return &milvuspb.CheckHealthResponse{ + Status: merr.Status(err), + Reasons: []string{err.Error()}, + }, nil + } + defer node.lifetime.Done() + + maxDelay := paramtable.Get().QuotaConfig.MaxTimeTickDelay.GetAsDuration(time.Second) + minTsafeChannel, minTsafe := node.tSafeManager.Min() + if err := ratelimitutil.CheckTimeTickDelay(minTsafeChannel, minTsafe, maxDelay); err != nil { + return componentutil.CheckHealthRespWithErr(err), nil + } + + return componentutil.CheckHealthRespWithErr(nil), nil +} + type deleteRequestStringer struct { *querypb.DeleteRequest } diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index 7a1ce068bc44b..05c15ca523f2f 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -2858,17 +2858,6 @@ func (c *Core) CheckHealth(ctx context.Context, in *milvuspb.CheckHealthRequest) return true }) - maxDelay := Params.QuotaConfig.MaxTimeTickDelay.GetAsDuration(time.Second) - if maxDelay > 0 { - group.Go(func() error { - err := CheckTimeTickLagExceeded(ctx, c.queryCoord, c.dataCoord, maxDelay) - if err != nil { - errs.Insert(err) - } - return err - }) - } - err := group.Wait() if err != nil { return &milvuspb.CheckHealthResponse{ diff --git a/internal/util/mock/grpc_datanode_client.go b/internal/util/mock/grpc_datanode_client.go index 13ae355738d80..621a2317e61e8 100644 --- a/internal/util/mock/grpc_datanode_client.go +++ b/internal/util/mock/grpc_datanode_client.go @@ -112,3 +112,7 @@ func (m *GrpcDataNodeClient) QuerySlot(ctx context.Context, req *datapb.QuerySlo func (m *GrpcDataNodeClient) DropCompactionPlan(ctx context.Context, req *datapb.DropCompactionPlanRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { return &commonpb.Status{}, m.Err } + +func (m *GrpcDataNodeClient) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) { + return &milvuspb.CheckHealthResponse{}, m.Err +} diff --git a/internal/util/mock/grpc_querynode_client.go b/internal/util/mock/grpc_querynode_client.go index e20dc0d635422..7add24aeb6756 100644 --- a/internal/util/mock/grpc_querynode_client.go +++ b/internal/util/mock/grpc_querynode_client.go @@ -130,6 +130,10 @@ func (m *GrpcQueryNodeClient) Delete(ctx context.Context, in *querypb.DeleteRequ return &commonpb.Status{}, m.Err } +func (m *GrpcQueryNodeClient) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) { + return &milvuspb.CheckHealthResponse{}, m.Err +} + func (m *GrpcQueryNodeClient) Close() error { return m.Err } diff --git a/internal/util/wrappers/qn_wrapper.go b/internal/util/wrappers/qn_wrapper.go index 63147c0116986..cec6e8f09c627 100644 --- a/internal/util/wrappers/qn_wrapper.go +++ b/internal/util/wrappers/qn_wrapper.go @@ -148,6 +148,10 @@ func (qn *qnServerWrapper) Delete(ctx context.Context, in *querypb.DeleteRequest return qn.QueryNode.Delete(ctx, in) } +func (qn *qnServerWrapper) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) { + return qn.QueryNode.CheckHealth(ctx, req) +} + func WrapQueryNodeServerAsClient(qn types.QueryNode) types.QueryNodeClient { return &qnServerWrapper{ QueryNode: qn, diff --git a/pkg/util/funcutil/func.go b/pkg/util/funcutil/func.go index 89da8c3877fb4..d448fe7371527 100644 --- a/pkg/util/funcutil/func.go +++ b/pkg/util/funcutil/func.go @@ -24,7 +24,6 @@ import ( "fmt" "net" "reflect" - "regexp" "strconv" "strings" "time" @@ -264,13 +263,18 @@ func ConvertChannelName(chanName string, tokenFrom string, tokenTo string) (stri } func GetCollectionIDFromVChannel(vChannelName string) int64 { - re := regexp.MustCompile(`.*_(\d+)v\d+`) - matches := re.FindStringSubmatch(vChannelName) - if len(matches) > 1 { - number, err := strconv.ParseInt(matches[1], 0, 64) - if err == nil { - return number - } + end := strings.LastIndexByte(vChannelName, 'v') + if end <= 0 { + return -1 + } + start := strings.LastIndexByte(vChannelName, '_') + if start <= 0 { + return -1 + } + + collectionIDStr := vChannelName[start+1 : end] + if collectionID, err := strconv.ParseInt(collectionIDStr, 0, 64); err == nil { + return collectionID } return -1 } diff --git a/pkg/util/merr/utils.go b/pkg/util/merr/utils.go index 3f44ea45d81a6..6df2303e9f57b 100644 --- a/pkg/util/merr/utils.go +++ b/pkg/util/merr/utils.go @@ -300,6 +300,9 @@ func IsHealthyOrStopping(stateCode commonpb.StateCode) error { } func AnalyzeState(role string, nodeID int64, state *milvuspb.ComponentStates) error { + if state == nil { + return nil + } if err := Error(state.GetStatus()); err != nil { return errors.Wrapf(err, "%s=%d not healthy", role, nodeID) } else if state := state.GetState().GetStateCode(); state != commonpb.StateCode_Healthy { diff --git a/pkg/util/ratelimitutil/utils.go b/pkg/util/ratelimitutil/utils.go index d72e28c22e682..ff20cd4ab26b4 100644 --- a/pkg/util/ratelimitutil/utils.go +++ b/pkg/util/ratelimitutil/utils.go @@ -16,7 +16,13 @@ package ratelimitutil -import "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" +import ( + "fmt" + "time" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus/pkg/util/tsoutil" +) var QuotaErrorString = map[commonpb.ErrorCode]string{ commonpb.ErrorCode_ForceDeny: "the writing has been deactivated by the administrator", @@ -28,3 +34,14 @@ var QuotaErrorString = map[commonpb.ErrorCode]string{ func GetQuotaErrorString(errCode commonpb.ErrorCode) string { return QuotaErrorString[errCode] } + +func CheckTimeTickDelay(channel string, minTT uint64, maxDelay time.Duration) error { + if channel != "" && maxDelay > 0 { + minTt, _ := tsoutil.ParseTS(minTT) + delay := time.Since(minTt) + if delay.Milliseconds() >= maxDelay.Milliseconds() { + return fmt.Errorf("max timetick lag execced threhold, lag:%s on channel:%s", delay, channel) + } + } + return nil +}