From 4dc684126ec6167670905eec30b8c2f40aea1c24 Mon Sep 17 00:00:00 2001
From: "cai.zhang"
Date: Thu, 7 Nov 2024 16:08:24 +0800
Subject: [PATCH] enhance: Handoff growing segment after sorted (#37385)

issue: #33744

1. Segments generated from inserts will be loaded as growing until they
   are sorted by primary key.
2. This PR may increase memory pressure on the delegator, so the
   performance of the stats task still needs to be verified. In local
   testing, the stats task sorts data faster than inserts arrive.

Signed-off-by: Cai Zhang
---
 .../datacoord/compaction_task_clustering.go |   2 +-
 internal/datacoord/handler.go               |   9 +-
 internal/datacoord/handler_test.go          | 148 ++++++++++++++----
 internal/datacoord/meta.go                  |  13 +-
 internal/datacoord/server.go                |   8 +-
 5 files changed, 140 insertions(+), 40 deletions(-)

diff --git a/internal/datacoord/compaction_task_clustering.go b/internal/datacoord/compaction_task_clustering.go
index 0efdbf5fecadf..a29f1a9d0c5c1 100644
--- a/internal/datacoord/compaction_task_clustering.go
+++ b/internal/datacoord/compaction_task_clustering.go
@@ -431,7 +431,7 @@ func (t *clusteringCompactionTask) processIndexing() error {
 func (t *clusteringCompactionTask) markResultSegmentsVisible() error {
 	var operators []UpdateOperator
 	for _, segID := range t.GetTaskProto().GetResultSegments() {
-		operators = append(operators, UpdateSegmentVisible(segID))
+		operators = append(operators, SetSegmentIsInvisible(segID, false))
 		operators = append(operators, UpdateSegmentPartitionStatsVersionOperator(segID, t.GetTaskProto().GetPlanID()))
 	}
 
diff --git a/internal/datacoord/handler.go b/internal/datacoord/handler.go
index 2c789829967f3..86693188cef84 100644
--- a/internal/datacoord/handler.go
+++ b/internal/datacoord/handler.go
@@ -145,8 +145,8 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs .
 			// Skip bulk insert segments.
 			continue
 		}
-		if s.GetIsInvisible() {
-			// skip invisible segments
+		if s.GetIsInvisible() && s.GetCreatedByCompaction() {
+			// skip invisible compaction segments
 			continue
 		}
 
@@ -154,11 +154,10 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs .
 		switch {
 		case s.GetState() == commonpb.SegmentState_Dropped:
 			droppedIDs.Insert(s.GetID())
-		case !isFlushState(s.GetState()):
+		case !isFlushState(s.GetState()) || s.GetIsInvisible():
 			growingIDs.Insert(s.GetID())
 		case s.GetLevel() == datapb.SegmentLevel_L0:
 			levelZeroIDs.Insert(s.GetID())
-
 		default:
 			flushedIDs.Insert(s.GetID())
 		}
@@ -185,7 +184,7 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs .
 	// ================================================
 	isValid := func(ids ...UniqueID) bool {
 		for _, id := range ids {
-			if seg, ok := validSegmentInfos[id]; !ok || seg == nil {
+			if seg, ok := validSegmentInfos[id]; !ok || seg == nil || seg.GetIsInvisible() {
 				return false
 			}
 		}
diff --git a/internal/datacoord/handler_test.go b/internal/datacoord/handler_test.go
index 788c76a77393e..88f17c2987c8d 100644
--- a/internal/datacoord/handler_test.go
+++ b/internal/datacoord/handler_test.go
@@ -572,13 +572,13 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
 		//      |    |    |         |    |        |
 		//       \   |   /           \   /        |
 		//        \  |  /             \ /         |
-		//         [13u]         [14i, 15u]      12i
-		//           |             |    |         |
-		//            \           /      \       /
-		//             \         /        \     /
-		//              [16u]              [17u]
+		//         [13u]         [14i, 15u]      12i      [19u](unsorted)
+		//           |             |    |         |          |
+		//            \           /      \       /           |
+		//             \         /        \     /            |
+		//              [16u]              [17u]     [18u](unsorted)  [20u](sorted)  [21i](unsorted)
 		// all leaf nodes are [1,2,3,4,5,6,7], but because segment3 has been gced, the leaf node becomes [7,8,9,10,4,5,6]
-		// should be returned: flushed: [7, 8, 9, 10, 4, 5, 6]
+		// should be returned: flushed: [7, 8, 9, 10, 4, 5, 6, 20, 21], growing: [18]
 		svr := newTestServer(t)
 		defer closeTestServer(t, svr)
 		schema := newTestSchema()
@@ -702,8 +702,9 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
 				MsgGroup:    "",
 				Timestamp:   1,
 			},
-			NumOfRows:      100,
-			CompactionFrom: []int64{1, 2, 3},
+			NumOfRows:           100,
+			CompactionFrom:      []int64{1, 2, 3},
+			CreatedByCompaction: true,
 		}
 		err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg8))
 		assert.NoError(t, err)
@@ -719,8 +720,9 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
 				MsgGroup:    "",
 				Timestamp:   1,
 			},
-			NumOfRows:      100,
-			CompactionFrom: []int64{1, 2, 3},
+			NumOfRows:           100,
+			CompactionFrom:      []int64{1, 2, 3},
+			CreatedByCompaction: true,
 		}
 		err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg9))
 		assert.NoError(t, err)
@@ -736,8 +738,9 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
 				MsgGroup:    "",
 				Timestamp:   1,
 			},
-			NumOfRows:      100,
-			CompactionFrom: []int64{1, 2, 3},
+			NumOfRows:           100,
+			CompactionFrom:      []int64{1, 2, 3},
+			CreatedByCompaction: true,
 		}
 		err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg10))
 		assert.NoError(t, err)
@@ -753,8 +756,9 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
 				MsgGroup:    "",
 				Timestamp:   1,
 			},
-			NumOfRows:      2048,
-			CompactionFrom: []int64{4, 5, 6},
+			NumOfRows:           2048,
+			CompactionFrom:      []int64{4, 5, 6},
+			CreatedByCompaction: true,
 		}
 		err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg11))
 		assert.NoError(t, err)
@@ -770,8 +774,9 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
 				MsgGroup:    "",
 				Timestamp:   1,
 			},
-			NumOfRows:      100,
-			CompactionFrom: []int64{4, 5, 6},
+			NumOfRows:           100,
+			CompactionFrom:      []int64{4, 5, 6},
+			CreatedByCompaction: true,
 		}
 		err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg12))
 		assert.NoError(t, err)
@@ -787,8 +792,9 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
 				MsgGroup:    "",
 				Timestamp:   1,
 			},
-			NumOfRows:      2047,
-			CompactionFrom: []int64{7, 8, 9},
+			NumOfRows:           2047,
+			CompactionFrom:      []int64{7, 8, 9},
+			CreatedByCompaction: true,
 		}
 		err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg13))
 		assert.NoError(t, err)
@@ -804,8 +810,9 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
 				MsgGroup:    "",
 				Timestamp:   1,
 			},
-			NumOfRows:      100,
-			CompactionFrom: []int64{10, 11},
+			NumOfRows:           100,
+			CompactionFrom:      []int64{10, 11},
+			CreatedByCompaction: true,
 		}
 		err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg14))
 		assert.NoError(t, err)
@@ -821,8 +828,9 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
 				MsgGroup:    "",
 				Timestamp:   1,
 			},
-			NumOfRows:      2048,
-			CompactionFrom: []int64{10, 11},
+			NumOfRows:           2048,
+			CompactionFrom:      []int64{10, 11},
+			CreatedByCompaction: true,
 		}
 		err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg15))
 		assert.NoError(t, err)
@@ -838,8 +846,9 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
 				MsgGroup:    "",
 				Timestamp:   1,
 			},
-			NumOfRows:      2048,
-			CompactionFrom: []int64{13, 14},
+			NumOfRows:           2048,
+			CompactionFrom:      []int64{13, 14},
+			CreatedByCompaction: true,
 		}
 		err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg16))
 		assert.NoError(t, err)
@@ -855,15 +864,98 @@ func TestGetQueryVChanPositions_Retrieve_unIndexed(t *testing.T) {
 				MsgGroup:    "",
 				Timestamp:   1,
 			},
-			NumOfRows:      2048,
-			CompactionFrom: []int64{12, 15},
+			NumOfRows:           2048,
+			CompactionFrom:      []int64{12, 15},
+			CreatedByCompaction: true,
 		}
 		err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg17))
 		assert.NoError(t, err)
+		seg18 := &datapb.SegmentInfo{
+			ID:            18,
+			CollectionID:  0,
+			PartitionID:   0,
+			InsertChannel: "ch1",
+			State:         commonpb.SegmentState_Flushed,
+			Level:         datapb.SegmentLevel_L1,
+			DmlPosition: &msgpb.MsgPosition{
+				ChannelName: "ch1",
+				MsgID:       []byte{1, 2, 3},
+				MsgGroup:    "",
+				Timestamp:   1,
+			},
+			NumOfRows:      2048,
+			CompactionFrom: []int64{},
+			IsInvisible:    true,
+			IsSorted:       false,
+		}
+		err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg18))
+		assert.NoError(t, err)
+		seg19 := &datapb.SegmentInfo{
+			ID:            19,
+			CollectionID:  0,
+			PartitionID:   0,
+			InsertChannel: "ch1",
+			State:         commonpb.SegmentState_Dropped,
+			Level:         datapb.SegmentLevel_L1,
+			DmlPosition: &msgpb.MsgPosition{
+				ChannelName: "ch1",
+				MsgID:       []byte{1, 2, 3},
+				MsgGroup:    "",
+				Timestamp:   1,
+			},
+			NumOfRows:      2048,
+			CompactionFrom: []int64{},
+			IsInvisible:    true,
+			IsSorted:       false,
+		}
+		err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg19))
+		assert.NoError(t, err)
+		seg20 := &datapb.SegmentInfo{
+			ID:            20,
+			CollectionID:  0,
+			PartitionID:   0,
+			InsertChannel: "ch1",
+			State:         commonpb.SegmentState_Flushed,
+			Level:         datapb.SegmentLevel_L1,
+			DmlPosition: &msgpb.MsgPosition{
+				ChannelName: "ch1",
+				MsgID:       []byte{1, 2, 3},
+				MsgGroup:    "",
+				Timestamp:   1,
+			},
+			NumOfRows:           2048,
+			CompactionFrom:      []int64{19},
+			CreatedByCompaction: true,
+			IsInvisible:         false,
+			IsSorted:            true,
+		}
+		err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg20))
+		assert.NoError(t, err)
+		seg21 := &datapb.SegmentInfo{
+			ID:            21,
+			CollectionID:  0,
+			PartitionID:   0,
+			InsertChannel: "ch1",
+			State:         commonpb.SegmentState_Flushed,
+			Level:         datapb.SegmentLevel_L1,
+			DmlPosition: &msgpb.MsgPosition{
+				ChannelName: "ch1",
+				MsgID:       []byte{1, 2, 3},
+				MsgGroup:    "",
+				Timestamp:   1,
+			},
+			NumOfRows:      100,
+			CompactionFrom: []int64{},
+			IsInvisible:    false,
+			IsSorted:       false,
+		}
+		err = svr.meta.AddSegment(context.TODO(), NewSegmentInfo(seg21))
+		assert.NoError(t, err)
 		vchan := svr.handler.GetQueryVChanPositions(&channelMeta{Name: "ch1", CollectionID: 0})
-		assert.ElementsMatch(t, []int64{7, 8, 9, 10, 4, 5, 6}, vchan.FlushedSegmentIds)
-		assert.ElementsMatch(t, []int64{1, 2}, vchan.DroppedSegmentIds)
+		assert.ElementsMatch(t, []int64{7, 8, 9, 10, 4, 5, 6, 20, 21}, vchan.FlushedSegmentIds)
+		assert.ElementsMatch(t, []int64{18}, vchan.UnflushedSegmentIds)
+		assert.ElementsMatch(t, []int64{1, 2, 19}, vchan.DroppedSegmentIds)
 	})
 }
diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go
index 257e8980cb6cf..ac384cc047d9e 100644
--- a/internal/datacoord/meta.go
+++ b/internal/datacoord/meta.go
@@ -791,7 +791,7 @@ func UpdateCompactedOperator(segmentID int64) UpdateOperator {
 	}
 }
 
-func UpdateSegmentVisible(segmentID int64) UpdateOperator {
+func SetSegmentIsInvisible(segmentID int64, isInvisible bool) UpdateOperator {
 	return func(modPack *updateSegmentPack) bool {
 		segment := modPack.Get(segmentID)
 		if segment == nil {
@@ -799,7 +799,7 @@ func UpdateSegmentVisible(segmentID int64) UpdateOperator {
 				zap.Int64("segmentID", segmentID))
 			return false
 		}
-		segment.IsInvisible = false
+		segment.IsInvisible = isInvisible
 		return true
 	}
 }
@@ -1979,6 +1979,11 @@ func (m *meta) SaveStatsResultSegment(oldSegmentID int64, result *workerpb.Stats
 	// metrics mutation for compaction from segments
 	updateSegStateAndPrepareMetrics(cloned, commonpb.SegmentState_Dropped, metricMutation)
 
+	resultInvisible := oldSegment.GetIsInvisible()
+	if !oldSegment.GetCreatedByCompaction() {
+		resultInvisible = false
+	}
+
 	segmentInfo := &datapb.SegmentInfo{
 		CollectionID: oldSegment.GetCollectionID(),
 		PartitionID:  oldSegment.GetPartitionID(),
@@ -1994,7 +1999,8 @@ func (m *meta) SaveStatsResultSegment(oldSegmentID int64, result *workerpb.Stats
 		LastLevel:                 oldSegment.GetLastLevel(),
 		PartitionStatsVersion:     oldSegment.GetPartitionStatsVersion(),
 		LastPartitionStatsVersion: oldSegment.GetLastPartitionStatsVersion(),
-		IsInvisible:               oldSegment.GetIsInvisible(),
+		CreatedByCompaction:       oldSegment.GetCreatedByCompaction(),
+		IsInvisible:               resultInvisible,
 		ID:                        result.GetSegmentID(),
 		NumOfRows:                 result.GetNumRows(),
 		Binlogs:                   result.GetInsertLogs(),
@@ -2002,7 +2008,6 @@ func (m *meta) SaveStatsResultSegment(oldSegmentID int64, result *workerpb.Stats
 		TextStatsLogs:             result.GetTextStatsLogs(),
 		Bm25Statslogs:             result.GetBm25Logs(),
 		Deltalogs:                 nil,
-		CreatedByCompaction:       true,
 		CompactionFrom:            []int64{oldSegmentID},
 		IsSorted:                  true,
 	}
diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go
index 21e34ce602814..c735681951f3f 100644
--- a/internal/datacoord/server.go
+++ b/internal/datacoord/server.go
@@ -1011,8 +1011,12 @@ func (s *Server) postFlush(ctx context.Context, segmentID UniqueID) error {
 		return merr.WrapErrSegmentNotFound(segmentID, "segment not found, might be a faked segment, ignore post flush")
 	}
 	// set segment to SegmentState_Flushed
-	if err := s.meta.SetState(segmentID, commonpb.SegmentState_Flushed); err != nil {
-		log.Error("flush segment complete failed", zap.Error(err))
+	var operators []UpdateOperator
+	operators = append(operators, SetSegmentIsInvisible(segmentID, true))
+	operators = append(operators, UpdateStatusOperator(segmentID, commonpb.SegmentState_Flushed))
+	err := s.meta.UpdateSegmentsInfo(operators...)
+	if err != nil {
+		log.Warn("flush segment complete failed", zap.Error(err))
 		return err
 	}
 	select {
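
The Go sketch below is an editorial illustration and is not part of the patch. It restates the classification rule introduced by the handler.go hunks above, using simplified stand-in types (a plain segment struct with string-valued states) instead of the real datapb.SegmentInfo, commonpb.SegmentState and typeutil sets: an invisible segment created by compaction stays hidden, while an invisible segment that is merely flushed-but-unsorted is handed to the delegator as a growing segment.

// Illustrative sketch only; not part of the Milvus patch above.
// Types and names here are simplified stand-ins for datapb.SegmentInfo,
// commonpb.SegmentState and the sets used by GetQueryVChanPositions.
package main

import "fmt"

type segment struct {
	ID                  int64
	State               string // "Dropped", "Flushed", "Growing", ...
	Level               string // "L0", "L1", ...
	IsInvisible         bool
	CreatedByCompaction bool
}

func isFlushState(state string) bool {
	return state == "Flushed"
}

// classify mirrors the switch introduced in handler.go: invisible segments
// created by compaction are skipped entirely, while invisible segments that
// came from plain inserts (flushed but not yet sorted by primary key) are
// reported as growing so the delegator loads them as growing segments.
func classify(segs []segment) (dropped, growing, levelZero, flushed []int64) {
	for _, s := range segs {
		if s.IsInvisible && s.CreatedByCompaction {
			continue // invisible compaction results stay hidden until made visible
		}
		switch {
		case s.State == "Dropped":
			dropped = append(dropped, s.ID)
		case !isFlushState(s.State) || s.IsInvisible:
			growing = append(growing, s.ID) // unsorted flushed segments hand off as growing
		case s.Level == "L0":
			levelZero = append(levelZero, s.ID)
		default:
			flushed = append(flushed, s.ID)
		}
	}
	return
}

func main() {
	segs := []segment{
		{ID: 18, State: "Flushed", Level: "L1", IsInvisible: true},         // flushed but unsorted insert segment
		{ID: 20, State: "Flushed", Level: "L1", CreatedByCompaction: true}, // sorted stats result, visible
		{ID: 19, State: "Dropped", Level: "L1", IsInvisible: true},         // original segment dropped after stats
	}
	dropped, growing, levelZero, flushed := classify(segs)
	fmt.Println(dropped, growing, levelZero, flushed) // [19] [18] [] [20]
}

Under this rule, a flushed-but-unsorted segment keeps serving as growing until the stats task produces its sorted replacement; SaveStatsResultSegment then drops the old segment and registers the sorted result, which is reported as a normal flushed segment on the next GetQueryVChanPositions call.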