enhance: optimize CPU usage for CheckHealth requests (milvus-io#35589)
issue: milvus-io#35563
1. Use an internal health checker to monitor the cluster's health state and
store the latest result on the coordinator node. CheckHealth requests on the
proxy side read the cluster's health from this cached state instead of probing
every component per request, which reduces CPU usage and improves cluster
stability. (A minimal sketch of this caching pattern follows the changed-file
summary below.)
2. Each health check assesses all collections and channels; detailed failure
messages are kept temporarily in the latest state.
3. Use the CheckHealth request instead of the heavier GetMetrics request on the
QueryNode and DataNode.

Signed-off-by: jaime <[email protected]>
  • Loading branch information
1 parent 2afe2ea commit 28fdbc4
Showing 51 changed files with 1,297 additions and 482 deletions.
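Before the per-file hunks, here is a minimal sketch of the caching pattern that point 1 describes, under assumed names (healthState, cachedState). The real implementation is the new internal/util/healthcheck package, which builds a milvuspb.CheckHealthResponse from its cached result; this is an illustration, not code from the commit.

package healthsketch

import "sync"

// healthState stands in for the result cached on the coordinator (built from
// healthcheck.Result in the real code).
type healthState struct {
	IsHealthy bool
	Reasons   []string
}

// cachedState mimics the state kept by the coordinator's health checker.
type cachedState struct {
	mu     sync.RWMutex
	latest healthState
}

// Get is the per-request read path of CheckHealth: a lock and a copy, with no
// RPCs to DataNodes or QueryNodes and no channel/checkpoint scans.
func (s *cachedState) Get() healthState {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.latest
}

// Set is the write path, refreshed only by the background checker loop
// (see the lifecycle sketch after the datacoord server.go hunks below).
func (s *cachedState) Set(h healthState) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.latest = h
}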
2 changes: 1 addition & 1 deletion configs/milvus.yaml
@@ -382,7 +382,7 @@ queryCoord:
channelExclusiveNodeFactor: 4 # the least node number for enable channel's exclusive mode
collectionObserverInterval: 200 # the interval of collection observer
checkExecutedFlagInterval: 100 # the interval of check executed flag to force to pull dist
updateCollectionLoadStatusInterval: 5 # 5m, max interval of updating collection loaded status for check health
updateCollectionLoadStatusInterval: 300 # 300s, max interval of updating collection loaded status for check health
cleanExcludeSegmentInterval: 60 # the time duration of clean pipeline exclude segment which used for filter invalid data, in seconds
ip: # TCP/IP address of queryCoord. If not specified, use the first unicastable address
port: 19531 # TCP port of queryCoord
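The only functional edit in this file is the unit of the default: the old value 5 was documented as "5m", but if the parameter is read with GetAsDuration(time.Second), as the coordinator-side health-check intervals added in this PR are, it would have meant 5 seconds. Writing 300 keeps the intended five-minute interval. A tiny illustrative sketch (asSeconds is a hypothetical helper, not Milvus code):

package main

import (
	"fmt"
	"time"
)

// asSeconds mirrors what reading a raw config number with GetAsDuration(time.Second)
// yields: the configured value is multiplied by the unit chosen at the read site.
func asSeconds(raw int64) time.Duration {
	return time.Duration(raw) * time.Second
}

func main() {
	fmt.Println(asSeconds(5))   // 5s   — what the old default would mean under a seconds unit
	fmt.Println(asSeconds(300)) // 5m0s — the new default, i.e. the intended five minutes
}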
16 changes: 16 additions & 0 deletions internal/datacoord/mock_test.go
@@ -304,6 +304,22 @@ func (c *mockDataNodeClient) Stop() error {
return nil
}

func (c *mockDataNodeClient) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) {
if c.state == commonpb.StateCode_Healthy {
return &milvuspb.CheckHealthResponse{
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
IsHealthy: true,
Reasons: []string{},
}, nil
} else {
return &milvuspb.CheckHealthResponse{
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_NotReadyServe},
IsHealthy: false,
Reasons: []string{"fails"},
}, nil
}
}

type mockRootCoordClient struct {
state commonpb.StateCode
cnt atomic.Int64
10 changes: 10 additions & 0 deletions internal/datacoord/server.go
@@ -52,6 +52,7 @@ import (
streamingcoord "github.com/milvus-io/milvus/internal/streamingcoord/server"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/internal/util/healthcheck"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/streamingutil"
"github.com/milvus-io/milvus/pkg/kv"
@@ -167,6 +168,8 @@ type Server struct {
streamingCoord *streamingcoord.Server

metricsRequest *metricsinfo.MetricsRequest

healthChecker *healthcheck.Checker
}

type CollectionNameInfo struct {
@@ -429,6 +432,8 @@ func (s *Server) initDataCoord() error {

s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)

interval := Params.CommonCfg.HealthCheckInterval.GetAsDuration(time.Second)
s.healthChecker = healthcheck.NewChecker(interval, s.healthCheckFn)
log.Info("init datacoord done", zap.Int64("nodeID", paramtable.GetNodeID()), zap.String("Address", s.address))
return nil
}
@@ -773,6 +778,8 @@ func (s *Server) startServerLoop() {
if !(streamingutil.IsStreamingServiceEnabled() || paramtable.Get().DataNodeCfg.SkipBFStatsLoad.GetAsBool()) {
s.syncSegmentsScheduler.Start()
}

s.healthChecker.Start()
}

func (s *Server) startTaskScheduler() {
@@ -1099,6 +1106,9 @@ func (s *Server) Stop() error {
return nil
}
log.Info("datacoord server shutdown")
if s.healthChecker != nil {
s.healthChecker.Close()
}
s.garbageCollector.close()
log.Info("datacoord garbage collector stopped")

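The three server.go hunks above wire in the checker's lifecycle: it is built in initDataCoord with CommonCfg.HealthCheckInterval, started in startServerLoop, and closed (nil-guarded) in Stop. A generic ticker-driven sketch of that Start/Close contract follows, with hypothetical names; the real healthcheck.Checker may differ in detail.

package healthsketch

import (
	"sync"
	"time"
)

// periodicChecker runs checkFn on a fixed interval until Close is called.
type periodicChecker struct {
	interval time.Duration
	checkFn  func()
	stopOnce sync.Once
	done     chan struct{}
}

func newPeriodicChecker(interval time.Duration, checkFn func()) *periodicChecker {
	return &periodicChecker{interval: interval, checkFn: checkFn, done: make(chan struct{})}
}

func (c *periodicChecker) Start() {
	go func() {
		ticker := time.NewTicker(c.interval)
		defer ticker.Stop()
		c.checkFn() // run once up front so the cached state is populated at startup
		for {
			select {
			case <-ticker.C:
				c.checkFn()
			case <-c.done:
				return
			}
		}
	}()
}

// Close is idempotent, so callers such as Stop() only need a nil check.
func (c *periodicChecker) Close() {
	c.stopOnce.Do(func() { close(c.done) })
}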
53 changes: 29 additions & 24 deletions internal/datacoord/server_test.go
@@ -53,6 +53,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/workerpb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/internal/util/healthcheck"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
@@ -2509,12 +2510,12 @@ func Test_CheckHealth(t *testing.T) {
return sm
}

getChannelManager := func(t *testing.T, findWatcherOk bool) ChannelManager {
getChannelManager := func(findWatcherOk bool) ChannelManager {
channelManager := NewMockChannelManager(t)
if findWatcherOk {
channelManager.EXPECT().FindWatcher(mock.Anything).Return(0, nil)
channelManager.EXPECT().FindWatcher(mock.Anything).Return(0, nil).Maybe()
} else {
channelManager.EXPECT().FindWatcher(mock.Anything).Return(0, errors.New("error"))
channelManager.EXPECT().FindWatcher(mock.Anything).Return(0, errors.New("error")).Maybe()
}
return channelManager
}
@@ -2527,6 +2528,21 @@ func Test_CheckHealth(t *testing.T) {
2: nil,
}

newServer := func(isHealthy bool, findWatcherOk bool, meta *meta) *Server {
svr := &Server{
ctx: context.TODO(),
sessionManager: getSessionManager(isHealthy),
channelManager: getChannelManager(findWatcherOk),
meta: meta,
session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}},
}
svr.stateCode.Store(commonpb.StateCode_Healthy)
svr.healthChecker = healthcheck.NewChecker(20*time.Millisecond, svr.healthCheckFn)
svr.healthChecker.Start()
time.Sleep(30 * time.Millisecond) // wait for next cycle for health checker
return svr
}

t.Run("not healthy", func(t *testing.T) {
ctx := context.Background()
s := &Server{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}}
@@ -2538,9 +2554,8 @@ func Test_CheckHealth(t *testing.T) {
})

t.Run("data node health check is fail", func(t *testing.T) {
svr := &Server{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}}
svr.stateCode.Store(commonpb.StateCode_Healthy)
svr.sessionManager = getSessionManager(false)
svr := newServer(false, true, &meta{channelCPs: newChannelCps()})
defer svr.healthChecker.Close()
ctx := context.Background()
resp, err := svr.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
assert.NoError(t, err)
@@ -2549,11 +2564,8 @@ func Test_CheckHealth(t *testing.T) {
})

t.Run("check channel watched fail", func(t *testing.T) {
svr := &Server{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}}
svr.stateCode.Store(commonpb.StateCode_Healthy)
svr.sessionManager = getSessionManager(true)
svr.channelManager = getChannelManager(t, false)
svr.meta = &meta{collections: collections}
svr := newServer(true, false, &meta{collections: collections, channelCPs: newChannelCps()})
defer svr.healthChecker.Close()
ctx := context.Background()
resp, err := svr.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
assert.NoError(t, err)
@@ -2562,11 +2574,7 @@ func Test_CheckHealth(t *testing.T) {
})

t.Run("check checkpoint fail", func(t *testing.T) {
svr := &Server{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}}
svr.stateCode.Store(commonpb.StateCode_Healthy)
svr.sessionManager = getSessionManager(true)
svr.channelManager = getChannelManager(t, true)
svr.meta = &meta{
svr := newServer(true, true, &meta{
collections: collections,
channelCPs: &channelCPs{
checkpoints: map[string]*msgpb.MsgPosition{
@@ -2576,8 +2584,8 @@ func Test_CheckHealth(t *testing.T) {
},
},
},
}

})
defer svr.healthChecker.Close()
ctx := context.Background()
resp, err := svr.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
assert.NoError(t, err)
@@ -2586,11 +2594,7 @@ func Test_CheckHealth(t *testing.T) {
})

t.Run("ok", func(t *testing.T) {
svr := &Server{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}}
svr.stateCode.Store(commonpb.StateCode_Healthy)
svr.sessionManager = getSessionManager(true)
svr.channelManager = getChannelManager(t, true)
svr.meta = &meta{
svr := newServer(true, true, &meta{
collections: collections,
channelCPs: &channelCPs{
checkpoints: map[string]*msgpb.MsgPosition{
@@ -2608,7 +2612,8 @@ func Test_CheckHealth(t *testing.T) {
},
},
},
}
})
defer svr.healthChecker.Close()
ctx := context.Background()
resp, err := svr.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
assert.NoError(t, err)
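One note on the newServer helper above: the fixed time.Sleep(30 * time.Millisecond) only works because the checker interval is 20 ms, and it can be flaky under CI load. A hypothetical alternative (not part of this commit) that polls the cached state instead, assuming the testify, context, time and milvuspb imports already used in this test file:

// waitHealthy polls CheckHealth until the background checker has produced a
// healthy result, instead of sleeping for a fixed interval.
func waitHealthy(t *testing.T, svr *Server) {
	assert.Eventually(t, func() bool {
		resp, err := svr.CheckHealth(context.Background(), &milvuspb.CheckHealthRequest{})
		return err == nil && resp.GetIsHealthy()
	}, time.Second, 10*time.Millisecond, "health checker never reported a healthy cluster")
}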
26 changes: 15 additions & 11 deletions internal/datacoord/services.go
@@ -35,7 +35,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/componentutil"
"github.com/milvus-io/milvus/internal/util/healthcheck"
"github.com/milvus-io/milvus/internal/util/importutilv2"
"github.com/milvus-io/milvus/internal/util/segmentutil"
"github.com/milvus-io/milvus/internal/util/streamingutil"
@@ -1583,20 +1583,24 @@ func (s *Server) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthReque
}, nil
}

err := s.sessionManager.CheckHealth(ctx)
if err != nil {
return componentutil.CheckHealthRespWithErr(err), nil
}
latestCheckResult := s.healthChecker.GetLatestCheckResult()
return healthcheck.GetCheckHealthResponseFromResult(latestCheckResult), nil
}

if err = CheckAllChannelsWatched(s.meta, s.channelManager); err != nil {
return componentutil.CheckHealthRespWithErr(err), nil
}
func (s *Server) healthCheckFn() *healthcheck.Result {
timeout := Params.CommonCfg.HealthCheckRPCTimeout.GetAsDuration(time.Second)
ctx, cancel := context.WithTimeout(s.ctx, timeout)
defer cancel()

if err = CheckCheckPointsHealth(s.meta); err != nil {
return componentutil.CheckHealthRespWithErr(err), nil
checkResults := s.sessionManager.CheckDNHealth(ctx)
for collectionID, failReason := range CheckAllChannelsWatched(s.meta, s.channelManager) {
checkResults.AppendUnhealthyCollectionMsgs(healthcheck.NewUnhealthyCollectionMsg(collectionID, failReason, healthcheck.ChannelsWatched))
}

return componentutil.CheckHealthRespWithErr(nil), nil
for collectionID, failReason := range CheckCheckPointsHealth(s.meta) {
checkResults.AppendUnhealthyCollectionMsgs(healthcheck.NewUnhealthyCollectionMsg(collectionID, failReason, healthcheck.CheckpointLagExceed))
}
return checkResults
}

func (s *Server) GcConfirm(ctx context.Context, request *datapb.GcConfirmRequest) (*datapb.GcConfirmResponse, error) {
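The handler above now returns whatever healthcheck.GetCheckHealthResponseFromResult builds from the cached result, so the response's Reasons field carries the per-node and per-collection failure messages. A hypothetical caller-side helper (not part of this commit) that collapses such a response into a single error; GetIsHealthy and GetReasons are the protobuf-generated getters on milvuspb.CheckHealthResponse:

package healthsketch

import (
	"fmt"
	"strings"

	"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
)

// healthErr returns nil for a healthy cluster and otherwise folds the detailed
// reasons into one error that is convenient to log or surface to a client.
func healthErr(resp *milvuspb.CheckHealthResponse) error {
	if resp.GetIsHealthy() {
		return nil
	}
	return fmt.Errorf("cluster unhealthy: %s", strings.Join(resp.GetReasons(), "; "))
}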
46 changes: 32 additions & 14 deletions internal/datacoord/session/datanode_manager.go
@@ -19,6 +19,7 @@ package session
import (
"context"
"fmt"
"sync"
"time"

"github.com/cockroachdb/errors"
@@ -31,6 +32,7 @@ import (
"github.com/milvus-io/milvus/internal/metastore/kv/binlog"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/healthcheck"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
@@ -69,7 +71,7 @@ type DataNodeManager interface {
QueryPreImport(nodeID int64, in *datapb.QueryPreImportRequest) (*datapb.QueryPreImportResponse, error)
QueryImport(nodeID int64, in *datapb.QueryImportRequest) (*datapb.QueryImportResponse, error)
DropImport(nodeID int64, in *datapb.DropImportRequest) error
CheckHealth(ctx context.Context) error
CheckDNHealth(ctx context.Context) *healthcheck.Result
QuerySlot(nodeID int64) (*datapb.QuerySlotResponse, error)
DropCompactionPlan(nodeID int64, req *datapb.DropCompactionPlanRequest) error
Close()
@@ -507,28 +509,44 @@ func (c *DataNodeManagerImpl) DropImport(nodeID int64, in *datapb.DropImportRequ
return merr.CheckRPCCall(status, err)
}

func (c *DataNodeManagerImpl) CheckHealth(ctx context.Context) error {
group, ctx := errgroup.WithContext(ctx)

func (c *DataNodeManagerImpl) CheckDNHealth(ctx context.Context) *healthcheck.Result {
result := healthcheck.NewResult()
wg := sync.WaitGroup{}
wlock := sync.Mutex{}
ids := c.GetSessionIDs()

for _, nodeID := range ids {
nodeID := nodeID
group.Go(func() error {
cli, err := c.getClient(ctx, nodeID)
wg.Add(1)
go func() {
defer wg.Done()

datanodeClient, err := c.getClient(ctx, nodeID)
if err != nil {
return fmt.Errorf("failed to get DataNode %d: %v", nodeID, err)
err = fmt.Errorf("failed to get node:%d: %v", nodeID, err)
return
}

sta, err := cli.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{})
if err != nil {
return err
checkHealthResp, err := datanodeClient.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
if err = merr.CheckRPCCall(checkHealthResp, err); err != nil && !errors.Is(err, merr.ErrServiceUnimplemented) {
err = fmt.Errorf("CheckHealth fails for datanode:%d, %w", nodeID, err)
wlock.Lock()
result.AppendUnhealthyClusterMsg(
healthcheck.NewUnhealthyClusterMsg(typeutil.DataNodeRole, nodeID, err.Error(), healthcheck.NodeHealthCheck))
wlock.Unlock()
return
}
err = merr.AnalyzeState("DataNode", nodeID, sta)
return err
})

if checkHealthResp != nil && len(checkHealthResp.Reasons) > 0 {
wlock.Lock()
result.AppendResult(healthcheck.GetHealthCheckResultFromResp(checkHealthResp))
wlock.Unlock()
}
}()
}

return group.Wait()
wg.Wait()
return result
}

func (c *DataNodeManagerImpl) QuerySlot(nodeID int64) (*datapb.QuerySlotResponse, error) {
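The rewrite above trades errgroup for a plain WaitGroup plus mutex: with errgroup.WithContext, the first failing DataNode cancels the shared context and aborts the sibling probes, whereas the new code keeps probing and collects every unhealthy reason. A stripped-down sketch of that fan-out-and-collect pattern (probe and nodeIDs are illustrative names, not the real API):

package healthsketch

import (
	"context"
	"fmt"
	"sync"
)

// collectUnhealthy probes every node concurrently and gathers failure reasons
// instead of short-circuiting on the first error.
func collectUnhealthy(ctx context.Context, nodeIDs []int64, probe func(context.Context, int64) error) []string {
	var (
		mu      sync.Mutex
		reasons []string
		wg      sync.WaitGroup
	)
	for _, nodeID := range nodeIDs {
		nodeID := nodeID // capture the loop variable for the goroutine (pre-Go 1.22 semantics)
		wg.Add(1)
		go func() {
			defer wg.Done()
			if err := probe(ctx, nodeID); err != nil {
				mu.Lock()
				reasons = append(reasons, fmt.Sprintf("node %d: %v", nodeID, err))
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return reasons
}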
32 changes: 18 additions & 14 deletions internal/datacoord/session/mock_datanode_manager.go
