diff --git a/internal/datacoord/compaction_task_clustering.go b/internal/datacoord/compaction_task_clustering.go index 1356a3490ef1c..b58e7a6a2533c 100644 --- a/internal/datacoord/compaction_task_clustering.go +++ b/internal/datacoord/compaction_task_clustering.go @@ -167,25 +167,22 @@ func (t *clusteringCompactionTask) BuildCompactionRequest() (*datapb.CompactionP return nil, err } plan := &datapb.CompactionPlan{ - PlanID: t.GetPlanID(), - StartTime: t.GetStartTime(), - TimeoutInSeconds: t.GetTimeoutInSeconds(), - Type: t.GetType(), - Channel: t.GetChannel(), - CollectionTtl: t.GetCollectionTtl(), - TotalRows: t.GetTotalRows(), - Schema: t.GetSchema(), - ClusteringKeyField: t.GetClusteringKeyField().GetFieldID(), - MaxSegmentRows: t.GetMaxSegmentRows(), - PreferSegmentRows: t.GetPreferSegmentRows(), - AnalyzeResultPath: path.Join(t.meta.(*meta).chunkManager.RootPath(), common.AnalyzeStatsPath, metautil.JoinIDPath(t.AnalyzeTaskID, t.AnalyzeVersion)), - AnalyzeSegmentIds: t.GetInputSegments(), - BeginLogID: beginLogID, - PreAllocatedSegmentIDs: &datapb.IDRange{ - Begin: t.GetResultSegments()[0], - End: t.GetResultSegments()[1], - }, - SlotUsage: t.GetSlotUsage(), + PlanID: t.GetPlanID(), + StartTime: t.GetStartTime(), + TimeoutInSeconds: t.GetTimeoutInSeconds(), + Type: t.GetType(), + Channel: t.GetChannel(), + CollectionTtl: t.GetCollectionTtl(), + TotalRows: t.GetTotalRows(), + Schema: t.GetSchema(), + ClusteringKeyField: t.GetClusteringKeyField().GetFieldID(), + MaxSegmentRows: t.GetMaxSegmentRows(), + PreferSegmentRows: t.GetPreferSegmentRows(), + AnalyzeResultPath: path.Join(t.meta.(*meta).chunkManager.RootPath(), common.AnalyzeStatsPath, metautil.JoinIDPath(t.AnalyzeTaskID, t.AnalyzeVersion)), + AnalyzeSegmentIds: t.GetInputSegments(), + BeginLogID: beginLogID, + PreAllocatedSegmentIDs: t.GetPreAllocatedSegmentIDs(), + SlotUsage: t.GetSlotUsage(), } log := log.With(zap.Int64("taskID", t.GetTriggerID()), zap.Int64("planID", plan.GetPlanID())) diff --git 
a/internal/datacoord/compaction_task_meta.go b/internal/datacoord/compaction_task_meta.go index 1d31130d255fa..cdc536eb1dad3 100644 --- a/internal/datacoord/compaction_task_meta.go +++ b/internal/datacoord/compaction_task_meta.go @@ -57,6 +57,13 @@ func (csm *compactionTaskMeta) reloadFromKV() error { return err } for _, task := range compactionTasks { + // To maintain compatibility with versions ≤v2.4.12, which used `ResultSegments` as pre-allocated segment IDs. + if task.PreAllocatedSegmentIDs == nil && len(task.GetResultSegments()) == 2 { + task.PreAllocatedSegmentIDs = &datapb.IDRange{ + Begin: task.GetResultSegments()[0], + End: task.GetResultSegments()[1], + } + } csm.saveCompactionTaskMemory(task) } log.Info("DataCoord compactionTaskMeta reloadFromKV done", zap.Duration("duration", record.ElapseSpan())) diff --git a/internal/datacoord/compaction_task_mix.go b/internal/datacoord/compaction_task_mix.go index ee6e219944ef4..e772e35330926 100644 --- a/internal/datacoord/compaction_task_mix.go +++ b/internal/datacoord/compaction_task_mix.go @@ -318,21 +318,18 @@ func (t *mixCompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, er return nil, err } plan := &datapb.CompactionPlan{ - PlanID: t.GetPlanID(), - StartTime: t.GetStartTime(), - TimeoutInSeconds: t.GetTimeoutInSeconds(), - Type: t.GetType(), - Channel: t.GetChannel(), - CollectionTtl: t.GetCollectionTtl(), - TotalRows: t.GetTotalRows(), - Schema: t.GetSchema(), - BeginLogID: beginLogID, - PreAllocatedSegmentIDs: &datapb.IDRange{ - Begin: t.GetResultSegments()[0], - End: t.GetResultSegments()[1], - }, - SlotUsage: t.GetSlotUsage(), - MaxSize: t.GetMaxSize(), + PlanID: t.GetPlanID(), + StartTime: t.GetStartTime(), + TimeoutInSeconds: t.GetTimeoutInSeconds(), + Type: t.GetType(), + Channel: t.GetChannel(), + CollectionTtl: t.GetCollectionTtl(), + TotalRows: t.GetTotalRows(), + Schema: t.GetSchema(), + BeginLogID: beginLogID, + PreAllocatedSegmentIDs: t.GetPreAllocatedSegmentIDs(), + SlotUsage: 
t.GetSlotUsage(), + MaxSize: t.GetMaxSize(), } log := log.With(zap.Int64("taskID", t.GetTriggerID()), zap.Int64("planID", plan.GetPlanID())) diff --git a/internal/datacoord/compaction_trigger.go b/internal/datacoord/compaction_trigger.go index b0081533e4081..66bd954e06d61 100644 --- a/internal/datacoord/compaction_trigger.go +++ b/internal/datacoord/compaction_trigger.go @@ -385,10 +385,14 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) error { PartitionID: group.partitionID, Channel: group.channelName, InputSegments: inputSegmentIDs, - ResultSegments: []int64{startID + 1, endID}, // pre-allocated target segment + ResultSegments: []int64{}, TotalRows: totalRows, Schema: coll.Schema, MaxSize: getExpandedSize(expectedSize), + PreAllocatedSegmentIDs: &datapb.IDRange{ + Begin: startID + 1, + End: endID, + }, } err = t.compactionHandler.enqueueCompaction(task) if err != nil { @@ -491,10 +495,14 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) { PartitionID: partitionID, Channel: channel, InputSegments: inputSegmentIDs, - ResultSegments: []int64{startID + 1, endID}, // pre-allocated target segment + ResultSegments: []int64{}, TotalRows: totalRows, Schema: coll.Schema, MaxSize: getExpandedSize(expectedSize), + PreAllocatedSegmentIDs: &datapb.IDRange{ + Begin: startID + 1, + End: endID, + }, } if err := t.compactionHandler.enqueueCompaction(task); err != nil { log.Warn("failed to execute compaction task", diff --git a/internal/datacoord/compaction_trigger_test.go b/internal/datacoord/compaction_trigger_test.go index 64d9ea76525f2..f3e1a2a93599a 100644 --- a/internal/datacoord/compaction_trigger_test.go +++ b/internal/datacoord/compaction_trigger_test.go @@ -70,7 +70,6 @@ func (h *spyCompactionHandler) enqueueCompaction(task *datapb.CompactionTask) er } alloc := newMock0Allocator(h.t) t.allocator = alloc - t.ResultSegments = []int64{100, 200} plan, err := t.BuildCompactionRequest() h.spyChan <- plan return err @@ -532,7 
+531,7 @@ func Test_compactionTrigger_force(t *testing.T) { }, []*datapb.CompactionPlan{ { - PlanID: 0, + PlanID: 100, SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{ { SegmentID: 1, @@ -580,12 +579,13 @@ func Test_compactionTrigger_force(t *testing.T) { }, }, // StartTime: 0, + BeginLogID: 100, TimeoutInSeconds: Params.DataCoordCfg.CompactionTimeoutInSeconds.GetAsInt32(), Type: datapb.CompactionType_MixCompaction, Channel: "ch1", TotalRows: 200, Schema: schema, - PreAllocatedSegmentIDs: &datapb.IDRange{Begin: 100, End: 200}, + PreAllocatedSegmentIDs: &datapb.IDRange{Begin: 101, End: 200}, MaxSize: 1342177280, }, }, @@ -878,6 +878,7 @@ func Test_compactionTrigger_force_maxSegmentLimit(t *testing.T) { IsSorted: true, }, }, + BeginLogID: 100, StartTime: 3, TimeoutInSeconds: Params.DataCoordCfg.CompactionTimeoutInSeconds.GetAsInt32(), Type: datapb.CompactionType_MixCompaction, diff --git a/internal/datacoord/compaction_trigger_v2.go b/internal/datacoord/compaction_trigger_v2.go index 6274e04a0372a..05474bba5402b 100644 --- a/internal/datacoord/compaction_trigger_v2.go +++ b/internal/datacoord/compaction_trigger_v2.go @@ -335,12 +335,16 @@ func (m *CompactionTriggerManager) SubmitClusteringViewToScheduler(ctx context.C Schema: collection.Schema, ClusteringKeyField: view.(*ClusteringSegmentsView).clusteringKeyField, InputSegments: lo.Map(view.GetSegmentsView(), func(segmentView *SegmentView, _ int) int64 { return segmentView.ID }), - ResultSegments: []int64{start, end}, // pre-allocated result segments range + ResultSegments: []int64{}, MaxSegmentRows: maxSegmentRows, PreferSegmentRows: preferSegmentRows, TotalRows: totalRows, AnalyzeTaskID: taskID + 1, LastStateStartTime: time.Now().Unix(), + PreAllocatedSegmentIDs: &datapb.IDRange{ + Begin: start, + End: end, + }, } err = m.compactionHandler.enqueueCompaction(task) if err != nil { @@ -391,10 +395,14 @@ func (m *CompactionTriggerManager) SubmitSingleViewToScheduler(ctx context.Conte Channel: 
view.GetGroupLabel().Channel, Schema: collection.Schema, InputSegments: lo.Map(view.GetSegmentsView(), func(segmentView *SegmentView, _ int) int64 { return segmentView.ID }), - ResultSegments: []int64{startID + 1, endID}, + ResultSegments: []int64{}, TotalRows: totalRows, LastStateStartTime: time.Now().Unix(), MaxSize: getExpandedSize(expectedSize), + PreAllocatedSegmentIDs: &datapb.IDRange{ + Begin: startID + 1, + End: endID, + }, } err = m.compactionHandler.enqueueCompaction(task) if err != nil { diff --git a/internal/datacoord/mock_test.go b/internal/datacoord/mock_test.go index 1a2b22f24f651..4108cc2e56682 100644 --- a/internal/datacoord/mock_test.go +++ b/internal/datacoord/mock_test.go @@ -108,9 +108,9 @@ func newMockAllocator(t *testing.T) *allocator.MockAllocator { func newMock0Allocator(t *testing.T) *allocator.MockAllocator { mock0Allocator := allocator.NewMockAllocator(t) - mock0Allocator.EXPECT().AllocID(mock.Anything).Return(0, nil).Maybe() - mock0Allocator.EXPECT().AllocTimestamp(mock.Anything).Return(0, nil).Maybe() - mock0Allocator.EXPECT().AllocN(mock.Anything).Return(0, 0, nil).Maybe() + mock0Allocator.EXPECT().AllocID(mock.Anything).Return(100, nil).Maybe() + mock0Allocator.EXPECT().AllocTimestamp(mock.Anything).Return(1000, nil).Maybe() + mock0Allocator.EXPECT().AllocN(mock.Anything).Return(100, 200, nil).Maybe() return mock0Allocator } diff --git a/internal/proto/data_coord.proto b/internal/proto/data_coord.proto index 4f960da240f47..4b84970de2693 100644 --- a/internal/proto/data_coord.proto +++ b/internal/proto/data_coord.proto @@ -609,7 +609,7 @@ message CompactionPlan { repeated int64 analyze_segment_ids = 15; int32 state = 16; int64 begin_logID = 17; - IDRange pre_allocated_segmentIDs = 18; // only for clustering compaction + IDRange pre_allocated_segmentIDs = 18; int64 slot_usage = 19; int64 max_size = 20; } @@ -972,6 +972,7 @@ message CompactionTask{ int64 lastStateStartTime = 25; int64 max_size = 26; repeated int64 tmpSegments = 27; + 
IDRange pre_allocated_segmentIDs = 28; } message PartitionStatsInfo {