Skip to content

Commit

Permalink
enhance: Handoff growing segment after sorted (#37385)
Browse files Browse the repository at this point in the history
issue: #33744 

1. Segments generated from inserts will be loaded as growing until they
are sorted by primary key.
2. This PR may increase memory pressure on the delegator, but we need to
test the performance of stats. In local testing, the speed of stats is
greater than the insert speed.

Signed-off-by: Cai Zhang <[email protected]>
  • Loading branch information
xiaocai2333 authored Nov 7, 2024
1 parent e47bf21 commit 4dc6841
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 40 deletions.
2 changes: 1 addition & 1 deletion internal/datacoord/compaction_task_clustering.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ func (t *clusteringCompactionTask) processIndexing() error {
func (t *clusteringCompactionTask) markResultSegmentsVisible() error {
var operators []UpdateOperator
for _, segID := range t.GetTaskProto().GetResultSegments() {
operators = append(operators, UpdateSegmentVisible(segID))
operators = append(operators, SetSegmentIsInvisible(segID, false))
operators = append(operators, UpdateSegmentPartitionStatsVersionOperator(segID, t.GetTaskProto().GetPlanID()))
}

Expand Down
9 changes: 4 additions & 5 deletions internal/datacoord/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,20 +145,19 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs .
// Skip bulk insert segments.
continue
}
if s.GetIsInvisible() {
// skip invisible segments
if s.GetIsInvisible() && s.GetCreatedByCompaction() {
// skip invisible compaction segments
continue
}

validSegmentInfos[s.GetID()] = s
switch {
case s.GetState() == commonpb.SegmentState_Dropped:
droppedIDs.Insert(s.GetID())
case !isFlushState(s.GetState()):
case !isFlushState(s.GetState()) || s.GetIsInvisible():
growingIDs.Insert(s.GetID())
case s.GetLevel() == datapb.SegmentLevel_L0:
levelZeroIDs.Insert(s.GetID())

default:
flushedIDs.Insert(s.GetID())
}
Expand All @@ -185,7 +184,7 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs .
// ================================================
isValid := func(ids ...UniqueID) bool {
for _, id := range ids {
if seg, ok := validSegmentInfos[id]; !ok || seg == nil {
if seg, ok := validSegmentInfos[id]; !ok || seg == nil || seg.GetIsInvisible() {
return false
}
}
Expand Down
148 changes: 120 additions & 28 deletions internal/datacoord/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,13 +572,13 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
// | | | | | |
// \ | / \ / |
// \ | / \ / |
// [13u] [14i, 15u] 12i
// | | | |
// \ / \ /
// \ / \ /
// [16u] [17u]
// [13u] [14i, 15u] 12i [19u](unsorted)
// | | | | |
// \ / \ / |
// \ / \ / |
// [16u] [17u] [18u](unsorted) [20u](sorted) [21i](unsorted)
// all leaf nodes are [1,2,3,4,5,6,7], but because segment3 has been gced, the leaf node becomes [7,8,9,10,4,5,6]
// should be returned: flushed: [7, 8, 9, 10, 4, 5, 6]
// should be returned: flushed: [7, 8, 9, 10, 4, 5, 6, 20, 21], growing: [18]
svr := newTestServer(t)
defer closeTestServer(t, svr)
schema := newTestSchema()
Expand Down Expand Up @@ -702,8 +702,9 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 100,
CompactionFrom: []int64{1, 2, 3},
NumOfRows: 100,
CompactionFrom: []int64{1, 2, 3},
CreatedByCompaction: true,
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg8))
assert.NoError(t, err)
Expand All @@ -719,8 +720,9 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 100,
CompactionFrom: []int64{1, 2, 3},
NumOfRows: 100,
CompactionFrom: []int64{1, 2, 3},
CreatedByCompaction: true,
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg9))
assert.NoError(t, err)
Expand All @@ -736,8 +738,9 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 100,
CompactionFrom: []int64{1, 2, 3},
NumOfRows: 100,
CompactionFrom: []int64{1, 2, 3},
CreatedByCompaction: true,
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg10))
assert.NoError(t, err)
Expand All @@ -753,8 +756,9 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 2048,
CompactionFrom: []int64{4, 5, 6},
NumOfRows: 2048,
CompactionFrom: []int64{4, 5, 6},
CreatedByCompaction: true,
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg11))
assert.NoError(t, err)
Expand All @@ -770,8 +774,9 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 100,
CompactionFrom: []int64{4, 5, 6},
NumOfRows: 100,
CompactionFrom: []int64{4, 5, 6},
CreatedByCompaction: true,
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg12))
assert.NoError(t, err)
Expand All @@ -787,8 +792,9 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 2047,
CompactionFrom: []int64{7, 8, 9},
NumOfRows: 2047,
CompactionFrom: []int64{7, 8, 9},
CreatedByCompaction: true,
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg13))
assert.NoError(t, err)
Expand All @@ -804,8 +810,9 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 100,
CompactionFrom: []int64{10, 11},
NumOfRows: 100,
CompactionFrom: []int64{10, 11},
CreatedByCompaction: true,
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg14))
assert.NoError(t, err)
Expand All @@ -821,8 +828,9 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 2048,
CompactionFrom: []int64{10, 11},
NumOfRows: 2048,
CompactionFrom: []int64{10, 11},
CreatedByCompaction: true,
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg15))
assert.NoError(t, err)
Expand All @@ -838,8 +846,9 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 2048,
CompactionFrom: []int64{13, 14},
NumOfRows: 2048,
CompactionFrom: []int64{13, 14},
CreatedByCompaction: true,
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg16))
assert.NoError(t, err)
Expand All @@ -855,15 +864,98 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 2048,
CompactionFrom: []int64{12, 15},
NumOfRows: 2048,
CompactionFrom: []int64{12, 15},
CreatedByCompaction: true,
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg17))
assert.NoError(t, err)
seg18 := &datapb.SegmentInfo{
ID: 18,
CollectionID: 0,
PartitionID: 0,
InsertChannel: "ch1",
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L1,
DmlPosition: &msgpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 2048,
CompactionFrom: []int64{},
IsInvisible: true,
IsSorted: false,
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg18))
assert.NoError(t, err)
seg19 := &datapb.SegmentInfo{
ID: 19,
CollectionID: 0,
PartitionID: 0,
InsertChannel: "ch1",
State: commonpb.SegmentState_Dropped,
Level: datapb.SegmentLevel_L1,
DmlPosition: &msgpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 2048,
CompactionFrom: []int64{},
IsInvisible: true,
IsSorted: false,
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg19))
assert.NoError(t, err)
seg20 := &datapb.SegmentInfo{
ID: 20,
CollectionID: 0,
PartitionID: 0,
InsertChannel: "ch1",
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L1,
DmlPosition: &msgpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 2048,
CompactionFrom: []int64{19},
CreatedByCompaction: true,
IsInvisible: false,
IsSorted: true,
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg20))
assert.NoError(t, err)
seg21 := &datapb.SegmentInfo{
ID: 21,
CollectionID: 0,
PartitionID: 0,
InsertChannel: "ch1",
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L1,
DmlPosition: &msgpb.MsgPosition{
ChannelName: "ch1",
MsgID: []byte{1, 2, 3},
MsgGroup: "",
Timestamp: 1,
},
NumOfRows: 100,
CompactionFrom: []int64{},
IsInvisible: false,
IsSorted: false,
}
err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg21))
assert.NoError(t, err)

