diff --git a/configs/milvus.yaml b/configs/milvus.yaml
index b575af456d553..e508238fbeb2d 100644
--- a/configs/milvus.yaml
+++ b/configs/milvus.yaml
@@ -382,7 +382,7 @@ queryCoord:
   channelExclusiveNodeFactor: 4 # the least node number for enable channel's exclusive mode
   collectionObserverInterval: 200 # the interval of collection observer
   checkExecutedFlagInterval: 100 # the interval of check executed flag to force to pull dist
-  updateCollectionLoadStatusInterval: 300 # 300s, max interval of updating collection loaded status for check health
+  updateCollectionLoadStatusInterval: 5 # 5m, max interval of updating collection loaded status for check health
   cleanExcludeSegmentInterval: 60 # the time duration of clean pipeline exclude segment which used for filter invalid data, in seconds
   ip: # TCP/IP address of queryCoord. If not specified, use the first unicastable address
   port: 19531 # TCP port of queryCoord
diff --git a/internal/datacoord/mock_test.go b/internal/datacoord/mock_test.go
index f3f1890a9c972..cf246ec278192 100644
--- a/internal/datacoord/mock_test.go
+++ b/internal/datacoord/mock_test.go
@@ -304,22 +304,6 @@ func (c *mockDataNodeClient) Stop() error {
 	return nil
 }
 
-func (c *mockDataNodeClient) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) {
-	if c.state == commonpb.StateCode_Healthy {
-		return &milvuspb.CheckHealthResponse{
-			Status:    &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
-			IsHealthy: true,
-			Reasons:   []string{},
-		}, nil
-	} else {
-		return &milvuspb.CheckHealthResponse{
-			Status:    &commonpb.Status{ErrorCode: commonpb.ErrorCode_NotReadyServe},
-			IsHealthy: false,
-			Reasons:   []string{"fails"},
-		}, nil
-	}
-}
-
 type mockRootCoordClient struct {
 	state commonpb.StateCode
 	cnt   atomic.Int64
diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go
index f054da9e38fa3..b5c0ac15dc29f 100644
--- a/internal/datacoord/server.go
+++ b/internal/datacoord/server.go
@@ -52,7 +52,6 @@ import (
 	streamingcoord "github.com/milvus-io/milvus/internal/streamingcoord/server"
 	"github.com/milvus-io/milvus/internal/types"
 	"github.com/milvus-io/milvus/internal/util/dependency"
-	"github.com/milvus-io/milvus/internal/util/healthcheck"
 	"github.com/milvus-io/milvus/internal/util/sessionutil"
 	"github.com/milvus-io/milvus/internal/util/streamingutil"
 	"github.com/milvus-io/milvus/pkg/kv"
@@ -168,8 +167,6 @@ type Server struct {
 	streamingCoord *streamingcoord.Server
 
 	metricsRequest *metricsinfo.MetricsRequest
-
-	healthChecker *healthcheck.Checker
 }
 
 type CollectionNameInfo struct {
@@ -432,8 +429,6 @@ func (s *Server) initDataCoord() error {
 
 	s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
 
-	interval := Params.CommonCfg.HealthCheckInterval.GetAsDuration(time.Second)
-	s.healthChecker = healthcheck.NewChecker(interval, s.healthCheckFn)
 	log.Info("init datacoord done", zap.Int64("nodeID", paramtable.GetNodeID()), zap.String("Address", s.address))
 	return nil
 }
@@ -778,8 +773,6 @@ func (s *Server) startServerLoop() {
 	if !(streamingutil.IsStreamingServiceEnabled() || paramtable.Get().DataNodeCfg.SkipBFStatsLoad.GetAsBool()) {
 		s.syncSegmentsScheduler.Start()
 	}
-
-	s.healthChecker.Start()
 }
 
 func (s *Server) startTaskScheduler() {
@@ -1106,9 +1099,6 @@ func (s *Server) Stop() error {
 		return nil
 	}
 	log.Info("datacoord server shutdown")
-	if s.healthChecker != nil {
-		s.healthChecker.Close()
-	}
 	s.garbageCollector.close()
 	log.Info("datacoord garbage collector stopped")
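
Reviewer note: the YAML change above is a unit change, not a behavior change. `updateCollectionLoadStatusInterval` used to be read in seconds (`300`); the code now reads it in minutes (`5`), matching the `GetAsDuration(time.Minute)` call in `internal/querycoordv2/utils/util.go` further down this diff. A minimal sketch of why the two spellings are equivalent (`getAsDuration` is a stand-in for paramtable's `ParamItem.GetAsDuration`, not the real API):

```go
package main

import (
	"fmt"
	"time"
)

// getAsDuration mimics how paramtable scales a raw numeric config value
// by the unit the call site chooses (illustrative stand-in only).
func getAsDuration(raw int64, unit time.Duration) time.Duration {
	return time.Duration(raw) * unit
}

func main() {
	fmt.Println(getAsDuration(300, time.Second)) // old config + old code: 5m0s
	fmt.Println(getAsDuration(5, time.Minute))   // new config + new code: 5m0s
}
```
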
diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go
index 4a9499dca2278..83df41f7bb36f 100644
--- a/internal/datacoord/server_test.go
+++ b/internal/datacoord/server_test.go
@@ -54,7 +54,6 @@ import (
 	"github.com/milvus-io/milvus/internal/proto/workerpb"
 	"github.com/milvus-io/milvus/internal/types"
 	"github.com/milvus-io/milvus/internal/util/dependency"
-	"github.com/milvus-io/milvus/internal/util/healthcheck"
 	"github.com/milvus-io/milvus/internal/util/sessionutil"
 	"github.com/milvus-io/milvus/pkg/common"
 	"github.com/milvus-io/milvus/pkg/log"
@@ -2536,12 +2535,12 @@ func Test_CheckHealth(t *testing.T) {
 		return sm
 	}
 
-	getChannelManager := func(findWatcherOk bool) ChannelManager {
+	getChannelManager := func(t *testing.T, findWatcherOk bool) ChannelManager {
 		channelManager := NewMockChannelManager(t)
 		if findWatcherOk {
-			channelManager.EXPECT().FindWatcher(mock.Anything).Return(0, nil).Maybe()
+			channelManager.EXPECT().FindWatcher(mock.Anything).Return(0, nil)
 		} else {
-			channelManager.EXPECT().FindWatcher(mock.Anything).Return(0, errors.New("error")).Maybe()
+			channelManager.EXPECT().FindWatcher(mock.Anything).Return(0, errors.New("error"))
 		}
 		return channelManager
 	}
@@ -2554,21 +2553,6 @@ func Test_CheckHealth(t *testing.T) {
 		2: nil,
 	}
 
-	newServer := func(isHealthy bool, findWatcherOk bool, meta *meta) *Server {
-		svr := &Server{
-			ctx:            context.TODO(),
-			sessionManager: getSessionManager(isHealthy),
-			channelManager: getChannelManager(findWatcherOk),
-			meta:           meta,
-			session:        &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}},
-		}
-		svr.stateCode.Store(commonpb.StateCode_Healthy)
-		svr.healthChecker = healthcheck.NewChecker(20*time.Millisecond, svr.healthCheckFn)
-		svr.healthChecker.Start()
-		time.Sleep(30 * time.Millisecond) // wait for next cycle for health checker
-		return svr
-	}
-
 	t.Run("not healthy", func(t *testing.T) {
 		ctx := context.Background()
 		s := &Server{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}}
@@ -2580,8 +2564,9 @@ func Test_CheckHealth(t *testing.T) {
 	})
 
 	t.Run("data node health check is fail", func(t *testing.T) {
-		svr := newServer(false, true, &meta{channelCPs: newChannelCps()})
-		defer svr.healthChecker.Close()
+		svr := &Server{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}}
+		svr.stateCode.Store(commonpb.StateCode_Healthy)
+		svr.sessionManager = getSessionManager(false)
 		ctx := context.Background()
 		resp, err := svr.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
 		assert.NoError(t, err)
@@ -2590,8 +2575,11 @@ func Test_CheckHealth(t *testing.T) {
 	})
 
 	t.Run("check channel watched fail", func(t *testing.T) {
-		svr := newServer(true, false, &meta{collections: collections, channelCPs: newChannelCps()})
-		defer svr.healthChecker.Close()
+		svr := &Server{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}}
+		svr.stateCode.Store(commonpb.StateCode_Healthy)
+		svr.sessionManager = getSessionManager(true)
+		svr.channelManager = getChannelManager(t, false)
+		svr.meta = &meta{collections: collections}
 		ctx := context.Background()
 		resp, err := svr.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
 		assert.NoError(t, err)
@@ -2600,7 +2588,11 @@ func Test_CheckHealth(t *testing.T) {
 	})
 
 	t.Run("check checkpoint fail", func(t *testing.T) {
-		svr := newServer(true, true, &meta{
+		svr := &Server{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}}
+		svr.stateCode.Store(commonpb.StateCode_Healthy)
+		svr.sessionManager = getSessionManager(true)
+		svr.channelManager = getChannelManager(t, true)
+		svr.meta = &meta{
 			collections: collections,
 			channelCPs: &channelCPs{
 				checkpoints: map[string]*msgpb.MsgPosition{
@@ -2610,8 +2602,8 @@ func Test_CheckHealth(t *testing.T) {
 				},
 			},
 		},
-		})
-		defer svr.healthChecker.Close()
+		}
+
 		ctx := context.Background()
 		resp, err := svr.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
 		assert.NoError(t, err)
@@ -2620,7 +2612,11 @@ func Test_CheckHealth(t *testing.T) {
 	})
 
 	t.Run("ok", func(t *testing.T) {
-		svr := newServer(true, true, &meta{
+		svr := &Server{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}}
+		svr.stateCode.Store(commonpb.StateCode_Healthy)
+		svr.sessionManager = getSessionManager(true)
+		svr.channelManager = getChannelManager(t, true)
+		svr.meta = &meta{
 			collections: collections,
 			channelCPs: &channelCPs{
 				checkpoints: map[string]*msgpb.MsgPosition{
@@ -2638,8 +2634,7 @@ func Test_CheckHealth(t *testing.T) {
 				},
 			},
 		},
-		})
-		defer svr.healthChecker.Close()
+		}
 		ctx := context.Background()
 		resp, err := svr.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
 		assert.NoError(t, err)
diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go
index 4e4f222ed49c9..01703ab13ef91 100644
--- a/internal/datacoord/services.go
+++ b/internal/datacoord/services.go
@@ -35,7 +35,7 @@ import (
 	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/internal/proto/internalpb"
 	"github.com/milvus-io/milvus/internal/storage"
-	"github.com/milvus-io/milvus/internal/util/healthcheck"
+	"github.com/milvus-io/milvus/internal/util/componentutil"
 	"github.com/milvus-io/milvus/internal/util/importutilv2"
 	"github.com/milvus-io/milvus/internal/util/segmentutil"
 	"github.com/milvus-io/milvus/internal/util/streamingutil"
@@ -1588,24 +1588,20 @@ func (s *Server) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthReque
 		}, nil
 	}
 
-	latestCheckResult := s.healthChecker.GetLatestCheckResult()
-	return healthcheck.GetCheckHealthResponseFromResult(latestCheckResult), nil
-}
-
-func (s *Server) healthCheckFn() *healthcheck.Result {
-	timeout := Params.CommonCfg.HealthCheckRPCTimeout.GetAsDuration(time.Second)
-	ctx, cancel := context.WithTimeout(s.ctx, timeout)
-	defer cancel()
+	err := s.sessionManager.CheckHealth(ctx)
+	if err != nil {
+		return componentutil.CheckHealthRespWithErr(err), nil
+	}
 
-	checkResults := s.sessionManager.CheckDNHealth(ctx)
-	for collectionID, failReason := range CheckAllChannelsWatched(s.meta, s.channelManager) {
-		checkResults.AppendUnhealthyCollectionMsgs(healthcheck.NewUnhealthyCollectionMsg(collectionID, failReason, healthcheck.ChannelsWatched))
+	if err = CheckAllChannelsWatched(s.meta, s.channelManager); err != nil {
+		return componentutil.CheckHealthRespWithErr(err), nil
 	}
 
-	for collectionID, failReason := range CheckCheckPointsHealth(s.meta) {
-		checkResults.AppendUnhealthyCollectionMsgs(healthcheck.NewUnhealthyCollectionMsg(collectionID, failReason, healthcheck.CheckpointLagExceed))
+	if err = CheckCheckPointsHealth(s.meta); err != nil {
+		return componentutil.CheckHealthRespWithErr(err), nil
 	}
-	return checkResults
+
+	return componentutil.CheckHealthRespWithErr(nil), nil
 }
 
 func (s *Server) GcConfirm(ctx context.Context, request *datapb.GcConfirmRequest) (*datapb.GcConfirmResponse, error) {
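
Reviewer note: DataCoord's `CheckHealth` now short-circuits on the first failing check and funnels every outcome through `componentutil.CheckHealthRespWithErr`. The helper itself is not shown in this diff; a plausible shape, assuming it maps a nil error to a healthy response and a non-nil error to its reason list (an assumption for illustration, not the verified implementation):

```go
package componentutil

import (
	"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
	"github.com/milvus-io/milvus/pkg/util/merr"
)

// CheckHealthRespWithErr (sketch): the RPC status stays Success because the
// health RPC itself worked; the check outcome rides in IsHealthy/Reasons.
func CheckHealthRespWithErr(err error) *milvuspb.CheckHealthResponse {
	resp := &milvuspb.CheckHealthResponse{
		Status:    merr.Success(),
		IsHealthy: true,
		Reasons:   []string{},
	}
	if err != nil {
		resp.IsHealthy = false
		resp.Reasons = []string{err.Error()}
	}
	return resp
}
```
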
diff --git a/internal/datacoord/session/datanode_manager.go b/internal/datacoord/session/datanode_manager.go
index 4bca2ee621efb..e65f2cb95931c 100644
--- a/internal/datacoord/session/datanode_manager.go
+++ b/internal/datacoord/session/datanode_manager.go
@@ -19,7 +19,6 @@ package session
 import (
 	"context"
 	"fmt"
-	"sync"
 	"time"
 
 	"github.com/cockroachdb/errors"
@@ -32,7 +31,6 @@ import (
 	"github.com/milvus-io/milvus/internal/metastore/kv/binlog"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/internal/types"
-	"github.com/milvus-io/milvus/internal/util/healthcheck"
 	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/metrics"
 	"github.com/milvus-io/milvus/pkg/util/commonpbutil"
@@ -71,7 +69,7 @@ type DataNodeManager interface {
 	QueryPreImport(nodeID int64, in *datapb.QueryPreImportRequest) (*datapb.QueryPreImportResponse, error)
 	QueryImport(nodeID int64, in *datapb.QueryImportRequest) (*datapb.QueryImportResponse, error)
 	DropImport(nodeID int64, in *datapb.DropImportRequest) error
-	CheckDNHealth(ctx context.Context) *healthcheck.Result
+	CheckHealth(ctx context.Context) error
 	QuerySlot(nodeID int64) (*datapb.QuerySlotResponse, error)
 	DropCompactionPlan(nodeID int64, req *datapb.DropCompactionPlanRequest) error
 	Close()
@@ -509,44 +507,28 @@ func (c *DataNodeManagerImpl) DropImport(nodeID int64, in *datapb.DropImportRequ
 	return merr.CheckRPCCall(status, err)
 }
 
-func (c *DataNodeManagerImpl) CheckDNHealth(ctx context.Context) *healthcheck.Result {
-	result := healthcheck.NewResult()
-	wg := sync.WaitGroup{}
-	wlock := sync.Mutex{}
-	ids := c.GetSessionIDs()
+func (c *DataNodeManagerImpl) CheckHealth(ctx context.Context) error {
+	group, ctx := errgroup.WithContext(ctx)
 
+	ids := c.GetSessionIDs()
 	for _, nodeID := range ids {
 		nodeID := nodeID
-		wg.Add(1)
-		go func() {
-			defer wg.Done()
-
-			datanodeClient, err := c.getClient(ctx, nodeID)
+		group.Go(func() error {
+			cli, err := c.getClient(ctx, nodeID)
 			if err != nil {
-				err = fmt.Errorf("failed to get node:%d: %v", nodeID, err)
-				return
-			}
-
-			checkHealthResp, err := datanodeClient.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
-			if err = merr.CheckRPCCall(checkHealthResp, err); err != nil && !errors.Is(err, merr.ErrServiceUnimplemented) {
-				err = fmt.Errorf("CheckHealth fails for datanode:%d, %w", nodeID, err)
-				wlock.Lock()
-				result.AppendUnhealthyClusterMsg(
-					healthcheck.NewUnhealthyClusterMsg(typeutil.DataNodeRole, nodeID, err.Error(), healthcheck.NodeHealthCheck))
-				wlock.Unlock()
-				return
+				return fmt.Errorf("failed to get DataNode %d: %v", nodeID, err)
 			}
 
-			if checkHealthResp != nil && len(checkHealthResp.Reasons) > 0 {
-				wlock.Lock()
-				result.AppendResult(healthcheck.GetHealthCheckResultFromResp(checkHealthResp))
-				wlock.Unlock()
+			sta, err := cli.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{})
+			if err != nil {
+				return err
 			}
-		}()
+			err = merr.AnalyzeState("DataNode", nodeID, sta)
+			return err
+		})
 	}
 
-	wg.Wait()
-	return result
+	return group.Wait()
 }
 
 func (c *DataNodeManagerImpl) QuerySlot(nodeID int64) (*datapb.QuerySlotResponse, error) {
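
Reviewer note: the `DataNodeManagerImpl.CheckHealth` rewrite above replaces the WaitGroup-plus-mutex result aggregation with an errgroup that fans out one probe per DataNode and fails fast. A self-contained sketch of the same pattern (`probe` stands in for the real `GetComponentStates` RPC; node IDs are made up):

```go
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// probe stands in for dialing a node and calling GetComponentStates.
func probe(ctx context.Context, nodeID int64) error {
	select {
	case <-ctx.Done():
		return ctx.Err() // a sibling failure already canceled the group
	default:
		return nil
	}
}

func checkAll(ctx context.Context, ids []int64) error {
	group, ctx := errgroup.WithContext(ctx)
	for _, nodeID := range ids {
		nodeID := nodeID // capture loop variable (pre-Go 1.22 semantics)
		group.Go(func() error {
			if err := probe(ctx, nodeID); err != nil {
				return fmt.Errorf("DataNode %d unhealthy: %w", nodeID, err)
			}
			return nil
		})
	}
	// Wait returns the first non-nil error and cancels the shared context,
	// so remaining probes stop early instead of running to completion.
	return group.Wait()
}

func main() {
	fmt.Println(checkAll(context.Background(), []int64{1, 2, 3}))
}
```
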
diff --git a/internal/datacoord/session/mock_datanode_manager.go b/internal/datacoord/session/mock_datanode_manager.go
index c3f75a8558db9..9bd42f2847052 100644
--- a/internal/datacoord/session/mock_datanode_manager.go
+++ b/internal/datacoord/session/mock_datanode_manager.go
@@ -6,8 +6,6 @@ import (
 	context "context"
 
 	datapb "github.com/milvus-io/milvus/internal/proto/datapb"
-	healthcheck "github.com/milvus-io/milvus/internal/util/healthcheck"
-
 	mock "github.com/stretchr/testify/mock"
 
 	typeutil "github.com/milvus-io/milvus/pkg/util/typeutil"
@@ -119,50 +117,48 @@ func (_c *MockDataNodeManager_CheckChannelOperationProgress_Call) RunAndReturn(r
 	return _c
 }
 
-// CheckDNHealth provides a mock function with given fields: ctx
-func (_m *MockDataNodeManager) CheckDNHealth(ctx context.Context) *healthcheck.Result {
+// CheckHealth provides a mock function with given fields: ctx
+func (_m *MockDataNodeManager) CheckHealth(ctx context.Context) error {
 	ret := _m.Called(ctx)
 
 	if len(ret) == 0 {
-		panic("no return value specified for CheckDNHealth")
+		panic("no return value specified for CheckHealth")
 	}
 
-	var r0 *healthcheck.Result
-	if rf, ok := ret.Get(0).(func(context.Context) *healthcheck.Result); ok {
+	var r0 error
+	if rf, ok := ret.Get(0).(func(context.Context) error); ok {
 		r0 = rf(ctx)
 	} else {
-		if ret.Get(0) != nil {
-			r0 = ret.Get(0).(*healthcheck.Result)
-		}
+		r0 = ret.Error(0)
 	}
 
 	return r0
}
 
-// MockDataNodeManager_CheckDNHealth_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckDNHealth'
-type MockDataNodeManager_CheckDNHealth_Call struct {
+// MockDataNodeManager_CheckHealth_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckHealth'
+type MockDataNodeManager_CheckHealth_Call struct {
 	*mock.Call
 }
 
-// CheckDNHealth is a helper method to define mock.On call
+// CheckHealth is a helper method to define mock.On call
 //   - ctx context.Context
-func (_e *MockDataNodeManager_Expecter) CheckDNHealth(ctx interface{}) *MockDataNodeManager_CheckDNHealth_Call {
-	return &MockDataNodeManager_CheckDNHealth_Call{Call: _e.mock.On("CheckDNHealth", ctx)}
+func (_e *MockDataNodeManager_Expecter) CheckHealth(ctx interface{}) *MockDataNodeManager_CheckHealth_Call {
+	return &MockDataNodeManager_CheckHealth_Call{Call: _e.mock.On("CheckHealth", ctx)}
 }
 
-func (_c *MockDataNodeManager_CheckDNHealth_Call) Run(run func(ctx context.Context)) *MockDataNodeManager_CheckDNHealth_Call {
+func (_c *MockDataNodeManager_CheckHealth_Call) Run(run func(ctx context.Context)) *MockDataNodeManager_CheckHealth_Call {
 	_c.Call.Run(func(args mock.Arguments) {
 		run(args[0].(context.Context))
 	})
 	return _c
 }
 
-func (_c *MockDataNodeManager_CheckDNHealth_Call) Return(_a0 *healthcheck.Result) *MockDataNodeManager_CheckDNHealth_Call {
+func (_c *MockDataNodeManager_CheckHealth_Call) Return(_a0 error) *MockDataNodeManager_CheckHealth_Call {
 	_c.Call.Return(_a0)
 	return _c
 }
 
-func (_c *MockDataNodeManager_CheckDNHealth_Call) RunAndReturn(run func(context.Context) *healthcheck.Result) *MockDataNodeManager_CheckDNHealth_Call {
+func (_c *MockDataNodeManager_CheckHealth_Call) RunAndReturn(run func(context.Context) error) *MockDataNodeManager_CheckHealth_Call {
 	_c.Call.Return(run)
 	return _c
 }
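
Reviewer note: with the interface change, callers of the regenerated mock program a single error return instead of building a `*healthcheck.Result`. A hypothetical test fragment showing the new expectation style (package layout and wiring assumed for illustration):

```go
package session_test

import (
	"context"
	"testing"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/mock"

	"github.com/milvus-io/milvus/internal/datacoord/session"
)

func TestCheckHealthMockUsage(t *testing.T) {
	mgr := session.NewMockDataNodeManager(t)
	// Healthy path: CheckHealth now yields a plain nil error.
	mgr.EXPECT().CheckHealth(mock.Anything).Return(nil)
	assert.NoError(t, mgr.CheckHealth(context.Background()))
}
```
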
diff --git a/internal/datacoord/util.go b/internal/datacoord/util.go
index 7ce2f8a6ea6e7..19b4fa2f68949 100644
--- a/internal/datacoord/util.go
+++ b/internal/datacoord/util.go
@@ -285,8 +285,7 @@ func getBinLogIDs(segment *SegmentInfo, fieldID int64) []int64 {
 	return binlogIDs
 }
 
-func CheckCheckPointsHealth(meta *meta) map[int64]string {
-	checkResult := make(map[int64]string)
+func CheckCheckPointsHealth(meta *meta) error {
 	for channel, cp := range meta.GetChannelCheckpoints() {
 		collectionID := funcutil.GetCollectionIDFromVChannel(channel)
 		if collectionID == -1 {
@@ -300,30 +299,31 @@ func CheckCheckPointsHealth(meta *meta) error {
 		ts, _ := tsoutil.ParseTS(cp.Timestamp)
 		lag := time.Since(ts)
 		if lag > paramtable.Get().DataCoordCfg.ChannelCheckpointMaxLag.GetAsDuration(time.Second) {
-			checkResult[collectionID] = fmt.Sprintf("exceeds max lag:%s on channel:%s checkpoint", lag, channel)
+			return merr.WrapErrChannelCPExceededMaxLag(channel, fmt.Sprintf("checkpoint lag: %f(min)", lag.Minutes()))
 		}
 	}
-	return checkResult
+	return nil
 }
 
-func CheckAllChannelsWatched(meta *meta, channelManager ChannelManager) map[int64]string {
+func CheckAllChannelsWatched(meta *meta, channelManager ChannelManager) error {
 	collIDs := meta.ListCollections()
-	checkResult := make(map[int64]string)
 	for _, collID := range collIDs {
 		collInfo := meta.GetCollection(collID)
 		if collInfo == nil {
-			log.RatedWarn(60, "collection info is nil, skip it", zap.Int64("collectionID", collID))
+			log.Warn("collection info is nil, skip it", zap.Int64("collectionID", collID))
 			continue
 		}
 
 		for _, channelName := range collInfo.VChannelNames {
 			_, err := channelManager.FindWatcher(channelName)
 			if err != nil {
-				checkResult[collID] = fmt.Sprintf("channel:%s is not watched", channelName)
+				log.Warn("find watcher for channel failed", zap.Int64("collectionID", collID),
+					zap.String("channelName", channelName), zap.Error(err))
+				return err
 			}
 		}
 	}
-	return checkResult
+	return nil
 }
 
 func createStorageConfig() *indexpb.StorageConfig {
diff --git a/internal/datanode/metrics_info.go b/internal/datanode/metrics_info.go
index 9072d6d14f515..4872b6f6c5eb4 100644
--- a/internal/datanode/metrics_info.go
+++ b/internal/datanode/metrics_info.go
@@ -52,7 +52,7 @@ func (node *DataNode) getQuotaMetrics() (*metricsinfo.DataNodeQuotaMetrics, erro
 		return nil, err
 	}
 
-	minFGChannel, minFGTt := node.flowgraphManager.GetMinTTFlowGraph()
+	minFGChannel, minFGTt := util.GetRateCollector().GetMinFlowGraphTt()
 	return &metricsinfo.DataNodeQuotaMetrics{
 		Hms: metricsinfo.HardwareMetrics{},
 		Rms: rms,
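
Reviewer note: the checkpoint check above converts a hybrid TSO timestamp back to wall-clock time before comparing the lag against `ChannelCheckpointMaxLag`. A standalone sketch of that conversion; the 18-bit logical/physical split mirrors Milvus's tsoutil convention, and the 5-minute threshold is illustrative:

```go
package main

import (
	"fmt"
	"time"
)

const logicalBits = 18 // width of the logical counter in a hybrid timestamp

// parseTS unpacks the physical part (ms since epoch) of a hybrid timestamp,
// mirroring the physical component that tsoutil.ParseTS returns.
func parseTS(ts uint64) time.Time {
	return time.UnixMilli(int64(ts >> logicalBits))
}

func main() {
	// A checkpoint written ~10 minutes ago, encoded as a hybrid timestamp.
	cp := uint64(time.Now().Add(-10*time.Minute).UnixMilli()) << logicalBits

	lag := time.Since(parseTS(cp))
	maxLag := 5 * time.Minute // stand-in for DataCoordCfg.ChannelCheckpointMaxLag

	if lag > maxLag {
		fmt.Printf("checkpoint lag: %f(min) exceeds max %v\n", lag.Minutes(), maxLag)
	}
}
```
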
diff --git a/internal/datanode/services.go b/internal/datanode/services.go
index be6c6a2042ce1..825f63a14c1ad 100644
--- a/internal/datanode/services.go
+++ b/internal/datanode/services.go
@@ -22,7 +22,6 @@ package datanode
 import (
 	"context"
 	"fmt"
-	"time"
 
 	"github.com/samber/lo"
 	"go.uber.org/zap"
@@ -37,7 +36,6 @@ import (
 	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/internal/proto/internalpb"
 	"github.com/milvus-io/milvus/internal/storage"
-	"github.com/milvus-io/milvus/internal/util/healthcheck"
 	"github.com/milvus-io/milvus/internal/util/importutilv2"
 	"github.com/milvus-io/milvus/pkg/common"
 	"github.com/milvus-io/milvus/pkg/log"
@@ -47,7 +45,6 @@ import (
 	"github.com/milvus-io/milvus/pkg/util/merr"
 	"github.com/milvus-io/milvus/pkg/util/metricsinfo"
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
-	"github.com/milvus-io/milvus/pkg/util/ratelimitutil"
 	"github.com/milvus-io/milvus/pkg/util/tsoutil"
 	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )
@@ -576,20 +573,3 @@ func (node *DataNode) DropCompactionPlan(ctx context.Context, req *datapb.DropCo
 	log.Ctx(ctx).Info("DropCompactionPlans success", zap.Int64("planID", req.GetPlanID()))
 	return merr.Success(), nil
 }
-
-func (node *DataNode) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) {
-	if err := merr.CheckHealthy(node.GetStateCode()); err != nil {
-		return &milvuspb.CheckHealthResponse{
-			Status:  merr.Status(err),
-			Reasons: []string{err.Error()},
-		}, nil
-	}
-
-	maxDelay := paramtable.Get().QuotaConfig.MaxTimeTickDelay.GetAsDuration(time.Second)
-	minFGChannel, minFGTt := node.flowgraphManager.GetMinTTFlowGraph()
-	if err := ratelimitutil.CheckTimeTickDelay(minFGChannel, minFGTt, maxDelay); err != nil {
-		msg := healthcheck.NewUnhealthyClusterMsg(typeutil.DataNodeRole, node.GetNodeID(), err.Error(), healthcheck.TimeTickLagExceed)
-		return healthcheck.GetCheckHealthResponseFromClusterMsg(msg), nil
-	}
-	return healthcheck.OK(), nil
-}
diff --git a/internal/datanode/services_test.go b/internal/datanode/services_test.go
index 5cc4d87b72518..616354a16482d 100644
--- a/internal/datanode/services_test.go
+++ b/internal/datanode/services_test.go
@@ -110,7 +110,6 @@ func (s *DataNodeServicesSuite) SetupTest() {
 	s.Require().NoError(err)
 
 	s.node.chunkManager = storage.NewLocalChunkManager(storage.RootPath("/tmp/milvus_test/datanode"))
-	s.node.flowgraphManager = pipeline.NewFlowgraphManager()
 
 	paramtable.SetNodeID(1)
 }
@@ -1162,41 +1161,6 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
 	})
 }
 
-func (s *DataNodeServicesSuite) TestCheckHealth() {
-	s.Run("node not healthy", func() {
-		s.SetupTest()
-		s.node.UpdateStateCode(commonpb.StateCode_Abnormal)
-		ctx := context.Background()
-		resp, err := s.node.CheckHealth(ctx, nil)
-		s.NoError(err)
-		s.False(merr.Ok(resp.GetStatus()))
-		s.ErrorIs(merr.Error(resp.GetStatus()), merr.ErrServiceNotReady)
-	})
-
-	s.Run("exceeded timetick lag on pipeline", func() {
-		s.SetupTest()
-		fgm := pipeline.NewMockFlowgraphManager(s.T())
-		fgm.EXPECT().GetMinTTFlowGraph().Return("timetick-lag-ch", uint64(3600)).Once()
-		s.node.flowgraphManager = fgm
-		ctx := context.Background()
-		resp, err := s.node.CheckHealth(ctx, nil)
-		s.NoError(err)
-		s.True(merr.Ok(resp.GetStatus()))
-		s.False(resp.GetIsHealthy())
-		s.NotEmpty(resp.Reasons)
-	})
-
-	s.Run("ok", func() {
-		s.SetupTest()
-		ctx := context.Background()
-		resp, err := s.node.CheckHealth(ctx, nil)
-		s.NoError(err)
-		s.True(merr.Ok(resp.GetStatus()))
-		s.True(resp.GetIsHealthy())
-		s.Empty(resp.Reasons)
-	})
-}
-
 func (s *DataNodeServicesSuite) TestDropCompactionPlan() {
 	s.Run("node not healthy", func() {
 		s.SetupTest()
diff --git a/internal/distributed/datanode/client/client.go b/internal/distributed/datanode/client/client.go
index 510b218aecac9..aeacb69e7c715 100644
--- a/internal/distributed/datanode/client/client.go
+++ b/internal/distributed/datanode/client/client.go
@@ -281,9 +281,3 @@ func (c *Client) DropCompactionPlan(ctx context.Context, req *datapb.DropCompact
 		return client.DropCompactionPlan(ctx, req)
 	})
 }
-
-func (c *Client) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) {
-	return wrapGrpcCall(ctx, c, func(client datapb.DataNodeClient) (*milvuspb.CheckHealthResponse, error) {
-		return client.CheckHealth(ctx, req)
-	})
-}
diff --git a/internal/distributed/datanode/service.go b/internal/distributed/datanode/service.go
index 2443e5a95ae3c..f26fb1d63ca47 100644
--- a/internal/distributed/datanode/service.go
+++ b/internal/distributed/datanode/service.go
@@ -410,7 +410,3 @@ func (s *Server) QuerySlot(ctx context.Context, req *datapb.QuerySlotRequest) (*
 func (s *Server) DropCompactionPlan(ctx context.Context, req *datapb.DropCompactionPlanRequest) (*commonpb.Status, error) {
 	return s.datanode.DropCompactionPlan(ctx, req)
 }
-
-func (s *Server) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) {
-	return s.datanode.CheckHealth(ctx, req)
-}
diff --git a/internal/distributed/datanode/service_test.go b/internal/distributed/datanode/service_test.go
index 2aede0b83be99..8e0bff6acfcca 100644
--- a/internal/distributed/datanode/service_test.go
+++ b/internal/distributed/datanode/service_test.go
@@ -185,10 +185,6 @@ func (m *MockDataNode) DropCompactionPlan(ctx context.Context, req *datapb.DropC
 	return m.status, m.err
 }
 
-func (m *MockDataNode) CheckHealth(ctx context.Context, request *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) {
-	return &milvuspb.CheckHealthResponse{}, m.err
-}
-
 // /////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 func Test_NewServer(t *testing.T) {
 	paramtable.Init()
diff --git a/internal/distributed/querynode/client/client.go b/internal/distributed/querynode/client/client.go
index 9e9d06c1dc190..a7e933a5eaca2 100644
--- a/internal/distributed/querynode/client/client.go
+++ b/internal/distributed/querynode/client/client.go
@@ -360,9 +360,3 @@ func (c *Client) DeleteBatch(ctx context.Context, req *querypb.DeleteBatchReques
 		return client.DeleteBatch(ctx, req)
 	})
 }
-
-func (c *Client) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) {
-	return wrapGrpcCall(ctx, c, func(client querypb.QueryNodeClient) (*milvuspb.CheckHealthResponse, error) {
-		return client.CheckHealth(ctx, req)
-	})
-}
diff --git a/internal/distributed/querynode/service.go b/internal/distributed/querynode/service.go
index 7871598fd462a..5ae035f0a1bf9 100644
--- a/internal/distributed/querynode/service.go
+++ b/internal/distributed/querynode/service.go
@@ -394,7 +394,3 @@ func (s *Server) Delete(ctx context.Context, req *querypb.DeleteRequest) (*commo
 func (s *Server) DeleteBatch(ctx context.Context, req *querypb.DeleteBatchRequest) (*querypb.DeleteBatchResponse, error) {
 	return s.querynode.DeleteBatch(ctx, req)
 }
-
-func (s *Server) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) {
-	return s.querynode.CheckHealth(ctx, req)
-}
diff --git a/internal/flushcommon/pipeline/flow_graph_manager.go b/internal/flushcommon/pipeline/flow_graph_manager.go
index 994978ff4581b..8527e65ee91c6 100644
--- a/internal/flushcommon/pipeline/flow_graph_manager.go
+++ b/internal/flushcommon/pipeline/flow_graph_manager.go
@@ -22,6 +22,7 @@ import (
 
 	"go.uber.org/zap"
 
+	"github.com/milvus-io/milvus/internal/flushcommon/util"
 	"github.com/milvus-io/milvus/internal/json"
 	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/metrics"
@@ -42,7 +43,6 @@ type FlowgraphManager interface {
 	GetFlowgraphCount() int
 	GetCollectionIDs() []int64
 
-	GetMinTTFlowGraph() (string, typeutil.Timestamp)
 	GetChannelsJSON() string
 	GetSegmentsJSON() string
 	Close()
@@ -76,6 +76,7 @@ func (fm *fgManagerImpl) RemoveFlowgraph(channel string) {
 		fm.flowgraphs.Remove(channel)
 
 		metrics.DataNodeNumFlowGraphs.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Dec()
+		util.GetRateCollector().RemoveFlowGraphChannel(channel)
 	}
 }
 
@@ -119,22 +120,6 @@ func (fm *fgManagerImpl) GetCollectionIDs() []int64 {
 	return collectionSet.Collect()
 }
 
-// GetMinTTFlowGraph returns the vchannel and minimal time tick of flow graphs.
-func (fm *fgManagerImpl) GetMinTTFlowGraph() (string, typeutil.Timestamp) {
-	minTt := typeutil.MaxTimestamp
-	var channel string
-	fm.flowgraphs.Range(func(ch string, ds *DataSyncService) bool {
-		latestTimeTick := ds.timetickSender.GetLatestTimestamp(ch)
-		if minTt > latestTimeTick {
-			minTt = latestTimeTick
-			channel = ch
-		}
-		return true
-	})
-
-	return channel, minTt
-}
-
 // GetChannelsJSON returns all channels in json format.
 func (fm *fgManagerImpl) GetChannelsJSON() string {
 	var channels []*metricsinfo.Channel
diff --git a/internal/flushcommon/pipeline/mock_fgmanager.go b/internal/flushcommon/pipeline/mock_fgmanager.go
index a72faac2e6463..cf8cd6b2aa1ca 100644
--- a/internal/flushcommon/pipeline/mock_fgmanager.go
+++ b/internal/flushcommon/pipeline/mock_fgmanager.go
@@ -309,61 +309,6 @@ func (_c *MockFlowgraphManager_GetFlowgraphService_Call) RunAndReturn(run func(s
 	return _c
 }
 
-// GetMinTTFlowGraph provides a mock function with given fields:
-func (_m *MockFlowgraphManager) GetMinTTFlowGraph() (string, uint64) {
-	ret := _m.Called()
-
-	if len(ret) == 0 {
-		panic("no return value specified for GetMinTTFlowGraph")
-	}
-
-	var r0 string
-	var r1 uint64
-	if rf, ok := ret.Get(0).(func() (string, uint64)); ok {
-		return rf()
-	}
-	if rf, ok := ret.Get(0).(func() string); ok {
-		r0 = rf()
-	} else {
-		r0 = ret.Get(0).(string)
-	}
-
-	if rf, ok := ret.Get(1).(func() uint64); ok {
-		r1 = rf()
-	} else {
-		r1 = ret.Get(1).(uint64)
-	}
-
-	return r0, r1
-}
-
-// MockFlowgraphManager_GetMinTTFlowGraph_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetMinTTFlowGraph'
-type MockFlowgraphManager_GetMinTTFlowGraph_Call struct {
-	*mock.Call
-}
-
-// GetMinTTFlowGraph is a helper method to define mock.On call
-func (_e *MockFlowgraphManager_Expecter) GetMinTTFlowGraph() *MockFlowgraphManager_GetMinTTFlowGraph_Call {
-	return &MockFlowgraphManager_GetMinTTFlowGraph_Call{Call: _e.mock.On("GetMinTTFlowGraph")}
-}
-
-func (_c *MockFlowgraphManager_GetMinTTFlowGraph_Call) Run(run func()) *MockFlowgraphManager_GetMinTTFlowGraph_Call {
-	_c.Call.Run(func(args mock.Arguments) {
-		run()
-	})
-	return _c
-}
-
-func (_c *MockFlowgraphManager_GetMinTTFlowGraph_Call) Return(_a0 string, _a1 uint64) *MockFlowgraphManager_GetMinTTFlowGraph_Call {
-	_c.Call.Return(_a0, _a1)
-	return _c
-}
-
-func (_c *MockFlowgraphManager_GetMinTTFlowGraph_Call) RunAndReturn(run func() (string, uint64)) *MockFlowgraphManager_GetMinTTFlowGraph_Call {
-	_c.Call.Return(run)
-	return _c
-}
-
 // GetSegmentsJSON provides a mock function with given fields:
 func (_m *MockFlowgraphManager) GetSegmentsJSON() string {
 	ret := _m.Called()
diff --git a/internal/flushcommon/util/rate_collector.go b/internal/flushcommon/util/rate_collector.go
index f20d1e562e8c4..4736eb2209ee3 100644
--- a/internal/flushcommon/util/rate_collector.go
+++ b/internal/flushcommon/util/rate_collector.go
@@ -24,6 +24,7 @@ import (
 	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/util/metricsinfo"
 	"github.com/milvus-io/milvus/pkg/util/ratelimitutil"
+	"github.com/milvus-io/milvus/pkg/util/typeutil"
 )
 
 // rateCol is global RateCollector in DataNode.
@@ -37,6 +38,7 @@ type RateCollector struct {
 	*ratelimitutil.RateCollector
 
 	flowGraphTtMu sync.Mutex
+	flowGraphTt   map[string]typeutil.Timestamp
 }
 
 func initGlobalRateCollector() {
@@ -73,5 +75,35 @@ func newRateCollector() (*RateCollector, error) {
 	}
 	return &RateCollector{
 		RateCollector: rc,
+		flowGraphTt:   make(map[string]typeutil.Timestamp),
 	}, nil
 }
+
+// UpdateFlowGraphTt updates RateCollector's flow graph time tick.
+func (r *RateCollector) UpdateFlowGraphTt(channel string, t typeutil.Timestamp) {
+	r.flowGraphTtMu.Lock()
+	defer r.flowGraphTtMu.Unlock()
+	r.flowGraphTt[channel] = t
+}
+
+// RemoveFlowGraphChannel removes channel from flowGraphTt.
+func (r *RateCollector) RemoveFlowGraphChannel(channel string) {
+	r.flowGraphTtMu.Lock()
+	defer r.flowGraphTtMu.Unlock()
+	delete(r.flowGraphTt, channel)
+}
+
+// GetMinFlowGraphTt returns the vchannel and minimal time tick of flow graphs.
+func (r *RateCollector) GetMinFlowGraphTt() (string, typeutil.Timestamp) {
+	r.flowGraphTtMu.Lock()
+	defer r.flowGraphTtMu.Unlock()
+	minTt := typeutil.MaxTimestamp
+	var channel string
+	for c, t := range r.flowGraphTt {
+		if minTt > t {
+			minTt = t
+			channel = c
+		}
+	}
+	return channel, minTt
+}
diff --git a/internal/flushcommon/util/rate_collector_test.go b/internal/flushcommon/util/rate_collector_test.go
new file mode 100644
index 0000000000000..f672b5869e860
--- /dev/null
+++ b/internal/flushcommon/util/rate_collector_test.go
@@ -0,0 +1,42 @@
+// Licensed to the LF AI & Data foundation under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package util
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+
+	"github.com/milvus-io/milvus/pkg/util/typeutil"
+)
+
+func TestRateCollector(t *testing.T) {
+	t.Run("test FlowGraphTt", func(t *testing.T) {
+		collector, err := newRateCollector()
+		assert.NoError(t, err)
+
+		c, minTt := collector.GetMinFlowGraphTt()
+		assert.Equal(t, "", c)
+		assert.Equal(t, typeutil.MaxTimestamp, minTt)
+		collector.UpdateFlowGraphTt("channel1", 100)
+		collector.UpdateFlowGraphTt("channel2", 200)
+		collector.UpdateFlowGraphTt("channel3", 50)
+		c, minTt = collector.GetMinFlowGraphTt()
+		assert.Equal(t, "channel3", c)
+		assert.Equal(t, typeutil.Timestamp(50), minTt)
+	})
+}
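
Reviewer note: the min-timetick query that used to live on the flowgraph manager now lives on the shared RateCollector, so metrics (and the health path) can read it without reaching into the pipeline. A stripped-down sketch of the same mutex-guarded map pattern; the vchannel names below are invented:

```go
package main

import (
	"fmt"
	"math"
	"sync"
)

type Timestamp = uint64

const MaxTimestamp = Timestamp(math.MaxUint64)

// ttTracker mirrors the flowGraphTt map added to RateCollector above.
type ttTracker struct {
	mu sync.Mutex
	tt map[string]Timestamp
}

func (t *ttTracker) Update(channel string, ts Timestamp) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.tt[channel] = ts
}

func (t *ttTracker) Remove(channel string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	delete(t.tt, channel) // called when a flowgraph is removed
}

// Min returns the channel with the smallest (most lagged) time tick.
func (t *ttTracker) Min() (string, Timestamp) {
	t.mu.Lock()
	defer t.mu.Unlock()
	minTt, channel := MaxTimestamp, ""
	for c, ts := range t.tt {
		if ts < minTt {
			minTt, channel = ts, c
		}
	}
	return channel, minTt
}

func main() {
	tr := &ttTracker{tt: make(map[string]Timestamp)}
	tr.Update("by-dev-dml_0_v0", 200) // hypothetical vchannels
	tr.Update("by-dev-dml_1_v0", 50)
	fmt.Println(tr.Min()) // by-dev-dml_1_v0 50
}
```
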
diff --git a/internal/mocks/mock_datanode.go b/internal/mocks/mock_datanode.go
index f77ed0d12e55c..f03f66c8afb07 100644
--- a/internal/mocks/mock_datanode.go
+++ b/internal/mocks/mock_datanode.go
@@ -91,61 +91,6 @@ func (_c *MockDataNode_CheckChannelOperationProgress_Call) RunAndReturn(run func
 	return _c
 }
 
-// CheckHealth provides a mock function with given fields: _a0, _a1
-func (_m *MockDataNode) CheckHealth(_a0 context.Context, _a1 *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) {
-	ret := _m.Called(_a0, _a1)
-
-	var r0 *milvuspb.CheckHealthResponse
-	var r1 error
-	if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error)); ok {
-		return rf(_a0, _a1)
-	}
-	if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest) *milvuspb.CheckHealthResponse); ok {
-		r0 = rf(_a0, _a1)
-	} else {
-		if ret.Get(0) != nil {
-			r0 = ret.Get(0).(*milvuspb.CheckHealthResponse)
-		}
-	}
-
-	if rf, ok := ret.Get(1).(func(context.Context, *milvuspb.CheckHealthRequest) error); ok {
-		r1 = rf(_a0, _a1)
-	} else {
-		r1 = ret.Error(1)
-	}
-
-	return r0, r1
-}
-
-// MockDataNode_CheckHealth_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckHealth'
-type MockDataNode_CheckHealth_Call struct {
-	*mock.Call
-}
-
-// CheckHealth is a helper method to define mock.On call
-//   - _a0 context.Context
-//   - _a1 *milvuspb.CheckHealthRequest
-func (_e *MockDataNode_Expecter) CheckHealth(_a0 interface{}, _a1 interface{}) *MockDataNode_CheckHealth_Call {
-	return &MockDataNode_CheckHealth_Call{Call: _e.mock.On("CheckHealth", _a0, _a1)}
-}
-
-func (_c *MockDataNode_CheckHealth_Call) Run(run func(_a0 context.Context, _a1 *milvuspb.CheckHealthRequest)) *MockDataNode_CheckHealth_Call {
-	_c.Call.Run(func(args mock.Arguments) {
-		run(args[0].(context.Context), args[1].(*milvuspb.CheckHealthRequest))
-	})
-	return _c
-}
-
-func (_c *MockDataNode_CheckHealth_Call) Return(_a0 *milvuspb.CheckHealthResponse, _a1 error) *MockDataNode_CheckHealth_Call {
-	_c.Call.Return(_a0, _a1)
-	return _c
-}
-
-func (_c *MockDataNode_CheckHealth_Call) RunAndReturn(run func(context.Context, *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error)) *MockDataNode_CheckHealth_Call {
-	_c.Call.Return(run)
-	return _c
-}
-
 // CompactionV2 provides a mock function with given fields: _a0, _a1
 func (_m *MockDataNode) CompactionV2(_a0 context.Context, _a1 *datapb.CompactionPlan) (*commonpb.Status, error) {
 	ret := _m.Called(_a0, _a1)
diff --git a/internal/mocks/mock_datanode_client.go b/internal/mocks/mock_datanode_client.go
index e41d56e241cbf..fc493844fba44 100644
--- a/internal/mocks/mock_datanode_client.go
+++ b/internal/mocks/mock_datanode_client.go
@@ -105,76 +105,6 @@ func (_c *MockDataNodeClient_CheckChannelOperationProgress_Call) RunAndReturn(ru
 	return _c
 }
 
-// CheckHealth provides a mock function with given fields: ctx, in, opts
-func (_m *MockDataNodeClient) CheckHealth(ctx context.Context, in *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) {
-	_va := make([]interface{}, len(opts))
-	for _i := range opts {
-		_va[_i] = opts[_i]
-	}
-	var _ca []interface{}
-	_ca = append(_ca, ctx, in)
-	_ca = append(_ca, _va...)
-	ret := _m.Called(_ca...)
-
-	var r0 *milvuspb.CheckHealthResponse
-	var r1 error
-	if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest, ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error)); ok {
-		return rf(ctx, in, opts...)
-	}
-	if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest, ...grpc.CallOption) *milvuspb.CheckHealthResponse); ok {
-		r0 = rf(ctx, in, opts...)
-	} else {
-		if ret.Get(0) != nil {
-			r0 = ret.Get(0).(*milvuspb.CheckHealthResponse)
-		}
-	}
-
-	if rf, ok := ret.Get(1).(func(context.Context, *milvuspb.CheckHealthRequest, ...grpc.CallOption) error); ok {
-		r1 = rf(ctx, in, opts...)
-	} else {
-		r1 = ret.Error(1)
-	}
-
-	return r0, r1
-}
-
-// MockDataNodeClient_CheckHealth_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckHealth'
-type MockDataNodeClient_CheckHealth_Call struct {
-	*mock.Call
-}
-
-// CheckHealth is a helper method to define mock.On call
-//   - ctx context.Context
-//   - in *milvuspb.CheckHealthRequest
-//   - opts ...grpc.CallOption
-func (_e *MockDataNodeClient_Expecter) CheckHealth(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_CheckHealth_Call {
-	return &MockDataNodeClient_CheckHealth_Call{Call: _e.mock.On("CheckHealth",
-		append([]interface{}{ctx, in}, opts...)...)}
-}
-
-func (_c *MockDataNodeClient_CheckHealth_Call) Run(run func(ctx context.Context, in *milvuspb.CheckHealthRequest, opts ...grpc.CallOption)) *MockDataNodeClient_CheckHealth_Call {
-	_c.Call.Run(func(args mock.Arguments) {
-		variadicArgs := make([]grpc.CallOption, len(args)-2)
-		for i, a := range args[2:] {
-			if a != nil {
-				variadicArgs[i] = a.(grpc.CallOption)
-			}
-		}
-		run(args[0].(context.Context), args[1].(*milvuspb.CheckHealthRequest), variadicArgs...)
-	})
-	return _c
-}
-
-func (_c *MockDataNodeClient_CheckHealth_Call) Return(_a0 *milvuspb.CheckHealthResponse, _a1 error) *MockDataNodeClient_CheckHealth_Call {
-	_c.Call.Return(_a0, _a1)
-	return _c
-}
-
-func (_c *MockDataNodeClient_CheckHealth_Call) RunAndReturn(run func(context.Context, *milvuspb.CheckHealthRequest, ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error)) *MockDataNodeClient_CheckHealth_Call {
-	_c.Call.Return(run)
-	return _c
-}
-
 // Close provides a mock function with given fields:
 func (_m *MockDataNodeClient) Close() error {
 	ret := _m.Called()
diff --git a/internal/mocks/mock_querynode.go b/internal/mocks/mock_querynode.go
index d52fe85b644b1..a8c32577fa5d6 100644
--- a/internal/mocks/mock_querynode.go
+++ b/internal/mocks/mock_querynode.go
@@ -30,61 +30,6 @@ func (_m *MockQueryNode) EXPECT() *MockQueryNode_Expecter {
 	return &MockQueryNode_Expecter{mock: &_m.Mock}
 }
 
-// CheckHealth provides a mock function with given fields: _a0, _a1
-func (_m *MockQueryNode) CheckHealth(_a0 context.Context, _a1 *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) {
-	ret := _m.Called(_a0, _a1)
-
-	var r0 *milvuspb.CheckHealthResponse
-	var r1 error
-	if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error)); ok {
-		return rf(_a0, _a1)
-	}
-	if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest) *milvuspb.CheckHealthResponse); ok {
-		r0 = rf(_a0, _a1)
-	} else {
-		if ret.Get(0) != nil {
-			r0 = ret.Get(0).(*milvuspb.CheckHealthResponse)
-		}
-	}
-
-	if rf, ok := ret.Get(1).(func(context.Context, *milvuspb.CheckHealthRequest) error); ok {
-		r1 = rf(_a0, _a1)
-	} else {
-		r1 = ret.Error(1)
-	}
-
-	return r0, r1
-}
-
-// MockQueryNode_CheckHealth_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckHealth'
-type MockQueryNode_CheckHealth_Call struct {
-	*mock.Call
-}
-
-// CheckHealth is a helper method to define mock.On call
-//   - _a0 context.Context
-//   - _a1 *milvuspb.CheckHealthRequest
-func (_e *MockQueryNode_Expecter) CheckHealth(_a0 interface{}, _a1 interface{}) *MockQueryNode_CheckHealth_Call {
-	return &MockQueryNode_CheckHealth_Call{Call: _e.mock.On("CheckHealth", _a0, _a1)}
-}
-
-func (_c *MockQueryNode_CheckHealth_Call) Run(run func(_a0 context.Context, _a1 *milvuspb.CheckHealthRequest)) *MockQueryNode_CheckHealth_Call {
-	_c.Call.Run(func(args mock.Arguments) {
-		run(args[0].(context.Context), args[1].(*milvuspb.CheckHealthRequest))
-	})
-	return _c
-}
-
-func (_c *MockQueryNode_CheckHealth_Call) Return(_a0 *milvuspb.CheckHealthResponse, _a1 error) *MockQueryNode_CheckHealth_Call {
-	_c.Call.Return(_a0, _a1)
-	return _c
-}
-
-func (_c *MockQueryNode_CheckHealth_Call) RunAndReturn(run func(context.Context, *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error)) *MockQueryNode_CheckHealth_Call {
-	_c.Call.Return(run)
-	return _c
-}
-
 // Delete provides a mock function with given fields: _a0, _a1
 func (_m *MockQueryNode) Delete(_a0 context.Context, _a1 *querypb.DeleteRequest) (*commonpb.Status, error) {
 	ret := _m.Called(_a0, _a1)
diff --git a/internal/mocks/mock_querynode_client.go b/internal/mocks/mock_querynode_client.go
index 3bc16e66436f9..3b3d465610bec 100644
--- a/internal/mocks/mock_querynode_client.go
+++ b/internal/mocks/mock_querynode_client.go
@@ -31,76 +31,6 @@ func (_m *MockQueryNodeClient) EXPECT() *MockQueryNodeClient_Expecter {
 	return &MockQueryNodeClient_Expecter{mock: &_m.Mock}
 }
 
-// CheckHealth provides a mock function with given fields: ctx, in, opts
-func (_m *MockQueryNodeClient) CheckHealth(ctx context.Context, in *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) {
-	_va := make([]interface{}, len(opts))
-	for _i := range opts {
-		_va[_i] = opts[_i]
-	}
-	var _ca []interface{}
-	_ca = append(_ca, ctx, in)
-	_ca = append(_ca, _va...)
-	ret := _m.Called(_ca...)
-
-	var r0 *milvuspb.CheckHealthResponse
-	var r1 error
-	if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest, ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error)); ok {
-		return rf(ctx, in, opts...)
-	}
-	if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest, ...grpc.CallOption) *milvuspb.CheckHealthResponse); ok {
-		r0 = rf(ctx, in, opts...)
-	} else {
-		if ret.Get(0) != nil {
-			r0 = ret.Get(0).(*milvuspb.CheckHealthResponse)
-		}
-	}
-
-	if rf, ok := ret.Get(1).(func(context.Context, *milvuspb.CheckHealthRequest, ...grpc.CallOption) error); ok {
-		r1 = rf(ctx, in, opts...)
-	} else {
-		r1 = ret.Error(1)
-	}
-
-	return r0, r1
-}
-
-// MockQueryNodeClient_CheckHealth_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckHealth'
-type MockQueryNodeClient_CheckHealth_Call struct {
-	*mock.Call
-}
-
-// CheckHealth is a helper method to define mock.On call
-//   - ctx context.Context
-//   - in *milvuspb.CheckHealthRequest
-//   - opts ...grpc.CallOption
-func (_e *MockQueryNodeClient_Expecter) CheckHealth(ctx interface{}, in interface{}, opts ...interface{}) *MockQueryNodeClient_CheckHealth_Call {
-	return &MockQueryNodeClient_CheckHealth_Call{Call: _e.mock.On("CheckHealth",
-		append([]interface{}{ctx, in}, opts...)...)}
-}
-
-func (_c *MockQueryNodeClient_CheckHealth_Call) Run(run func(ctx context.Context, in *milvuspb.CheckHealthRequest, opts ...grpc.CallOption)) *MockQueryNodeClient_CheckHealth_Call {
-	_c.Call.Run(func(args mock.Arguments) {
-		variadicArgs := make([]grpc.CallOption, len(args)-2)
-		for i, a := range args[2:] {
-			if a != nil {
-				variadicArgs[i] = a.(grpc.CallOption)
-			}
-		}
-		run(args[0].(context.Context), args[1].(*milvuspb.CheckHealthRequest), variadicArgs...)
-	})
-	return _c
-}
-
-func (_c *MockQueryNodeClient_CheckHealth_Call) Return(_a0 *milvuspb.CheckHealthResponse, _a1 error) *MockQueryNodeClient_CheckHealth_Call {
-	_c.Call.Return(_a0, _a1)
-	return _c
-}
-
-func (_c *MockQueryNodeClient_CheckHealth_Call) RunAndReturn(run func(context.Context, *milvuspb.CheckHealthRequest, ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error)) *MockQueryNodeClient_CheckHealth_Call {
-	_c.Call.Return(run)
-	return _c
-}
-
 // Close provides a mock function with given fields:
 func (_m *MockQueryNodeClient) Close() error {
 	ret := _m.Called()
diff --git a/internal/proto/data_coord.proto b/internal/proto/data_coord.proto
index aaeb455d65d27..be2fd77d4db6b 100644
--- a/internal/proto/data_coord.proto
+++ b/internal/proto/data_coord.proto
@@ -137,8 +137,6 @@ service DataNode {
   rpc QuerySlot(QuerySlotRequest) returns(QuerySlotResponse) {}
 
   rpc DropCompactionPlan(DropCompactionPlanRequest) returns(common.Status) {}
-
-  rpc CheckHealth(milvus.CheckHealthRequest)returns (milvus.CheckHealthResponse) {}
 }
 
 message FlushRequest {
diff --git a/internal/proto/query_coord.proto b/internal/proto/query_coord.proto
index ba28cd47a82aa..f05a07dc7b028 100644
--- a/internal/proto/query_coord.proto
+++ b/internal/proto/query_coord.proto
@@ -175,9 +175,7 @@ service QueryNode {
   // DeleteBatch is the API to apply same delete data into multiple segments.
   // it's basically same as `Delete` but cost less memory pressure.
   rpc DeleteBatch(DeleteBatchRequest) returns (DeleteBatchResponse) {
-  }
-
-  rpc CheckHealth(milvus.CheckHealthRequest)returns (milvus.CheckHealthResponse) {}
+  }
 }
 
 // --------------------QueryCoord grpc request and response proto------------------
diff --git a/internal/querycoordv2/mocks/mock_querynode.go b/internal/querycoordv2/mocks/mock_querynode.go
index 9a2c228fdc444..c161c66309ab4 100644
--- a/internal/querycoordv2/mocks/mock_querynode.go
+++ b/internal/querycoordv2/mocks/mock_querynode.go
@@ -29,61 +29,6 @@ func (_m *MockQueryNodeServer) EXPECT() *MockQueryNodeServer_Expecter {
 	return &MockQueryNodeServer_Expecter{mock: &_m.Mock}
 }
 
-// CheckHealth provides a mock function with given fields: _a0, _a1
-func (_m *MockQueryNodeServer) CheckHealth(_a0 context.Context, _a1 *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) {
-	ret := _m.Called(_a0, _a1)
-
-	var r0 *milvuspb.CheckHealthResponse
-	var r1 error
-	if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error)); ok {
-		return rf(_a0, _a1)
-	}
-	if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest) *milvuspb.CheckHealthResponse); ok {
-		r0 = rf(_a0, _a1)
-	} else {
-		if ret.Get(0) != nil {
-			r0 = ret.Get(0).(*milvuspb.CheckHealthResponse)
-		}
-	}
-
-	if rf, ok := ret.Get(1).(func(context.Context, *milvuspb.CheckHealthRequest) error); ok {
-		r1 = rf(_a0, _a1)
-	} else {
-		r1 = ret.Error(1)
-	}
-
-	return r0, r1
-}
-
-// MockQueryNodeServer_CheckHealth_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckHealth'
-type MockQueryNodeServer_CheckHealth_Call struct {
-	*mock.Call
-}
-
-// CheckHealth is a helper method to define mock.On call
-//   - _a0 context.Context
-//   - _a1 *milvuspb.CheckHealthRequest
-func (_e *MockQueryNodeServer_Expecter) CheckHealth(_a0 interface{}, _a1 interface{}) *MockQueryNodeServer_CheckHealth_Call {
-	return &MockQueryNodeServer_CheckHealth_Call{Call: _e.mock.On("CheckHealth", _a0, _a1)}
-}
-
-func (_c *MockQueryNodeServer_CheckHealth_Call) Run(run func(_a0 context.Context, _a1 *milvuspb.CheckHealthRequest)) *MockQueryNodeServer_CheckHealth_Call {
-	_c.Call.Run(func(args mock.Arguments) {
-		run(args[0].(context.Context), args[1].(*milvuspb.CheckHealthRequest))
-	})
-	return _c
-}
-
-func (_c *MockQueryNodeServer_CheckHealth_Call) Return(_a0 *milvuspb.CheckHealthResponse, _a1 error) *MockQueryNodeServer_CheckHealth_Call {
-	_c.Call.Return(_a0, _a1)
-	return _c
-}
-
-func (_c *MockQueryNodeServer_CheckHealth_Call) RunAndReturn(run func(context.Context, *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error)) *MockQueryNodeServer_CheckHealth_Call {
-	_c.Call.Return(run)
-	return _c
-}
-
 // Delete provides a mock function with given fields: _a0, _a1
 func (_m *MockQueryNodeServer) Delete(_a0 context.Context, _a1 *querypb.DeleteRequest) (*commonpb.Status, error) {
 	ret := _m.Called(_a0, _a1)
diff --git a/internal/querycoordv2/server.go b/internal/querycoordv2/server.go
index e29ca6ee4b5bf..8ae802cf7848f 100644
--- a/internal/querycoordv2/server.go
+++ b/internal/querycoordv2/server.go
@@ -55,7 +55,6 @@ import (
 	"github.com/milvus-io/milvus/internal/querycoordv2/task"
 	"github.com/milvus-io/milvus/internal/types"
 	"github.com/milvus-io/milvus/internal/util/componentutil"
-	"github.com/milvus-io/milvus/internal/util/healthcheck"
 	"github.com/milvus-io/milvus/internal/util/proxyutil"
 	"github.com/milvus-io/milvus/internal/util/sessionutil"
 	"github.com/milvus-io/milvus/internal/util/tsoutil"
@@ -139,8 +138,6 @@ type Server struct {
 	proxyClientManager proxyutil.ProxyClientManagerInterface
 
 	metricsRequest *metricsinfo.MetricsRequest
-
-	healthChecker *healthcheck.Checker
 }
 
 func NewQueryCoord(ctx context.Context) (*Server, error) {
@@ -427,8 +424,6 @@ func (s *Server) initQueryCoord() error {
 	// Init load status cache
 	meta.GlobalFailedLoadCache = meta.NewFailedLoadCache()
 
-	interval := Params.CommonCfg.HealthCheckInterval.GetAsDuration(time.Second)
-	s.healthChecker = healthcheck.NewChecker(interval, s.healthCheckFn)
 	log.Info("init querycoord done", zap.Int64("nodeID", paramtable.GetNodeID()), zap.String("Address", s.address))
 	return err
 }
@@ -572,7 +567,6 @@ func (s *Server) startQueryCoord() error {
 	s.startServerLoop()
 	s.afterStart()
-	s.healthChecker.Start()
 	s.UpdateStateCode(commonpb.StateCode_Healthy)
 	sessionutil.SaveServerInfo(typeutil.QueryCoordRole, s.session.GetServerID())
 	return nil
@@ -611,9 +605,7 @@ func (s *Server) Stop() error {
 	// FOLLOW the dependence graph:
 	// job scheduler -> checker controller -> task scheduler -> dist controller -> cluster -> session
 	// observers -> dist controller
-	if s.healthChecker != nil {
-		s.healthChecker.Close()
-	}
+
 	if s.jobScheduler != nil {
 		log.Info("stop job scheduler...")
 		s.jobScheduler.Stop()
diff --git a/internal/querycoordv2/services.go b/internal/querycoordv2/services.go
index 5e9b6a8334d41..69e3d8d1c56a3 100644
--- a/internal/querycoordv2/services.go
+++ b/internal/querycoordv2/services.go
@@ -20,7 +20,6 @@ import (
 	"context"
 	"fmt"
 	"sync"
-	"time"
 
 	"github.com/cockroachdb/errors"
 	"github.com/samber/lo"
@@ -36,7 +35,7 @@ import (
 	"github.com/milvus-io/milvus/internal/querycoordv2/job"
 	"github.com/milvus-io/milvus/internal/querycoordv2/meta"
 	"github.com/milvus-io/milvus/internal/querycoordv2/utils"
-	"github.com/milvus-io/milvus/internal/util/healthcheck"
+	"github.com/milvus-io/milvus/internal/util/componentutil"
 	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/metrics"
 	"github.com/milvus-io/milvus/pkg/util/merr"
@@ -914,20 +913,16 @@ func (s *Server) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthReque
 		return &milvuspb.CheckHealthResponse{Status: merr.Status(err), IsHealthy: false, Reasons: []string{err.Error()}}, nil
 	}
 
-	latestCheckResult := s.healthChecker.GetLatestCheckResult()
-	return healthcheck.GetCheckHealthResponseFromResult(latestCheckResult), nil
-}
-
-func (s *Server) healthCheckFn() *healthcheck.Result {
-	timeout := Params.CommonCfg.HealthCheckRPCTimeout.GetAsDuration(time.Second)
-	ctx, cancel := context.WithTimeout(s.ctx, timeout)
-	defer cancel()
+	errReasons, err := s.checkNodeHealth(ctx)
+	if err != nil || len(errReasons) != 0 {
+		return componentutil.CheckHealthRespWithErrMsg(errReasons...), nil
+	}
 
-	checkResults := s.broadcastCheckHealth(ctx)
-	for collectionID, failReason := range utils.CheckCollectionsQueryable(ctx, s.meta, s.targetMgr, s.dist, s.nodeMgr) {
-		checkResults.AppendUnhealthyCollectionMsgs(healthcheck.NewUnhealthyCollectionMsg(collectionID, failReason, healthcheck.CollectionQueryable))
+	if err := utils.CheckCollectionsQueryable(ctx, s.meta, s.targetMgr, s.dist, s.nodeMgr); err != nil {
+		log.Ctx(ctx).Warn("some collection is not queryable during health check", zap.Error(err))
 	}
-	return checkResults
+
+	return componentutil.CheckHealthRespWithErr(nil), nil
 }
 
 func (s *Server) checkNodeHealth(ctx context.Context) ([]string, error) {
@@ -958,39 +953,6 @@ func (s *Server) checkNodeHealth(ctx context.Context) ([]string, error) {
 	return errReasons, err
 }
 
-func (s *Server) broadcastCheckHealth(ctx context.Context) *healthcheck.Result {
-	result := healthcheck.NewResult()
-	wg := sync.WaitGroup{}
-	wlock := sync.Mutex{}
-
-	for _, node := range s.nodeMgr.GetAll() {
-		node := node
-		wg.Add(1)
-		go func() {
-			defer wg.Done()
-
-			checkHealthResp, err := s.cluster.CheckHealth(ctx, node.ID())
-			if err = merr.CheckRPCCall(checkHealthResp, err); err != nil && !errors.Is(err, merr.ErrServiceUnimplemented) {
-				err = fmt.Errorf("CheckHealth fails for querynode:%d, %w", node.ID(), err)
-				wlock.Lock()
-				result.AppendUnhealthyClusterMsg(
-					healthcheck.NewUnhealthyClusterMsg(typeutil.QueryNodeRole, node.ID(), err.Error(), healthcheck.NodeHealthCheck))
-				wlock.Unlock()
-				return
-			}
-
-			if checkHealthResp != nil && len(checkHealthResp.Reasons) > 0 {
-				wlock.Lock()
-				result.AppendResult(healthcheck.GetHealthCheckResultFromResp(checkHealthResp))
-				wlock.Unlock()
-			}
-		}()
-	}
-
-	wg.Wait()
-	return result
-}
-
 func (s *Server) CreateResourceGroup(ctx context.Context, req *milvuspb.CreateResourceGroupRequest) (*commonpb.Status, error) {
 	log := log.Ctx(ctx).With(
 		zap.String("rgName", req.GetResourceGroup()),
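
Reviewer note: QueryCoord now leans on its existing `checkNodeHealth` (`GetComponentStates` plus `merr.AnalyzeState`) instead of a dedicated CheckHealth RPC, and an unqueryable collection is only logged rather than failing the check. A minimal sketch of the state-analysis step; `analyzeState` approximates what `merr.AnalyzeState` does, and the states map is invented:

```go
package main

import "fmt"

type StateCode int

const (
	Abnormal StateCode = iota
	Healthy
)

// analyzeState approximates merr.AnalyzeState: healthy iff the node
// reports the Healthy state code (the real helper returns typed errors).
func analyzeState(role string, nodeID int64, state StateCode) error {
	if state != Healthy {
		return fmt.Errorf("%s %d is unhealthy (state=%d)", role, nodeID, state)
	}
	return nil
}

func main() {
	states := map[int64]StateCode{1: Healthy, 2: Abnormal} // pretend cluster
	var reasons []string
	for nodeID, st := range states {
		if err := analyzeState("QueryNode", nodeID, st); err != nil {
			reasons = append(reasons, err.Error())
		}
	}
	fmt.Println(reasons) // fed into componentutil.CheckHealthRespWithErrMsg(reasons...)
}
```
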
diff --git a/internal/querycoordv2/services_test.go b/internal/querycoordv2/services_test.go
index d82e8a8269acc..1381bc2a23b51 100644
--- a/internal/querycoordv2/services_test.go
+++ b/internal/querycoordv2/services_test.go
@@ -47,7 +47,6 @@ import (
 	"github.com/milvus-io/milvus/internal/querycoordv2/session"
 	"github.com/milvus-io/milvus/internal/querycoordv2/task"
 	"github.com/milvus-io/milvus/internal/querycoordv2/utils"
-	"github.com/milvus-io/milvus/internal/util/healthcheck"
 	"github.com/milvus-io/milvus/internal/util/proxyutil"
 	"github.com/milvus-io/milvus/internal/util/sessionutil"
 	"github.com/milvus-io/milvus/pkg/kv"
@@ -171,13 +170,6 @@ func (suite *ServiceSuite) SetupTest() {
 		}))
 		suite.meta.ResourceManager.HandleNodeUp(context.TODO(), node)
 	}
-	suite.cluster = session.NewMockCluster(suite.T())
-	suite.cluster.EXPECT().SyncDistribution(mock.Anything, mock.Anything, mock.Anything).Return(merr.Success(), nil).Maybe()
-	suite.cluster.EXPECT().CheckHealth(mock.Anything, mock.Anything).Return(&milvuspb.CheckHealthResponse{
-		Status:    &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
-		IsHealthy: true,
-		Reasons:   []string{},
-	}, nil).Maybe()
 	suite.jobScheduler = job.NewScheduler()
 	suite.taskScheduler = task.NewMockScheduler(suite.T())
 	suite.taskScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
@@ -1635,9 +1627,6 @@ func (suite *ServiceSuite) TestCheckHealth() {
 	suite.loadAll()
 	ctx := context.Background()
 	server := suite.server
-	server.healthChecker = healthcheck.NewChecker(50*time.Millisecond, suite.server.healthCheckFn)
-	server.healthChecker.Start()
-	defer server.healthChecker.Close()
 
 	assertCheckHealthResult := func(isHealthy bool) {
 		resp, err := server.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
@@ -1650,38 +1639,28 @@ func (suite *ServiceSuite) TestCheckHealth() {
 		}
 	}
 
-	setNodeSate := func(isHealthy bool, isRPCFail bool) {
-		var resp *milvuspb.CheckHealthResponse
-		if isHealthy {
-			resp = healthcheck.OK()
-		} else {
-			resp = healthcheck.GetCheckHealthResponseFromClusterMsg(healthcheck.NewUnhealthyClusterMsg("dn", 1, "check fails", healthcheck.NodeHealthCheck))
-		}
-		resp.Status = &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}
-		if isRPCFail {
-			resp.Status = &commonpb.Status{ErrorCode: commonpb.ErrorCode_ForceDeny}
-		}
-		suite.cluster.EXPECT().CheckHealth(mock.Anything, mock.Anything).Unset()
-		suite.cluster.EXPECT().CheckHealth(mock.Anything, mock.Anything).Return(resp, nil).Maybe()
-		time.Sleep(1 * time.Second)
+	setNodeSate := func(state commonpb.StateCode) {
+		// Test for components state fail
+		suite.cluster.EXPECT().GetComponentStates(mock.Anything, mock.Anything).Unset()
+		suite.cluster.EXPECT().GetComponentStates(mock.Anything, mock.Anything).Return(
+			&milvuspb.ComponentStates{
+				State:  &milvuspb.ComponentInfo{StateCode: state},
+				Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
+			},
+			nil).Maybe()
 	}
 
 	// Test for server is not healthy
 	server.UpdateStateCode(commonpb.StateCode_Initializing)
 	assertCheckHealthResult(false)
 
-	// Test for check health has some error reasons
-	setNodeSate(false, false)
-	server.UpdateStateCode(commonpb.StateCode_Healthy)
-	assertCheckHealthResult(false)
-
-	// Test for check health rpc fail
-	setNodeSate(true, true)
+	// Test for components state fail
+	setNodeSate(commonpb.StateCode_Abnormal)
 	server.UpdateStateCode(commonpb.StateCode_Healthy)
 	assertCheckHealthResult(false)
 
 	// Test for check load percentage fail
-	setNodeSate(true, false)
+	setNodeSate(commonpb.StateCode_Healthy)
 	assertCheckHealthResult(true)
 
 	// Test for check channel ok
@@ -1703,14 +1682,7 @@ func (suite *ServiceSuite) TestCheckHealth() {
 	for _, node := range suite.nodes {
 		suite.nodeMgr.Stopping(node)
 	}
-
-	paramtable.Get().Save(paramtable.Get().QueryCoordCfg.UpdateCollectionLoadStatusInterval.Key, "1")
-	defer paramtable.Get().Reset(paramtable.Get().QueryCoordCfg.UpdateCollectionLoadStatusInterval.Key)
-	time.Sleep(1500 * time.Millisecond)
-	resp, err := server.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
-	suite.NoError(err)
-	suite.Equal(resp.IsHealthy, true)
-	suite.NotEmpty(resp.Reasons)
+	assertCheckHealthResult(true)
 }
 
 func (suite *ServiceSuite) TestGetShardLeaders() {
diff --git a/internal/querycoordv2/session/cluster.go b/internal/querycoordv2/session/cluster.go
index 569dbb0029469..7b6bc316ebe25 100644
--- a/internal/querycoordv2/session/cluster.go
+++ b/internal/querycoordv2/session/cluster.go
@@ -52,7 +52,6 @@ type Cluster interface {
-52,7 +52,6 @@ type Cluster interface { GetMetrics(ctx context.Context, nodeID int64, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) SyncDistribution(ctx context.Context, nodeID int64, req *querypb.SyncDistributionRequest) (*commonpb.Status, error) GetComponentStates(ctx context.Context, nodeID int64) (*milvuspb.ComponentStates, error) - CheckHealth(ctx context.Context, nodeID int64) (*milvuspb.CheckHealthResponse, error) Start() Stop() } @@ -273,20 +272,6 @@ func (c *QueryCluster) send(ctx context.Context, nodeID int64, fn func(cli types return nil } -func (c *QueryCluster) CheckHealth(ctx context.Context, nodeID int64) (*milvuspb.CheckHealthResponse, error) { - var ( - resp *milvuspb.CheckHealthResponse - err error - ) - err1 := c.send(ctx, nodeID, func(cli types.QueryNodeClient) { - resp, err = cli.CheckHealth(ctx, &milvuspb.CheckHealthRequest{}) - }) - if err1 != nil { - return nil, err1 - } - return resp, err -} - type clients struct { sync.RWMutex clients map[int64]types.QueryNodeClient // nodeID -> client diff --git a/internal/querycoordv2/session/mock_cluster.go b/internal/querycoordv2/session/mock_cluster.go index a61954d7f8644..bd7d3b3eae622 100644 --- a/internal/querycoordv2/session/mock_cluster.go +++ b/internal/querycoordv2/session/mock_cluster.go @@ -27,61 +27,6 @@ func (_m *MockCluster) EXPECT() *MockCluster_Expecter { return &MockCluster_Expecter{mock: &_m.Mock} } -// CheckHealth provides a mock function with given fields: ctx, nodeID -func (_m *MockCluster) CheckHealth(ctx context.Context, nodeID int64) (*milvuspb.CheckHealthResponse, error) { - ret := _m.Called(ctx, nodeID) - - var r0 *milvuspb.CheckHealthResponse - var r1 error - if rf, ok := ret.Get(0).(func(context.Context, int64) (*milvuspb.CheckHealthResponse, error)); ok { - return rf(ctx, nodeID) - } - if rf, ok := ret.Get(0).(func(context.Context, int64) *milvuspb.CheckHealthResponse); ok { - r0 = rf(ctx, nodeID) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).(*milvuspb.CheckHealthResponse) - } - } - - if rf, ok := ret.Get(1).(func(context.Context, int64) error); ok { - r1 = rf(ctx, nodeID) - } else { - r1 = ret.Error(1) - } - - return r0, r1 -} - -// MockCluster_CheckHealth_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckHealth' -type MockCluster_CheckHealth_Call struct { - *mock.Call -} - -// CheckHealth is a helper method to define mock.On call -// - ctx context.Context -// - nodeID int64 -func (_e *MockCluster_Expecter) CheckHealth(ctx interface{}, nodeID interface{}) *MockCluster_CheckHealth_Call { - return &MockCluster_CheckHealth_Call{Call: _e.mock.On("CheckHealth", ctx, nodeID)} -} - -func (_c *MockCluster_CheckHealth_Call) Run(run func(ctx context.Context, nodeID int64)) *MockCluster_CheckHealth_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(int64)) - }) - return _c -} - -func (_c *MockCluster_CheckHealth_Call) Return(_a0 *milvuspb.CheckHealthResponse, _a1 error) *MockCluster_CheckHealth_Call { - _c.Call.Return(_a0, _a1) - return _c -} - -func (_c *MockCluster_CheckHealth_Call) RunAndReturn(run func(context.Context, int64) (*milvuspb.CheckHealthResponse, error)) *MockCluster_CheckHealth_Call { - _c.Call.Return(run) - return _c -} - // GetComponentStates provides a mock function with given fields: ctx, nodeID func (_m *MockCluster) GetComponentStates(ctx context.Context, nodeID int64) (*milvuspb.ComponentStates, error) { ret := _m.Called(ctx, nodeID) diff --git 
a/internal/querycoordv2/utils/util.go b/internal/querycoordv2/utils/util.go index 6492db0b3507c..5e283b926ee60 100644 --- a/internal/querycoordv2/utils/util.go +++ b/internal/querycoordv2/utils/util.go @@ -73,13 +73,13 @@ func CheckDelegatorDataReady(nodeMgr *session.NodeManager, targetMgr meta.Target for segmentID, info := range segmentDist { _, exist := leader.Segments[segmentID] if !exist { - log.RatedWarn(10, "leader is not available due to lack of segment", zap.Int64("segmentID", segmentID)) + log.RatedInfo(10, "leader is not available due to lack of segment", zap.Int64("segmentID", segmentID)) return merr.WrapErrSegmentLack(segmentID) } l0WithWrongLocation := info.GetLevel() == datapb.SegmentLevel_L0 && leader.Segments[segmentID].GetNodeID() != leader.ID if l0WithWrongLocation { - log.RatedWarn(10, "leader is not available due to lack of L0 segment", zap.Int64("segmentID", segmentID)) + log.RatedInfo(10, "leader is not available due to lack of L0 segment", zap.Int64("segmentID", segmentID)) return merr.WrapErrSegmentLack(segmentID) } } @@ -113,6 +113,8 @@ func GetShardLeadersWithChannels(ctx context.Context, m *meta.Meta, targetMgr me ) ([]*querypb.ShardLeadersList, error) { ret := make([]*querypb.ShardLeadersList, 0) for _, channel := range channels { + log := log.With(zap.String("channel", channel.GetChannelName())) + var channelErr error leaders := dist.LeaderViewManager.GetByFilter(meta.WithChannelName2LeaderView(channel.GetChannelName())) if len(leaders) == 0 { @@ -130,7 +132,7 @@ func GetShardLeadersWithChannels(ctx context.Context, m *meta.Meta, targetMgr me if len(readableLeaders) == 0 { msg := fmt.Sprintf("channel %s is not available in any replica", channel.GetChannelName()) - log.RatedWarn(60, msg, zap.Error(channelErr)) + log.Warn(msg, zap.Error(channelErr)) err := merr.WrapErrChannelNotAvailable(channel.GetChannelName(), channelErr.Error()) return nil, err } @@ -183,9 +185,8 @@ func GetShardLeaders(ctx context.Context, m *meta.Meta, targetMgr meta.TargetMan } // CheckCollectionsQueryable check all channels are watched and all segments are loaded for this collection -func CheckCollectionsQueryable(ctx context.Context, m *meta.Meta, targetMgr meta.TargetManagerInterface, dist *meta.DistributionManager, nodeMgr *session.NodeManager) map[int64]string { - maxInterval := paramtable.Get().QueryCoordCfg.UpdateCollectionLoadStatusInterval.GetAsDuration(time.Second) - checkResult := make(map[int64]string) +func CheckCollectionsQueryable(ctx context.Context, m *meta.Meta, targetMgr meta.TargetManagerInterface, dist *meta.DistributionManager, nodeMgr *session.NodeManager) error { + maxInterval := paramtable.Get().QueryCoordCfg.UpdateCollectionLoadStatusInterval.GetAsDuration(time.Minute) for _, coll := range m.GetAllCollections(ctx) { err := checkCollectionQueryable(ctx, m, targetMgr, dist, nodeMgr, coll) // the collection is not queryable, if meet following conditions: @@ -193,10 +194,15 @@ func CheckCollectionsQueryable(ctx context.Context, m *meta.Meta, targetMgr meta // 2. Collection is not starting to release // 3. The load percentage has not been updated in the last 5 minutes. 
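Note: the staleness gate applied just below only reports a collection as unqueryable once the failing check has outlived the configured window, so a collection whose load status was refreshed recently is given time to recover. A minimal standalone sketch of that gating logic (the collectionInfo type and notQueryableFor helper here are hypothetical illustrations, not Milvus APIs):

package main

import (
	"errors"
	"fmt"
	"time"
)

type collectionInfo struct {
	CollectionID int64
	UpdatedAt    time.Time // last refresh of the load status
}

// notQueryableFor reports an error only when the check failed AND the load
// status has been stale for at least maxInterval.
func notQueryableFor(coll collectionInfo, checkErr error, maxInterval time.Duration) error {
	if checkErr != nil && time.Since(coll.UpdatedAt) >= maxInterval {
		return fmt.Errorf("collection %d not queryable: %w", coll.CollectionID, checkErr)
	}
	return nil
}

func main() {
	stale := collectionInfo{CollectionID: 1, UpdatedAt: time.Now().Add(-10 * time.Minute)}
	fmt.Println(notQueryableFor(stale, errors.New("segment lacks"), 5*time.Minute)) // reported
	fresh := collectionInfo{CollectionID: 2, UpdatedAt: time.Now()}
	fmt.Println(notQueryableFor(fresh, errors.New("segment lacks"), 5*time.Minute)) // <nil>, still inside the window
}
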
if err != nil && m.Exist(ctx, coll.CollectionID) && time.Since(coll.UpdatedAt) >= maxInterval { - checkResult[coll.CollectionID] = err.Error() + log.Ctx(ctx).Warn("collection not queryable", + zap.Int64("collectionID", coll.CollectionID), + zap.Time("lastUpdated", coll.UpdatedAt), + zap.Duration("maxInterval", maxInterval), + zap.Error(err)) + return err } } - return checkResult + return nil } // checkCollectionQueryable check all channels are watched and all segments are loaded for this collection diff --git a/internal/querynodev2/metrics_info.go b/internal/querynodev2/metrics_info.go index 968fafe59fcd8..3cedfedb1df0b 100644 --- a/internal/querynodev2/metrics_info.go +++ b/internal/querynodev2/metrics_info.go @@ -54,7 +54,13 @@ func getRateMetric() ([]metricsinfo.RateMetric, error) { return rms, nil } -func getMinTSafe(node *QueryNode) (string, uint64) { +// getQuotaMetrics returns QueryNodeQuotaMetrics. +func getQuotaMetrics(node *QueryNode) (*metricsinfo.QueryNodeQuotaMetrics, error) { + rms, err := getRateMetric() + if err != nil { + return nil, err + } + minTsafeChannel := "" minTsafe := uint64(math.MaxUint64) node.delegators.Range(func(channel string, delegator delegator.ShardDelegator) bool { @@ -65,17 +71,7 @@ func getMinTSafe(node *QueryNode) (string, uint64) { } return true }) - return minTsafeChannel, minTsafe -} - -// getQuotaMetrics returns QueryNodeQuotaMetrics. -func getQuotaMetrics(node *QueryNode) (*metricsinfo.QueryNodeQuotaMetrics, error) { - rms, err := getRateMetric() - if err != nil { - return nil, err - } - minTsafeChannel, minTsafe := getMinTSafe(node) collections := node.manager.Collection.ListWithName() nodeID := fmt.Sprint(node.GetNodeID()) diff --git a/internal/querynodev2/services.go b/internal/querynodev2/services.go index af46fd1aada91..1eca7efe950cd 100644 --- a/internal/querynodev2/services.go +++ b/internal/querynodev2/services.go @@ -42,7 +42,6 @@ import ( "github.com/milvus-io/milvus/internal/querynodev2/segments" "github.com/milvus-io/milvus/internal/querynodev2/tasks" "github.com/milvus-io/milvus/internal/storage" - "github.com/milvus-io/milvus/internal/util/healthcheck" "github.com/milvus-io/milvus/internal/util/searchutil/scheduler" "github.com/milvus-io/milvus/internal/util/streamrpc" "github.com/milvus-io/milvus/pkg/common" @@ -55,7 +54,6 @@ import ( "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/paramtable" - "github.com/milvus-io/milvus/pkg/util/ratelimitutil" "github.com/milvus-io/milvus/pkg/util/timerecord" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -1386,25 +1384,6 @@ func (node *QueryNode) Delete(ctx context.Context, req *querypb.DeleteRequest) ( return merr.Success(), nil } -func (node *QueryNode) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) { - if err := node.lifetime.Add(merr.IsHealthy); err != nil { - return &milvuspb.CheckHealthResponse{ - Status: merr.Status(err), - Reasons: []string{err.Error()}, - }, nil - } - defer node.lifetime.Done() - - maxDelay := paramtable.Get().QuotaConfig.MaxTimeTickDelay.GetAsDuration(time.Second) - minTsafeChannel, minTsafe := getMinTSafe(node) - if err := ratelimitutil.CheckTimeTickDelay(minTsafeChannel, minTsafe, maxDelay); err != nil { - msg := healthcheck.NewUnhealthyClusterMsg(typeutil.QueryNodeRole, node.GetNodeID(), err.Error(), healthcheck.TimeTickLagExceed) - return healthcheck.GetCheckHealthResponseFromClusterMsg(msg), nil - } - - return
healthcheck.OK(), nil -} - // DeleteBatch is the API to apply same delete data into multiple segments. // it's basically same as `Delete` but cost less memory pressure. func (node *QueryNode) DeleteBatch(ctx context.Context, req *querypb.DeleteBatchRequest) (*querypb.DeleteBatchResponse, error) { diff --git a/internal/querynodev2/services_test.go b/internal/querynodev2/services_test.go index f3edb64965629..99edfa234b312 100644 --- a/internal/querynodev2/services_test.go +++ b/internal/querynodev2/services_test.go @@ -98,7 +98,7 @@ func (suite *ServiceSuite) SetupSuite() { paramtable.Init() paramtable.Get().Save(paramtable.Get().CommonCfg.GCEnabled.Key, "false") - suite.rootPath = path.Join("/tmp/milvus/test", suite.T().Name()) + suite.rootPath = suite.T().Name() suite.collectionID = 111 suite.collectionName = "test-collection" suite.partitionIDs = []int64{222} @@ -2222,44 +2222,6 @@ func (suite *ServiceSuite) TestLoadPartition() { suite.Equal(commonpb.ErrorCode_Success, status.ErrorCode) } -func (suite *ServiceSuite) TestCheckHealth() { - suite.Run("node not healthy", func() { - suite.node.UpdateStateCode(commonpb.StateCode_Abnormal) - - ctx := context.Background() - resp, err := suite.node.CheckHealth(ctx, nil) - suite.NoError(err) - suite.False(merr.Ok(resp.GetStatus())) - suite.ErrorIs(merr.Error(resp.GetStatus()), merr.ErrServiceNotReady) - }) - - suite.Run("exceeded timetick lag on pipeline", func() { - sd1 := delegator.NewMockShardDelegator(suite.T()) - sd1.EXPECT().GetTSafe().Return(100) - sd1.EXPECT().Close().Maybe() - suite.node.delegators.Insert("timetick-lag-ch", sd1) - defer suite.node.delegators.GetAndRemove("timetick-lag-ch") - - ctx := context.Background() - suite.node.UpdateStateCode(commonpb.StateCode_Healthy) - resp, err := suite.node.CheckHealth(ctx, nil) - suite.NoError(err) - suite.True(merr.Ok(resp.GetStatus())) - suite.False(resp.GetIsHealthy()) - suite.NotEmpty(resp.Reasons) - }) - - suite.Run("ok", func() { - ctx := context.Background() - suite.node.UpdateStateCode(commonpb.StateCode_Healthy) - resp, err := suite.node.CheckHealth(ctx, nil) - suite.NoError(err) - suite.True(merr.Ok(resp.GetStatus())) - suite.True(resp.GetIsHealthy()) - suite.Empty(resp.Reasons) - }) -} - func TestQueryNodeService(t *testing.T) { suite.Run(t, new(ServiceSuite)) } diff --git a/internal/rootcoord/mock_test.go b/internal/rootcoord/mock_test.go index 19c28253c9da8..5f59f27c1246a 100644 --- a/internal/rootcoord/mock_test.go +++ b/internal/rootcoord/mock_test.go @@ -405,7 +405,6 @@ func newMockProxy() *mockProxy { func newTestCore(opts ...Opt) *Core { c := &Core{ - ctx: context.TODO(), metricsRequest: metricsinfo.NewMetricsRequest(), session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: TestRootCoordID}}, } diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index d4ac677b6e038..8f13b6496ef9e 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -32,6 +32,7 @@ import ( clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/atomic" "go.uber.org/zap" + "golang.org/x/sync/errgroup" "google.golang.org/protobuf/proto" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" @@ -50,7 +51,6 @@ import ( tso2 "github.com/milvus-io/milvus/internal/tso" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/dependency" - "github.com/milvus-io/milvus/internal/util/healthcheck" "github.com/milvus-io/milvus/internal/util/proxyutil" "github.com/milvus-io/milvus/internal/util/sessionutil" 
"github.com/milvus-io/milvus/internal/util/streamingutil" @@ -131,7 +131,6 @@ type Core struct { activateFunc func() error metricsRequest *metricsinfo.MetricsRequest - healthChecker *healthcheck.Checker } // --------------------- function -------------------------- @@ -502,8 +501,6 @@ func (c *Core) initInternal() error { return err } - interval := Params.CommonCfg.HealthCheckInterval.GetAsDuration(time.Second) - c.healthChecker = healthcheck.NewChecker(interval, c.healthCheckFn) log.Info("init rootcoord done", zap.Int64("nodeID", paramtable.GetNodeID()), zap.String("Address", c.address)) return nil } @@ -798,7 +795,6 @@ func (c *Core) startInternal() error { }() c.startServerLoop() - c.healthChecker.Start() c.UpdateStateCode(commonpb.StateCode_Healthy) sessionutil.SaveServerInfo(typeutil.RootCoordRole, c.session.ServerID) log.Info("rootcoord startup successfully") @@ -860,10 +856,6 @@ func (c *Core) revokeSession() { // Stop stops rootCoord. func (c *Core) Stop() error { c.UpdateStateCode(commonpb.StateCode_Abnormal) - if c.healthChecker != nil { - c.healthChecker.Close() - } - c.stopExecutor() c.stopScheduler() if c.proxyWatcher != nil { @@ -3130,40 +3122,53 @@ func (c *Core) CheckHealth(ctx context.Context, in *milvuspb.CheckHealthRequest) }, nil } - latestCheckResult := c.healthChecker.GetLatestCheckResult() - return healthcheck.GetCheckHealthResponseFromResult(latestCheckResult), nil -} - -func (c *Core) healthCheckFn() *healthcheck.Result { - timeout := Params.CommonCfg.HealthCheckRPCTimeout.GetAsDuration(time.Second) - ctx, cancel := context.WithTimeout(c.ctx, timeout) - defer cancel() + group, ctx := errgroup.WithContext(ctx) + errs := typeutil.NewConcurrentSet[error]() proxyClients := c.proxyClientManager.GetProxyClients() - wg := sync.WaitGroup{} - lock := sync.Mutex{} - result := healthcheck.NewResult() - proxyClients.Range(func(key int64, value types.ProxyClient) bool { nodeID := key proxyClient := value - wg.Add(1) - go func() { - defer wg.Done() - resp, err := proxyClient.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{}) - err = merr.AnalyzeComponentStateResp(typeutil.ProxyRole, nodeID, resp, err) + group.Go(func() error { + sta, err := proxyClient.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{}) + if err != nil { + errs.Insert(err) + return err + } - lock.Lock() - defer lock.Unlock() + err = merr.AnalyzeState("Proxy", nodeID, sta) if err != nil { - result.AppendUnhealthyClusterMsg(healthcheck.NewUnhealthyClusterMsg(typeutil.ProxyRole, nodeID, err.Error(), healthcheck.NodeHealthCheck)) + errs.Insert(err) } - }() + + return err + }) return true }) - wg.Wait() - return result + maxDelay := Params.QuotaConfig.MaxTimeTickDelay.GetAsDuration(time.Second) + if maxDelay > 0 { + group.Go(func() error { + err := CheckTimeTickLagExceeded(ctx, c.queryCoord, c.dataCoord, maxDelay) + if err != nil { + errs.Insert(err) + } + return err + }) + } + + err := group.Wait() + if err != nil { + return &milvuspb.CheckHealthResponse{ + Status: merr.Success(), + IsHealthy: false, + Reasons: lo.Map(errs.Collect(), func(e error, i int) string { + return err.Error() + }), + }, nil + } + + return &milvuspb.CheckHealthResponse{Status: merr.Success(), IsHealthy: true, Reasons: []string{}}, nil } func (c *Core) CreatePrivilegeGroup(ctx context.Context, in *milvuspb.CreatePrivilegeGroupRequest) (*commonpb.Status, error) { diff --git a/internal/rootcoord/root_coord_test.go b/internal/rootcoord/root_coord_test.go index 1f75028dbc126..e6b8c380f4e9b 100644 --- 
a/internal/rootcoord/root_coord_test.go +++ b/internal/rootcoord/root_coord_test.go @@ -32,6 +32,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus/internal/metastore/model" + "github.com/milvus-io/milvus/internal/mocks" "github.com/milvus-io/milvus/internal/proto/etcdpb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/proxypb" @@ -39,7 +40,6 @@ import ( mockrootcoord "github.com/milvus-io/milvus/internal/rootcoord/mocks" "github.com/milvus-io/milvus/internal/util/dependency" kvfactory "github.com/milvus-io/milvus/internal/util/dependency/kv" - "github.com/milvus-io/milvus/internal/util/healthcheck" "github.com/milvus-io/milvus/internal/util/proxyutil" "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/pkg/util" @@ -1479,6 +1479,65 @@ func TestRootCoord_AlterCollection(t *testing.T) { } func TestRootCoord_CheckHealth(t *testing.T) { + getQueryCoordMetricsFunc := func(tt typeutil.Timestamp) (*milvuspb.GetMetricsResponse, error) { + clusterTopology := metricsinfo.QueryClusterTopology{ + ConnectedNodes: []metricsinfo.QueryNodeInfos{ + { + QuotaMetrics: &metricsinfo.QueryNodeQuotaMetrics{ + Fgm: metricsinfo.FlowGraphMetric{ + MinFlowGraphChannel: "ch1", + MinFlowGraphTt: tt, + NumFlowGraph: 1, + }, + }, + }, + }, + } + + resp, _ := metricsinfo.MarshalTopology(metricsinfo.QueryCoordTopology{Cluster: clusterTopology}) + return &milvuspb.GetMetricsResponse{ + Status: merr.Success(), + Response: resp, + ComponentName: metricsinfo.ConstructComponentName(typeutil.QueryCoordRole, 0), + }, nil + } + + getDataCoordMetricsFunc := func(tt typeutil.Timestamp) (*milvuspb.GetMetricsResponse, error) { + clusterTopology := metricsinfo.DataClusterTopology{ + ConnectedDataNodes: []metricsinfo.DataNodeInfos{ + { + QuotaMetrics: &metricsinfo.DataNodeQuotaMetrics{ + Fgm: metricsinfo.FlowGraphMetric{ + MinFlowGraphChannel: "ch1", + MinFlowGraphTt: tt, + NumFlowGraph: 1, + }, + }, + }, + }, + } + + resp, _ := metricsinfo.MarshalTopology(metricsinfo.DataCoordTopology{Cluster: clusterTopology}) + return &milvuspb.GetMetricsResponse{ + Status: merr.Success(), + Response: resp, + ComponentName: metricsinfo.ConstructComponentName(typeutil.DataCoordRole, 0), + }, nil + } + + querynodeTT := tsoutil.ComposeTSByTime(time.Now().Add(-1*time.Minute), 0) + datanodeTT := tsoutil.ComposeTSByTime(time.Now().Add(-2*time.Minute), 0) + + dcClient := mocks.NewMockDataCoordClient(t) + dcClient.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(getDataCoordMetricsFunc(datanodeTT)) + qcClient := mocks.NewMockQueryCoordClient(t) + qcClient.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(getQueryCoordMetricsFunc(querynodeTT)) + + errDataCoordClient := mocks.NewMockDataCoordClient(t) + errDataCoordClient.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(nil, errors.New("error")) + errQueryCoordClient := mocks.NewMockQueryCoordClient(t) + errQueryCoordClient.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(nil, errors.New("error")) + t.Run("not healthy", func(t *testing.T) { ctx := context.Background() c := newTestCore(withAbnormalCode()) @@ -1488,13 +1547,25 @@ func TestRootCoord_CheckHealth(t *testing.T) { assert.NotEmpty(t, resp.Reasons) }) + t.Run("ok with disabled tt lag configuration", func(t *testing.T) { + v := Params.QuotaConfig.MaxTimeTickDelay.GetValue() + Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, "-1") + 
defer Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, v) + + c := newTestCore(withHealthyCode(), withValidProxyManager()) + ctx := context.Background() + resp, err := c.CheckHealth(ctx, &milvuspb.CheckHealthRequest{}) + assert.NoError(t, err) + assert.Equal(t, true, resp.IsHealthy) + assert.Empty(t, resp.Reasons) + }) + t.Run("proxy health check fail with invalid proxy", func(t *testing.T) { - c := newTestCore(withHealthyCode(), withInvalidProxyManager()) - c.healthChecker = healthcheck.NewChecker(40*time.Millisecond, c.healthCheckFn) - c.healthChecker.Start() - defer c.healthChecker.Close() + v := Params.QuotaConfig.MaxTimeTickDelay.GetValue() + Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, "6000") + defer Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, v) - time.Sleep(50 * time.Millisecond) + c := newTestCore(withHealthyCode(), withInvalidProxyManager(), withDataCoord(dcClient), withQueryCoord(qcClient)) ctx := context.Background() resp, err := c.CheckHealth(ctx, &milvuspb.CheckHealthRequest{}) @@ -1503,14 +1574,55 @@ func TestRootCoord_CheckHealth(t *testing.T) { assert.NotEmpty(t, resp.Reasons) }) - t.Run("ok", func(t *testing.T) { - c := newTestCore(withHealthyCode(), withValidProxyManager()) - c.healthChecker = healthcheck.NewChecker(40*time.Millisecond, c.healthCheckFn) - c.healthChecker.Start() - defer c.healthChecker.Close() + t.Run("proxy health check fail with get metrics error", func(t *testing.T) { + v := Params.QuotaConfig.MaxTimeTickDelay.GetValue() + Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, "6000") + defer Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, v) - time.Sleep(50 * time.Millisecond) + { + c := newTestCore(withHealthyCode(), + withValidProxyManager(), withDataCoord(dcClient), withQueryCoord(errQueryCoordClient)) + ctx := context.Background() + resp, err := c.CheckHealth(ctx, &milvuspb.CheckHealthRequest{}) + assert.NoError(t, err) + assert.Equal(t, false, resp.IsHealthy) + assert.NotEmpty(t, resp.Reasons) + } + + { + c := newTestCore(withHealthyCode(), + withValidProxyManager(), withDataCoord(errDataCoordClient), withQueryCoord(qcClient)) + + ctx := context.Background() + resp, err := c.CheckHealth(ctx, &milvuspb.CheckHealthRequest{}) + assert.NoError(t, err) + assert.Equal(t, false, resp.IsHealthy) + assert.NotEmpty(t, resp.Reasons) + } + }) + + t.Run("fail with tt lag exceeded", func(t *testing.T) { + v := Params.QuotaConfig.MaxTimeTickDelay.GetValue() + Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, "90") + defer Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, v) + + c := newTestCore(withHealthyCode(), + withValidProxyManager(), withDataCoord(dcClient), withQueryCoord(qcClient)) + ctx := context.Background() + resp, err := c.CheckHealth(ctx, &milvuspb.CheckHealthRequest{}) + assert.NoError(t, err) + assert.Equal(t, false, resp.IsHealthy) + assert.NotEmpty(t, resp.Reasons) + }) + + t.Run("ok with tt lag checking", func(t *testing.T) { + v := Params.QuotaConfig.MaxTimeTickDelay.GetValue() + Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, "600") + defer Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, v) + + c := newTestCore(withHealthyCode(), + withValidProxyManager(), withDataCoord(dcClient), withQueryCoord(qcClient)) ctx := context.Background() resp, err := c.CheckHealth(ctx, &milvuspb.CheckHealthRequest{}) assert.NoError(t, err) diff --git a/internal/rootcoord/util.go b/internal/rootcoord/util.go index ef873b59785ba..8ee2b25463166 100644 --- a/internal/rootcoord/util.go +++ b/internal/rootcoord/util.go @@ -20,8
+20,10 @@ import ( "context" "fmt" "strconv" + "time" "go.uber.org/zap" + "golang.org/x/sync/errgroup" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus/internal/json" @@ -32,6 +34,7 @@ import ( "github.com/milvus-io/milvus/pkg/mq/msgstream" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/metricsinfo" + "github.com/milvus-io/milvus/pkg/util/tsoutil" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -281,3 +284,97 @@ func getProxyMetrics(ctx context.Context, proxies proxyutil.ProxyClientManagerIn return ret, nil } + +func CheckTimeTickLagExceeded(ctx context.Context, queryCoord types.QueryCoordClient, dataCoord types.DataCoordClient, maxDelay time.Duration) error { + ctx, cancel := context.WithTimeout(ctx, GetMetricsTimeout) + defer cancel() + + now := time.Now() + group := &errgroup.Group{} + queryNodeTTDelay := typeutil.NewConcurrentMap[string, time.Duration]() + dataNodeTTDelay := typeutil.NewConcurrentMap[string, time.Duration]() + + group.Go(func() error { + queryCoordTopology, err := getQueryCoordMetrics(ctx, queryCoord) + if err != nil { + return err + } + + for _, queryNodeMetric := range queryCoordTopology.Cluster.ConnectedNodes { + qm := queryNodeMetric.QuotaMetrics + if qm != nil { + if qm.Fgm.NumFlowGraph > 0 && qm.Fgm.MinFlowGraphChannel != "" { + minTt, _ := tsoutil.ParseTS(qm.Fgm.MinFlowGraphTt) + delay := now.Sub(minTt) + + if delay.Milliseconds() >= maxDelay.Milliseconds() { + queryNodeTTDelay.Insert(qm.Fgm.MinFlowGraphChannel, delay) + } + } + } + } + return nil + }) + + // get Data cluster metrics + group.Go(func() error { + dataCoordTopology, err := getDataCoordMetrics(ctx, dataCoord) + if err != nil { + return err + } + + for _, dataNodeMetric := range dataCoordTopology.Cluster.ConnectedDataNodes { + dm := dataNodeMetric.QuotaMetrics + if dm != nil { + if dm.Fgm.NumFlowGraph > 0 && dm.Fgm.MinFlowGraphChannel != "" { + minTt, _ := tsoutil.ParseTS(dm.Fgm.MinFlowGraphTt) + delay := now.Sub(minTt) + + if delay.Milliseconds() >= maxDelay.Milliseconds() { + dataNodeTTDelay.Insert(dm.Fgm.MinFlowGraphChannel, delay) + } + } + } + } + return nil + }) + + err := group.Wait() + if err != nil { + return err + } + + var maxLagChannel string + var maxLag time.Duration + findMaxLagChannel := func(params ...*typeutil.ConcurrentMap[string, time.Duration]) { + for _, param := range params { + param.Range(func(k string, v time.Duration) bool { + if v > maxLag { + maxLag = v + maxLagChannel = k + } + return true + }) + } + } + + var errStr string + findMaxLagChannel(queryNodeTTDelay) + if maxLag > 0 && len(maxLagChannel) != 0 { + errStr = fmt.Sprintf("query max timetick lag:%s on channel:%s", maxLag, maxLagChannel) + } + maxLagChannel = "" + maxLag = 0 + findMaxLagChannel(dataNodeTTDelay) + if maxLag > 0 && len(maxLagChannel) != 0 { + if errStr != "" { + errStr += ", " + } + errStr += fmt.Sprintf("data max timetick lag:%s on channel:%s", maxLag, maxLagChannel) + } + if errStr != "" { + return fmt.Errorf("max timetick lag exceeded threshold: %s", errStr) + } + + return nil +} diff --git a/internal/util/componentutil/componentutil.go b/internal/util/componentutil/componentutil.go index 93537d24451d8..d89c9db72bd67 100644 --- a/internal/util/componentutil/componentutil.go +++ b/internal/util/componentutil/componentutil.go @@ -84,3 +84,17 @@ func WaitForComponentHealthy[T interface { }](ctx context.Context, client T, serviceName string, attempts uint, sleep time.Duration) error { return WaitForComponentStates(ctx, client,
serviceName, []commonpb.StateCode{commonpb.StateCode_Healthy}, attempts, sleep) } + +func CheckHealthRespWithErr(err error) *milvuspb.CheckHealthResponse { + if err != nil { + return CheckHealthRespWithErrMsg(err.Error()) + } + return CheckHealthRespWithErrMsg() +} + +func CheckHealthRespWithErrMsg(errMsg ...string) *milvuspb.CheckHealthResponse { + if len(errMsg) != 0 { + return &milvuspb.CheckHealthResponse{Status: merr.Success(), IsHealthy: false, Reasons: errMsg} + } + return &milvuspb.CheckHealthResponse{Status: merr.Success(), IsHealthy: true, Reasons: []string{}} +} diff --git a/internal/util/healthcheck/checker.go b/internal/util/healthcheck/checker.go deleted file mode 100644 index c1f06a8e105d5..0000000000000 --- a/internal/util/healthcheck/checker.go +++ /dev/null @@ -1,276 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package healthcheck - -import ( - "fmt" - "sync" - "time" - - "go.uber.org/zap" - - "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" - "github.com/milvus-io/milvus/internal/json" - "github.com/milvus-io/milvus/pkg/log" - "github.com/milvus-io/milvus/pkg/util/merr" -) - -// UnHealthyLevel represents the health level of a system. -type UnHealthyLevel int - -const ( - // Healthy means the system is operating normally. - Healthy UnHealthyLevel = iota - // Warning indicates minor issues that might escalate. - Warning - // Critical indicates major issues that need immediate attention. - Critical - // Fatal indicates system failure. - Fatal -) - -// String returns the string representation of the UnHealthyLevel. 
-func (u UnHealthyLevel) String() string { - switch u { - case Healthy: - return "Healthy" - case Warning: - return "Warning" - case Critical: - return "Critical" - case Fatal: - return "Fatal" - default: - return "Unknown" - } -} - -type Item int - -const ( - ChannelsWatched Item = iota - CheckpointLagExceed - CollectionQueryable - TimeTickLagExceed - NodeHealthCheck -) - -func getUnhealthyLevel(item Item) UnHealthyLevel { - switch item { - case ChannelsWatched: - return Fatal - case CheckpointLagExceed: - return Fatal - case TimeTickLagExceed: - return Fatal - case NodeHealthCheck: - return Fatal - case CollectionQueryable: - return Critical - default: - panic(fmt.Sprintf("unknown health check item: %d", int(item))) - } -} - -type Result struct { - UnhealthyClusterMsgs []*UnhealthyClusterMsg `json:"unhealthy_cluster_msgs"` - UnhealthyCollectionMsgs []*UnhealthyCollectionMsg `json:"unhealthy_collection_msgs"` -} - -func NewResult() *Result { - return &Result{} -} - -func (r *Result) AppendUnhealthyClusterMsg(unm *UnhealthyClusterMsg) { - r.UnhealthyClusterMsgs = append(r.UnhealthyClusterMsgs, unm) -} - -func (r *Result) AppendUnhealthyCollectionMsgs(udm *UnhealthyCollectionMsg) { - r.UnhealthyCollectionMsgs = append(r.UnhealthyCollectionMsgs, udm) -} - -func (r *Result) AppendResult(other *Result) { - if other == nil { - return - } - r.UnhealthyClusterMsgs = append(r.UnhealthyClusterMsgs, other.UnhealthyClusterMsgs...) - r.UnhealthyCollectionMsgs = append(r.UnhealthyCollectionMsgs, other.UnhealthyCollectionMsgs...) -} - -func (r *Result) IsEmpty() bool { - return len(r.UnhealthyClusterMsgs) == 0 && len(r.UnhealthyCollectionMsgs) == 0 -} - -func (r *Result) IsHealthy() bool { - if len(r.UnhealthyClusterMsgs) == 0 && len(r.UnhealthyCollectionMsgs) == 0 { - return true - } - - for _, unm := range r.UnhealthyClusterMsgs { - if unm.Reason.UnhealthyLevel == Fatal { - return false - } - } - - for _, ucm := range r.UnhealthyCollectionMsgs { - if ucm.Reason.UnhealthyLevel == Fatal { - return false - } - } - - return true -} - -type UnhealthyReason struct { - UnhealthyMsg string `json:"unhealthy_msg"` - UnhealthyLevel UnHealthyLevel `json:"unhealthy_level"` -} - -type UnhealthyClusterMsg struct { - Role string `json:"role"` - NodeID int64 `json:"node_id"` - Reason *UnhealthyReason `json:"reason"` -} - -func NewUnhealthyClusterMsg(role string, nodeID int64, unhealthyMsg string, item Item) *UnhealthyClusterMsg { - return &UnhealthyClusterMsg{ - Role: role, - NodeID: nodeID, - Reason: &UnhealthyReason{ - UnhealthyMsg: unhealthyMsg, - UnhealthyLevel: getUnhealthyLevel(item), - }, - } -} - -type UnhealthyCollectionMsg struct { - DatabaseID int64 `json:"database_id"` - CollectionID int64 `json:"collection_id"` - Reason *UnhealthyReason `json:"reason"` -} - -func NewUnhealthyCollectionMsg(collectionID int64, unhealthyMsg string, item Item) *UnhealthyCollectionMsg { - return &UnhealthyCollectionMsg{ - CollectionID: collectionID, - Reason: &UnhealthyReason{ - UnhealthyMsg: unhealthyMsg, - UnhealthyLevel: getUnhealthyLevel(item), - }, - } -} - -type Checker struct { - sync.RWMutex - interval time.Duration - done chan struct{} - checkFn func() *Result - latestResult *Result - once sync.Once -} - -func NewChecker(interval time.Duration, checkFn func() *Result) *Checker { - checker := &Checker{ - interval: interval, - checkFn: checkFn, - latestResult: NewResult(), - done: make(chan struct{}, 1), - once: sync.Once{}, - } - return checker -} - -func (hc *Checker) Start() { - go func() { - ticker := 
time.NewTicker(hc.interval) - defer ticker.Stop() - log.Info("start health checker") - for { - select { - case <-ticker.C: - hc.Lock() - hc.latestResult = hc.checkFn() - hc.Unlock() - case <-hc.done: - log.Info("stop health checker") - return - } - } - }() -} - -func (hc *Checker) GetLatestCheckResult() *Result { - hc.RLock() - defer hc.RUnlock() - return hc.latestResult -} - -func (hc *Checker) Close() { - hc.once.Do(func() { - close(hc.done) - }) -} - -func GetHealthCheckResultFromResp(resp *milvuspb.CheckHealthResponse) *Result { - var r Result - if len(resp.Reasons) == 0 { - return &r - } - if len(resp.Reasons) > 1 { - log.Error("invalid check result", zap.Any("reasons", resp.Reasons)) - return &r - } - - err := json.Unmarshal([]byte(resp.Reasons[0]), &r) - if err != nil { - log.Error("unmarshal check result error", zap.String("error", err.Error())) - } - return &r -} - -func GetCheckHealthResponseFromClusterMsg(msg ...*UnhealthyClusterMsg) *milvuspb.CheckHealthResponse { - r := &Result{UnhealthyClusterMsgs: msg} - reasons, err := json.Marshal(r) - if err != nil { - log.Error("marshal check result error", zap.String("error", err.Error())) - } - return &milvuspb.CheckHealthResponse{ - Status: merr.Success(), - IsHealthy: r.IsHealthy(), - Reasons: []string{string(reasons)}, - } -} - -func GetCheckHealthResponseFromResult(checkResult *Result) *milvuspb.CheckHealthResponse { - if checkResult.IsEmpty() { - return OK() - } - - reason, err := json.Marshal(checkResult) - if err != nil { - log.Error("marshal check result error", zap.String("error", err.Error())) - } - - return &milvuspb.CheckHealthResponse{ - Status: merr.Success(), - IsHealthy: checkResult.IsHealthy(), - Reasons: []string{string(reason)}, - } -} - -func OK() *milvuspb.CheckHealthResponse { - return &milvuspb.CheckHealthResponse{Status: merr.Success(), IsHealthy: true, Reasons: []string{}} -} diff --git a/internal/util/healthcheck/checker_test.go b/internal/util/healthcheck/checker_test.go deleted file mode 100644 index 7fdcb8cd6e8d1..0000000000000 --- a/internal/util/healthcheck/checker_test.go +++ /dev/null @@ -1,60 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
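Note: the Checker deleted above was a mutex-guarded ticker loop that cached the latest check result for CheckHealth to read. A condensed standalone sketch of that pattern, simplified to a string result (this is an illustration of the removed design, not the deleted implementation itself):

package main

import (
	"fmt"
	"sync"
	"time"
)

// checker periodically runs checkFn and caches the most recent result.
type checker struct {
	mu     sync.RWMutex
	latest string
	done   chan struct{}
	once   sync.Once
}

func newChecker() *checker { return &checker{done: make(chan struct{})} }

func (c *checker) start(interval time.Duration, checkFn func() string) {
	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				r := checkFn() // run outside the lock, publish under it
				c.mu.Lock()
				c.latest = r
				c.mu.Unlock()
			case <-c.done:
				return
			}
		}
	}()
}

func (c *checker) latestResult() string {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.latest
}

// close is idempotent, mirroring the sync.Once guard in the deleted code.
func (c *checker) close() { c.once.Do(func() { close(c.done) }) }

func main() {
	c := newChecker()
	c.start(10*time.Millisecond, func() string { return "ok" })
	time.Sleep(30 * time.Millisecond) // wait for at least one tick
	fmt.Println(c.latestResult())
	c.close()
}

The tradeoff driving this removal is visible in the sketch: the cached result is always up to one interval stale, which is why the patch replaces polling with on-demand checks.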
- -package healthcheck - -import ( - "testing" - "time" - - "github.com/stretchr/testify/assert" - - "github.com/milvus-io/milvus/pkg/util/merr" -) - -func TestChecker(t *testing.T) { - expected1 := NewResult() - expected1.AppendUnhealthyClusterMsg(NewUnhealthyClusterMsg("role1", 1, "msg1", ChannelsWatched)) - expected1.AppendUnhealthyClusterMsg(NewUnhealthyClusterMsg("role1", 1, "msg1", ChannelsWatched)) - - expected1.AppendUnhealthyCollectionMsgs(&UnhealthyCollectionMsg{ - CollectionID: 1, - Reason: &UnhealthyReason{ - UnhealthyMsg: "msg2", - UnhealthyLevel: Critical, - }, - }) - - checkFn := func() *Result { - return expected1 - } - checker := NewChecker(100*time.Millisecond, checkFn) - go checker.Start() - - time.Sleep(150 * time.Millisecond) - actual1 := checker.GetLatestCheckResult() - assert.Equal(t, expected1, actual1) - assert.False(t, actual1.IsHealthy()) - - chr := GetCheckHealthResponseFromResult(actual1) - assert.Equal(t, merr.Success(), chr.Status) - assert.Equal(t, actual1.IsHealthy(), chr.IsHealthy) - assert.Equal(t, 1, len(chr.Reasons)) - - actualResult := GetHealthCheckResultFromResp(chr) - assert.Equal(t, actual1, actualResult) - checker.Close() -} diff --git a/internal/util/mock/grpc_datanode_client.go b/internal/util/mock/grpc_datanode_client.go index 621a2317e61e8..13ae355738d80 100644 --- a/internal/util/mock/grpc_datanode_client.go +++ b/internal/util/mock/grpc_datanode_client.go @@ -112,7 +112,3 @@ func (m *GrpcDataNodeClient) QuerySlot(ctx context.Context, req *datapb.QuerySlo func (m *GrpcDataNodeClient) DropCompactionPlan(ctx context.Context, req *datapb.DropCompactionPlanRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { return &commonpb.Status{}, m.Err } - -func (m *GrpcDataNodeClient) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) { - return &milvuspb.CheckHealthResponse{}, m.Err -} diff --git a/internal/util/mock/grpc_querynode_client.go b/internal/util/mock/grpc_querynode_client.go index 5db4bee2a4984..dadfb3157897d 100644 --- a/internal/util/mock/grpc_querynode_client.go +++ b/internal/util/mock/grpc_querynode_client.go @@ -134,10 +134,6 @@ func (m *GrpcQueryNodeClient) DeleteBatch(ctx context.Context, in *querypb.Delet return &querypb.DeleteBatchResponse{}, m.Err } -func (m *GrpcQueryNodeClient) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) { - return &milvuspb.CheckHealthResponse{}, m.Err -} - func (m *GrpcQueryNodeClient) Close() error { return m.Err } diff --git a/internal/util/wrappers/qn_wrapper.go b/internal/util/wrappers/qn_wrapper.go index c186fdf1596dd..def2e64f01bef 100644 --- a/internal/util/wrappers/qn_wrapper.go +++ b/internal/util/wrappers/qn_wrapper.go @@ -152,10 +152,6 @@ func (qn *qnServerWrapper) DeleteBatch(ctx context.Context, in *querypb.DeleteBa return qn.QueryNode.DeleteBatch(ctx, in) } -func (qn *qnServerWrapper) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) { - return qn.QueryNode.CheckHealth(ctx, req) -} - func WrapQueryNodeServerAsClient(qn types.QueryNode) types.QueryNodeClient { return &qnServerWrapper{ QueryNode: qn, diff --git a/pkg/util/funcutil/func.go b/pkg/util/funcutil/func.go index 8f627e8e0241e..81208393a40ac 100644 --- a/pkg/util/funcutil/func.go +++ b/pkg/util/funcutil/func.go @@ -24,6 +24,7 @@ import ( "fmt" "net" "reflect" + "regexp" "strconv" "strings" 
"time" @@ -290,18 +291,13 @@ func ConvertChannelName(chanName string, tokenFrom string, tokenTo string) (stri } func GetCollectionIDFromVChannel(vChannelName string) int64 { - end := strings.LastIndexByte(vChannelName, 'v') - if end <= 0 { - return -1 - } - start := strings.LastIndexByte(vChannelName, '_') - if start <= 0 { - return -1 - } - - collectionIDStr := vChannelName[start+1 : end] - if collectionID, err := strconv.ParseInt(collectionIDStr, 0, 64); err == nil { - return collectionID + re := regexp.MustCompile(`.*_(\d+)v\d+`) + matches := re.FindStringSubmatch(vChannelName) + if len(matches) > 1 { + number, err := strconv.ParseInt(matches[1], 0, 64) + if err == nil { + return number + } } return -1 } diff --git a/pkg/util/merr/utils.go b/pkg/util/merr/utils.go index dc72e80640e06..aef0bed7a344b 100644 --- a/pkg/util/merr/utils.go +++ b/pkg/util/merr/utils.go @@ -299,13 +299,6 @@ func IsHealthyOrStopping(stateCode commonpb.StateCode) error { return CheckHealthy(stateCode) } -func AnalyzeComponentStateResp(role string, nodeID int64, resp *milvuspb.ComponentStates, err error) error { - if err != nil { - return errors.Wrap(err, "service is unhealthy") - } - return AnalyzeState(role, nodeID, resp) -} - func AnalyzeState(role string, nodeID int64, state *milvuspb.ComponentStates) error { if err := Error(state.GetStatus()); err != nil { return errors.Wrapf(err, "%s=%d not healthy", role, nodeID) diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index f663cea499c23..3f8723482311c 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -288,9 +288,6 @@ type commonConfig struct { // Local RPC enabled for milvus internal communication when mix or standalone mode. LocalRPCEnabled ParamItem `refreshable:"false"` - HealthCheckInterval ParamItem `refreshable:"true"` - HealthCheckRPCTimeout ParamItem `refreshable:"true"` - SyncTaskPoolReleaseTimeoutSeconds ParamItem `refreshable:"true"` } @@ -952,22 +949,6 @@ This helps Milvus-CDC synchronize incremental data`, } p.LocalRPCEnabled.Init(base.mgr) - p.HealthCheckInterval = ParamItem{ - Key: "common.healthcheck.interval.seconds", - Version: "2.4.8", - DefaultValue: "30", - Doc: `health check interval in seconds, default 30s`, - } - p.HealthCheckInterval.Init(base.mgr) - - p.HealthCheckRPCTimeout = ParamItem{ - Key: "common.healthcheck.timeout.seconds", - Version: "2.4.8", - DefaultValue: "10", - Doc: `RPC timeout for health check request`, - } - p.HealthCheckRPCTimeout.Init(base.mgr) - p.SyncTaskPoolReleaseTimeoutSeconds = ParamItem{ Key: "common.sync.taskPoolReleaseTimeoutSeconds", DefaultValue: "60", @@ -2280,9 +2261,9 @@ If this parameter is set false, Milvus simply searches the growing segments with p.UpdateCollectionLoadStatusInterval = ParamItem{ Key: "queryCoord.updateCollectionLoadStatusInterval", Version: "2.4.7", - DefaultValue: "300", + DefaultValue: "5", PanicIfEmpty: true, - Doc: "300s, max interval of updating collection loaded status for check health", + Doc: "5m, max interval of updating collection loaded status for check health", Export: true, } diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index f33efe6fb33f4..2fbe4079ccfbb 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -131,11 +131,6 @@ func TestComponentParam(t *testing.T) { params.Save("common.gchelper.minimumGoGC", "80") assert.Equal(t, 80, Params.MinimumGOGCConfig.GetAsInt()) 
- params.Save("common.healthcheck.interval.seconds", "60") - assert.Equal(t, time.Second*60, Params.HealthCheckInterval.GetAsDuration(time.Second)) - params.Save("common.healthcheck.timeout.seconds", "5") - assert.Equal(t, 5, Params.HealthCheckRPCTimeout.GetAsInt()) - assert.Equal(t, 0, len(Params.ReadOnlyPrivileges.GetAsStrings())) assert.Equal(t, 0, len(Params.ReadWritePrivileges.GetAsStrings())) assert.Equal(t, 0, len(Params.AdminPrivileges.GetAsStrings())) @@ -335,8 +330,8 @@ func TestComponentParam(t *testing.T) { checkHealthRPCTimeout := Params.CheckHealthRPCTimeout.GetAsInt() assert.Equal(t, 2000, checkHealthRPCTimeout) - updateInterval := Params.UpdateCollectionLoadStatusInterval.GetAsDuration(time.Second) - assert.Equal(t, time.Second*300, updateInterval) + updateInterval := Params.UpdateCollectionLoadStatusInterval.GetAsDuration(time.Minute) + assert.Equal(t, updateInterval, time.Minute*5) assert.Equal(t, 0.1, Params.GlobalRowCountFactor.GetAsFloat()) params.Save("queryCoord.globalRowCountFactor", "0.4") diff --git a/pkg/util/ratelimitutil/utils.go b/pkg/util/ratelimitutil/utils.go index 1c70049ca88fd..ae65eb14c7429 100644 --- a/pkg/util/ratelimitutil/utils.go +++ b/pkg/util/ratelimitutil/utils.go @@ -16,13 +16,7 @@ package ratelimitutil -import ( - "fmt" - "time" - - "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" - "github.com/milvus-io/milvus/pkg/util/tsoutil" -) +import "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" var QuotaErrorString = map[commonpb.ErrorCode]string{ commonpb.ErrorCode_ForceDeny: "access has been disabled by the administrator", @@ -34,14 +28,3 @@ var QuotaErrorString = map[commonpb.ErrorCode]string{ func GetQuotaErrorString(errCode commonpb.ErrorCode) string { return QuotaErrorString[errCode] } - -func CheckTimeTickDelay(channel string, minTT uint64, maxDelay time.Duration) error { - if channel != "" && maxDelay > 0 { - minTt, _ := tsoutil.ParseTS(minTT) - delay := time.Since(minTt) - if delay.Milliseconds() >= maxDelay.Milliseconds() { - return fmt.Errorf("max timetick lag execced threhold, lag:%s on channel:%s", delay, channel) - } - } - return nil -}