Skip to content

Commit

Permalink
enhance: Add compaction task slot usage logic (milvus-io#34581)
Browse files Browse the repository at this point in the history
milvus-io#34544

Signed-off-by: wayblink <[email protected]>
  • Loading branch information
wayblink authored Jul 18, 2024
1 parent 0c0ca4c commit c79d1af
Show file tree
Hide file tree
Showing 21 changed files with 422 additions and 32 deletions.
7 changes: 6 additions & 1 deletion configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,10 @@ dataCoord:
clientMaxSendSize: 268435456
clientMaxRecvSize: 536870912
syncSegmentsInterval: 300
slot:
clusteringCompactionUsage: 16
mixCompactionUsage: 8
l0DeleteCompactionUsage: 8

dataNode:
dataSync:
Expand Down Expand Up @@ -570,10 +574,11 @@ dataNode:
clientMaxSendSize: 268435456
clientMaxRecvSize: 536870912
slot:
slotCap: 2 # The maximum number of tasks(e.g. compaction, importing) allowed to run concurrently on a datanode.
slotCap: 16 # The maximum number of tasks(e.g. compaction, importing) allowed to run concurrently on a datanode.

clusteringCompaction:
memoryBufferRatio: 0.1 # The ratio of memory buffer of clustering compaction. Data larger than threshold will be spilled to storage.
workPoolSize: 8

# Configures the system log output.
log:
Expand Down
31 changes: 21 additions & 10 deletions internal/datacoord/compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/lock"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

Expand Down Expand Up @@ -163,7 +164,6 @@ func (c *compactionPlanHandler) getCompactionTasksNumBySignalID(triggerID int64)
if t.GetTriggerID() == triggerID {
cnt += 1
}
// if t.GetPlanID()
}
c.queueGuard.RUnlock()
c.executingGuard.RLock()
Expand Down Expand Up @@ -617,17 +617,19 @@ func (c *compactionPlanHandler) assignNodeIDs(tasks []CompactionTask) {
}

for _, t := range tasks {
nodeID := c.pickAnyNode(slots)
nodeID, useSlot := c.pickAnyNode(slots, t)
if nodeID == NullNodeID {
log.Info("compactionHandler cannot find datanode for compaction task",
zap.Int64("planID", t.GetPlanID()), zap.String("vchannel", t.GetChannel()))
zap.Int64("planID", t.GetPlanID()), zap.String("type", t.GetType().String()), zap.String("vchannel", t.GetChannel()))
continue
}
err := t.SetNodeID(nodeID)
if err != nil {
log.Info("compactionHandler assignNodeID failed",
zap.Int64("planID", t.GetPlanID()), zap.String("vchannel", t.GetChannel()), zap.Error(err))
} else {
// update the input nodeSlots
slots[nodeID] = slots[nodeID] - useSlot
log.Info("compactionHandler assignNodeID success",
zap.Int64("planID", t.GetPlanID()), zap.String("vchannel", t.GetChannel()), zap.Any("nodeID", nodeID))
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetType().String(), metrics.Executing).Dec()
Expand Down Expand Up @@ -674,18 +676,27 @@ func (c *compactionPlanHandler) checkCompaction() error {
return nil
}

