Skip to content

Commit

Permalink
Handoff growing segment after sorted
Browse files Browse the repository at this point in the history
Signed-off-by: Cai Zhang <[email protected]>
  • Loading branch information
xiaocai2333 committed Nov 7, 2024
1 parent f813fb4 commit 3ba16c2
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 40 deletions.
2 changes: 1 addition & 1 deletion internal/datacoord/compaction_task_clustering.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ func (t *clusteringCompactionTask) processIndexing() error {
func (t *clusteringCompactionTask) markResultSegmentsVisible() error {
var operators []UpdateOperator
for _, segID := range t.GetTaskProto().GetResultSegments() {
operators = append(operators, UpdateSegmentVisible(segID))
operators = append(operators, SetSegmentIsInvisible(segID, false))
operators = append(operators, UpdateSegmentPartitionStatsVersionOperator(segID, t.GetTaskProto().GetPlanID()))
}

Expand Down
9 changes: 4 additions & 5 deletions internal/datacoord/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,20 +145,19 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs .
// Skip bulk insert segments.
continue
}
if s.GetIsInvisible() {
// skip invisible segments
if s.GetIsInvisible() && s.GetCreatedByCompaction() {
// skip invisible compaction segments
continue
}

validSegmentInfos[s.GetID()] = s
switch {
case s.GetState() == commonpb.SegmentState_Dropped:
droppedIDs.Insert(s.GetID())
case !isFlushState(s.GetState()):
case !isFlushState(s.GetState()) || s.GetIsInvisible():
growingIDs.Insert(s.GetID())
case s.GetLevel() == datapb.SegmentLevel_L0:
levelZeroIDs.Insert(s.GetID())

default:
flushedIDs.Insert(s.GetID())
}
Expand All @@ -185,7 +184,7 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs .
// ================================================
isValid := func(ids ...UniqueID) bool {
for _, id := range ids {
if seg, ok := validSegmentInfos[id]; !ok || seg == nil {
if seg, ok := validSegmentInfos[id]; !ok || seg == nil || seg.GetIsInvisible() {
return false
}
}
Expand Down
148 changes: 120 additions & 28 deletions internal/datacoord/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,13 +572,13 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
// | | | | | |
// \ | / \ / |
// \ | / \ / |
// [13u] [14i, 15u] 12i
// | | | |
// \ / \ /
// \ / \ /
// [16u] [17u]
// [13u] [14i, 15u] 12i [19u](unsorted)
// | | | | |
// \ / \ / |
// \ / \ / |
// [16u] [17u] [18u](unsorted) [20u](sorted) [21i](unsorted)
// all leaf nodes are [1,2,3,4,5,6,7], but because segment3 has been gced, the leaf node becomes [7,8,9,10,4,5,6]
// should be returned: flushed: [7, 8, 9, 10, 4, 5, 6]
// should be returned: flushed: [7, 8, 9, 10, 4, 5, 6, 20, 21], growing: [18]
svr := newTestServer(t)
defer closeTestServer(t, svr)
schema := newTestSchema()
Expand Down Expand Up @@ -702,8 +702,9 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 100,
CompactionFrom: []int64{1, 2, 3},
NumOfRows: 100,
CompactionFrom: []int64{1, 2, 3},
CreatedByCompaction: true,
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg8))
assert.NoError(t, err)
Expand All @@ -719,8 +720,9 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 100,
CompactionFrom: []int64{1, 2, 3},
NumOfRows: 100,
CompactionFrom: []int64{1, 2, 3},
CreatedByCompaction: true,
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg9))
assert.NoError(t, err)
Expand All @@ -736,8 +738,9 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 100,
CompactionFrom: []int64{1, 2, 3},
NumOfRows: 100,
CompactionFrom: []int64{1, 2, 3},
CreatedByCompaction: true,
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg10))
assert.NoError(t, err)
Expand All @@ -753,8 +756,9 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 2048,
CompactionFrom: []int64{4, 5, 6},
NumOfRows: 2048,
CompactionFrom: []int64{4, 5, 6},
CreatedByCompaction: true,
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg11))
assert.NoError(t, err)
Expand All @@ -770,8 +774,9 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 100,
CompactionFrom: []int64{4, 5, 6},
NumOfRows: 100,
CompactionFrom: []int64{4, 5, 6},
CreatedByCompaction: true,
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg12))
assert.NoError(t, err)
Expand All @@ -787,8 +792,9 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 2047,
CompactionFrom: []int64{7, 8, 9},
NumOfRows: 2047,
CompactionFrom: []int64{7, 8, 9},
CreatedByCompaction: true,
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg13))
assert.NoError(t, err)
Expand All @@ -804,8 +810,9 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 100,
CompactionFrom: []int64{10, 11},
NumOfRows: 100,
CompactionFrom: []int64{10, 11},
CreatedByCompaction: true,
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg14))
assert.NoError(t, err)
Expand All @@ -821,8 +828,9 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 2048,
CompactionFrom: []int64{10, 11},
NumOfRows: 2048,
CompactionFrom: []int64{10, 11},
CreatedByCompaction: true,
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg15))
assert.NoError(t, err)
Expand All @@ -838,8 +846,9 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 2048,
CompactionFrom: []int64{13, 14},
NumOfRows: 2048,
CompactionFrom: []int64{13, 14},
CreatedByCompaction: true,
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg16))
assert.NoError(t, err)
Expand All @@ -855,15 +864,98 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 2048,
CompactionFrom: []int64{12, 15},
NumOfRows: 2048,
CompactionFrom: []int64{12, 15},
CreatedByCompaction: true,
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg17))
assert.NoError(t, err)
seg18 := &datapb.SegmentInfo{
ID: 18,
CollectionID: 0,
PartitionID: 0,
InsertChannel: "ch1",
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L1,
DmlPosition: &msgpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 2048,
CompactionFrom: []int64{},
IsInvisible: true,
IsSorted: false,
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg18))
assert.NoError(t, err)
seg19 := &datapb.SegmentInfo{
ID: 19,
CollectionID: 0,
PartitionID: 0,
InsertChannel: "ch1",
State: commonpb.SegmentState_Dropped,
Level: datapb.SegmentLevel_L1,
DmlPosition: &msgpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 2048,
CompactionFrom: []int64{},
IsInvisible: true,
IsSorted: false,
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg19))
assert.NoError(t, err)
seg20 := &datapb.SegmentInfo{
ID: 20,
CollectionID: 0,
PartitionID: 0,
InsertChannel: "ch1",
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L1,
DmlPosition: &msgpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 2048,
CompactionFrom: []int64{19},
CreatedByCompaction: true,
IsInvisible: false,
IsSorted: true,
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg20))
assert.NoError(t, err)
seg21 := &datapb.SegmentInfo{
ID: 21,
CollectionID: 0,
PartitionID: 0,
InsertChannel: "ch1",
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L1,
DmlPosition: &msgpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 100,
CompactionFrom: []int64{},
IsInvisible: false,
IsSorted: false,
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg21))
assert.NoError(t, err)

