Remove re-collection of segment stats during DataCoord startup
Signed-off-by: jaime <[email protected]>
jaime0815 committed Oct 12, 2023
1 parent bf46ffd commit 8351428
Showing 9 changed files with 9 additions and 159 deletions.
11 changes: 0 additions & 11 deletions internal/datacoord/cluster.go
@@ -136,17 +136,6 @@ func (c *Cluster) Import(ctx context.Context, nodeID int64, it *datapb.ImportTas
c.sessionManager.Import(ctx, nodeID, it)
}

// ReCollectSegmentStats triggers a ReCollectSegmentStats call from session manager.
func (c *Cluster) ReCollectSegmentStats(ctx context.Context) error {
for _, node := range c.sessionManager.getLiveNodeIDs() {
err := c.sessionManager.ReCollectSegmentStats(ctx, node)
if err != nil {
return err
}
}
return nil
}

// GetSessions returns all sessions
func (c *Cluster) GetSessions() []*Session {
return c.sessionManager.GetSessions()
63 changes: 0 additions & 63 deletions internal/datacoord/cluster_test.go
@@ -618,66 +618,3 @@ func TestCluster_Import(t *testing.T) {
})
time.Sleep(500 * time.Millisecond)
}

func TestCluster_ReCollectSegmentStats(t *testing.T) {
kv := getWatchKV(t)
defer func() {
kv.RemoveWithPrefix("")
kv.Close()
}()

t.Run("recollect succeed", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
mockSessionCreator := func(ctx context.Context, addr string, nodeID int64) (types.DataNodeClient, error) {
return newMockDataNodeClient(1, nil)
}
sessionManager := NewSessionManager(withSessionCreator(mockSessionCreator))
channelManager, err := NewChannelManager(kv, newMockHandler())
assert.NoError(t, err)
cluster := NewCluster(sessionManager, channelManager)
defer cluster.Close()
addr := "localhost:8080"
info := &NodeInfo{
Address: addr,
NodeID: 1,
}
nodes := []*NodeInfo{info}
err = cluster.Startup(ctx, nodes)
assert.NoError(t, err)

err = cluster.Watch("chan-1", 1)
assert.NoError(t, err)

assert.NotPanics(t, func() {
cluster.ReCollectSegmentStats(ctx)
})
time.Sleep(500 * time.Millisecond)
})

t.Run("recollect failed", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
sessionManager := NewSessionManager()
channelManager, err := NewChannelManager(kv, newMockHandler())
assert.NoError(t, err)
cluster := NewCluster(sessionManager, channelManager)
defer cluster.Close()
addr := "localhost:8080"
info := &NodeInfo{
Address: addr,
NodeID: 1,
}
nodes := []*NodeInfo{info}
err = cluster.Startup(ctx, nodes)
assert.NoError(t, err)

err = cluster.Watch("chan-1", 1)
assert.NoError(t, err)

assert.NotPanics(t, func() {
cluster.ReCollectSegmentStats(ctx)
})
time.Sleep(500 * time.Millisecond)
})
}
28 changes: 0 additions & 28 deletions internal/datacoord/server.go
@@ -394,12 +394,6 @@ func (s *Server) startDataCoord() {
s.compactionTrigger.start()
}
s.startServerLoop()
// DataCoord (re)starts successfully and starts to collect segment stats
// data from all DataNodes.
// This prevents DataCoord from missing any important segment stats
// data produced while it was offline.
log.Info("DataCoord (re)started successfully, re-collecting segment stats from DataNodes")
s.reCollectSegmentStats(s.ctx)
s.stateCode.Store(commonpb.StateCode_Healthy)
}

@@ -1111,25 +1105,3 @@ func (s *Server) loadCollectionFromRootCoord(ctx context.Context, collectionID i
s.meta.AddCollection(collInfo)
return nil
}

func (s *Server) reCollectSegmentStats(ctx context.Context) {
if s.channelManager == nil {
log.Error("null channel manager found, which should NOT happen in non-testing environment")
return
}
nodes := s.sessionManager.getLiveNodeIDs()
log.Info("re-collecting segment stats from DataNodes",
zap.Int64s("DataNode IDs", nodes))

reCollectFunc := func() error {
err := s.cluster.ReCollectSegmentStats(ctx)
if err != nil {
return err
}
return nil
}

if err := retry.Do(ctx, reCollectFunc, retry.Attempts(20), retry.Sleep(time.Millisecond*100), retry.MaxSleepTime(5*time.Second)); err != nil {
panic(err)
}
}
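For context on the startup path removed above: `retry.Do` re-invokes the re-collection until it succeeds or the attempt budget is exhausted, with the sleep between attempts capped by `MaxSleepTime`, and the caller panics on final failure, so a persistently unreachable DataNode could keep DataCoord from ever reaching Healthy. Below is a minimal sketch of that retry pattern, assuming the options live under pkg/util/retry and using a hypothetical pingDataNodes helper in place of Cluster.ReCollectSegmentStats:

```go
// Sketch only, not part of the diff. pingDataNodes is a hypothetical
// stand-in for the per-node fan-out that Cluster.ReCollectSegmentStats did;
// the retry options mirror the removed reCollectSegmentStats.
package main

import (
	"context"
	"time"

	"github.com/milvus-io/milvus/pkg/util/retry" // assumed import path
)

func pingDataNodes(ctx context.Context) error { return nil } // hypothetical

func recollectWithRetry(ctx context.Context) error {
	// Up to 20 attempts, starting at 100 ms between attempts and never
	// sleeping longer than 5 s -- the same budget the removed code used
	// before it panicked and aborted DataCoord startup.
	return retry.Do(ctx, func() error { return pingDataNodes(ctx) },
		retry.Attempts(20),
		retry.Sleep(100*time.Millisecond),
		retry.MaxSleepTime(5*time.Second))
}

func main() {
	if err := recollectWithRetry(context.Background()); err != nil {
		panic(err) // the removed startup code did the same on final failure
	}
}
```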
29 changes: 1 addition & 28 deletions internal/datacoord/session_manager.go
@@ -40,8 +40,7 @@ import (
const (
flushTimeout = 15 * time.Second
// TODO: evaluate and update import timeout.
importTimeout = 3 * time.Hour
reCollectTimeout = 5 * time.Second
importTimeout = 3 * time.Hour
)

// SessionManager provides the grpc interfaces of cluster
@@ -227,32 +226,6 @@ func (c *SessionManager) execImport(ctx context.Context, nodeID int64, itr *data
log.Info("success to import", zap.Int64("node", nodeID), zap.Any("import task", itr))
}

// ReCollectSegmentStats collects segment stats info from DataNodes after DataCoord reboots.
func (c *SessionManager) ReCollectSegmentStats(ctx context.Context, nodeID int64) error {
cli, err := c.getClient(ctx, nodeID)
if err != nil {
log.Warn("failed to get dataNode client", zap.Int64("DataNode ID", nodeID), zap.Error(err))
return err
}
ctx, cancel := context.WithTimeout(ctx, reCollectTimeout)
defer cancel()
resp, err := cli.ResendSegmentStats(ctx, &datapb.ResendSegmentStatsRequest{
Base: commonpbutil.NewMsgBase(
commonpbutil.WithMsgType(commonpb.MsgType_ResendSegmentStats),
commonpbutil.WithSourceID(paramtable.GetNodeID()),
),
})
if err := VerifyResponse(resp, err); err != nil {
log.Warn("re-collect segment stats call failed",
zap.Int64("DataNode ID", nodeID), zap.Error(err))
return err
}
log.Info("re-collect segment stats call succeeded",
zap.Int64("DataNode ID", nodeID),
zap.Int64s("segment stat collected", resp.GetSegResent()))
return nil
}

func (c *SessionManager) GetCompactionState() map[int64]*datapb.CompactionStateResult {
wg := sync.WaitGroup{}
ctx := context.Background()
20 changes: 0 additions & 20 deletions internal/datanode/flow_graph_manager.go
@@ -192,26 +192,6 @@ func (fm *flowgraphManager) getChannel(segID UniqueID) (Channel, error) {
return nil, fmt.Errorf("cannot find segment %d in all flowgraphs", segID)
}

// resendTT loops through flow graphs, looks for segments that are not flushed,
// and sends them to that flow graph's `resendTTCh` channel so stats of
// these segments will be resent.
func (fm *flowgraphManager) resendTT() []UniqueID {
var unFlushedSegments []UniqueID
fm.flowgraphs.Range(func(key string, fg *dataSyncService) bool {
segIDs := fg.channel.listNotFlushedSegmentIDs()
if len(segIDs) > 0 {
log.Info("un-flushed segments found, stats will be resend",
zap.Int64s("segment IDs", segIDs))
unFlushedSegments = append(unFlushedSegments, segIDs...)
fg.resendTTCh <- resendTTMsg{
segmentIDs: segIDs,
}
}
return true
})
return unFlushedSegments
}

func (fm *flowgraphManager) getFlowgraphService(vchan string) (*dataSyncService, bool) {
return fm.flowgraphs.Get(vchan)
}
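The removed resendTT above is only the producer half: it pushes resendTTMsg values into each flowgraph's resendTTCh, and some consumer (presumably the flowgraph's time-tick path, which is not part of this diff) re-announces the stats. A self-contained toy sketch of that channel hand-off, with stand-in type definitions and an assumed channel capacity:

```go
// Toy sketch only; the type names mirror the hunk above, but these
// definitions are stand-ins and the channel capacity is an assumption.
package main

import "fmt"

type UniqueID = int64

type resendTTMsg struct {
	segmentIDs []UniqueID
}

func main() {
	resendTTCh := make(chan resendTTMsg, 1)

	// Producer side, as the removed resendTT did per flowgraph:
	// collect the un-flushed segment IDs and hand them off.
	resendTTCh <- resendTTMsg{segmentIDs: []UniqueID{10, 11}}
	close(resendTTCh)

	// Consumer side: whoever drains the channel re-announces stats
	// for the listed segments with its next time tick message.
	for msg := range resendTTCh {
		fmt.Println("resending stats for segments:", msg.segmentIDs)
	}
}
```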
10 changes: 3 additions & 7 deletions internal/datanode/services.go
@@ -162,17 +162,13 @@ func (node *DataNode) FlushSegments(ctx context.Context, req *datapb.FlushSegmen
return merr.Success(), nil
}

// ResendSegmentStats resend un-flushed segment stats back upstream to DataCoord by resending DataNode time tick message.
// ResendSegmentStats resends un-flushed segment stats back upstream to DataCoord by resending the DataNode time tick message.
// It returns a list of segments to be sent.
// Deprecated in 2.3.2; kept only for compatibility during rollback.
func (node *DataNode) ResendSegmentStats(ctx context.Context, req *datapb.ResendSegmentStatsRequest) (*datapb.ResendSegmentStatsResponse, error) {
log.Info("start resending segment stats, if any",
zap.Int64("DataNode ID", paramtable.GetNodeID()))
segResent := node.flowgraphManager.resendTT()
log.Info("found segment(s) with stats to resend",
zap.Int64s("segment IDs", segResent))
return &datapb.ResendSegmentStatsResponse{
Status: merr.Success(),
SegResent: segResent,
SegResent: make([]int64, 0),
}, nil
}

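Because the RPC is kept (see the Deprecated marker in data_coord.proto below) purely for mixed-version compatibility, a coordinator built before 2.3.2 can still issue the call during a rollback; with this commit the DataNode always answers with a success status and an empty SegResent list. A hedged sketch of such a caller, reusing only identifiers already visible in the hunks above (imports elided; cli is assumed to be an already-connected DataNode client):

```go
// Sketch only, not part of the diff: what a pre-2.3.2 coordinator still
// sends during a rollback. After this commit the returned list is always
// empty, so the caller simply sees "nothing to resend".
func resendSegmentStatsCompat(ctx context.Context, cli types.DataNodeClient) ([]int64, error) {
	ctx, cancel := context.WithTimeout(ctx, 5*time.Second) // the old reCollectTimeout
	defer cancel()

	resp, err := cli.ResendSegmentStats(ctx, &datapb.ResendSegmentStatsRequest{
		Base: commonpbutil.NewMsgBase(
			commonpbutil.WithMsgType(commonpb.MsgType_ResendSegmentStats),
			commonpbutil.WithSourceID(paramtable.GetNodeID()),
		),
	})
	if err != nil {
		return nil, err
	}
	if !merr.Ok(resp.GetStatus()) {
		return nil, fmt.Errorf("resend segment stats rejected: %s", resp.GetStatus().GetReason())
	}
	return resp.GetSegResent(), nil // always empty after this change
}
```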
4 changes: 2 additions & 2 deletions internal/datanode/services_test.go
@@ -813,13 +813,13 @@ func (s *DataNodeServicesSuite) TestResendSegmentStats() {
resp, err := s.node.ResendSegmentStats(s.ctx, req)
s.Assert().NoError(err)
s.Assert().True(merr.Ok(resp.GetStatus()))
s.Assert().ElementsMatch([]UniqueID{0, 1, 2}, resp.GetSegResent())
s.Assert().Empty(resp.GetSegResent())

// Duplicate call.
resp, err = s.node.ResendSegmentStats(s.ctx, req)
s.Assert().NoError(err)
s.Assert().True(merr.Ok(resp.GetStatus()))
s.Assert().ElementsMatch([]UniqueID{0, 1, 2}, resp.GetSegResent())
s.Assert().Empty(resp.GetSegResent())
}

func (s *DataNodeServicesSuite) TestFlushChannels() {
1 change: 1 addition & 0 deletions internal/proto/data_coord.proto
@@ -111,6 +111,7 @@ service DataNode {
// https://wiki.lfaidata.foundation/display/MIL/MEP+24+--+Support+bulk+load
rpc Import(ImportTaskRequest) returns(common.Status) {}

// Deprecated
rpc ResendSegmentStats(ResendSegmentStatsRequest) returns(ResendSegmentStatsResponse) {}

rpc AddImportSegment(AddImportSegmentRequest) returns(AddImportSegmentResponse) {}
2 changes: 2 additions & 0 deletions internal/proto/datapb/data_coord.pb.go

Some generated files are not rendered by default.
