enhance: Decouple compaction from shard (milvus-io#33138)
Decouple compaction from shards, removing dependencies on shard-level mechanisms (e.g. SyncSegments, injection).

issue: milvus-io#32809

---------

Signed-off-by: bigsheeper <[email protected]>
bigsheeper authored May 24, 2024
1 parent 592d701 commit 7730b91
Showing 26 changed files with 843 additions and 876 deletions.
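The core of the change, condensed from the diffs below: a compaction plan is no longer bound to its shard's DataNode at enqueue time, so execCompactionPlan cannot fail and drops its error return, and node assignment moves into the scheduler. A minimal before/after sketch (the types here are simplified stand-ins for the real compactionSignal and datapb.CompactionPlan, not the actual Milvus definitions):

package sketch

// Simplified stand-ins for the real compactionSignal and
// datapb.CompactionPlan; only what this sketch needs.
type compactionSignal struct{}

type CompactionPlan struct {
	PlanID  int64
	Channel string
}

// Before this commit: enqueueing resolved the shard's DataNode up front
// (via the channel watcher) and could therefore fail.
type compactionPlanContextBefore interface {
	execCompactionPlan(signal *compactionSignal, plan *CompactionPlan) error
}

// After this commit: enqueueing always succeeds; node assignment is deferred
// to the scheduler, which can hand the plan to any DataNode with free slots.
type compactionPlanContextAfter interface {
	execCompactionPlan(signal *compactionSignal, plan *CompactionPlan)
}

The diffs below then remove the pieces that existed only to serve the shard binding: the FindWatcher lookup in enqueuePlan, the dataNodeID fixed at enqueue time, and most of the payload that used to ride along on SyncSegments.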
2 changes: 2 additions & 0 deletions go.sum
@@ -290,6 +290,7 @@ github.com/go-latex/latex v0.0.0-20210118124228-b3d85cf34e07/go.mod h1:CO1AlKB2C
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.3.0 h1:2y3SDp0ZXuc6/cjLSZ+Q3ir+QB9T/iG5yYRXqsagWSY=
github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
@@ -516,6 +517,7 @@ github.com/kataras/iris/v12 v12.1.8/go.mod h1:LMYy4VlP67TQ3Zgriz8RE2h2kMZV2SgMYb
github.com/kataras/neffos v0.0.14/go.mod h1:8lqADm8PnbeFfL7CLXh1WHw53dG27MC3pgi2R1rmoTE=
github.com/kataras/pio v0.0.2/go.mod h1:hAoW0t9UmXi4R5Oyq5Z4irTbaTsOemSrDGUtaTl7Dro=
github.com/kataras/sitemap v0.0.5/go.mod h1:KY2eugMKiPwsJgx7+U103YZehfvNGOXURubcGyk0Bz8=
github.com/keybase/go-keychain v0.0.0-20190712205309-48d3d31d256d/go.mod h1:JJNrCn9otv/2QP4D7SMJBgaleKpOf66PnW6F5WGNRIc=
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
48 changes: 18 additions & 30 deletions internal/datacoord/compaction.go
@@ -45,11 +45,12 @@ const (
tsTimeout = uint64(1)
)

//go:generate mockery --name=compactionPlanContext --structname=MockCompactionPlanContext --output=./ --filename=mock_compaction_plan_context.go --with-expecter --inpackage
type compactionPlanContext interface {
start()
stop()
// execCompactionPlan start to execute plan and return immediately
execCompactionPlan(signal *compactionSignal, plan *datapb.CompactionPlan) error
execCompactionPlan(signal *compactionSignal, plan *datapb.CompactionPlan)
// getCompaction return compaction task. If planId does not exist, return nil.
getCompaction(planID int64) *compactionTask
// updateCompaction set the compaction state to timeout or completed
@@ -277,14 +278,8 @@ func (c *compactionPlanHandler) updateTask(planID int64, opts ...compactionTaskO
}
}

func (c *compactionPlanHandler) enqueuePlan(signal *compactionSignal, plan *datapb.CompactionPlan) error {
nodeID, err := c.chManager.FindWatcher(plan.GetChannel())
if err != nil {
log.Error("failed to find watcher", zap.Int64("planID", plan.GetPlanID()), zap.Error(err))
return err
}

log := log.With(zap.Int64("planID", plan.GetPlanID()), zap.Int64("nodeID", nodeID))
func (c *compactionPlanHandler) enqueuePlan(signal *compactionSignal, plan *datapb.CompactionPlan) {
log := log.With(zap.Int64("planID", plan.GetPlanID()))
c.setSegmentsCompacting(plan, true)

_, span := otel.Tracer(typeutil.DataCoordRole).Start(context.Background(), fmt.Sprintf("Compaction-%s", plan.GetType()))
@@ -293,16 +288,14 @@ func (c *compactionPlanHandler) enqueuePlan(signal *compactionSignal, plan *data
triggerInfo: signal,
plan: plan,
state: pipelining,
dataNodeID: nodeID,
span: span,
}
c.mu.Lock()
c.plans[plan.PlanID] = task
c.mu.Unlock()

c.scheduler.Submit(task)
log.Info("Compaction plan submited")
return nil
log.Info("Compaction plan submitted")
}

func (c *compactionPlanHandler) RefreshPlan(task *compactionTask) error {
@@ -337,10 +330,14 @@ func (c *compactionPlanHandler) RefreshPlan(task *compactionTask) error {

sealedSegBinlogs := lo.Map(sealedSegments, func(info *SegmentInfo, _ int) *datapb.CompactionSegmentBinlogs {
return &datapb.CompactionSegmentBinlogs{
SegmentID: info.GetID(),
Level: info.GetLevel(),
CollectionID: info.GetCollectionID(),
PartitionID: info.GetPartitionID(),
SegmentID: info.GetID(),
FieldBinlogs: nil,
Field2StatslogPaths: info.GetStatslogs(),
Deltalogs: nil,
InsertChannel: info.GetInsertChannel(),
Level: info.GetLevel(),
CollectionID: info.GetCollectionID(),
PartitionID: info.GetPartitionID(),
}
})

@@ -407,8 +404,8 @@ func (c *compactionPlanHandler) notifyTasks(tasks []*compactionTask) {
}

// execCompactionPlan start to execute plan and return immediately
func (c *compactionPlanHandler) execCompactionPlan(signal *compactionSignal, plan *datapb.CompactionPlan) error {
return c.enqueuePlan(signal, plan)
func (c *compactionPlanHandler) execCompactionPlan(signal *compactionSignal, plan *datapb.CompactionPlan) {
c.enqueuePlan(signal, plan)
}

func (c *compactionPlanHandler) setSegmentsCompacting(plan *datapb.CompactionPlan, compacting bool) {
@@ -483,25 +480,17 @@ func (c *compactionPlanHandler) handleMergeCompactionResult(plan *datapb.Compact
log.Info("meta has already been changed, skip meta change and retry sync segments")
} else {
// Also prepare metric updates.
newSegments, metricMutation, err := c.meta.CompleteCompactionMutation(plan, result)
_, metricMutation, err := c.meta.CompleteCompactionMutation(plan, result)
if err != nil {
return err
}
// Apply metrics after successful meta update.
metricMutation.commit()
newSegmentInfo = newSegments[0]
}

nodeID := c.plans[plan.GetPlanID()].dataNodeID
req := &datapb.SyncSegmentsRequest{
PlanID: plan.PlanID,
CompactedTo: newSegmentInfo.GetID(),
CompactedFrom: newSegmentInfo.GetCompactionFrom(),
NumOfRows: newSegmentInfo.GetNumOfRows(),
StatsLogs: newSegmentInfo.GetStatslogs(),
ChannelName: plan.GetChannel(),
PartitionId: newSegmentInfo.GetPartitionID(),
CollectionId: newSegmentInfo.GetCollectionID(),
PlanID: plan.PlanID,
}

log.Info("handleCompactionResult: syncing segments with node", zap.Int64("nodeID", nodeID))
@@ -633,8 +622,7 @@ func (c *compactionPlanHandler) updateCompaction(ts Timestamp) error {
// without changing the meta
log.Info("compaction syncing unknown plan with node")
if err := c.sessions.SyncSegments(nodeID, &datapb.SyncSegmentsRequest{
PlanID: planID,
ChannelName: plan.GetChannel(),
PlanID: planID,
}); err != nil {
log.Warn("compaction failed to sync segments with node", zap.Error(err))
return err
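Two consequences of the compaction.go changes above: enqueuePlan no longer performs a channel-to-node lookup, and after a merge compaction completes, the SyncSegments request sent to the DataNode carries only the plan ID, since segment state is committed through meta (CompleteCompactionMutation) instead of being pushed per shard. A minimal sketch of the slimmed-down sync call; the request and session types are simplified stand-ins that mirror the names in the diff, not the real generated protos:

package sketch

// Simplified stand-ins mirroring the names used in the diff above; the real
// code uses datapb.SyncSegmentsRequest and the DataNode session manager.
type SyncSegmentsRequest struct {
	PlanID int64
}

type sessionManager interface {
	SyncSegments(nodeID int64, req *SyncSegmentsRequest) error
}

// syncAfterCompaction shows the post-change shape of the call: segment meta
// has already been updated via CompleteCompactionMutation, so the DataNode is
// only told which plan finished.
func syncAfterCompaction(sessions sessionManager, nodeID, planID int64) error {
	return sessions.SyncSegments(nodeID, &SyncSegmentsRequest{PlanID: planID})
}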
95 changes: 42 additions & 53 deletions internal/datacoord/compaction_scheduler.go
@@ -64,75 +64,64 @@ func (s *CompactionScheduler) Submit(tasks ...*compactionTask) {

// Schedule pick 1 or 0 tasks for 1 node
func (s *CompactionScheduler) Schedule() []*compactionTask {
s.taskGuard.Lock()
nodeTasks := lo.GroupBy(s.queuingTasks, func(t *compactionTask) int64 {
return t.dataNodeID
})
s.taskGuard.Unlock()
if len(nodeTasks) == 0 {
s.taskGuard.RLock()
if len(s.queuingTasks) == 0 {
s.taskGuard.RUnlock()
return nil // To mitigate the need for frequent slot querying
}
s.taskGuard.RUnlock()

nodeSlots := s.cluster.QuerySlots()

executable := make(map[int64]*compactionTask)
l0ChannelExcludes := typeutil.NewSet[string]()
mixChannelExcludes := typeutil.NewSet[string]()

pickPriorPolicy := func(tasks []*compactionTask, exclusiveChannels []string, executing []string) *compactionTask {
for _, task := range tasks {
// TODO: sheep, replace pickShardNode with pickAnyNode
if nodeID := s.pickShardNode(task.dataNodeID, nodeSlots); nodeID == NullNodeID {
log.Warn("cannot find datanode for compaction task", zap.Int64("planID", task.plan.PlanID), zap.String("vchannel", task.plan.Channel))
continue
for _, tasks := range s.parallelTasks {
for _, t := range tasks {
switch t.plan.GetType() {
case datapb.CompactionType_Level0DeleteCompaction:
l0ChannelExcludes.Insert(t.plan.GetChannel())
case datapb.CompactionType_MixCompaction:
mixChannelExcludes.Insert(t.plan.GetChannel())
}

if lo.Contains(exclusiveChannels, task.plan.GetChannel()) {
continue
}

if task.plan.GetType() == datapb.CompactionType_Level0DeleteCompaction {
// Channel of LevelZeroCompaction task with no executing compactions
if !lo.Contains(executing, task.plan.GetChannel()) {
return task
}

// Don't schedule any tasks for channel with LevelZeroCompaction task
// when there're executing compactions
exclusiveChannels = append(exclusiveChannels, task.plan.GetChannel())
continue
}

return task
}

return nil
}

s.taskGuard.Lock()
defer s.taskGuard.Unlock()
// pick 1 or 0 task for 1 node
for node, tasks := range nodeTasks {
parallel := s.parallelTasks[node]

var (
executing = typeutil.NewSet[string]()
channelsExecPrior = typeutil.NewSet[string]()
)
for _, t := range parallel {
executing.Insert(t.plan.GetChannel())
if t.plan.GetType() == datapb.CompactionType_Level0DeleteCompaction {
channelsExecPrior.Insert(t.plan.GetChannel())
}
}

picked := pickPriorPolicy(tasks, channelsExecPrior.Collect(), executing.Collect())
if picked != nil {
executable[node] = picked
nodeSlots[node]--
picked := make([]*compactionTask, 0)
for _, t := range s.queuingTasks {
nodeID := s.pickAnyNode(nodeSlots)
if nodeID == NullNodeID {
log.Warn("cannot find datanode for compaction task",
zap.Int64("planID", t.plan.PlanID), zap.String("vchannel", t.plan.Channel))
continue
}
switch t.plan.GetType() {
case datapb.CompactionType_Level0DeleteCompaction:
if l0ChannelExcludes.Contain(t.plan.GetChannel()) ||
mixChannelExcludes.Contain(t.plan.GetChannel()) {
continue
}
t.dataNodeID = nodeID
picked = append(picked, t)
l0ChannelExcludes.Insert(t.plan.GetChannel())
nodeSlots[nodeID]--
case datapb.CompactionType_MixCompaction:
if l0ChannelExcludes.Contain(t.plan.GetChannel()) {
continue
}
t.dataNodeID = nodeID
picked = append(picked, t)
mixChannelExcludes.Insert(t.plan.GetChannel())
nodeSlots[nodeID]--
}
}

var pickPlans []int64
for node, task := range executable {
for _, task := range picked {
node := task.dataNodeID
pickPlans = append(pickPlans, task.plan.PlanID)
if _, ok := s.parallelTasks[node]; !ok {
s.parallelTasks[node] = []*compactionTask{task}
@@ -156,7 +145,7 @@ func (s *CompactionScheduler) Schedule() []*compactionTask {
}
}

return lo.Values(executable)
return picked
}

func (s *CompactionScheduler) Finish(nodeID UniqueID, plan *datapb.CompactionPlan) {
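The rewritten Schedule above drops the per-node task grouping and the shard-bound pickShardNode: queued tasks are scanned in order, each assigned to any node with a free slot (pickAnyNode), with channel exclusion replacing shard affinity. An L0 delete compaction is skipped while any compaction is running or already picked on its channel, and once picked it blocks that channel; a mix compaction is skipped only while an L0 task holds the channel. A condensed, hedged restatement of that selection loop (the task type, slot map, and node-picking helper are simplified stand-ins; the real code also logs, traces, and updates scheduler state under a lock):

package sketch

type taskType int

const (
	level0Delete taskType = iota
	mixCompaction
)

type task struct {
	planID  int64
	channel string
	typ     taskType
	nodeID  int64
}

// pickTasks restates the channel-exclusion policy from the diff: exclusion
// sets are seeded from tasks already executing, and every picked task both
// consumes a node slot and extends the excludes for later tasks in the queue.
func pickTasks(queue, running []*task, nodeSlots map[int64]int64) []*task {
	l0Excludes := map[string]bool{}
	mixExcludes := map[string]bool{}
	for _, t := range running {
		switch t.typ {
		case level0Delete:
			l0Excludes[t.channel] = true
		case mixCompaction:
			mixExcludes[t.channel] = true
		}
	}

	// Stand-in for pickAnyNode: any node with a free slot will do, since
	// tasks are no longer pinned to the DataNode watching their shard.
	pickAnyNode := func() int64 {
		for id, slots := range nodeSlots {
			if slots > 0 {
				return id
			}
		}
		return -1 // NullNodeID in the real code
	}

	var picked []*task
	for _, t := range queue {
		nodeID := pickAnyNode()
		if nodeID == -1 {
			continue
		}
		switch t.typ {
		case level0Delete:
			// L0 delete compaction needs the channel to itself.
			if l0Excludes[t.channel] || mixExcludes[t.channel] {
				continue
			}
			l0Excludes[t.channel] = true
		case mixCompaction:
			// Mix compaction only waits for L0 tasks on its channel.
			if l0Excludes[t.channel] {
				continue
			}
			mixExcludes[t.channel] = true
		}
		t.nodeID = nodeID
		picked = append(picked, t)
		nodeSlots[nodeID]--
	}
	return picked
}

One visible effect, reflected in the test expectations below, is that a single scheduling pass can now dispatch several tasks (e.g. an L0 task and a mix task on different channels), where the old policy picked at most one task per node per round.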
50 changes: 15 additions & 35 deletions internal/datacoord/compaction_scheduler_test.go
@@ -60,11 +60,11 @@ func (s *SchedulerSuite) TestScheduleParallelTaskFull() {
}{
{"with L0 tasks", []*compactionTask{
{dataNodeID: 100, plan: &datapb.CompactionPlan{PlanID: 10, Channel: "ch-10", Type: datapb.CompactionType_Level0DeleteCompaction}},
{dataNodeID: 100, plan: &datapb.CompactionPlan{PlanID: 11, Channel: "ch-11", Type: datapb.CompactionType_MinorCompaction}},
{dataNodeID: 100, plan: &datapb.CompactionPlan{PlanID: 11, Channel: "ch-11", Type: datapb.CompactionType_MixCompaction}},
}, []UniqueID{}},
{"without L0 tasks", []*compactionTask{
{dataNodeID: 100, plan: &datapb.CompactionPlan{PlanID: 10, Channel: "ch-10", Type: datapb.CompactionType_MinorCompaction}},
{dataNodeID: 100, plan: &datapb.CompactionPlan{PlanID: 11, Channel: "ch-11", Type: datapb.CompactionType_MinorCompaction}},
{dataNodeID: 100, plan: &datapb.CompactionPlan{PlanID: 10, Channel: "ch-10", Type: datapb.CompactionType_MixCompaction}},
{dataNodeID: 100, plan: &datapb.CompactionPlan{PlanID: 11, Channel: "ch-11", Type: datapb.CompactionType_MixCompaction}},
}, []UniqueID{}},
{"empty tasks", []*compactionTask{}, []UniqueID{}},
}
@@ -101,16 +101,16 @@ func (s *SchedulerSuite) TestScheduleNodeWith1ParallelTask() {
}{
{"with L0 tasks diff channel", []*compactionTask{
{dataNodeID: 101, plan: &datapb.CompactionPlan{PlanID: 10, Channel: "ch-10", Type: datapb.CompactionType_Level0DeleteCompaction}},
{dataNodeID: 101, plan: &datapb.CompactionPlan{PlanID: 11, Channel: "ch-11", Type: datapb.CompactionType_MinorCompaction}},
}, []UniqueID{10}},
{dataNodeID: 101, plan: &datapb.CompactionPlan{PlanID: 11, Channel: "ch-11", Type: datapb.CompactionType_MixCompaction}},
}, []UniqueID{10, 11}},
{"with L0 tasks same channel", []*compactionTask{
{dataNodeID: 101, plan: &datapb.CompactionPlan{PlanID: 10, Channel: "ch-2", Type: datapb.CompactionType_Level0DeleteCompaction}},
{dataNodeID: 101, plan: &datapb.CompactionPlan{PlanID: 11, Channel: "ch-11", Type: datapb.CompactionType_MinorCompaction}},
{dataNodeID: 101, plan: &datapb.CompactionPlan{PlanID: 11, Channel: "ch-11", Type: datapb.CompactionType_MixCompaction}},
}, []UniqueID{11}},
{"without L0 tasks", []*compactionTask{
{dataNodeID: 101, plan: &datapb.CompactionPlan{PlanID: 14, Channel: "ch-2", Type: datapb.CompactionType_MinorCompaction}},
{dataNodeID: 101, plan: &datapb.CompactionPlan{PlanID: 13, Channel: "ch-11", Type: datapb.CompactionType_MinorCompaction}},
}, []UniqueID{14}},
{dataNodeID: 101, plan: &datapb.CompactionPlan{PlanID: 14, Channel: "ch-2", Type: datapb.CompactionType_MixCompaction}},
{dataNodeID: 101, plan: &datapb.CompactionPlan{PlanID: 13, Channel: "ch-11", Type: datapb.CompactionType_MixCompaction}},
}, []UniqueID{14, 13}},
{"empty tasks", []*compactionTask{}, []UniqueID{}},
}

@@ -134,15 +134,6 @@ func (s *SchedulerSuite) TestScheduleNodeWith1ParallelTask() {
return t.plan.PlanID
}))

// the second schedule returns empty for no slot
if len(test.tasks) > 0 {
cluster := NewMockCluster(s.T())
cluster.EXPECT().QuerySlots().Return(map[int64]int64{101: 0})
s.scheduler.cluster = cluster
}
gotTasks = s.scheduler.Schedule()
s.Empty(gotTasks)

s.Equal(4+len(test.tasks), s.scheduler.GetTaskCount())
})
}
@@ -158,16 +149,16 @@ func (s *SchedulerSuite) TestScheduleNodeWithL0Executing() {
}{
{"with L0 tasks diff channel", []*compactionTask{
{dataNodeID: 102, plan: &datapb.CompactionPlan{PlanID: 10, Channel: "ch-10", Type: datapb.CompactionType_Level0DeleteCompaction}},
{dataNodeID: 102, plan: &datapb.CompactionPlan{PlanID: 11, Channel: "ch-11", Type: datapb.CompactionType_MinorCompaction}},
}, []UniqueID{10}},
{dataNodeID: 102, plan: &datapb.CompactionPlan{PlanID: 11, Channel: "ch-11", Type: datapb.CompactionType_MixCompaction}},
}, []UniqueID{10, 11}},
{"with L0 tasks same channel", []*compactionTask{
{dataNodeID: 102, plan: &datapb.CompactionPlan{PlanID: 10, Channel: "ch-3", Type: datapb.CompactionType_Level0DeleteCompaction}},
{dataNodeID: 102, plan: &datapb.CompactionPlan{PlanID: 11, Channel: "ch-11", Type: datapb.CompactionType_MinorCompaction}},
{dataNodeID: 102, plan: &datapb.CompactionPlan{PlanID: 13, Channel: "ch-3", Type: datapb.CompactionType_MinorCompaction}},
{dataNodeID: 102, plan: &datapb.CompactionPlan{PlanID: 11, Channel: "ch-11", Type: datapb.CompactionType_MixCompaction}},
{dataNodeID: 102, plan: &datapb.CompactionPlan{PlanID: 13, Channel: "ch-3", Type: datapb.CompactionType_MixCompaction}},
}, []UniqueID{11}},
{"without L0 tasks", []*compactionTask{
{dataNodeID: 102, plan: &datapb.CompactionPlan{PlanID: 14, Channel: "ch-3", Type: datapb.CompactionType_MinorCompaction}},
{dataNodeID: 102, plan: &datapb.CompactionPlan{PlanID: 13, Channel: "ch-11", Type: datapb.CompactionType_MinorCompaction}},
{dataNodeID: 102, plan: &datapb.CompactionPlan{PlanID: 14, Channel: "ch-3", Type: datapb.CompactionType_MixCompaction}},
{dataNodeID: 102, plan: &datapb.CompactionPlan{PlanID: 13, Channel: "ch-11", Type: datapb.CompactionType_MixCompaction}},
}, []UniqueID{13}},
{"empty tasks", []*compactionTask{}, []UniqueID{}},
}
@@ -192,17 +183,6 @@ func (s *SchedulerSuite) TestScheduleNodeWithL0Executing() {
return t.plan.PlanID
}))

// the second schedule returns empty for no slot
if len(test.tasks) > 0 {
cluster := NewMockCluster(s.T())
cluster.EXPECT().QuerySlots().Return(map[int64]int64{101: 0})
s.scheduler.cluster = cluster
}
if len(gotTasks) > 0 {
gotTasks = s.scheduler.Schedule()
s.Empty(gotTasks)
}

s.Equal(4+len(test.tasks), s.scheduler.GetTaskCount())
})
}
(The remaining changed files are not shown.)