Skip to content

Commit

Permalink
enhance: optimize CPU usage for CheckHealth requests
Browse files: browse the repository at this point in the history
Signed-off-by: jaime <[email protected]>
  • Loading branch information
jaime0815 committed Aug 20, 2024
1 parent 22ced01 commit 0c31c7d
Show file tree
Hide file tree
Showing 29 changed files with 538 additions and 60 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ require (
github.com/gin-gonic/gin v1.9.1
github.com/go-playground/validator/v10 v10.14.0
github.com/gofrs/flock v0.8.1
github.com/golang/protobuf v1.5.4 // indirect
github.com/golang/protobuf v1.5.4
github.com/google/btree v1.1.2
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/klauspost/compress v1.17.7
Expand Down
4 changes: 4 additions & 0 deletions internal/datacoord/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,10 @@ func (c *mockDataNodeClient) Stop() error {
return nil
}

// CheckHealth is the mock DataNode client's health probe. It ignores its
// arguments and returns a response whose Status is merr.Success() together
// with a nil error.
func (c *mockDataNodeClient) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) {
	resp := &milvuspb.CheckHealthResponse{
		Status: merr.Success(),
	}
	return resp, nil
}

type mockRootCoordClient struct {
state commonpb.StateCode
cnt int64
Expand Down
19 changes: 16 additions & 3 deletions internal/datacoord/session_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package datacoord
import (
"context"
"fmt"
"strings"
"time"

"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -520,11 +521,23 @@ func (c *SessionManagerImpl) CheckHealth(ctx context.Context) error {
}

sta, err := cli.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{})
if err != nil {
if err = merr.CheckRPCCall(sta, err); err != nil {
return fmt.Errorf("GetComponentStates fails for datanode:%d, %w", nodeID, err)
}

if err = merr.AnalyzeState("DataNode", nodeID, sta); err != nil {
return err
}
err = merr.AnalyzeState("DataNode", nodeID, sta)
return err

checkHealthResp, err := cli.CheckHealth(ctx, &milvuspb.CheckHealthRequest{})
if err = merr.CheckRPCCall(checkHealthResp, err); err != nil {
return fmt.Errorf("CheckHealth fails for datanode:%d, %w", nodeID, err)
}

if !checkHealthResp.IsHealthy && len(checkHealthResp.Reasons) > 0 {
return errors.New(strings.Join(checkHealthResp.Reasons, ","))
}
return nil
})
}

Expand Down
4 changes: 1 addition & 3 deletions internal/datanode/channel/channel_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,7 @@ func (s *ChannelManagerSuite) TearDownTest() {
}

func (s *ChannelManagerSuite) TestReleaseStuck() {
var (
channel = "by-dev-rootcoord-dml-2"
)
channel := "by-dev-rootcoord-dml-2"
s.manager.releaseFunc = func(channel string) {
time.Sleep(1 * time.Second)
}
Expand Down
21 changes: 21 additions & 0 deletions internal/datanode/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package datanode
import (
"context"
"fmt"
"time"

"github.com/samber/lo"
"go.uber.org/zap"
Expand All @@ -32,10 +33,12 @@ import (
"github.com/milvus-io/milvus/internal/datanode/importv2"
"github.com/milvus-io/milvus/internal/flushcommon/io"
"github.com/milvus-io/milvus/internal/flushcommon/metacache"
"github.com/milvus-io/milvus/internal/flushcommon/util"
"github.com/milvus-io/milvus/internal/metastore/kv/binlog"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/internalpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/internal/util/componentutil"
"github.com/milvus-io/milvus/internal/util/importutilv2"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
Expand All @@ -44,6 +47,8 @@ import (
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/ratelimitutil"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
)

Expand Down Expand Up @@ -589,3 +594,19 @@ func (node *DataNode) DropCompactionPlan(ctx context.Context, req *datapb.DropCo
log.Ctx(ctx).Info("DropCompactionPlans success", zap.Int64("planID", req.GetPlanID()))
return merr.Success(), nil
}

// CheckHealth reports the health of this DataNode.
//
// When merr.CheckHealthy rejects the node's current state code, the response
// carries that error as its Status and as the only entry of Reasons.
// Otherwise the minimum flow graph time tick obtained from the rate collector
// is checked against the configured QuotaConfig.MaxTimeTickDelay (read in
// seconds), and the result is turned into a response by
// componentutil.CheckHealthRespWithErr.
//
// The returned error is always nil; problems are reported via the response.
func (node *DataNode) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) {
	stateErr := merr.CheckHealthy(node.GetStateCode())
	if stateErr != nil {
		unhealthy := &milvuspb.CheckHealthResponse{
			Status:  merr.Status(stateErr),
			Reasons: []string{stateErr.Error()},
		}
		return unhealthy, nil
	}

	delayLimit := paramtable.Get().QuotaConfig.MaxTimeTickDelay.GetAsDuration(time.Second)
	channelName, minTt := util.GetRateCollector().GetMinFlowGraphTt()
	delayErr := ratelimitutil.CheckTimeTickDelay(channelName, minTt, delayLimit)
	if delayErr == nil {
		// Pass a literal nil on the success path.
		return componentutil.CheckHealthRespWithErr(nil), nil
	}
	return componentutil.CheckHealthRespWithErr(delayErr), nil
}
6 changes: 6 additions & 0 deletions internal/distributed/datanode/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,3 +267,9 @@ func (c *Client) DropCompactionPlan(ctx context.Context, req *datapb.DropCompact
return client.DropCompactionPlan(ctx, req)
})
}

