diff --git a/internal/datacoord/channel_manager.go b/internal/datacoord/channel_manager.go index 14c4ed3007896..2d8ce46c627c8 100644 --- a/internal/datacoord/channel_manager.go +++ b/internal/datacoord/channel_manager.go @@ -498,6 +498,18 @@ func (c *ChannelManager) GetChannels() []*NodeChannelInfo { return c.store.GetNodesChannels() } +func (c *ChannelManager) GetChannelsByCollectionID(collectionID UniqueID) []*channel { + channels := make([]*channel, 0) + for _, nodeChannels := range c.store.GetNodesChannels() { + for _, channelInfo := range nodeChannels.Channels { + if collectionID == channelInfo.CollectionID { + channels = append(channels, channelInfo) + } + } + } + return channels +} + // GetBufferChannels gets buffer channels. func (c *ChannelManager) GetBufferChannels() *NodeChannelInfo { c.mu.RLock() diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index 8e3292bc52d9b..70080a5297437 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -758,20 +758,11 @@ func (s *Server) GetRecoveryInfoV2(ctx context.Context, req *datapb.GetRecoveryI Status: merr.Status(err), }, nil } - - dresp, err := s.broker.DescribeCollectionInternal(s.ctx, collectionID) - if err != nil { - log.Error("get collection info from rootcoord failed", - zap.Error(err)) - - resp.Status = merr.Status(err) - return resp, nil - } - channels := dresp.GetVirtualChannelNames() + channels := s.channelManager.GetChannelsByCollectionID(collectionID) channelInfos := make([]*datapb.VchannelInfo, 0, len(channels)) flushedIDs := make(typeutil.UniqueSet) - for _, c := range channels { - channelInfo := s.handler.GetQueryVChanPositions(&channel{Name: c, CollectionID: collectionID}, partitionIDs...) + for _, ch := range channels { + channelInfo := s.handler.GetQueryVChanPositions(ch, partitionIDs...) channelInfos = append(channelInfos, channelInfo) log.Info("datacoord append channelInfo in GetRecoveryInfo", zap.String("channel", channelInfo.GetChannelName()), diff --git a/internal/datacoord/services_test.go b/internal/datacoord/services_test.go index 8d509a935876d..84b2d9cd980dd 100644 --- a/internal/datacoord/services_test.go +++ b/internal/datacoord/services_test.go @@ -114,8 +114,7 @@ func TestGetRecoveryInfoV2(t *testing.T) { assert.NoError(t, err) assert.EqualValues(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode()) assert.EqualValues(t, 0, len(resp.GetSegments())) - assert.EqualValues(t, 1, len(resp.GetChannels())) - assert.Nil(t, resp.GetChannels()[0].SeekPosition) + assert.EqualValues(t, 0, len(resp.GetChannels())) }) createSegment := func(id, collectionID, partitionID, numOfRows int64, posTs uint64, @@ -231,6 +230,10 @@ func TestGetRecoveryInfoV2(t *testing.T) { }) assert.NoError(t, err) + ch := &channel{Name: "vchan1", CollectionID: 0} + svr.channelManager.AddNode(0) + svr.channelManager.Watch(ch) + req := &datapb.GetRecoveryInfoRequestV2{ CollectionID: 0, } @@ -307,6 +310,10 @@ func TestGetRecoveryInfoV2(t *testing.T) { err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg2)) assert.NoError(t, err) + ch := &channel{Name: "vchan1", CollectionID: 0} + svr.channelManager.AddNode(0) + svr.channelManager.Watch(ch) + req := &datapb.GetRecoveryInfoRequestV2{ CollectionID: 0, } @@ -447,6 +454,10 @@ func TestGetRecoveryInfoV2(t *testing.T) { err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg2)) assert.NoError(t, err) + ch := &channel{Name: "vchan1", CollectionID: 0} + svr.channelManager.AddNode(0) + svr.channelManager.Watch(ch) + req := &datapb.GetRecoveryInfoRequestV2{ CollectionID: 0, } @@ -489,6 +500,10 @@ func TestGetRecoveryInfoV2(t *testing.T) { err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg2)) assert.NoError(t, err) + ch := &channel{Name: "vchan1", CollectionID: 0} + svr.channelManager.AddNode(0) + svr.channelManager.Watch(ch) + req := &datapb.GetRecoveryInfoRequestV2{ CollectionID: 0, } @@ -569,6 +584,10 @@ func TestGetRecoveryInfoV2(t *testing.T) { IndexSize: 0, }) + ch := &channel{Name: "vchan1", CollectionID: 0} + svr.channelManager.AddNode(0) + svr.channelManager.Watch(ch) + req := &datapb.GetRecoveryInfoRequestV2{ CollectionID: 0, } diff --git a/internal/querycoordv2/meta/coordinator_broker.go b/internal/querycoordv2/meta/coordinator_broker.go index 5153f46bd901d..2b986757b3927 100644 --- a/internal/querycoordv2/meta/coordinator_broker.go +++ b/internal/querycoordv2/meta/coordinator_broker.go @@ -150,7 +150,8 @@ func (broker *CoordinatorBroker) GetRecoveryInfoV2(ctx context.Context, collecti } recoveryInfo, err := broker.dataCoord.GetRecoveryInfoV2(ctx, getRecoveryInfoRequest) if err != nil { - log.Warn("get recovery info failed", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs), zap.Error(err)) + log.Warn("get recovery info failed", zap.Int64("collectionID", collectionID), + zap.Int64s("partitionIDs", partitionIDs), zap.Error(err)) return nil, nil, err }