Skip to content

Commit

Permalink
enhance: Add PreAllocatedSegmentIDs for the compaction task (#36734)
Browse files Browse the repository at this point in the history
Add a `PreAllocatedSegmentIDs` field to the compaction task, so that
`ResultSegments` in the compaction task can represent the final segments
actually produced by the compaction (rather than a pre-allocated ID range).

issue: #36733

also related: #36686

---------

Signed-off-by: bigsheeper <[email protected]>
  • Loading branch information
bigsheeper authored Oct 13, 2024
1 parent 383350c commit d230b91
Show file tree
Hide file tree
Showing 8 changed files with 64 additions and 45 deletions.
35 changes: 16 additions & 19 deletions internal/datacoord/compaction_task_clustering.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,25 +167,22 @@ func (t *clusteringCompactionTask) BuildCompactionRequest() (*datapb.CompactionP
return nil, err
}
plan := &datapb.CompactionPlan{
PlanID: t.GetPlanID(),
StartTime: t.GetStartTime(),
TimeoutInSeconds: t.GetTimeoutInSeconds(),
Type: t.GetType(),
Channel: t.GetChannel(),
CollectionTtl: t.GetCollectionTtl(),
TotalRows: t.GetTotalRows(),
Schema: t.GetSchema(),
ClusteringKeyField: t.GetClusteringKeyField().GetFieldID(),
MaxSegmentRows: t.GetMaxSegmentRows(),
PreferSegmentRows: t.GetPreferSegmentRows(),
AnalyzeResultPath: path.Join(t.meta.(*meta).chunkManager.RootPath(), common.AnalyzeStatsPath, metautil.JoinIDPath(t.AnalyzeTaskID, t.AnalyzeVersion)),
AnalyzeSegmentIds: t.GetInputSegments(),
BeginLogID: beginLogID,
PreAllocatedSegmentIDs: &datapb.IDRange{
Begin: t.GetResultSegments()[0],
End: t.GetResultSegments()[1],
},
SlotUsage: t.GetSlotUsage(),
PlanID: t.GetPlanID(),
StartTime: t.GetStartTime(),
TimeoutInSeconds: t.GetTimeoutInSeconds(),
Type: t.GetType(),
Channel: t.GetChannel(),
CollectionTtl: t.GetCollectionTtl(),
TotalRows: t.GetTotalRows(),
Schema: t.GetSchema(),
ClusteringKeyField: t.GetClusteringKeyField().GetFieldID(),
MaxSegmentRows: t.GetMaxSegmentRows(),
PreferSegmentRows: t.GetPreferSegmentRows(),
AnalyzeResultPath: path.Join(t.meta.(*meta).chunkManager.RootPath(), common.AnalyzeStatsPath, metautil.JoinIDPath(t.AnalyzeTaskID, t.AnalyzeVersion)),
AnalyzeSegmentIds: t.GetInputSegments(),
BeginLogID: beginLogID,
PreAllocatedSegmentIDs: t.GetPreAllocatedSegmentIDs(),
SlotUsage: t.GetSlotUsage(),
}
log := log.With(zap.Int64("taskID", t.GetTriggerID()), zap.Int64("planID", plan.GetPlanID()))

Expand Down
7 changes: 7 additions & 0 deletions internal/datacoord/compaction_task_meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,13 @@ func (csm *compactionTaskMeta) reloadFromKV() error {
return err
}
for _, task := range compactionTasks {
// To maintain compatibility with versions ≤v2.4.12, which use `ResultSegments` as preallocate segment IDs.
if task.PreAllocatedSegmentIDs == nil && len(task.GetResultSegments()) == 2 {
task.PreAllocatedSegmentIDs = &datapb.IDRange{
Begin: task.GetResultSegments()[0],
End: task.GetResultSegments()[1],
}
}
csm.saveCompactionTaskMemory(task)
}
log.Info("DataCoord compactionTaskMeta reloadFromKV done", zap.Duration("duration", record.ElapseSpan()))
Expand Down
27 changes: 12 additions & 15 deletions internal/datacoord/compaction_task_mix.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,21 +318,18 @@ func (t *mixCompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, er
return nil, err
}
plan := &datapb.CompactionPlan{
PlanID: t.GetPlanID(),
StartTime: t.GetStartTime(),
TimeoutInSeconds: t.GetTimeoutInSeconds(),
Type: t.GetType(),
Channel: t.GetChannel(),
CollectionTtl: t.GetCollectionTtl(),
TotalRows: t.GetTotalRows(),
Schema: t.GetSchema(),
BeginLogID: beginLogID,
PreAllocatedSegmentIDs: &datapb.IDRange{
Begin: t.GetResultSegments()[0],
End: t.GetResultSegments()[1],
},
SlotUsage: t.GetSlotUsage(),
MaxSize: t.GetMaxSize(),
PlanID: t.GetPlanID(),
StartTime: t.GetStartTime(),
TimeoutInSeconds: t.GetTimeoutInSeconds(),
Type: t.GetType(),
Channel: t.GetChannel(),
CollectionTtl: t.GetCollectionTtl(),
TotalRows: t.GetTotalRows(),
Schema: t.GetSchema(),
BeginLogID: beginLogID,
PreAllocatedSegmentIDs: t.GetPreAllocatedSegmentIDs(),
SlotUsage: t.GetSlotUsage(),
MaxSize: t.GetMaxSize(),
}
log := log.With(zap.Int64("taskID", t.GetTriggerID()), zap.Int64("planID", plan.GetPlanID()))

Expand Down
12 changes: 10 additions & 2 deletions internal/datacoord/compaction_trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,10 +385,14 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) error {
PartitionID: group.partitionID,
Channel: group.channelName,
InputSegments: inputSegmentIDs,
ResultSegments: []int64{startID + 1, endID}, // pre-allocated target segment
ResultSegments: []int64{},
TotalRows: totalRows,
Schema: coll.Schema,
MaxSize: getExpandedSize(expectedSize),
PreAllocatedSegmentIDs: &datapb.IDRange{
Begin: startID + 1,
End: endID,
},
}
err = t.compactionHandler.enqueueCompaction(task)
if err != nil {
Expand Down Expand Up @@ -491,10 +495,14 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) {
PartitionID: partitionID,
Channel: channel,
InputSegments: inputSegmentIDs,
ResultSegments: []int64{startID + 1, endID}, // pre-allocated target segment
ResultSegments: []int64{},
TotalRows: totalRows,
Schema: coll.Schema,
MaxSize: getExpandedSize(expectedSize),
PreAllocatedSegmentIDs: &datapb.IDRange{
Begin: startID + 1,
End: endID,
},
}
if err := t.compactionHandler.enqueueCompaction(task); err != nil {
log.Warn("failed to execute compaction task",
Expand Down
7 changes: 4 additions & 3 deletions internal/datacoord/compaction_trigger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ func (h *spyCompactionHandler) enqueueCompaction(task *datapb.CompactionTask) er
}
alloc := newMock0Allocator(h.t)
t.allocator = alloc
t.ResultSegments = []int64{100, 200}
plan, err := t.BuildCompactionRequest()
h.spyChan <- plan
return err
Expand Down Expand Up @@ -532,7 +531,7 @@ func Test_compactionTrigger_force(t *testing.T) {
},
[]*datapb.CompactionPlan{
{
PlanID: 0,
PlanID: 100,
SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
{
SegmentID: 1,
Expand Down Expand Up @@ -580,12 +579,13 @@ func Test_compactionTrigger_force(t *testing.T) {
},
},
// StartTime: 0,
BeginLogID: 100,
TimeoutInSeconds: Params.DataCoordCfg.CompactionTimeoutInSeconds.GetAsInt32(),
Type: datapb.CompactionType_MixCompaction,
Channel: "ch1",
TotalRows: 200,
Schema: schema,
PreAllocatedSegmentIDs: &datapb.IDRange{Begin: 100, End: 200},
PreAllocatedSegmentIDs: &datapb.IDRange{Begin: 101, End: 200},
MaxSize: 1342177280,
},
},
Expand Down Expand Up @@ -878,6 +878,7 @@ func Test_compactionTrigger_force_maxSegmentLimit(t *testing.T) {
IsSorted: true,
},
},
BeginLogID: 100,
StartTime: 3,
TimeoutInSeconds: Params.DataCoordCfg.CompactionTimeoutInSeconds.GetAsInt32(),
Type: datapb.CompactionType_MixCompaction,
Expand Down
12 changes: 10 additions & 2 deletions internal/datacoord/compaction_trigger_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,12 +335,16 @@ func (m *CompactionTriggerManager) SubmitClusteringViewToScheduler(ctx context.C
Schema: collection.Schema,
ClusteringKeyField: view.(*ClusteringSegmentsView).clusteringKeyField,
InputSegments: lo.Map(view.GetSegmentsView(), func(segmentView *SegmentView, _ int) int64 { return segmentView.ID }),
ResultSegments: []int64{start, end}, // pre-allocated result segments range
ResultSegments: []int64{},
MaxSegmentRows: maxSegmentRows,
PreferSegmentRows: preferSegmentRows,
TotalRows: totalRows,
AnalyzeTaskID: taskID + 1,
LastStateStartTime: time.Now().Unix(),
PreAllocatedSegmentIDs: &datapb.IDRange{
Begin: start,
End: end,
},
}
err = m.compactionHandler.enqueueCompaction(task)
if err != nil {
Expand Down Expand Up @@ -391,10 +395,14 @@ func (m *CompactionTriggerManager) SubmitSingleViewToScheduler(ctx context.Conte
Channel: view.GetGroupLabel().Channel,
Schema: collection.Schema,
InputSegments: lo.Map(view.GetSegmentsView(), func(segmentView *SegmentView, _ int) int64 { return segmentView.ID }),
ResultSegments: []int64{startID + 1, endID},
ResultSegments: []int64{},
TotalRows: totalRows,
LastStateStartTime: time.Now().Unix(),
MaxSize: getExpandedSize(expectedSize),
PreAllocatedSegmentIDs: &datapb.IDRange{
Begin: startID + 1,
End: endID,
},
}
err = m.compactionHandler.enqueueCompaction(task)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions internal/datacoord/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,9 @@ func newMockAllocator(t *testing.T) *allocator.MockAllocator {

func newMock0Allocator(t *testing.T) *allocator.MockAllocator {
mock0Allocator := allocator.NewMockAllocator(t)
mock0Allocator.EXPECT().AllocID(mock.Anything).Return(0, nil).Maybe()
mock0Allocator.EXPECT().AllocTimestamp(mock.Anything).Return(0, nil).Maybe()
mock0Allocator.EXPECT().AllocN(mock.Anything).Return(0, 0, nil).Maybe()
mock0Allocator.EXPECT().AllocID(mock.Anything).Return(100, nil).Maybe()
mock0Allocator.EXPECT().AllocTimestamp(mock.Anything).Return(1000, nil).Maybe()
mock0Allocator.EXPECT().AllocN(mock.Anything).Return(100, 200, nil).Maybe()
return mock0Allocator
}

Expand Down
3 changes: 2 additions & 1 deletion internal/proto/data_coord.proto
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,7 @@ message CompactionPlan {
repeated int64 analyze_segment_ids = 15;
int32 state = 16;
int64 begin_logID = 17;
IDRange pre_allocated_segmentIDs = 18; // only for clustering compaction
IDRange pre_allocated_segmentIDs = 18;
int64 slot_usage = 19;
int64 max_size = 20;
}
Expand Down Expand Up @@ -972,6 +972,7 @@ message CompactionTask{
int64 lastStateStartTime = 25;
int64 max_size = 26;
repeated int64 tmpSegments = 27;
IDRange pre_allocated_segmentIDs = 28;
}

message PartitionStatsInfo {
Expand Down

0 comments on commit d230b91

Please sign in to comment.