From d6f6ebf4f18be43f7bb39fe94be440aa164762e9 Mon Sep 17 00:00:00 2001 From: jaime Date: Tue, 3 Sep 2024 16:48:56 +0800 Subject: [PATCH] enhance: decrease cpu overhead during filter segments on datacoord (#33130) issue: #33129 Signed-off-by: jaime (cherry picked from commit 3d29907b6e728fe0699298812c0a6ffdb4d4ba37) Signed-off-by: jaime --- configs/milvus.yaml | 2 +- internal/datacoord/mock_test.go | 16 + internal/datacoord/server.go | 10 + internal/datacoord/server_test.go | 53 ++-- internal/datacoord/services.go | 26 +- .../datacoord/session/datanode_manager.go | 46 ++- .../session/mock_datanode_manager.go | 32 +- internal/datacoord/util.go | 18 +- internal/datanode/metrics_info.go | 2 +- internal/datanode/services.go | 20 ++ internal/datanode/services_test.go | 36 +++ .../distributed/datanode/client/client.go | 6 + internal/distributed/datanode/service.go | 4 + internal/distributed/datanode/service_test.go | 4 + .../distributed/querynode/client/client.go | 6 + internal/distributed/querynode/service.go | 4 + .../pipeline/flow_graph_manager.go | 19 +- .../flushcommon/pipeline/mock_fgmanager.go | 55 ++++ internal/flushcommon/util/rate_collector.go | 32 -- .../flushcommon/util/rate_collector_test.go | 42 --- internal/mocks/mock_datanode.go | 55 ++++ internal/mocks/mock_datanode_client.go | 70 +++++ internal/mocks/mock_querynode.go | 55 ++++ internal/mocks/mock_querynode_client.go | 70 +++++ internal/proto/data_coord.proto | 2 + internal/proto/query_coord.proto | 4 +- internal/querycoordv2/mocks/mock_querynode.go | 55 ++++ internal/querycoordv2/server.go | 10 +- internal/querycoordv2/services.go | 56 +++- internal/querycoordv2/services_test.go | 54 +++- internal/querycoordv2/session/cluster.go | 15 + internal/querycoordv2/session/mock_cluster.go | 55 ++++ internal/querycoordv2/utils/util.go | 22 +- internal/querynodev2/metrics_info.go | 18 +- internal/querynodev2/services.go | 21 ++ internal/querynodev2/services_test.go | 40 ++- internal/rootcoord/mock_test.go | 1 + internal/rootcoord/root_coord.go | 69 ++--- internal/rootcoord/root_coord_test.go | 137 +-------- internal/rootcoord/util.go | 97 ------ internal/util/componentutil/componentutil.go | 14 - internal/util/healthcheck/checker.go | 276 ++++++++++++++++++ internal/util/healthcheck/checker_test.go | 60 ++++ internal/util/mock/grpc_datanode_client.go | 4 + internal/util/mock/grpc_querynode_client.go | 4 + internal/util/wrappers/qn_wrapper.go | 4 + pkg/util/funcutil/func.go | 20 +- pkg/util/merr/utils.go | 7 + pkg/util/paramtable/component_param.go | 23 +- pkg/util/paramtable/component_param_test.go | 9 +- pkg/util/ratelimitutil/utils.go | 19 +- 51 files changed, 1297 insertions(+), 482 deletions(-) delete mode 100644 internal/flushcommon/util/rate_collector_test.go create mode 100644 internal/util/healthcheck/checker.go create mode 100644 internal/util/healthcheck/checker_test.go diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 69f4c4a4d9099..85582060e1638 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -382,7 +382,7 @@ queryCoord: channelExclusiveNodeFactor: 4 # the least node number for enable channel's exclusive mode collectionObserverInterval: 200 # the interval of collection observer checkExecutedFlagInterval: 100 # the interval of check executed flag to force to pull dist - updateCollectionLoadStatusInterval: 5 # 5m, max interval of updating collection loaded status for check health + updateCollectionLoadStatusInterval: 300 # 300s, max interval of updating collection loaded status for check health 
cleanExcludeSegmentInterval: 60 # the time duration of clean pipeline exclude segment which used for filter invalid data, in seconds ip: # TCP/IP address of queryCoord. If not specified, use the first unicastable address port: 19531 # TCP port of queryCoord diff --git a/internal/datacoord/mock_test.go b/internal/datacoord/mock_test.go index 2a1973d916497..963ec0ac73cf0 100644 --- a/internal/datacoord/mock_test.go +++ b/internal/datacoord/mock_test.go @@ -304,6 +304,22 @@ func (c *mockDataNodeClient) Stop() error { return nil } +func (c *mockDataNodeClient) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) { + if c.state == commonpb.StateCode_Healthy { + return &milvuspb.CheckHealthResponse{ + Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, + IsHealthy: true, + Reasons: []string{}, + }, nil + } else { + return &milvuspb.CheckHealthResponse{ + Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_NotReadyServe}, + IsHealthy: false, + Reasons: []string{"fails"}, + }, nil + } +} + type mockRootCoordClient struct { state commonpb.StateCode cnt atomic.Int64 diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index b5c0ac15dc29f..f054da9e38fa3 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -52,6 +52,7 @@ import ( streamingcoord "github.com/milvus-io/milvus/internal/streamingcoord/server" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/dependency" + "github.com/milvus-io/milvus/internal/util/healthcheck" "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/internal/util/streamingutil" "github.com/milvus-io/milvus/pkg/kv" @@ -167,6 +168,8 @@ type Server struct { streamingCoord *streamingcoord.Server metricsRequest *metricsinfo.MetricsRequest + + healthChecker *healthcheck.Checker } type CollectionNameInfo struct { @@ -429,6 +432,8 @@ func (s *Server) initDataCoord() error { s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx) + interval := Params.CommonCfg.HealthCheckInterval.GetAsDuration(time.Second) + s.healthChecker = healthcheck.NewChecker(interval, s.healthCheckFn) log.Info("init datacoord done", zap.Int64("nodeID", paramtable.GetNodeID()), zap.String("Address", s.address)) return nil } @@ -773,6 +778,8 @@ func (s *Server) startServerLoop() { if !(streamingutil.IsStreamingServiceEnabled() || paramtable.Get().DataNodeCfg.SkipBFStatsLoad.GetAsBool()) { s.syncSegmentsScheduler.Start() } + + s.healthChecker.Start() } func (s *Server) startTaskScheduler() { @@ -1099,6 +1106,9 @@ func (s *Server) Stop() error { return nil } log.Info("datacoord server shutdown") + if s.healthChecker != nil { + s.healthChecker.Close() + } s.garbageCollector.close() log.Info("datacoord garbage collector stopped") diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index acfb03d8af543..4aad3616fd238 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -53,6 +53,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/workerpb" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/dependency" + "github.com/milvus-io/milvus/internal/util/healthcheck" "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" @@ -2509,12 +2510,12 @@ func Test_CheckHealth(t *testing.T) { return sm } - getChannelManager := func(t 
*testing.T, findWatcherOk bool) ChannelManager { + getChannelManager := func(findWatcherOk bool) ChannelManager { channelManager := NewMockChannelManager(t) if findWatcherOk { - channelManager.EXPECT().FindWatcher(mock.Anything).Return(0, nil) + channelManager.EXPECT().FindWatcher(mock.Anything).Return(0, nil).Maybe() } else { - channelManager.EXPECT().FindWatcher(mock.Anything).Return(0, errors.New("error")) + channelManager.EXPECT().FindWatcher(mock.Anything).Return(0, errors.New("error")).Maybe() } return channelManager } @@ -2527,6 +2528,21 @@ func Test_CheckHealth(t *testing.T) { 2: nil, } + newServer := func(isHealthy bool, findWatcherOk bool, meta *meta) *Server { + svr := &Server{ + ctx: context.TODO(), + sessionManager: getSessionManager(isHealthy), + channelManager: getChannelManager(findWatcherOk), + meta: meta, + session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}, + } + svr.stateCode.Store(commonpb.StateCode_Healthy) + svr.healthChecker = healthcheck.NewChecker(20*time.Millisecond, svr.healthCheckFn) + svr.healthChecker.Start() + time.Sleep(30 * time.Millisecond) // wait for next cycle for health checker + return svr + } + t.Run("not healthy", func(t *testing.T) { ctx := context.Background() s := &Server{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}} @@ -2538,9 +2554,8 @@ func Test_CheckHealth(t *testing.T) { }) t.Run("data node health check is fail", func(t *testing.T) { - svr := &Server{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}} - svr.stateCode.Store(commonpb.StateCode_Healthy) - svr.sessionManager = getSessionManager(false) + svr := newServer(false, true, &meta{channelCPs: newChannelCps()}) + defer svr.healthChecker.Close() ctx := context.Background() resp, err := svr.CheckHealth(ctx, &milvuspb.CheckHealthRequest{}) assert.NoError(t, err) @@ -2549,11 +2564,8 @@ func Test_CheckHealth(t *testing.T) { }) t.Run("check channel watched fail", func(t *testing.T) { - svr := &Server{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}} - svr.stateCode.Store(commonpb.StateCode_Healthy) - svr.sessionManager = getSessionManager(true) - svr.channelManager = getChannelManager(t, false) - svr.meta = &meta{collections: collections} + svr := newServer(true, false, &meta{collections: collections, channelCPs: newChannelCps()}) + defer svr.healthChecker.Close() ctx := context.Background() resp, err := svr.CheckHealth(ctx, &milvuspb.CheckHealthRequest{}) assert.NoError(t, err) @@ -2562,11 +2574,7 @@ func Test_CheckHealth(t *testing.T) { }) t.Run("check checkpoint fail", func(t *testing.T) { - svr := &Server{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}} - svr.stateCode.Store(commonpb.StateCode_Healthy) - svr.sessionManager = getSessionManager(true) - svr.channelManager = getChannelManager(t, true) - svr.meta = &meta{ + svr := newServer(true, true, &meta{ collections: collections, channelCPs: &channelCPs{ checkpoints: map[string]*msgpb.MsgPosition{ @@ -2576,8 +2584,8 @@ func Test_CheckHealth(t *testing.T) { }, }, }, - } - + }) + defer svr.healthChecker.Close() ctx := context.Background() resp, err := svr.CheckHealth(ctx, &milvuspb.CheckHealthRequest{}) assert.NoError(t, err) @@ -2586,11 +2594,7 @@ func Test_CheckHealth(t *testing.T) { }) t.Run("ok", func(t *testing.T) { - svr := &Server{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}} - svr.stateCode.Store(commonpb.StateCode_Healthy) - svr.sessionManager = 
getSessionManager(true) - svr.channelManager = getChannelManager(t, true) - svr.meta = &meta{ + svr := newServer(true, true, &meta{ collections: collections, channelCPs: &channelCPs{ checkpoints: map[string]*msgpb.MsgPosition{ @@ -2608,7 +2612,8 @@ func Test_CheckHealth(t *testing.T) { }, }, }, - } + }) + defer svr.healthChecker.Close() ctx := context.Background() resp, err := svr.CheckHealth(ctx, &milvuspb.CheckHealthRequest{}) assert.NoError(t, err) diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index 8b8d042b2a65f..02de4e20ea8fa 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -35,7 +35,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/storage" - "github.com/milvus-io/milvus/internal/util/componentutil" + "github.com/milvus-io/milvus/internal/util/healthcheck" "github.com/milvus-io/milvus/internal/util/importutilv2" "github.com/milvus-io/milvus/internal/util/segmentutil" "github.com/milvus-io/milvus/internal/util/streamingutil" @@ -1581,20 +1581,24 @@ func (s *Server) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthReque }, nil } - err := s.sessionManager.CheckHealth(ctx) - if err != nil { - return componentutil.CheckHealthRespWithErr(err), nil - } + latestCheckResult := s.healthChecker.GetLatestCheckResult() + return healthcheck.GetCheckHealthResponseFromResult(latestCheckResult), nil +} - if err = CheckAllChannelsWatched(s.meta, s.channelManager); err != nil { - return componentutil.CheckHealthRespWithErr(err), nil - } +func (s *Server) healthCheckFn() *healthcheck.Result { + timeout := Params.CommonCfg.HealthCheckRPCTimeout.GetAsDuration(time.Second) + ctx, cancel := context.WithTimeout(s.ctx, timeout) + defer cancel() - if err = CheckCheckPointsHealth(s.meta); err != nil { - return componentutil.CheckHealthRespWithErr(err), nil + checkResults := s.sessionManager.CheckDNHealth(ctx) + for collectionID, failReason := range CheckAllChannelsWatched(s.meta, s.channelManager) { + checkResults.AppendUnhealthyCollectionMsgs(healthcheck.NewUnhealthyCollectionMsg(collectionID, failReason, healthcheck.ChannelsWatched)) } - return componentutil.CheckHealthRespWithErr(nil), nil + for collectionID, failReason := range CheckCheckPointsHealth(s.meta) { + checkResults.AppendUnhealthyCollectionMsgs(healthcheck.NewUnhealthyCollectionMsg(collectionID, failReason, healthcheck.CheckpointLagExceed)) + } + return checkResults } func (s *Server) GcConfirm(ctx context.Context, request *datapb.GcConfirmRequest) (*datapb.GcConfirmResponse, error) { diff --git a/internal/datacoord/session/datanode_manager.go b/internal/datacoord/session/datanode_manager.go index e65f2cb95931c..4bca2ee621efb 100644 --- a/internal/datacoord/session/datanode_manager.go +++ b/internal/datacoord/session/datanode_manager.go @@ -19,6 +19,7 @@ package session import ( "context" "fmt" + "sync" "time" "github.com/cockroachdb/errors" @@ -31,6 +32,7 @@ import ( "github.com/milvus-io/milvus/internal/metastore/kv/binlog" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/types" + "github.com/milvus-io/milvus/internal/util/healthcheck" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/util/commonpbutil" @@ -69,7 +71,7 @@ type DataNodeManager interface { QueryPreImport(nodeID int64, in *datapb.QueryPreImportRequest) (*datapb.QueryPreImportResponse, error) 
QueryImport(nodeID int64, in *datapb.QueryImportRequest) (*datapb.QueryImportResponse, error) DropImport(nodeID int64, in *datapb.DropImportRequest) error - CheckHealth(ctx context.Context) error + CheckDNHealth(ctx context.Context) *healthcheck.Result QuerySlot(nodeID int64) (*datapb.QuerySlotResponse, error) DropCompactionPlan(nodeID int64, req *datapb.DropCompactionPlanRequest) error Close() @@ -507,28 +509,44 @@ func (c *DataNodeManagerImpl) DropImport(nodeID int64, in *datapb.DropImportRequ return merr.CheckRPCCall(status, err) } -func (c *DataNodeManagerImpl) CheckHealth(ctx context.Context) error { - group, ctx := errgroup.WithContext(ctx) - +func (c *DataNodeManagerImpl) CheckDNHealth(ctx context.Context) *healthcheck.Result { + result := healthcheck.NewResult() + wg := sync.WaitGroup{} + wlock := sync.Mutex{} ids := c.GetSessionIDs() + for _, nodeID := range ids { nodeID := nodeID - group.Go(func() error { - cli, err := c.getClient(ctx, nodeID) + wg.Add(1) + go func() { + defer wg.Done() + + datanodeClient, err := c.getClient(ctx, nodeID) if err != nil { - return fmt.Errorf("failed to get DataNode %d: %v", nodeID, err) + err = fmt.Errorf("failed to get node:%d: %v", nodeID, err) + return } - sta, err := cli.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{}) - if err != nil { - return err + checkHealthResp, err := datanodeClient.CheckHealth(ctx, &milvuspb.CheckHealthRequest{}) + if err = merr.CheckRPCCall(checkHealthResp, err); err != nil && !errors.Is(err, merr.ErrServiceUnimplemented) { + err = fmt.Errorf("CheckHealth fails for datanode:%d, %w", nodeID, err) + wlock.Lock() + result.AppendUnhealthyClusterMsg( + healthcheck.NewUnhealthyClusterMsg(typeutil.DataNodeRole, nodeID, err.Error(), healthcheck.NodeHealthCheck)) + wlock.Unlock() + return } - err = merr.AnalyzeState("DataNode", nodeID, sta) - return err - }) + + if checkHealthResp != nil && len(checkHealthResp.Reasons) > 0 { + wlock.Lock() + result.AppendResult(healthcheck.GetHealthCheckResultFromResp(checkHealthResp)) + wlock.Unlock() + } + }() } - return group.Wait() + wg.Wait() + return result } func (c *DataNodeManagerImpl) QuerySlot(nodeID int64) (*datapb.QuerySlotResponse, error) { diff --git a/internal/datacoord/session/mock_datanode_manager.go b/internal/datacoord/session/mock_datanode_manager.go index 9bd42f2847052..c3f75a8558db9 100644 --- a/internal/datacoord/session/mock_datanode_manager.go +++ b/internal/datacoord/session/mock_datanode_manager.go @@ -6,6 +6,8 @@ import ( context "context" datapb "github.com/milvus-io/milvus/internal/proto/datapb" + healthcheck "github.com/milvus-io/milvus/internal/util/healthcheck" + mock "github.com/stretchr/testify/mock" typeutil "github.com/milvus-io/milvus/pkg/util/typeutil" @@ -117,48 +119,50 @@ func (_c *MockDataNodeManager_CheckChannelOperationProgress_Call) RunAndReturn(r return _c } -// CheckHealth provides a mock function with given fields: ctx -func (_m *MockDataNodeManager) CheckHealth(ctx context.Context) error { +// CheckDNHealth provides a mock function with given fields: ctx +func (_m *MockDataNodeManager) CheckDNHealth(ctx context.Context) *healthcheck.Result { ret := _m.Called(ctx) if len(ret) == 0 { - panic("no return value specified for CheckHealth") + panic("no return value specified for CheckDNHealth") } - var r0 error - if rf, ok := ret.Get(0).(func(context.Context) error); ok { + var r0 *healthcheck.Result + if rf, ok := ret.Get(0).(func(context.Context) *healthcheck.Result); ok { r0 = rf(ctx) } else { - r0 = ret.Error(0) + if ret.Get(0) != nil 
{ + r0 = ret.Get(0).(*healthcheck.Result) + } } return r0 } -// MockDataNodeManager_CheckHealth_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckHealth' -type MockDataNodeManager_CheckHealth_Call struct { +// MockDataNodeManager_CheckDNHealth_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckDNHealth' +type MockDataNodeManager_CheckDNHealth_Call struct { *mock.Call } -// CheckHealth is a helper method to define mock.On call +// CheckDNHealth is a helper method to define mock.On call // - ctx context.Context -func (_e *MockDataNodeManager_Expecter) CheckHealth(ctx interface{}) *MockDataNodeManager_CheckHealth_Call { - return &MockDataNodeManager_CheckHealth_Call{Call: _e.mock.On("CheckHealth", ctx)} +func (_e *MockDataNodeManager_Expecter) CheckDNHealth(ctx interface{}) *MockDataNodeManager_CheckDNHealth_Call { + return &MockDataNodeManager_CheckDNHealth_Call{Call: _e.mock.On("CheckDNHealth", ctx)} } -func (_c *MockDataNodeManager_CheckHealth_Call) Run(run func(ctx context.Context)) *MockDataNodeManager_CheckHealth_Call { +func (_c *MockDataNodeManager_CheckDNHealth_Call) Run(run func(ctx context.Context)) *MockDataNodeManager_CheckDNHealth_Call { _c.Call.Run(func(args mock.Arguments) { run(args[0].(context.Context)) }) return _c } -func (_c *MockDataNodeManager_CheckHealth_Call) Return(_a0 error) *MockDataNodeManager_CheckHealth_Call { +func (_c *MockDataNodeManager_CheckDNHealth_Call) Return(_a0 *healthcheck.Result) *MockDataNodeManager_CheckDNHealth_Call { _c.Call.Return(_a0) return _c } -func (_c *MockDataNodeManager_CheckHealth_Call) RunAndReturn(run func(context.Context) error) *MockDataNodeManager_CheckHealth_Call { +func (_c *MockDataNodeManager_CheckDNHealth_Call) RunAndReturn(run func(context.Context) *healthcheck.Result) *MockDataNodeManager_CheckDNHealth_Call { _c.Call.Return(run) return _c } diff --git a/internal/datacoord/util.go b/internal/datacoord/util.go index 19b4fa2f68949..7ce2f8a6ea6e7 100644 --- a/internal/datacoord/util.go +++ b/internal/datacoord/util.go @@ -285,7 +285,8 @@ func getBinLogIDs(segment *SegmentInfo, fieldID int64) []int64 { return binlogIDs } -func CheckCheckPointsHealth(meta *meta) error { +func CheckCheckPointsHealth(meta *meta) map[int64]string { + checkResult := make(map[int64]string) for channel, cp := range meta.GetChannelCheckpoints() { collectionID := funcutil.GetCollectionIDFromVChannel(channel) if collectionID == -1 { @@ -299,31 +300,30 @@ func CheckCheckPointsHealth(meta *meta) error { ts, _ := tsoutil.ParseTS(cp.Timestamp) lag := time.Since(ts) if lag > paramtable.Get().DataCoordCfg.ChannelCheckpointMaxLag.GetAsDuration(time.Second) { - return merr.WrapErrChannelCPExceededMaxLag(channel, fmt.Sprintf("checkpoint lag: %f(min)", lag.Minutes())) + checkResult[collectionID] = fmt.Sprintf("exceeds max lag:%s on channel:%s checkpoint", lag, channel) } } - return nil + return checkResult } -func CheckAllChannelsWatched(meta *meta, channelManager ChannelManager) error { +func CheckAllChannelsWatched(meta *meta, channelManager ChannelManager) map[int64]string { collIDs := meta.ListCollections() + checkResult := make(map[int64]string) for _, collID := range collIDs { collInfo := meta.GetCollection(collID) if collInfo == nil { - log.Warn("collection info is nil, skip it", zap.Int64("collectionID", collID)) + log.RatedWarn(60, "collection info is nil, skip it", zap.Int64("collectionID", collID)) continue } for _, channelName := range collInfo.VChannelNames { _, 
err := channelManager.FindWatcher(channelName) if err != nil { - log.Warn("find watcher for channel failed", zap.Int64("collectionID", collID), - zap.String("channelName", channelName), zap.Error(err)) - return err + checkResult[collID] = fmt.Sprintf("channel:%s is not watched", channelName) } } } - return nil + return checkResult } func createStorageConfig() *indexpb.StorageConfig { diff --git a/internal/datanode/metrics_info.go b/internal/datanode/metrics_info.go index 4872b6f6c5eb4..9072d6d14f515 100644 --- a/internal/datanode/metrics_info.go +++ b/internal/datanode/metrics_info.go @@ -52,7 +52,7 @@ func (node *DataNode) getQuotaMetrics() (*metricsinfo.DataNodeQuotaMetrics, erro return nil, err } - minFGChannel, minFGTt := util.GetRateCollector().GetMinFlowGraphTt() + minFGChannel, minFGTt := node.flowgraphManager.GetMinTTFlowGraph() return &metricsinfo.DataNodeQuotaMetrics{ Hms: metricsinfo.HardwareMetrics{}, Rms: rms, diff --git a/internal/datanode/services.go b/internal/datanode/services.go index 825f63a14c1ad..be6c6a2042ce1 100644 --- a/internal/datanode/services.go +++ b/internal/datanode/services.go @@ -22,6 +22,7 @@ package datanode import ( "context" "fmt" + "time" "github.com/samber/lo" "go.uber.org/zap" @@ -36,6 +37,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/internal/util/healthcheck" "github.com/milvus-io/milvus/internal/util/importutilv2" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" @@ -45,6 +47,7 @@ import ( "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/ratelimitutil" "github.com/milvus-io/milvus/pkg/util/tsoutil" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -573,3 +576,20 @@ func (node *DataNode) DropCompactionPlan(ctx context.Context, req *datapb.DropCo log.Ctx(ctx).Info("DropCompactionPlans success", zap.Int64("planID", req.GetPlanID())) return merr.Success(), nil } + +func (node *DataNode) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) { + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { + return &milvuspb.CheckHealthResponse{ + Status: merr.Status(err), + Reasons: []string{err.Error()}, + }, nil + } + + maxDelay := paramtable.Get().QuotaConfig.MaxTimeTickDelay.GetAsDuration(time.Second) + minFGChannel, minFGTt := node.flowgraphManager.GetMinTTFlowGraph() + if err := ratelimitutil.CheckTimeTickDelay(minFGChannel, minFGTt, maxDelay); err != nil { + msg := healthcheck.NewUnhealthyClusterMsg(typeutil.DataNodeRole, node.GetNodeID(), err.Error(), healthcheck.TimeTickLagExceed) + return healthcheck.GetCheckHealthResponseFromClusterMsg(msg), nil + } + return healthcheck.OK(), nil +} diff --git a/internal/datanode/services_test.go b/internal/datanode/services_test.go index 616354a16482d..5cc4d87b72518 100644 --- a/internal/datanode/services_test.go +++ b/internal/datanode/services_test.go @@ -110,6 +110,7 @@ func (s *DataNodeServicesSuite) SetupTest() { s.Require().NoError(err) s.node.chunkManager = storage.NewLocalChunkManager(storage.RootPath("/tmp/milvus_test/datanode")) + s.node.flowgraphManager = pipeline.NewFlowgraphManager() paramtable.SetNodeID(1) } @@ -1161,6 +1162,41 @@ func (s *DataNodeServicesSuite) TestSyncSegments() { }) } +func (s *DataNodeServicesSuite) 
TestCheckHealth() { + s.Run("node not healthy", func() { + s.SetupTest() + s.node.UpdateStateCode(commonpb.StateCode_Abnormal) + ctx := context.Background() + resp, err := s.node.CheckHealth(ctx, nil) + s.NoError(err) + s.False(merr.Ok(resp.GetStatus())) + s.ErrorIs(merr.Error(resp.GetStatus()), merr.ErrServiceNotReady) + }) + + s.Run("exceeded timetick lag on pipeline", func() { + s.SetupTest() + fgm := pipeline.NewMockFlowgraphManager(s.T()) + fgm.EXPECT().GetMinTTFlowGraph().Return("timetick-lag-ch", uint64(3600)).Once() + s.node.flowgraphManager = fgm + ctx := context.Background() + resp, err := s.node.CheckHealth(ctx, nil) + s.NoError(err) + s.True(merr.Ok(resp.GetStatus())) + s.False(resp.GetIsHealthy()) + s.NotEmpty(resp.Reasons) + }) + + s.Run("ok", func() { + s.SetupTest() + ctx := context.Background() + resp, err := s.node.CheckHealth(ctx, nil) + s.NoError(err) + s.True(merr.Ok(resp.GetStatus())) + s.True(resp.GetIsHealthy()) + s.Empty(resp.Reasons) + }) +} + func (s *DataNodeServicesSuite) TestDropCompactionPlan() { s.Run("node not healthy", func() { s.SetupTest() diff --git a/internal/distributed/datanode/client/client.go b/internal/distributed/datanode/client/client.go index aeacb69e7c715..510b218aecac9 100644 --- a/internal/distributed/datanode/client/client.go +++ b/internal/distributed/datanode/client/client.go @@ -281,3 +281,9 @@ func (c *Client) DropCompactionPlan(ctx context.Context, req *datapb.DropCompact return client.DropCompactionPlan(ctx, req) }) } + +func (c *Client) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) { + return wrapGrpcCall(ctx, c, func(client datapb.DataNodeClient) (*milvuspb.CheckHealthResponse, error) { + return client.CheckHealth(ctx, req) + }) +} diff --git a/internal/distributed/datanode/service.go b/internal/distributed/datanode/service.go index f26fb1d63ca47..2443e5a95ae3c 100644 --- a/internal/distributed/datanode/service.go +++ b/internal/distributed/datanode/service.go @@ -410,3 +410,7 @@ func (s *Server) QuerySlot(ctx context.Context, req *datapb.QuerySlotRequest) (* func (s *Server) DropCompactionPlan(ctx context.Context, req *datapb.DropCompactionPlanRequest) (*commonpb.Status, error) { return s.datanode.DropCompactionPlan(ctx, req) } + +func (s *Server) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) { + return s.datanode.CheckHealth(ctx, req) +} diff --git a/internal/distributed/datanode/service_test.go b/internal/distributed/datanode/service_test.go index 8e0bff6acfcca..2aede0b83be99 100644 --- a/internal/distributed/datanode/service_test.go +++ b/internal/distributed/datanode/service_test.go @@ -185,6 +185,10 @@ func (m *MockDataNode) DropCompactionPlan(ctx context.Context, req *datapb.DropC return m.status, m.err } +func (m *MockDataNode) CheckHealth(ctx context.Context, request *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) { + return &milvuspb.CheckHealthResponse{}, m.err +} + // ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////// func Test_NewServer(t *testing.T) { paramtable.Init() diff --git a/internal/distributed/querynode/client/client.go b/internal/distributed/querynode/client/client.go index a7e933a5eaca2..9e9d06c1dc190 100644 --- a/internal/distributed/querynode/client/client.go +++ b/internal/distributed/querynode/client/client.go @@ -360,3 +360,9 @@ func (c *Client) DeleteBatch(ctx 
context.Context, req *querypb.DeleteBatchReques return client.DeleteBatch(ctx, req) }) } + +func (c *Client) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) { + return wrapGrpcCall(ctx, c, func(client querypb.QueryNodeClient) (*milvuspb.CheckHealthResponse, error) { + return client.CheckHealth(ctx, req) + }) +} diff --git a/internal/distributed/querynode/service.go b/internal/distributed/querynode/service.go index 5ae035f0a1bf9..7871598fd462a 100644 --- a/internal/distributed/querynode/service.go +++ b/internal/distributed/querynode/service.go @@ -394,3 +394,7 @@ func (s *Server) Delete(ctx context.Context, req *querypb.DeleteRequest) (*commo func (s *Server) DeleteBatch(ctx context.Context, req *querypb.DeleteBatchRequest) (*querypb.DeleteBatchResponse, error) { return s.querynode.DeleteBatch(ctx, req) } + +func (s *Server) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) { + return s.querynode.CheckHealth(ctx, req) +} diff --git a/internal/flushcommon/pipeline/flow_graph_manager.go b/internal/flushcommon/pipeline/flow_graph_manager.go index 8527e65ee91c6..994978ff4581b 100644 --- a/internal/flushcommon/pipeline/flow_graph_manager.go +++ b/internal/flushcommon/pipeline/flow_graph_manager.go @@ -22,7 +22,6 @@ import ( "go.uber.org/zap" - "github.com/milvus-io/milvus/internal/flushcommon/util" "github.com/milvus-io/milvus/internal/json" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" @@ -43,6 +42,7 @@ type FlowgraphManager interface { GetFlowgraphCount() int GetCollectionIDs() []int64 + GetMinTTFlowGraph() (string, typeutil.Timestamp) GetChannelsJSON() string GetSegmentsJSON() string Close() @@ -76,7 +76,6 @@ func (fm *fgManagerImpl) RemoveFlowgraph(channel string) { fm.flowgraphs.Remove(channel) metrics.DataNodeNumFlowGraphs.WithLabelValues(fmt.Sprint(paramtable.GetNodeID())).Dec() - util.GetRateCollector().RemoveFlowGraphChannel(channel) } } @@ -120,6 +119,22 @@ func (fm *fgManagerImpl) GetCollectionIDs() []int64 { return collectionSet.Collect() } +// GetMinTTFlowGraph returns the vchannel and minimal time tick of flow graphs. +func (fm *fgManagerImpl) GetMinTTFlowGraph() (string, typeutil.Timestamp) { + minTt := typeutil.MaxTimestamp + var channel string + fm.flowgraphs.Range(func(ch string, ds *DataSyncService) bool { + latestTimeTick := ds.timetickSender.GetLatestTimestamp(ch) + if minTt > latestTimeTick { + minTt = latestTimeTick + channel = ch + } + return true + }) + + return channel, minTt +} + // GetChannelsJSON returns all channels in json format. 
func (fm *fgManagerImpl) GetChannelsJSON() string { var channels []*metricsinfo.Channel diff --git a/internal/flushcommon/pipeline/mock_fgmanager.go b/internal/flushcommon/pipeline/mock_fgmanager.go index cf8cd6b2aa1ca..a72faac2e6463 100644 --- a/internal/flushcommon/pipeline/mock_fgmanager.go +++ b/internal/flushcommon/pipeline/mock_fgmanager.go @@ -309,6 +309,61 @@ func (_c *MockFlowgraphManager_GetFlowgraphService_Call) RunAndReturn(run func(s return _c } +// GetMinTTFlowGraph provides a mock function with given fields: +func (_m *MockFlowgraphManager) GetMinTTFlowGraph() (string, uint64) { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for GetMinTTFlowGraph") + } + + var r0 string + var r1 uint64 + if rf, ok := ret.Get(0).(func() (string, uint64)); ok { + return rf() + } + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + if rf, ok := ret.Get(1).(func() uint64); ok { + r1 = rf() + } else { + r1 = ret.Get(1).(uint64) + } + + return r0, r1 +} + +// MockFlowgraphManager_GetMinTTFlowGraph_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetMinTTFlowGraph' +type MockFlowgraphManager_GetMinTTFlowGraph_Call struct { + *mock.Call +} + +// GetMinTTFlowGraph is a helper method to define mock.On call +func (_e *MockFlowgraphManager_Expecter) GetMinTTFlowGraph() *MockFlowgraphManager_GetMinTTFlowGraph_Call { + return &MockFlowgraphManager_GetMinTTFlowGraph_Call{Call: _e.mock.On("GetMinTTFlowGraph")} +} + +func (_c *MockFlowgraphManager_GetMinTTFlowGraph_Call) Run(run func()) *MockFlowgraphManager_GetMinTTFlowGraph_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockFlowgraphManager_GetMinTTFlowGraph_Call) Return(_a0 string, _a1 uint64) *MockFlowgraphManager_GetMinTTFlowGraph_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockFlowgraphManager_GetMinTTFlowGraph_Call) RunAndReturn(run func() (string, uint64)) *MockFlowgraphManager_GetMinTTFlowGraph_Call { + _c.Call.Return(run) + return _c +} + // GetSegmentsJSON provides a mock function with given fields: func (_m *MockFlowgraphManager) GetSegmentsJSON() string { ret := _m.Called() diff --git a/internal/flushcommon/util/rate_collector.go b/internal/flushcommon/util/rate_collector.go index 4736eb2209ee3..f20d1e562e8c4 100644 --- a/internal/flushcommon/util/rate_collector.go +++ b/internal/flushcommon/util/rate_collector.go @@ -24,7 +24,6 @@ import ( "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/ratelimitutil" - "github.com/milvus-io/milvus/pkg/util/typeutil" ) // rateCol is global RateCollector in DataNode. @@ -38,7 +37,6 @@ type RateCollector struct { *ratelimitutil.RateCollector flowGraphTtMu sync.Mutex - flowGraphTt map[string]typeutil.Timestamp } func initGlobalRateCollector() { @@ -75,35 +73,5 @@ func newRateCollector() (*RateCollector, error) { } return &RateCollector{ RateCollector: rc, - flowGraphTt: make(map[string]typeutil.Timestamp), }, nil } - -// UpdateFlowGraphTt updates RateCollector's flow graph time tick. -func (r *RateCollector) UpdateFlowGraphTt(channel string, t typeutil.Timestamp) { - r.flowGraphTtMu.Lock() - defer r.flowGraphTtMu.Unlock() - r.flowGraphTt[channel] = t -} - -// RemoveFlowGraphChannel removes channel from flowGraphTt. 
-func (r *RateCollector) RemoveFlowGraphChannel(channel string) { - r.flowGraphTtMu.Lock() - defer r.flowGraphTtMu.Unlock() - delete(r.flowGraphTt, channel) -} - -// GetMinFlowGraphTt returns the vchannel and minimal time tick of flow graphs. -func (r *RateCollector) GetMinFlowGraphTt() (string, typeutil.Timestamp) { - r.flowGraphTtMu.Lock() - defer r.flowGraphTtMu.Unlock() - minTt := typeutil.MaxTimestamp - var channel string - for c, t := range r.flowGraphTt { - if minTt > t { - minTt = t - channel = c - } - } - return channel, minTt -} diff --git a/internal/flushcommon/util/rate_collector_test.go b/internal/flushcommon/util/rate_collector_test.go deleted file mode 100644 index f672b5869e860..0000000000000 --- a/internal/flushcommon/util/rate_collector_test.go +++ /dev/null @@ -1,42 +0,0 @@ -// Licensed to the LF AI & Data foundation under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package util - -import ( - "testing" - - "github.com/stretchr/testify/assert" - - "github.com/milvus-io/milvus/pkg/util/typeutil" -) - -func TestRateCollector(t *testing.T) { - t.Run("test FlowGraphTt", func(t *testing.T) { - collector, err := newRateCollector() - assert.NoError(t, err) - - c, minTt := collector.GetMinFlowGraphTt() - assert.Equal(t, "", c) - assert.Equal(t, typeutil.MaxTimestamp, minTt) - collector.UpdateFlowGraphTt("channel1", 100) - collector.UpdateFlowGraphTt("channel2", 200) - collector.UpdateFlowGraphTt("channel3", 50) - c, minTt = collector.GetMinFlowGraphTt() - assert.Equal(t, "channel3", c) - assert.Equal(t, typeutil.Timestamp(50), minTt) - }) -} diff --git a/internal/mocks/mock_datanode.go b/internal/mocks/mock_datanode.go index f03f66c8afb07..f77ed0d12e55c 100644 --- a/internal/mocks/mock_datanode.go +++ b/internal/mocks/mock_datanode.go @@ -91,6 +91,61 @@ func (_c *MockDataNode_CheckChannelOperationProgress_Call) RunAndReturn(run func return _c } +// CheckHealth provides a mock function with given fields: _a0, _a1 +func (_m *MockDataNode) CheckHealth(_a0 context.Context, _a1 *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) { + ret := _m.Called(_a0, _a1) + + var r0 *milvuspb.CheckHealthResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error)); ok { + return rf(_a0, _a1) + } + if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest) *milvuspb.CheckHealthResponse); ok { + r0 = rf(_a0, _a1) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*milvuspb.CheckHealthResponse) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *milvuspb.CheckHealthRequest) error); ok { + r1 = rf(_a0, _a1) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockDataNode_CheckHealth_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 
'CheckHealth' +type MockDataNode_CheckHealth_Call struct { + *mock.Call +} + +// CheckHealth is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 *milvuspb.CheckHealthRequest +func (_e *MockDataNode_Expecter) CheckHealth(_a0 interface{}, _a1 interface{}) *MockDataNode_CheckHealth_Call { + return &MockDataNode_CheckHealth_Call{Call: _e.mock.On("CheckHealth", _a0, _a1)} +} + +func (_c *MockDataNode_CheckHealth_Call) Run(run func(_a0 context.Context, _a1 *milvuspb.CheckHealthRequest)) *MockDataNode_CheckHealth_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*milvuspb.CheckHealthRequest)) + }) + return _c +} + +func (_c *MockDataNode_CheckHealth_Call) Return(_a0 *milvuspb.CheckHealthResponse, _a1 error) *MockDataNode_CheckHealth_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockDataNode_CheckHealth_Call) RunAndReturn(run func(context.Context, *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error)) *MockDataNode_CheckHealth_Call { + _c.Call.Return(run) + return _c +} + // CompactionV2 provides a mock function with given fields: _a0, _a1 func (_m *MockDataNode) CompactionV2(_a0 context.Context, _a1 *datapb.CompactionPlan) (*commonpb.Status, error) { ret := _m.Called(_a0, _a1) diff --git a/internal/mocks/mock_datanode_client.go b/internal/mocks/mock_datanode_client.go index fc493844fba44..e41d56e241cbf 100644 --- a/internal/mocks/mock_datanode_client.go +++ b/internal/mocks/mock_datanode_client.go @@ -105,6 +105,76 @@ func (_c *MockDataNodeClient_CheckChannelOperationProgress_Call) RunAndReturn(ru return _c } +// CheckHealth provides a mock function with given fields: ctx, in, opts +func (_m *MockDataNodeClient) CheckHealth(ctx context.Context, in *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, in) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 *milvuspb.CheckHealthResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest, ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error)); ok { + return rf(ctx, in, opts...) + } + if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest, ...grpc.CallOption) *milvuspb.CheckHealthResponse); ok { + r0 = rf(ctx, in, opts...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*milvuspb.CheckHealthResponse) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *milvuspb.CheckHealthRequest, ...grpc.CallOption) error); ok { + r1 = rf(ctx, in, opts...) 
+ } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockDataNodeClient_CheckHealth_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckHealth' +type MockDataNodeClient_CheckHealth_Call struct { + *mock.Call +} + +// CheckHealth is a helper method to define mock.On call +// - ctx context.Context +// - in *milvuspb.CheckHealthRequest +// - opts ...grpc.CallOption +func (_e *MockDataNodeClient_Expecter) CheckHealth(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_CheckHealth_Call { + return &MockDataNodeClient_CheckHealth_Call{Call: _e.mock.On("CheckHealth", + append([]interface{}{ctx, in}, opts...)...)} +} + +func (_c *MockDataNodeClient_CheckHealth_Call) Run(run func(ctx context.Context, in *milvuspb.CheckHealthRequest, opts ...grpc.CallOption)) *MockDataNodeClient_CheckHealth_Call { + _c.Call.Run(func(args mock.Arguments) { + variadicArgs := make([]grpc.CallOption, len(args)-2) + for i, a := range args[2:] { + if a != nil { + variadicArgs[i] = a.(grpc.CallOption) + } + } + run(args[0].(context.Context), args[1].(*milvuspb.CheckHealthRequest), variadicArgs...) + }) + return _c +} + +func (_c *MockDataNodeClient_CheckHealth_Call) Return(_a0 *milvuspb.CheckHealthResponse, _a1 error) *MockDataNodeClient_CheckHealth_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockDataNodeClient_CheckHealth_Call) RunAndReturn(run func(context.Context, *milvuspb.CheckHealthRequest, ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error)) *MockDataNodeClient_CheckHealth_Call { + _c.Call.Return(run) + return _c +} + // Close provides a mock function with given fields: func (_m *MockDataNodeClient) Close() error { ret := _m.Called() diff --git a/internal/mocks/mock_querynode.go b/internal/mocks/mock_querynode.go index a8c32577fa5d6..d52fe85b644b1 100644 --- a/internal/mocks/mock_querynode.go +++ b/internal/mocks/mock_querynode.go @@ -30,6 +30,61 @@ func (_m *MockQueryNode) EXPECT() *MockQueryNode_Expecter { return &MockQueryNode_Expecter{mock: &_m.Mock} } +// CheckHealth provides a mock function with given fields: _a0, _a1 +func (_m *MockQueryNode) CheckHealth(_a0 context.Context, _a1 *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) { + ret := _m.Called(_a0, _a1) + + var r0 *milvuspb.CheckHealthResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error)); ok { + return rf(_a0, _a1) + } + if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest) *milvuspb.CheckHealthResponse); ok { + r0 = rf(_a0, _a1) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*milvuspb.CheckHealthResponse) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *milvuspb.CheckHealthRequest) error); ok { + r1 = rf(_a0, _a1) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockQueryNode_CheckHealth_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckHealth' +type MockQueryNode_CheckHealth_Call struct { + *mock.Call +} + +// CheckHealth is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 *milvuspb.CheckHealthRequest +func (_e *MockQueryNode_Expecter) CheckHealth(_a0 interface{}, _a1 interface{}) *MockQueryNode_CheckHealth_Call { + return &MockQueryNode_CheckHealth_Call{Call: _e.mock.On("CheckHealth", _a0, _a1)} +} + +func (_c *MockQueryNode_CheckHealth_Call) Run(run func(_a0 context.Context, _a1 *milvuspb.CheckHealthRequest)) 
*MockQueryNode_CheckHealth_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*milvuspb.CheckHealthRequest)) + }) + return _c +} + +func (_c *MockQueryNode_CheckHealth_Call) Return(_a0 *milvuspb.CheckHealthResponse, _a1 error) *MockQueryNode_CheckHealth_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockQueryNode_CheckHealth_Call) RunAndReturn(run func(context.Context, *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error)) *MockQueryNode_CheckHealth_Call { + _c.Call.Return(run) + return _c +} + // Delete provides a mock function with given fields: _a0, _a1 func (_m *MockQueryNode) Delete(_a0 context.Context, _a1 *querypb.DeleteRequest) (*commonpb.Status, error) { ret := _m.Called(_a0, _a1) diff --git a/internal/mocks/mock_querynode_client.go b/internal/mocks/mock_querynode_client.go index 3b3d465610bec..3bc16e66436f9 100644 --- a/internal/mocks/mock_querynode_client.go +++ b/internal/mocks/mock_querynode_client.go @@ -31,6 +31,76 @@ func (_m *MockQueryNodeClient) EXPECT() *MockQueryNodeClient_Expecter { return &MockQueryNodeClient_Expecter{mock: &_m.Mock} } +// CheckHealth provides a mock function with given fields: ctx, in, opts +func (_m *MockQueryNodeClient) CheckHealth(ctx context.Context, in *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, in) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 *milvuspb.CheckHealthResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest, ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error)); ok { + return rf(ctx, in, opts...) + } + if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest, ...grpc.CallOption) *milvuspb.CheckHealthResponse); ok { + r0 = rf(ctx, in, opts...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*milvuspb.CheckHealthResponse) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *milvuspb.CheckHealthRequest, ...grpc.CallOption) error); ok { + r1 = rf(ctx, in, opts...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockQueryNodeClient_CheckHealth_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckHealth' +type MockQueryNodeClient_CheckHealth_Call struct { + *mock.Call +} + +// CheckHealth is a helper method to define mock.On call +// - ctx context.Context +// - in *milvuspb.CheckHealthRequest +// - opts ...grpc.CallOption +func (_e *MockQueryNodeClient_Expecter) CheckHealth(ctx interface{}, in interface{}, opts ...interface{}) *MockQueryNodeClient_CheckHealth_Call { + return &MockQueryNodeClient_CheckHealth_Call{Call: _e.mock.On("CheckHealth", + append([]interface{}{ctx, in}, opts...)...)} +} + +func (_c *MockQueryNodeClient_CheckHealth_Call) Run(run func(ctx context.Context, in *milvuspb.CheckHealthRequest, opts ...grpc.CallOption)) *MockQueryNodeClient_CheckHealth_Call { + _c.Call.Run(func(args mock.Arguments) { + variadicArgs := make([]grpc.CallOption, len(args)-2) + for i, a := range args[2:] { + if a != nil { + variadicArgs[i] = a.(grpc.CallOption) + } + } + run(args[0].(context.Context), args[1].(*milvuspb.CheckHealthRequest), variadicArgs...) 
+ }) + return _c +} + +func (_c *MockQueryNodeClient_CheckHealth_Call) Return(_a0 *milvuspb.CheckHealthResponse, _a1 error) *MockQueryNodeClient_CheckHealth_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockQueryNodeClient_CheckHealth_Call) RunAndReturn(run func(context.Context, *milvuspb.CheckHealthRequest, ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error)) *MockQueryNodeClient_CheckHealth_Call { + _c.Call.Return(run) + return _c +} + // Close provides a mock function with given fields: func (_m *MockQueryNodeClient) Close() error { ret := _m.Called() diff --git a/internal/proto/data_coord.proto b/internal/proto/data_coord.proto index ecab1cdcfb07c..5a5a069ca3319 100644 --- a/internal/proto/data_coord.proto +++ b/internal/proto/data_coord.proto @@ -137,6 +137,8 @@ service DataNode { rpc QuerySlot(QuerySlotRequest) returns(QuerySlotResponse) {} rpc DropCompactionPlan(DropCompactionPlanRequest) returns(common.Status) {} + + rpc CheckHealth(milvus.CheckHealthRequest)returns (milvus.CheckHealthResponse) {} } message FlushRequest { diff --git a/internal/proto/query_coord.proto b/internal/proto/query_coord.proto index 8a5d688c893b0..771655c622344 100644 --- a/internal/proto/query_coord.proto +++ b/internal/proto/query_coord.proto @@ -175,7 +175,9 @@ service QueryNode { // DeleteBatch is the API to apply same delete data into multiple segments. // it's basically same as `Delete` but cost less memory pressure. rpc DeleteBatch(DeleteBatchRequest) returns (DeleteBatchResponse) { - } + } + + rpc CheckHealth(milvus.CheckHealthRequest)returns (milvus.CheckHealthResponse) {} } // --------------------QueryCoord grpc request and response proto------------------ diff --git a/internal/querycoordv2/mocks/mock_querynode.go b/internal/querycoordv2/mocks/mock_querynode.go index c161c66309ab4..9a2c228fdc444 100644 --- a/internal/querycoordv2/mocks/mock_querynode.go +++ b/internal/querycoordv2/mocks/mock_querynode.go @@ -29,6 +29,61 @@ func (_m *MockQueryNodeServer) EXPECT() *MockQueryNodeServer_Expecter { return &MockQueryNodeServer_Expecter{mock: &_m.Mock} } +// CheckHealth provides a mock function with given fields: _a0, _a1 +func (_m *MockQueryNodeServer) CheckHealth(_a0 context.Context, _a1 *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) { + ret := _m.Called(_a0, _a1) + + var r0 *milvuspb.CheckHealthResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error)); ok { + return rf(_a0, _a1) + } + if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest) *milvuspb.CheckHealthResponse); ok { + r0 = rf(_a0, _a1) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*milvuspb.CheckHealthResponse) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *milvuspb.CheckHealthRequest) error); ok { + r1 = rf(_a0, _a1) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockQueryNodeServer_CheckHealth_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckHealth' +type MockQueryNodeServer_CheckHealth_Call struct { + *mock.Call +} + +// CheckHealth is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 *milvuspb.CheckHealthRequest +func (_e *MockQueryNodeServer_Expecter) CheckHealth(_a0 interface{}, _a1 interface{}) *MockQueryNodeServer_CheckHealth_Call { + return &MockQueryNodeServer_CheckHealth_Call{Call: _e.mock.On("CheckHealth", _a0, _a1)} +} + +func (_c *MockQueryNodeServer_CheckHealth_Call) 
Run(run func(_a0 context.Context, _a1 *milvuspb.CheckHealthRequest)) *MockQueryNodeServer_CheckHealth_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*milvuspb.CheckHealthRequest)) + }) + return _c +} + +func (_c *MockQueryNodeServer_CheckHealth_Call) Return(_a0 *milvuspb.CheckHealthResponse, _a1 error) *MockQueryNodeServer_CheckHealth_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockQueryNodeServer_CheckHealth_Call) RunAndReturn(run func(context.Context, *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error)) *MockQueryNodeServer_CheckHealth_Call { + _c.Call.Return(run) + return _c +} + // Delete provides a mock function with given fields: _a0, _a1 func (_m *MockQueryNodeServer) Delete(_a0 context.Context, _a1 *querypb.DeleteRequest) (*commonpb.Status, error) { ret := _m.Called(_a0, _a1) diff --git a/internal/querycoordv2/server.go b/internal/querycoordv2/server.go index 8ae802cf7848f..e29ca6ee4b5bf 100644 --- a/internal/querycoordv2/server.go +++ b/internal/querycoordv2/server.go @@ -55,6 +55,7 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/task" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/componentutil" + "github.com/milvus-io/milvus/internal/util/healthcheck" "github.com/milvus-io/milvus/internal/util/proxyutil" "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/internal/util/tsoutil" @@ -138,6 +139,8 @@ type Server struct { proxyClientManager proxyutil.ProxyClientManagerInterface metricsRequest *metricsinfo.MetricsRequest + + healthChecker *healthcheck.Checker } func NewQueryCoord(ctx context.Context) (*Server, error) { @@ -424,6 +427,8 @@ func (s *Server) initQueryCoord() error { // Init load status cache meta.GlobalFailedLoadCache = meta.NewFailedLoadCache() + interval := Params.CommonCfg.HealthCheckInterval.GetAsDuration(time.Second) + s.healthChecker = healthcheck.NewChecker(interval, s.healthCheckFn) log.Info("init querycoord done", zap.Int64("nodeID", paramtable.GetNodeID()), zap.String("Address", s.address)) return err } @@ -567,6 +572,7 @@ func (s *Server) startQueryCoord() error { s.startServerLoop() s.afterStart() + s.healthChecker.Start() s.UpdateStateCode(commonpb.StateCode_Healthy) sessionutil.SaveServerInfo(typeutil.QueryCoordRole, s.session.GetServerID()) return nil @@ -605,7 +611,9 @@ func (s *Server) Stop() error { // FOLLOW the dependence graph: // job scheduler -> checker controller -> task scheduler -> dist controller -> cluster -> session // observers -> dist controller - + if s.healthChecker != nil { + s.healthChecker.Close() + } if s.jobScheduler != nil { log.Info("stop job scheduler...") s.jobScheduler.Stop() diff --git a/internal/querycoordv2/services.go b/internal/querycoordv2/services.go index b3ba5148ed92b..ca74a0fd5dabb 100644 --- a/internal/querycoordv2/services.go +++ b/internal/querycoordv2/services.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "sync" + "time" "github.com/cockroachdb/errors" "github.com/samber/lo" @@ -35,7 +36,7 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/job" "github.com/milvus-io/milvus/internal/querycoordv2/meta" "github.com/milvus-io/milvus/internal/querycoordv2/utils" - "github.com/milvus-io/milvus/internal/util/componentutil" + "github.com/milvus-io/milvus/internal/util/healthcheck" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/util/merr" @@ -915,16 +916,20 @@ func (s *Server) 
CheckHealth(ctx context.Context, req *milvuspb.CheckHealthReque return &milvuspb.CheckHealthResponse{Status: merr.Status(err), IsHealthy: false, Reasons: []string{err.Error()}}, nil } - errReasons, err := s.checkNodeHealth(ctx) - if err != nil || len(errReasons) != 0 { - return componentutil.CheckHealthRespWithErrMsg(errReasons...), nil - } + latestCheckResult := s.healthChecker.GetLatestCheckResult() + return healthcheck.GetCheckHealthResponseFromResult(latestCheckResult), nil +} - if err := utils.CheckCollectionsQueryable(ctx, s.meta, s.targetMgr, s.dist, s.nodeMgr); err != nil { - log.Ctx(ctx).Warn("some collection is not queryable during health check", zap.Error(err)) - } +func (s *Server) healthCheckFn() *healthcheck.Result { + timeout := Params.CommonCfg.HealthCheckRPCTimeout.GetAsDuration(time.Second) + ctx, cancel := context.WithTimeout(s.ctx, timeout) + defer cancel() - return componentutil.CheckHealthRespWithErr(nil), nil + checkResults := s.broadcastCheckHealth(ctx) + for collectionID, failReason := range utils.CheckCollectionsQueryable(ctx, s.meta, s.targetMgr, s.dist, s.nodeMgr) { + checkResults.AppendUnhealthyCollectionMsgs(healthcheck.NewUnhealthyCollectionMsg(collectionID, failReason, healthcheck.CollectionQueryable)) + } + return checkResults } func (s *Server) checkNodeHealth(ctx context.Context) ([]string, error) { @@ -955,6 +960,39 @@ func (s *Server) checkNodeHealth(ctx context.Context) ([]string, error) { return errReasons, err } +func (s *Server) broadcastCheckHealth(ctx context.Context) *healthcheck.Result { + result := healthcheck.NewResult() + wg := sync.WaitGroup{} + wlock := sync.Mutex{} + + for _, node := range s.nodeMgr.GetAll() { + node := node + wg.Add(1) + go func() { + defer wg.Done() + + checkHealthResp, err := s.cluster.CheckHealth(ctx, node.ID()) + if err = merr.CheckRPCCall(checkHealthResp, err); err != nil && !errors.Is(err, merr.ErrServiceUnimplemented) { + err = fmt.Errorf("CheckHealth fails for querynode:%d, %w", node.ID(), err) + wlock.Lock() + result.AppendUnhealthyClusterMsg( + healthcheck.NewUnhealthyClusterMsg(typeutil.QueryNodeRole, node.ID(), err.Error(), healthcheck.NodeHealthCheck)) + wlock.Unlock() + return + } + + if checkHealthResp != nil && len(checkHealthResp.Reasons) > 0 { + wlock.Lock() + result.AppendResult(healthcheck.GetHealthCheckResultFromResp(checkHealthResp)) + wlock.Unlock() + } + }() + } + + wg.Wait() + return result +} + func (s *Server) CreateResourceGroup(ctx context.Context, req *milvuspb.CreateResourceGroupRequest) (*commonpb.Status, error) { log := log.Ctx(ctx).With( zap.String("rgName", req.GetResourceGroup()), diff --git a/internal/querycoordv2/services_test.go b/internal/querycoordv2/services_test.go index 1381bc2a23b51..d82e8a8269acc 100644 --- a/internal/querycoordv2/services_test.go +++ b/internal/querycoordv2/services_test.go @@ -47,6 +47,7 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/internal/querycoordv2/task" "github.com/milvus-io/milvus/internal/querycoordv2/utils" + "github.com/milvus-io/milvus/internal/util/healthcheck" "github.com/milvus-io/milvus/internal/util/proxyutil" "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/pkg/kv" @@ -170,6 +171,13 @@ func (suite *ServiceSuite) SetupTest() { })) suite.meta.ResourceManager.HandleNodeUp(context.TODO(), node) } + suite.cluster = session.NewMockCluster(suite.T()) + suite.cluster.EXPECT().SyncDistribution(mock.Anything, mock.Anything, mock.Anything).Return(merr.Success(), 
nil).Maybe() + suite.cluster.EXPECT().CheckHealth(mock.Anything, mock.Anything).Return(&milvuspb.CheckHealthResponse{ + Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, + IsHealthy: true, + Reasons: []string{}, + }, nil).Maybe() suite.jobScheduler = job.NewScheduler() suite.taskScheduler = task.NewMockScheduler(suite.T()) suite.taskScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe() @@ -1627,6 +1635,9 @@ func (suite *ServiceSuite) TestCheckHealth() { suite.loadAll() ctx := context.Background() server := suite.server + server.healthChecker = healthcheck.NewChecker(50*time.Millisecond, suite.server.healthCheckFn) + server.healthChecker.Start() + defer server.healthChecker.Close() assertCheckHealthResult := func(isHealthy bool) { resp, err := server.CheckHealth(ctx, &milvuspb.CheckHealthRequest{}) @@ -1639,28 +1650,38 @@ func (suite *ServiceSuite) TestCheckHealth() { } } - setNodeSate := func(state commonpb.StateCode) { - // Test for components state fail - suite.cluster.EXPECT().GetComponentStates(mock.Anything, mock.Anything).Unset() - suite.cluster.EXPECT().GetComponentStates(mock.Anything, mock.Anything).Return( - &milvuspb.ComponentStates{ - State: &milvuspb.ComponentInfo{StateCode: state}, - Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}, - }, - nil).Maybe() + setNodeSate := func(isHealthy bool, isRPCFail bool) { + var resp *milvuspb.CheckHealthResponse + if isHealthy { + resp = healthcheck.OK() + } else { + resp = healthcheck.GetCheckHealthResponseFromClusterMsg(healthcheck.NewUnhealthyClusterMsg("dn", 1, "check fails", healthcheck.NodeHealthCheck)) + } + resp.Status = &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success} + if isRPCFail { + resp.Status = &commonpb.Status{ErrorCode: commonpb.ErrorCode_ForceDeny} + } + suite.cluster.EXPECT().CheckHealth(mock.Anything, mock.Anything).Unset() + suite.cluster.EXPECT().CheckHealth(mock.Anything, mock.Anything).Return(resp, nil).Maybe() + time.Sleep(1 * time.Second) } // Test for server is not healthy server.UpdateStateCode(commonpb.StateCode_Initializing) assertCheckHealthResult(false) - // Test for components state fail - setNodeSate(commonpb.StateCode_Abnormal) + // Test for check health has some error reasons + setNodeSate(false, false) + server.UpdateStateCode(commonpb.StateCode_Healthy) + assertCheckHealthResult(false) + + // Test for check health rpc fail + setNodeSate(true, true) server.UpdateStateCode(commonpb.StateCode_Healthy) assertCheckHealthResult(false) // Test for check load percentage fail - setNodeSate(commonpb.StateCode_Healthy) + setNodeSate(true, false) assertCheckHealthResult(true) // Test for check channel ok @@ -1682,7 +1703,14 @@ func (suite *ServiceSuite) TestCheckHealth() { for _, node := range suite.nodes { suite.nodeMgr.Stopping(node) } - assertCheckHealthResult(true) + + paramtable.Get().Save(paramtable.Get().QueryCoordCfg.UpdateCollectionLoadStatusInterval.Key, "1") + defer paramtable.Get().Reset(paramtable.Get().QueryCoordCfg.UpdateCollectionLoadStatusInterval.Key) + time.Sleep(1500 * time.Millisecond) + resp, err := server.CheckHealth(ctx, &milvuspb.CheckHealthRequest{}) + suite.NoError(err) + suite.Equal(resp.IsHealthy, true) + suite.NotEmpty(resp.Reasons) } func (suite *ServiceSuite) TestGetShardLeaders() { diff --git a/internal/querycoordv2/session/cluster.go b/internal/querycoordv2/session/cluster.go index 7b6bc316ebe25..569dbb0029469 100644 --- a/internal/querycoordv2/session/cluster.go +++ b/internal/querycoordv2/session/cluster.go @@ 
-52,6 +52,7 @@ type Cluster interface { GetMetrics(ctx context.Context, nodeID int64, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) SyncDistribution(ctx context.Context, nodeID int64, req *querypb.SyncDistributionRequest) (*commonpb.Status, error) GetComponentStates(ctx context.Context, nodeID int64) (*milvuspb.ComponentStates, error) + CheckHealth(ctx context.Context, nodeID int64) (*milvuspb.CheckHealthResponse, error) Start() Stop() } @@ -272,6 +273,20 @@ func (c *QueryCluster) send(ctx context.Context, nodeID int64, fn func(cli types return nil } +func (c *QueryCluster) CheckHealth(ctx context.Context, nodeID int64) (*milvuspb.CheckHealthResponse, error) { + var ( + resp *milvuspb.CheckHealthResponse + err error + ) + err1 := c.send(ctx, nodeID, func(cli types.QueryNodeClient) { + resp, err = cli.CheckHealth(ctx, &milvuspb.CheckHealthRequest{}) + }) + if err1 != nil { + return nil, err1 + } + return resp, err +} + type clients struct { sync.RWMutex clients map[int64]types.QueryNodeClient // nodeID -> client diff --git a/internal/querycoordv2/session/mock_cluster.go b/internal/querycoordv2/session/mock_cluster.go index bd7d3b3eae622..a61954d7f8644 100644 --- a/internal/querycoordv2/session/mock_cluster.go +++ b/internal/querycoordv2/session/mock_cluster.go @@ -27,6 +27,61 @@ func (_m *MockCluster) EXPECT() *MockCluster_Expecter { return &MockCluster_Expecter{mock: &_m.Mock} } +// CheckHealth provides a mock function with given fields: ctx, nodeID +func (_m *MockCluster) CheckHealth(ctx context.Context, nodeID int64) (*milvuspb.CheckHealthResponse, error) { + ret := _m.Called(ctx, nodeID) + + var r0 *milvuspb.CheckHealthResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, int64) (*milvuspb.CheckHealthResponse, error)); ok { + return rf(ctx, nodeID) + } + if rf, ok := ret.Get(0).(func(context.Context, int64) *milvuspb.CheckHealthResponse); ok { + r0 = rf(ctx, nodeID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*milvuspb.CheckHealthResponse) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, int64) error); ok { + r1 = rf(ctx, nodeID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockCluster_CheckHealth_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckHealth' +type MockCluster_CheckHealth_Call struct { + *mock.Call +} + +// CheckHealth is a helper method to define mock.On call +// - ctx context.Context +// - nodeID int64 +func (_e *MockCluster_Expecter) CheckHealth(ctx interface{}, nodeID interface{}) *MockCluster_CheckHealth_Call { + return &MockCluster_CheckHealth_Call{Call: _e.mock.On("CheckHealth", ctx, nodeID)} +} + +func (_c *MockCluster_CheckHealth_Call) Run(run func(ctx context.Context, nodeID int64)) *MockCluster_CheckHealth_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(int64)) + }) + return _c +} + +func (_c *MockCluster_CheckHealth_Call) Return(_a0 *milvuspb.CheckHealthResponse, _a1 error) *MockCluster_CheckHealth_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockCluster_CheckHealth_Call) RunAndReturn(run func(context.Context, int64) (*milvuspb.CheckHealthResponse, error)) *MockCluster_CheckHealth_Call { + _c.Call.Return(run) + return _c +} + // GetComponentStates provides a mock function with given fields: ctx, nodeID func (_m *MockCluster) GetComponentStates(ctx context.Context, nodeID int64) (*milvuspb.ComponentStates, error) { ret := _m.Called(ctx, nodeID) diff --git 
a/internal/querycoordv2/utils/util.go b/internal/querycoordv2/utils/util.go index 5e283b926ee60..6492db0b3507c 100644 --- a/internal/querycoordv2/utils/util.go +++ b/internal/querycoordv2/utils/util.go @@ -73,13 +73,13 @@ func CheckDelegatorDataReady(nodeMgr *session.NodeManager, targetMgr meta.Target for segmentID, info := range segmentDist { _, exist := leader.Segments[segmentID] if !exist { - log.RatedInfo(10, "leader is not available due to lack of segment", zap.Int64("segmentID", segmentID)) + log.RatedWarn(10, "leader is not available due to lack of segment", zap.Int64("segmentID", segmentID)) return merr.WrapErrSegmentLack(segmentID) } l0WithWrongLocation := info.GetLevel() == datapb.SegmentLevel_L0 && leader.Segments[segmentID].GetNodeID() != leader.ID if l0WithWrongLocation { - log.RatedInfo(10, "leader is not available due to lack of L0 segment", zap.Int64("segmentID", segmentID)) + log.RatedWarn(10, "leader is not available due to lack of L0 segment", zap.Int64("segmentID", segmentID)) return merr.WrapErrSegmentLack(segmentID) } } @@ -113,8 +113,6 @@ func GetShardLeadersWithChannels(ctx context.Context, m *meta.Meta, targetMgr me ) ([]*querypb.ShardLeadersList, error) { ret := make([]*querypb.ShardLeadersList, 0) for _, channel := range channels { - log := log.With(zap.String("channel", channel.GetChannelName())) - var channelErr error leaders := dist.LeaderViewManager.GetByFilter(meta.WithChannelName2LeaderView(channel.GetChannelName())) if len(leaders) == 0 { @@ -132,7 +130,7 @@ func GetShardLeadersWithChannels(ctx context.Context, m *meta.Meta, targetMgr me if len(readableLeaders) == 0 { msg := fmt.Sprintf("channel %s is not available in any replica", channel.GetChannelName()) - log.Warn(msg, zap.Error(channelErr)) + log.RatedWarn(60, msg, zap.Error(channelErr)) err := merr.WrapErrChannelNotAvailable(channel.GetChannelName(), channelErr.Error()) return nil, err } @@ -185,8 +183,9 @@ func GetShardLeaders(ctx context.Context, m *meta.Meta, targetMgr meta.TargetMan } // CheckCollectionsQueryable check all channels are watched and all segments are loaded for this collection -func CheckCollectionsQueryable(ctx context.Context, m *meta.Meta, targetMgr meta.TargetManagerInterface, dist *meta.DistributionManager, nodeMgr *session.NodeManager) error { - maxInterval := paramtable.Get().QueryCoordCfg.UpdateCollectionLoadStatusInterval.GetAsDuration(time.Minute) +func CheckCollectionsQueryable(ctx context.Context, m *meta.Meta, targetMgr meta.TargetManagerInterface, dist *meta.DistributionManager, nodeMgr *session.NodeManager) map[int64]string { + maxInterval := paramtable.Get().QueryCoordCfg.UpdateCollectionLoadStatusInterval.GetAsDuration(time.Second) + checkResult := make(map[int64]string) for _, coll := range m.GetAllCollections(ctx) { err := checkCollectionQueryable(ctx, m, targetMgr, dist, nodeMgr, coll) // the collection is not queryable, if meet following conditions: @@ -194,15 +193,10 @@ func CheckCollectionsQueryable(ctx context.Context, m *meta.Meta, targetMgr meta // 2. Collection is not starting to release // 3. The load percentage has not been updated in the last 5 minutes. 
if err != nil && m.Exist(ctx, coll.CollectionID) && time.Since(coll.UpdatedAt) >= maxInterval { - log.Ctx(ctx).Warn("collection not querable", - zap.Int64("collectionID", coll.CollectionID), - zap.Time("lastUpdated", coll.UpdatedAt), - zap.Duration("maxInterval", maxInterval), - zap.Error(err)) - return err + checkResult[coll.CollectionID] = err.Error() } } - return nil + return checkResult } // checkCollectionQueryable check all channels are watched and all segments are loaded for this collection diff --git a/internal/querynodev2/metrics_info.go b/internal/querynodev2/metrics_info.go index 3cedfedb1df0b..968fafe59fcd8 100644 --- a/internal/querynodev2/metrics_info.go +++ b/internal/querynodev2/metrics_info.go @@ -54,13 +54,7 @@ func getRateMetric() ([]metricsinfo.RateMetric, error) { return rms, nil } -// getQuotaMetrics returns QueryNodeQuotaMetrics. -func getQuotaMetrics(node *QueryNode) (*metricsinfo.QueryNodeQuotaMetrics, error) { - rms, err := getRateMetric() - if err != nil { - return nil, err - } - +func getMinTSafe(node *QueryNode) (string, uint64) { minTsafeChannel := "" minTsafe := uint64(math.MaxUint64) node.delegators.Range(func(channel string, delegator delegator.ShardDelegator) bool { @@ -71,7 +65,17 @@ func getQuotaMetrics(node *QueryNode) (*metricsinfo.QueryNodeQuotaMetrics, error } return true }) + return minTsafeChannel, minTsafe +} + +// getQuotaMetrics returns QueryNodeQuotaMetrics. +func getQuotaMetrics(node *QueryNode) (*metricsinfo.QueryNodeQuotaMetrics, error) { + rms, err := getRateMetric() + if err != nil { + return nil, err + } + minTsafeChannel, minTsafe := getMinTSafe(node) collections := node.manager.Collection.ListWithName() nodeID := fmt.Sprint(node.GetNodeID()) diff --git a/internal/querynodev2/services.go b/internal/querynodev2/services.go index 1eca7efe950cd..af46fd1aada91 100644 --- a/internal/querynodev2/services.go +++ b/internal/querynodev2/services.go @@ -42,6 +42,7 @@ import ( "github.com/milvus-io/milvus/internal/querynodev2/segments" "github.com/milvus-io/milvus/internal/querynodev2/tasks" "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/internal/util/healthcheck" "github.com/milvus-io/milvus/internal/util/searchutil/scheduler" "github.com/milvus-io/milvus/internal/util/streamrpc" "github.com/milvus-io/milvus/pkg/common" @@ -54,6 +55,7 @@ import ( "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/ratelimitutil" "github.com/milvus-io/milvus/pkg/util/timerecord" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -1384,6 +1386,25 @@ func (node *QueryNode) Delete(ctx context.Context, req *querypb.DeleteRequest) ( return merr.Success(), nil } +func (node *QueryNode) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) { + if err := node.lifetime.Add(merr.IsHealthy); err != nil { + return &milvuspb.CheckHealthResponse{ + Status: merr.Status(err), + Reasons: []string{err.Error()}, + }, nil + } + defer node.lifetime.Done() + + maxDelay := paramtable.Get().QuotaConfig.MaxTimeTickDelay.GetAsDuration(time.Second) + minTsafeChannel, minTsafe := getMinTSafe(node) + if err := ratelimitutil.CheckTimeTickDelay(minTsafeChannel, minTsafe, maxDelay); err != nil { + msg := healthcheck.NewUnhealthyClusterMsg(typeutil.QueryNodeRole, node.GetNodeID(), err.Error(), healthcheck.TimeTickLagExceed) + return 
healthcheck.GetCheckHealthResponseFromClusterMsg(msg), nil + } + + return healthcheck.OK(), nil +} + // DeleteBatch is the API to apply same delete data into multiple segments. // it's basically same as `Delete` but cost less memory pressure. func (node *QueryNode) DeleteBatch(ctx context.Context, req *querypb.DeleteBatchRequest) (*querypb.DeleteBatchResponse, error) { diff --git a/internal/querynodev2/services_test.go b/internal/querynodev2/services_test.go index 99edfa234b312..f3edb64965629 100644 --- a/internal/querynodev2/services_test.go +++ b/internal/querynodev2/services_test.go @@ -98,7 +98,7 @@ func (suite *ServiceSuite) SetupSuite() { paramtable.Init() paramtable.Get().Save(paramtable.Get().CommonCfg.GCEnabled.Key, "false") - suite.rootPath = suite.T().Name() + suite.rootPath = path.Join("/tmp/milvus/test", suite.T().Name()) suite.collectionID = 111 suite.collectionName = "test-collection" suite.partitionIDs = []int64{222} @@ -2222,6 +2222,44 @@ func (suite *ServiceSuite) TestLoadPartition() { suite.Equal(commonpb.ErrorCode_Success, status.ErrorCode) } +func (suite *ServiceSuite) TestCheckHealth() { + suite.Run("node not healthy", func() { + suite.node.UpdateStateCode(commonpb.StateCode_Abnormal) + + ctx := context.Background() + resp, err := suite.node.CheckHealth(ctx, nil) + suite.NoError(err) + suite.False(merr.Ok(resp.GetStatus())) + suite.ErrorIs(merr.Error(resp.GetStatus()), merr.ErrServiceNotReady) + }) + + suite.Run("exceeded timetick lag on pipeline", func() { + sd1 := delegator.NewMockShardDelegator(suite.T()) + sd1.EXPECT().GetTSafe().Return(100) + sd1.EXPECT().Close().Maybe() + suite.node.delegators.Insert("timetick-lag-ch", sd1) + defer suite.node.delegators.GetAndRemove("timetick-lag-ch") + + ctx := context.Background() + suite.node.UpdateStateCode(commonpb.StateCode_Healthy) + resp, err := suite.node.CheckHealth(ctx, nil) + suite.NoError(err) + suite.True(merr.Ok(resp.GetStatus())) + suite.False(resp.GetIsHealthy()) + suite.NotEmpty(resp.Reasons) + }) + + suite.Run("ok", func() { + ctx := context.Background() + suite.node.UpdateStateCode(commonpb.StateCode_Healthy) + resp, err := suite.node.CheckHealth(ctx, nil) + suite.NoError(err) + suite.True(merr.Ok(resp.GetStatus())) + suite.True(resp.GetIsHealthy()) + suite.Empty(resp.Reasons) + }) +} + func TestQueryNodeService(t *testing.T) { suite.Run(t, new(ServiceSuite)) } diff --git a/internal/rootcoord/mock_test.go b/internal/rootcoord/mock_test.go index 9dc973be8133a..0a1259d6d9d9b 100644 --- a/internal/rootcoord/mock_test.go +++ b/internal/rootcoord/mock_test.go @@ -405,6 +405,7 @@ func newMockProxy() *mockProxy { func newTestCore(opts ...Opt) *Core { c := &Core{ + ctx: context.TODO(), metricsRequest: metricsinfo.NewMetricsRequest(), session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: TestRootCoordID}}, } diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index 5d684001d9cbf..b17a8a72333e1 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -32,7 +32,6 @@ import ( clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/atomic" "go.uber.org/zap" - "golang.org/x/sync/errgroup" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" @@ -50,6 +49,7 @@ import ( tso2 "github.com/milvus-io/milvus/internal/tso" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/dependency" + "github.com/milvus-io/milvus/internal/util/healthcheck" 
"github.com/milvus-io/milvus/internal/util/proxyutil" "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/internal/util/streamingutil" @@ -130,6 +130,7 @@ type Core struct { activateFunc func() error metricsRequest *metricsinfo.MetricsRequest + healthChecker *healthcheck.Checker } // --------------------- function -------------------------- @@ -500,6 +501,8 @@ func (c *Core) initInternal() error { return err } + interval := Params.CommonCfg.HealthCheckInterval.GetAsDuration(time.Second) + c.healthChecker = healthcheck.NewChecker(interval, c.healthCheckFn) log.Info("init rootcoord done", zap.Int64("nodeID", paramtable.GetNodeID()), zap.String("Address", c.address)) return nil } @@ -794,6 +797,7 @@ func (c *Core) startInternal() error { }() c.startServerLoop() + c.healthChecker.Start() c.UpdateStateCode(commonpb.StateCode_Healthy) sessionutil.SaveServerInfo(typeutil.RootCoordRole, c.session.ServerID) log.Info("rootcoord startup successfully") @@ -855,6 +859,10 @@ func (c *Core) revokeSession() { // Stop stops rootCoord. func (c *Core) Stop() error { c.UpdateStateCode(commonpb.StateCode_Abnormal) + if c.healthChecker != nil { + c.healthChecker.Close() + } + c.stopExecutor() c.stopScheduler() if c.proxyWatcher != nil { @@ -3088,53 +3096,40 @@ func (c *Core) CheckHealth(ctx context.Context, in *milvuspb.CheckHealthRequest) }, nil } - group, ctx := errgroup.WithContext(ctx) - errs := typeutil.NewConcurrentSet[error]() + latestCheckResult := c.healthChecker.GetLatestCheckResult() + return healthcheck.GetCheckHealthResponseFromResult(latestCheckResult), nil +} + +func (c *Core) healthCheckFn() *healthcheck.Result { + timeout := Params.CommonCfg.HealthCheckRPCTimeout.GetAsDuration(time.Second) + ctx, cancel := context.WithTimeout(c.ctx, timeout) + defer cancel() proxyClients := c.proxyClientManager.GetProxyClients() + wg := sync.WaitGroup{} + lock := sync.Mutex{} + result := healthcheck.NewResult() + proxyClients.Range(func(key int64, value types.ProxyClient) bool { nodeID := key proxyClient := value - group.Go(func() error { - sta, err := proxyClient.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{}) - if err != nil { - errs.Insert(err) - return err - } + wg.Add(1) + go func() { + defer wg.Done() + resp, err := proxyClient.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{}) + err = merr.AnalyzeComponentStateResp(typeutil.ProxyRole, nodeID, resp, err) - err = merr.AnalyzeState("Proxy", nodeID, sta) + lock.Lock() + defer lock.Unlock() if err != nil { - errs.Insert(err) + result.AppendUnhealthyClusterMsg(healthcheck.NewUnhealthyClusterMsg(typeutil.ProxyRole, nodeID, err.Error(), healthcheck.NodeHealthCheck)) } - - return err - }) + }() return true }) - maxDelay := Params.QuotaConfig.MaxTimeTickDelay.GetAsDuration(time.Second) - if maxDelay > 0 { - group.Go(func() error { - err := CheckTimeTickLagExceeded(ctx, c.queryCoord, c.dataCoord, maxDelay) - if err != nil { - errs.Insert(err) - } - return err - }) - } - - err := group.Wait() - if err != nil { - return &milvuspb.CheckHealthResponse{ - Status: merr.Success(), - IsHealthy: false, - Reasons: lo.Map(errs.Collect(), func(e error, i int) string { - return err.Error() - }), - }, nil - } - - return &milvuspb.CheckHealthResponse{Status: merr.Success(), IsHealthy: true, Reasons: []string{}}, nil + wg.Wait() + return result } func (c *Core) CreatePrivilegeGroup(ctx context.Context, in *milvuspb.CreatePrivilegeGroupRequest) (*commonpb.Status, error) { diff --git a/internal/rootcoord/root_coord_test.go 
b/internal/rootcoord/root_coord_test.go index b8915e5deab79..38192dcf983d5 100644 --- a/internal/rootcoord/root_coord_test.go +++ b/internal/rootcoord/root_coord_test.go @@ -32,7 +32,6 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus/internal/metastore/model" - "github.com/milvus-io/milvus/internal/mocks" "github.com/milvus-io/milvus/internal/proto/etcdpb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/proto/proxypb" @@ -40,6 +39,7 @@ import ( mockrootcoord "github.com/milvus-io/milvus/internal/rootcoord/mocks" "github.com/milvus-io/milvus/internal/util/dependency" kvfactory "github.com/milvus-io/milvus/internal/util/dependency/kv" + "github.com/milvus-io/milvus/internal/util/healthcheck" "github.com/milvus-io/milvus/internal/util/proxyutil" "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/pkg/util" @@ -49,7 +49,6 @@ import ( "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/tikv" - "github.com/milvus-io/milvus/pkg/util/tsoutil" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -1453,65 +1452,6 @@ func TestRootCoord_AlterCollection(t *testing.T) { } func TestRootCoord_CheckHealth(t *testing.T) { - getQueryCoordMetricsFunc := func(tt typeutil.Timestamp) (*milvuspb.GetMetricsResponse, error) { - clusterTopology := metricsinfo.QueryClusterTopology{ - ConnectedNodes: []metricsinfo.QueryNodeInfos{ - { - QuotaMetrics: &metricsinfo.QueryNodeQuotaMetrics{ - Fgm: metricsinfo.FlowGraphMetric{ - MinFlowGraphChannel: "ch1", - MinFlowGraphTt: tt, - NumFlowGraph: 1, - }, - }, - }, - }, - } - - resp, _ := metricsinfo.MarshalTopology(metricsinfo.QueryCoordTopology{Cluster: clusterTopology}) - return &milvuspb.GetMetricsResponse{ - Status: merr.Success(), - Response: resp, - ComponentName: metricsinfo.ConstructComponentName(typeutil.QueryCoordRole, 0), - }, nil - } - - getDataCoordMetricsFunc := func(tt typeutil.Timestamp) (*milvuspb.GetMetricsResponse, error) { - clusterTopology := metricsinfo.DataClusterTopology{ - ConnectedDataNodes: []metricsinfo.DataNodeInfos{ - { - QuotaMetrics: &metricsinfo.DataNodeQuotaMetrics{ - Fgm: metricsinfo.FlowGraphMetric{ - MinFlowGraphChannel: "ch1", - MinFlowGraphTt: tt, - NumFlowGraph: 1, - }, - }, - }, - }, - } - - resp, _ := metricsinfo.MarshalTopology(metricsinfo.DataCoordTopology{Cluster: clusterTopology}) - return &milvuspb.GetMetricsResponse{ - Status: merr.Success(), - Response: resp, - ComponentName: metricsinfo.ConstructComponentName(typeutil.DataCoordRole, 0), - }, nil - } - - querynodeTT := tsoutil.ComposeTSByTime(time.Now().Add(-1*time.Minute), 0) - datanodeTT := tsoutil.ComposeTSByTime(time.Now().Add(-2*time.Minute), 0) - - dcClient := mocks.NewMockDataCoordClient(t) - dcClient.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(getDataCoordMetricsFunc(datanodeTT)) - qcClient := mocks.NewMockQueryCoordClient(t) - qcClient.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(getQueryCoordMetricsFunc(querynodeTT)) - - errDataCoordClient := mocks.NewMockDataCoordClient(t) - errDataCoordClient.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(nil, errors.New("error")) - errQueryCoordClient := mocks.NewMockQueryCoordClient(t) - errQueryCoordClient.EXPECT().GetMetrics(mock.Anything, mock.Anything).Return(nil, errors.New("error")) - t.Run("not healthy", func(t *testing.T) { ctx 
:= context.Background() c := newTestCore(withAbnormalCode()) @@ -1521,25 +1461,13 @@ func TestRootCoord_CheckHealth(t *testing.T) { assert.NotEmpty(t, resp.Reasons) }) - t.Run("ok with disabled tt lag configuration", func(t *testing.T) { - v := Params.QuotaConfig.MaxTimeTickDelay.GetValue() - Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, "-1") - defer Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, v) - - c := newTestCore(withHealthyCode(), withValidProxyManager()) - ctx := context.Background() - resp, err := c.CheckHealth(ctx, &milvuspb.CheckHealthRequest{}) - assert.NoError(t, err) - assert.Equal(t, true, resp.IsHealthy) - assert.Empty(t, resp.Reasons) - }) - t.Run("proxy health check fail with invalid proxy", func(t *testing.T) { - v := Params.QuotaConfig.MaxTimeTickDelay.GetValue() - Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, "6000") - defer Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, v) + c := newTestCore(withHealthyCode(), withInvalidProxyManager()) + c.healthChecker = healthcheck.NewChecker(40*time.Millisecond, c.healthCheckFn) + c.healthChecker.Start() + defer c.healthChecker.Close() - c := newTestCore(withHealthyCode(), withInvalidProxyManager(), withDataCoord(dcClient), withQueryCoord(qcClient)) + time.Sleep(50 * time.Millisecond) ctx := context.Background() resp, err := c.CheckHealth(ctx, &milvuspb.CheckHealthRequest{}) @@ -1548,55 +1476,14 @@ func TestRootCoord_CheckHealth(t *testing.T) { assert.NotEmpty(t, resp.Reasons) }) - t.Run("proxy health check fail with get metrics error", func(t *testing.T) { - v := Params.QuotaConfig.MaxTimeTickDelay.GetValue() - Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, "6000") - defer Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, v) - - { - c := newTestCore(withHealthyCode(), - withValidProxyManager(), withDataCoord(dcClient), withQueryCoord(errQueryCoordClient)) - - ctx := context.Background() - resp, err := c.CheckHealth(ctx, &milvuspb.CheckHealthRequest{}) - assert.NoError(t, err) - assert.Equal(t, false, resp.IsHealthy) - assert.NotEmpty(t, resp.Reasons) - } - - { - c := newTestCore(withHealthyCode(), - withValidProxyManager(), withDataCoord(errDataCoordClient), withQueryCoord(qcClient)) - - ctx := context.Background() - resp, err := c.CheckHealth(ctx, &milvuspb.CheckHealthRequest{}) - assert.NoError(t, err) - assert.Equal(t, false, resp.IsHealthy) - assert.NotEmpty(t, resp.Reasons) - } - }) - - t.Run("ok with tt lag exceeded", func(t *testing.T) { - v := Params.QuotaConfig.MaxTimeTickDelay.GetValue() - Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, "90") - defer Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, v) - - c := newTestCore(withHealthyCode(), - withValidProxyManager(), withDataCoord(dcClient), withQueryCoord(qcClient)) - ctx := context.Background() - resp, err := c.CheckHealth(ctx, &milvuspb.CheckHealthRequest{}) - assert.NoError(t, err) - assert.Equal(t, false, resp.IsHealthy) - assert.NotEmpty(t, resp.Reasons) - }) + t.Run("ok", func(t *testing.T) { + c := newTestCore(withHealthyCode(), withValidProxyManager()) + c.healthChecker = healthcheck.NewChecker(40*time.Millisecond, c.healthCheckFn) + c.healthChecker.Start() + defer c.healthChecker.Close() - t.Run("ok with tt lag checking", func(t *testing.T) { - v := Params.QuotaConfig.MaxTimeTickDelay.GetValue() - Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, "600") - defer Params.Save(Params.QuotaConfig.MaxTimeTickDelay.Key, v) + time.Sleep(50 * time.Millisecond) - c := newTestCore(withHealthyCode(), - 
withValidProxyManager(), withDataCoord(dcClient), withQueryCoord(qcClient)) ctx := context.Background() resp, err := c.CheckHealth(ctx, &milvuspb.CheckHealthRequest{}) assert.NoError(t, err) diff --git a/internal/rootcoord/util.go b/internal/rootcoord/util.go index 8ee2b25463166..ef873b59785ba 100644 --- a/internal/rootcoord/util.go +++ b/internal/rootcoord/util.go @@ -20,10 +20,8 @@ import ( "context" "fmt" "strconv" - "time" "go.uber.org/zap" - "golang.org/x/sync/errgroup" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus/internal/json" @@ -34,7 +32,6 @@ import ( "github.com/milvus-io/milvus/pkg/mq/msgstream" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/metricsinfo" - "github.com/milvus-io/milvus/pkg/util/tsoutil" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -284,97 +281,3 @@ func getProxyMetrics(ctx context.Context, proxies proxyutil.ProxyClientManagerIn return ret, nil } - -func CheckTimeTickLagExceeded(ctx context.Context, queryCoord types.QueryCoordClient, dataCoord types.DataCoordClient, maxDelay time.Duration) error { - ctx, cancel := context.WithTimeout(ctx, GetMetricsTimeout) - defer cancel() - - now := time.Now() - group := &errgroup.Group{} - queryNodeTTDelay := typeutil.NewConcurrentMap[string, time.Duration]() - dataNodeTTDelay := typeutil.NewConcurrentMap[string, time.Duration]() - - group.Go(func() error { - queryCoordTopology, err := getQueryCoordMetrics(ctx, queryCoord) - if err != nil { - return err - } - - for _, queryNodeMetric := range queryCoordTopology.Cluster.ConnectedNodes { - qm := queryNodeMetric.QuotaMetrics - if qm != nil { - if qm.Fgm.NumFlowGraph > 0 && qm.Fgm.MinFlowGraphChannel != "" { - minTt, _ := tsoutil.ParseTS(qm.Fgm.MinFlowGraphTt) - delay := now.Sub(minTt) - - if delay.Milliseconds() >= maxDelay.Milliseconds() { - queryNodeTTDelay.Insert(qm.Fgm.MinFlowGraphChannel, delay) - } - } - } - } - return nil - }) - - // get Data cluster metrics - group.Go(func() error { - dataCoordTopology, err := getDataCoordMetrics(ctx, dataCoord) - if err != nil { - return err - } - - for _, dataNodeMetric := range dataCoordTopology.Cluster.ConnectedDataNodes { - dm := dataNodeMetric.QuotaMetrics - if dm != nil { - if dm.Fgm.NumFlowGraph > 0 && dm.Fgm.MinFlowGraphChannel != "" { - minTt, _ := tsoutil.ParseTS(dm.Fgm.MinFlowGraphTt) - delay := now.Sub(minTt) - - if delay.Milliseconds() >= maxDelay.Milliseconds() { - dataNodeTTDelay.Insert(dm.Fgm.MinFlowGraphChannel, delay) - } - } - } - } - return nil - }) - - err := group.Wait() - if err != nil { - return err - } - - var maxLagChannel string - var maxLag time.Duration - findMaxLagChannel := func(params ...*typeutil.ConcurrentMap[string, time.Duration]) { - for _, param := range params { - param.Range(func(k string, v time.Duration) bool { - if v > maxLag { - maxLag = v - maxLagChannel = k - } - return true - }) - } - } - - var errStr string - findMaxLagChannel(queryNodeTTDelay) - if maxLag > 0 && len(maxLagChannel) != 0 { - errStr = fmt.Sprintf("query max timetick lag:%s on channel:%s", maxLag, maxLagChannel) - } - maxLagChannel = "" - maxLag = 0 - findMaxLagChannel(dataNodeTTDelay) - if maxLag > 0 && len(maxLagChannel) != 0 { - if errStr != "" { - errStr += ", " - } - errStr += fmt.Sprintf("data max timetick lag:%s on channel:%s", maxLag, maxLagChannel) - } - if errStr != "" { - return fmt.Errorf("max timetick lag execced threhold: %s", errStr) - } - - return nil -} diff --git a/internal/util/componentutil/componentutil.go 
b/internal/util/componentutil/componentutil.go index d89c9db72bd67..93537d24451d8 100644 --- a/internal/util/componentutil/componentutil.go +++ b/internal/util/componentutil/componentutil.go @@ -84,17 +84,3 @@ func WaitForComponentHealthy[T interface { }](ctx context.Context, client T, serviceName string, attempts uint, sleep time.Duration) error { return WaitForComponentStates(ctx, client, serviceName, []commonpb.StateCode{commonpb.StateCode_Healthy}, attempts, sleep) } - -func CheckHealthRespWithErr(err error) *milvuspb.CheckHealthResponse { - if err != nil { - return CheckHealthRespWithErrMsg(err.Error()) - } - return CheckHealthRespWithErrMsg() -} - -func CheckHealthRespWithErrMsg(errMsg ...string) *milvuspb.CheckHealthResponse { - if len(errMsg) != 0 { - return &milvuspb.CheckHealthResponse{Status: merr.Success(), IsHealthy: false, Reasons: errMsg} - } - return &milvuspb.CheckHealthResponse{Status: merr.Success(), IsHealthy: true, Reasons: []string{}} -} diff --git a/internal/util/healthcheck/checker.go b/internal/util/healthcheck/checker.go new file mode 100644 index 0000000000000..c1f06a8e105d5 --- /dev/null +++ b/internal/util/healthcheck/checker.go @@ -0,0 +1,276 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package healthcheck + +import ( + "fmt" + "sync" + "time" + + "go.uber.org/zap" + + "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" + "github.com/milvus-io/milvus/internal/json" + "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/merr" +) + +// UnHealthyLevel represents the health level of a system. +type UnHealthyLevel int + +const ( + // Healthy means the system is operating normally. + Healthy UnHealthyLevel = iota + // Warning indicates minor issues that might escalate. + Warning + // Critical indicates major issues that need immediate attention. + Critical + // Fatal indicates system failure. + Fatal +) + +// String returns the string representation of the UnHealthyLevel. 
+func (u UnHealthyLevel) String() string { + switch u { + case Healthy: + return "Healthy" + case Warning: + return "Warning" + case Critical: + return "Critical" + case Fatal: + return "Fatal" + default: + return "Unknown" + } +} + +type Item int + +const ( + ChannelsWatched Item = iota + CheckpointLagExceed + CollectionQueryable + TimeTickLagExceed + NodeHealthCheck +) + +func getUnhealthyLevel(item Item) UnHealthyLevel { + switch item { + case ChannelsWatched: + return Fatal + case CheckpointLagExceed: + return Fatal + case TimeTickLagExceed: + return Fatal + case NodeHealthCheck: + return Fatal + case CollectionQueryable: + return Critical + default: + panic(fmt.Sprintf("unknown health check item: %d", int(item))) + } +} + +type Result struct { + UnhealthyClusterMsgs []*UnhealthyClusterMsg `json:"unhealthy_cluster_msgs"` + UnhealthyCollectionMsgs []*UnhealthyCollectionMsg `json:"unhealthy_collection_msgs"` +} + +func NewResult() *Result { + return &Result{} +} + +func (r *Result) AppendUnhealthyClusterMsg(unm *UnhealthyClusterMsg) { + r.UnhealthyClusterMsgs = append(r.UnhealthyClusterMsgs, unm) +} + +func (r *Result) AppendUnhealthyCollectionMsgs(udm *UnhealthyCollectionMsg) { + r.UnhealthyCollectionMsgs = append(r.UnhealthyCollectionMsgs, udm) +} + +func (r *Result) AppendResult(other *Result) { + if other == nil { + return + } + r.UnhealthyClusterMsgs = append(r.UnhealthyClusterMsgs, other.UnhealthyClusterMsgs...) + r.UnhealthyCollectionMsgs = append(r.UnhealthyCollectionMsgs, other.UnhealthyCollectionMsgs...) +} + +func (r *Result) IsEmpty() bool { + return len(r.UnhealthyClusterMsgs) == 0 && len(r.UnhealthyCollectionMsgs) == 0 +} + +func (r *Result) IsHealthy() bool { + if len(r.UnhealthyClusterMsgs) == 0 && len(r.UnhealthyCollectionMsgs) == 0 { + return true + } + + for _, unm := range r.UnhealthyClusterMsgs { + if unm.Reason.UnhealthyLevel == Fatal { + return false + } + } + + for _, ucm := range r.UnhealthyCollectionMsgs { + if ucm.Reason.UnhealthyLevel == Fatal { + return false + } + } + + return true +} + +type UnhealthyReason struct { + UnhealthyMsg string `json:"unhealthy_msg"` + UnhealthyLevel UnHealthyLevel `json:"unhealthy_level"` +} + +type UnhealthyClusterMsg struct { + Role string `json:"role"` + NodeID int64 `json:"node_id"` + Reason *UnhealthyReason `json:"reason"` +} + +func NewUnhealthyClusterMsg(role string, nodeID int64, unhealthyMsg string, item Item) *UnhealthyClusterMsg { + return &UnhealthyClusterMsg{ + Role: role, + NodeID: nodeID, + Reason: &UnhealthyReason{ + UnhealthyMsg: unhealthyMsg, + UnhealthyLevel: getUnhealthyLevel(item), + }, + } +} + +type UnhealthyCollectionMsg struct { + DatabaseID int64 `json:"database_id"` + CollectionID int64 `json:"collection_id"` + Reason *UnhealthyReason `json:"reason"` +} + +func NewUnhealthyCollectionMsg(collectionID int64, unhealthyMsg string, item Item) *UnhealthyCollectionMsg { + return &UnhealthyCollectionMsg{ + CollectionID: collectionID, + Reason: &UnhealthyReason{ + UnhealthyMsg: unhealthyMsg, + UnhealthyLevel: getUnhealthyLevel(item), + }, + } +} + +type Checker struct { + sync.RWMutex + interval time.Duration + done chan struct{} + checkFn func() *Result + latestResult *Result + once sync.Once +} + +func NewChecker(interval time.Duration, checkFn func() *Result) *Checker { + checker := &Checker{ + interval: interval, + checkFn: checkFn, + latestResult: NewResult(), + done: make(chan struct{}, 1), + once: sync.Once{}, + } + return checker +} + +func (hc *Checker) Start() { + go func() { + ticker := 
time.NewTicker(hc.interval) + defer ticker.Stop() + log.Info("start health checker") + for { + select { + case <-ticker.C: + hc.Lock() + hc.latestResult = hc.checkFn() + hc.Unlock() + case <-hc.done: + log.Info("stop health checker") + return + } + } + }() +} + +func (hc *Checker) GetLatestCheckResult() *Result { + hc.RLock() + defer hc.RUnlock() + return hc.latestResult +} + +func (hc *Checker) Close() { + hc.once.Do(func() { + close(hc.done) + }) +} + +func GetHealthCheckResultFromResp(resp *milvuspb.CheckHealthResponse) *Result { + var r Result + if len(resp.Reasons) == 0 { + return &r + } + if len(resp.Reasons) > 1 { + log.Error("invalid check result", zap.Any("reasons", resp.Reasons)) + return &r + } + + err := json.Unmarshal([]byte(resp.Reasons[0]), &r) + if err != nil { + log.Error("unmarshal check result error", zap.String("error", err.Error())) + } + return &r +} + +func GetCheckHealthResponseFromClusterMsg(msg ...*UnhealthyClusterMsg) *milvuspb.CheckHealthResponse { + r := &Result{UnhealthyClusterMsgs: msg} + reasons, err := json.Marshal(r) + if err != nil { + log.Error("marshal check result error", zap.String("error", err.Error())) + } + return &milvuspb.CheckHealthResponse{ + Status: merr.Success(), + IsHealthy: r.IsHealthy(), + Reasons: []string{string(reasons)}, + } +} + +func GetCheckHealthResponseFromResult(checkResult *Result) *milvuspb.CheckHealthResponse { + if checkResult.IsEmpty() { + return OK() + } + + reason, err := json.Marshal(checkResult) + if err != nil { + log.Error("marshal check result error", zap.String("error", err.Error())) + } + + return &milvuspb.CheckHealthResponse{ + Status: merr.Success(), + IsHealthy: checkResult.IsHealthy(), + Reasons: []string{string(reason)}, + } +} + +func OK() *milvuspb.CheckHealthResponse { + return &milvuspb.CheckHealthResponse{Status: merr.Success(), IsHealthy: true, Reasons: []string{}} +} diff --git a/internal/util/healthcheck/checker_test.go b/internal/util/healthcheck/checker_test.go new file mode 100644 index 0000000000000..7fdcb8cd6e8d1 --- /dev/null +++ b/internal/util/healthcheck/checker_test.go @@ -0,0 +1,60 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package healthcheck + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/milvus-io/milvus/pkg/util/merr" +) + +func TestChecker(t *testing.T) { + expected1 := NewResult() + expected1.AppendUnhealthyClusterMsg(NewUnhealthyClusterMsg("role1", 1, "msg1", ChannelsWatched)) + expected1.AppendUnhealthyClusterMsg(NewUnhealthyClusterMsg("role1", 1, "msg1", ChannelsWatched)) + + expected1.AppendUnhealthyCollectionMsgs(&UnhealthyCollectionMsg{ + CollectionID: 1, + Reason: &UnhealthyReason{ + UnhealthyMsg: "msg2", + UnhealthyLevel: Critical, + }, + }) + + checkFn := func() *Result { + return expected1 + } + checker := NewChecker(100*time.Millisecond, checkFn) + go checker.Start() + + time.Sleep(150 * time.Millisecond) + actual1 := checker.GetLatestCheckResult() + assert.Equal(t, expected1, actual1) + assert.False(t, actual1.IsHealthy()) + + chr := GetCheckHealthResponseFromResult(actual1) + assert.Equal(t, merr.Success(), chr.Status) + assert.Equal(t, actual1.IsHealthy(), chr.IsHealthy) + assert.Equal(t, 1, len(chr.Reasons)) + + actualResult := GetHealthCheckResultFromResp(chr) + assert.Equal(t, actual1, actualResult) + checker.Close() +} diff --git a/internal/util/mock/grpc_datanode_client.go b/internal/util/mock/grpc_datanode_client.go index 13ae355738d80..621a2317e61e8 100644 --- a/internal/util/mock/grpc_datanode_client.go +++ b/internal/util/mock/grpc_datanode_client.go @@ -112,3 +112,7 @@ func (m *GrpcDataNodeClient) QuerySlot(ctx context.Context, req *datapb.QuerySlo func (m *GrpcDataNodeClient) DropCompactionPlan(ctx context.Context, req *datapb.DropCompactionPlanRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { return &commonpb.Status{}, m.Err } + +func (m *GrpcDataNodeClient) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) { + return &milvuspb.CheckHealthResponse{}, m.Err +} diff --git a/internal/util/mock/grpc_querynode_client.go b/internal/util/mock/grpc_querynode_client.go index dadfb3157897d..5db4bee2a4984 100644 --- a/internal/util/mock/grpc_querynode_client.go +++ b/internal/util/mock/grpc_querynode_client.go @@ -134,6 +134,10 @@ func (m *GrpcQueryNodeClient) DeleteBatch(ctx context.Context, in *querypb.Delet return &querypb.DeleteBatchResponse{}, m.Err } +func (m *GrpcQueryNodeClient) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) { + return &milvuspb.CheckHealthResponse{}, m.Err +} + func (m *GrpcQueryNodeClient) Close() error { return m.Err } diff --git a/internal/util/wrappers/qn_wrapper.go b/internal/util/wrappers/qn_wrapper.go index def2e64f01bef..c186fdf1596dd 100644 --- a/internal/util/wrappers/qn_wrapper.go +++ b/internal/util/wrappers/qn_wrapper.go @@ -152,6 +152,10 @@ func (qn *qnServerWrapper) DeleteBatch(ctx context.Context, in *querypb.DeleteBa return qn.QueryNode.DeleteBatch(ctx, in) } +func (qn *qnServerWrapper) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) { + return qn.QueryNode.CheckHealth(ctx, req) +} + func WrapQueryNodeServerAsClient(qn types.QueryNode) types.QueryNodeClient { return &qnServerWrapper{ QueryNode: qn, diff --git a/pkg/util/funcutil/func.go b/pkg/util/funcutil/func.go index 81208393a40ac..8f627e8e0241e 100644 --- a/pkg/util/funcutil/func.go +++ b/pkg/util/funcutil/func.go @@ -24,7 +24,6 @@ import ( "fmt" "net" "reflect" - "regexp" "strconv" "strings" 
"time" @@ -291,13 +290,18 @@ func ConvertChannelName(chanName string, tokenFrom string, tokenTo string) (stri } func GetCollectionIDFromVChannel(vChannelName string) int64 { - re := regexp.MustCompile(`.*_(\d+)v\d+`) - matches := re.FindStringSubmatch(vChannelName) - if len(matches) > 1 { - number, err := strconv.ParseInt(matches[1], 0, 64) - if err == nil { - return number - } + end := strings.LastIndexByte(vChannelName, 'v') + if end <= 0 { + return -1 + } + start := strings.LastIndexByte(vChannelName, '_') + if start <= 0 { + return -1 + } + + collectionIDStr := vChannelName[start+1 : end] + if collectionID, err := strconv.ParseInt(collectionIDStr, 0, 64); err == nil { + return collectionID } return -1 } diff --git a/pkg/util/merr/utils.go b/pkg/util/merr/utils.go index 3cf0d888458b6..abd83f527dd69 100644 --- a/pkg/util/merr/utils.go +++ b/pkg/util/merr/utils.go @@ -299,6 +299,13 @@ func IsHealthyOrStopping(stateCode commonpb.StateCode) error { return CheckHealthy(stateCode) } +func AnalyzeComponentStateResp(role string, nodeID int64, resp *milvuspb.ComponentStates, err error) error { + if err != nil { + return errors.Wrap(err, "service is unhealthy") + } + return AnalyzeState(role, nodeID, resp) +} + func AnalyzeState(role string, nodeID int64, state *milvuspb.ComponentStates) error { if err := Error(state.GetStatus()); err != nil { return errors.Wrapf(err, "%s=%d not healthy", role, nodeID) diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 0950233569af4..d24a7aae1534c 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -286,6 +286,9 @@ type commonConfig struct { // Local RPC enabled for milvus internal communication when mix or standalone mode. LocalRPCEnabled ParamItem `refreshable:"false"` + + HealthCheckInterval ParamItem `refreshable:"true"` + HealthCheckRPCTimeout ParamItem `refreshable:"true"` } func (p *commonConfig) init(base *BaseTable) { @@ -936,6 +939,22 @@ This helps Milvus-CDC synchronize incremental data`, Export: true, } p.LocalRPCEnabled.Init(base.mgr) + + p.HealthCheckInterval = ParamItem{ + Key: "common.healthcheck.interval.seconds", + Version: "2.4.8", + DefaultValue: "30", + Doc: `health check interval in seconds, default 30s`, + } + p.HealthCheckInterval.Init(base.mgr) + + p.HealthCheckRPCTimeout = ParamItem{ + Key: "common.healthcheck.timeout.seconds", + Version: "2.4.8", + DefaultValue: "10", + Doc: `RPC timeout for health check request`, + } + p.HealthCheckRPCTimeout.Init(base.mgr) } type gpuConfig struct { @@ -2240,9 +2259,9 @@ If this parameter is set false, Milvus simply searches the growing segments with p.UpdateCollectionLoadStatusInterval = ParamItem{ Key: "queryCoord.updateCollectionLoadStatusInterval", Version: "2.4.7", - DefaultValue: "5", + DefaultValue: "300", PanicIfEmpty: true, - Doc: "5m, max interval of updating collection loaded status for check health", + Doc: "300s, max interval of updating collection loaded status for check health", Export: true, } diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index 63c7c0b54db81..28c86e3ab4bd9 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -131,6 +131,11 @@ func TestComponentParam(t *testing.T) { params.Save("common.gchelper.minimumGoGC", "80") assert.Equal(t, 80, Params.MinimumGOGCConfig.GetAsInt()) + params.Save("common.healthcheck.interval.seconds", "60") + assert.Equal(t, time.Second*60, 
Params.HealthCheckInterval.GetAsDuration(time.Second)) + params.Save("common.healthcheck.timeout.seconds", "5") + assert.Equal(t, 5, Params.HealthCheckRPCTimeout.GetAsInt()) + assert.Equal(t, 0, len(Params.ReadOnlyPrivileges.GetAsStrings())) assert.Equal(t, 0, len(Params.ReadWritePrivileges.GetAsStrings())) assert.Equal(t, 0, len(Params.AdminPrivileges.GetAsStrings()))
@@ -326,8 +331,8 @@ func TestComponentParam(t *testing.T) { checkHealthRPCTimeout := Params.CheckHealthRPCTimeout.GetAsInt() assert.Equal(t, 2000, checkHealthRPCTimeout) - updateInterval := Params.UpdateCollectionLoadStatusInterval.GetAsDuration(time.Minute) - assert.Equal(t, updateInterval, time.Minute*5) + updateInterval := Params.UpdateCollectionLoadStatusInterval.GetAsDuration(time.Second) + assert.Equal(t, time.Second*300, updateInterval) assert.Equal(t, 0.1, Params.GlobalRowCountFactor.GetAsFloat()) params.Save("queryCoord.globalRowCountFactor", "0.4")
diff --git a/pkg/util/ratelimitutil/utils.go b/pkg/util/ratelimitutil/utils.go index ae65eb14c7429..1c70049ca88fd 100644 --- a/pkg/util/ratelimitutil/utils.go +++ b/pkg/util/ratelimitutil/utils.go
@@ -16,7 +16,13 @@ package ratelimitutil -import "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" +import ( + "fmt" + "time" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus/pkg/util/tsoutil" +) var QuotaErrorString = map[commonpb.ErrorCode]string{ commonpb.ErrorCode_ForceDeny: "access has been disabled by the administrator",
@@ -28,3 +34,14 @@ var QuotaErrorString = map[commonpb.ErrorCode]string{ func GetQuotaErrorString(errCode commonpb.ErrorCode) string { return QuotaErrorString[errCode] } + +func CheckTimeTickDelay(channel string, minTT uint64, maxDelay time.Duration) error { + if channel != "" && maxDelay > 0 { + minTt, _ := tsoutil.ParseTS(minTT) + delay := time.Since(minTt) + if delay.Milliseconds() >= maxDelay.Milliseconds() { + return fmt.Errorf("max timetick lag exceeded threshold, lag:%s on channel:%s", delay, channel) + } + } + return nil +}
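
For reference, a minimal sketch of how the new healthcheck.Checker from internal/util/healthcheck/checker.go is wired into a coordinator, following the pattern datacoord's Server and rootcoord's Core use in this patch; the coordServer type, the interval argument, and the commented role string are illustrative placeholders, not code from the patch.

package example

import (
	"time"

	"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"

	"github.com/milvus-io/milvus/internal/util/healthcheck"
)

// coordServer stands in for a coordinator such as datacoord.Server or rootcoord.Core.
type coordServer struct {
	healthChecker *healthcheck.Checker
}

// healthCheckFn is the periodic probe; real coordinators broadcast CheckHealth RPCs
// to their nodes and collect the unhealthy messages into a single Result.
func (s *coordServer) healthCheckFn() *healthcheck.Result {
	result := healthcheck.NewResult()
	// Example of reporting a fatal node-level problem:
	// result.AppendUnhealthyClusterMsg(
	//     healthcheck.NewUnhealthyClusterMsg("querynode", 1, "rpc failed", healthcheck.NodeHealthCheck))
	return result
}

// init/start/stop mirror initDataCoord, startServerLoop and Stop in the patch.
func (s *coordServer) init(interval time.Duration) {
	s.healthChecker = healthcheck.NewChecker(interval, s.healthCheckFn)
}

func (s *coordServer) start() { s.healthChecker.Start() }

func (s *coordServer) stop() { s.healthChecker.Close() }

// CheckHealth serves the cached result instead of fanning out RPCs on every request,
// which is the CPU-overhead reduction this change is after.
func (s *coordServer) CheckHealth(_ *milvuspb.CheckHealthRequest) *milvuspb.CheckHealthResponse {
	return healthcheck.GetCheckHealthResponseFromResult(s.healthChecker.GetLatestCheckResult())
}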
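
A second illustrative sketch under the same caveat: with this change the Reasons field of CheckHealthResponse carries a JSON-encoded healthcheck.Result rather than free-form strings, so a coordinator can decode a node's reply back into structured messages and merge it with other nodes' results. The role name, node ID, and message below are made up.

package example

import (
	"fmt"

	"github.com/milvus-io/milvus/internal/util/healthcheck"
)

func exampleReasonRoundTrip() {
	// A node reports one cluster-level problem; TimeTickLagExceed maps to the Fatal level.
	msg := healthcheck.NewUnhealthyClusterMsg("querynode", 7, "time tick lag exceeded", healthcheck.TimeTickLagExceed)
	resp := healthcheck.GetCheckHealthResponseFromClusterMsg(msg)
	fmt.Println(resp.IsHealthy)  // false
	fmt.Println(resp.Reasons[0]) // a JSON document describing the Result

	// The coordinator decodes the reply and merges it into its aggregated result.
	aggregated := healthcheck.NewResult()
	aggregated.AppendResult(healthcheck.GetHealthCheckResultFromResp(resp))
	fmt.Println(aggregated.IsHealthy()) // still false after aggregation
}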