Skip to content

Commit

Permalink
Handoff growing segment after sorted
Browse files Browse the repository at this point in the history
Signed-off-by: Cai Zhang <[email protected]>
  • Loading branch information
xiaocai2333 committed Nov 4, 2024
1 parent 9fe90bf commit 49170c4
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 18 deletions.
2 changes: 1 addition & 1 deletion internal/datacoord/compaction_task_clustering.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ func (t *clusteringCompactionTask) processIndexing() error {
func (t *clusteringCompactionTask) markResultSegmentsVisible() error {
var operators []UpdateOperator
for _, segID := range t.GetTaskProto().GetResultSegments() {
operators = append(operators, UpdateSegmentVisible(segID))
operators = append(operators, SetSegmentInvisible(segID, false))
operators = append(operators, UpdateSegmentPartitionStatsVersionOperator(segID, t.GetTaskProto().GetPlanID()))
}

Expand Down
7 changes: 3 additions & 4 deletions internal/datacoord/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs .
// Skip bulk insert segments.
continue
}
if s.GetIsInvisible() {
if s.GetIsInvisible() && s.GetLevel() != datapb.SegmentLevel_L1 {
// skip invisible segments
continue
}
Expand All @@ -154,11 +154,10 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs .
switch {
case s.GetState() == commonpb.SegmentState_Dropped:
droppedIDs.Insert(s.GetID())
case !isFlushState(s.GetState()):
case !isFlushState(s.GetState()) || s.GetIsInvisible():
growingIDs.Insert(s.GetID())
case s.GetLevel() == datapb.SegmentLevel_L0:
levelZeroIDs.Insert(s.GetID())

default:
flushedIDs.Insert(s.GetID())
}
Expand All @@ -185,7 +184,7 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs .
// ================================================
isValid := func(ids ...UniqueID) bool {
for _, id := range ids {
if seg, ok := validSegmentInfos[id]; !ok || seg == nil {
if seg, ok := validSegmentInfos[id]; !ok || seg == nil || seg.GetIsInvisible() {
return false
}
}
Expand Down
97 changes: 89 additions & 8 deletions internal/datacoord/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,13 +572,13 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
// | | | | | |
// \ | / \ / |
// \ | / \ / |
// [13u] [14i, 15u] 12i
// | | | |
// \ / \ /
// \ / \ /
// [16u] [17u]
// [13u] [14i, 15u] 12i [19u](unsorted)
// | | | | |
// \ / \ / |
// \ / \ / |
// [16u] [17u] [18u](unsorted) [20u](sorted) [21i](unsorted)
// all leaf nodes are [1,2,3,4,5,6,7], but because segment3 has been gced, the leaf node becomes [7,8,9,10,4,5,6]
// should be returned: flushed: [7, 8, 9, 10, 4, 5, 6]
// should be returned: flushed: [7, 8, 9, 10, 4, 5, 6, 20, 21], growing: [18]
svr := newTestServer(t)
defer closeTestServer(t, svr)
schema := newTestSchema()
Expand Down Expand Up @@ -860,10 +860,91 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg17))
assert.NoError(t, err)
seg18 := &datapb.SegmentInfo{
ID: 18,
CollectionID: 0,
PartitionID: 0,
InsertChannel: "ch1",
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L1,
DmlPosition: &msgpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 2048,
CompactionFrom: []int64{},
IsInvisible: true,
IsSorted: false,
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg18))
assert.NoError(t, err)
seg19 := &datapb.SegmentInfo{
ID: 19,
CollectionID: 0,
PartitionID: 0,
InsertChannel: "ch1",
State: commonpb.SegmentState_Dropped,
Level: datapb.SegmentLevel_L1,
DmlPosition: &msgpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 2048,
CompactionFrom: []int64{},
IsInvisible: true,
IsSorted: false,
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg19))
assert.NoError(t, err)
seg20 := &datapb.SegmentInfo{
ID: 20,
CollectionID: 0,
PartitionID: 0,
InsertChannel: "ch1",
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L1,
DmlPosition: &msgpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 2048,
CompactionFrom: []int64{19},
IsInvisible: false,
IsSorted: true,
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg20))
assert.NoError(t, err)
seg21 := &datapb.SegmentInfo{
ID: 21,
CollectionID: 0,
PartitionID: 0,
InsertChannel: "ch1",
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L1,
DmlPosition: &msgpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 100,
CompactionFrom: []int64{},
IsInvisible: false,
IsSorted: false,
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg21))
assert.NoError(t, err)