vchan := svr.handler.GetQueryVChanPositions(&channelMeta{Name: "ch1", CollectionID: 0})
assert.ElementsMatch(t, []int64{7, 8, 9, 10, 4, 5, 6}, vchan.FlushedSegmentIds)
assert.ElementsMatch(t, []int64{1, 2}, vchan.DroppedSegmentIds)
assert.ElementsMatch(t, []int64{7, 8, 9, 10, 4, 5, 6, 20, 21}, vchan.FlushedSegmentIds)
assert.ElementsMatch(t, []int64{18}, vchan.UnflushedSegmentIds)
assert.ElementsMatch(t, []int64{1, 2, 19}, vchan.DroppedSegmentIds)
})
}

Expand Down
13 changes: 9 additions & 4 deletions internal/datacoord/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -791,15 +791,15 @@ func UpdateCompactedOperator(segmentID int64) UpdateOperator {
}
}

func UpdateSegmentVisible(segmentID int64) UpdateOperator {
func SetSegmentIsInvisible(segmentID int64, isInvisible bool) UpdateOperator {
return func(modPack *updateSegmentPack) bool {
segment := modPack.Get(segmentID)
if segment == nil {
log.Warn("meta update: update segment visible fail - segment not found",
zap.Int64("segmentID", segmentID))
return false
}
segment.IsInvisible = false
segment.IsInvisible = isInvisible
return true
}
}
Expand Down Expand Up @@ -1979,6 +1979,11 @@ func (m *meta) SaveStatsResultSegment(oldSegmentID int64, result *workerpb.Stats
// metrics mutation for compaction from segments
updateSegStateAndPrepareMetrics(cloned, commonpb.SegmentState_Dropped, metricMutation)

resultInvisible := oldSegment.GetIsInvisible()
if !oldSegment.GetCreatedByCompaction() {
resultInvisible = false
}

segmentInfo := &datapb.SegmentInfo{
CollectionID: oldSegment.GetCollectionID(),
PartitionID: oldSegment.GetPartitionID(),
Expand All @@ -1994,15 +1999,15 @@ func (m *meta) SaveStatsResultSegment(oldSegmentID int64, result *workerpb.Stats
LastLevel: oldSegment.GetLastLevel(),
PartitionStatsVersion: oldSegment.GetPartitionStatsVersion(),
LastPartitionStatsVersion: oldSegment.GetLastPartitionStatsVersion(),
IsInvisible: oldSegment.GetIsInvisible(),
CreatedByCompaction: oldSegment.GetCreatedByCompaction(),
IsInvisible: resultInvisible,
ID: result.GetSegmentID(),
NumOfRows: result.GetNumRows(),
Binlogs: result.GetInsertLogs(),
Statslogs: result.GetStatsLogs(),
TextStatsLogs: result.GetTextStatsLogs(),
Bm25Statslogs: result.GetBm25Logs(),
Deltalogs: nil,
CreatedByCompaction: true,
CompactionFrom: []int64{oldSegmentID},
IsSorted: true,
}
Expand Down
8 changes: 6 additions & 2 deletions internal/datacoord/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1011,8 +1011,12 @@ func (s *Server) postFlush(ctx context.Context, segmentID UniqueID) error {
return merr.WrapErrSegmentNotFound(segmentID, "segment not found, might be a faked segment, ignore post flush")
}
// set segment to SegmentState_Flushed
if err := s.meta.SetState(segmentID, commonpb.SegmentState_Flushed); err != nil {
log.Error("flush segment complete failed", zap.Error(err))
var operators []UpdateOperator
operators = append(operators, SetSegmentIsInvisible(segmentID, true))
operators = append(operators, UpdateStatusOperator(segmentID, commonpb.SegmentState_Flushed))
err := s.meta.UpdateSegmentsInfo(operators...)
if err != nil {
log.Warn("flush segment complete failed", zap.Error(err))
return err
}
select {
Expand Down

0 comments on commit 3ba16c2

Please sign in to comment.