Skip to content

Commit

Permalink
enhance: optimize CPU usage for CheckHealth requests
Browse files Browse the repository at this point in the history
Signed-off-by: jaime <[email protected]>
  • Loading branch information
jaime0815 committed Aug 21, 2024
1 parent 22ced01 commit f055ee7
Show file tree
Hide file tree
Showing 38 changed files with 866 additions and 139 deletions.
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ require (
github.com/gin-gonic/gin v1.9.1
github.com/go-playground/validator/v10 v10.14.0
github.com/gofrs/flock v0.8.1
github.com/golang/protobuf v1.5.4 // indirect
github.com/golang/protobuf v1.5.4
github.com/google/btree v1.1.2
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/klauspost/compress v1.17.7
Expand All @@ -32,6 +32,7 @@ require (
github.com/samber/lo v1.27.0
github.com/sbinet/npyio v0.6.0
github.com/soheilhy/cmux v0.1.5
github.com/sourcegraph/conc v0.3.0
github.com/spf13/cast v1.3.1
github.com/spf13/viper v1.8.1
github.com/stretchr/testify v1.9.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -798,6 +798,8 @@ github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9
github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
github.com/soheilhy/cmux v0.1.5 h1:jjzc5WVemNEDTLwv9tlmemhC73tI08BNOIGwBOo10Js=
github.com/soheilhy/cmux v0.1.5/go.mod h1:T7TcVDs9LWfQgPlPsdngu6I6QIoyIFZDDC6sNE1GqG0=
github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo=
github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
Expand Down
28 changes: 15 additions & 13 deletions internal/datacoord/mock_session_manager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions internal/datacoord/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,10 @@ func (c *mockDataNodeClient) Stop() error {
return nil
}

func (c *mockDataNodeClient) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) {
return &milvuspb.CheckHealthResponse{Status: merr.Success()}, nil
}

type mockRootCoordClient struct {
state commonpb.StateCode
cnt int64
Expand Down
5 changes: 5 additions & 0 deletions internal/datacoord/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import (
streamingcoord "github.com/milvus-io/milvus/internal/streamingcoord/server"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/internal/util/healthcheck"
"github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/internal/util/streamingutil"
"github.com/milvus-io/milvus/pkg/kv"
Expand Down Expand Up @@ -161,6 +162,8 @@ type Server struct {

// streamingcoord server is embedding in datacoord now.
streamingCoord *streamingcoord.Server

healthChecker *healthcheck.Checker
}

type CollectionNameInfo struct {
Expand Down Expand Up @@ -402,6 +405,8 @@ func (s *Server) initDataCoord() error {

s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)

interval := Params.CommonCfg.HealthCheckInterval.GetAsDuration(time.Second)
s.healthChecker = healthcheck.NewChecker(interval, s.healthCheckFn)
log.Info("init datacoord done", zap.Int64("nodeID", paramtable.GetNodeID()), zap.String("Address", s.address))
return nil
}
Expand Down
27 changes: 17 additions & 10 deletions internal/datacoord/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/componentutil"
"github.com/milvus-io/milvus/internal/util/healthcheck"
"github.com/milvus-io/milvus/internal/util/importutilv2"
"github.com/milvus-io/milvus/internal/util/segmentutil"
"github.com/milvus-io/milvus/pkg/common"
Expand Down Expand Up @@ -1576,20 +1577,26 @@ func (s *Server) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthReque
}, nil
}

err := s.sessionManager.CheckHealth(ctx)
if err != nil {
return componentutil.CheckHealthRespWithErr(err), nil
latestCheckResult := s.healthChecker.GetLatestCheckResult()
if !latestCheckResult.IsHealthy() {
return healthcheck.GetCheckHealthResponseFrom(&latestCheckResult), nil
}
return componentutil.CheckHealthRespWithErr(nil), nil
}

if err = CheckAllChannelsWatched(s.meta, s.channelManager); err != nil {
return componentutil.CheckHealthRespWithErr(err), nil
}
func (s *Server) healthCheckFn() healthcheck.Result {
timeout := Params.CommonCfg.HealthCheckRPCTimeout.GetAsDuration(time.Second)
ctx, cancel := context.WithTimeout(s.ctx, timeout)
defer cancel()

if err = CheckCheckPointsHealth(s.meta); err != nil {
return componentutil.CheckHealthRespWithErr(err), nil
checkResults := s.sessionManager.CheckDNHealth(ctx)
for collectionID, failReason := range CheckAllChannelsWatched(s.meta, s.channelManager) {
checkResults.AppendUnhealthyCollectionMsgs(healthcheck.UnhealthyCollectionMsg{CollectionID: collectionID, UnhealthyMsg: failReason})
}

return componentutil.CheckHealthRespWithErr(nil), nil
for collectionID, failReason := range CheckCheckPointsHealth(s.meta) {
checkResults.AppendUnhealthyCollectionMsgs(healthcheck.UnhealthyCollectionMsg{CollectionID: collectionID, UnhealthyMsg: failReason})
}
return checkResults
}

func (s *Server) GcConfirm(ctx context.Context, request *datapb.GcConfirmRequest) (*datapb.GcConfirmResponse, error) {
Expand Down
59 changes: 46 additions & 13 deletions internal/datacoord/session_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package datacoord
import (
"context"
"fmt"
"strings"
"sync"
"time"

"github.com/cockroachdb/errors"
Expand All @@ -31,6 +33,7 @@ import (
"github.com/milvus-io/milvus/internal/metastore/kv/binlog"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/types"
"github.com/milvus-io/milvus/internal/util/healthcheck"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
Expand Down Expand Up @@ -69,7 +72,7 @@ type SessionManager interface {
QueryPreImport(nodeID int64, in *datapb.QueryPreImportRequest) (*datapb.QueryPreImportResponse, error)
QueryImport(nodeID int64, in *datapb.QueryImportRequest) (*datapb.QueryImportResponse, error)
DropImport(nodeID int64, in *datapb.DropImportRequest) error
CheckHealth(ctx context.Context) error
CheckDNHealth(ctx context.Context) healthcheck.Result
QuerySlot(nodeID int64) (*datapb.QuerySlotResponse, error)
DropCompactionPlan(nodeID int64, req *datapb.DropCompactionPlanRequest) error
Close()
Expand Down Expand Up @@ -507,28 +510,58 @@ func (c *SessionManagerImpl) DropImport(nodeID int64, in *datapb.DropImportReque
return VerifyResponse(status, err)
}

func (c *SessionManagerImpl) CheckHealth(ctx context.Context) error {
group, ctx := errgroup.WithContext(ctx)

func (c *SessionManagerImpl) CheckDNHealth(ctx context.Context) healthcheck.Result {
result := healthcheck.NewResult()
wg := sync.WaitGroup{}
wlock := sync.Mutex{}
ids := c.GetSessionIDs()

for _, nodeID := range ids {
nodeID := nodeID
group.Go(func() error {
cli, err := c.getClient(ctx, nodeID)
wg.Add(1)
go func() {
var datanodeClient types.DataNodeClient
var checkHealthResp *milvuspb.CheckHealthResponse
var err error

defer func() {
if err != nil {
wlock.Lock()
result.UnhealthyNodeMsgs = append(result.UnhealthyNodeMsgs, healthcheck.UnhealthyNodeMsg{
Role: "DataNode",
NodeID: nodeID,
UnhealthyMsg: err.Error(),
})
wlock.Unlock()
}
wg.Done()
}()

datanodeClient, err = c.getClient(ctx, nodeID)
if err != nil {
return fmt.Errorf("failed to get DataNode %d: %v", nodeID, err)
err = fmt.Errorf("failed to get node:%d: %v", nodeID, err)
return
}

sta, err := cli.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{})
err = merr.AnalyzeComponentStateResp(datanodeClient.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{}))
if err != nil {
return err
return
}
err = merr.AnalyzeState("DataNode", nodeID, sta)
return err
})

checkHealthResp, err = datanodeClient.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
if err = merr.CheckRPCCall(checkHealthResp, err); err != nil {
err = fmt.Errorf("CheckHealth fails for node:%d, %w", nodeID, err)
return
}

if !checkHealthResp.IsHealthy && len(checkHealthResp.Reasons) > 0 {
err = errors.New(strings.Join(checkHealthResp.Reasons, ","))
}
}()
}

return group.Wait()
wg.Wait()
return result
}

func (c *SessionManagerImpl) QuerySlot(nodeID int64) (*datapb.QuerySlotResponse, error) {
Expand Down
18 changes: 9 additions & 9 deletions internal/datacoord/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,8 @@ func getBinLogIDs(segment *SegmentInfo, fieldID int64) []int64 {
return binlogIDs
}

func CheckCheckPointsHealth(meta *meta) error {
func CheckCheckPointsHealth(meta *meta) map[int64]string {
checkResult := make(map[int64]string)
for channel, cp := range meta.GetChannelCheckpoints() {
collectionID := funcutil.GetCollectionIDFromVChannel(channel)
if collectionID == -1 {
Expand All @@ -292,29 +293,28 @@ func CheckCheckPointsHealth(meta *meta) error {
ts, _ := tsoutil.ParseTS(cp.Timestamp)
lag := time.Since(ts)
if lag > paramtable.Get().DataCoordCfg.ChannelCheckpointMaxLag.GetAsDuration(time.Second) {
return merr.WrapErrChannelCPExceededMaxLag(channel, fmt.Sprintf("checkpoint lag: %f(min)", lag.Minutes()))
checkResult[collectionID] = fmt.Sprintf("exceeds max lag:%s on channel:%s checkpoint", lag, channel)
}
}
return nil
return checkResult
}

func CheckAllChannelsWatched(meta *meta, channelManager ChannelManager) error {
func CheckAllChannelsWatched(meta *meta, channelManager ChannelManager) map[int64]string {
collIDs := meta.ListCollections()
checkResult := make(map[int64]string)
for _, collID := range collIDs {
collInfo := meta.GetCollection(collID)
if collInfo == nil {
log.Warn("collection info is nil, skip it", zap.Int64("collectionID", collID))
log.RatedWarn(60, "collection info is nil, skip it", zap.Int64("collectionID", collID))
continue
}

for _, channelName := range collInfo.VChannelNames {
_, err := channelManager.FindWatcher(channelName)
if err != nil {
log.Warn("find watcher for channel failed", zap.Int64("collectionID", collID),
zap.String("channelName", channelName), zap.Error(err))
return err
checkResult[collID] = fmt.Sprintf("channel:%s is not watched", channelName)
}
}
}
return nil
return checkResult
}
4 changes: 1 addition & 3 deletions internal/datanode/channel/channel_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,7 @@ func (s *ChannelManagerSuite) TearDownTest() {
}

func (s *ChannelManagerSuite) TestReleaseStuck() {
var (
channel = "by-dev-rootcoord-dml-2"
)
channel := "by-dev-rootcoord-dml-2"
s.manager.releaseFunc = func(channel string) {
time.Sleep(1 * time.Second)
}
Expand Down
21 changes: 21 additions & 0 deletions internal/datanode/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package datanode
import (
"context"
"fmt"
"time"

"github.com/samber/lo"
"go.uber.org/zap"
Expand All @@ -32,10 +33,12 @@ import (
"github.com/milvus-io/milvus/internal/datanode/importv2"
"github.com/milvus-io/milvus/internal/flushcommon/io"
"github.com/milvus-io/milvus/internal/flushcommon/metacache"
"github.com/milvus-io/milvus/internal/flushcommon/util"
"github.com/milvus-io/milvus/internal/metastore/kv/binlog"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/componentutil"
"github.com/milvus-io/milvus/internal/util/importutilv2"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
Expand All @@ -44,6 +47,8 @@ import (
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/ratelimitutil"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
)

Expand Down Expand Up @@ -589,3 +594,19 @@ func (node *DataNode) DropCompactionPlan(ctx context.Context, req *datapb.DropCo
log.Ctx(ctx).Info("DropCompactionPlans success", zap.Int64("planID", req.GetPlanID()))
return merr.Success(), nil
}

func (node *DataNode) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) {
if err := merr.CheckHealthy(node.GetStateCode()); err != nil {
return &milvuspb.CheckHealthResponse{
Status: merr.Status(err),
Reasons: []string{err.Error()},
}, nil
}

maxDelay := paramtable.Get().QuotaConfig.MaxTimeTickDelay.GetAsDuration(time.Second)
minFGChannel, minFGTt := util.GetRateCollector().GetMinFlowGraphTt()
if err := ratelimitutil.CheckTimeTickDelay(minFGChannel, minFGTt, maxDelay); err != nil {
return componentutil.CheckHealthRespWithErr(err), nil
}
return componentutil.CheckHealthRespWithErr(nil), nil
}
6 changes: 6 additions & 0 deletions internal/distributed/datanode/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,3 +267,9 @@ func (c *Client) DropCompactionPlan(ctx context.Context, req *datapb.DropCompact
return client.DropCompactionPlan(ctx, req)
})
}

func (c *Client) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) {
return wrapGrpcCall(ctx, c, func(client datapb.DataNodeClient) (*milvuspb.CheckHealthResponse, error) {
return client.CheckHealth(ctx, req)
})
}
4 changes: 4 additions & 0 deletions internal/distributed/datanode/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,3 +410,7 @@ func (s *Server) QuerySlot(ctx context.Context, req *datapb.QuerySlotRequest) (*
func (s *Server) DropCompactionPlan(ctx context.Context, req *datapb.DropCompactionPlanRequest) (*commonpb.Status, error) {
return s.datanode.DropCompactionPlan(ctx, req)
}

func (s *Server) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) {
return s.datanode.CheckHealth(ctx, req)
}
Loading

0 comments on commit f055ee7

Please sign in to comment.