fix: revert optimize CPU usage for CheckHealth requests (#35589) (#38555)

issue: #35563

Signed-off-by: jaime <[email protected]>
jaime0815 authored Dec 18, 2024
1 parent 134d06f commit 78438ef
Showing 51 changed files with 481 additions and 1,297 deletions.
2 changes: 1 addition & 1 deletion configs/milvus.yaml
@@ -382,7 +382,7 @@ queryCoord:
channelExclusiveNodeFactor: 4 # the least node number for enable channel's exclusive mode
collectionObserverInterval: 200 # the interval of collection observer
checkExecutedFlagInterval: 100 # the interval of check executed flag to force to pull dist
updateCollectionLoadStatusInterval: 300 # 300s, max interval of updating collection loaded status for check health
updateCollectionLoadStatusInterval: 5 # 5m, max interval of updating collection loaded status for check health
cleanExcludeSegmentInterval: 60 # the time duration of clean pipeline exclude segment which used for filter invalid data, in seconds
ip: # TCP/IP address of queryCoord. If not specified, use the first unicastable address
port: 19531 # TCP port of queryCoord
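Note that the pre-revert and post-revert values describe the same five-minute window, only expressed in different units (seconds vs. minutes). Below is a minimal, self-contained illustration of that equivalence; the GetAsDuration-style conversion is an assumption based on how other intervals are read elsewhere in this commit (e.g. HealthCheckInterval.GetAsDuration(time.Second)):

```go
package main

import (
	"fmt"
	"time"
)

// asDuration mimics a GetAsDuration-style accessor: a raw integer from the
// config file multiplied by the unit the reading code expects.
func asDuration(raw int64, unit time.Duration) time.Duration {
	return time.Duration(raw) * unit
}

func main() {
	fmt.Println(asDuration(300, time.Second)) // 5m0s — pre-revert "300 # 300s"
	fmt.Println(asDuration(5, time.Minute))   // 5m0s — post-revert "5 # 5m"
}
```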
16 changes: 0 additions & 16 deletions internal/datacoord/mock_test.go
@@ -304,22 +304,6 @@ func (c *mockDataNodeClient) Stop() error {
return nil
}

func (c *mockDataNodeClient) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) {
if c.state == commonpb.StateCode_Healthy {
return &milvuspb.CheckHealthResponse{
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
IsHealthy: true,
Reasons: []string{},
}, nil
} else {
return &milvuspb.CheckHealthResponse{
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_NotReadyServe},
IsHealthy: false,
Reasons: []string{"fails"},
}, nil
}
}

type mockRootCoordClient struct {
state commonpb.StateCode
cnt atomic.Int64
10 changes: 0 additions & 10 deletions internal/datacoord/server.go
@@ -52,7 +52,6 @@ import (
streamingcoord "github.com/milvus-io/milvus/internal/streamingcoord/server"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/internal/util/healthcheck"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/streamingutil"
"github.com/milvus-io/milvus/pkg/kv"
@@ -168,8 +167,6 @@ type Server struct {
streamingCoord *streamingcoord.Server

metricsRequest *metricsinfo.MetricsRequest

healthChecker *healthcheck.Checker
}

type CollectionNameInfo struct {
@@ -432,8 +429,6 @@ func (s *Server) initDataCoord() error {

s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)

interval := Params.CommonCfg.HealthCheckInterval.GetAsDuration(time.Second)
s.healthChecker = healthcheck.NewChecker(interval, s.healthCheckFn)
log.Info("init datacoord done", zap.Int64("nodeID", paramtable.GetNodeID()), zap.String("Address", s.address))
return nil
}
@@ -778,8 +773,6 @@ func (s *Server) startServerLoop() {
if !(streamingutil.IsStreamingServiceEnabled() || paramtable.Get().DataNodeCfg.SkipBFStatsLoad.GetAsBool()) {
s.syncSegmentsScheduler.Start()
}

s.healthChecker.Start()
}

func (s *Server) startTaskScheduler() {
@@ -1106,9 +1099,6 @@ func (s *Server) Stop() error {
return nil
}
log.Info("datacoord server shutdown")
if s.healthChecker != nil {
s.healthChecker.Close()
}
s.garbageCollector.close()
log.Info("datacoord garbage collector stopped")

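Read together, the server.go hunks strip out the background health-checker wiring that the optimization had added to DataCoord. A consolidated sketch of the deleted lifecycle, assembled from the removed lines above (unrelated code elided, so this is illustrative rather than compilable on its own):

```go
// Removed by this revert: DataCoord no longer keeps a periodic checker that
// refreshes a cached health result in the background.
type Server struct {
	// ...
	healthChecker *healthcheck.Checker
}

func (s *Server) initDataCoord() error {
	// ...
	interval := Params.CommonCfg.HealthCheckInterval.GetAsDuration(time.Second)
	s.healthChecker = healthcheck.NewChecker(interval, s.healthCheckFn)
	return nil
}

func (s *Server) startServerLoop() {
	// ...
	s.healthChecker.Start() // kick off the periodic background checks
}

func (s *Server) Stop() error {
	// ...
	if s.healthChecker != nil {
		s.healthChecker.Close()
	}
	return nil
}
```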
53 changes: 24 additions & 29 deletions internal/datacoord/server_test.go
@@ -54,7 +54,6 @@ import (
"github.com/milvus-io/milvus/internal/proto/workerpb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/internal/util/healthcheck"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
@@ -2536,12 +2535,12 @@ func Test_CheckHealth(t *testing.T) {
return sm
}

getChannelManager := func(findWatcherOk bool) ChannelManager {
getChannelManager := func(t *testing.T, findWatcherOk bool) ChannelManager {
channelManager := NewMockChannelManager(t)
if findWatcherOk {
channelManager.EXPECT().FindWatcher(mock.Anything).Return(0, nil).Maybe()
channelManager.EXPECT().FindWatcher(mock.Anything).Return(0, nil)
} else {
channelManager.EXPECT().FindWatcher(mock.Anything).Return(0, errors.New("error")).Maybe()
channelManager.EXPECT().FindWatcher(mock.Anything).Return(0, errors.New("error"))
}
return channelManager
}
@@ -2554,21 +2553,6 @@ func Test_CheckHealth(t *testing.T) {
2: nil,
}

newServer := func(isHealthy bool, findWatcherOk bool, meta *meta) *Server {
svr := &Server{
ctx: context.TODO(),
sessionManager: getSessionManager(isHealthy),
channelManager: getChannelManager(findWatcherOk),
meta: meta,
session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}},
}
svr.stateCode.Store(commonpb.StateCode_Healthy)
svr.healthChecker = healthcheck.NewChecker(20*time.Millisecond, svr.healthCheckFn)
svr.healthChecker.Start()
time.Sleep(30 * time.Millisecond) // wait for next cycle for health checker
return svr
}

t.Run("not healthy", func(t *testing.T) {
ctx := context.Background()
s := &Server{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}}
@@ -2580,8 +2564,9 @@
})

t.Run("data node health check is fail", func(t *testing.T) {
svr := newServer(false, true, &meta{channelCPs: newChannelCps()})
defer svr.healthChecker.Close()
svr := &Server{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}}
svr.stateCode.Store(commonpb.StateCode_Healthy)
svr.sessionManager = getSessionManager(false)
ctx := context.Background()
resp, err := svr.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
assert.NoError(t, err)
@@ -2590,8 +2575,11 @@
})

t.Run("check channel watched fail", func(t *testing.T) {
svr := newServer(true, false, &meta{collections: collections, channelCPs: newChannelCps()})
defer svr.healthChecker.Close()
svr := &Server{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}}
svr.stateCode.Store(commonpb.StateCode_Healthy)
svr.sessionManager = getSessionManager(true)
svr.channelManager = getChannelManager(t, false)
svr.meta = &meta{collections: collections}
ctx := context.Background()
resp, err := svr.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
assert.NoError(t, err)
@@ -2600,7 +2588,11 @@
})

t.Run("check checkpoint fail", func(t *testing.T) {
svr := newServer(true, true, &meta{
svr := &Server{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}}
svr.stateCode.Store(commonpb.StateCode_Healthy)
svr.sessionManager = getSessionManager(true)
svr.channelManager = getChannelManager(t, true)
svr.meta = &meta{
collections: collections,
channelCPs: &channelCPs{
checkpoints: map[string]*msgpb.MsgPosition{
@@ -2610,8 +2602,8 @@
},
},
},
})
defer svr.healthChecker.Close()
}

ctx := context.Background()
resp, err := svr.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
assert.NoError(t, err)
Expand All @@ -2620,7 +2612,11 @@ func Test_CheckHealth(t *testing.T) {
})

t.Run("ok", func(t *testing.T) {
svr := newServer(true, true, &meta{
svr := &Server{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}}
svr.stateCode.Store(commonpb.StateCode_Healthy)
svr.sessionManager = getSessionManager(true)
svr.channelManager = getChannelManager(t, true)
svr.meta = &meta{
collections: collections,
channelCPs: &channelCPs{
checkpoints: map[string]*msgpb.MsgPosition{
@@ -2638,8 +2634,7 @@
},
},
},
})
defer svr.healthChecker.Close()
}
ctx := context.Background()
resp, err := svr.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
assert.NoError(t, err)
26 changes: 11 additions & 15 deletions internal/datacoord/services.go
@@ -35,7 +35,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/healthcheck"
"github.com/milvus-io/milvus/internal/util/componentutil"
"github.com/milvus-io/milvus/internal/util/importutilv2"
"github.com/milvus-io/milvus/internal/util/segmentutil"
"github.com/milvus-io/milvus/internal/util/streamingutil"
@@ -1588,24 +1588,20 @@ func (s *Server) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthReque
}, nil
}

latestCheckResult := s.healthChecker.GetLatestCheckResult()
return healthcheck.GetCheckHealthResponseFromResult(latestCheckResult), nil
}

func (s *Server) healthCheckFn() *healthcheck.Result {
timeout := Params.CommonCfg.HealthCheckRPCTimeout.GetAsDuration(time.Second)
ctx, cancel := context.WithTimeout(s.ctx, timeout)
defer cancel()
err := s.sessionManager.CheckHealth(ctx)
if err != nil {
return componentutil.CheckHealthRespWithErr(err), nil
}

checkResults := s.sessionManager.CheckDNHealth(ctx)
for collectionID, failReason := range CheckAllChannelsWatched(s.meta, s.channelManager) {
checkResults.AppendUnhealthyCollectionMsgs(healthcheck.NewUnhealthyCollectionMsg(collectionID, failReason, healthcheck.ChannelsWatched))
if err = CheckAllChannelsWatched(s.meta, s.channelManager); err != nil {
return componentutil.CheckHealthRespWithErr(err), nil
}

for collectionID, failReason := range CheckCheckPointsHealth(s.meta) {
checkResults.AppendUnhealthyCollectionMsgs(healthcheck.NewUnhealthyCollectionMsg(collectionID, failReason, healthcheck.CheckpointLagExceed))
if err = CheckCheckPointsHealth(s.meta); err != nil {
return componentutil.CheckHealthRespWithErr(err), nil
}
return checkResults

return componentutil.CheckHealthRespWithErr(nil), nil
}

func (s *Server) GcConfirm(ctx context.Context, request *datapb.GcConfirmRequest) (*datapb.GcConfirmResponse, error) {
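The CheckHealth hunk above interleaves the removed and restored lines, which makes it hard to read as a whole. Assembled from just the restored lines, the handler now runs the three checks inline on every request instead of returning a cached result from the background checker; the healthy-state guard at the top of the function sits outside this hunk and is assumed unchanged:

```go
func (s *Server) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) {
	// (healthy-state guard elided; it is not part of this hunk)

	// 1. Every registered DataNode session must report itself healthy.
	if err := s.sessionManager.CheckHealth(ctx); err != nil {
		return componentutil.CheckHealthRespWithErr(err), nil
	}
	// 2. Every collection's channels must be watched by some node.
	if err := CheckAllChannelsWatched(s.meta, s.channelManager); err != nil {
		return componentutil.CheckHealthRespWithErr(err), nil
	}
	// 3. Channel checkpoints must not lag beyond the configured threshold.
	if err := CheckCheckPointsHealth(s.meta); err != nil {
		return componentutil.CheckHealthRespWithErr(err), nil
	}
	return componentutil.CheckHealthRespWithErr(nil), nil
}
```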
46 changes: 14 additions & 32 deletions internal/datacoord/session/datanode_manager.go
@@ -19,7 +19,6 @@ package session
import (
"context"
"fmt"
"sync"
"time"

"github.com/cockroachdb/errors"
@@ -32,7 +31,6 @@ import (
"github.com/milvus-io/milvus/internal/metastore/kv/binlog"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/healthcheck"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
@@ -71,7 +69,7 @@ type DataNodeManager interface {
QueryPreImport(nodeID int64, in *datapb.QueryPreImportRequest) (*datapb.QueryPreImportResponse, error)
QueryImport(nodeID int64, in *datapb.QueryImportRequest) (*datapb.QueryImportResponse, error)
DropImport(nodeID int64, in *datapb.DropImportRequest) error
CheckDNHealth(ctx context.Context) *healthcheck.Result
CheckHealth(ctx context.Context) error
QuerySlot(nodeID int64) (*datapb.QuerySlotResponse, error)
DropCompactionPlan(nodeID int64, req *datapb.DropCompactionPlanRequest) error
Close()
@@ -509,44 +507,28 @@ func (c *DataNodeManagerImpl) DropImport(nodeID int64, in *datapb.DropImportRequ
return merr.CheckRPCCall(status, err)
}

func (c *DataNodeManagerImpl) CheckDNHealth(ctx context.Context) *healthcheck.Result {
result := healthcheck.NewResult()
wg := sync.WaitGroup{}
wlock := sync.Mutex{}
ids := c.GetSessionIDs()
func (c *DataNodeManagerImpl) CheckHealth(ctx context.Context) error {
group, ctx := errgroup.WithContext(ctx)

ids := c.GetSessionIDs()
for _, nodeID := range ids {
nodeID := nodeID
wg.Add(1)
go func() {
defer wg.Done()

datanodeClient, err := c.getClient(ctx, nodeID)
group.Go(func() error {
cli, err := c.getClient(ctx, nodeID)
if err != nil {
err = fmt.Errorf("failed to get node:%d: %v", nodeID, err)
return
}

checkHealthResp, err := datanodeClient.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
if err = merr.CheckRPCCall(checkHealthResp, err); err != nil && !errors.Is(err, merr.ErrServiceUnimplemented) {
err = fmt.Errorf("CheckHealth fails for datanode:%d, %w", nodeID, err)
wlock.Lock()
result.AppendUnhealthyClusterMsg(
healthcheck.NewUnhealthyClusterMsg(typeutil.DataNodeRole, nodeID, err.Error(), healthcheck.NodeHealthCheck))
wlock.Unlock()
return
return fmt.Errorf("failed to get DataNode %d: %v", nodeID, err)
}

if checkHealthResp != nil && len(checkHealthResp.Reasons) > 0 {
wlock.Lock()
result.AppendResult(healthcheck.GetHealthCheckResultFromResp(checkHealthResp))
wlock.Unlock()
sta, err := cli.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{})
if err != nil {
return err
}
}()
err = merr.AnalyzeState("DataNode", nodeID, sta)
return err
})
}

wg.Wait()
return result
return group.Wait()
}

func (c *DataNodeManagerImpl) QuerySlot(nodeID int64) (*datapb.QuerySlotResponse, error) {
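Likewise, the datanode_manager.go hunk swaps the fan-out that aggregated per-node CheckHealth RPC results for the older errgroup-based probe of component states. A consolidated sketch of the restored method, assembled from the added lines above:

```go
// CheckHealth probes each DataNode concurrently and fails fast on the first
// unreachable or unhealthy node, rather than collecting detailed reasons.
func (c *DataNodeManagerImpl) CheckHealth(ctx context.Context) error {
	group, ctx := errgroup.WithContext(ctx)
	ids := c.GetSessionIDs()
	for _, nodeID := range ids {
		nodeID := nodeID // capture the loop variable for the goroutine
		group.Go(func() error {
			cli, err := c.getClient(ctx, nodeID)
			if err != nil {
				return fmt.Errorf("failed to get DataNode %d: %v", nodeID, err)
			}
			sta, err := cli.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{})
			if err != nil {
				return err
			}
			return merr.AnalyzeState("DataNode", nodeID, sta)
		})
	}
	return group.Wait()
}
```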
32 changes: 14 additions & 18 deletions internal/datacoord/session/mock_datanode_manager.go

Some generated files are not rendered by default.