// CheckHealth issues a CheckHealth request to the DataNode via wrapGrpcCall
// and returns whatever that call yields.
//
// NOTE: opts is accepted but not forwarded to the underlying client call.
func (c *Client) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) {
	invoke := func(dn datapb.DataNodeClient) (*milvuspb.CheckHealthResponse, error) {
		return dn.CheckHealth(ctx, req)
	}
	return wrapGrpcCall(ctx, c, invoke)
}
4 changes: 4 additions & 0 deletions internal/distributed/datanode/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,3 +410,7 @@ func (s *Server) QuerySlot(ctx context.Context, req *datapb.QuerySlotRequest) (*
func (s *Server) DropCompactionPlan(ctx context.Context, req *datapb.DropCompactionPlanRequest) (*commonpb.Status, error) {
return s.datanode.DropCompactionPlan(ctx, req)
}

// CheckHealth handles the CheckHealth RPC by delegating to the wrapped
// datanode and returning its response and error unchanged.
func (s *Server) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) {
	resp, err := s.datanode.CheckHealth(ctx, req)
	return resp, err
}
6 changes: 6 additions & 0 deletions internal/distributed/querynode/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,3 +332,9 @@ func (c *Client) Delete(ctx context.Context, req *querypb.DeleteRequest, _ ...gr
return client.Delete(ctx, req)
})
}

// CheckHealth issues a CheckHealth request to the QueryNode via wrapGrpcCall
// and returns whatever that call yields.
//
// NOTE: opts is accepted but not forwarded to the underlying client call.
func (c *Client) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest, opts ...grpc.CallOption) (*milvuspb.CheckHealthResponse, error) {
	invoke := func(qn querypb.QueryNodeClient) (*milvuspb.CheckHealthResponse, error) {
		return qn.CheckHealth(ctx, req)
	}
	return wrapGrpcCall(ctx, c, invoke)
}
4 changes: 4 additions & 0 deletions internal/distributed/querynode/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,3 +387,7 @@ func (s *Server) SyncDistribution(ctx context.Context, req *querypb.SyncDistribu
func (s *Server) Delete(ctx context.Context, req *querypb.DeleteRequest) (*commonpb.Status, error) {
return s.querynode.Delete(ctx, req)
}

// CheckHealth handles the CheckHealth RPC by delegating to the wrapped
// querynode and returning its response and error unchanged.
func (s *Server) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthRequest) (*milvuspb.CheckHealthResponse, error) {
	resp, err := s.querynode.CheckHealth(ctx, req)
	return resp, err
}
55 changes: 55 additions & 0 deletions internal/mocks/mock_datanode.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

70 changes: 70 additions & 0 deletions internal/mocks/mock_datanode_client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

55 changes: 55 additions & 0 deletions internal/mocks/mock_querynode.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 0c31c7d

Please sign in to comment.