diff --git a/internal/datacoord/handler.go b/internal/datacoord/handler.go index b192f3e98d1b1..975d9d5be1e68 100644 --- a/internal/datacoord/handler.go +++ b/internal/datacoord/handler.go @@ -42,6 +42,15 @@ type Handler interface { CheckShouldDropChannel(ch string) bool FinishDropChannel(ch string, collectionID int64) error GetCollection(ctx context.Context, collectionID UniqueID) (*collectionInfo, error) + GetCurrentSegmentsView(ctx context.Context, channel RWChannel, partitionIDs ...UniqueID) *SegmentsView +} + +type SegmentsView struct { + FlushedSegmentIDs []int64 + GrowingSegmentIDs []int64 + DroppedSegmentIDs []int64 + L0SegmentIDs []int64 + ImportingSegmentIDs []int64 } // ServerHandler is a helper of Server @@ -263,6 +272,124 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs . } } +func (h *ServerHandler) GetCurrentSegmentsView(ctx context.Context, channel RWChannel, partitionIDs ...UniqueID) *SegmentsView { + validPartitions := lo.Filter(partitionIDs, func(partitionID int64, _ int) bool { return partitionID > allPartitionID }) + if len(validPartitions) <= 0 { + collInfo, err := h.s.handler.GetCollection(h.s.ctx, channel.GetCollectionID()) + if err != nil || collInfo == nil { + log.Warn("collectionInfo is nil") + return nil + } + validPartitions = collInfo.Partitions + } + + var ( + flushedIDs = make(typeutil.UniqueSet) + droppedIDs = make(typeutil.UniqueSet) + growingIDs = make(typeutil.UniqueSet) + importingIDs = make(typeutil.UniqueSet) + levelZeroIDs = make(typeutil.UniqueSet) + newFlushedIDs = make(typeutil.UniqueSet) + ) + + // cannot use GetSegmentsByChannel since dropped segments are needed here + segments := h.s.meta.GetRealSegmentsForChannel(channel.GetName()) + + validSegmentInfos := make(map[int64]*SegmentInfo) + for _, s := range segments { + if s.GetStartPosition() == nil && s.GetDmlPosition() == nil { + continue + } + + validSegmentInfos[s.GetID()] = s + switch { + case s.GetState() == commonpb.SegmentState_Dropped: + droppedIDs.Insert(s.GetID()) + case s.GetState() == commonpb.SegmentState_Importing: + importingIDs.Insert(s.GetID()) + case s.GetLevel() == datapb.SegmentLevel_L0: + levelZeroIDs.Insert(s.GetID()) + case s.GetState() == commonpb.SegmentState_Growing: + growingIDs.Insert(s.GetID()) + default: + flushedIDs.Insert(s.GetID()) + } + } + + isValid := func(ids ...UniqueID) bool { + for _, id := range ids { + if seg, ok := validSegmentInfos[id]; !ok || seg == nil || seg.GetIsInvisible() { + return false + } + } + return true + } + + var compactionFromExist func(segID UniqueID) bool + + compactionFromExist = func(segID UniqueID) bool { + compactionFrom := validSegmentInfos[segID].GetCompactionFrom() + if len(compactionFrom) == 0 || !isValid(compactionFrom...) { + return false + } + for _, fromID := range compactionFrom { + if flushedIDs.Contain(fromID) || newFlushedIDs.Contain(fromID) { + return true + } + if compactionFromExist(fromID) { + return true + } + } + return false + } + + retrieve := func() bool { + continueRetrieve := false + for id := range flushedIDs { + compactionFrom := validSegmentInfos[id].GetCompactionFrom() + if len(compactionFrom) == 0 || !isValid(compactionFrom...) { + newFlushedIDs.Insert(id) + continue + } + if !compactionFromExist(id) { + newFlushedIDs.Insert(id) + } else { + for _, fromID := range compactionFrom { + newFlushedIDs.Insert(fromID) + continueRetrieve = true + droppedIDs.Remove(fromID) + } + } + } + return continueRetrieve + } + + for retrieve() { + flushedIDs = newFlushedIDs + newFlushedIDs = make(typeutil.UniqueSet) + } + + flushedIDs = newFlushedIDs + + log.Ctx(ctx).Info("GetCurrentSegmentsView", + zap.Int64("collectionID", channel.GetCollectionID()), + zap.String("channel", channel.GetName()), + zap.Int("numOfSegments", len(segments)), + zap.Int("result flushed", len(flushedIDs)), + zap.Int("result growing", len(growingIDs)), + zap.Int("result importing", len(importingIDs)), + zap.Int("result L0", len(levelZeroIDs)), + ) + + return &SegmentsView{ + FlushedSegmentIDs: flushedIDs.Collect(), + GrowingSegmentIDs: growingIDs.Collect(), + DroppedSegmentIDs: droppedIDs.Collect(), + L0SegmentIDs: levelZeroIDs.Collect(), + ImportingSegmentIDs: importingIDs.Collect(), + } +} + // getEarliestSegmentDMLPos returns the earliest dml position of segments, // this is mainly for COMPATIBILITY with old version <=2.1.x func (h *ServerHandler) getEarliestSegmentDMLPos(channel string, partitionIDs ...UniqueID) *msgpb.MsgPosition { diff --git a/internal/datacoord/handler_test.go b/internal/datacoord/handler_test.go index de202ca37c307..1535f3c02f3fe 100644 --- a/internal/datacoord/handler_test.go +++ b/internal/datacoord/handler_test.go @@ -959,6 +959,177 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) { }) } +func TestGetCurrentSegmentsView(t *testing.T) { + svr := newTestServer(t) + defer closeTestServer(t, svr) + schema := newTestSchema() + svr.meta.AddCollection(&collectionInfo{ + ID: 0, + Schema: schema, + }) + err := svr.meta.indexMeta.CreateIndex(context.TODO(), &model.Index{ + TenantID: "", + CollectionID: 0, + FieldID: 2, + IndexID: 1, + }) + assert.NoError(t, err) + seg1 := &datapb.SegmentInfo{ + ID: 1, + CollectionID: 0, + PartitionID: 0, + InsertChannel: "ch1", + State: commonpb.SegmentState_Dropped, + DmlPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + MsgGroup: "", + Timestamp: 1, + }, + NumOfRows: 100, + } + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg1)) + assert.NoError(t, err) + seg2 := &datapb.SegmentInfo{ + ID: 2, + CollectionID: 0, + PartitionID: 0, + InsertChannel: "ch1", + State: commonpb.SegmentState_Flushed, + DmlPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + MsgGroup: "", + Timestamp: 1, + }, + NumOfRows: 100, + CompactionFrom: []int64{1}, + } + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg2)) + assert.NoError(t, err) + seg3 := &datapb.SegmentInfo{ + ID: 3, + CollectionID: 0, + PartitionID: 0, + InsertChannel: "ch1", + State: commonpb.SegmentState_Flushed, + DmlPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + MsgGroup: "", + Timestamp: 1, + }, + NumOfRows: 100, + } + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg3)) + assert.NoError(t, err) + seg4 := &datapb.SegmentInfo{ + ID: 4, + CollectionID: 0, + PartitionID: 0, + InsertChannel: "ch1", + State: commonpb.SegmentState_Flushed, + DmlPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + MsgGroup: "", + Timestamp: 1, + }, + NumOfRows: 100, + } + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg4)) + assert.NoError(t, err) + seg5 := &datapb.SegmentInfo{ + ID: 5, + CollectionID: 0, + PartitionID: 0, + InsertChannel: "ch1", + State: commonpb.SegmentState_Flushed, + DmlPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + MsgGroup: "", + Timestamp: 1, + }, + NumOfRows: 100, + CompactionFrom: []int64{3, 4}, + } + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg5)) + assert.NoError(t, err) + seg6 := &datapb.SegmentInfo{ + ID: 6, + CollectionID: 0, + PartitionID: 0, + InsertChannel: "ch1", + State: commonpb.SegmentState_Flushed, + DmlPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + MsgGroup: "", + Timestamp: 1, + }, + NumOfRows: 100, + CompactionFrom: []int64{3, 4}, + } + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg6)) + assert.NoError(t, err) + seg7 := &datapb.SegmentInfo{ + ID: 7, + CollectionID: 0, + PartitionID: 0, + InsertChannel: "ch1", + State: commonpb.SegmentState_Flushed, + Level: datapb.SegmentLevel_L0, + DmlPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + MsgGroup: "", + Timestamp: 1, + }, + } + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg7)) + assert.NoError(t, err) + seg8 := &datapb.SegmentInfo{ + ID: 8, + CollectionID: 0, + PartitionID: 0, + InsertChannel: "ch1", + State: commonpb.SegmentState_Growing, + DmlPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + MsgGroup: "", + Timestamp: 1, + }, + NumOfRows: 100, + } + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg8)) + assert.NoError(t, err) + seg9 := &datapb.SegmentInfo{ + ID: 9, + CollectionID: 0, + PartitionID: 0, + InsertChannel: "ch1", + State: commonpb.SegmentState_Importing, + DmlPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + MsgGroup: "", + Timestamp: 1, + }, + NumOfRows: 100, + } + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg9)) + assert.NoError(t, err) + + view := svr.handler.GetCurrentSegmentsView(context.Background(), &channelMeta{Name: "ch1", CollectionID: 0}) + assert.ElementsMatch(t, []int64{2, 3, 4}, view.FlushedSegmentIDs) + assert.ElementsMatch(t, []int64{8}, view.GrowingSegmentIDs) + assert.ElementsMatch(t, []int64{1}, view.DroppedSegmentIDs) + assert.ElementsMatch(t, []int64{7}, view.L0SegmentIDs) + assert.ElementsMatch(t, []int64{9}, view.ImportingSegmentIDs) +} + func TestShouldDropChannel(t *testing.T) { type myRootCoord struct { mocks2.MockRootCoordClient diff --git a/internal/datacoord/mock_handler.go b/internal/datacoord/mock_handler.go index 20f84a8f53094..7c3ec969e8a48 100644 --- a/internal/datacoord/mock_handler.go +++ b/internal/datacoord/mock_handler.go @@ -174,6 +174,70 @@ func (_c *NMockHandler_GetCollection_Call) RunAndReturn(run func(context.Context return _c } +// GetCurrentSegmentsView provides a mock function with given fields: ctx, channel, partitionIDs +func (_m *NMockHandler) GetCurrentSegmentsView(ctx context.Context, channel RWChannel, partitionIDs ...int64) *SegmentsView { + _va := make([]interface{}, len(partitionIDs)) + for _i := range partitionIDs { + _va[_i] = partitionIDs[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, channel) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + if len(ret) == 0 { + panic("no return value specified for GetCurrentSegmentsView") + } + + var r0 *SegmentsView + if rf, ok := ret.Get(0).(func(context.Context, RWChannel, ...int64) *SegmentsView); ok { + r0 = rf(ctx, channel, partitionIDs...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*SegmentsView) + } + } + + return r0 +} + +// NMockHandler_GetCurrentSegmentsView_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetCurrentSegmentsView' +type NMockHandler_GetCurrentSegmentsView_Call struct { + *mock.Call +} + +// GetCurrentSegmentsView is a helper method to define mock.On call +// - ctx context.Context +// - channel RWChannel +// - partitionIDs ...int64 +func (_e *NMockHandler_Expecter) GetCurrentSegmentsView(ctx interface{}, channel interface{}, partitionIDs ...interface{}) *NMockHandler_GetCurrentSegmentsView_Call { + return &NMockHandler_GetCurrentSegmentsView_Call{Call: _e.mock.On("GetCurrentSegmentsView", + append([]interface{}{ctx, channel}, partitionIDs...)...)} +} + +func (_c *NMockHandler_GetCurrentSegmentsView_Call) Run(run func(ctx context.Context, channel RWChannel, partitionIDs ...int64)) *NMockHandler_GetCurrentSegmentsView_Call { + _c.Call.Run(func(args mock.Arguments) { + variadicArgs := make([]int64, len(args)-2) + for i, a := range args[2:] { + if a != nil { + variadicArgs[i] = a.(int64) + } + } + run(args[0].(context.Context), args[1].(RWChannel), variadicArgs...) + }) + return _c +} + +func (_c *NMockHandler_GetCurrentSegmentsView_Call) Return(_a0 *SegmentsView) *NMockHandler_GetCurrentSegmentsView_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *NMockHandler_GetCurrentSegmentsView_Call) RunAndReturn(run func(context.Context, RWChannel, ...int64) *SegmentsView) *NMockHandler_GetCurrentSegmentsView_Call { + _c.Call.Return(run) + return _c +} + // GetDataVChanPositions provides a mock function with given fields: ch, partitionID func (_m *NMockHandler) GetDataVChanPositions(ch RWChannel, partitionID int64) *datapb.VchannelInfo { ret := _m.Called(ch, partitionID) diff --git a/internal/datacoord/mock_test.go b/internal/datacoord/mock_test.go index 963ec0ac73cf0..f3f1890a9c972 100644 --- a/internal/datacoord/mock_test.go +++ b/internal/datacoord/mock_test.go @@ -763,6 +763,10 @@ func (h *mockHandler) GetCollection(_ context.Context, collectionID UniqueID) (* return &collectionInfo{ID: collectionID}, nil } +func (h *mockHandler) GetCurrentSegmentsView(ctx context.Context, channel RWChannel, partitionIDs ...UniqueID) *SegmentsView { + return nil +} + func newMockHandlerWithMeta(meta *meta) *mockHandler { return &mockHandler{ meta: meta, diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index 80c78b715a585..d85e74befcb13 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -1042,6 +1042,15 @@ func (s *Server) GetSegmentsByStates(ctx context.Context, req *datapb.GetSegment } else { segmentIDs = s.meta.GetSegmentsIDOfPartition(ctx, collectionID, partitionID) } + + channels := s.channelManager.GetChannelsByCollectionID(collectionID) + for _, channel := range channels { + channelSegmentsView := s.handler.GetCurrentSegmentsView(ctx, channel, partitionID) + segmentIDs = append(segmentIDs, channelSegmentsView.FlushedSegmentIDs...) + segmentIDs = append(segmentIDs, channelSegmentsView.GrowingSegmentIDs...) + segmentIDs = append(segmentIDs, channelSegmentsView.L0SegmentIDs...) + segmentIDs = append(segmentIDs, channelSegmentsView.ImportingSegmentIDs...) + } ret := make([]UniqueID, 0, len(segmentIDs)) statesDict := make(map[commonpb.SegmentState]bool)