From 5f9fbd0e97890d7a513713c1ce5eb289c3cd2814 Mon Sep 17 00:00:00 2001 From: Cai Zhang Date: Sat, 2 Nov 2024 14:46:04 +0800 Subject: [PATCH] Handoff growing segment after sorted Signed-off-by: Cai Zhang --- .../datacoord/compaction_task_clustering.go | 2 +- internal/datacoord/handler.go | 7 +- internal/datacoord/handler_test.go | 97 +++++++++++++++++-- internal/datacoord/meta.go | 11 ++- internal/datacoord/server.go | 8 +- 5 files changed, 107 insertions(+), 18 deletions(-) diff --git a/internal/datacoord/compaction_task_clustering.go b/internal/datacoord/compaction_task_clustering.go index 0efdbf5fecadf..9ccf0df5cf1c3 100644 --- a/internal/datacoord/compaction_task_clustering.go +++ b/internal/datacoord/compaction_task_clustering.go @@ -431,7 +431,7 @@ func (t *clusteringCompactionTask) processIndexing() error { func (t *clusteringCompactionTask) markResultSegmentsVisible() error { var operators []UpdateOperator for _, segID := range t.GetTaskProto().GetResultSegments() { - operators = append(operators, UpdateSegmentVisible(segID)) + operators = append(operators, SetSegmentInvisible(segID, false)) operators = append(operators, UpdateSegmentPartitionStatsVersionOperator(segID, t.GetTaskProto().GetPlanID())) } diff --git a/internal/datacoord/handler.go b/internal/datacoord/handler.go index 2c789829967f3..018be5b83cfc7 100644 --- a/internal/datacoord/handler.go +++ b/internal/datacoord/handler.go @@ -145,7 +145,7 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs . // Skip bulk insert segments. continue } - if s.GetIsInvisible() { + if s.GetIsInvisible() && s.GetLevel() != datapb.SegmentLevel_L1 { // skip invisible segments continue } @@ -154,11 +154,10 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs . switch { case s.GetState() == commonpb.SegmentState_Dropped: droppedIDs.Insert(s.GetID()) - case !isFlushState(s.GetState()): + case !isFlushState(s.GetState()) || s.GetIsInvisible(): growingIDs.Insert(s.GetID()) case s.GetLevel() == datapb.SegmentLevel_L0: levelZeroIDs.Insert(s.GetID()) - default: flushedIDs.Insert(s.GetID()) } @@ -185,7 +184,7 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs . // ================================================ isValid := func(ids ...UniqueID) bool { for _, id := range ids { - if seg, ok := validSegmentInfos[id]; !ok || seg == nil { + if seg, ok := validSegmentInfos[id]; !ok || seg == nil || seg.GetIsInvisible() { return false } } diff --git a/internal/datacoord/handler_test.go b/internal/datacoord/handler_test.go index 788c76a77393e..dd3f2e502e727 100644 --- a/internal/datacoord/handler_test.go +++ b/internal/datacoord/handler_test.go @@ -572,13 +572,13 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) { // | | | | | | // \ | / \ / | // \ | / \ / | - // [13u] [14i, 15u] 12i - // | | | | - // \ / \ / - // \ / \ / - // [16u] [17u] + // [13u] [14i, 15u] 12i [19u](unsorted) + // | | | | | + // \ / \ / | + // \ / \ / | + // [16u] [17u] [18u](unsorted) [20u](sorted) [21i](unsorted) // all leaf nodes are [1,2,3,4,5,6,7], but because segment3 has been gced, the leaf node becomes [7,8,9,10,4,5,6] - // should be returned: flushed: [7, 8, 9, 10, 4, 5, 6] + // should be returned: flushed: [7, 8, 9, 10, 4, 5, 6, 20, 21], growing: [18] svr := newTestServer(t) defer closeTestServer(t, svr) schema := newTestSchema() @@ -860,10 +860,91 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) { } err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg17)) assert.NoError(t, err) + seg18 := &datapb.SegmentInfo{ + ID: 18, + CollectionID: 0, + PartitionID: 0, + InsertChannel: "ch1", + State: commonpb.SegmentState_Flushed, + Level: datapb.SegmentLevel_L1, + DmlPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + MsgGroup: "", + Timestamp: 1, + }, + NumOfRows: 2048, + CompactionFrom: []int64{}, + IsInvisible: true, + IsSorted: false, + } + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg18)) + assert.NoError(t, err) + seg19 := &datapb.SegmentInfo{ + ID: 19, + CollectionID: 0, + PartitionID: 0, + InsertChannel: "ch1", + State: commonpb.SegmentState_Dropped, + Level: datapb.SegmentLevel_L1, + DmlPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + MsgGroup: "", + Timestamp: 1, + }, + NumOfRows: 2048, + CompactionFrom: []int64{}, + IsInvisible: true, + IsSorted: false, + } + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg19)) + assert.NoError(t, err) + seg20 := &datapb.SegmentInfo{ + ID: 20, + CollectionID: 0, + PartitionID: 0, + InsertChannel: "ch1", + State: commonpb.SegmentState_Flushed, + Level: datapb.SegmentLevel_L1, + DmlPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + MsgGroup: "", + Timestamp: 1, + }, + NumOfRows: 2048, + CompactionFrom: []int64{19}, + IsInvisible: false, + IsSorted: true, + } + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg20)) + assert.NoError(t, err) + seg21 := &datapb.SegmentInfo{ + ID: 21, + CollectionID: 0, + PartitionID: 0, + InsertChannel: "ch1", + State: commonpb.SegmentState_Flushed, + Level: datapb.SegmentLevel_L1, + DmlPosition: &msgpb.MsgPosition{ + ChannelName: "ch1", + MsgID: []byte{1, 2, 3}, + MsgGroup: "", + Timestamp: 1, + }, + NumOfRows: 100, + CompactionFrom: []int64{}, + IsInvisible: false, + IsSorted: false, + } + err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg21)) + assert.NoError(t, err) vchan := svr.handler.GetQueryVChanPositions(&channelMeta{Name: "ch1", CollectionID: 0}) - assert.ElementsMatch(t, []int64{7, 8, 9, 10, 4, 5, 6}, vchan.FlushedSegmentIds) - assert.ElementsMatch(t, []int64{1, 2}, vchan.DroppedSegmentIds) + assert.ElementsMatch(t, []int64{7, 8, 9, 10, 4, 5, 6, 20, 21}, vchan.FlushedSegmentIds) + assert.ElementsMatch(t, []int64{18}, vchan.UnflushedSegmentIds) + assert.ElementsMatch(t, []int64{1, 2, 19}, vchan.DroppedSegmentIds) }) } diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index dec075915c11d..bc23fb9e1fa10 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -791,7 +791,7 @@ func UpdateCompactedOperator(segmentID int64) UpdateOperator { } } -func UpdateSegmentVisible(segmentID int64) UpdateOperator { +func SetSegmentInvisible(segmentID int64, invisible bool) UpdateOperator { return func(modPack *updateSegmentPack) bool { segment := modPack.Get(segmentID) if segment == nil { @@ -799,7 +799,7 @@ func UpdateSegmentVisible(segmentID int64) UpdateOperator { zap.Int64("segmentID", segmentID)) return false } - segment.IsInvisible = false + segment.IsInvisible = invisible return true } } @@ -1979,6 +1979,11 @@ func (m *meta) SaveStatsResultSegment(oldSegmentID int64, result *workerpb.Stats // metrics mutation for compaction from segments updateSegStateAndPrepareMetrics(cloned, commonpb.SegmentState_Dropped, metricMutation) + resultInvisible := oldSegment.GetIsInvisible() + if oldSegment.GetLevel() == datapb.SegmentLevel_L1 { + resultInvisible = false + } + segmentInfo := &datapb.SegmentInfo{ CollectionID: oldSegment.GetCollectionID(), PartitionID: oldSegment.GetPartitionID(), @@ -1994,7 +1999,7 @@ func (m *meta) SaveStatsResultSegment(oldSegmentID int64, result *workerpb.Stats LastLevel: oldSegment.GetLastLevel(), PartitionStatsVersion: oldSegment.GetPartitionStatsVersion(), LastPartitionStatsVersion: oldSegment.GetLastPartitionStatsVersion(), - IsInvisible: oldSegment.GetIsInvisible(), + IsInvisible: resultInvisible, ID: result.GetSegmentID(), NumOfRows: result.GetNumRows(), Binlogs: result.GetInsertLogs(), diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index 3046183c2de68..636467515921f 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -1007,8 +1007,12 @@ func (s *Server) postFlush(ctx context.Context, segmentID UniqueID) error { return merr.WrapErrSegmentNotFound(segmentID, "segment not found, might be a faked segment, ignore post flush") } // set segment to SegmentState_Flushed - if err := s.meta.SetState(segmentID, commonpb.SegmentState_Flushed); err != nil { - log.Error("flush segment complete failed", zap.Error(err)) + var operators []UpdateOperator + operators = append(operators, SetSegmentInvisible(segmentID, true)) + operators = append(operators, UpdateStatusOperator(segmentID, commonpb.SegmentState_Flushed)) + err := s.meta.UpdateSegmentsInfo(operators...) + if err != nil { + log.Warn("flush segment complete failed", zap.Error(err)) return err } select {