vchan := svr.handler.GetQueryVChanPositions(&channelMeta{Name: "ch1", CollectionID: 0})
assert.ElementsMatch(t, []int64{7, 8, 9, 10, 4, 5, 6}, vchan.FlushedSegmentIds)
assert.ElementsMatch(t, []int64{1, 2}, vchan.DroppedSegmentIds)
assert.ElementsMatch(t, []int64{7, 8, 9, 10, 4, 5, 6, 20, 21}, vchan.FlushedSegmentIds)
assert.ElementsMatch(t, []int64{18}, vchan.UnflushedSegmentIds)
assert.ElementsMatch(t, []int64{1, 2, 19}, vchan.DroppedSegmentIds)
})
}

Expand Down
13 changes: 9 additions & 4 deletions internal/datacoord/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -791,15 +791,15 @@ func UpdateCompactedOperator(segmentID int64) UpdateOperator {
}
}

func UpdateSegmentVisible(segmentID int64) UpdateOperator {
func SetSegmentIsInvisible(segmentID int64, isInvisible bool) UpdateOperator {
return func(modPack *updateSegmentPack) bool {
segment := modPack.Get(segmentID)
if segment == nil {
log.Warn("meta update: update segment visible fail - segment not found",
zap.Int64("segmentID", segmentID))
return false
}
segment.IsInvisible = false
segment.IsInvisible = isInvisible
return true
}
}
Expand Down Expand Up @@ -1979,6 +1979,11 @@ func (m *meta) SaveStatsResultSegment(oldSegmentID int64, result *workerpb.Stats
// metrics mutation for compaction from segments
updateSegStateAndPrepareMetrics(cloned, commonpb.SegmentState_Dropped, metricMutation)

resultInvisible := oldSegment.GetIsInvisible()
if !oldSegment.GetCreatedByCompaction() {
resultInvisible = false
}

segmentInfo := &datapb.SegmentInfo{
CollectionID: oldSegment.GetCollectionID(),
PartitionID: oldSegment.GetPartitionID(),
Expand All @@ -1994,15 +1999,15 @@ func (m *meta) SaveStatsResultSegment(oldSegmentID int64, result *workerpb.Stats
LastLevel: oldSegment.GetLastLevel(),
PartitionStatsVersion: oldSegment.GetPartitionStatsVersion(),
LastPartitionStatsVersion: oldSegment.GetLastPartitionStatsVersion(),
IsInvisible: oldSegment.GetIsInvisible(),
CreatedByCompaction: oldSegment.GetCreatedByCompaction(),
IsInvisible: resultInvisible,
ID: result.GetSegmentID(),
NumOfRows: result.GetNumRows(),
Binlogs: result.GetInsertLogs(),
Statslogs: result.GetStatsLogs(),
TextStatsLogs: result.GetTextStatsLogs(),
Bm25Statslogs: result.GetBm25Logs(),
Deltalogs: nil,
CreatedByCompaction: true,
CompactionFrom: []int64{oldSegmentID},
IsSorted: true,
}
Expand Down
8 changes: 6 additions & 2 deletions internal/datacoord/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1011,8 +1011,12 @@ func (s *Server) postFlush(ctx context.Context, segmentID UniqueID) error {
return merr.WrapErrSegmentNotFound(segmentID, "segment not found, might be a faked segment, ignore post flush")
}
// set segment to SegmentState_Flushed
if err := s.meta.SetState(segmentID, commonpb.SegmentState_Flushed); err != nil {
log.Error("flush segment complete failed", zap.Error(err))
var operators []UpdateOperator
operators = append(operators, SetSegmentIsInvisible(segmentID, true))
operators = append(operators, UpdateStatusOperator(segmentID, commonpb.SegmentState_Flushed))
err := s.meta.UpdateSegmentsInfo(operators...)
if err != nil {
log.Warn("flush segment complete failed", zap.Error(err))
return err
}
select {
Expand Down

0 comments on commit 4dc6841

Please sign in to comment.