enhance: decrease cpu overhead during filter segments on datacoord (#33130)

issue: #33129

Signed-off-by: jaime <[email protected]>

(cherry picked from commit 3d29907)
Signed-off-by: jaime <[email protected]>
jaime0815 committed Dec 10, 2024
1 parent 7a5aea1 commit 064e956
Showing 51 changed files with 1,285 additions and 463 deletions.
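
In brief, the change moves DataCoord's health evaluation off the CheckHealth RPC path: a background checker runs the expensive per-node and per-collection checks on a fixed interval and caches the latest result, and the RPC handler only returns that cached result. The sketch below illustrates the pattern with hypothetical names; the surface it shows (NewChecker, Start, GetLatestCheckResult, Close) matches what the server.go and services.go hunks below call, but the real implementation lives in internal/util/healthcheck among the files not shown here and may differ.

package healthchecksketch

import (
	"sync"
	"time"
)

// Result is a stand-in for healthcheck.Result: a bag of failure reasons.
type Result struct {
	Reasons []string
}

func (r *Result) IsHealthy() bool { return len(r.Reasons) == 0 }

// Checker periodically runs checkFn and caches its latest Result.
type Checker struct {
	interval time.Duration
	checkFn  func() *Result

	mu     sync.RWMutex
	latest *Result

	stopOnce sync.Once
	stopCh   chan struct{}
}

func NewChecker(interval time.Duration, checkFn func() *Result) *Checker {
	return &Checker{
		interval: interval,
		checkFn:  checkFn,
		latest:   &Result{},
		stopCh:   make(chan struct{}),
	}
}

func (c *Checker) Start() {
	go func() {
		ticker := time.NewTicker(c.interval)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				res := c.checkFn() // the expensive fan-out runs here, off the RPC path
				c.mu.Lock()
				c.latest = res
				c.mu.Unlock()
			case <-c.stopCh:
				return
			}
		}
	}()
}

// GetLatestCheckResult is what the CheckHealth handler reads; it never blocks
// on downstream RPCs.
func (c *Checker) GetLatestCheckResult() *Result {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.latest
}

func (c *Checker) Close() {
	c.stopOnce.Do(func() { close(c.stopCh) })
}
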
2 changes: 1 addition & 1 deletion configs/milvus.yaml
@@ -379,7 +379,7 @@ queryCoord:
channelExclusiveNodeFactor: 4 # the least node number for enable channel's exclusive mode
collectionObserverInterval: 200 # the interval of collection observer
checkExecutedFlagInterval: 100 # the interval of check executed flag to force to pull dist
updateCollectionLoadStatusInterval: 5 # 5m, max interval of updating collection loaded status for check health
updateCollectionLoadStatusInterval: 5 # 300s, max interval of updating collection loaded status for check health
cleanExcludeSegmentInterval: 60 # the time duration of clean pipeline exclude segment which used for filter invalid data, in seconds
ip: # TCP/IP address of queryCoord. If not specified, use the first unicastable address
port: 19531 # TCP port of queryCoord
18 changes: 17 additions & 1 deletion internal/datacoord/mock_test.go
@@ -18,7 +18,6 @@ package datacoord

import (
"context"
"github.com/milvus-io/milvus/pkg/kv"
"testing"
"time"

@@ -38,6 +37,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/proxypb"
"github.com/milvus-io/milvus/internal/proto/rootcoordpb"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/kv"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
@@ -304,6 +304,22 @@ func (c *mockDataNodeClient) Stop() error {
return nil
}

func (c *mockDataNodeClient) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) {
if c.state == commonpb.StateCode_Healthy {
return &milvuspb.CheckHealthResponse{
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success},
IsHealthy: true,
Reasons: []string{},
}, nil
} else {
return &milvuspb.CheckHealthResponse{
Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_NotReadyServe},
IsHealthy: false,
Reasons: []string{"fails"},
}, nil
}
}

type mockRootCoordClient struct {
state commonpb.StateCode
cnt atomic.Int64
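The new mockDataNodeClient.CheckHealth above simply mirrors the mock's state field, so DataCoord-side tests can drive healthy and unhealthy DataNodes without a real gRPC connection. A minimal sketch of exercising it directly follows; the test name is hypothetical, the struct literal omits the mock's other fields, and it assumes the testify assertions and proto imports already used by this package:

func TestMockDataNodeClientCheckHealth(t *testing.T) {
	ctx := context.Background()

	healthy := &mockDataNodeClient{state: commonpb.StateCode_Healthy}
	resp, err := healthy.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
	assert.NoError(t, err)
	assert.True(t, resp.GetIsHealthy())
	assert.Empty(t, resp.GetReasons())

	abnormal := &mockDataNodeClient{state: commonpb.StateCode_Abnormal}
	resp, err = abnormal.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
	assert.NoError(t, err)
	assert.False(t, resp.GetIsHealthy())
	assert.NotEmpty(t, resp.GetReasons())
}
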
10 changes: 10 additions & 0 deletions internal/datacoord/server.go
@@ -52,6 +52,7 @@ import (
streamingcoord "github.com/milvus-io/milvus/internal/streamingcoord/server"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/internal/util/healthcheck"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/streamingutil"
"github.com/milvus-io/milvus/pkg/kv"
@@ -167,6 +168,8 @@ type Server struct {
streamingCoord *streamingcoord.Server

metricsRequest *metricsinfo.MetricsRequest

healthChecker *healthcheck.Checker
}

type CollectionNameInfo struct {
@@ -426,6 +429,8 @@ func (s *Server) initDataCoord() error {

s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)

interval := Params.CommonCfg.HealthCheckInterval.GetAsDuration(time.Second)
s.healthChecker = healthcheck.NewChecker(interval, s.healthCheckFn)
log.Info("init datacoord done", zap.Int64("nodeID", paramtable.GetNodeID()), zap.String("Address", s.address))
return nil
}
@@ -768,6 +773,8 @@ func (s *Server) startServerLoop() {
if !(streamingutil.IsStreamingServiceEnabled() || paramtable.Get().DataNodeCfg.SkipBFStatsLoad.GetAsBool()) {
s.syncSegmentsScheduler.Start()
}

s.healthChecker.Start()
}

func (s *Server) startTaskScheduler() {
@@ -1089,6 +1096,9 @@ func (s *Server) Stop() error {
if !s.stateCode.CompareAndSwap(commonpb.StateCode_Healthy, commonpb.StateCode_Abnormal) {
return nil
}
if s.healthChecker != nil {
s.healthChecker.Close()
}
logutil.Logger(s.ctx).Info("datacoord server shutdown")
s.garbageCollector.close()
logutil.Logger(s.ctx).Info("datacoord garbage collector stopped")
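The server.go hunks above wire the checker into the coordinator lifecycle: it is constructed in initDataCoord with the configured interval, started in startServerLoop alongside the other background loops, and closed at the top of Stop. A compact sketch of that ordering, with hypothetical stand-in types for Server and healthcheck.Checker:

package lifecyclesketch

import "time"

// checker captures the surface the server uses: Start and Close.
type checker interface {
	Start()
	Close()
}

type coordinator struct {
	healthChecker checker
}

func (c *coordinator) init(newChecker func(time.Duration) checker, interval time.Duration) {
	// built during init so the interval comes from configuration
	c.healthChecker = newChecker(interval)
}

func (c *coordinator) startServerLoop() {
	// started together with the other background loops
	c.healthChecker.Start()
}

func (c *coordinator) stop() {
	// closed first during shutdown; the nil check tolerates servers that never ran init
	if c.healthChecker != nil {
		c.healthChecker.Close()
	}
}
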
53 changes: 29 additions & 24 deletions internal/datacoord/server_test.go
@@ -53,6 +53,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/workerpb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/internal/util/healthcheck"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
@@ -2509,12 +2510,12 @@ func Test_CheckHealth(t *testing.T) {
return sm
}

getChannelManager := func(t *testing.T, findWatcherOk bool) ChannelManager {
getChannelManager := func(findWatcherOk bool) ChannelManager {
channelManager := NewMockChannelManager(t)
if findWatcherOk {
channelManager.EXPECT().FindWatcher(mock.Anything).Return(0, nil)
channelManager.EXPECT().FindWatcher(mock.Anything).Return(0, nil).Maybe()
} else {
channelManager.EXPECT().FindWatcher(mock.Anything).Return(0, errors.New("error"))
channelManager.EXPECT().FindWatcher(mock.Anything).Return(0, errors.New("error")).Maybe()
}
return channelManager
}
@@ -2527,6 +2528,21 @@ func Test_CheckHealth(t *testing.T) {
2: nil,
}

newServer := func(isHealthy bool, findWatcherOk bool, meta *meta) *Server {
svr := &Server{
ctx: context.TODO(),
sessionManager: getSessionManager(isHealthy),
channelManager: getChannelManager(findWatcherOk),
meta: meta,
session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}},
}
svr.stateCode.Store(commonpb.StateCode_Healthy)
svr.healthChecker = healthcheck.NewChecker(20*time.Millisecond, svr.healthCheckFn)
svr.healthChecker.Start()
time.Sleep(30 * time.Millisecond) // wait for next cycle for health checker
return svr
}

t.Run("not healthy", func(t *testing.T) {
ctx := context.Background()
s := &Server{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}}
@@ -2538,9 +2554,8 @@ func Test_CheckHealth(t *testing.T) {
})

t.Run("data node health check is fail", func(t *testing.T) {
svr := &Server{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}}
svr.stateCode.Store(commonpb.StateCode_Healthy)
svr.sessionManager = getSessionManager(false)
svr := newServer(false, true, &meta{channelCPs: newChannelCps()})
defer svr.healthChecker.Close()
ctx := context.Background()
resp, err := svr.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
assert.NoError(t, err)
@@ -2549,11 +2564,8 @@ func Test_CheckHealth(t *testing.T) {
})

t.Run("check channel watched fail", func(t *testing.T) {
svr := &Server{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}}
svr.stateCode.Store(commonpb.StateCode_Healthy)
svr.sessionManager = getSessionManager(true)
svr.channelManager = getChannelManager(t, false)
svr.meta = &meta{collections: collections}
svr := newServer(true, false, &meta{collections: collections, channelCPs: newChannelCps()})
defer svr.healthChecker.Close()
ctx := context.Background()
resp, err := svr.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
assert.NoError(t, err)
@@ -2562,11 +2574,7 @@ func Test_CheckHealth(t *testing.T) {
})

t.Run("check checkpoint fail", func(t *testing.T) {
svr := &Server{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}}
svr.stateCode.Store(commonpb.StateCode_Healthy)
svr.sessionManager = getSessionManager(true)
svr.channelManager = getChannelManager(t, true)
svr.meta = &meta{
svr := newServer(true, true, &meta{
collections: collections,
channelCPs: &channelCPs{
checkpoints: map[string]*msgpb.MsgPosition{
@@ -2576,8 +2584,8 @@ func Test_CheckHealth(t *testing.T) {
},
},
},
}

})
defer svr.healthChecker.Close()
ctx := context.Background()
resp, err := svr.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
assert.NoError(t, err)
@@ -2586,11 +2594,7 @@ func Test_CheckHealth(t *testing.T) {
})

t.Run("ok", func(t *testing.T) {
svr := &Server{session: &sessionutil.Session{SessionRaw: sessionutil.SessionRaw{ServerID: 1}}}
svr.stateCode.Store(commonpb.StateCode_Healthy)
svr.sessionManager = getSessionManager(true)
svr.channelManager = getChannelManager(t, true)
svr.meta = &meta{
svr := newServer(true, true, &meta{
collections: collections,
channelCPs: &channelCPs{
checkpoints: map[string]*msgpb.MsgPosition{
@@ -2608,7 +2612,8 @@ func Test_CheckHealth(t *testing.T) {
},
},
},
}
})
defer svr.healthChecker.Close()
ctx := context.Background()
resp, err := svr.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
assert.NoError(t, err)
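The newServer helper above starts a checker on a 20 ms interval and sleeps 30 ms so at least one result is cached before the assertions run. A sleep-free variant is sketched below; it assumes testify's assert.Eventually is available to this package and treats the newServer closure as if it were a reusable helper, so the name and wiring are illustrative rather than part of this diff:

func TestCheckHealthEventually(t *testing.T) {
	// unhealthy DataNode session, healthy channel watcher
	svr := newServer(false, true, &meta{channelCPs: newChannelCps()})
	defer svr.healthChecker.Close()

	assert.Eventually(t, func() bool {
		resp, err := svr.CheckHealth(context.Background(), &milvuspb.CheckHealthRequest{})
		return err == nil && !resp.GetIsHealthy()
	}, time.Second, 10*time.Millisecond, "checker should eventually cache an unhealthy result")
}
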
26 changes: 15 additions & 11 deletions internal/datacoord/services.go
@@ -35,7 +35,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/componentutil"
"github.com/milvus-io/milvus/internal/util/healthcheck"
"github.com/milvus-io/milvus/internal/util/importutilv2"
"github.com/milvus-io/milvus/internal/util/segmentutil"
"github.com/milvus-io/milvus/internal/util/streamingutil"
@@ -1581,20 +1581,24 @@ func (s *Server) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthReque
}, nil
}

err := s.sessionManager.CheckHealth(ctx)
if err != nil {
return componentutil.CheckHealthRespWithErr(err), nil
}
latestCheckResult := s.healthChecker.GetLatestCheckResult()
return healthcheck.GetCheckHealthResponseFromResult(latestCheckResult), nil
}

if err = CheckAllChannelsWatched(s.meta, s.channelManager); err != nil {
return componentutil.CheckHealthRespWithErr(err), nil
}
func (s *Server) healthCheckFn() *healthcheck.Result {
timeout := Params.CommonCfg.HealthCheckRPCTimeout.GetAsDuration(time.Second)
ctx, cancel := context.WithTimeout(s.ctx, timeout)
defer cancel()

if err = CheckCheckPointsHealth(s.meta); err != nil {
return componentutil.CheckHealthRespWithErr(err), nil
checkResults := s.sessionManager.CheckDNHealth(ctx)
for collectionID, failReason := range CheckAllChannelsWatched(s.meta, s.channelManager) {
checkResults.AppendUnhealthyCollectionMsgs(healthcheck.NewUnhealthyCollectionMsg(collectionID, failReason, healthcheck.ChannelsWatched))
}

return componentutil.CheckHealthRespWithErr(nil), nil
for collectionID, failReason := range CheckCheckPointsHealth(s.meta) {
checkResults.AppendUnhealthyCollectionMsgs(healthcheck.NewUnhealthyCollectionMsg(collectionID, failReason, healthcheck.CheckpointLagExceed))
}
return checkResults
}

func (s *Server) GcConfirm(ctx context.Context, request *datapb.GcConfirmRequest) (*datapb.GcConfirmResponse, error) {
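After this change the handler returns whatever the checker last cached, converted via healthcheck.GetCheckHealthResponseFromResult into the usual milvuspb.CheckHealthResponse, so callers keep the same contract. A caller-side sketch follows; the healthClient interface and helper name are defined here purely for illustration and simply mirror the generated gRPC client method:

package healthcallersketch

import (
	"context"

	"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
	"github.com/milvus-io/milvus/pkg/log"
	"go.uber.org/zap"
	"google.golang.org/grpc"
)

// healthClient mirrors the single call used here; in practice this is the
// DataCoord gRPC client.
type healthClient interface {
	CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error)
}

func logDataCoordHealth(ctx context.Context, client healthClient) error {
	resp, err := client.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
	if err != nil {
		return err
	}
	if !resp.GetIsHealthy() {
		// Reasons now carries the per-node and per-collection messages that
		// healthCheckFn aggregated during the last checker cycle.
		for _, reason := range resp.GetReasons() {
			log.Warn("datacoord reports unhealthy", zap.String("reason", reason))
		}
	}
	return nil
}
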
46 changes: 32 additions & 14 deletions internal/datacoord/session/datanode_manager.go
@@ -19,6 +19,7 @@ package session
import (
"context"
"fmt"
"sync"
"time"

"github.com/cockroachdb/errors"
@@ -31,6 +32,7 @@ import (
"github.com/milvus-io/milvus/internal/metastore/kv/binlog"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/healthcheck"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
@@ -69,7 +71,7 @@ type DataNodeManager interface {
QueryPreImport(nodeID int64, in *datapb.QueryPreImportRequest) (*datapb.QueryPreImportResponse, error)
QueryImport(nodeID int64, in *datapb.QueryImportRequest) (*datapb.QueryImportResponse, error)
DropImport(nodeID int64, in *datapb.DropImportRequest) error
CheckHealth(ctx context.Context) error
CheckDNHealth(ctx context.Context) *healthcheck.Result
QuerySlot(nodeID int64) (*datapb.QuerySlotResponse, error)
DropCompactionPlan(nodeID int64, req *datapb.DropCompactionPlanRequest) error
Close()
@@ -507,28 +509,44 @@ func (c *DataNodeManagerImpl) DropImport(nodeID int64, in *datapb.DropImportRequ
return merr.CheckRPCCall(status, err)
}

func (c *DataNodeManagerImpl) CheckHealth(ctx context.Context) error {
group, ctx := errgroup.WithContext(ctx)

func (c *DataNodeManagerImpl) CheckDNHealth(ctx context.Context) *healthcheck.Result {
result := healthcheck.NewResult()
wg := sync.WaitGroup{}
wlock := sync.Mutex{}
ids := c.GetSessionIDs()

for _, nodeID := range ids {
nodeID := nodeID
group.Go(func() error {
cli, err := c.getClient(ctx, nodeID)
wg.Add(1)
go func() {
defer wg.Done()

datanodeClient, err := c.getClient(ctx, nodeID)
if err != nil {
return fmt.Errorf("failed to get DataNode %d: %v", nodeID, err)
err = fmt.Errorf("failed to get node:%d: %v", nodeID, err)
return
}

sta, err := cli.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{})
if err != nil {
return err
checkHealthResp, err := datanodeClient.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
if err = merr.CheckRPCCall(checkHealthResp, err); err != nil && !errors.Is(err, merr.ErrServiceUnimplemented) {
err = fmt.Errorf("CheckHealth fails for datanode:%d, %w", nodeID, err)
wlock.Lock()
result.AppendUnhealthyClusterMsg(
healthcheck.NewUnhealthyClusterMsg(typeutil.DataNodeRole, nodeID, err.Error(), healthcheck.NodeHealthCheck))
wlock.Unlock()
return
}
err = merr.AnalyzeState("DataNode", nodeID, sta)
return err
})

if checkHealthResp != nil && len(checkHealthResp.Reasons) > 0 {
wlock.Lock()
result.AppendResult(healthcheck.GetHealthCheckResultFromResp(checkHealthResp))
wlock.Unlock()
}
}()
}

return group.Wait()
wg.Wait()
return result
}

func (c *DataNodeManagerImpl) QuerySlot(nodeID int64) (*datapb.QuerySlotResponse, error) {
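CheckDNHealth above fans out one CheckHealth RPC per DataNode with a WaitGroup and a mutex-guarded Result instead of the previous errgroup: errgroup.WithContext cancels the remaining calls and returns only the first error, while the health report wants every node's failure collected. A generic sketch of that collect-all-failures pattern (hypothetical helper, not taken from this diff):

package fanoutsketch

import (
	"context"
	"sync"
)

// collectFailures runs check for every node concurrently and gathers every
// failure, rather than stopping at the first one.
func collectFailures(ctx context.Context, nodeIDs []int64, check func(context.Context, int64) error) map[int64]error {
	var (
		wg       sync.WaitGroup
		mu       sync.Mutex
		failures = make(map[int64]error)
	)
	for _, nodeID := range nodeIDs {
		nodeID := nodeID // capture loop variable for the goroutine
		wg.Add(1)
		go func() {
			defer wg.Done()
			if err := check(ctx, nodeID); err != nil {
				mu.Lock()
				failures[nodeID] = err
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return failures
}
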
(diffs for the remaining changed files are not shown)
