From a26e965e6aec83637cbdead7d2104f5a60cfaa65 Mon Sep 17 00:00:00 2001 From: wayblink Date: Thu, 18 Jul 2024 09:55:43 +0800 Subject: [PATCH] enhance:[cherry-pick] Add compaction task slot usage logic (#34625) issue: #34544 pr: #34581 --------- Signed-off-by: wayblink --- configs/milvus.yaml | 7 +- internal/datacoord/compaction.go | 31 +++-- .../datacoord/compaction_task_clustering.go | 9 +- internal/datacoord/compaction_task_l0.go | 1 + internal/datacoord/compaction_task_mix.go | 1 + internal/datacoord/compaction_test.go | 62 +++++++++- internal/datacoord/compaction_trigger_test.go | 1 + internal/datacoord/import_util.go | 2 +- .../compaction/clustering_compactor.go | 9 +- internal/datanode/compaction/compactor.go | 2 + internal/datanode/compaction/executor.go | 55 +++++++-- internal/datanode/compaction/executor_test.go | 70 +++++++++++- internal/datanode/compaction/l0_compactor.go | 8 ++ internal/datanode/compaction/mix_compactor.go | 8 ++ .../datanode/compaction/mock_compactor.go | 108 +++++++++++++++++- internal/datanode/importv2/scheduler_test.go | 6 +- internal/datanode/importv2/util.go | 2 +- internal/datanode/importv2/util_test.go | 2 +- internal/datanode/services.go | 8 +- internal/datanode/services_test.go | 2 + internal/proto/data_coord.proto | 7 +- pkg/util/merr/errors.go | 3 + pkg/util/merr/utils.go | 16 +++ pkg/util/paramtable/component_param.go | 38 +++++- pkg/util/paramtable/component_param_test.go | 11 +- 25 files changed, 430 insertions(+), 39 deletions(-) diff --git a/configs/milvus.yaml b/configs/milvus.yaml index c1159138b1114..3a1cb34675aa1 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -516,6 +516,10 @@ dataCoord: clientMaxSendSize: 268435456 clientMaxRecvSize: 536870912 syncSegmentsInterval: 300 + slot: + clusteringCompactionUsage: 16 + mixCompactionUsage: 8 + l0DeleteCompactionUsage: 8 dataNode: dataSync: @@ -565,10 +569,11 @@ dataNode: clientMaxSendSize: 268435456 clientMaxRecvSize: 536870912 slot: - slotCap: 2 # The maximum number of tasks(e.g. compaction, importing) allowed to run concurrently on a datanode. + slotCap: 16 # The maximum number of tasks(e.g. compaction, importing) allowed to run concurrently on a datanode. clusteringCompaction: memoryBufferRatio: 0.1 # The ratio of memory buffer of clustering compaction. Data larger than threshold will be spilled to storage. + workPoolSize: 8 # Configures the system log output. 
log: diff --git a/internal/datacoord/compaction.go b/internal/datacoord/compaction.go index 1412e5d11c875..261fc73089090 100644 --- a/internal/datacoord/compaction.go +++ b/internal/datacoord/compaction.go @@ -37,6 +37,7 @@ import ( "github.com/milvus-io/milvus/pkg/util/conc" "github.com/milvus-io/milvus/pkg/util/lock" "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -164,7 +165,6 @@ func (c *compactionPlanHandler) getCompactionTasksNumBySignalID(triggerID int64) if t.GetTriggerID() == triggerID { cnt += 1 } - // if t.GetPlanID() } c.queueGuard.RUnlock() c.executingGuard.RLock() @@ -618,10 +618,10 @@ func (c *compactionPlanHandler) assignNodeIDs(tasks []CompactionTask) { } for _, t := range tasks { - nodeID := c.pickAnyNode(slots) + nodeID, useSlot := c.pickAnyNode(slots, t) if nodeID == NullNodeID { log.Info("compactionHandler cannot find datanode for compaction task", - zap.Int64("planID", t.GetPlanID()), zap.String("vchannel", t.GetChannel())) + zap.Int64("planID", t.GetPlanID()), zap.String("type", t.GetType().String()), zap.String("vchannel", t.GetChannel())) continue } err := t.SetNodeID(nodeID) @@ -629,6 +629,8 @@ func (c *compactionPlanHandler) assignNodeIDs(tasks []CompactionTask) { log.Info("compactionHandler assignNodeID failed", zap.Int64("planID", t.GetPlanID()), zap.String("vchannel", t.GetChannel()), zap.Error(err)) } else { + // update the input nodeSlots + slots[nodeID] = slots[nodeID] - useSlot log.Info("compactionHandler assignNodeID success", zap.Int64("planID", t.GetPlanID()), zap.String("vchannel", t.GetChannel()), zap.Any("nodeID", nodeID)) metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetType().String(), metrics.Executing).Dec() @@ -675,18 +677,27 @@ func (c *compactionPlanHandler) checkCompaction() error { return nil } -func (c *compactionPlanHandler) pickAnyNode(nodeSlots map[int64]int64) int64 { - var ( - nodeID int64 = NullNodeID - maxSlots int64 = -1 - ) +func (c *compactionPlanHandler) pickAnyNode(nodeSlots map[int64]int64, task CompactionTask) (nodeID int64, useSlot int64) { + nodeID = NullNodeID + var maxSlots int64 = -1 + + switch task.GetType() { + case datapb.CompactionType_ClusteringCompaction: + useSlot = paramtable.Get().DataCoordCfg.ClusteringCompactionSlotUsage.GetAsInt64() + case datapb.CompactionType_MixCompaction: + useSlot = paramtable.Get().DataCoordCfg.MixCompactionSlotUsage.GetAsInt64() + case datapb.CompactionType_Level0DeleteCompaction: + useSlot = paramtable.Get().DataCoordCfg.L0DeleteCompactionSlotUsage.GetAsInt64() + } + for id, slots := range nodeSlots { - if slots > 0 && slots > maxSlots { + if slots >= useSlot && slots > maxSlots { nodeID = id maxSlots = slots } } - return nodeID + + return nodeID, useSlot } func (c *compactionPlanHandler) pickShardNode(nodeSlots map[int64]int64, t CompactionTask) int64 { diff --git a/internal/datacoord/compaction_task_clustering.go b/internal/datacoord/compaction_task_clustering.go index 199c693e079a8..254614fe3a7b6 100644 --- a/internal/datacoord/compaction_task_clustering.go +++ b/internal/datacoord/compaction_task_clustering.go @@ -147,6 +147,7 @@ func (t *clusteringCompactionTask) BuildCompactionRequest() (*datapb.CompactionP PreferSegmentRows: t.GetPreferSegmentRows(), AnalyzeResultPath: path.Join(t.meta.(*meta).chunkManager.RootPath(), common.AnalyzeStatsPath, metautil.JoinIDPath(t.AnalyzeTaskID, t.AnalyzeVersion)), AnalyzeSegmentIds: t.GetInputSegments(), // 
todo: if need + SlotUsage: Params.DataCoordCfg.ClusteringCompactionSlotUsage.GetAsInt64(), } log := log.With(zap.Int64("taskID", t.GetTriggerID()), zap.Int64("planID", plan.GetPlanID())) @@ -406,7 +407,8 @@ func (t *clusteringCompactionTask) doAnalyze() error { func (t *clusteringCompactionTask) doCompact() error { log := log.With(zap.Int64("planID", t.GetPlanID()), zap.String("type", t.GetType().String())) if t.NeedReAssignNodeID() { - return errors.New("not assign nodeID") + log.RatedWarn(10, "not assign nodeID") + return nil } var err error t.plan, err = t.BuildCompactionRequest() @@ -416,6 +418,11 @@ } err = t.sessions.Compaction(context.Background(), t.GetNodeID(), t.GetPlan()) if err != nil { + if errors.Is(err, merr.ErrDataNodeSlotExhausted) { + log.Warn("fail to notify compaction tasks to DataNode because the node's slots are exhausted") + t.updateAndSaveTaskMeta(setNodeID(NullNodeID)) + return nil + } log.Warn("Failed to notify compaction tasks to DataNode", zap.Error(err)) t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID)) return err diff --git a/internal/datacoord/compaction_task_l0.go b/internal/datacoord/compaction_task_l0.go index 84260267f87c9..db379c231f78c 100644 --- a/internal/datacoord/compaction_task_l0.go +++ b/internal/datacoord/compaction_task_l0.go @@ -239,6 +239,7 @@ func (t *l0CompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, err CollectionTtl: t.GetCollectionTtl(), TotalRows: t.GetTotalRows(), Schema: t.GetSchema(), + SlotUsage: Params.DataCoordCfg.L0DeleteCompactionSlotUsage.GetAsInt64(), } log := log.With(zap.Int64("taskID", t.GetTriggerID()), zap.Int64("planID", plan.GetPlanID())) diff --git a/internal/datacoord/compaction_task_mix.go b/internal/datacoord/compaction_task_mix.go index 1957b71bbeee1..45d04ac8c563e 100644 --- a/internal/datacoord/compaction_task_mix.go +++ b/internal/datacoord/compaction_task_mix.go @@ -337,6 +337,7 @@ func (t *mixCompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, er CollectionTtl: t.GetCollectionTtl(), TotalRows: t.GetTotalRows(), Schema: t.GetSchema(), + SlotUsage: Params.DataCoordCfg.MixCompactionSlotUsage.GetAsInt64(), } log := log.With(zap.Int64("taskID", t.GetTriggerID()), zap.Int64("planID", plan.GetPlanID())) diff --git a/internal/datacoord/compaction_test.go b/internal/datacoord/compaction_test.go index c93ce94a722d5..dfceae3b419dd 100644 --- a/internal/datacoord/compaction_test.go +++ b/internal/datacoord/compaction_test.go @@ -346,15 +346,71 @@ func (s *CompactionPlanHandlerSuite) TestScheduleNodeWithL0Executing() { } func (s *CompactionPlanHandlerSuite) TestPickAnyNode() { + s.SetupTest() + nodeSlots := map[int64]int64{ + 100: 16, + 101: 24, + } + node, useSlot := s.handler.pickAnyNode(nodeSlots, &mixCompactionTask{ + CompactionTask: &datapb.CompactionTask{ + Type: datapb.CompactionType_MixCompaction, + }, + }) + s.Equal(int64(101), node) + nodeSlots[node] = nodeSlots[node] - useSlot + + node, useSlot = s.handler.pickAnyNode(nodeSlots, &mixCompactionTask{ + CompactionTask: &datapb.CompactionTask{ + Type: datapb.CompactionType_MixCompaction, + }, + }) + s.Equal(int64(100), node) + nodeSlots[node] = nodeSlots[node] - useSlot + + node, useSlot = s.handler.pickAnyNode(nodeSlots, &mixCompactionTask{ + CompactionTask: &datapb.CompactionTask{ + Type: datapb.CompactionType_MixCompaction, + }, + }) + s.Equal(int64(101), node) + nodeSlots[node] = nodeSlots[node] - useSlot + + node, useSlot =
s.handler.pickAnyNode(map[int64]int64{}, &mixCompactionTask{}) + s.Equal(int64(NullNodeID), node) +} + +func (s *CompactionPlanHandlerSuite) TestPickAnyNodeForClusteringTask() { s.SetupTest() nodeSlots := map[int64]int64{ 100: 2, - 101: 3, + 101: 16, + 102: 10, + } + executingTasks := make(map[int64]CompactionTask, 0) + executingTasks[1] = &clusteringCompactionTask{ + CompactionTask: &datapb.CompactionTask{ + Type: datapb.CompactionType_ClusteringCompaction, + }, + } + executingTasks[2] = &clusteringCompactionTask{ + CompactionTask: &datapb.CompactionTask{ + Type: datapb.CompactionType_ClusteringCompaction, + }, } - node := s.handler.pickAnyNode(nodeSlots) + s.handler.executingTasks = executingTasks + node, useSlot := s.handler.pickAnyNode(nodeSlots, &clusteringCompactionTask{ + CompactionTask: &datapb.CompactionTask{ + Type: datapb.CompactionType_ClusteringCompaction, + }, + }) s.Equal(int64(101), node) + nodeSlots[node] = nodeSlots[node] - useSlot - node = s.handler.pickAnyNode(map[int64]int64{}) + node, useSlot = s.handler.pickAnyNode(nodeSlots, &clusteringCompactionTask{ + CompactionTask: &datapb.CompactionTask{ + Type: datapb.CompactionType_ClusteringCompaction, + }, + }) s.Equal(int64(NullNodeID), node) } diff --git a/internal/datacoord/compaction_trigger_test.go b/internal/datacoord/compaction_trigger_test.go index e05fa1b1b8cc5..95f2022ddcff1 100644 --- a/internal/datacoord/compaction_trigger_test.go +++ b/internal/datacoord/compaction_trigger_test.go @@ -479,6 +479,7 @@ func Test_compactionTrigger_force(t *testing.T) { Channel: "ch1", TotalRows: 200, Schema: schema, + SlotUsage: 8, }, }, }, diff --git a/internal/datacoord/import_util.go b/internal/datacoord/import_util.go index 760b952e10365..0788f2e8d6256 100644 --- a/internal/datacoord/import_util.go +++ b/internal/datacoord/import_util.go @@ -215,7 +215,7 @@ func AssembleImportRequest(task ImportTask, job ImportJob, meta *meta, alloc all Files: importFiles, Options: job.GetOptions(), Ts: ts, - AutoIDRange: &datapb.AutoIDRange{Begin: idBegin, End: idEnd}, + IDRange: &datapb.IDRange{Begin: idBegin, End: idEnd}, RequestSegments: requestSegments, }, nil } diff --git a/internal/datanode/compaction/clustering_compactor.go b/internal/datanode/compaction/clustering_compactor.go index 81a9a15c77825..f583162f178ec 100644 --- a/internal/datanode/compaction/clustering_compactor.go +++ b/internal/datanode/compaction/clustering_compactor.go @@ -167,6 +167,10 @@ func (t *clusteringCompactionTask) GetChannelName() string { return t.plan.GetChannel() } +func (t *clusteringCompactionTask) GetCompactionType() datapb.CompactionType { + return t.plan.GetType() +} + func (t *clusteringCompactionTask) GetCollection() int64 { return t.plan.GetSegmentBinlogs()[0].GetCollectionID() } @@ -207,7 +211,6 @@ func (t *clusteringCompactionTask) Compact() (*datapb.CompactionPlanResult, erro log.Warn("compact wrong, illegal compaction type") return nil, merr.WrapErrIllegalCompactionPlan() } - log.Info("Clustering compaction", zap.Duration("wait in queue elapse", t.tr.RecordSpan())) if !funcutil.CheckCtxValid(ctx) { log.Warn("compact wrong, task context done or timeout") return nil, ctx.Err() @@ -1163,3 +1166,7 @@ func (t *clusteringCompactionTask) refreshBufferWriter(buffer *ClusterBuffer) (b buffer.writer = writer return pack, nil } + +func (t *clusteringCompactionTask) GetSlotUsage() int64 { + return t.plan.GetSlotUsage() +} diff --git a/internal/datanode/compaction/compactor.go b/internal/datanode/compaction/compactor.go index 825723a98fd52..6d929bd30af9a 
100644 --- a/internal/datanode/compaction/compactor.go +++ b/internal/datanode/compaction/compactor.go @@ -29,4 +29,6 @@ type Compactor interface { GetPlanID() typeutil.UniqueID GetCollection() typeutil.UniqueID GetChannelName() string + GetCompactionType() datapb.CompactionType + GetSlotUsage() int64 } diff --git a/internal/datanode/compaction/executor.go b/internal/datanode/compaction/executor.go index 167fc03acaaae..3caa27ef1d561 100644 --- a/internal/datanode/compaction/executor.go +++ b/internal/datanode/compaction/executor.go @@ -26,6 +26,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -37,7 +38,7 @@ const ( type Executor interface { Start(ctx context.Context) - Execute(task Compactor) + Execute(task Compactor) (bool, error) Slots() int64 RemoveTask(planID int64) GetResults(planID int64) []*datapb.CompactionPlanResult @@ -50,8 +51,10 @@ type executor struct { completedCompactor *typeutil.ConcurrentMap[int64, Compactor] // planID to compactor completed *typeutil.ConcurrentMap[int64, *datapb.CompactionPlanResult] // planID to CompactionPlanResult taskCh chan Compactor - taskSem *semaphore.Weighted + taskSem *semaphore.Weighted // todo remove this, unify with slot logic dropped *typeutil.ConcurrentSet[string] // vchannel dropped + usingSlots int64 + slotMu sync.RWMutex // To prevent concurrency of release channel and compaction get results // all released channel's compaction tasks will be discarded @@ -66,27 +69,65 @@ func NewExecutor() *executor { taskCh: make(chan Compactor, maxTaskQueueNum), taskSem: semaphore.NewWeighted(maxParallelTaskNum), dropped: typeutil.NewConcurrentSet[string](), + usingSlots: 0, } } -func (e *executor) Execute(task Compactor) { +func (e *executor) Execute(task Compactor) (bool, error) { + e.slotMu.Lock() + defer e.slotMu.Unlock() + if paramtable.Get().DataNodeCfg.SlotCap.GetAsInt64()-e.usingSlots >= task.GetSlotUsage() { + newSlotUsage := task.GetSlotUsage() + // compatible for old datacoord or unexpected request + if task.GetSlotUsage() <= 0 { + switch task.GetCompactionType() { + case datapb.CompactionType_ClusteringCompaction: + newSlotUsage = paramtable.Get().DataCoordCfg.ClusteringCompactionSlotUsage.GetAsInt64() + case datapb.CompactionType_MixCompaction: + newSlotUsage = paramtable.Get().DataCoordCfg.MixCompactionSlotUsage.GetAsInt64() + case datapb.CompactionType_Level0DeleteCompaction: + newSlotUsage = paramtable.Get().DataCoordCfg.L0DeleteCompactionSlotUsage.GetAsInt64() + } + log.Warn("illegal task slot usage, change it to a default value", zap.Int64("illegalSlotUsage", task.GetSlotUsage()), zap.Int64("newSlotUsage", newSlotUsage)) + } + e.usingSlots = e.usingSlots + newSlotUsage + } else { + return false, merr.WrapErrDataNodeSlotExhausted() + } _, ok := e.executing.GetOrInsert(task.GetPlanID(), task) if ok { log.Warn("duplicated compaction task", zap.Int64("planID", task.GetPlanID()), zap.String("channel", task.GetChannelName())) - return + return false, merr.WrapErrDuplicatedCompactionTask() } e.taskCh <- task + return true, nil } func (e *executor) Slots() int64 { - return paramtable.Get().DataNodeCfg.SlotCap.GetAsInt64() - int64(e.executing.Len()) + return paramtable.Get().DataNodeCfg.SlotCap.GetAsInt64() - e.getUsingSlots() +} + +func (e *executor) getUsingSlots() int64 { + e.slotMu.RLock() + defer e.slotMu.RUnlock() + return e.usingSlots } 
func (e *executor) toCompleteState(task Compactor) { task.Complete() - e.executing.GetAndRemove(task.GetPlanID()) + e.getAndRemoveExecuting(task.GetPlanID()) +} + +func (e *executor) getAndRemoveExecuting(planID typeutil.UniqueID) (Compactor, bool) { + task, ok := e.executing.GetAndRemove(planID) + if ok { + e.slotMu.Lock() + e.usingSlots = e.usingSlots - task.GetSlotUsage() + e.slotMu.Unlock() + } + return task, ok } func (e *executor) RemoveTask(planID int64) { @@ -140,7 +181,7 @@ func (e *executor) executeTask(task Compactor) { } func (e *executor) stopTask(planID int64) { - task, loaded := e.executing.GetAndRemove(planID) + task, loaded := e.getAndRemoveExecuting(planID) if loaded { log.Warn("compaction executor stop task", zap.Int64("planID", planID), zap.String("vChannelName", task.GetChannelName())) task.Stop() diff --git a/internal/datanode/compaction/executor_test.go b/internal/datanode/compaction/executor_test.go index 81b64556dafe9..dc491d3afd448 100644 --- a/internal/datanode/compaction/executor_test.go +++ b/internal/datanode/compaction/executor_test.go @@ -25,17 +25,82 @@ import ( "github.com/stretchr/testify/require" "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/paramtable" ) func TestCompactionExecutor(t *testing.T) { t.Run("Test execute", func(t *testing.T) { + paramtable.Get().Init(paramtable.NewBaseTable()) planID := int64(1) mockC := NewMockCompactor(t) mockC.EXPECT().GetPlanID().Return(planID) mockC.EXPECT().GetChannelName().Return("ch1") + mockC.EXPECT().GetSlotUsage().Return(8) executor := NewExecutor() - executor.Execute(mockC) - executor.Execute(mockC) + succeed, err := executor.Execute(mockC) + assert.Equal(t, true, succeed) + assert.NoError(t, err) + assert.EqualValues(t, 1, len(executor.taskCh)) + assert.EqualValues(t, 1, executor.executing.Len()) + + mockC.EXPECT().Stop().Return().Once() + executor.stopTask(planID) + }) + + t.Run("Test duplicate execute", func(t *testing.T) { + paramtable.Get().Init(paramtable.NewBaseTable()) + planID := int64(1) + mockC := NewMockCompactor(t) + mockC.EXPECT().GetPlanID().Return(planID) + mockC.EXPECT().GetChannelName().Return("ch1") + mockC.EXPECT().GetSlotUsage().Return(8) + executor := NewExecutor() + succeed, err := executor.Execute(mockC) + assert.Equal(t, true, succeed) + assert.NoError(t, err) + + succeed2, err2 := executor.Execute(mockC) + assert.Equal(t, false, succeed2) + assert.Error(t, err2) + assert.True(t, errors.Is(err2, merr.ErrDuplicatedCompactionTask)) + + assert.EqualValues(t, 1, len(executor.taskCh)) + assert.EqualValues(t, 1, executor.executing.Len()) + + mockC.EXPECT().Stop().Return().Once() + executor.stopTask(planID) + }) + + t.Run("Test execute task slot usage larger than free slot", func(t *testing.T) { + paramtable.Get().Init(paramtable.NewBaseTable()) + mockC := NewMockCompactor(t) + mockC.EXPECT().GetSlotUsage().Return(100) + executor := NewExecutor() + + succeed, err := executor.Execute(mockC) + assert.Equal(t, false, succeed) + assert.True(t, errors.Is(err, merr.ErrDataNodeSlotExhausted)) + + assert.EqualValues(t, 0, len(executor.taskCh)) + assert.EqualValues(t, 0, executor.executing.Len()) + }) + + t.Run("Test execute task with slot=0", func(t *testing.T) { + paramtable.Get().Init(paramtable.NewBaseTable()) + planID := int64(1) + mockC := NewMockCompactor(t) + mockC.EXPECT().GetPlanID().Return(planID) + mockC.EXPECT().GetChannelName().Return("ch1") +
mockC.EXPECT().GetCompactionType().Return(datapb.CompactionType_MixCompaction) + mockC.EXPECT().GetSlotUsage().Return(0) + executor := NewExecutor() + + succeed, err := executor.Execute(mockC) + assert.Equal(t, true, succeed) + assert.NoError(t, err) + assert.Equal(t, int64(8), executor.Slots()) + assert.Equal(t, int64(8), executor.usingSlots) assert.EqualValues(t, 1, len(executor.taskCh)) assert.EqualValues(t, 1, executor.executing.Len()) @@ -115,6 +180,7 @@ func TestCompactionExecutor(t *testing.T) { mc.EXPECT().GetPlanID().Return(int64(1)) mc.EXPECT().GetChannelName().Return("mock") mc.EXPECT().Compact().Return(&datapb.CompactionPlanResult{PlanID: 1}, nil).Maybe() + mc.EXPECT().GetSlotUsage().Return(8) mc.EXPECT().Stop().Return().Once() ex.Execute(mc) diff --git a/internal/datanode/compaction/l0_compactor.go b/internal/datanode/compaction/l0_compactor.go index ecd3c5f170acb..95fe4f6438bc5 100644 --- a/internal/datanode/compaction/l0_compactor.go +++ b/internal/datanode/compaction/l0_compactor.go @@ -97,6 +97,10 @@ func (t *LevelZeroCompactionTask) GetChannelName() string { return t.plan.GetChannel() } +func (t *LevelZeroCompactionTask) GetCompactionType() datapb.CompactionType { + return t.plan.GetType() +} + func (t *LevelZeroCompactionTask) GetCollection() int64 { // The length of SegmentBinlogs is checked before task enqueueing. return t.plan.GetSegmentBinlogs()[0].GetCollectionID() @@ -431,3 +435,7 @@ func (t *LevelZeroCompactionTask) loadBF(targetSegments []*datapb.CompactionSegm err := conc.AwaitAll(futures...) return bfs, err } + +func (t *LevelZeroCompactionTask) GetSlotUsage() int64 { + return t.plan.GetSlotUsage() +} diff --git a/internal/datanode/compaction/mix_compactor.go b/internal/datanode/compaction/mix_compactor.go index 8144ed8e07366..4b853adfdac39 100644 --- a/internal/datanode/compaction/mix_compactor.go +++ b/internal/datanode/compaction/mix_compactor.go @@ -94,6 +94,10 @@ func (t *mixCompactionTask) GetChannelName() string { return t.plan.GetChannel() } +func (t *mixCompactionTask) GetCompactionType() datapb.CompactionType { + return t.plan.GetType() +} + // return num rows of all segment compaction from func (t *mixCompactionTask) getNumRows() int64 { numRows := int64(0) @@ -392,3 +396,7 @@ func (t *mixCompactionTask) isExpiredEntity(ts typeutil.Timestamp) bool { return entityT.Add(time.Duration(t.plan.GetCollectionTtl())).Before(nowT) } + +func (t *mixCompactionTask) GetSlotUsage() int64 { + return t.plan.GetSlotUsage() +} diff --git a/internal/datanode/compaction/mock_compactor.go b/internal/datanode/compaction/mock_compactor.go index 19a83bf2e1b9d..073a25dac8e25 100644 --- a/internal/datanode/compaction/mock_compactor.go +++ b/internal/datanode/compaction/mock_compactor.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.32.4. DO NOT EDIT. +// Code generated by mockery v2.43.2. DO NOT EDIT. 
package compaction @@ -24,6 +24,10 @@ func (_m *MockCompactor) EXPECT() *MockCompactor_Expecter { func (_m *MockCompactor) Compact() (*datapb.CompactionPlanResult, error) { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for Compact") + } + var r0 *datapb.CompactionPlanResult var r1 error if rf, ok := ret.Get(0).(func() (*datapb.CompactionPlanResult, error)); ok { @@ -109,6 +113,10 @@ func (_c *MockCompactor_Complete_Call) RunAndReturn(run func()) *MockCompactor_C func (_m *MockCompactor) GetChannelName() string { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for GetChannelName") + } + var r0 string if rf, ok := ret.Get(0).(func() string); ok { r0 = rf() @@ -150,6 +158,10 @@ func (_c *MockCompactor_GetChannelName_Call) RunAndReturn(run func() string) *Mo func (_m *MockCompactor) GetCollection() int64 { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for GetCollection") + } + var r0 int64 if rf, ok := ret.Get(0).(func() int64); ok { r0 = rf() @@ -187,10 +199,59 @@ func (_c *MockCompactor_GetCollection_Call) RunAndReturn(run func() int64) *Mock return _c } +// GetCompactionType provides a mock function with given fields: +func (_m *MockCompactor) GetCompactionType() datapb.CompactionType { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for GetCompactionType") + } + + var r0 datapb.CompactionType + if rf, ok := ret.Get(0).(func() datapb.CompactionType); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(datapb.CompactionType) + } + + return r0 +} + +// MockCompactor_GetCompactionType_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetCompactionType' +type MockCompactor_GetCompactionType_Call struct { + *mock.Call +} + +// GetCompactionType is a helper method to define mock.On call +func (_e *MockCompactor_Expecter) GetCompactionType() *MockCompactor_GetCompactionType_Call { + return &MockCompactor_GetCompactionType_Call{Call: _e.mock.On("GetCompactionType")} +} + +func (_c *MockCompactor_GetCompactionType_Call) Run(run func()) *MockCompactor_GetCompactionType_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockCompactor_GetCompactionType_Call) Return(_a0 datapb.CompactionType) *MockCompactor_GetCompactionType_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockCompactor_GetCompactionType_Call) RunAndReturn(run func() datapb.CompactionType) *MockCompactor_GetCompactionType_Call { + _c.Call.Return(run) + return _c +} + // GetPlanID provides a mock function with given fields: func (_m *MockCompactor) GetPlanID() int64 { ret := _m.Called() + if len(ret) == 0 { + panic("no return value specified for GetPlanID") + } + var r0 int64 if rf, ok := ret.Get(0).(func() int64); ok { r0 = rf() @@ -228,6 +289,51 @@ func (_c *MockCompactor_GetPlanID_Call) RunAndReturn(run func() int64) *MockComp return _c } +// GetSlotUsage provides a mock function with given fields: +func (_m *MockCompactor) GetSlotUsage() int64 { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for GetSlotUsage") + } + + var r0 int64 + if rf, ok := ret.Get(0).(func() int64); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(int64) + } + + return r0 +} + +// MockCompactor_GetSlotUsage_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetSlotUsage' +type MockCompactor_GetSlotUsage_Call struct { + *mock.Call +} + +// GetSlotUsage is a helper method to define mock.On call 
+func (_e *MockCompactor_Expecter) GetSlotUsage() *MockCompactor_GetSlotUsage_Call { + return &MockCompactor_GetSlotUsage_Call{Call: _e.mock.On("GetSlotUsage")} +} + +func (_c *MockCompactor_GetSlotUsage_Call) Run(run func()) *MockCompactor_GetSlotUsage_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockCompactor_GetSlotUsage_Call) Return(_a0 int64) *MockCompactor_GetSlotUsage_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockCompactor_GetSlotUsage_Call) RunAndReturn(run func() int64) *MockCompactor_GetSlotUsage_Call { + _c.Call.Return(run) + return _c +} + // Stop provides a mock function with given fields: func (_m *MockCompactor) Stop() { _m.Called() diff --git a/internal/datanode/importv2/scheduler_test.go b/internal/datanode/importv2/scheduler_test.go index 112d537f3a4aa..4e9592ca3e555 100644 --- a/internal/datanode/importv2/scheduler_test.go +++ b/internal/datanode/importv2/scheduler_test.go @@ -265,7 +265,7 @@ func (s *SchedulerSuite) TestScheduler_Start_Import() { }, }, Ts: 1000, - AutoIDRange: &datapb.AutoIDRange{ + IDRange: &datapb.IDRange{ Begin: 0, End: int64(s.numRows), }, @@ -326,7 +326,7 @@ func (s *SchedulerSuite) TestScheduler_Start_Import_Failed() { }, }, Ts: 1000, - AutoIDRange: &datapb.AutoIDRange{ + IDRange: &datapb.IDRange{ Begin: 0, End: int64(s.numRows), }, @@ -417,7 +417,7 @@ func (s *SchedulerSuite) TestScheduler_ImportFile() { }, }, Ts: 1000, - AutoIDRange: &datapb.AutoIDRange{ + IDRange: &datapb.IDRange{ Begin: 0, End: int64(s.numRows), }, diff --git a/internal/datanode/importv2/util.go b/internal/datanode/importv2/util.go index 400a339af5a00..a0e3bf36aff2f 100644 --- a/internal/datanode/importv2/util.go +++ b/internal/datanode/importv2/util.go @@ -152,7 +152,7 @@ func CheckRowsEqual(schema *schemapb.CollectionSchema, data *storage.InsertData) } func AppendSystemFieldsData(task *ImportTask, data *storage.InsertData) error { - idRange := task.req.GetAutoIDRange() + idRange := task.req.GetIDRange() pkField, err := typeutil.GetPrimaryFieldSchema(task.GetSchema()) if err != nil { return err diff --git a/internal/datanode/importv2/util_test.go b/internal/datanode/importv2/util_test.go index b1cca451e360a..7add07ecd1d41 100644 --- a/internal/datanode/importv2/util_test.go +++ b/internal/datanode/importv2/util_test.go @@ -58,7 +58,7 @@ func Test_AppendSystemFieldsData(t *testing.T) { task := &ImportTask{ req: &datapb.ImportRequest{ Ts: 1000, - AutoIDRange: &datapb.AutoIDRange{ + IDRange: &datapb.IDRange{ Begin: 0, End: count, }, diff --git a/internal/datanode/services.go b/internal/datanode/services.go index 6ed25f5121ddb..24436df1de9ab 100644 --- a/internal/datanode/services.go +++ b/internal/datanode/services.go @@ -250,8 +250,12 @@ func (node *DataNode) CompactionV2(ctx context.Context, req *datapb.CompactionPl return merr.Status(merr.WrapErrParameterInvalidMsg("Unknown compaction type: %v", req.GetType().String())), nil } - node.compactionExecutor.Execute(task) - return merr.Success(), nil + succeed, err := node.compactionExecutor.Execute(task) + if succeed { + return merr.Success(), nil + } else { + return merr.Status(err), nil + } } // GetCompactionState called by DataCoord diff --git a/internal/datanode/services_test.go b/internal/datanode/services_test.go index 038df3a4b6023..4154821c590b3 100644 --- a/internal/datanode/services_test.go +++ b/internal/datanode/services_test.go @@ -169,6 +169,7 @@ func (s *DataNodeServicesSuite) TestGetCompactionState() { mockC.EXPECT().GetPlanID().Return(int64(1)) 
mockC.EXPECT().GetCollection().Return(collection) mockC.EXPECT().GetChannelName().Return(channel) + mockC.EXPECT().GetSlotUsage().Return(8) mockC.EXPECT().Complete().Return() mockC.EXPECT().Compact().Return(&datapb.CompactionPlanResult{ PlanID: 1, @@ -180,6 +181,7 @@ func (s *DataNodeServicesSuite) TestGetCompactionState() { mockC2.EXPECT().GetPlanID().Return(int64(2)) mockC2.EXPECT().GetCollection().Return(collection) mockC2.EXPECT().GetChannelName().Return(channel) + mockC2.EXPECT().GetSlotUsage().Return(8) mockC2.EXPECT().Complete().Return() mockC2.EXPECT().Compact().Return(&datapb.CompactionPlanResult{ PlanID: 2, diff --git a/internal/proto/data_coord.proto b/internal/proto/data_coord.proto index c2b5a8e5e237d..e2b5afb4eaea1 100644 --- a/internal/proto/data_coord.proto +++ b/internal/proto/data_coord.proto @@ -560,6 +560,9 @@ message CompactionPlan { string analyze_result_path = 14; repeated int64 analyze_segment_ids = 15; int32 state = 16; + int64 begin_logID = 17; + IDRange pre_allocated_segments = 18; // only for clustering compaction + int64 slot_usage = 19; } message CompactionSegment { @@ -723,7 +726,7 @@ message PreImportRequest { repeated common.KeyValuePair options = 9; } -message autoIDRange { +message IDRange { int64 begin = 1; int64 end = 2; } @@ -745,7 +748,7 @@ message ImportRequest { repeated internal.ImportFile files = 8; repeated common.KeyValuePair options = 9; uint64 ts = 10; - autoIDRange autoID_range = 11; + IDRange ID_range = 11; repeated ImportRequestSegment request_segments = 12; } diff --git a/pkg/util/merr/errors.go b/pkg/util/merr/errors.go index 7e7c602ff3b35..1c9f7daa83b49 100644 --- a/pkg/util/merr/errors.go +++ b/pkg/util/merr/errors.go @@ -207,6 +207,9 @@ var ( ErrBuildCompactionRequestFail = newMilvusError("fail to build CompactionRequest", 2312, true) ErrGetCompactionPlanResultFail = newMilvusError("fail to get compaction plan", 2313, true) ErrCompactionResult = newMilvusError("illegal compaction results", 2314, false) + ErrDuplicatedCompactionTask = newMilvusError("duplicated compaction task", 2315, false) + + ErrDataNodeSlotExhausted = newMilvusError("datanode slot exhausted", 2401, false) // General ErrOperationNotSupported = newMilvusError("unsupported operation", 3000, false) diff --git a/pkg/util/merr/utils.go b/pkg/util/merr/utils.go index ad074c120c72a..1c84e51d02976 100644 --- a/pkg/util/merr/utils.go +++ b/pkg/util/merr/utils.go @@ -1183,3 +1183,19 @@ func WrapErrCompactionResult(msg ...string) error { } return err } + +func WrapErrDataNodeSlotExhausted(msg ...string) error { + err := error(ErrDataNodeSlotExhausted) + if len(msg) > 0 { + err = errors.Wrap(err, strings.Join(msg, "->")) + } + return err +} + +func WrapErrDuplicatedCompactionTask(msg ...string) error { + err := error(ErrDuplicatedCompactionTask) + if len(msg) > 0 { + err = errors.Wrap(err, strings.Join(msg, "->")) + } + return err +} diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index a320934273e37..cc61433a212b5 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -2939,6 +2939,10 @@ type dataCoordConfig struct { WaitForIndex ParamItem `refreshable:"true"` GracefulStopTimeout ParamItem `refreshable:"true"` + + ClusteringCompactionSlotUsage ParamItem `refreshable:"true"` + MixCompactionSlotUsage ParamItem `refreshable:"true"` + L0DeleteCompactionSlotUsage ParamItem `refreshable:"true"` } func (p *dataCoordConfig) init(base *BaseTable) { @@ -3683,6 +3687,36 @@ During compaction, the size 
of segment # of rows is able to exceed segment max # Export: true, } p.GracefulStopTimeout.Init(base.mgr) + + p.ClusteringCompactionSlotUsage = ParamItem{ + Key: "dataCoord.slot.clusteringCompactionUsage", + Version: "2.4.6", + Doc: "slot usage of clustering compaction job.", + DefaultValue: "16", + PanicIfEmpty: false, + Export: true, + } + p.ClusteringCompactionSlotUsage.Init(base.mgr) + + p.MixCompactionSlotUsage = ParamItem{ + Key: "dataCoord.slot.mixCompactionUsage", + Version: "2.4.6", + Doc: "slot usage of mix compaction job.", + DefaultValue: "8", + PanicIfEmpty: false, + Export: true, + } + p.MixCompactionSlotUsage.Init(base.mgr) + + p.L0DeleteCompactionSlotUsage = ParamItem{ + Key: "dataCoord.slot.l0DeleteCompactionUsage", + Version: "2.4.6", + Doc: "slot usage of l0 compaction job.", + DefaultValue: "8", + PanicIfEmpty: false, + Export: true, + } + p.L0DeleteCompactionSlotUsage.Init(base.mgr) } // ///////////////////////////////////////////////////////////////////////////// @@ -4081,7 +4115,7 @@ if this parameter <= 0, will set it as 10`, p.SlotCap = ParamItem{ Key: "dataNode.slot.slotCap", Version: "2.4.2", - DefaultValue: "2", + DefaultValue: "16", Doc: "The maximum number of tasks(e.g. compaction, importing) allowed to run concurrently on a datanode", Export: true, } @@ -4101,7 +4135,7 @@ if this parameter <= 0, will set it as 10`, Key: "dataNode.clusteringCompaction.workPoolSize", Version: "2.4.6", Doc: "worker pool size for one clustering compaction job.", - DefaultValue: "1", + DefaultValue: "8", PanicIfEmpty: false, Export: true, } diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index 0f695eeae967a..52c22a9940f18 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -465,6 +465,12 @@ func TestComponentParam(t *testing.T) { assert.Equal(t, int64(100*1024*1024), Params.ClusteringCompactionMaxSegmentSize.GetAsSize()) params.Save("dataCoord.compaction.clustering.preferSegmentSize", "10m") assert.Equal(t, int64(10*1024*1024), Params.ClusteringCompactionPreferSegmentSize.GetAsSize()) + params.Save("dataCoord.slot.clusteringCompactionUsage", "10") + assert.Equal(t, 10, Params.ClusteringCompactionSlotUsage.GetAsInt()) + params.Save("dataCoord.slot.mixCompactionUsage", "5") + assert.Equal(t, 5, Params.MixCompactionSlotUsage.GetAsInt()) + params.Save("dataCoord.slot.l0DeleteCompactionUsage", "4") + assert.Equal(t, 4, Params.L0DeleteCompactionSlotUsage.GetAsInt()) }) t.Run("test dataNodeConfig", func(t *testing.T) { @@ -518,10 +524,13 @@ func TestComponentParam(t *testing.T) { params.Save("datanode.gracefulStopTimeout", "100") assert.Equal(t, 100*time.Second, Params.GracefulStopTimeout.GetAsDuration(time.Second)) assert.Equal(t, 4, Params.BloomFilterApplyParallelFactor.GetAsInt()) - assert.Equal(t, 2, Params.SlotCap.GetAsInt()) + assert.Equal(t, 16, Params.SlotCap.GetAsInt()) + // clustering compaction params.Save("datanode.clusteringCompaction.memoryBufferRatio", "0.1") assert.Equal(t, 0.1, Params.ClusteringCompactionMemoryBufferRatio.GetAsFloat()) + params.Save("datanode.clusteringCompaction.workPoolSize", "2") + assert.Equal(t, int64(2), Params.ClusteringCompactionWorkerPoolSize.GetAsInt64()) }) t.Run("test indexNodeConfig", func(t *testing.T) {