diff --git a/go.mod b/go.mod index 958aebeefd30e..f08f66a7d8f35 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/gin-gonic/gin v1.9.1 github.com/go-playground/validator/v10 v10.14.0 github.com/gofrs/flock v0.8.1 - github.com/golang/protobuf v1.5.4 // indirect + github.com/golang/protobuf v1.5.4 github.com/google/btree v1.1.2 github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 github.com/klauspost/compress v1.17.7 @@ -32,6 +32,7 @@ require ( github.com/samber/lo v1.27.0 github.com/sbinet/npyio v0.6.0 github.com/soheilhy/cmux v0.1.5 + github.com/sourcegraph/conc v0.3.0 github.com/spf13/cast v1.3.1 github.com/spf13/viper v1.8.1 github.com/stretchr/testify v1.9.0 diff --git a/go.sum b/go.sum index b5dc7c85b0230..aa1acf79f806f 100644 --- a/go.sum +++ b/go.sum @@ -798,6 +798,8 @@ github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9 github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM= github.com/soheilhy/cmux v0.1.5 h1:jjzc5WVemNEDTLwv9tlmemhC73tI08BNOIGwBOo10Js= github.com/soheilhy/cmux v0.1.5/go.mod h1:T7TcVDs9LWfQgPlPsdngu6I6QIoyIFZDDC6sNE1GqG0= +github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo= +github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= diff --git a/internal/datacoord/mock_session_manager.go b/internal/datacoord/mock_session_manager.go index 04942453da192..f66966a9220e6 100644 --- a/internal/datacoord/mock_session_manager.go +++ b/internal/datacoord/mock_session_manager.go @@ -6,6 +6,8 @@ import ( context "context" datapb "github.com/milvus-io/milvus/internal/proto/datapb" + healthcheck 
"github.com/milvus-io/milvus/internal/util/healthcheck" + mock "github.com/stretchr/testify/mock" typeutil "github.com/milvus-io/milvus/pkg/util/typeutil" @@ -113,44 +115,44 @@ func (_c *MockSessionManager_CheckChannelOperationProgress_Call) RunAndReturn(ru return _c } -// CheckHealth provides a mock function with given fields: ctx -func (_m *MockSessionManager) CheckHealth(ctx context.Context) error { +// CheckDNHealth provides a mock function with given fields: ctx +func (_m *MockSessionManager) CheckDNHealth(ctx context.Context) healthcheck.Result { ret := _m.Called(ctx) - var r0 error - if rf, ok := ret.Get(0).(func(context.Context) error); ok { + var r0 healthcheck.Result + if rf, ok := ret.Get(0).(func(context.Context) healthcheck.Result); ok { r0 = rf(ctx) } else { - r0 = ret.Error(0) + r0 = ret.Get(0).(healthcheck.Result) } return r0 } -// MockSessionManager_CheckHealth_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckHealth' -type MockSessionManager_CheckHealth_Call struct { +// MockSessionManager_CheckDNHealth_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckDNHealth' +type MockSessionManager_CheckDNHealth_Call struct { *mock.Call } -// CheckHealth is a helper method to define mock.On call +// CheckDNHealth is a helper method to define mock.On call // - ctx context.Context -func (_e *MockSessionManager_Expecter) CheckHealth(ctx interface{}) *MockSessionManager_CheckHealth_Call { - return &MockSessionManager_CheckHealth_Call{Call: _e.mock.On("CheckHealth", ctx)} +func (_e *MockSessionManager_Expecter) CheckDNHealth(ctx interface{}) *MockSessionManager_CheckDNHealth_Call { + return &MockSessionManager_CheckDNHealth_Call{Call: _e.mock.On("CheckDNHealth", ctx)} } -func (_c *MockSessionManager_CheckHealth_Call) Run(run func(ctx context.Context)) *MockSessionManager_CheckHealth_Call { +func (_c *MockSessionManager_CheckDNHealth_Call) Run(run func(ctx 
context.Context)) *MockSessionManager_CheckDNHealth_Call { _c.Call.Run(func(args mock.Arguments) { run(args[0].(context.Context)) }) return _c } -func (_c *MockSessionManager_CheckHealth_Call) Return(_a0 error) *MockSessionManager_CheckHealth_Call { +func (_c *MockSessionManager_CheckDNHealth_Call) Return(_a0 healthcheck.Result) *MockSessionManager_CheckDNHealth_Call { _c.Call.Return(_a0) return _c } -func (_c *MockSessionManager_CheckHealth_Call) RunAndReturn(run func(context.Context) error) *MockSessionManager_CheckHealth_Call { +func (_c *MockSessionManager_CheckDNHealth_Call) RunAndReturn(run func(context.Context) healthcheck.Result) *MockSessionManager_CheckDNHealth_Call { _c.Call.Return(run) return _c } diff --git a/internal/datacoord/mock_test.go b/internal/datacoord/mock_test.go index e2b04e9db2829..ec87a9b231765 100644 --- a/internal/datacoord/mock_test.go +++ b/internal/datacoord/mock_test.go @@ -343,6 +343,10 @@ func (c *mockDataNodeClient) Stop() error { return nil } +func (c *mockDataNodeClient) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) { + return &milvuspb.CheckHealthResponse{Status: merr.Success()}, nil +} + type mockRootCoordClient struct { state commonpb.StateCode cnt int64 diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index f461c4c009d13..46fa776bfcfbb 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -47,6 +47,7 @@ import ( streamingcoord "github.com/milvus-io/milvus/internal/streamingcoord/server" "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/dependency" + "github.com/milvus-io/milvus/internal/util/healthcheck" "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/internal/util/streamingutil" "github.com/milvus-io/milvus/pkg/kv" @@ -161,6 +162,8 @@ type Server struct { // streamingcoord server is embedding in datacoord now. 
streamingCoord *streamingcoord.Server + + healthChecker *healthcheck.Checker } type CollectionNameInfo struct { @@ -402,6 +405,8 @@ func (s *Server) initDataCoord() error { s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx) + interval := Params.CommonCfg.HealthCheckInterval.GetAsDuration(time.Second) + s.healthChecker = healthcheck.NewChecker(interval, s.healthCheckFn) log.Info("init datacoord done", zap.Int64("nodeID", paramtable.GetNodeID()), zap.String("Address", s.address)) return nil } diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index 288761ababc8a..116698244c8c8 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -36,6 +36,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/util/componentutil" + "github.com/milvus-io/milvus/internal/util/healthcheck" "github.com/milvus-io/milvus/internal/util/importutilv2" "github.com/milvus-io/milvus/internal/util/segmentutil" "github.com/milvus-io/milvus/pkg/common" @@ -1576,20 +1577,26 @@ func (s *Server) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthReque }, nil } - err := s.sessionManager.CheckHealth(ctx) - if err != nil { - return componentutil.CheckHealthRespWithErr(err), nil + latestCheckResult := s.healthChecker.GetLatestCheckResult() + if !latestCheckResult.IsHealthy() { + return healthcheck.GetCheckHealthResponseFrom(&latestCheckResult), nil } + return componentutil.CheckHealthRespWithErr(nil), nil +} - if err = CheckAllChannelsWatched(s.meta, s.channelManager); err != nil { - return componentutil.CheckHealthRespWithErr(err), nil - } +func (s *Server) healthCheckFn() healthcheck.Result { + timeout := Params.CommonCfg.HealthCheckRPCTimeout.GetAsDuration(time.Second) + ctx, cancel := context.WithTimeout(s.ctx, timeout) + defer cancel() - if err = CheckCheckPointsHealth(s.meta); err != nil { - return 
componentutil.CheckHealthRespWithErr(err), nil + checkResults := s.sessionManager.CheckDNHealth(ctx) + for collectionID, failReason := range CheckAllChannelsWatched(s.meta, s.channelManager) { + checkResults.AppendUnhealthyCollectionMsgs(healthcheck.UnhealthyCollectionMsg{CollectionID: collectionID, UnhealthyMsg: failReason}) } - - return componentutil.CheckHealthRespWithErr(nil), nil + for collectionID, failReason := range CheckCheckPointsHealth(s.meta) { + checkResults.AppendUnhealthyCollectionMsgs(healthcheck.UnhealthyCollectionMsg{CollectionID: collectionID, UnhealthyMsg: failReason}) + } + return checkResults } func (s *Server) GcConfirm(ctx context.Context, request *datapb.GcConfirmRequest) (*datapb.GcConfirmResponse, error) { diff --git a/internal/datacoord/session_manager.go b/internal/datacoord/session_manager.go index d37e4f231fd8a..7d8b5d3e0c250 100644 --- a/internal/datacoord/session_manager.go +++ b/internal/datacoord/session_manager.go @@ -19,6 +19,8 @@ package datacoord import ( "context" "fmt" + "strings" + "sync" "time" "github.com/cockroachdb/errors" @@ -31,6 +33,7 @@ import ( "github.com/milvus-io/milvus/internal/metastore/kv/binlog" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/types" + "github.com/milvus-io/milvus/internal/util/healthcheck" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/util/commonpbutil" @@ -69,7 +72,7 @@ type SessionManager interface { QueryPreImport(nodeID int64, in *datapb.QueryPreImportRequest) (*datapb.QueryPreImportResponse, error) QueryImport(nodeID int64, in *datapb.QueryImportRequest) (*datapb.QueryImportResponse, error) DropImport(nodeID int64, in *datapb.DropImportRequest) error - CheckHealth(ctx context.Context) error + CheckDNHealth(ctx context.Context) healthcheck.Result QuerySlot(nodeID int64) (*datapb.QuerySlotResponse, error) DropCompactionPlan(nodeID int64, req *datapb.DropCompactionPlanRequest) error 
Close() @@ -507,28 +510,58 @@ func (c *SessionManagerImpl) DropImport(nodeID int64, in *datapb.DropImportReque return VerifyResponse(status, err) } -func (c *SessionManagerImpl) CheckHealth(ctx context.Context) error { - group, ctx := errgroup.WithContext(ctx) - +func (c *SessionManagerImpl) CheckDNHealth(ctx context.Context) healthcheck.Result { + result := healthcheck.NewResult() + wg := sync.WaitGroup{} + wlock := sync.Mutex{} ids := c.GetSessionIDs() + for _, nodeID := range ids { nodeID := nodeID - group.Go(func() error { - cli, err := c.getClient(ctx, nodeID) + wg.Add(1) + go func() { + var datanodeClient types.DataNodeClient + var checkHealthResp *milvuspb.CheckHealthResponse + var err error + + defer func() { + if err != nil { + wlock.Lock() + result.UnhealthyNodeMsgs = append(result.UnhealthyNodeMsgs, healthcheck.UnhealthyNodeMsg{ + Role: "DataNode", + NodeID: nodeID, + UnhealthyMsg: err.Error(), + }) + wlock.Unlock() + } + wg.Done() + }() + + datanodeClient, err = c.getClient(ctx, nodeID) if err != nil { - return fmt.Errorf("failed to get DataNode %d: %v", nodeID, err) + err = fmt.Errorf("failed to get node:%d: %v", nodeID, err) + return } - sta, err := cli.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{}) + err = merr.AnalyzeComponentStateResp(datanodeClient.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{})) if err != nil { - return err + return } - err = merr.AnalyzeState("DataNode", nodeID, sta) - return err - }) + + checkHealthResp, err = datanodeClient.CheckHealth(ctx, &milvuspb.CheckHealthRequest{}) + if err = merr.CheckRPCCall(checkHealthResp, err); err != nil { + err = fmt.Errorf("CheckHealth fails for node:%d, %w", nodeID, err) + return + } + + if !checkHealthResp.IsHealthy && len(checkHealthResp.Reasons) > 0 { + err = errors.New(strings.Join(checkHealthResp.Reasons, ",")) + } + }() } - return group.Wait() + wg.Wait() + return result } func (c *SessionManagerImpl) QuerySlot(nodeID int64) (*datapb.QuerySlotResponse, 
error) { diff --git a/internal/datacoord/util.go b/internal/datacoord/util.go index 2def3eb484151..03bd4eeb1410a 100644 --- a/internal/datacoord/util.go +++ b/internal/datacoord/util.go @@ -278,7 +278,8 @@ func getBinLogIDs(segment *SegmentInfo, fieldID int64) []int64 { return binlogIDs } -func CheckCheckPointsHealth(meta *meta) error { +func CheckCheckPointsHealth(meta *meta) map[int64]string { + checkResult := make(map[int64]string) for channel, cp := range meta.GetChannelCheckpoints() { collectionID := funcutil.GetCollectionIDFromVChannel(channel) if collectionID == -1 { @@ -292,29 +293,28 @@ func CheckCheckPointsHealth(meta *meta) error { ts, _ := tsoutil.ParseTS(cp.Timestamp) lag := time.Since(ts) if lag > paramtable.Get().DataCoordCfg.ChannelCheckpointMaxLag.GetAsDuration(time.Second) { - return merr.WrapErrChannelCPExceededMaxLag(channel, fmt.Sprintf("checkpoint lag: %f(min)", lag.Minutes())) + checkResult[collectionID] = fmt.Sprintf("exceeds max lag:%s on channel:%s checkpoint", lag, channel) } } - return nil + return checkResult } -func CheckAllChannelsWatched(meta *meta, channelManager ChannelManager) error { +func CheckAllChannelsWatched(meta *meta, channelManager ChannelManager) map[int64]string { collIDs := meta.ListCollections() + checkResult := make(map[int64]string) for _, collID := range collIDs { collInfo := meta.GetCollection(collID) if collInfo == nil { - log.Warn("collection info is nil, skip it", zap.Int64("collectionID", collID)) + log.RatedWarn(60, "collection info is nil, skip it", zap.Int64("collectionID", collID)) continue } for _, channelName := range collInfo.VChannelNames { _, err := channelManager.FindWatcher(channelName) if err != nil { - log.Warn("find watcher for channel failed", zap.Int64("collectionID", collID), - zap.String("channelName", channelName), zap.Error(err)) - return err + checkResult[collID] = fmt.Sprintf("channel:%s is not watched", channelName) } } } - return nil + return checkResult } diff --git 
a/internal/datanode/channel/channel_manager_test.go b/internal/datanode/channel/channel_manager_test.go index 7ce5218df837e..307b5be0261b3 100644 --- a/internal/datanode/channel/channel_manager_test.go +++ b/internal/datanode/channel/channel_manager_test.go @@ -181,9 +181,7 @@ func (s *ChannelManagerSuite) TearDownTest() { } func (s *ChannelManagerSuite) TestReleaseStuck() { - var ( - channel = "by-dev-rootcoord-dml-2" - ) + channel := "by-dev-rootcoord-dml-2" s.manager.releaseFunc = func(channel string) { time.Sleep(1 * time.Second) } diff --git a/internal/datanode/services.go b/internal/datanode/services.go index 874509233cdcb..22e5d4170b1f8 100644 --- a/internal/datanode/services.go +++ b/internal/datanode/services.go @@ -22,6 +22,7 @@ package datanode import ( "context" "fmt" + "time" "github.com/samber/lo" "go.uber.org/zap" @@ -32,10 +33,12 @@ import ( "github.com/milvus-io/milvus/internal/datanode/importv2" "github.com/milvus-io/milvus/internal/flushcommon/io" "github.com/milvus-io/milvus/internal/flushcommon/metacache" + "github.com/milvus-io/milvus/internal/flushcommon/util" "github.com/milvus-io/milvus/internal/metastore/kv/binlog" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/internalpb" "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/internal/util/componentutil" "github.com/milvus-io/milvus/internal/util/importutilv2" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" @@ -44,6 +47,8 @@ import ( "github.com/milvus-io/milvus/pkg/util/conc" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/metricsinfo" + "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/ratelimitutil" "github.com/milvus-io/milvus/pkg/util/tsoutil" ) @@ -589,3 +594,19 @@ func (node *DataNode) DropCompactionPlan(ctx context.Context, req *datapb.DropCo log.Ctx(ctx).Info("DropCompactionPlans success", 
zap.Int64("planID", req.GetPlanID())) return merr.Success(), nil } + +func (node *DataNode) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) { + if err := merr.CheckHealthy(node.GetStateCode()); err != nil { + return &milvuspb.CheckHealthResponse{ + Status: merr.Status(err), + Reasons: []string{err.Error()}, + }, nil + } + + maxDelay := paramtable.Get().QuotaConfig.MaxTimeTickDelay.GetAsDuration(time.Second) + minFGChannel, minFGTt := util.GetRateCollector().GetMinFlowGraphTt() + if err := ratelimitutil.CheckTimeTickDelay(minFGChannel, minFGTt, maxDelay); err != nil { + return componentutil.CheckHealthRespWithErr(err), nil + } + return componentutil.CheckHealthRespWithErr(nil), nil +} diff --git a/internal/distributed/datanode/client/client.go b/internal/distributed/datanode/client/client.go index 67d5081a19e8c..3944d4039a85b 100644 --- a/internal/distributed/datanode/client/client.go +++ b/internal/distributed/datanode/client/client.go @@ -267,3 +267,9 @@ func (c *Client) DropCompactionPlan(ctx context.Context, req *datapb.DropCompact return client.DropCompactionPlan(ctx, req) }) } + +func (c *Client) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) { + return wrapGrpcCall(ctx, c, func(client datapb.DataNodeClient) (*milvuspb.CheckHealthResponse, error) { + return client.CheckHealth(ctx, req) + }) +} diff --git a/internal/distributed/datanode/service.go b/internal/distributed/datanode/service.go index 2e530546d1977..8d09b1f0a1aef 100644 --- a/internal/distributed/datanode/service.go +++ b/internal/distributed/datanode/service.go @@ -410,3 +410,7 @@ func (s *Server) QuerySlot(ctx context.Context, req *datapb.QuerySlotRequest) (* func (s *Server) DropCompactionPlan(ctx context.Context, req *datapb.DropCompactionPlanRequest) (*commonpb.Status, error) { return s.datanode.DropCompactionPlan(ctx, req) } + +func (s *Server) 
CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) { + return s.datanode.CheckHealth(ctx, req) +} diff --git a/internal/distributed/datanode/service_test.go b/internal/distributed/datanode/service_test.go index 66390ae4fcb13..39734db91eb62 100644 --- a/internal/distributed/datanode/service_test.go +++ b/internal/distributed/datanode/service_test.go @@ -185,6 +185,10 @@ func (m *MockDataNode) DropCompactionPlan(ctx context.Context, req *datapb.DropC return m.status, m.err } +func (m *MockDataNode) CheckHealth(ctx context.Context, request *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) { + return &milvuspb.CheckHealthResponse{}, m.err +} + // ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////// func Test_NewServer(t *testing.T) { paramtable.Init() diff --git a/internal/distributed/querynode/client/client.go b/internal/distributed/querynode/client/client.go index 5d15fad49f6c1..2285e27a46513 100644 --- a/internal/distributed/querynode/client/client.go +++ b/internal/distributed/querynode/client/client.go @@ -332,3 +332,9 @@ func (c *Client) Delete(ctx context.Context, req *querypb.DeleteRequest, _ ...gr return client.Delete(ctx, req) }) } + +func (c *Client) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) { + return wrapGrpcCall(ctx, c, func(client querypb.QueryNodeClient) (*milvuspb.CheckHealthResponse, error) { + return client.CheckHealth(ctx, req) + }) +} diff --git a/internal/distributed/querynode/service.go b/internal/distributed/querynode/service.go index 445d30fc5288a..55a795b247846 100644 --- a/internal/distributed/querynode/service.go +++ b/internal/distributed/querynode/service.go @@ -387,3 +387,7 @@ func (s *Server) SyncDistribution(ctx context.Context, req *querypb.SyncDistribu func (s *Server) Delete(ctx context.Context, req 
*querypb.DeleteRequest) (*commonpb.Status, error) { return s.querynode.Delete(ctx, req) } + +func (s *Server) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) { + return s.querynode.CheckHealth(ctx, req) +} diff --git a/internal/mocks/mock_datanode.go b/internal/mocks/mock_datanode.go index 190da75be0885..5e2836fae8af9 100644 --- a/internal/mocks/mock_datanode.go +++ b/internal/mocks/mock_datanode.go @@ -87,6 +87,61 @@ func (_c *MockDataNode_CheckChannelOperationProgress_Call) RunAndReturn(run func return _c } +// CheckHealth provides a mock function with given fields: _a0, _a1 +func (_m *MockDataNode) CheckHealth(_a0 context.Context, _a1 *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) { + ret := _m.Called(_a0, _a1) + + var r0 *milvuspb.CheckHealthResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error)); ok { + return rf(_a0, _a1) + } + if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest) *milvuspb.CheckHealthResponse); ok { + r0 = rf(_a0, _a1) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*milvuspb.CheckHealthResponse) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *milvuspb.CheckHealthRequest) error); ok { + r1 = rf(_a0, _a1) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockDataNode_CheckHealth_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckHealth' +type MockDataNode_CheckHealth_Call struct { + *mock.Call +} + +// CheckHealth is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 *milvuspb.CheckHealthRequest +func (_e *MockDataNode_Expecter) CheckHealth(_a0 interface{}, _a1 interface{}) *MockDataNode_CheckHealth_Call { + return &MockDataNode_CheckHealth_Call{Call: _e.mock.On("CheckHealth", _a0, _a1)} +} + +func (_c *MockDataNode_CheckHealth_Call) Run(run func(_a0 
context.Context, _a1 *milvuspb.CheckHealthRequest)) *MockDataNode_CheckHealth_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*milvuspb.CheckHealthRequest)) + }) + return _c +} + +func (_c *MockDataNode_CheckHealth_Call) Return(_a0 *milvuspb.CheckHealthResponse, _a1 error) *MockDataNode_CheckHealth_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockDataNode_CheckHealth_Call) RunAndReturn(run func(context.Context, *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error)) *MockDataNode_CheckHealth_Call { + _c.Call.Return(run) + return _c +} + // CompactionV2 provides a mock function with given fields: _a0, _a1 func (_m *MockDataNode) CompactionV2(_a0 context.Context, _a1 *datapb.CompactionPlan) (*commonpb.Status, error) { ret := _m.Called(_a0, _a1) diff --git a/internal/mocks/mock_datanode_client.go b/internal/mocks/mock_datanode_client.go index 91661051c390b..633cf103d2d53 100644 --- a/internal/mocks/mock_datanode_client.go +++ b/internal/mocks/mock_datanode_client.go @@ -101,6 +101,76 @@ func (_c *MockDataNodeClient_CheckChannelOperationProgress_Call) RunAndReturn(ru return _c } +// CheckHealth provides a mock function with given fields: ctx, in, opts +func (_m *MockDataNodeClient) CheckHealth(ctx context.Context, in *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, in) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 *milvuspb.CheckHealthResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest, ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error)); ok { + return rf(ctx, in, opts...) + } + if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest, ...grpc.CallOption) *milvuspb.CheckHealthResponse); ok { + r0 = rf(ctx, in, opts...) 
+ } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*milvuspb.CheckHealthResponse) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *milvuspb.CheckHealthRequest, ...grpc.CallOption) error); ok { + r1 = rf(ctx, in, opts...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockDataNodeClient_CheckHealth_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckHealth' +type MockDataNodeClient_CheckHealth_Call struct { + *mock.Call +} + +// CheckHealth is a helper method to define mock.On call +// - ctx context.Context +// - in *milvuspb.CheckHealthRequest +// - opts ...grpc.CallOption +func (_e *MockDataNodeClient_Expecter) CheckHealth(ctx interface{}, in interface{}, opts ...interface{}) *MockDataNodeClient_CheckHealth_Call { + return &MockDataNodeClient_CheckHealth_Call{Call: _e.mock.On("CheckHealth", + append([]interface{}{ctx, in}, opts...)...)} +} + +func (_c *MockDataNodeClient_CheckHealth_Call) Run(run func(ctx context.Context, in *milvuspb.CheckHealthRequest, opts ...grpc.CallOption)) *MockDataNodeClient_CheckHealth_Call { + _c.Call.Run(func(args mock.Arguments) { + variadicArgs := make([]grpc.CallOption, len(args)-2) + for i, a := range args[2:] { + if a != nil { + variadicArgs[i] = a.(grpc.CallOption) + } + } + run(args[0].(context.Context), args[1].(*milvuspb.CheckHealthRequest), variadicArgs...) 
+ }) + return _c +} + +func (_c *MockDataNodeClient_CheckHealth_Call) Return(_a0 *milvuspb.CheckHealthResponse, _a1 error) *MockDataNodeClient_CheckHealth_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockDataNodeClient_CheckHealth_Call) RunAndReturn(run func(context.Context, *milvuspb.CheckHealthRequest, ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error)) *MockDataNodeClient_CheckHealth_Call { + _c.Call.Return(run) + return _c +} + // Close provides a mock function with given fields: func (_m *MockDataNodeClient) Close() error { ret := _m.Called() diff --git a/internal/mocks/mock_querynode.go b/internal/mocks/mock_querynode.go index d84ee0d2a8d26..6ab7015b9e6a1 100644 --- a/internal/mocks/mock_querynode.go +++ b/internal/mocks/mock_querynode.go @@ -30,6 +30,61 @@ func (_m *MockQueryNode) EXPECT() *MockQueryNode_Expecter { return &MockQueryNode_Expecter{mock: &_m.Mock} } +// CheckHealth provides a mock function with given fields: _a0, _a1 +func (_m *MockQueryNode) CheckHealth(_a0 context.Context, _a1 *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) { + ret := _m.Called(_a0, _a1) + + var r0 *milvuspb.CheckHealthResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error)); ok { + return rf(_a0, _a1) + } + if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest) *milvuspb.CheckHealthResponse); ok { + r0 = rf(_a0, _a1) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*milvuspb.CheckHealthResponse) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *milvuspb.CheckHealthRequest) error); ok { + r1 = rf(_a0, _a1) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockQueryNode_CheckHealth_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckHealth' +type MockQueryNode_CheckHealth_Call struct { + *mock.Call +} + +// CheckHealth is a helper method to define 
mock.On call +// - _a0 context.Context +// - _a1 *milvuspb.CheckHealthRequest +func (_e *MockQueryNode_Expecter) CheckHealth(_a0 interface{}, _a1 interface{}) *MockQueryNode_CheckHealth_Call { + return &MockQueryNode_CheckHealth_Call{Call: _e.mock.On("CheckHealth", _a0, _a1)} +} + +func (_c *MockQueryNode_CheckHealth_Call) Run(run func(_a0 context.Context, _a1 *milvuspb.CheckHealthRequest)) *MockQueryNode_CheckHealth_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*milvuspb.CheckHealthRequest)) + }) + return _c +} + +func (_c *MockQueryNode_CheckHealth_Call) Return(_a0 *milvuspb.CheckHealthResponse, _a1 error) *MockQueryNode_CheckHealth_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockQueryNode_CheckHealth_Call) RunAndReturn(run func(context.Context, *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error)) *MockQueryNode_CheckHealth_Call { + _c.Call.Return(run) + return _c +} + // Delete provides a mock function with given fields: _a0, _a1 func (_m *MockQueryNode) Delete(_a0 context.Context, _a1 *querypb.DeleteRequest) (*commonpb.Status, error) { ret := _m.Called(_a0, _a1) diff --git a/internal/mocks/mock_querynode_client.go b/internal/mocks/mock_querynode_client.go index 3621a87884222..e6f1cc4044e92 100644 --- a/internal/mocks/mock_querynode_client.go +++ b/internal/mocks/mock_querynode_client.go @@ -31,6 +31,76 @@ func (_m *MockQueryNodeClient) EXPECT() *MockQueryNodeClient_Expecter { return &MockQueryNodeClient_Expecter{mock: &_m.Mock} } +// CheckHealth provides a mock function with given fields: ctx, in, opts +func (_m *MockQueryNodeClient) CheckHealth(ctx context.Context, in *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, in) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) 
+ + var r0 *milvuspb.CheckHealthResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest, ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error)); ok { + return rf(ctx, in, opts...) + } + if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest, ...grpc.CallOption) *milvuspb.CheckHealthResponse); ok { + r0 = rf(ctx, in, opts...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*milvuspb.CheckHealthResponse) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *milvuspb.CheckHealthRequest, ...grpc.CallOption) error); ok { + r1 = rf(ctx, in, opts...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockQueryNodeClient_CheckHealth_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckHealth' +type MockQueryNodeClient_CheckHealth_Call struct { + *mock.Call +} + +// CheckHealth is a helper method to define mock.On call +// - ctx context.Context +// - in *milvuspb.CheckHealthRequest +// - opts ...grpc.CallOption +func (_e *MockQueryNodeClient_Expecter) CheckHealth(ctx interface{}, in interface{}, opts ...interface{}) *MockQueryNodeClient_CheckHealth_Call { + return &MockQueryNodeClient_CheckHealth_Call{Call: _e.mock.On("CheckHealth", + append([]interface{}{ctx, in}, opts...)...)} +} + +func (_c *MockQueryNodeClient_CheckHealth_Call) Run(run func(ctx context.Context, in *milvuspb.CheckHealthRequest, opts ...grpc.CallOption)) *MockQueryNodeClient_CheckHealth_Call { + _c.Call.Run(func(args mock.Arguments) { + variadicArgs := make([]grpc.CallOption, len(args)-2) + for i, a := range args[2:] { + if a != nil { + variadicArgs[i] = a.(grpc.CallOption) + } + } + run(args[0].(context.Context), args[1].(*milvuspb.CheckHealthRequest), variadicArgs...) 
+ }) + return _c +} + +func (_c *MockQueryNodeClient_CheckHealth_Call) Return(_a0 *milvuspb.CheckHealthResponse, _a1 error) *MockQueryNodeClient_CheckHealth_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockQueryNodeClient_CheckHealth_Call) RunAndReturn(run func(context.Context, *milvuspb.CheckHealthRequest, ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error)) *MockQueryNodeClient_CheckHealth_Call { + _c.Call.Return(run) + return _c +} + // Close provides a mock function with given fields: func (_m *MockQueryNodeClient) Close() error { ret := _m.Called() diff --git a/internal/proto/data_coord.proto b/internal/proto/data_coord.proto index e3296128125c6..8badd1c1aa6d1 100644 --- a/internal/proto/data_coord.proto +++ b/internal/proto/data_coord.proto @@ -137,6 +137,8 @@ service DataNode { rpc QuerySlot(QuerySlotRequest) returns(QuerySlotResponse) {} rpc DropCompactionPlan(DropCompactionPlanRequest) returns(common.Status) {} + + rpc CheckHealth(milvus.CheckHealthRequest)returns (milvus.CheckHealthResponse) {} } message FlushRequest { diff --git a/internal/proto/query_coord.proto b/internal/proto/query_coord.proto index ed0e152bbd708..bc881d4114511 100644 --- a/internal/proto/query_coord.proto +++ b/internal/proto/query_coord.proto @@ -168,8 +168,9 @@ service QueryNode { } rpc SyncDistribution(SyncDistributionRequest) returns (common.Status) { } - rpc Delete(DeleteRequest) returns (common.Status) { - } + rpc Delete(DeleteRequest) returns (common.Status) {} + + rpc CheckHealth(milvus.CheckHealthRequest)returns (milvus.CheckHealthResponse) {} } // --------------------QueryCoord grpc request and response proto------------------ diff --git a/internal/querycoordv2/mocks/mock_querynode.go b/internal/querycoordv2/mocks/mock_querynode.go index 039d03ee6b5a4..0fd3c9e3a6ef2 100644 --- a/internal/querycoordv2/mocks/mock_querynode.go +++ b/internal/querycoordv2/mocks/mock_querynode.go @@ -29,6 +29,61 @@ func (_m *MockQueryNodeServer) EXPECT() 
*MockQueryNodeServer_Expecter { return &MockQueryNodeServer_Expecter{mock: &_m.Mock} } +// CheckHealth provides a mock function with given fields: _a0, _a1 +func (_m *MockQueryNodeServer) CheckHealth(_a0 context.Context, _a1 *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) { + ret := _m.Called(_a0, _a1) + + var r0 *milvuspb.CheckHealthResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error)); ok { + return rf(_a0, _a1) + } + if rf, ok := ret.Get(0).(func(context.Context, *milvuspb.CheckHealthRequest) *milvuspb.CheckHealthResponse); ok { + r0 = rf(_a0, _a1) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*milvuspb.CheckHealthResponse) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, *milvuspb.CheckHealthRequest) error); ok { + r1 = rf(_a0, _a1) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockQueryNodeServer_CheckHealth_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckHealth' +type MockQueryNodeServer_CheckHealth_Call struct { + *mock.Call +} + +// CheckHealth is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 *milvuspb.CheckHealthRequest +func (_e *MockQueryNodeServer_Expecter) CheckHealth(_a0 interface{}, _a1 interface{}) *MockQueryNodeServer_CheckHealth_Call { + return &MockQueryNodeServer_CheckHealth_Call{Call: _e.mock.On("CheckHealth", _a0, _a1)} +} + +func (_c *MockQueryNodeServer_CheckHealth_Call) Run(run func(_a0 context.Context, _a1 *milvuspb.CheckHealthRequest)) *MockQueryNodeServer_CheckHealth_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*milvuspb.CheckHealthRequest)) + }) + return _c +} + +func (_c *MockQueryNodeServer_CheckHealth_Call) Return(_a0 *milvuspb.CheckHealthResponse, _a1 error) *MockQueryNodeServer_CheckHealth_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c 
*MockQueryNodeServer_CheckHealth_Call) RunAndReturn(run func(context.Context, *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error)) *MockQueryNodeServer_CheckHealth_Call { + _c.Call.Return(run) + return _c +} + // Delete provides a mock function with given fields: _a0, _a1 func (_m *MockQueryNodeServer) Delete(_a0 context.Context, _a1 *querypb.DeleteRequest) (*commonpb.Status, error) { ret := _m.Called(_a0, _a1) diff --git a/internal/querycoordv2/server.go b/internal/querycoordv2/server.go index d2c997e339c88..bb2118c5d5782 100644 --- a/internal/querycoordv2/server.go +++ b/internal/querycoordv2/server.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "os" + "strings" "sync" "syscall" "time" @@ -50,6 +51,7 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/internal/querycoordv2/task" "github.com/milvus-io/milvus/internal/types" + "github.com/milvus-io/milvus/internal/util/healthcheck" "github.com/milvus-io/milvus/internal/util/proxyutil" "github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/internal/util/tsoutil" @@ -130,6 +132,8 @@ type Server struct { proxyCreator proxyutil.ProxyCreator proxyWatcher proxyutil.ProxyWatcherInterface proxyClientManager proxyutil.ProxyClientManagerInterface + + healthChecker *healthcheck.Checker } func NewQueryCoord(ctx context.Context) (*Server, error) { @@ -338,6 +342,8 @@ func (s *Server) initQueryCoord() error { // Init load status cache meta.GlobalFailedLoadCache = meta.NewFailedLoadCache() + interval := Params.CommonCfg.HealthCheckInterval.GetAsDuration(time.Second) + s.healthChecker = healthcheck.NewChecker(interval, s.healthCheckFn) log.Info("init querycoord done", zap.Int64("nodeID", paramtable.GetNodeID()), zap.String("Address", s.address)) return err } @@ -734,12 +740,11 @@ func (s *Server) tryHandleNodeUp() { log := log.Ctx(s.ctx).WithRateGroup("qcv2.Server", 1, 60) ctx, cancel := context.WithTimeout(s.ctx, 
Params.QueryCoordCfg.CheckHealthRPCTimeout.GetAsDuration(time.Millisecond)) defer cancel() - reasons, err := s.checkNodeHealth(ctx) - if err != nil { + checkResult := s.checkNodeHealth(ctx) + if !checkResult.IsHealthy() { log.RatedWarn(10, "unhealthy node exist, node up will be delayed", zap.Int("delayedNodeUpEvents", len(s.nodeUpEventChan)), - zap.Int("unhealthyNodeNum", len(reasons)), - zap.Strings("unhealthyReason", reasons)) + zap.String("unhealthyReason", strings.Join(checkResult.CollectUnhealthyReasons(), ","))) return } for len(s.nodeUpEventChan) > 0 { diff --git a/internal/querycoordv2/services.go b/internal/querycoordv2/services.go index ea132a3a910e2..d7e58a17d167b 100644 --- a/internal/querycoordv2/services.go +++ b/internal/querycoordv2/services.go @@ -19,12 +19,13 @@ package querycoordv2 import ( "context" "fmt" + "strings" "sync" + "time" "github.com/cockroachdb/errors" "github.com/samber/lo" "go.uber.org/zap" - "golang.org/x/sync/errgroup" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" @@ -34,6 +35,7 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/meta" "github.com/milvus-io/milvus/internal/querycoordv2/utils" "github.com/milvus-io/milvus/internal/util/componentutil" + "github.com/milvus-io/milvus/internal/util/healthcheck" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/util/merr" @@ -914,44 +916,69 @@ func (s *Server) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthReque return &milvuspb.CheckHealthResponse{Status: merr.Status(err), IsHealthy: false, Reasons: []string{err.Error()}}, nil } - errReasons, err := s.checkNodeHealth(ctx) - if err != nil || len(errReasons) != 0 { - return componentutil.CheckHealthRespWithErrMsg(errReasons...), nil + latestCheckResult := s.healthChecker.GetLatestCheckResult() + if !latestCheckResult.IsHealthy() { + return 
healthcheck.GetCheckHealthResponseFrom(&latestCheckResult), nil } + return componentutil.CheckHealthRespWithErr(nil), nil +} - if err := utils.CheckCollectionsQueryable(s.meta, s.targetMgr, s.dist, s.nodeMgr); err != nil { - return componentutil.CheckHealthRespWithErr(err), nil - } +func (s *Server) healthCheckFn() healthcheck.Result { + timeout := Params.CommonCfg.HealthCheckRPCTimeout.GetAsDuration(time.Second) + ctx, cancel := context.WithTimeout(s.ctx, timeout) + defer cancel() - return componentutil.CheckHealthRespWithErr(nil), nil + checkResults := s.checkNodeHealth(ctx) + for collectionID, failReason := range utils.CheckCollectionsQueryable(s.meta, s.targetMgr, s.dist, s.nodeMgr) { + checkResults.AppendUnhealthyCollectionMsgs(healthcheck.UnhealthyCollectionMsg{CollectionID: collectionID, UnhealthyMsg: failReason}) + } + return checkResults } -func (s *Server) checkNodeHealth(ctx context.Context) ([]string, error) { - group, ctx := errgroup.WithContext(ctx) - errReasons := make([]string, 0) +func (s *Server) checkNodeHealth(ctx context.Context) healthcheck.Result { + result := healthcheck.NewResult() + wg := sync.WaitGroup{} + wlock := sync.Mutex{} - mu := &sync.Mutex{} for _, node := range s.nodeMgr.GetAll() { node := node - group.Go(func() error { - resp, err := s.cluster.GetComponentStates(ctx, node.ID()) + wg.Add(1) + go func() { + var checkHealthResp *milvuspb.CheckHealthResponse + var err error + + defer func() { + if err != nil { + wlock.Lock() + result.AppendUnhealthyNodeMsg(healthcheck.UnhealthyNodeMsg{ + Role: "QueryNode", + NodeID: node.ID(), + UnhealthyMsg: err.Error(), + }) + wlock.Unlock() + } + wg.Done() + }() + + err = merr.AnalyzeComponentStateResp(s.cluster.GetComponentStates(ctx, node.ID())) if err != nil { - return err + return } - err = merr.AnalyzeState("QueryNode", node.ID(), resp) - if err != nil { - mu.Lock() - defer mu.Unlock() - errReasons = append(errReasons, err.Error()) + checkHealthResp, err = s.cluster.CheckHealth(ctx, 
node.ID()) + if err = merr.CheckRPCCall(checkHealthResp, err); err != nil { + err = fmt.Errorf("CheckHealth fails for node:%d, %w", node.ID(), err) + return } - return nil - }) - } - err := group.Wait() + if !checkHealthResp.IsHealthy && len(checkHealthResp.Reasons) > 0 { + err = errors.New(strings.Join(checkHealthResp.Reasons, ",")) + } + }() + } - return errReasons, err + wg.Wait() + return result } func (s *Server) CreateResourceGroup(ctx context.Context, req *milvuspb.CreateResourceGroupRequest) (*commonpb.Status, error) { diff --git a/internal/querycoordv2/session/cluster.go b/internal/querycoordv2/session/cluster.go index 7b6bc316ebe25..569dbb0029469 100644 --- a/internal/querycoordv2/session/cluster.go +++ b/internal/querycoordv2/session/cluster.go @@ -52,6 +52,7 @@ type Cluster interface { GetMetrics(ctx context.Context, nodeID int64, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) SyncDistribution(ctx context.Context, nodeID int64, req *querypb.SyncDistributionRequest) (*commonpb.Status, error) GetComponentStates(ctx context.Context, nodeID int64) (*milvuspb.ComponentStates, error) + CheckHealth(ctx context.Context, nodeID int64) (*milvuspb.CheckHealthResponse, error) Start() Stop() } @@ -272,6 +273,20 @@ func (c *QueryCluster) send(ctx context.Context, nodeID int64, fn func(cli types return nil } +func (c *QueryCluster) CheckHealth(ctx context.Context, nodeID int64) (*milvuspb.CheckHealthResponse, error) { + var ( + resp *milvuspb.CheckHealthResponse + err error + ) + err1 := c.send(ctx, nodeID, func(cli types.QueryNodeClient) { + resp, err = cli.CheckHealth(ctx, &milvuspb.CheckHealthRequest{}) + }) + if err1 != nil { + return nil, err1 + } + return resp, err +} + type clients struct { sync.RWMutex clients map[int64]types.QueryNodeClient // nodeID -> client diff --git a/internal/querycoordv2/session/mock_cluster.go b/internal/querycoordv2/session/mock_cluster.go index dbc14c720ce98..136f6c4e23da0 100644 --- 
a/internal/querycoordv2/session/mock_cluster.go +++ b/internal/querycoordv2/session/mock_cluster.go @@ -27,6 +27,61 @@ func (_m *MockCluster) EXPECT() *MockCluster_Expecter { return &MockCluster_Expecter{mock: &_m.Mock} } +// CheckHealth provides a mock function with given fields: ctx, nodeID +func (_m *MockCluster) CheckHealth(ctx context.Context, nodeID int64) (*milvuspb.CheckHealthResponse, error) { + ret := _m.Called(ctx, nodeID) + + var r0 *milvuspb.CheckHealthResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, int64) (*milvuspb.CheckHealthResponse, error)); ok { + return rf(ctx, nodeID) + } + if rf, ok := ret.Get(0).(func(context.Context, int64) *milvuspb.CheckHealthResponse); ok { + r0 = rf(ctx, nodeID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*milvuspb.CheckHealthResponse) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, int64) error); ok { + r1 = rf(ctx, nodeID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockCluster_CheckHealth_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckHealth' +type MockCluster_CheckHealth_Call struct { + *mock.Call +} + +// CheckHealth is a helper method to define mock.On call +// - ctx context.Context +// - nodeID int64 +func (_e *MockCluster_Expecter) CheckHealth(ctx interface{}, nodeID interface{}) *MockCluster_CheckHealth_Call { + return &MockCluster_CheckHealth_Call{Call: _e.mock.On("CheckHealth", ctx, nodeID)} +} + +func (_c *MockCluster_CheckHealth_Call) Run(run func(ctx context.Context, nodeID int64)) *MockCluster_CheckHealth_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(int64)) + }) + return _c +} + +func (_c *MockCluster_CheckHealth_Call) Return(_a0 *milvuspb.CheckHealthResponse, _a1 error) *MockCluster_CheckHealth_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockCluster_CheckHealth_Call) RunAndReturn(run func(context.Context, int64) 
(*milvuspb.CheckHealthResponse, error)) *MockCluster_CheckHealth_Call { + _c.Call.Return(run) + return _c +} + // GetComponentStates provides a mock function with given fields: ctx, nodeID func (_m *MockCluster) GetComponentStates(ctx context.Context, nodeID int64) (*milvuspb.ComponentStates, error) { ret := _m.Called(ctx, nodeID) diff --git a/internal/querycoordv2/utils/util.go b/internal/querycoordv2/utils/util.go index 4ef2f685d368a..383dbecccbf8d 100644 --- a/internal/querycoordv2/utils/util.go +++ b/internal/querycoordv2/utils/util.go @@ -46,15 +46,17 @@ func CheckNodeAvailable(nodeID int64, info *session.NodeInfo) error { // 3. The last heartbeat response time is within HeartbeatAvailableInterval for all QueryNodes(include leader) in the distribution // 4. All segments of the shard in target should be in the distribution func CheckLeaderAvailable(nodeMgr *session.NodeManager, leader *meta.LeaderView, currentTargets map[int64]*datapb.SegmentInfo) error { - log := log.Ctx(context.TODO()). - WithRateGroup("utils.CheckLeaderAvailable", 1, 60). - With(zap.Int64("leaderID", leader.ID)) - info := nodeMgr.Get(leader.ID) + log := func() *log.MLogger { + return log.Ctx(context.TODO()). + WithRateGroup("utils.CheckLeaderAvailable", 1, 60). 
+ With(zap.Int64("leaderID", leader.ID)) + } + info := nodeMgr.Get(leader.ID) // Check whether leader is online err := CheckNodeAvailable(leader.ID, info) if err != nil { - log.Info("leader is not available", zap.Error(err)) + log().Info("leader is not available", zap.Error(err)) return fmt.Errorf("leader not available: %w", err) } @@ -62,7 +64,7 @@ func CheckLeaderAvailable(nodeMgr *session.NodeManager, leader *meta.LeaderView, info := nodeMgr.Get(version.GetNodeID()) err = CheckNodeAvailable(version.GetNodeID(), info) if err != nil { - log.Info("leader is not available due to QueryNode unavailable", + log().Info("leader is not available due to QueryNode unavailable", zap.Int64("segmentID", id), zap.Error(err)) return err @@ -77,7 +79,7 @@ func CheckLeaderAvailable(nodeMgr *session.NodeManager, leader *meta.LeaderView, _, exist := leader.Segments[segmentID] if !exist { - log.RatedInfo(10, "leader is not available due to lack of segment", zap.Int64("segmentID", segmentID)) + log().RatedWarn(10, "leader is not available due to lack of segment", zap.Int64("segmentID", segmentID)) return merr.WrapErrSegmentLack(segmentID) } } @@ -112,8 +114,6 @@ func GetShardLeadersWithChannels(m *meta.Meta, targetMgr meta.TargetManagerInter ret := make([]*querypb.ShardLeadersList, 0) currentTargets := targetMgr.GetSealedSegmentsByCollection(collectionID, meta.CurrentTarget) for _, channel := range channels { - log := log.With(zap.String("channel", channel.GetChannelName())) - var channelErr error leaders := dist.LeaderViewManager.GetByFilter(meta.WithChannelName2LeaderView(channel.GetChannelName())) if len(leaders) == 0 { @@ -181,8 +181,9 @@ func GetShardLeaders(m *meta.Meta, targetMgr meta.TargetManagerInterface, dist * } // CheckCollectionsQueryable check all channels are watched and all segments are loaded for this collection -func CheckCollectionsQueryable(m *meta.Meta, targetMgr meta.TargetManagerInterface, dist *meta.DistributionManager, nodeMgr *session.NodeManager) error { 
+func CheckCollectionsQueryable(m *meta.Meta, targetMgr meta.TargetManagerInterface, dist *meta.DistributionManager, nodeMgr *session.NodeManager) map[int64]string { maxInterval := paramtable.Get().QueryCoordCfg.UpdateCollectionLoadStatusInterval.GetAsDuration(time.Minute) + checkResult := make(map[int64]string) for _, coll := range m.GetAllCollections() { err := checkCollectionQueryable(m, targetMgr, dist, nodeMgr, coll) // the collection is not queryable, if meet following conditions: @@ -190,10 +191,11 @@ func CheckCollectionsQueryable(m *meta.Meta, targetMgr meta.TargetManagerInterfa // 2. Collection is not starting to release // 3. The load percentage has not been updated in the last 5 minutes. if err != nil && m.Exist(coll.CollectionID) && time.Since(coll.UpdatedAt) >= maxInterval { - return err + checkResult[coll.CollectionID] = err.Error() } + } - return nil + return checkResult } // checkCollectionQueryable check all channels are watched and all segments are loaded for this collection diff --git a/internal/querynodev2/services.go b/internal/querynodev2/services.go index b367e03c95bfc..f5c68e33a32e3 100644 --- a/internal/querynodev2/services.go +++ b/internal/querynodev2/services.go @@ -43,6 +43,7 @@ import ( "github.com/milvus-io/milvus/internal/querynodev2/segments" "github.com/milvus-io/milvus/internal/querynodev2/tasks" "github.com/milvus-io/milvus/internal/storage" + "github.com/milvus-io/milvus/internal/util/componentutil" "github.com/milvus-io/milvus/internal/util/streamrpc" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" @@ -52,6 +53,7 @@ import ( "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/ratelimitutil" "github.com/milvus-io/milvus/pkg/util/timerecord" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -1418,6 +1420,24 @@ func (node *QueryNode) Delete(ctx context.Context, req 
*querypb.DeleteRequest) ( return merr.Success(), nil } +func (node *QueryNode) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) { + if err := node.lifetime.Add(merr.IsHealthy); err != nil { + return &milvuspb.CheckHealthResponse{ + Status: merr.Status(err), + Reasons: []string{err.Error()}, + }, nil + } + defer node.lifetime.Done() + + maxDelay := paramtable.Get().QuotaConfig.MaxTimeTickDelay.GetAsDuration(time.Second) + minTsafeChannel, minTsafe := node.tSafeManager.Min() + if err := ratelimitutil.CheckTimeTickDelay(minTsafeChannel, minTsafe, maxDelay); err != nil { + return componentutil.CheckHealthRespWithErr(err), nil + } + + return componentutil.CheckHealthRespWithErr(nil), nil +} + type deleteRequestStringer struct { *querypb.DeleteRequest } diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index 7a1ce068bc44b..fdd0dcd5094d3 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -30,7 +30,6 @@ import ( clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/atomic" "go.uber.org/zap" - "golang.org/x/sync/errgroup" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" @@ -47,7 +46,9 @@ import ( "github.com/milvus-io/milvus/internal/proto/rootcoordpb" tso2 "github.com/milvus-io/milvus/internal/tso" "github.com/milvus-io/milvus/internal/types" + "github.com/milvus-io/milvus/internal/util/componentutil" "github.com/milvus-io/milvus/internal/util/dependency" + "github.com/milvus-io/milvus/internal/util/healthcheck" "github.com/milvus-io/milvus/internal/util/proxyutil" "github.com/milvus-io/milvus/internal/util/sessionutil" tsoutil2 "github.com/milvus-io/milvus/internal/util/tsoutil" @@ -126,6 +127,7 @@ type Core struct { enableActiveStandBy bool activateFunc func() error + healthChecker *healthcheck.Checker } // --------------------- function -------------------------- @@ -480,6 +482,8 @@ func (c 
*Core) initInternal() error { return err } + interval := Params.CommonCfg.HealthCheckInterval.GetAsDuration(time.Second) + c.healthChecker = healthcheck.NewChecker(interval, c.healthCheckFn) log.Info("init rootcoord done", zap.Int64("nodeID", paramtable.GetNodeID()), zap.String("Address", c.address)) return nil } @@ -2834,51 +2838,40 @@ func (c *Core) CheckHealth(ctx context.Context, in *milvuspb.CheckHealthRequest) }, nil } - group, ctx := errgroup.WithContext(ctx) - errs := typeutil.NewConcurrentSet[error]() + latestCheckResult := c.healthChecker.GetLatestCheckResult() + if !latestCheckResult.IsHealthy() { + return healthcheck.GetCheckHealthResponseFrom(&latestCheckResult), nil + } + return componentutil.CheckHealthRespWithErr(nil), nil +} + +func (c *Core) healthCheckFn() healthcheck.Result { + timeout := Params.CommonCfg.HealthCheckRPCTimeout.GetAsDuration(time.Second) + ctx, cancel := context.WithTimeout(c.ctx, timeout) + defer cancel() proxyClients := c.proxyClientManager.GetProxyClients() + + wg := sync.WaitGroup{} + lock := sync.Mutex{} + result := healthcheck.NewResult() + proxyClients.Range(func(key int64, value types.ProxyClient) bool { nodeID := key proxyClient := value - group.Go(func() error { - sta, err := proxyClient.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{}) - if err != nil { - errs.Insert(err) - return err - } - - err = merr.AnalyzeState("Proxy", nodeID, sta) + wg.Add(1) + go func() { + defer wg.Done() + err := merr.AnalyzeComponentStateResp(proxyClient.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{})) if err != nil { - errs.Insert(err) + lock.Lock() + result.AppendUnhealthyNodeMsg(healthcheck.UnhealthyNodeMsg{Role: "Proxy", NodeID: nodeID, UnhealthyMsg: err.Error()}) + lock.Unlock() } - - return err - }) + }() return true }) - maxDelay := Params.QuotaConfig.MaxTimeTickDelay.GetAsDuration(time.Second) - if maxDelay > 0 { - group.Go(func() error { - err := CheckTimeTickLagExceeded(ctx, c.queryCoord, c.dataCoord, 
maxDelay) - if err != nil { - errs.Insert(err) - } - return err - }) - } - - err := group.Wait() - if err != nil { - return &milvuspb.CheckHealthResponse{ - Status: merr.Success(), - IsHealthy: false, - Reasons: lo.Map(errs.Collect(), func(e error, i int) string { - return err.Error() - }), - }, nil - } - - return &milvuspb.CheckHealthResponse{Status: merr.Success(), IsHealthy: true, Reasons: []string{}}, nil + wg.Wait() + return result } diff --git a/internal/util/healthcheck/checker.go b/internal/util/healthcheck/checker.go new file mode 100644 index 0000000000000..dc030e8cc00f4 --- /dev/null +++ b/internal/util/healthcheck/checker.go @@ -0,0 +1,137 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package healthcheck + +import ( + "fmt" + "sync" + "time" + + "github.com/samber/lo" + "go.uber.org/zap" + + "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" + "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/merr" +) + +type Result struct { + UnhealthyNodeMsgs []UnhealthyNodeMsg + UnhealthyCollectionMsgs []UnhealthyCollectionMsg +} + +func NewResult() Result { + return Result{ + UnhealthyNodeMsgs: make([]UnhealthyNodeMsg, 0), + UnhealthyCollectionMsgs: make([]UnhealthyCollectionMsg, 0), + } +} + +func (r *Result) AppendUnhealthyNodeMsg(unm UnhealthyNodeMsg) { + r.UnhealthyNodeMsgs = append(r.UnhealthyNodeMsgs, unm) +} + +func (r *Result) AppendUnhealthyCollectionMsgs(udm UnhealthyCollectionMsg) { + r.UnhealthyCollectionMsgs = append(r.UnhealthyCollectionMsgs, udm) +} + +func (r *Result) IsHealthy() bool { + return len(r.UnhealthyNodeMsgs) == 0 && len(r.UnhealthyCollectionMsgs) == 0 +} + +func (r *Result) CollectUnhealthyReasons() []string { + nodeUnHealthyMsgs := lo.Map(r.UnhealthyNodeMsgs, func(unm UnhealthyNodeMsg, i int) string { + return unm.UnhealthyReason() + }) + dbUnHealthyMsgs := lo.Map(r.UnhealthyCollectionMsgs, func(udm UnhealthyCollectionMsg, i int) string { + return udm.UnhealthyReason() + }) + return append(nodeUnHealthyMsgs, dbUnHealthyMsgs...) 
+}
+
+type UnhealthyNodeMsg struct {
+	Role         string
+	NodeID       int64
+	UnhealthyMsg string
+}
+
+func (unm *UnhealthyNodeMsg) UnhealthyReason() string {
+	return fmt.Sprintf("role:%s, nodeID:%d, unhealthy reason:%s", unm.Role, unm.NodeID, unm.UnhealthyMsg)
+}
+
+// UnhealthyCollectionMsg TODO check and collect health information of the collection
+type UnhealthyCollectionMsg struct {
+	CollectionID int64
+	UnhealthyMsg string
+}
+
+func (ucm *UnhealthyCollectionMsg) UnhealthyReason() string {
+	return fmt.Sprintf("collectionID:%d, unhealthy reason:%s", ucm.CollectionID, ucm.UnhealthyMsg)
+}
+
+type Checker struct {
+	sync.RWMutex
+	interval     time.Duration
+	location     string
+	done         chan struct{}
+	checkFn      func() Result
+	latestResult Result
+}
+
+func NewChecker(interval time.Duration, checkFn func() Result) *Checker {
+	return &Checker{
+		interval:     interval,
+		checkFn:      checkFn,
+		latestResult: NewResult(),
+		done:         make(chan struct{}, 1),
+	}
+}
+
+func (hc *Checker) Start() {
+	ticker := time.NewTicker(hc.interval)
+	defer ticker.Stop()
+	log.Debug("start health checker", zap.String("location", hc.location))
+	for {
+		select {
+		case <-ticker.C:
+			hc.Lock()
+			hc.latestResult = hc.checkFn()
+			hc.Unlock()
+		case <-hc.done:
+			log.Info("stop health checker ", zap.String("location", hc.location))
+			return
+		}
+	}
+}
+
+func (hc *Checker) GetLatestCheckResult() Result {
+	hc.RLock()
+	defer hc.RUnlock()
+	return hc.latestResult
+}
+
+func (hc *Checker) Close() {
+	hc.done <- struct{}{}
+}
+
+func GetCheckHealthResponseFrom(checkResult *Result) *milvuspb.CheckHealthResponse {
+	return &milvuspb.CheckHealthResponse{
+		Status:    merr.Success(),
+		IsHealthy: checkResult.IsHealthy(),
+		Reasons:   checkResult.CollectUnhealthyReasons(),
+	}
+}
diff --git a/internal/util/mock/grpc_datanode_client.go b/internal/util/mock/grpc_datanode_client.go
index 13ae355738d80..621a2317e61e8 100644
--- a/internal/util/mock/grpc_datanode_client.go
+++ b/internal/util/mock/grpc_datanode_client.go
@@ -112,3 
+112,7 @@ func (m *GrpcDataNodeClient) QuerySlot(ctx context.Context, req *datapb.QuerySlo func (m *GrpcDataNodeClient) DropCompactionPlan(ctx context.Context, req *datapb.DropCompactionPlanRequest, opts ...grpc.CallOption) (*commonpb.Status, error) { return &commonpb.Status{}, m.Err } + +func (m *GrpcDataNodeClient) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) { + return &milvuspb.CheckHealthResponse{}, m.Err +} diff --git a/internal/util/mock/grpc_querynode_client.go b/internal/util/mock/grpc_querynode_client.go index e20dc0d635422..7add24aeb6756 100644 --- a/internal/util/mock/grpc_querynode_client.go +++ b/internal/util/mock/grpc_querynode_client.go @@ -130,6 +130,10 @@ func (m *GrpcQueryNodeClient) Delete(ctx context.Context, in *querypb.DeleteRequ return &commonpb.Status{}, m.Err } +func (m *GrpcQueryNodeClient) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) { + return &milvuspb.CheckHealthResponse{}, m.Err +} + func (m *GrpcQueryNodeClient) Close() error { return m.Err } diff --git a/internal/util/wrappers/qn_wrapper.go b/internal/util/wrappers/qn_wrapper.go index 63147c0116986..cec6e8f09c627 100644 --- a/internal/util/wrappers/qn_wrapper.go +++ b/internal/util/wrappers/qn_wrapper.go @@ -148,6 +148,10 @@ func (qn *qnServerWrapper) Delete(ctx context.Context, in *querypb.DeleteRequest return qn.QueryNode.Delete(ctx, in) } +func (qn *qnServerWrapper) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) { + return qn.QueryNode.CheckHealth(ctx, req) +} + func WrapQueryNodeServerAsClient(qn types.QueryNode) types.QueryNodeClient { return &qnServerWrapper{ QueryNode: qn, diff --git a/pkg/util/funcutil/func.go b/pkg/util/funcutil/func.go index 89da8c3877fb4..d448fe7371527 100644 --- a/pkg/util/funcutil/func.go +++ 
b/pkg/util/funcutil/func.go @@ -24,7 +24,6 @@ import ( "fmt" "net" "reflect" - "regexp" "strconv" "strings" "time" @@ -264,13 +263,18 @@ func ConvertChannelName(chanName string, tokenFrom string, tokenTo string) (stri } func GetCollectionIDFromVChannel(vChannelName string) int64 { - re := regexp.MustCompile(`.*_(\d+)v\d+`) - matches := re.FindStringSubmatch(vChannelName) - if len(matches) > 1 { - number, err := strconv.ParseInt(matches[1], 0, 64) - if err == nil { - return number - } + end := strings.LastIndexByte(vChannelName, 'v') + if end <= 0 { + return -1 + } + start := strings.LastIndexByte(vChannelName, '_') + if start <= 0 { + return -1 + } + + collectionIDStr := vChannelName[start+1 : end] + if collectionID, err := strconv.ParseInt(collectionIDStr, 0, 64); err == nil { + return collectionID } return -1 } diff --git a/pkg/util/merr/utils.go b/pkg/util/merr/utils.go index 3f44ea45d81a6..c98d837404b45 100644 --- a/pkg/util/merr/utils.go +++ b/pkg/util/merr/utils.go @@ -299,6 +299,17 @@ func IsHealthyOrStopping(stateCode commonpb.StateCode) error { return CheckHealthy(stateCode) } +func AnalyzeComponentStateResp(resp *milvuspb.ComponentStates, err error) error { + if err != nil { + return errors.Wrap(err, "service is unhealthy") + } + + if err := Error(resp.GetStatus()); err != nil || resp.GetState().GetStateCode() != commonpb.StateCode_Healthy { + return errors.Wrap(err, "service is unhealthy") + } + return nil +} + func AnalyzeState(role string, nodeID int64, state *milvuspb.ComponentStates) error { if err := Error(state.GetStatus()); err != nil { return errors.Wrapf(err, "%s=%d not healthy", role, nodeID) diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index d29aa1a2d257f..eee5c9911155f 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -279,6 +279,9 @@ type commonConfig struct { ReadOnlyPrivileges ParamItem `refreshable:"false"` ReadWritePrivileges ParamItem 
`refreshable:"false"`
+
+	HealthCheckInterval   ParamItem `refreshable:"true"`
+	HealthCheckRPCTimeout ParamItem `refreshable:"true"`
 }
 
 func (p *commonConfig) init(base *BaseTable) {
@@ -924,6 +927,22 @@ This helps Milvus-CDC synchronize incremental data`,
 		Doc: `use to override the default value of admin privileges, example: "PrivilegeCreateOwnership,PrivilegeDropOwnership"`,
 	}
 	p.AdminPrivileges.Init(base.mgr)
+
+	p.HealthCheckInterval = ParamItem{
+		Key:          "common.healthcheck.interval.seconds",
+		Version:      "2.4.8",
+		DefaultValue: "30",
+		Doc:          `health check interval in seconds, default 30s`,
+	}
+	p.HealthCheckInterval.Init(base.mgr)
+
+	p.HealthCheckRPCTimeout = ParamItem{
+		Key:          "common.healthcheck.timeout.seconds",
+		Version:      "2.4.8",
+		DefaultValue: "3",
+		Doc:          `RPC timeout for health check request`,
+	}
+	p.HealthCheckRPCTimeout.Init(base.mgr)
 }
 
 type gpuConfig struct {
diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go
index 101ed92e14527..642f3137e911c 100644
--- a/pkg/util/paramtable/component_param_test.go
+++ b/pkg/util/paramtable/component_param_test.go
@@ -131,6 +131,11 @@ func TestComponentParam(t *testing.T) {
 		params.Save("common.gchelper.minimumGoGC", "80")
 		assert.Equal(t, 80, Params.MinimumGOGCConfig.GetAsInt())
 
+		params.Save("common.healthcheck.interval.seconds", "60")
+		assert.Equal(t, time.Second*60, Params.HealthCheckInterval.GetAsDuration(time.Second))
+		params.Save("common.healthcheck.timeout.seconds", "5")
+		assert.Equal(t, time.Second*5, Params.HealthCheckRPCTimeout.GetAsDuration(time.Second))
+
 		assert.Equal(t, 0, len(Params.ReadOnlyPrivileges.GetAsStrings()))
 		assert.Equal(t, 0, len(Params.ReadWritePrivileges.GetAsStrings()))
 		assert.Equal(t, 0, len(Params.AdminPrivileges.GetAsStrings()))
diff --git a/pkg/util/ratelimitutil/utils.go b/pkg/util/ratelimitutil/utils.go
index d72e28c22e682..ff20cd4ab26b4 100644
--- a/pkg/util/ratelimitutil/utils.go
+++ 
b/pkg/util/ratelimitutil/utils.go
@@ -16,7 +16,13 @@
 
 package ratelimitutil
 
-import "github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
+import (
+	"fmt"
+	"time"
+
+	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
+	"github.com/milvus-io/milvus/pkg/util/tsoutil"
+)
 
 var QuotaErrorString = map[commonpb.ErrorCode]string{
 	commonpb.ErrorCode_ForceDeny: "the writing has been deactivated by the administrator",
@@ -28,3 +34,14 @@ var QuotaErrorString = map[commonpb.ErrorCode]string{
 func GetQuotaErrorString(errCode commonpb.ErrorCode) string {
 	return QuotaErrorString[errCode]
 }
+
+func CheckTimeTickDelay(channel string, minTT uint64, maxDelay time.Duration) error {
+	if channel != "" && maxDelay > 0 {
+		minTt, _ := tsoutil.ParseTS(minTT)
+		delay := time.Since(minTt)
+		if delay.Milliseconds() >= maxDelay.Milliseconds() {
+			return fmt.Errorf("max timetick lag exceeded threshold, lag:%s on channel:%s", delay, channel)
+		}
+	}
+	return nil
+}