vchan := svr.handler.GetQueryVChanPositions(&channelMeta{Name: "ch1", CollectionID: 0})
assert.ElementsMatch(t, []int64{7, 8, 9, 10, 4, 5, 6}, vchan.FlushedSegmentIds)
assert.ElementsMatch(t, []int64{1, 2}, vchan.DroppedSegmentIds)
assert.ElementsMatch(t, []int64{7, 8, 9, 10, 4, 5, 6, 20, 21}, vchan.FlushedSegmentIds)
assert.ElementsMatch(t, []int64{18}, vchan.UnflushedSegmentIds)
assert.ElementsMatch(t, []int64{1, 2, 19}, vchan.DroppedSegmentIds)
})
}

Expand Down
11 changes: 8 additions & 3 deletions internal/datacoord/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -791,15 +791,15 @@ func UpdateCompactedOperator(segmentID int64) UpdateOperator {
}
}

func UpdateSegmentVisible(segmentID int64) UpdateOperator {
func SetSegmentInvisible(segmentID int64, invisible bool) UpdateOperator {
return func(modPack *updateSegmentPack) bool {
segment := modPack.Get(segmentID)
if segment == nil {
log.Warn("meta update: update segment visible fail - segment not found",
zap.Int64("segmentID", segmentID))
return false
}
segment.IsInvisible = false
segment.IsInvisible = invisible
return true
}
}
Expand Down Expand Up @@ -1979,6 +1979,11 @@ func (m *meta) SaveStatsResultSegment(oldSegmentID int64, result *workerpb.Stats
// metrics mutation for compaction from segments
updateSegStateAndPrepareMetrics(cloned, commonpb.SegmentState_Dropped, metricMutation)

resultInvisible := oldSegment.GetIsInvisible()
if oldSegment.GetLevel() == datapb.SegmentLevel_L1 {
resultInvisible = false
}

segmentInfo := &datapb.SegmentInfo{
CollectionID: oldSegment.GetCollectionID(),
PartitionID: oldSegment.GetPartitionID(),
Expand All @@ -1994,7 +1999,7 @@ func (m *meta) SaveStatsResultSegment(oldSegmentID int64, result *workerpb.Stats
LastLevel: oldSegment.GetLastLevel(),
PartitionStatsVersion: oldSegment.GetPartitionStatsVersion(),
LastPartitionStatsVersion: oldSegment.GetLastPartitionStatsVersion(),
IsInvisible: oldSegment.GetIsInvisible(),
IsInvisible: resultInvisible,
ID: result.GetSegmentID(),
NumOfRows: result.GetNumRows(),
Binlogs: result.GetInsertLogs(),
Expand Down
8 changes: 6 additions & 2 deletions internal/datacoord/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1007,8 +1007,12 @@ func (s *Server) postFlush(ctx context.Context, segmentID UniqueID) error {
return merr.WrapErrSegmentNotFound(segmentID, "segment not found, might be a faked segment, ignore post flush")
}
// set segment to SegmentState_Flushed
if err := s.meta.SetState(segmentID, commonpb.SegmentState_Flushed); err != nil {
log.Error("flush segment complete failed", zap.Error(err))
var operators []UpdateOperator
operators = append(operators, SetSegmentInvisible(segmentID, true))
operators = append(operators, UpdateStatusOperator(segmentID, commonpb.SegmentState_Flushed))
err := s.meta.UpdateSegmentsInfo(operators...)
if err != nil {
log.Warn("flush segment complete failed", zap.Error(err))
return err
}
select {
Expand Down

0 comments on commit 49170c4

Please sign in to comment.