func (c *compactionPlanHandler) pickAnyNode(nodeSlots map[int64]int64) int64 {
var (
nodeID int64 = NullNodeID
maxSlots int64 = -1
)
func (c *compactionPlanHandler) pickAnyNode(nodeSlots map[int64]int64, task CompactionTask) (nodeID int64, useSlot int64) {
nodeID = NullNodeID
var maxSlots int64 = -1

switch task.GetType() {
case datapb.CompactionType_ClusteringCompaction:
useSlot = paramtable.Get().DataCoordCfg.ClusteringCompactionSlotUsage.GetAsInt64()
case datapb.CompactionType_MixCompaction:
useSlot = paramtable.Get().DataCoordCfg.MixCompactionSlotUsage.GetAsInt64()
case datapb.CompactionType_Level0DeleteCompaction:
useSlot = paramtable.Get().DataCoordCfg.L0DeleteCompactionSlotUsage.GetAsInt64()
}

for id, slots := range nodeSlots {
if slots > 0 && slots > maxSlots {
if slots >= useSlot && slots > maxSlots {
nodeID = id
maxSlots = slots
}
}
return nodeID

return nodeID, useSlot
}

func (c *compactionPlanHandler) pickShardNode(nodeSlots map[int64]int64, t CompactionTask) int64 {
Expand Down
12 changes: 10 additions & 2 deletions internal/datacoord/compaction_task_clustering.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,12 +151,13 @@ func (t *clusteringCompactionTask) BuildCompactionRequest() (*datapb.CompactionP
MaxSegmentRows: t.GetMaxSegmentRows(),
PreferSegmentRows: t.GetPreferSegmentRows(),
AnalyzeResultPath: path.Join(t.meta.(*meta).chunkManager.RootPath(), common.AnalyzeStatsPath, metautil.JoinIDPath(t.AnalyzeTaskID, t.AnalyzeVersion)),
AnalyzeSegmentIds: t.GetInputSegments(), // todo: if need
AnalyzeSegmentIds: t.GetInputSegments(),
BeginLogID: beginLogID,
PreAllocatedSegments: &datapb.IDRange{
Begin: t.GetResultSegments()[0],
End: t.GetResultSegments()[1],
},
SlotUsage: Params.DataCoordCfg.ClusteringCompactionSlotUsage.GetAsInt64(),
}
log := log.With(zap.Int64("taskID", t.GetTriggerID()), zap.Int64("planID", plan.GetPlanID()))

Expand Down Expand Up @@ -416,8 +417,10 @@ func (t *clusteringCompactionTask) doAnalyze() error {
func (t *clusteringCompactionTask) doCompact() error {
log := log.With(zap.Int64("planID", t.GetPlanID()), zap.String("type", t.GetType().String()))
if t.NeedReAssignNodeID() {
return errors.New("not assign nodeID")
log.RatedWarn(10, "not assign nodeID")
return nil
}
log = log.With(zap.Int64("nodeID", t.GetNodeID()))

// todo refine this logic: GetCompactionPlanResult return a fail result when this is no compaction in datanode which is weird
// check whether the compaction plan is already submitted considering
Expand Down Expand Up @@ -446,6 +449,11 @@ func (t *clusteringCompactionTask) doCompact() error {
}
err = t.sessions.Compaction(context.Background(), t.GetNodeID(), t.GetPlan())
if err != nil {
if errors.Is(err, merr.ErrDataNodeSlotExhausted) {
log.Warn("fail to notify compaction tasks to DataNode because the node slots exhausted")
t.updateAndSaveTaskMeta(setNodeID(NullNodeID))
return nil
}
log.Warn("Failed to notify compaction tasks to DataNode", zap.Error(err))
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID))
return err
Expand Down
1 change: 1 addition & 0 deletions internal/datacoord/compaction_task_l0.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ func (t *l0CompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, err
TotalRows: t.GetTotalRows(),
Schema: t.GetSchema(),
BeginLogID: beginLogID,
SlotUsage: Params.DataCoordCfg.L0DeleteCompactionSlotUsage.GetAsInt64(),
}

log := log.With(zap.Int64("taskID", t.GetTriggerID()), zap.Int64("planID", plan.GetPlanID()))
Expand Down
1 change: 1 addition & 0 deletions internal/datacoord/compaction_task_mix.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,7 @@ func (t *mixCompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, er
PreAllocatedSegments: &datapb.IDRange{
Begin: t.GetResultSegments()[0],
},
SlotUsage: Params.DataCoordCfg.MixCompactionSlotUsage.GetAsInt64(),
}
log := log.With(zap.Int64("taskID", t.GetTriggerID()), zap.Int64("planID", plan.GetPlanID()))

Expand Down
62 changes: 59 additions & 3 deletions internal/datacoord/compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,15 +343,71 @@ func (s *CompactionPlanHandlerSuite) TestScheduleNodeWithL0Executing() {
}

func (s *CompactionPlanHandlerSuite) TestPickAnyNode() {
s.SetupTest()
nodeSlots := map[int64]int64{
100: 16,
101: 24,
}
node, useSlot := s.handler.pickAnyNode(nodeSlots, &mixCompactionTask{
CompactionTask: &datapb.CompactionTask{
Type: datapb.CompactionType_MixCompaction,
},
})
s.Equal(int64(101), node)
nodeSlots[node] = nodeSlots[node] - useSlot

node, useSlot = s.handler.pickAnyNode(nodeSlots, &mixCompactionTask{
CompactionTask: &datapb.CompactionTask{
Type: datapb.CompactionType_MixCompaction,
},
})
s.Equal(int64(100), node)
nodeSlots[node] = nodeSlots[node] - useSlot

node, useSlot = s.handler.pickAnyNode(nodeSlots, &mixCompactionTask{
CompactionTask: &datapb.CompactionTask{
Type: datapb.CompactionType_MixCompaction,
},
})
s.Equal(int64(101), node)
nodeSlots[node] = nodeSlots[node] - useSlot

node, useSlot = s.handler.pickAnyNode(map[int64]int64{}, &mixCompactionTask{})
s.Equal(int64(NullNodeID), node)
}

func (s *CompactionPlanHandlerSuite) TestPickAnyNodeForClusteringTask() {
s.SetupTest()
nodeSlots := map[int64]int64{
100: 2,
101: 3,
101: 16,
102: 10,
}
executingTasks := make(map[int64]CompactionTask, 0)
executingTasks[1] = &clusteringCompactionTask{
CompactionTask: &datapb.CompactionTask{
Type: datapb.CompactionType_ClusteringCompaction,
},
}
executingTasks[2] = &clusteringCompactionTask{
CompactionTask: &datapb.CompactionTask{
Type: datapb.CompactionType_ClusteringCompaction,
},
}
node := s.handler.pickAnyNode(nodeSlots)
s.handler.executingTasks = executingTasks
node, useSlot := s.handler.pickAnyNode(nodeSlots, &clusteringCompactionTask{
CompactionTask: &datapb.CompactionTask{
Type: datapb.CompactionType_ClusteringCompaction,
},
})
s.Equal(int64(101), node)
nodeSlots[node] = nodeSlots[node] - useSlot

node = s.handler.pickAnyNode(map[int64]int64{})
node, useSlot = s.handler.pickAnyNode(nodeSlots, &clusteringCompactionTask{
CompactionTask: &datapb.CompactionTask{
Type: datapb.CompactionType_ClusteringCompaction,
},
})
s.Equal(int64(NullNodeID), node)
}

Expand Down
1 change: 1 addition & 0 deletions internal/datacoord/compaction_trigger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,7 @@ func Test_compactionTrigger_force(t *testing.T) {
TotalRows: 200,
Schema: schema,
PreAllocatedSegments: &datapb.IDRange{Begin: 100},
SlotUsage: 8,
},
},
},
Expand Down
9 changes: 8 additions & 1 deletion internal/datanode/compaction/clustering_compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,10 @@ func (t *clusteringCompactionTask) GetChannelName() string {
return t.plan.GetChannel()
}

func (t *clusteringCompactionTask) GetCompactionType() datapb.CompactionType {
return t.plan.GetType()
}

func (t *clusteringCompactionTask) GetCollection() int64 {
return t.plan.GetSegmentBinlogs()[0].GetCollectionID()
}
Expand Down Expand Up @@ -210,7 +214,6 @@ func (t *clusteringCompactionTask) Compact() (*datapb.CompactionPlanResult, erro
log.Warn("compact wrong, illegal compaction type")
return nil, merr.WrapErrIllegalCompactionPlan()
}
log.Info("Clustering compaction", zap.Duration("wait in queue elapse", t.tr.RecordSpan()))
if !funcutil.CheckCtxValid(ctx) {
log.Warn("compact wrong, task context done or timeout")
return nil, ctx.Err()
Expand Down Expand Up @@ -1171,3 +1174,7 @@ func (t *clusteringCompactionTask) refreshBufferWriter(buffer *ClusterBuffer) (b
buffer.writer = writer
return pack, nil
}

func (t *clusteringCompactionTask) GetSlotUsage() int64 {
return t.plan.GetSlotUsage()
}
2 changes: 2 additions & 0 deletions internal/datanode/compaction/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,6 @@ type Compactor interface {
GetPlanID() typeutil.UniqueID
GetCollection() typeutil.UniqueID
GetChannelName() string
GetCompactionType() datapb.CompactionType
GetSlotUsage() int64
}
55 changes: 48 additions & 7 deletions internal/datanode/compaction/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
Expand All @@ -37,7 +38,7 @@ const (

type Executor interface {
Start(ctx context.Context)
Execute(task Compactor)
Execute(task Compactor) (bool, error)
Slots() int64
RemoveTask(planID int64)
GetResults(planID int64) []*datapb.CompactionPlanResult
Expand All @@ -50,8 +51,10 @@ type executor struct {
completedCompactor *typeutil.ConcurrentMap[int64, Compactor] // planID to compactor
completed *typeutil.ConcurrentMap[int64, *datapb.CompactionPlanResult] // planID to CompactionPlanResult
taskCh chan Compactor
taskSem *semaphore.Weighted
taskSem *semaphore.Weighted // todo remove this, unify with slot logic
dropped *typeutil.ConcurrentSet[string] // vchannel dropped
usingSlots int64
slotMu sync.RWMutex

// To prevent concurrency of release channel and compaction get results
// all released channel's compaction tasks will be discarded
Expand All @@ -66,27 +69,65 @@ func NewExecutor() *executor {
taskCh: make(chan Compactor, maxTaskQueueNum),
taskSem: semaphore.NewWeighted(maxParallelTaskNum),
dropped: typeutil.NewConcurrentSet[string](),
usingSlots: 0,
}
}

func (e *executor) Execute(task Compactor) {
func (e *executor) Execute(task Compactor) (bool, error) {
e.slotMu.Lock()
defer e.slotMu.Unlock()
if paramtable.Get().DataNodeCfg.SlotCap.GetAsInt64()-e.usingSlots >= task.GetSlotUsage() {
newSlotUsage := task.GetSlotUsage()
// compatible for old datacoord or unexpected request
if task.GetSlotUsage() <= 0 {
switch task.GetCompactionType() {
case datapb.CompactionType_ClusteringCompaction:
newSlotUsage = paramtable.Get().DataCoordCfg.ClusteringCompactionSlotUsage.GetAsInt64()
case datapb.CompactionType_MixCompaction:
newSlotUsage = paramtable.Get().DataCoordCfg.MixCompactionSlotUsage.GetAsInt64()
case datapb.CompactionType_Level0DeleteCompaction:
newSlotUsage = paramtable.Get().DataCoordCfg.L0DeleteCompactionSlotUsage.GetAsInt64()
}
log.Warn("illegal task slot usage, change it to a default value", zap.Int64("illegalSlotUsage", task.GetSlotUsage()), zap.Int64("newSlotUsage", newSlotUsage))
}
e.usingSlots = e.usingSlots + newSlotUsage
} else {
return false, merr.WrapErrDataNodeSlotExhausted()
}
_, ok := e.executing.GetOrInsert(task.GetPlanID(), task)
if ok {
log.Warn("duplicated compaction task",
zap.Int64("planID", task.GetPlanID()),
zap.String("channel", task.GetChannelName()))
return
return false, merr.WrapErrDuplicatedCompactionTask()
}
e.taskCh <- task
return true, nil
}

func (e *executor) Slots() int64 {
return paramtable.Get().DataNodeCfg.SlotCap.GetAsInt64() - int64(e.executing.Len())
return paramtable.Get().DataNodeCfg.SlotCap.GetAsInt64() - e.getUsingSlots()
}

func (e *executor) getUsingSlots() int64 {
e.slotMu.RLock()
defer e.slotMu.RUnlock()
return e.usingSlots
}

func (e *executor) toCompleteState(task Compactor) {
task.Complete()
e.executing.GetAndRemove(task.GetPlanID())
e.getAndRemoveExecuting(task.GetPlanID())
}

func (e *executor) getAndRemoveExecuting(planID typeutil.UniqueID) (Compactor, bool) {
task, ok := e.executing.GetAndRemove(planID)
if ok {
e.slotMu.Lock()
e.usingSlots = e.usingSlots - task.GetSlotUsage()
e.slotMu.Unlock()
}
return task, ok
}

func (e *executor) RemoveTask(planID int64) {
Expand Down Expand Up @@ -140,7 +181,7 @@ func (e *executor) executeTask(task Compactor) {
}

func (e *executor) stopTask(planID int64) {
task, loaded := e.executing.GetAndRemove(planID)
task, loaded := e.getAndRemoveExecuting(planID)
if loaded {
log.Warn("compaction executor stop task", zap.Int64("planID", planID), zap.String("vChannelName", task.GetChannelName()))
task.Stop()
Expand Down
Loading

0 comments on commit c79d1af

Please sign in to comment.