From c899bdc6fd07a125280ddc74e9e7679c96ef1c07 Mon Sep 17 00:00:00 2001 From: jaime Date: Wed, 27 Sep 2023 17:04:53 +0800 Subject: [PATCH] Remove recollect segment stats during starting datacoord Signed-off-by: jaime --- internal/datacoord/cluster.go | 16 ++---- internal/datacoord/cluster_test.go | 70 ++------------------------ internal/datacoord/server.go | 29 +---------- internal/datacoord/session_manager.go | 33 ++---------- internal/datanode/data_node.go | 8 +-- internal/datanode/data_node_test.go | 12 +++-- internal/proto/data_coord.proto | 1 + internal/proto/datapb/data_coord.pb.go | 2 + 8 files changed, 24 insertions(+), 147 deletions(-) diff --git a/internal/datacoord/cluster.go b/internal/datacoord/cluster.go index de36366737fd0..72fcafced4a7a 100644 --- a/internal/datacoord/cluster.go +++ b/internal/datacoord/cluster.go @@ -21,11 +21,12 @@ import ( "fmt" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/samber/lo" + "go.uber.org/zap" + "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/util/commonpbutil" - "github.com/samber/lo" - "go.uber.org/zap" ) // Cluster provides interfaces to interact with datanode cluster @@ -110,17 +111,6 @@ func (c *Cluster) Import(ctx context.Context, nodeID int64, it *datapb.ImportTas c.sessionManager.Import(ctx, nodeID, it) } -// ReCollectSegmentStats triggers a ReCollectSegmentStats call from session manager. 
-func (c *Cluster) ReCollectSegmentStats(ctx context.Context) error { - for _, node := range c.sessionManager.getLiveNodeIDs() { - err := c.sessionManager.ReCollectSegmentStats(ctx, node) - if err != nil { - return err - } - } - return nil -} - // GetSessions returns all sessions func (c *Cluster) GetSessions() []*Session { return c.sessionManager.GetSessions() diff --git a/internal/datacoord/cluster_test.go b/internal/datacoord/cluster_test.go index 60e9f25b1515f..a1a46766bda5a 100644 --- a/internal/datacoord/cluster_test.go +++ b/internal/datacoord/cluster_test.go @@ -23,13 +23,14 @@ import ( "time" "github.com/golang/protobuf/proto" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "stathat.com/c/consistent" + "github.com/milvus-io/milvus/internal/kv" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/types" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "stathat.com/c/consistent" ) func getMetaKv(t *testing.T) kv.MetaKv { @@ -585,66 +586,3 @@ func TestCluster_Import(t *testing.T) { }) time.Sleep(500 * time.Millisecond) } - -func TestCluster_ReCollectSegmentStats(t *testing.T) { - kv := getMetaKv(t) - defer func() { - kv.RemoveWithPrefix("") - kv.Close() - }() - - t.Run("recollect succeed", func(t *testing.T) { - ctx, cancel := context.WithCancel(context.TODO()) - defer cancel() - var mockSessionCreator = func(ctx context.Context, addr string, nodeID int64) (types.DataNode, error) { - return newMockDataNodeClient(1, nil) - } - sessionManager := NewSessionManager(withSessionCreator(mockSessionCreator)) - channelManager, err := NewChannelManager(kv, newMockHandler()) - assert.Nil(t, err) - cluster := NewCluster(sessionManager, channelManager) - defer cluster.Close() - addr := "localhost:8080" - info := &NodeInfo{ - Address: addr, - NodeID: 1, - } - nodes := []*NodeInfo{info} - err = cluster.Startup(ctx, 
nodes) - assert.Nil(t, err) - - err = cluster.Watch("chan-1", 1) - assert.NoError(t, err) - - assert.NotPanics(t, func() { - cluster.ReCollectSegmentStats(ctx) - }) - time.Sleep(500 * time.Millisecond) - }) - - t.Run("recollect failed", func(t *testing.T) { - ctx, cancel := context.WithCancel(context.TODO()) - defer cancel() - sessionManager := NewSessionManager() - channelManager, err := NewChannelManager(kv, newMockHandler()) - assert.Nil(t, err) - cluster := NewCluster(sessionManager, channelManager) - defer cluster.Close() - addr := "localhost:8080" - info := &NodeInfo{ - Address: addr, - NodeID: 1, - } - nodes := []*NodeInfo{info} - err = cluster.Startup(ctx, nodes) - assert.Nil(t, err) - - err = cluster.Watch("chan-1", 1) - assert.NoError(t, err) - - assert.NotPanics(t, func() { - cluster.ReCollectSegmentStats(ctx) - }) - time.Sleep(500 * time.Millisecond) - }) -} diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index 6510445b49dec..4d61a350f8d31 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -34,6 +34,7 @@ import ( "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" + "github.com/milvus-io/milvus/internal/common" datanodeclient "github.com/milvus-io/milvus/internal/distributed/datanode/client" rootcoordclient "github.com/milvus-io/milvus/internal/distributed/rootcoord/client" @@ -363,12 +364,6 @@ func (s *Server) startDataCoord() { } s.startServerLoop() - // DataCoord (re)starts successfully and starts to collection segment stats - // data from all DataNode. - // This will prevent DataCoord from missing out any important segment stats - // data while offline. 
- log.Info("DataCoord (re)starts successfully and re-collecting segment stats from DataNodes") - s.reCollectSegmentStats(s.ctx) s.stateCode.Store(commonpb.StateCode_Healthy) } @@ -1022,25 +1017,3 @@ func (s *Server) hasCollection(ctx context.Context, collectionID int64) (bool, e } return false, statusErr } - -func (s *Server) reCollectSegmentStats(ctx context.Context) { - if s.channelManager == nil { - log.Error("null channel manager found, which should NOT happen in non-testing environment") - return - } - nodes := s.sessionManager.getLiveNodeIDs() - log.Info("re-collecting segment stats from DataNodes", - zap.Int64s("DataNode IDs", nodes)) - - reCollectFunc := func() error { - err := s.cluster.ReCollectSegmentStats(ctx) - if err != nil { - return err - } - return nil - } - - if err := retry.Do(ctx, reCollectFunc, retry.Attempts(20), retry.Sleep(time.Millisecond*100), retry.MaxSleepTime(5*time.Second)); err != nil { - panic(err) - } -} diff --git a/internal/datacoord/session_manager.go b/internal/datacoord/session_manager.go index ab37755b35ee1..7237b458d7ec4 100644 --- a/internal/datacoord/session_manager.go +++ b/internal/datacoord/session_manager.go @@ -22,7 +22,10 @@ import ( "sync" "time" + "go.uber.org/zap" + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + grpcdatanodeclient "github.com/milvus-io/milvus/internal/distributed/datanode/client" "github.com/milvus-io/milvus/internal/log" "github.com/milvus-io/milvus/internal/metrics" @@ -30,14 +33,12 @@ import ( "github.com/milvus-io/milvus/internal/types" "github.com/milvus-io/milvus/internal/util/commonpbutil" "github.com/milvus-io/milvus/internal/util/retry" - "go.uber.org/zap" ) const ( flushTimeout = 15 * time.Second // TODO: evaluate and update import timeout. 
- importTimeout = 3 * time.Hour - reCollectTimeout = 5 * time.Second + importTimeout = 3 * time.Hour ) // SessionManager provides the grpc interfaces of cluster @@ -223,32 +224,6 @@ func (c *SessionManager) execImport(ctx context.Context, nodeID int64, itr *data log.Info("success to import", zap.Int64("node", nodeID), zap.Any("import task", itr)) } -// ReCollectSegmentStats collects segment stats info from DataNodes, after DataCoord reboots. -func (c *SessionManager) ReCollectSegmentStats(ctx context.Context, nodeID int64) error { - cli, err := c.getClient(ctx, nodeID) - if err != nil { - log.Warn("failed to get dataNode client", zap.Int64("DataNode ID", nodeID), zap.Error(err)) - return err - } - ctx, cancel := context.WithTimeout(ctx, reCollectTimeout) - defer cancel() - resp, err := cli.ResendSegmentStats(ctx, &datapb.ResendSegmentStatsRequest{ - Base: commonpbutil.NewMsgBase( - commonpbutil.WithMsgType(commonpb.MsgType_ResendSegmentStats), - commonpbutil.WithSourceID(Params.DataCoordCfg.GetNodeID()), - ), - }) - if err := VerifyResponse(resp, err); err != nil { - log.Warn("re-collect segment stats call failed", - zap.Int64("DataNode ID", nodeID), zap.Error(err)) - return err - } - log.Info("re-collect segment stats call succeeded", - zap.Int64("DataNode ID", nodeID), - zap.Int64s("segment stat collected", resp.GetSegResent())) - return nil -} - func (c *SessionManager) GetCompactionState() map[int64]*datapb.CompactionStateResult { wg := sync.WaitGroup{} ctx := context.Background() diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index 516a635df4e6e..8271fc2aa99cc 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -693,17 +693,13 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen // ResendSegmentStats resend un-flushed segment stats back upstream to DataCoord by resending DataNode time tick message. // It returns a list of segments to be sent. 
+// Deprecated in 2.2.15; reserved only for compatibility during rolling back. It now always returns an empty SegResent list. func (node *DataNode) ResendSegmentStats(ctx context.Context, req *datapb.ResendSegmentStatsRequest) (*datapb.ResendSegmentStatsResponse, error) { - log.Info("start resending segment stats, if any", - zap.Int64("DataNode ID", Params.DataNodeCfg.GetNodeID())) - segResent := node.flowgraphManager.resendTT() - log.Info("found segment(s) with stats to resend", - zap.Int64s("segment IDs", segResent)) return &datapb.ResendSegmentStatsResponse{ Status: &commonpb.Status{ ErrorCode: commonpb.ErrorCode_Success, }, - SegResent: segResent, + SegResent: make([]int64, 0), }, nil } diff --git a/internal/datanode/data_node_test.go b/internal/datanode/data_node_test.go index d3e58e111dd36..2734a8c5d9646 100644 --- a/internal/datanode/data_node_test.go +++ b/internal/datanode/data_node_test.go @@ -30,9 +30,14 @@ import ( "time" "github.com/golang/protobuf/proto" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/common" etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" "github.com/milvus-io/milvus/internal/log" @@ -47,9 +52,6 @@ import ( "github.com/milvus-io/milvus/internal/util/importutil" "github.com/milvus-io/milvus/internal/util/metricsinfo" "github.com/milvus-io/milvus/internal/util/sessionutil" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "go.uber.org/zap" ) const returnError = "ReturnError" @@ -1253,11 +1255,11 @@ func TestDataNode_ResendSegmentStats(t *testing.T) { resp, err := node.ResendSegmentStats(node.ctx, req) assert.NoError(t, err) assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) - assert.ElementsMatch(t, []UniqueID{0, 1, 2}, resp.GetSegResent()) + assert.Empty(t, 
resp.GetSegResent()) // Duplicate call. resp, err = node.ResendSegmentStats(node.ctx, req) assert.NoError(t, err) assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) - assert.ElementsMatch(t, []UniqueID{0, 1, 2}, resp.GetSegResent()) + assert.Empty(t, resp.GetSegResent()) } diff --git a/internal/proto/data_coord.proto b/internal/proto/data_coord.proto index 5cabe33e6fc8c..c7ad9864b2d91 100644 --- a/internal/proto/data_coord.proto +++ b/internal/proto/data_coord.proto @@ -96,6 +96,7 @@ service DataNode { // https://wiki.lfaidata.foundation/display/MIL/MEP+24+--+Support+bulk+load rpc Import(ImportTaskRequest) returns(common.Status) {} + // Deprecated rpc ResendSegmentStats(ResendSegmentStatsRequest) returns(ResendSegmentStatsResponse) {} rpc AddImportSegment(AddImportSegmentRequest) returns(AddImportSegmentResponse) {} diff --git a/internal/proto/datapb/data_coord.pb.go b/internal/proto/datapb/data_coord.pb.go index aeab6e3dae5c0..fe2ad66e85bd9 100644 --- a/internal/proto/datapb/data_coord.pb.go +++ b/internal/proto/datapb/data_coord.pb.go @@ -7155,6 +7155,7 @@ type DataNodeClient interface { SyncSegments(ctx context.Context, in *SyncSegmentsRequest, opts ...grpc.CallOption) (*commonpb.Status, error) // https://wiki.lfaidata.foundation/display/MIL/MEP+24+--+Support+bulk+load Import(ctx context.Context, in *ImportTaskRequest, opts ...grpc.CallOption) (*commonpb.Status, error) + // Deprecated ResendSegmentStats(ctx context.Context, in *ResendSegmentStatsRequest, opts ...grpc.CallOption) (*ResendSegmentStatsResponse, error) AddImportSegment(ctx context.Context, in *AddImportSegmentRequest, opts ...grpc.CallOption) (*AddImportSegmentResponse, error) } @@ -7289,6 +7290,7 @@ type DataNodeServer interface { SyncSegments(context.Context, *SyncSegmentsRequest) (*commonpb.Status, error) // https://wiki.lfaidata.foundation/display/MIL/MEP+24+--+Support+bulk+load Import(context.Context, *ImportTaskRequest) (*commonpb.Status, error) + // Deprecated 
ResendSegmentStats(context.Context, *ResendSegmentStatsRequest) (*ResendSegmentStatsResponse, error) AddImportSegment(context.Context, *AddImportSegmentRequest) (*AddImportSegmentResponse, error) }