Skip to content

Commit

Permalink
Support get segments from current segments view
Browse files Browse the repository at this point in the history
Signed-off-by: Cai Zhang <[email protected]>
  • Loading branch information
xiaocai2333 committed Dec 17, 2024
1 parent afac153 commit 6088b20
Show file tree
Hide file tree
Showing 5 changed files with 375 additions and 0 deletions.
127 changes: 127 additions & 0 deletions internal/datacoord/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,15 @@ type Handler interface {
CheckShouldDropChannel(ch string) bool
FinishDropChannel(ch string, collectionID int64) error
GetCollection(ctx context.Context, collectionID UniqueID) (*collectionInfo, error)
GetCurrentSegmentsView(ctx context.Context, channel RWChannel, partitionIDs ...UniqueID) *SegmentsView
}

type SegmentsView struct {
FlushedSegmentIDs []int64
GrowingSegmentIDs []int64
DroppedSegmentIDs []int64
L0SegmentIDs []int64
ImportingSegmentIDs []int64
}

// ServerHandler is a helper of Server
Expand Down Expand Up @@ -263,6 +272,124 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs .
}
}

func (h *ServerHandler) GetCurrentSegmentsView(ctx context.Context, channel RWChannel, partitionIDs ...UniqueID) *SegmentsView {
validPartitions := lo.Filter(partitionIDs, func(partitionID int64, _ int) bool { return partitionID > allPartitionID })
if len(validPartitions) <= 0 {
collInfo, err := h.s.handler.GetCollection(h.s.ctx, channel.GetCollectionID())
if err != nil || collInfo == nil {
log.Warn("collectionInfo is nil")
return nil
}
validPartitions = collInfo.Partitions
}

var (
flushedIDs = make(typeutil.UniqueSet)
droppedIDs = make(typeutil.UniqueSet)
growingIDs = make(typeutil.UniqueSet)
importingIDs = make(typeutil.UniqueSet)
levelZeroIDs = make(typeutil.UniqueSet)
newFlushedIDs = make(typeutil.UniqueSet)
)

// cannot use GetSegmentsByChannel since dropped segments are needed here
segments := h.s.meta.GetRealSegmentsForChannel(channel.GetName())

validSegmentInfos := make(map[int64]*SegmentInfo)
for _, s := range segments {
if s.GetStartPosition() == nil && s.GetDmlPosition() == nil {
continue
}

validSegmentInfos[s.GetID()] = s
switch {
case s.GetState() == commonpb.SegmentState_Dropped:
droppedIDs.Insert(s.GetID())
case s.GetState() == commonpb.SegmentState_Importing:
importingIDs.Insert(s.GetID())
case s.GetLevel() == datapb.SegmentLevel_L0:
levelZeroIDs.Insert(s.GetID())
case s.GetState() == commonpb.SegmentState_Growing:
growingIDs.Insert(s.GetID())
default:
flushedIDs.Insert(s.GetID())
}
}

isValid := func(ids ...UniqueID) bool {
for _, id := range ids {
if seg, ok := validSegmentInfos[id]; !ok || seg == nil || seg.GetIsInvisible() {
return false
}
}
return true
}

var compactionFromExist func(segID UniqueID) bool

compactionFromExist = func(segID UniqueID) bool {
compactionFrom := validSegmentInfos[segID].GetCompactionFrom()
if len(compactionFrom) == 0 || !isValid(compactionFrom...) {
return false
}
for _, fromID := range compactionFrom {
if flushedIDs.Contain(fromID) || newFlushedIDs.Contain(fromID) {
return true
}
if compactionFromExist(fromID) {
return true
}
}
return false
}

retrieve := func() bool {
continueRetrieve := false
for id := range flushedIDs {
compactionFrom := validSegmentInfos[id].GetCompactionFrom()
if len(compactionFrom) == 0 || !isValid(compactionFrom...) {
newFlushedIDs.Insert(id)
continue
}
if !compactionFromExist(id) {
newFlushedIDs.Insert(id)
} else {
for _, fromID := range compactionFrom {
newFlushedIDs.Insert(fromID)
continueRetrieve = true
droppedIDs.Remove(fromID)
}
}
}
return continueRetrieve
}

for retrieve() {
flushedIDs = newFlushedIDs
newFlushedIDs = make(typeutil.UniqueSet)
}

flushedIDs = newFlushedIDs

log.Ctx(ctx).Info("GetCurrentSegmentsView",
zap.Int64("collectionID", channel.GetCollectionID()),
zap.String("channel", channel.GetName()),
zap.Int("numOfSegments", len(segments)),
zap.Int("result flushed", len(flushedIDs)),
zap.Int("result growing", len(growingIDs)),
zap.Int("result importing", len(importingIDs)),
zap.Int("result L0", len(levelZeroIDs)),
)

return &SegmentsView{
FlushedSegmentIDs: flushedIDs.Collect(),
GrowingSegmentIDs: growingIDs.Collect(),
DroppedSegmentIDs: droppedIDs.Collect(),
L0SegmentIDs: levelZeroIDs.Collect(),
ImportingSegmentIDs: importingIDs.Collect(),
}
}

// getEarliestSegmentDMLPos returns the earliest dml position of segments,
// this is mainly for COMPATIBILITY with old version <=2.1.x
func (h *ServerHandler) getEarliestSegmentDMLPos(channel string, partitionIDs ...UniqueID) *msgpb.MsgPosition {
Expand Down
171 changes: 171 additions & 0 deletions internal/datacoord/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -959,6 +959,177 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
})
}

func TestGetCurrentSegmentsView(t *testing.T) {
svr := newTestServer(t)
defer closeTestServer(t, svr)
schema := newTestSchema()
svr.meta.AddCollection(&collectionInfo{
ID: 0,
Schema: schema,
})
err := svr.meta.indexMeta.CreateIndex(context.TODO(), &model.Index{
TenantID: "",
CollectionID: 0,
FieldID: 2,
IndexID: 1,
})
assert.NoError(t, err)
seg1 := &datapb.SegmentInfo{
ID: 1,
CollectionID: 0,
PartitionID: 0,
InsertChannel: "ch1",
State: commonpb.SegmentState_Dropped,
DmlPosition: &msgpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 100,
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg1))
assert.NoError(t, err)
seg2 := &datapb.SegmentInfo{
ID: 2,
CollectionID: 0,
PartitionID: 0,
InsertChannel: "ch1",
State: commonpb.SegmentState_Flushed,
DmlPosition: &msgpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 100,
CompactionFrom: []int64{1},
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg2))
assert.NoError(t, err)
seg3 := &datapb.SegmentInfo{
ID: 3,
CollectionID: 0,
PartitionID: 0,
InsertChannel: "ch1",
State: commonpb.SegmentState_Flushed,
DmlPosition: &msgpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 100,
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg3))
assert.NoError(t, err)
seg4 := &datapb.SegmentInfo{
ID: 4,
CollectionID: 0,
PartitionID: 0,
InsertChannel: "ch1",
State: commonpb.SegmentState_Flushed,
DmlPosition: &msgpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 100,
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg4))
assert.NoError(t, err)
seg5 := &datapb.SegmentInfo{
ID: 5,
CollectionID: 0,
PartitionID: 0,
InsertChannel: "ch1",
State: commonpb.SegmentState_Flushed,
DmlPosition: &msgpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 100,
CompactionFrom: []int64{3, 4},
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg5))
assert.NoError(t, err)
seg6 := &datapb.SegmentInfo{
ID: 6,
CollectionID: 0,
PartitionID: 0,
InsertChannel: "ch1",
State: commonpb.SegmentState_Flushed,
DmlPosition: &msgpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 100,
CompactionFrom: []int64{3, 4},
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg6))
assert.NoError(t, err)
seg7 := &datapb.SegmentInfo{
ID: 7,
CollectionID: 0,
PartitionID: 0,
InsertChannel: "ch1",
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L0,
DmlPosition: &msgpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 1,
},
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg7))
assert.NoError(t, err)
seg8 := &datapb.SegmentInfo{
ID: 8,
CollectionID: 0,
PartitionID: 0,
InsertChannel: "ch1",
State: commonpb.SegmentState_Growing,
DmlPosition: &msgpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 100,
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg8))
assert.NoError(t, err)
seg9 := &datapb.SegmentInfo{
ID: 9,
CollectionID: 0,
PartitionID: 0,
InsertChannel: "ch1",
State: commonpb.SegmentState_Importing,
DmlPosition: &msgpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 100,
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg9))
assert.NoError(t, err)

view := svr.handler.GetCurrentSegmentsView(context.Background(), &channelMeta{Name: "ch1", CollectionID: 0})
assert.ElementsMatch(t, []int64{2, 3, 4}, view.FlushedSegmentIDs)
assert.ElementsMatch(t, []int64{8}, view.GrowingSegmentIDs)
assert.ElementsMatch(t, []int64{1}, view.DroppedSegmentIDs)
assert.ElementsMatch(t, []int64{7}, view.L0SegmentIDs)
assert.ElementsMatch(t, []int64{9}, view.ImportingSegmentIDs)
}

func TestShouldDropChannel(t *testing.T) {
type myRootCoord struct {
mocks2.MockRootCoordClient
Expand Down
64 changes: 64 additions & 0 deletions internal/datacoord/mock_handler.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 6088b20

Please sign in to comment.