diff --git a/go.sum b/go.sum index 20a4faf084194..25be847bbe9f7 100644 --- a/go.sum +++ b/go.sum @@ -290,6 +290,7 @@ github.com/go-latex/latex v0.0.0-20210118124228-b3d85cf34e07/go.mod h1:CO1AlKB2C github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= +github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.3.0 h1:2y3SDp0ZXuc6/cjLSZ+Q3ir+QB9T/iG5yYRXqsagWSY= github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= @@ -516,6 +517,7 @@ github.com/kataras/iris/v12 v12.1.8/go.mod h1:LMYy4VlP67TQ3Zgriz8RE2h2kMZV2SgMYb github.com/kataras/neffos v0.0.14/go.mod h1:8lqADm8PnbeFfL7CLXh1WHw53dG27MC3pgi2R1rmoTE= github.com/kataras/pio v0.0.2/go.mod h1:hAoW0t9UmXi4R5Oyq5Z4irTbaTsOemSrDGUtaTl7Dro= github.com/kataras/sitemap v0.0.5/go.mod h1:KY2eugMKiPwsJgx7+U103YZehfvNGOXURubcGyk0Bz8= +github.com/keybase/go-keychain v0.0.0-20190712205309-48d3d31d256d/go.mod h1:JJNrCn9otv/2QP4D7SMJBgaleKpOf66PnW6F5WGNRIc= github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= diff --git a/internal/datacoord/compaction.go b/internal/datacoord/compaction.go index 87b3fdbe2cd7c..c9919d7b821ea 100644 --- a/internal/datacoord/compaction.go +++ b/internal/datacoord/compaction.go @@ -45,11 +45,12 @@ const ( tsTimeout = uint64(1) ) +//go:generate mockery --name=compactionPlanContext --structname=MockCompactionPlanContext --output=./ --filename=mock_compaction_plan_context.go --with-expecter --inpackage type compactionPlanContext interface { start() stop() // execCompactionPlan start to execute plan and return immediately - execCompactionPlan(signal *compactionSignal, plan *datapb.CompactionPlan) error + execCompactionPlan(signal *compactionSignal, plan *datapb.CompactionPlan) // getCompaction return compaction task. If planId does not exist, return nil. 
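// Editor's note (hedged): with this change, execCompactionPlan no longer
// returns an error. DataNode assignment is deferred from enqueuePlan, which
// used chManager.FindWatcher and could fail, to the scheduler's pickAnyNode,
// so plan submission itself can no longer fail.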
getCompaction(planID int64) *compactionTask // updateCompaction set the compaction state to timeout or completed @@ -277,14 +278,8 @@ func (c *compactionPlanHandler) updateTask(planID int64, opts ...compactionTaskO } } -func (c *compactionPlanHandler) enqueuePlan(signal *compactionSignal, plan *datapb.CompactionPlan) error { - nodeID, err := c.chManager.FindWatcher(plan.GetChannel()) - if err != nil { - log.Error("failed to find watcher", zap.Int64("planID", plan.GetPlanID()), zap.Error(err)) - return err - } - - log := log.With(zap.Int64("planID", plan.GetPlanID()), zap.Int64("nodeID", nodeID)) +func (c *compactionPlanHandler) enqueuePlan(signal *compactionSignal, plan *datapb.CompactionPlan) { + log := log.With(zap.Int64("planID", plan.GetPlanID())) c.setSegmentsCompacting(plan, true) _, span := otel.Tracer(typeutil.DataCoordRole).Start(context.Background(), fmt.Sprintf("Compaction-%s", plan.GetType())) @@ -293,7 +288,6 @@ func (c *compactionPlanHandler) enqueuePlan(signal *compactionSignal, plan *data triggerInfo: signal, plan: plan, state: pipelining, - dataNodeID: nodeID, span: span, } c.mu.Lock() @@ -301,8 +295,7 @@ func (c *compactionPlanHandler) enqueuePlan(signal *compactionSignal, plan *data c.mu.Unlock() c.scheduler.Submit(task) - log.Info("Compaction plan submited") - return nil + log.Info("Compaction plan submitted") } func (c *compactionPlanHandler) RefreshPlan(task *compactionTask) error { @@ -337,10 +330,14 @@ func (c *compactionPlanHandler) RefreshPlan(task *compactionTask) error { sealedSegBinlogs := lo.Map(sealedSegments, func(info *SegmentInfo, _ int) *datapb.CompactionSegmentBinlogs { return &datapb.CompactionSegmentBinlogs{ - SegmentID: info.GetID(), - Level: info.GetLevel(), - CollectionID: info.GetCollectionID(), - PartitionID: info.GetPartitionID(), + SegmentID: info.GetID(), + FieldBinlogs: nil, + Field2StatslogPaths: info.GetStatslogs(), + Deltalogs: nil, + InsertChannel: info.GetInsertChannel(), + Level: info.GetLevel(), + CollectionID: info.GetCollectionID(), + PartitionID: info.GetPartitionID(), } }) @@ -407,8 +404,8 @@ func (c *compactionPlanHandler) notifyTasks(tasks []*compactionTask) { } // execCompactionPlan start to execute plan and return immediately -func (c *compactionPlanHandler) execCompactionPlan(signal *compactionSignal, plan *datapb.CompactionPlan) error { - return c.enqueuePlan(signal, plan) +func (c *compactionPlanHandler) execCompactionPlan(signal *compactionSignal, plan *datapb.CompactionPlan) { + c.enqueuePlan(signal, plan) } func (c *compactionPlanHandler) setSegmentsCompacting(plan *datapb.CompactionPlan, compacting bool) { @@ -483,25 +480,17 @@ func (c *compactionPlanHandler) handleMergeCompactionResult(plan *datapb.Compact log.Info("meta has already been changed, skip meta change and retry sync segments") } else { // Also prepare metric updates. - newSegments, metricMutation, err := c.meta.CompleteCompactionMutation(plan, result) + _, metricMutation, err := c.meta.CompleteCompactionMutation(plan, result) if err != nil { return err } // Apply metrics after successful meta update. 
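// Note (hedged): commit() is assumed to apply the accumulated segment metric
// deltas produced by CompleteCompactionMutation. newSegments is now discarded
// because the slimmed-down SyncSegmentsRequest below carries only the PlanID;
// with flush injection removed, DataNodes no longer need the compacted
// segment's details.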
metricMutation.commit() - newSegmentInfo = newSegments[0] } nodeID := c.plans[plan.GetPlanID()].dataNodeID req := &datapb.SyncSegmentsRequest{ - PlanID: plan.PlanID, - CompactedTo: newSegmentInfo.GetID(), - CompactedFrom: newSegmentInfo.GetCompactionFrom(), - NumOfRows: newSegmentInfo.GetNumOfRows(), - StatsLogs: newSegmentInfo.GetStatslogs(), - ChannelName: plan.GetChannel(), - PartitionId: newSegmentInfo.GetPartitionID(), - CollectionId: newSegmentInfo.GetCollectionID(), + PlanID: plan.PlanID, } log.Info("handleCompactionResult: syncing segments with node", zap.Int64("nodeID", nodeID)) @@ -633,8 +622,7 @@ func (c *compactionPlanHandler) updateCompaction(ts Timestamp) error { // without changing the meta log.Info("compaction syncing unknown plan with node") if err := c.sessions.SyncSegments(nodeID, &datapb.SyncSegmentsRequest{ - PlanID: planID, - ChannelName: plan.GetChannel(), + PlanID: planID, }); err != nil { log.Warn("compaction failed to sync segments with node", zap.Error(err)) return err diff --git a/internal/datacoord/compaction_scheduler.go b/internal/datacoord/compaction_scheduler.go index 5e592d5e3033f..745a9d40ff7f6 100644 --- a/internal/datacoord/compaction_scheduler.go +++ b/internal/datacoord/compaction_scheduler.go @@ -64,75 +64,64 @@ func (s *CompactionScheduler) Submit(tasks ...*compactionTask) { // Schedule pick 1 or 0 tasks for 1 node func (s *CompactionScheduler) Schedule() []*compactionTask { - s.taskGuard.Lock() - nodeTasks := lo.GroupBy(s.queuingTasks, func(t *compactionTask) int64 { - return t.dataNodeID - }) - s.taskGuard.Unlock() - if len(nodeTasks) == 0 { + s.taskGuard.RLock() + if len(s.queuingTasks) == 0 { + s.taskGuard.RUnlock() return nil // To mitigate the need for frequent slot querying } + s.taskGuard.RUnlock() nodeSlots := s.cluster.QuerySlots() - executable := make(map[int64]*compactionTask) + l0ChannelExcludes := typeutil.NewSet[string]() + mixChannelExcludes := typeutil.NewSet[string]() - pickPriorPolicy := func(tasks []*compactionTask, exclusiveChannels []string, executing []string) *compactionTask { - for _, task := range tasks { - // TODO: sheep, replace pickShardNode with pickAnyNode - if nodeID := s.pickShardNode(task.dataNodeID, nodeSlots); nodeID == NullNodeID { - log.Warn("cannot find datanode for compaction task", zap.Int64("planID", task.plan.PlanID), zap.String("vchannel", task.plan.Channel)) - continue + for _, tasks := range s.parallelTasks { + for _, t := range tasks { + switch t.plan.GetType() { + case datapb.CompactionType_Level0DeleteCompaction: + l0ChannelExcludes.Insert(t.plan.GetChannel()) + case datapb.CompactionType_MixCompaction: + mixChannelExcludes.Insert(t.plan.GetChannel()) } - - if lo.Contains(exclusiveChannels, task.plan.GetChannel()) { - continue - } - - if task.plan.GetType() == datapb.CompactionType_Level0DeleteCompaction { - // Channel of LevelZeroCompaction task with no executing compactions - if !lo.Contains(executing, task.plan.GetChannel()) { - return task - } - - // Don't schedule any tasks for channel with LevelZeroCompaction task - // when there're executing compactions - exclusiveChannels = append(exclusiveChannels, task.plan.GetChannel()) - continue - } - - return task } - - return nil } s.taskGuard.Lock() defer s.taskGuard.Unlock() - // pick 1 or 0 task for 1 node - for node, tasks := range nodeTasks { - parallel := s.parallelTasks[node] - - var ( - executing = typeutil.NewSet[string]() - channelsExecPrior = typeutil.NewSet[string]() - ) - for _, t := range parallel { - executing.Insert(t.plan.GetChannel()) 
- if t.plan.GetType() == datapb.CompactionType_Level0DeleteCompaction { - channelsExecPrior.Insert(t.plan.GetChannel()) - } - } - picked := pickPriorPolicy(tasks, channelsExecPrior.Collect(), executing.Collect()) - if picked != nil { - executable[node] = picked - nodeSlots[node]-- + picked := make([]*compactionTask, 0) + for _, t := range s.queuingTasks { + nodeID := s.pickAnyNode(nodeSlots) + if nodeID == NullNodeID { + log.Warn("cannot find datanode for compaction task", + zap.Int64("planID", t.plan.PlanID), zap.String("vchannel", t.plan.Channel)) + continue + } + switch t.plan.GetType() { + case datapb.CompactionType_Level0DeleteCompaction: + if l0ChannelExcludes.Contain(t.plan.GetChannel()) || + mixChannelExcludes.Contain(t.plan.GetChannel()) { + continue + } + t.dataNodeID = nodeID + picked = append(picked, t) + l0ChannelExcludes.Insert(t.plan.GetChannel()) + nodeSlots[nodeID]-- + case datapb.CompactionType_MixCompaction: + if l0ChannelExcludes.Contain(t.plan.GetChannel()) { + continue + } + t.dataNodeID = nodeID + picked = append(picked, t) + mixChannelExcludes.Insert(t.plan.GetChannel()) + nodeSlots[nodeID]-- } } var pickPlans []int64 - for node, task := range executable { + for _, task := range picked { + node := task.dataNodeID pickPlans = append(pickPlans, task.plan.PlanID) if _, ok := s.parallelTasks[node]; !ok { s.parallelTasks[node] = []*compactionTask{task} @@ -156,7 +145,7 @@ func (s *CompactionScheduler) Schedule() []*compactionTask { } } - return lo.Values(executable) + return picked } func (s *CompactionScheduler) Finish(nodeID UniqueID, plan *datapb.CompactionPlan) { diff --git a/internal/datacoord/compaction_scheduler_test.go b/internal/datacoord/compaction_scheduler_test.go index 37f64f740b2f7..a9e30ec996a17 100644 --- a/internal/datacoord/compaction_scheduler_test.go +++ b/internal/datacoord/compaction_scheduler_test.go @@ -60,11 +60,11 @@ func (s *SchedulerSuite) TestScheduleParallelTaskFull() { }{ {"with L0 tasks", []*compactionTask{ {dataNodeID: 100, plan: &datapb.CompactionPlan{PlanID: 10, Channel: "ch-10", Type: datapb.CompactionType_Level0DeleteCompaction}}, - {dataNodeID: 100, plan: &datapb.CompactionPlan{PlanID: 11, Channel: "ch-11", Type: datapb.CompactionType_MinorCompaction}}, + {dataNodeID: 100, plan: &datapb.CompactionPlan{PlanID: 11, Channel: "ch-11", Type: datapb.CompactionType_MixCompaction}}, }, []UniqueID{}}, {"without L0 tasks", []*compactionTask{ - {dataNodeID: 100, plan: &datapb.CompactionPlan{PlanID: 10, Channel: "ch-10", Type: datapb.CompactionType_MinorCompaction}}, - {dataNodeID: 100, plan: &datapb.CompactionPlan{PlanID: 11, Channel: "ch-11", Type: datapb.CompactionType_MinorCompaction}}, + {dataNodeID: 100, plan: &datapb.CompactionPlan{PlanID: 10, Channel: "ch-10", Type: datapb.CompactionType_MixCompaction}}, + {dataNodeID: 100, plan: &datapb.CompactionPlan{PlanID: 11, Channel: "ch-11", Type: datapb.CompactionType_MixCompaction}}, }, []UniqueID{}}, {"empty tasks", []*compactionTask{}, []UniqueID{}}, } @@ -101,16 +101,16 @@ func (s *SchedulerSuite) TestScheduleNodeWith1ParallelTask() { }{ {"with L0 tasks diff channel", []*compactionTask{ {dataNodeID: 101, plan: &datapb.CompactionPlan{PlanID: 10, Channel: "ch-10", Type: datapb.CompactionType_Level0DeleteCompaction}}, - {dataNodeID: 101, plan: &datapb.CompactionPlan{PlanID: 11, Channel: "ch-11", Type: datapb.CompactionType_MinorCompaction}}, - }, []UniqueID{10}}, + {dataNodeID: 101, plan: &datapb.CompactionPlan{PlanID: 11, Channel: "ch-11", Type: datapb.CompactionType_MixCompaction}}, + }, 
[]UniqueID{10, 11}}, {"with L0 tasks same channel", []*compactionTask{ {dataNodeID: 101, plan: &datapb.CompactionPlan{PlanID: 10, Channel: "ch-2", Type: datapb.CompactionType_Level0DeleteCompaction}}, - {dataNodeID: 101, plan: &datapb.CompactionPlan{PlanID: 11, Channel: "ch-11", Type: datapb.CompactionType_MinorCompaction}}, + {dataNodeID: 101, plan: &datapb.CompactionPlan{PlanID: 11, Channel: "ch-11", Type: datapb.CompactionType_MixCompaction}}, }, []UniqueID{11}}, {"without L0 tasks", []*compactionTask{ - {dataNodeID: 101, plan: &datapb.CompactionPlan{PlanID: 14, Channel: "ch-2", Type: datapb.CompactionType_MinorCompaction}}, - {dataNodeID: 101, plan: &datapb.CompactionPlan{PlanID: 13, Channel: "ch-11", Type: datapb.CompactionType_MinorCompaction}}, - }, []UniqueID{14}}, + {dataNodeID: 101, plan: &datapb.CompactionPlan{PlanID: 14, Channel: "ch-2", Type: datapb.CompactionType_MixCompaction}}, + {dataNodeID: 101, plan: &datapb.CompactionPlan{PlanID: 13, Channel: "ch-11", Type: datapb.CompactionType_MixCompaction}}, + }, []UniqueID{14, 13}}, {"empty tasks", []*compactionTask{}, []UniqueID{}}, } @@ -134,15 +134,6 @@ func (s *SchedulerSuite) TestScheduleNodeWith1ParallelTask() { return t.plan.PlanID })) - // the second schedule returns empty for no slot - if len(test.tasks) > 0 { - cluster := NewMockCluster(s.T()) - cluster.EXPECT().QuerySlots().Return(map[int64]int64{101: 0}) - s.scheduler.cluster = cluster - } - gotTasks = s.scheduler.Schedule() - s.Empty(gotTasks) - s.Equal(4+len(test.tasks), s.scheduler.GetTaskCount()) }) } @@ -158,16 +149,16 @@ func (s *SchedulerSuite) TestScheduleNodeWithL0Executing() { }{ {"with L0 tasks diff channel", []*compactionTask{ {dataNodeID: 102, plan: &datapb.CompactionPlan{PlanID: 10, Channel: "ch-10", Type: datapb.CompactionType_Level0DeleteCompaction}}, - {dataNodeID: 102, plan: &datapb.CompactionPlan{PlanID: 11, Channel: "ch-11", Type: datapb.CompactionType_MinorCompaction}}, - }, []UniqueID{10}}, + {dataNodeID: 102, plan: &datapb.CompactionPlan{PlanID: 11, Channel: "ch-11", Type: datapb.CompactionType_MixCompaction}}, + }, []UniqueID{10, 11}}, {"with L0 tasks same channel", []*compactionTask{ {dataNodeID: 102, plan: &datapb.CompactionPlan{PlanID: 10, Channel: "ch-3", Type: datapb.CompactionType_Level0DeleteCompaction}}, - {dataNodeID: 102, plan: &datapb.CompactionPlan{PlanID: 11, Channel: "ch-11", Type: datapb.CompactionType_MinorCompaction}}, - {dataNodeID: 102, plan: &datapb.CompactionPlan{PlanID: 13, Channel: "ch-3", Type: datapb.CompactionType_MinorCompaction}}, + {dataNodeID: 102, plan: &datapb.CompactionPlan{PlanID: 11, Channel: "ch-11", Type: datapb.CompactionType_MixCompaction}}, + {dataNodeID: 102, plan: &datapb.CompactionPlan{PlanID: 13, Channel: "ch-3", Type: datapb.CompactionType_MixCompaction}}, }, []UniqueID{11}}, {"without L0 tasks", []*compactionTask{ - {dataNodeID: 102, plan: &datapb.CompactionPlan{PlanID: 14, Channel: "ch-3", Type: datapb.CompactionType_MinorCompaction}}, - {dataNodeID: 102, plan: &datapb.CompactionPlan{PlanID: 13, Channel: "ch-11", Type: datapb.CompactionType_MinorCompaction}}, + {dataNodeID: 102, plan: &datapb.CompactionPlan{PlanID: 14, Channel: "ch-3", Type: datapb.CompactionType_MixCompaction}}, + {dataNodeID: 102, plan: &datapb.CompactionPlan{PlanID: 13, Channel: "ch-11", Type: datapb.CompactionType_MixCompaction}}, }, []UniqueID{13}}, {"empty tasks", []*compactionTask{}, []UniqueID{}}, } @@ -192,17 +183,6 @@ func (s *SchedulerSuite) TestScheduleNodeWithL0Executing() { return t.plan.PlanID })) - // the second 
schedule returns empty for no slot - if len(test.tasks) > 0 { - cluster := NewMockCluster(s.T()) - cluster.EXPECT().QuerySlots().Return(map[int64]int64{101: 0}) - s.scheduler.cluster = cluster - } - if len(gotTasks) > 0 { - gotTasks = s.scheduler.Schedule() - s.Empty(gotTasks) - } - s.Equal(4+len(test.tasks), s.scheduler.GetTaskCount()) }) } diff --git a/internal/datacoord/compaction_test.go b/internal/datacoord/compaction_test.go index 0936e7f8adf24..879dfdbbbb9a9 100644 --- a/internal/datacoord/compaction_test.go +++ b/internal/datacoord/compaction_test.go @@ -431,43 +431,22 @@ func (s *CompactionPlanHandlerSuite) TestRefreshPlanMixCompaction() { } func (s *CompactionPlanHandlerSuite) TestExecCompactionPlan() { - s.mockCm.EXPECT().FindWatcher(mock.Anything).RunAndReturn(func(channel string) (int64, error) { - if channel == "ch-1" { - return 0, errors.Errorf("mock error for ch-1") - } - - return 1, nil - }).Twice() s.mockSch.EXPECT().Submit(mock.Anything).Return().Once() - tests := []struct { - description string - channel string - hasError bool - }{ - {"channel with error", "ch-1", true}, - {"channel with no error", "ch-2", false}, - } - handler := newCompactionPlanHandler(nil, s.mockSessMgr, s.mockCm, s.mockMeta, s.mockAlloc) handler.scheduler = s.mockSch - for idx, test := range tests { - sig := &compactionSignal{id: int64(idx)} - plan := &datapb.CompactionPlan{ - PlanID: int64(idx), - } - s.Run(test.description, func() { - plan.Channel = test.channel - - err := handler.execCompactionPlan(sig, plan) - if test.hasError { - s.Error(err) - } else { - s.NoError(err) - } - }) + sig := &compactionSignal{id: int64(1)} + plan := &datapb.CompactionPlan{ + PlanID: int64(1), } + plan.Channel = "ch-1" + + handler.execCompactionPlan(sig, plan) + handler.mu.RLock() + defer handler.mu.RUnlock() + _, ok := handler.plans[int64(1)] + s.True(ok) } func (s *CompactionPlanHandlerSuite) TestHandleMergeCompactionResult() { diff --git a/internal/datacoord/compaction_trigger.go b/internal/datacoord/compaction_trigger.go index b6ff595887fa5..f890f6f850bd3 100644 --- a/internal/datacoord/compaction_trigger.go +++ b/internal/datacoord/compaction_trigger.go @@ -430,23 +430,14 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) error { break } start := time.Now() - if err := fillOriginPlan(t.allocator, plan); err != nil { + if err := fillOriginPlan(coll.Schema, t.allocator, plan); err != nil { log.Warn("failed to fill plan", zap.Int64("collectionID", signal.collectionID), zap.Int64s("segmentIDs", segIDs), zap.Error(err)) continue } - err := t.compactionHandler.execCompactionPlan(signal, plan) - if err != nil { - log.Warn("failed to execute compaction plan", - zap.Int64("collectionID", signal.collectionID), - zap.Int64("planID", plan.PlanID), - zap.Int64s("segmentIDs", segIDs), - zap.Error(err)) - continue - } - + t.compactionHandler.execCompactionPlan(signal, plan) log.Info("time cost of generating global compaction", zap.Int64("planID", plan.PlanID), zap.Int64("time cost", time.Since(start).Milliseconds()), @@ -530,18 +521,11 @@ func (t *compactionTrigger) handleSignal(signal *compactionSignal) { break } start := time.Now() - if err := fillOriginPlan(t.allocator, plan); err != nil { + if err := fillOriginPlan(coll.Schema, t.allocator, plan); err != nil { log.Warn("failed to fill plan", zap.Error(err)) continue } - if err := t.compactionHandler.execCompactionPlan(signal, plan); err != nil { - log.Warn("failed to execute compaction plan", - zap.Int64("collection", signal.collectionID), - 
zap.Int64("planID", plan.PlanID), - zap.Int64s("segmentIDs", fetchSegIDs(plan.GetSegmentBinlogs())), - zap.Error(err)) - continue - } + t.compactionHandler.execCompactionPlan(signal, plan) log.Info("time cost of generating compaction", zap.Int64("planID", plan.PlanID), zap.Int64("time cost", time.Since(start).Milliseconds()), @@ -713,6 +697,7 @@ func segmentsToPlan(segments []*SegmentInfo, compactTime *compactTime) *datapb.C } log.Info("generate a plan for priority candidates", zap.Any("plan", plan), + zap.Int("len(segments)", len(plan.GetSegmentBinlogs())), zap.Int64("target segment row", plan.TotalRows), zap.Int64("target segment size", size)) return plan } diff --git a/internal/datacoord/compaction_trigger_test.go b/internal/datacoord/compaction_trigger_test.go index 19d4146a65e14..56710ed80efee 100644 --- a/internal/datacoord/compaction_trigger_test.go +++ b/internal/datacoord/compaction_trigger_test.go @@ -51,9 +51,8 @@ var _ compactionPlanContext = (*spyCompactionHandler)(nil) func (h *spyCompactionHandler) removeTasksByChannel(channel string) {} // execCompactionPlan start to execute plan and return immediately -func (h *spyCompactionHandler) execCompactionPlan(signal *compactionSignal, plan *datapb.CompactionPlan) error { +func (h *spyCompactionHandler) execCompactionPlan(signal *compactionSignal, plan *datapb.CompactionPlan) { h.spyChan <- plan - return nil } // completeCompaction record the result of a compaction @@ -106,6 +105,22 @@ func Test_compactionTrigger_force(t *testing.T) { vecFieldID := int64(201) indexID := int64(1001) + + schema := &schemapb.CollectionSchema{ + Fields: []*schemapb.FieldSchema{ + { + FieldID: vecFieldID, + DataType: schemapb.DataType_FloatVector, + TypeParams: []*commonpb.KeyValuePair{ + { + Key: common.DimKey, + Value: "128", + }, + }, + }, + }, + } + tests := []struct { name string fields fields @@ -292,21 +307,8 @@ func Test_compactionTrigger_force(t *testing.T) { }, collections: map[int64]*collectionInfo{ 2: { - ID: 2, - Schema: &schemapb.CollectionSchema{ - Fields: []*schemapb.FieldSchema{ - { - FieldID: vecFieldID, - DataType: schemapb.DataType_FloatVector, - TypeParams: []*commonpb.KeyValuePair{ - { - Key: common.DimKey, - Value: "128", - }, - }, - }, - }, - }, + ID: 2, + Schema: schema, Properties: map[string]string{ common.CollectionTTLConfigKey: "0", }, @@ -469,6 +471,7 @@ func Test_compactionTrigger_force(t *testing.T) { Type: datapb.CompactionType_MixCompaction, Channel: "ch1", TotalRows: 200, + Schema: schema, }, }, }, @@ -2386,7 +2389,7 @@ func (s *CompactionTriggerSuite) TestHandleSignal() { }, }, }, nil) - s.compactionHandler.EXPECT().execCompactionPlan(mock.Anything, mock.Anything).Return(nil) + s.compactionHandler.EXPECT().execCompactionPlan(mock.Anything, mock.Anything).Return() tr.handleSignal(&compactionSignal{ segmentID: 1, collectionID: s.collectionID, @@ -2517,7 +2520,7 @@ func (s *CompactionTriggerSuite) TestHandleGlobalSignal() { common.CollectionAutoCompactionKey: "false", }, }, nil) - s.compactionHandler.EXPECT().execCompactionPlan(mock.Anything, mock.Anything).Return(nil) + s.compactionHandler.EXPECT().execCompactionPlan(mock.Anything, mock.Anything).Return() tr.handleGlobalSignal(&compactionSignal{ segmentID: 1, collectionID: s.collectionID, diff --git a/internal/datacoord/compaction_trigger_v2.go b/internal/datacoord/compaction_trigger_v2.go index 1ba9c1d9ef4aa..e1678e9f97e66 100644 --- a/internal/datacoord/compaction_trigger_v2.go +++ b/internal/datacoord/compaction_trigger_v2.go @@ -2,10 +2,12 @@ package datacoord 
import ( "context" + "time" "github.com/samber/lo" "go.uber.org/zap" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/log" ) @@ -33,16 +35,18 @@ type TriggerManager interface { // 2. SystemIDLE & schedulerIDLE // 3. Manual Compaction type CompactionTriggerManager struct { - scheduler Scheduler - handler compactionPlanContext // TODO replace with scheduler + scheduler Scheduler + handler Handler + compactionHandler compactionPlanContext // TODO replace with scheduler allocator allocator } -func NewCompactionTriggerManager(alloc allocator, handler compactionPlanContext) *CompactionTriggerManager { +func NewCompactionTriggerManager(alloc allocator, handler Handler, compactionHandler compactionPlanContext) *CompactionTriggerManager { m := &CompactionTriggerManager{ - allocator: alloc, - handler: handler, + allocator: alloc, + handler: handler, + compactionHandler: compactionHandler, } return m @@ -51,7 +55,7 @@ func NewCompactionTriggerManager(alloc allocator, handler compactionPlanContext) func (m *CompactionTriggerManager) Notify(taskID UniqueID, eventType CompactionTriggerType, views []CompactionView) { log := log.With(zap.Int64("taskID", taskID)) for _, view := range views { - if m.handler.isFull() { + if m.compactionHandler.isFull() { log.RatedInfo(1.0, "Skip trigger compaction for scheduler is full") return } @@ -103,7 +107,7 @@ func (m *CompactionTriggerManager) SubmitL0ViewToScheduler(taskID int64, outView // TODO, remove handler, use scheduler // m.scheduler.Submit(plan) - m.handler.execCompactionPlan(signal, plan) + m.compactionHandler.execCompactionPlan(signal, plan) log.Info("Finish to submit a LevelZeroCompaction plan", zap.Int64("taskID", taskID), zap.Int64("planID", plan.GetPlanID()), @@ -130,7 +134,14 @@ func (m *CompactionTriggerManager) buildL0CompactionPlan(view CompactionView) *d Channel: view.GetGroupLabel().Channel, } - if err := fillOriginPlan(m.allocator, plan); err != nil { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + collection, err := m.handler.GetCollection(ctx, view.GetGroupLabel().CollectionID) + if err != nil { + return nil + } + + if err := fillOriginPlan(collection.Schema, m.allocator, plan); err != nil { return nil } @@ -145,14 +156,16 @@ type chanPartSegments struct { segments []*SegmentInfo } -func fillOriginPlan(alloc allocator, plan *datapb.CompactionPlan) error { - // TODO context - id, err := alloc.allocID(context.TODO()) +func fillOriginPlan(schema *schemapb.CollectionSchema, alloc allocator, plan *datapb.CompactionPlan) error { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + id, err := alloc.allocID(ctx) if err != nil { return err } plan.PlanID = id plan.TimeoutInSeconds = Params.DataCoordCfg.CompactionTimeoutInSeconds.GetAsInt32() + plan.Schema = schema return nil } diff --git a/internal/datacoord/compaction_trigger_v2_test.go b/internal/datacoord/compaction_trigger_v2_test.go index 3176e04a0bd08..3b66ac944c822 100644 --- a/internal/datacoord/compaction_trigger_v2_test.go +++ b/internal/datacoord/compaction_trigger_v2_test.go @@ -20,6 +20,7 @@ type CompactionTriggerManagerSuite struct { suite.Suite mockAlloc *NMockAllocator + handler Handler mockPlanContext *MockCompactionPlanContext testLabel *CompactionGroupLabel meta *meta @@ -29,6 +30,7 @@ type CompactionTriggerManagerSuite struct { func (s *CompactionTriggerManagerSuite) SetupTest() { s.mockAlloc = 
NewNMockAllocator(s.T()) + s.handler = NewNMockHandler(s.T()) s.mockPlanContext = NewMockCompactionPlanContext(s.T()) s.testLabel = &CompactionGroupLabel{ @@ -42,7 +44,7 @@ func (s *CompactionTriggerManagerSuite) SetupTest() { s.meta.segments.SetSegment(id, segment) } - s.m = NewCompactionTriggerManager(s.mockAlloc, s.mockPlanContext) + s.m = NewCompactionTriggerManager(s.mockAlloc, s.handler, s.mockPlanContext) } func (s *CompactionTriggerManagerSuite) TestNotifyToFullScheduler() { @@ -73,6 +75,10 @@ func (s *CompactionTriggerManagerSuite) TestNotifyToFullScheduler() { } func (s *CompactionTriggerManagerSuite) TestNotifyByViewIDLE() { + handler := NewNMockHandler(s.T()) + handler.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(&collectionInfo{}, nil) + s.m.handler = handler + viewManager := NewCompactionViewManager(s.meta, s.m, s.m.allocator) collSegs := s.meta.GetCompactableSegmentGroupByCollection() @@ -120,12 +126,16 @@ func (s *CompactionTriggerManagerSuite) TestNotifyByViewIDLE() { s.ElementsMatch(expectedSegs, gotSegs) log.Info("generated plan", zap.Any("plan", plan)) - }).Return(nil).Once() + }).Return().Once() s.m.Notify(19530, TriggerTypeLevelZeroViewIDLE, levelZeroView) } func (s *CompactionTriggerManagerSuite) TestNotifyByViewChange() { + handler := NewNMockHandler(s.T()) + handler.EXPECT().GetCollection(mock.Anything, mock.Anything).Return(&collectionInfo{}, nil) + s.m.handler = handler + viewManager := NewCompactionViewManager(s.meta, s.m, s.m.allocator) collSegs := s.meta.GetCompactableSegmentGroupByCollection() @@ -168,7 +178,7 @@ func (s *CompactionTriggerManagerSuite) TestNotifyByViewChange() { s.ElementsMatch(expectedSegs, gotSegs) log.Info("generated plan", zap.Any("plan", plan)) - }).Return(nil).Once() + }).Return().Once() s.m.Notify(19530, TriggerTypeLevelZeroViewChange, levelZeroView) } diff --git a/internal/datacoord/mock_compaction_plan_context.go b/internal/datacoord/mock_compaction_plan_context.go index b22041fb7f169..3b399474afe23 100644 --- a/internal/datacoord/mock_compaction_plan_context.go +++ b/internal/datacoord/mock_compaction_plan_context.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.32.4. DO NOT EDIT. +// Code generated by mockery v2.30.1. DO NOT EDIT. 
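// Note: this mock is regenerated with mockery v2.30.1 (older than the
// v2.32.4 used previously) to match execCompactionPlan's new no-error
// signature; the Return()/RunAndReturn() helpers below drop their error
// parameter accordingly.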
package datacoord @@ -21,17 +21,8 @@ func (_m *MockCompactionPlanContext) EXPECT() *MockCompactionPlanContext_Expecte } // execCompactionPlan provides a mock function with given fields: signal, plan -func (_m *MockCompactionPlanContext) execCompactionPlan(signal *compactionSignal, plan *datapb.CompactionPlan) error { - ret := _m.Called(signal, plan) - - var r0 error - if rf, ok := ret.Get(0).(func(*compactionSignal, *datapb.CompactionPlan) error); ok { - r0 = rf(signal, plan) - } else { - r0 = ret.Error(0) - } - - return r0 +func (_m *MockCompactionPlanContext) execCompactionPlan(signal *compactionSignal, plan *datapb.CompactionPlan) { + _m.Called(signal, plan) } // MockCompactionPlanContext_execCompactionPlan_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'execCompactionPlan' @@ -40,8 +31,8 @@ type MockCompactionPlanContext_execCompactionPlan_Call struct { } // execCompactionPlan is a helper method to define mock.On call -// - signal *compactionSignal -// - plan *datapb.CompactionPlan +// - signal *compactionSignal +// - plan *datapb.CompactionPlan func (_e *MockCompactionPlanContext_Expecter) execCompactionPlan(signal interface{}, plan interface{}) *MockCompactionPlanContext_execCompactionPlan_Call { return &MockCompactionPlanContext_execCompactionPlan_Call{Call: _e.mock.On("execCompactionPlan", signal, plan)} } @@ -53,12 +44,12 @@ func (_c *MockCompactionPlanContext_execCompactionPlan_Call) Run(run func(signal return _c } -func (_c *MockCompactionPlanContext_execCompactionPlan_Call) Return(_a0 error) *MockCompactionPlanContext_execCompactionPlan_Call { - _c.Call.Return(_a0) +func (_c *MockCompactionPlanContext_execCompactionPlan_Call) Return() *MockCompactionPlanContext_execCompactionPlan_Call { + _c.Call.Return() return _c } -func (_c *MockCompactionPlanContext_execCompactionPlan_Call) RunAndReturn(run func(*compactionSignal, *datapb.CompactionPlan) error) *MockCompactionPlanContext_execCompactionPlan_Call { +func (_c *MockCompactionPlanContext_execCompactionPlan_Call) RunAndReturn(run func(*compactionSignal, *datapb.CompactionPlan)) *MockCompactionPlanContext_execCompactionPlan_Call { _c.Call.Return(run) return _c } @@ -85,7 +76,7 @@ type MockCompactionPlanContext_getCompaction_Call struct { } // getCompaction is a helper method to define mock.On call -// - planID int64 +// - planID int64 func (_e *MockCompactionPlanContext_Expecter) getCompaction(planID interface{}) *MockCompactionPlanContext_getCompaction_Call { return &MockCompactionPlanContext_getCompaction_Call{Call: _e.mock.On("getCompaction", planID)} } @@ -129,7 +120,7 @@ type MockCompactionPlanContext_getCompactionTasksBySignalID_Call struct { } // getCompactionTasksBySignalID is a helper method to define mock.On call -// - signalID int64 +// - signalID int64 func (_e *MockCompactionPlanContext_Expecter) getCompactionTasksBySignalID(signalID interface{}) *MockCompactionPlanContext_getCompactionTasksBySignalID_Call { return &MockCompactionPlanContext_getCompactionTasksBySignalID_Call{Call: _e.mock.On("getCompactionTasksBySignalID", signalID)} } @@ -203,7 +194,7 @@ type MockCompactionPlanContext_removeTasksByChannel_Call struct { } // removeTasksByChannel is a helper method to define mock.On call -// - channel string +// - channel string func (_e *MockCompactionPlanContext_Expecter) removeTasksByChannel(channel interface{}) *MockCompactionPlanContext_removeTasksByChannel_Call { return &MockCompactionPlanContext_removeTasksByChannel_Call{Call: _e.mock.On("removeTasksByChannel", 
channel)} } @@ -309,7 +300,7 @@ type MockCompactionPlanContext_updateCompaction_Call struct { } // updateCompaction is a helper method to define mock.On call -// - ts uint64 +// - ts uint64 func (_e *MockCompactionPlanContext_Expecter) updateCompaction(ts interface{}) *MockCompactionPlanContext_updateCompaction_Call { return &MockCompactionPlanContext_updateCompaction_Call{Call: _e.mock.On("updateCompaction", ts)} } diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index 50ccc8d37ca58..85c6535637d41 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -524,7 +524,7 @@ func (s *Server) SetIndexNodeCreator(f func(context.Context, string, int64) (typ func (s *Server) createCompactionHandler() { s.compactionHandler = newCompactionPlanHandler(s.cluster, s.sessionManager, s.channelManager, s.meta, s.allocator) - triggerv2 := NewCompactionTriggerManager(s.allocator, s.compactionHandler) + triggerv2 := NewCompactionTriggerManager(s.allocator, s.handler, s.compactionHandler) s.compactionViewManager = NewCompactionViewManager(s.meta, triggerv2, s.allocator) } diff --git a/internal/datanode/compaction/compactor.go b/internal/datanode/compaction/compactor.go index da57562d93e28..825723a98fd52 100644 --- a/internal/datanode/compaction/compactor.go +++ b/internal/datanode/compaction/compactor.go @@ -21,10 +21,10 @@ import ( "github.com/milvus-io/milvus/pkg/util/typeutil" ) +//go:generate mockery --name=Compactor --structname=MockCompactor --output=./ --filename=mock_compactor.go --with-expecter --inpackage type Compactor interface { Complete() Compact() (*datapb.CompactionPlanResult, error) - InjectDone() Stop() GetPlanID() typeutil.UniqueID GetCollection() typeutil.UniqueID diff --git a/internal/datanode/compaction/mix_compactor.go b/internal/datanode/compaction/mix_compactor.go index da18de0f82fa8..928fff81ed248 100644 --- a/internal/datanode/compaction/mix_compactor.go +++ b/internal/datanode/compaction/mix_compactor.go @@ -21,7 +21,6 @@ import ( "fmt" sio "io" "strconv" - "sync" "time" "github.com/cockroachdb/errors" @@ -33,15 +32,12 @@ import ( "github.com/milvus-io/milvus/internal/datanode/allocator" "github.com/milvus-io/milvus/internal/datanode/io" iter "github.com/milvus-io/milvus/internal/datanode/iterators" - "github.com/milvus-io/milvus/internal/datanode/metacache" - "github.com/milvus-io/milvus/internal/datanode/syncmgr" "github.com/milvus-io/milvus/internal/metastore/kv/binlog" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/util/funcutil" - "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/timerecord" "github.com/milvus-io/milvus/pkg/util/tsoutil" @@ -51,9 +47,6 @@ import ( // for MixCompaction only type mixCompactionTask struct { binlogIO io.BinlogIO - Compactor - metaCache metacache.MetaCache - syncMgr syncmgr.SyncManager allocator.Allocator currentTs typeutil.Timestamp @@ -62,9 +55,8 @@ type mixCompactionTask struct { ctx context.Context cancel context.CancelFunc - injectDoneOnce sync.Once - done chan struct{} - tr *timerecord.TimeRecorder + done chan struct{} + tr *timerecord.TimeRecorder } // make sure compactionTask implements compactor interface @@ -73,8 +65,6 @@ var _ Compactor = (*mixCompactionTask)(nil) func NewMixCompactionTask( ctx context.Context, binlogIO io.BinlogIO, - metaCache 
metacache.MetaCache, - syncMgr syncmgr.SyncManager, alloc allocator.Allocator, plan *datapb.CompactionPlan, ) *mixCompactionTask { @@ -83,8 +73,6 @@ func NewMixCompactionTask( ctx: ctx1, cancel: cancel, binlogIO: binlogIO, - syncMgr: syncMgr, - metaCache: metaCache, Allocator: alloc, plan: plan, tr: timerecord.NewTimeRecorder("mix compaction"), @@ -100,7 +88,6 @@ func (t *mixCompactionTask) Complete() { func (t *mixCompactionTask) Stop() { t.cancel() <-t.done - t.InjectDone() } func (t *mixCompactionTask) GetPlanID() typeutil.UniqueID { @@ -112,18 +99,16 @@ func (t *mixCompactionTask) GetChannelName() string { } // return num rows of all segment compaction from -func (t *mixCompactionTask) getNumRows() (int64, error) { +func (t *mixCompactionTask) getNumRows() int64 { numRows := int64(0) for _, binlog := range t.plan.SegmentBinlogs { - seg, ok := t.metaCache.GetSegmentByID(binlog.GetSegmentID()) - if !ok { - return 0, merr.WrapErrSegmentNotFound(binlog.GetSegmentID(), "get compaction segments num rows failed") + if len(binlog.GetFieldBinlogs()) > 0 { + for _, ct := range binlog.GetFieldBinlogs()[0].GetBinlogs() { + numRows += ct.GetEntriesNum() + } } - - numRows += seg.NumOfRows() } - - return numRows, nil + return numRows } func (t *mixCompactionTask) mergeDeltalogs(ctx context.Context, dpaths map[typeutil.UniqueID][]string) (map[interface{}]typeutil.Timestamp, error) { @@ -417,7 +402,19 @@ func (t *mixCompactionTask) Compact() (*datapb.CompactionPlanResult, error) { ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(t.ctx, fmt.Sprintf("MixCompact-%d", t.GetPlanID())) defer span.End() - log := log.Ctx(ctx).With(zap.Int64("planID", t.plan.GetPlanID()), zap.Int32("timeout in seconds", t.plan.GetTimeoutInSeconds())) + if len(t.plan.GetSegmentBinlogs()) < 1 { + log.Warn("compact wrong, there's no segments in segment binlogs", zap.Int64("planID", t.plan.GetPlanID())) + return nil, errors.New("compaction plan is illegal") + } + + collectionID := t.plan.GetSegmentBinlogs()[0].GetCollectionID() + partitionID := t.plan.GetSegmentBinlogs()[0].GetPartitionID() + + log := log.Ctx(ctx).With(zap.Int64("planID", t.plan.GetPlanID()), + zap.Int64("collectionID", collectionID), + zap.Int64("partitionID", partitionID), + zap.Int32("timeout in seconds", t.plan.GetTimeoutInSeconds())) + if ok := funcutil.CheckCtxValid(ctx); !ok { log.Warn("compact wrong, task context done or timeout") return nil, ctx.Err() @@ -427,10 +424,6 @@ func (t *mixCompactionTask) Compact() (*datapb.CompactionPlanResult, error) { defer cancelAll() log.Info("compact start") - if len(t.plan.GetSegmentBinlogs()) < 1 { - log.Warn("compact wrong, there's no segments in segment binlogs") - return nil, errors.New("compaction plan is illegal") - } targetSegID, err := t.AllocOne() if err != nil { @@ -438,15 +431,9 @@ func (t *mixCompactionTask) Compact() (*datapb.CompactionPlanResult, error) { return nil, err } - previousRowCount, err := t.getNumRows() - if err != nil { - log.Warn("compact wrong, unable to get previous numRows", zap.Error(err)) - return nil, err - } - - partID := t.plan.GetSegmentBinlogs()[0].GetPartitionID() + previousRowCount := t.getNumRows() - writer, err := NewSegmentWriter(t.metaCache.Schema(), previousRowCount, targetSegID, partID, t.metaCache.Collection()) + writer, err := NewSegmentWriter(t.plan.GetSchema(), previousRowCount, targetSegID, partitionID, collectionID) if err != nil { log.Warn("compact wrong, unable to init segment writer", zap.Error(err)) return nil, err @@ -455,12 +442,6 @@ func (t *mixCompactionTask) 
Compact() (*datapb.CompactionPlanResult, error) { segIDs := lo.Map(t.plan.GetSegmentBinlogs(), func(binlogs *datapb.CompactionSegmentBinlogs, _ int) int64 { return binlogs.GetSegmentID() }) - // Inject to stop flush - // when compaction failed, these segments need to be Unblocked by injectDone in compaction_executor - // when compaction succeeded, these segments will be Unblocked by SyncSegments from DataCoord. - for _, segID := range segIDs { - t.syncMgr.Block(segID) - } if err := binlog.DecompressCompactionBinlogs(t.plan.GetSegmentBinlogs()); err != nil { log.Warn("compact wrong, fail to decompress compaction binlogs", zap.Error(err)) @@ -541,16 +522,9 @@ func (t *mixCompactionTask) Compact() (*datapb.CompactionPlanResult, error) { return planResult, nil } -func (t *mixCompactionTask) InjectDone() { - t.injectDoneOnce.Do(func() { - for _, binlog := range t.plan.SegmentBinlogs { - t.syncMgr.Unblock(binlog.SegmentID) - } - }) -} - func (t *mixCompactionTask) GetCollection() typeutil.UniqueID { - return t.metaCache.Collection() + // The length of SegmentBinlogs is checked before task enqueueing. + return t.plan.GetSegmentBinlogs()[0].GetCollectionID() } func (t *mixCompactionTask) isExpiredEntity(ts typeutil.Timestamp) bool { diff --git a/internal/datanode/compaction/mix_compactor_test.go b/internal/datanode/compaction/mix_compactor_test.go index 6ca701bedd91e..cea2c0b6fe16a 100644 --- a/internal/datanode/compaction/mix_compactor_test.go +++ b/internal/datanode/compaction/mix_compactor_test.go @@ -32,12 +32,10 @@ import ( "github.com/milvus-io/milvus/internal/datanode/allocator" "github.com/milvus-io/milvus/internal/datanode/io" "github.com/milvus-io/milvus/internal/datanode/metacache" - "github.com/milvus-io/milvus/internal/datanode/syncmgr" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/etcdpb" "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/pkg/common" - "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/tsoutil" "github.com/milvus-io/milvus/pkg/util/typeutil" @@ -54,8 +52,6 @@ type MixCompactionTaskSuite struct { mockBinlogIO *io.MockBinlogIO mockAlloc *allocator.MockAllocator - mockMeta *metacache.MockMetaCache - mockSyncMgr *syncmgr.MockSyncManager meta *etcdpb.CollectionMeta segWriter *SegmentWriter @@ -71,10 +67,8 @@ func (s *MixCompactionTaskSuite) SetupSuite() { func (s *MixCompactionTaskSuite) SetupTest() { s.mockBinlogIO = io.NewMockBinlogIO(s.T()) s.mockAlloc = allocator.NewMockAllocator(s.T()) - s.mockMeta = metacache.NewMockMetaCache(s.T()) - s.mockSyncMgr = syncmgr.NewMockSyncManager(s.T()) - s.task = NewMixCompactionTask(context.Background(), s.mockBinlogIO, s.mockMeta, s.mockSyncMgr, s.mockAlloc, nil) + s.task = NewMixCompactionTask(context.Background(), s.mockBinlogIO, s.mockAlloc, nil) s.meta = genTestCollectionMeta() @@ -90,6 +84,7 @@ func (s *MixCompactionTaskSuite) SetupTest() { }}, TimeoutInSeconds: 10, Type: datapb.CompactionType_MixCompaction, + Schema: s.meta.GetSchema(), } s.task.plan = s.plan } @@ -106,26 +101,10 @@ func getMilvusBirthday() time.Time { return time.Date(2019, time.Month(5), 30, 0, 0, 0, 0, time.UTC) } -func (s *MixCompactionTaskSuite) TestInjectDone() { - segmentIDs := []int64{100, 200, 300} - s.task.plan.SegmentBinlogs = lo.Map(segmentIDs, func(id int64, _ int) *datapb.CompactionSegmentBinlogs { - return &datapb.CompactionSegmentBinlogs{SegmentID: id} - }) - - for _, segmentID := range 
segmentIDs { - s.mockSyncMgr.EXPECT().Unblock(segmentID).Return().Once() - } - - s.task.InjectDone() - s.task.InjectDone() -} - func (s *MixCompactionTaskSuite) TestCompactDupPK() { // Test merge compactions, two segments with the same pk, one deletion pk=1 // The merged segment 19530 should remain 3 pk without pk=100 s.mockAlloc.EXPECT().AllocOne().Return(int64(19530), nil).Twice() - s.mockMeta.EXPECT().Schema().Return(s.meta.GetSchema()).Once() - s.mockMeta.EXPECT().Collection().Return(CollectionID).Once() segments := []int64{7, 8, 9} dblobs, err := getInt64DeltaBlobs( 1, @@ -153,12 +132,12 @@ func (s *MixCompactionTaskSuite) TestCompactDupPK() { s.segWriter.writer.Flush() s.Require().NoError(err) - statistic := &storage.PkStatistics{ - PkFilter: s.segWriter.pkstats.BF, - MinPK: s.segWriter.pkstats.MinPk, - MaxPK: s.segWriter.pkstats.MaxPk, - } - bfs := metacache.NewBloomFilterSet(statistic) + //statistic := &storage.PkStatistics{ + // PkFilter: s.segWriter.pkstats.BF, + // MinPK: s.segWriter.pkstats.MinPk, + // MaxPK: s.segWriter.pkstats.MaxPk, + //} + //bfs := metacache.NewBloomFilterSet(statistic) kvs, fBinlogs, err := s.task.serializeWrite(context.TODO(), s.segWriter) s.Require().NoError(err) @@ -167,17 +146,12 @@ func (s *MixCompactionTaskSuite) TestCompactDupPK() { return len(left) == 0 && len(right) == 0 })).Return(lo.Values(kvs), nil).Once() - seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{ - CollectionID: CollectionID, - PartitionID: PartitionID, - ID: segID, - NumOfRows: 1, - }, bfs) - - s.mockMeta.EXPECT().GetSegmentByID(segID).RunAndReturn(func(id int64, filters ...metacache.SegmentFilter) (*metacache.SegmentInfo, bool) { - return seg, true - }) - s.mockSyncMgr.EXPECT().Block(segID).Return().Once() + //seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{ + // CollectionID: CollectionID, + // PartitionID: PartitionID, + // ID: segID, + // NumOfRows: 1, + //}, bfs) s.plan.SegmentBinlogs = append(s.plan.SegmentBinlogs, &datapb.CompactionSegmentBinlogs{ SegmentID: segID, @@ -204,8 +178,6 @@ func (s *MixCompactionTaskSuite) TestCompactDupPK() { func (s *MixCompactionTaskSuite) TestCompactTwoToOne() { s.mockAlloc.EXPECT().AllocOne().Return(int64(19530), nil).Twice() - s.mockMeta.EXPECT().Schema().Return(s.meta.GetSchema()).Once() - s.mockMeta.EXPECT().Collection().Return(CollectionID).Once() segments := []int64{5, 6, 7} s.mockAlloc.EXPECT().Alloc(mock.Anything).Return(7777777, 8888888, nil) @@ -213,12 +185,12 @@ func (s *MixCompactionTaskSuite) TestCompactTwoToOne() { s.task.plan.SegmentBinlogs = make([]*datapb.CompactionSegmentBinlogs, 0) for _, segID := range segments { s.initSegBuffer(segID) - statistic := &storage.PkStatistics{ - PkFilter: s.segWriter.pkstats.BF, - MinPK: s.segWriter.pkstats.MinPk, - MaxPK: s.segWriter.pkstats.MaxPk, - } - bfs := metacache.NewBloomFilterSet(statistic) + //statistic := &storage.PkStatistics{ + // PkFilter: s.segWriter.pkstats.BF, + // MinPK: s.segWriter.pkstats.MinPk, + // MaxPK: s.segWriter.pkstats.MaxPk, + //} + //bfs := metacache.NewBloomFilterSet(statistic) kvs, fBinlogs, err := s.task.serializeWrite(context.TODO(), s.segWriter) s.Require().NoError(err) s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.MatchedBy(func(keys []string) bool { @@ -226,17 +198,12 @@ func (s *MixCompactionTaskSuite) TestCompactTwoToOne() { return len(left) == 0 && len(right) == 0 })).Return(lo.Values(kvs), nil).Once() - seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{ - CollectionID: CollectionID, - PartitionID: PartitionID, - ID: segID, - NumOfRows: 1, 
- }, bfs) - - s.mockMeta.EXPECT().GetSegmentByID(segID).RunAndReturn(func(id int64, filters ...metacache.SegmentFilter) (*metacache.SegmentInfo, bool) { - return seg, true - }) - s.mockSyncMgr.EXPECT().Block(segID).Return().Once() + //seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{ + // CollectionID: CollectionID, + // PartitionID: PartitionID, + // ID: segID, + // NumOfRows: 1, + //}, bfs) s.plan.SegmentBinlogs = append(s.plan.SegmentBinlogs, &datapb.CompactionSegmentBinlogs{ SegmentID: segID, @@ -251,10 +218,7 @@ func (s *MixCompactionTaskSuite) TestCompactTwoToOne() { ID: 99999, NumOfRows: 0, }, metacache.NewBloomFilterSet()) - s.mockMeta.EXPECT().GetSegmentByID(seg.SegmentID()).RunAndReturn(func(id int64, filters ...metacache.SegmentFilter) (*metacache.SegmentInfo, bool) { - return seg, true - }) - s.mockSyncMgr.EXPECT().Block(seg.SegmentID()).Return().Once() + s.plan.SegmentBinlogs = append(s.plan.SegmentBinlogs, &datapb.CompactionSegmentBinlogs{ SegmentID: seg.SegmentID(), }) @@ -531,15 +495,6 @@ func (s *MixCompactionTaskSuite) TestCompactFail() { _, err := s.task.Compact() s.Error(err) }) - - s.Run("Test getNumRows error", func() { - s.mockAlloc.EXPECT().AllocOne().Return(19530, nil).Once() - s.mockMeta.EXPECT().GetSegmentByID(mock.Anything).Return(nil, false) - - _, err := s.task.Compact() - s.Error(err) - s.ErrorIs(err, merr.ErrSegmentNotFound) - }) } func (s *MixCompactionTaskSuite) TestIsExpiredEntity() { diff --git a/internal/datanode/compaction_executor.go b/internal/datanode/compaction_executor.go index 938d1b5db0fd7..d8ac9fa423f83 100644 --- a/internal/datanode/compaction_executor.go +++ b/internal/datanode/compaction_executor.go @@ -70,12 +70,11 @@ func (c *compactionExecutor) toCompleteState(task compaction.Compactor) { c.executing.GetAndRemove(task.GetPlanID()) } -func (c *compactionExecutor) injectDone(planID UniqueID) { +func (c *compactionExecutor) removeTask(planID UniqueID) { c.completed.GetAndRemove(planID) task, loaded := c.completedCompactor.GetAndRemove(planID) if loaded { - log.Info("Compaction task inject done", zap.Int64("planID", planID), zap.String("channel", task.GetChannelName())) - task.InjectDone() + log.Info("Compaction task removed", zap.Int64("planID", planID), zap.String("channel", task.GetChannelName())) } } @@ -110,12 +109,11 @@ func (c *compactionExecutor) executeTask(task compaction.Compactor) { result, err := task.Compact() if err != nil { - task.InjectDone() log.Warn("compaction task failed", zap.Error(err)) - } else { - c.completed.Insert(result.GetPlanID(), result) - c.completedCompactor.Insert(result.GetPlanID(), task) + return } + c.completed.Insert(result.GetPlanID(), result) + c.completedCompactor.Insert(result.GetPlanID(), task) log.Info("end to execute compaction") } @@ -152,7 +150,7 @@ func (c *compactionExecutor) discardPlan(channel string) { // remove all completed plans of channel c.completed.Range(func(planID int64, result *datapb.CompactionPlanResult) bool { if result.GetChannel() == channel { - c.injectDone(planID) + c.removeTask(planID) log.Info("remove compaction plan and results", zap.String("channel", channel), zap.Int64("planID", planID)) diff --git a/internal/datanode/l0_compactor.go b/internal/datanode/l0_compactor.go index 89157c407834b..f04cc280c5c41 100644 --- a/internal/datanode/l0_compactor.go +++ b/internal/datanode/l0_compactor.go @@ -34,7 +34,6 @@ import ( "github.com/milvus-io/milvus/internal/datanode/io" iter "github.com/milvus-io/milvus/internal/datanode/iterators" 
"github.com/milvus-io/milvus/internal/datanode/metacache" - "github.com/milvus-io/milvus/internal/datanode/syncmgr" "github.com/milvus-io/milvus/internal/metastore/kv/binlog" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/storage" @@ -52,12 +51,8 @@ import ( ) type levelZeroCompactionTask struct { - compaction.Compactor io.BinlogIO - allocator allocator.Allocator - metacache metacache.MetaCache - syncmgr syncmgr.SyncManager cm storage.ChunkManager plan *datapb.CompactionPlan @@ -76,8 +71,6 @@ func newLevelZeroCompactionTask( ctx context.Context, binlogIO io.BinlogIO, alloc allocator.Allocator, - metaCache metacache.MetaCache, - syncmgr syncmgr.SyncManager, cm storage.ChunkManager, plan *datapb.CompactionPlan, ) *levelZeroCompactionTask { @@ -88,8 +81,6 @@ func newLevelZeroCompactionTask( BinlogIO: binlogIO, allocator: alloc, - metacache: metaCache, - syncmgr: syncmgr, cm: cm, plan: plan, tr: timerecord.NewTimeRecorder("levelzero compaction"), @@ -115,12 +106,10 @@ func (t *levelZeroCompactionTask) GetChannelName() string { } func (t *levelZeroCompactionTask) GetCollection() int64 { - return t.metacache.Collection() + // The length of SegmentBinlogs is checked before task enqueueing. + return t.plan.GetSegmentBinlogs()[0].GetCollectionID() } -// Do nothing for levelzero compaction -func (t *levelZeroCompactionTask) InjectDone() {} - func (t *levelZeroCompactionTask) Compact() (*datapb.CompactionPlanResult, error) { ctx, span := otel.Tracer(typeutil.DataNodeRole).Start(t.ctx, "L0Compact") defer span.End() @@ -338,16 +327,20 @@ func (t *levelZeroCompactionTask) splitDelta( } func (t *levelZeroCompactionTask) composeDeltalog(segmentID int64, dData *storage.DeleteData) (map[string][]byte, *datapb.Binlog, error) { + segment, ok := lo.Find(t.plan.GetSegmentBinlogs(), func(segment *datapb.CompactionSegmentBinlogs) bool { + return segment.GetSegmentID() == segmentID + }) + if !ok { + return nil, nil, merr.WrapErrSegmentNotFound(segmentID, "cannot find segment in compaction plan") + } + var ( - collID = t.metacache.Collection() - uploadKv = make(map[string][]byte) + collectionID = segment.GetCollectionID() + partitionID = segment.GetPartitionID() + uploadKv = make(map[string][]byte) ) - seg, ok := t.metacache.GetSegmentByID(segmentID) - if !ok { - return nil, nil, merr.WrapErrSegmentLack(segmentID) - } - blob, err := storage.NewDeleteCodec().Serialize(collID, seg.PartitionID(), segmentID, dData) + blob, err := storage.NewDeleteCodec().Serialize(collectionID, partitionID, segmentID, dData) if err != nil { return nil, nil, err } @@ -357,7 +350,7 @@ func (t *levelZeroCompactionTask) composeDeltalog(segmentID int64, dData *storag return nil, nil, err } - blobKey := metautil.JoinIDPath(collID, seg.PartitionID(), segmentID, logID) + blobKey := metautil.JoinIDPath(collectionID, partitionID, segmentID, logID) blobPath := t.BinlogIO.JoinFullPath(common.SegmentDeltaLogPath, blobKey) uploadKv[blobPath] = blob.GetValue() @@ -447,7 +440,7 @@ func (t *levelZeroCompactionTask) loadBF(targetSegments []*datapb.CompactionSegm _ = binlog.DecompressBinLog(storage.StatsBinlog, segment.GetCollectionID(), segment.GetPartitionID(), segment.GetSegmentID(), segment.GetField2StatslogPaths()) pks, err := loadStats(t.ctx, t.cm, - t.metacache.Schema(), segment.GetSegmentID(), segment.GetField2StatslogPaths()) + t.plan.GetSchema(), segment.GetSegmentID(), segment.GetField2StatslogPaths()) if err != nil { log.Warn("failed to load segment stats log", zap.Error(err)) return err, err diff 
--git a/internal/datanode/l0_compactor_test.go b/internal/datanode/l0_compactor_test.go index 08df575433ace..9aad1fb685443 100644 --- a/internal/datanode/l0_compactor_test.go +++ b/internal/datanode/l0_compactor_test.go @@ -51,7 +51,6 @@ type LevelZeroCompactionTaskSuite struct { mockBinlogIO *io.MockBinlogIO mockAlloc *allocator.MockAllocator - mockMeta *metacache.MockMetaCache task *levelZeroCompactionTask dData *storage.DeleteData @@ -61,9 +60,8 @@ type LevelZeroCompactionTaskSuite struct { func (s *LevelZeroCompactionTaskSuite) SetupTest() { s.mockAlloc = allocator.NewMockAllocator(s.T()) s.mockBinlogIO = io.NewMockBinlogIO(s.T()) - s.mockMeta = metacache.NewMockMetaCache(s.T()) // plan of the task is unset - s.task = newLevelZeroCompactionTask(context.Background(), s.mockBinlogIO, s.mockAlloc, s.mockMeta, nil, nil, nil) + s.task = newLevelZeroCompactionTask(context.Background(), s.mockBinlogIO, s.mockAlloc, nil, nil) pk2ts := map[int64]uint64{ 1: 20000, @@ -101,20 +99,19 @@ func (s *LevelZeroCompactionTaskSuite) TestLinearBatchLoadDeltaFail() { }, {SegmentID: 200, Level: datapb.SegmentLevel_L1}, }, + Schema: &schemapb.CollectionSchema{ + Fields: []*schemapb.FieldSchema{ + { + IsPrimaryKey: true, + }, + }, + }, } s.task.plan = plan s.task.tr = timerecord.NewTimeRecorder("test") s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).Return(nil, errors.New("mock download fail")).Twice() - s.mockMeta.EXPECT().Schema().Return(&schemapb.CollectionSchema{ - Fields: []*schemapb.FieldSchema{ - { - IsPrimaryKey: true, - }, - }, - }) - targetSegments := lo.Filter(plan.SegmentBinlogs, func(s *datapb.CompactionSegmentBinlogs, _ int) bool { return s.Level == datapb.SegmentLevel_L1 }) @@ -154,6 +151,13 @@ func (s *LevelZeroCompactionTaskSuite) TestLinearBatchUploadByCheckFail() { }, }}, }, + Schema: &schemapb.CollectionSchema{ + Fields: []*schemapb.FieldSchema{ + { + IsPrimaryKey: true, + }, + }, + }, } s.task.plan = plan @@ -170,15 +174,9 @@ func (s *LevelZeroCompactionTaskSuite) TestLinearBatchUploadByCheckFail() { s.task.cm = cm s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).Return([][]byte{s.dBlob}, nil).Times(2) - s.mockMeta.EXPECT().Collection().Return(1) - s.mockMeta.EXPECT().GetSegmentByID(mock.Anything).Return(nil, false).Twice() - s.mockMeta.EXPECT().Schema().Return(&schemapb.CollectionSchema{ - Fields: []*schemapb.FieldSchema{ - { - IsPrimaryKey: true, - }, - }, - }) + mockAlloc := allocator.NewMockAllocator(s.T()) + mockAlloc.EXPECT().AllocOne().Return(0, errors.New("mock alloc err")) + s.task.allocator = mockAlloc targetSegments := lo.Filter(plan.SegmentBinlogs, func(s *datapb.CompactionSegmentBinlogs, _ int) bool { return s.Level == datapb.SegmentLevel_L1 @@ -200,7 +198,8 @@ func (s *LevelZeroCompactionTaskSuite) TestCompactLinear() { Type: datapb.CompactionType_Level0DeleteCompaction, SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{ { - SegmentID: 100, Level: datapb.SegmentLevel_L0, Deltalogs: []*datapb.FieldBinlog{ + CollectionID: 1, + SegmentID: 100, Level: datapb.SegmentLevel_L0, Deltalogs: []*datapb.FieldBinlog{ { Binlogs: []*datapb.Binlog{ {LogPath: "a/b/c1", LogSize: 100}, @@ -212,7 +211,8 @@ func (s *LevelZeroCompactionTaskSuite) TestCompactLinear() { }, }, { - SegmentID: 101, Level: datapb.SegmentLevel_L0, Deltalogs: []*datapb.FieldBinlog{ + CollectionID: 1, + SegmentID: 101, Level: datapb.SegmentLevel_L0, Deltalogs: []*datapb.FieldBinlog{ { Binlogs: []*datapb.Binlog{ {LogPath: "a/d/c1", LogSize: 100}, @@ -223,20 +223,33 @@ func (s 
*LevelZeroCompactionTaskSuite) TestCompactLinear() { }, }, }, - {SegmentID: 200, Level: datapb.SegmentLevel_L1, Field2StatslogPaths: []*datapb.FieldBinlog{ - { - Binlogs: []*datapb.Binlog{ - {LogID: 9999, LogSize: 100}, + { + CollectionID: 1, + SegmentID: 200, Level: datapb.SegmentLevel_L1, Field2StatslogPaths: []*datapb.FieldBinlog{ + { + Binlogs: []*datapb.Binlog{ + {LogID: 9999, LogSize: 100}, + }, }, }, - }}, - {SegmentID: 201, Level: datapb.SegmentLevel_L1, Field2StatslogPaths: []*datapb.FieldBinlog{ - { - Binlogs: []*datapb.Binlog{ - {LogID: 9999, LogSize: 100}, + }, + { + CollectionID: 1, + SegmentID: 201, Level: datapb.SegmentLevel_L1, Field2StatslogPaths: []*datapb.FieldBinlog{ + { + Binlogs: []*datapb.Binlog{ + {LogID: 9999, LogSize: 100}, + }, }, }, - }}, + }, + }, + Schema: &schemapb.CollectionSchema{ + Fields: []*schemapb.FieldSchema{ + { + IsPrimaryKey: true, + }, + }, }, } @@ -254,18 +267,6 @@ func (s *LevelZeroCompactionTaskSuite) TestCompactLinear() { s.task.cm = cm s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).Return([][]byte{s.dBlob}, nil).Times(2) - s.mockMeta.EXPECT().Collection().Return(1) - s.mockMeta.EXPECT().GetSegmentByID(mock.Anything, mock.Anything). - RunAndReturn(func(id int64, filters ...metacache.SegmentFilter) (*metacache.SegmentInfo, bool) { - return metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: id, PartitionID: 10}, nil), true - }) - s.mockMeta.EXPECT().Schema().Return(&schemapb.CollectionSchema{ - Fields: []*schemapb.FieldSchema{ - { - IsPrimaryKey: true, - }, - }, - }) s.mockAlloc.EXPECT().AllocOne().Return(19530, nil).Times(2) s.mockBinlogIO.EXPECT().JoinFullPath(mock.Anything, mock.Anything). @@ -357,6 +358,13 @@ func (s *LevelZeroCompactionTaskSuite) TestCompactBatch() { }, }}, }, + Schema: &schemapb.CollectionSchema{ + Fields: []*schemapb.FieldSchema{ + { + IsPrimaryKey: true, + }, + }, + }, } s.task.plan = plan @@ -373,18 +381,6 @@ func (s *LevelZeroCompactionTaskSuite) TestCompactBatch() { s.task.cm = cm s.mockBinlogIO.EXPECT().Download(mock.Anything, mock.Anything).Return([][]byte{s.dBlob}, nil).Once() - s.mockMeta.EXPECT().Collection().Return(1) - s.mockMeta.EXPECT().GetSegmentByID(mock.Anything, mock.Anything). - RunAndReturn(func(id int64, filters ...metacache.SegmentFilter) (*metacache.SegmentInfo, bool) { - return metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: id, PartitionID: 10}, nil), true - }) - s.mockMeta.EXPECT().Schema().Return(&schemapb.CollectionSchema{ - Fields: []*schemapb.FieldSchema{ - { - IsPrimaryKey: true, - }, - }, - }) s.mockAlloc.EXPECT().AllocOne().Return(19530, nil).Times(2) s.mockBinlogIO.EXPECT().JoinFullPath(mock.Anything, mock.Anything). 
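For reference, the pattern these updated tests exercise: composeDeltalog now resolves a segment's collection and partition IDs from the CompactionPlan entry itself (via lo.Find) instead of querying a metacache mock. A minimal standalone sketch of that lookup, with a hypothetical helper name lookupSegment and simplified error handling:

package datanode

import (
	"github.com/samber/lo"

	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/pkg/util/merr"
)

// lookupSegment finds the plan entry for segmentID; the entry carries the
// collection and partition IDs that previously came from the flowgraph
// metacache.
func lookupSegment(plan *datapb.CompactionPlan, segmentID int64) (*datapb.CompactionSegmentBinlogs, error) {
	segment, ok := lo.Find(plan.GetSegmentBinlogs(), func(s *datapb.CompactionSegmentBinlogs) bool {
		return s.GetSegmentID() == segmentID
	})
	if !ok {
		return nil, merr.WrapErrSegmentNotFound(segmentID, "cannot find segment in compaction plan")
	}
	return segment, nil
}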
@@ -430,11 +426,21 @@ func (s *LevelZeroCompactionTaskSuite) TestCompactBatch() { func (s *LevelZeroCompactionTaskSuite) TestUploadByCheck() { ctx := context.Background() + + plan := &datapb.CompactionPlan{ + SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{ + { + SegmentID: 100, + }, + }, + } + s.Run("uploadByCheck directly composeDeltalog failed", func() { s.SetupTest() - s.mockMeta.EXPECT().Collection().Return(1) - s.mockMeta.EXPECT().GetSegmentByID(mock.Anything).Return(nil, false).Once() - + s.task.plan = plan + mockAlloc := allocator.NewMockAllocator(s.T()) + mockAlloc.EXPECT().AllocOne().Return(0, errors.New("mock alloc err")) + s.task.allocator = mockAlloc segments := map[int64]*storage.DeleteData{100: s.dData} results := make(map[int64]*datapb.CompactionSegment) err := s.task.uploadByCheck(ctx, false, segments, results) @@ -444,13 +450,8 @@ func (s *LevelZeroCompactionTaskSuite) TestUploadByCheck() { s.Run("uploadByCheck directly Upload failed", func() { s.SetupTest() + s.task.plan = plan s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(errors.New("mock upload failed")) - s.mockMeta.EXPECT().Collection().Return(1) - s.mockMeta.EXPECT().GetSegmentByID( - mock.MatchedBy(func(ID int64) bool { - return ID == 100 - }), mock.Anything). - Return(metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 100, PartitionID: 10}, nil), true) s.mockAlloc.EXPECT().AllocOne().Return(19530, nil) blobKey := metautil.JoinIDPath(1, 10, 100, 19530) @@ -466,13 +467,8 @@ func (s *LevelZeroCompactionTaskSuite) TestUploadByCheck() { s.Run("upload directly", func() { s.SetupTest() + s.task.plan = plan s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil) - s.mockMeta.EXPECT().Collection().Return(1) - s.mockMeta.EXPECT().GetSegmentByID( - mock.MatchedBy(func(ID int64) bool { - return ID == 100 - }), mock.Anything). - Return(metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 100, PartitionID: 10}, nil), true) s.mockAlloc.EXPECT().AllocOne().Return(19530, nil) blobKey := metautil.JoinIDPath(1, 10, 100, 19530) @@ -503,16 +499,11 @@ func (s *LevelZeroCompactionTaskSuite) TestUploadByCheck() { }) s.Run("check with upload", func() { + s.task.plan = plan blobKey := metautil.JoinIDPath(1, 10, 100, 19530) blobPath := path.Join(common.SegmentDeltaLogPath, blobKey) s.mockBinlogIO.EXPECT().Upload(mock.Anything, mock.Anything).Return(nil) - s.mockMeta.EXPECT().Collection().Return(1) - s.mockMeta.EXPECT().GetSegmentByID( - mock.MatchedBy(func(ID int64) bool { - return ID == 100 - }), mock.Anything). - Return(metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 100, PartitionID: 10}, nil), true) s.mockAlloc.EXPECT().AllocOne().Return(19530, nil) s.mockBinlogIO.EXPECT().JoinFullPath(mock.Anything, mock.Anything).Return(blobPath) @@ -539,20 +530,17 @@ func (s *LevelZeroCompactionTaskSuite) TestUploadByCheck() { } func (s *LevelZeroCompactionTaskSuite) TestComposeDeltalog() { - s.mockMeta.EXPECT().Collection().Return(1) - s.mockMeta.EXPECT(). - GetSegmentByID( - mock.MatchedBy(func(ID int64) bool { - return ID == 100 - }), mock.Anything). - Return(metacache.NewSegmentInfo(&datapb.SegmentInfo{ID: 100, PartitionID: 10}, nil), true) - - s.mockMeta.EXPECT(). - GetSegmentByID( - mock.MatchedBy(func(ID int64) bool { - return ID == 101 - }), mock.Anything). 
- Return(nil, false) + plan := &datapb.CompactionPlan{ + SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{ + { + SegmentID: 100, + }, + { + SegmentID: 101, + }, + }, + } + s.task.plan = plan s.mockAlloc.EXPECT().AllocOne().Return(19530, nil) @@ -568,8 +556,13 @@ func (s *LevelZeroCompactionTaskSuite) TestComposeDeltalog() { s.NotNil(v) s.Equal(blobPath, binlog.LogPath) - _, _, err = s.task.composeDeltalog(101, s.dData) - s.Error(err) + kvs, _, err = s.task.composeDeltalog(101, s.dData) + s.NoError(err) + s.Equal(1, len(kvs)) + v, ok = kvs[blobPath] + s.True(ok) + s.NotNil(v) + s.Equal(blobPath, binlog.LogPath) } func (s *LevelZeroCompactionTaskSuite) TestSplitDelta() { @@ -684,6 +677,13 @@ func (s *LevelZeroCompactionTaskSuite) TestLoadBF() { }, }}, }, + Schema: &schemapb.CollectionSchema{ + Fields: []*schemapb.FieldSchema{ + { + IsPrimaryKey: true, + }, + }, + }, } s.task.plan = plan @@ -698,14 +698,6 @@ func (s *LevelZeroCompactionTaskSuite) TestLoadBF() { cm.EXPECT().MultiRead(mock.Anything, mock.Anything).Return([][]byte{sw.GetBuffer()}, nil) s.task.cm = cm - s.mockMeta.EXPECT().Schema().Return(&schemapb.CollectionSchema{ - Fields: []*schemapb.FieldSchema{ - { - IsPrimaryKey: true, - }, - }, - }) - bfs, err := s.task.loadBF(plan.SegmentBinlogs) s.NoError(err) @@ -730,18 +722,17 @@ func (s *LevelZeroCompactionTaskSuite) TestFailed() { }, }}, }, + Schema: &schemapb.CollectionSchema{ + Fields: []*schemapb.FieldSchema{ + { + IsPrimaryKey: false, + }, + }, + }, } s.task.plan = plan - s.mockMeta.EXPECT().Schema().Return(&schemapb.CollectionSchema{ - Fields: []*schemapb.FieldSchema{ - { - IsPrimaryKey: false, - }, - }, - }) - _, err := s.task.loadBF(plan.SegmentBinlogs) s.Error(err) }) diff --git a/internal/datanode/services.go b/internal/datanode/services.go index 4f172c966272b..d18f10ada6cd6 100644 --- a/internal/datanode/services.go +++ b/internal/datanode/services.go @@ -30,11 +30,8 @@ import ( "github.com/milvus-io/milvus/internal/datanode/compaction" "github.com/milvus-io/milvus/internal/datanode/importv2" "github.com/milvus-io/milvus/internal/datanode/io" - "github.com/milvus-io/milvus/internal/datanode/metacache" - "github.com/milvus-io/milvus/internal/metastore/kv/binlog" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/internalpb" - "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" @@ -205,29 +202,9 @@ func (node *DataNode) Compaction(ctx context.Context, req *datapb.CompactionPlan return merr.Status(err), nil } - ds, ok := node.flowgraphManager.GetFlowgraphService(req.GetChannel()) - if !ok { - log.Warn("illegel compaction plan, channel not in this DataNode", zap.String("channelName", req.GetChannel())) - return merr.Status(merr.WrapErrChannelNotFound(req.GetChannel(), "illegel compaction plan")), nil - } - - if !node.compactionExecutor.isValidChannel(req.GetChannel()) { - log.Warn("channel of compaction is marked invalid in compaction executor", zap.String("channelName", req.GetChannel())) - return merr.Status(merr.WrapErrChannelNotFound(req.GetChannel(), "channel is dropping")), nil - } - - meta := ds.metacache - for _, segment := range req.GetSegmentBinlogs() { - if segment.GetLevel() == datapb.SegmentLevel_L0 { - continue - } - _, ok := meta.GetSegmentByID(segment.GetSegmentID(), metacache.WithSegmentState(commonpb.SegmentState_Flushed)) - if !ok { - log.Warn("compaction plan contains segment which is not 
flushed", - zap.Int64("segmentID", segment.GetSegmentID()), - ) - return merr.Status(merr.WrapErrSegmentNotFound(segment.GetSegmentID(), "segment with flushed state not found")), nil - } + if len(req.GetSegmentBinlogs()) == 0 { + log.Info("no segments to compact") + return merr.Success(), nil } /* @@ -244,8 +221,6 @@ func (node *DataNode) Compaction(ctx context.Context, req *datapb.CompactionPlan taskCtx, binlogIO, node.allocator, - ds.metacache, - node.syncMgr, node.chunkManager, req, ) @@ -253,8 +228,6 @@ func (node *DataNode) Compaction(ctx context.Context, req *datapb.CompactionPlan task = compaction.NewMixCompactionTask( taskCtx, binlogIO, - ds.metacache, - node.syncMgr, node.allocator, req, ) @@ -288,10 +261,6 @@ func (node *DataNode) SyncSegments(ctx context.Context, req *datapb.SyncSegments log := log.Ctx(ctx).With( zap.Int64("planID", req.GetPlanID()), zap.Int64("nodeID", node.GetNodeID()), - zap.Int64("target segmentID", req.GetCompactedTo()), - zap.Int64s("compacted from", req.GetCompactedFrom()), - zap.Int64("numOfRows", req.GetNumOfRows()), - zap.String("channelName", req.GetChannelName()), ) log.Info("DataNode receives SyncSegments") @@ -301,32 +270,8 @@ func (node *DataNode) SyncSegments(ctx context.Context, req *datapb.SyncSegments return merr.Status(err), nil } - if len(req.GetCompactedFrom()) <= 0 { - log.Info("SyncSegments with empty compactedFrom, clearing the plan") - node.compactionExecutor.injectDone(req.GetPlanID()) - return merr.Success(), nil - } - - ds, ok := node.flowgraphManager.GetFlowgraphService(req.GetChannelName()) - if !ok { - node.compactionExecutor.discardPlan(req.GetChannelName()) - err := merr.WrapErrChannelNotFound(req.GetChannelName()) - log.Warn("failed to sync segments", zap.Error(err)) - return merr.Status(err), nil - } - err := binlog.DecompressBinLog(storage.StatsBinlog, req.GetCollectionId(), req.GetPartitionId(), req.GetCompactedTo(), req.GetStatsLogs()) - if err != nil { - log.Warn("failed to DecompressBinLog", zap.Error(err)) - return merr.Status(err), nil - } - pks, err := loadStats(ctx, node.chunkManager, ds.metacache.Schema(), req.GetCompactedTo(), req.GetStatsLogs()) - if err != nil { - log.Warn("failed to load segment statslog", zap.Error(err)) - return merr.Status(err), nil - } - bfs := metacache.NewBloomFilterSet(pks...) - ds.metacache.CompactSegments(req.GetCompactedTo(), req.GetPartitionId(), req.GetNumOfRows(), bfs, req.GetCompactedFrom()...) 
- node.compactionExecutor.injectDone(req.GetPlanID()) + // TODO: sheep, add a new DropCompaction interface, deprecate SyncSegments + node.compactionExecutor.removeTask(req.GetPlanID()) return merr.Success(), nil } diff --git a/internal/datanode/services_test.go b/internal/datanode/services_test.go index a834b1907d849..b90ef427f1a44 100644 --- a/internal/datanode/services_test.go +++ b/internal/datanode/services_test.go @@ -210,50 +210,7 @@ func (s *DataNodeServicesSuite) TestGetCompactionState() { func (s *DataNodeServicesSuite) TestCompaction() { dmChannelName := "by-dev-rootcoord-dml_0_100v0" - schema := &schemapb.CollectionSchema{ - Name: "test_collection", - Fields: []*schemapb.FieldSchema{ - {FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64}, - {FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64}, - {FieldID: common.StartOfUserFieldID, DataType: schemapb.DataType_Int64, IsPrimaryKey: true, Name: "pk"}, - {FieldID: common.StartOfUserFieldID + 1, DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{ - {Key: common.DimKey, Value: "128"}, - }}, - }, - } - flushedSegmentID := int64(100) - growingSegmentID := int64(101) - - vchan := &datapb.VchannelInfo{ - CollectionID: 1, - ChannelName: dmChannelName, - UnflushedSegmentIds: []int64{}, - FlushedSegmentIds: []int64{}, - } - - err := s.node.flowgraphManager.AddandStartWithEtcdTickler(s.node, vchan, schema, genTestTickler()) - s.Require().NoError(err) - - fgservice, ok := s.node.flowgraphManager.GetFlowgraphService(dmChannelName) - s.Require().True(ok) - - metaCache := metacache.NewMockMetaCache(s.T()) - metaCache.EXPECT().Collection().Return(1).Maybe() - metaCache.EXPECT().Schema().Return(schema).Maybe() - s.node.writeBufferManager.Register(dmChannelName, metaCache, nil) - fgservice.metacache.AddSegment(&datapb.SegmentInfo{ - ID: flushedSegmentID, - CollectionID: 1, - PartitionID: 2, - StartPosition: &msgpb.MsgPosition{}, - }, func(_ *datapb.SegmentInfo) *metacache.BloomFilterSet { return metacache.NewBloomFilterSet() }) - fgservice.metacache.AddSegment(&datapb.SegmentInfo{ - ID: growingSegmentID, - CollectionID: 1, - PartitionID: 2, - StartPosition: &msgpb.MsgPosition{}, - }, func(_ *datapb.SegmentInfo) *metacache.BloomFilterSet { return metacache.NewBloomFilterSet() }) s.Run("service_not_ready", func() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -269,40 +226,7 @@ func (s *DataNodeServicesSuite) TestCompaction() { s.False(merr.Ok(resp)) }) - s.Run("channel_not_match", func() { - node := s.node - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - req := &datapb.CompactionPlan{ - PlanID: 1000, - Channel: dmChannelName + "other", - } - - resp, err := node.Compaction(ctx, req) - s.NoError(err) - s.False(merr.Ok(resp)) - }) - - s.Run("channel_dropped", func() { - node := s.node - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - node.compactionExecutor.dropped.Insert(dmChannelName) - defer node.compactionExecutor.dropped.Remove(dmChannelName) - - req := &datapb.CompactionPlan{ - PlanID: 1000, - Channel: dmChannelName, - } - - resp, err := node.Compaction(ctx, req) - s.NoError(err) - s.False(merr.Ok(resp)) - }) - - s.Run("compact_growing_segment", func() { + s.Run("unknown CompactionType", func() { node := s.node ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -312,7 +236,7 @@ func (s *DataNodeServicesSuite) TestCompaction() 
{ Channel: dmChannelName, SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{ {SegmentID: 102, Level: datapb.SegmentLevel_L0}, - {SegmentID: growingSegmentID, Level: datapb.SegmentLevel_L1}, + {SegmentID: 103, Level: datapb.SegmentLevel_L1}, }, } @@ -506,126 +430,6 @@ func (s *DataNodeServicesSuite) TestGetMetrics() { zap.String("response", resp.Response)) } -func (s *DataNodeServicesSuite) TestSyncSegments() { - chanName := "fake-by-dev-rootcoord-dml-test-syncsegments-1" - schema := &schemapb.CollectionSchema{ - Name: "test_collection", - Fields: []*schemapb.FieldSchema{ - {FieldID: common.RowIDField, Name: common.RowIDFieldName, DataType: schemapb.DataType_Int64}, - {FieldID: common.TimeStampField, Name: common.TimeStampFieldName, DataType: schemapb.DataType_Int64}, - {FieldID: common.StartOfUserFieldID, DataType: schemapb.DataType_Int64, IsPrimaryKey: true, Name: "pk"}, - {FieldID: common.StartOfUserFieldID + 1, DataType: schemapb.DataType_FloatVector, TypeParams: []*commonpb.KeyValuePair{ - {Key: common.DimKey, Value: "128"}, - }}, - }, - } - - err := s.node.flowgraphManager.AddandStartWithEtcdTickler(s.node, &datapb.VchannelInfo{ - CollectionID: 1, - ChannelName: chanName, - UnflushedSegmentIds: []int64{}, - FlushedSegmentIds: []int64{100, 200, 300}, - }, schema, genTestTickler()) - s.Require().NoError(err) - fg, ok := s.node.flowgraphManager.GetFlowgraphService(chanName) - s.Assert().True(ok) - - fg.metacache.AddSegment(&datapb.SegmentInfo{ID: 100, CollectionID: 1, State: commonpb.SegmentState_Flushed}, EmptyBfsFactory) - fg.metacache.AddSegment(&datapb.SegmentInfo{ID: 101, CollectionID: 1, State: commonpb.SegmentState_Flushed}, EmptyBfsFactory) - fg.metacache.AddSegment(&datapb.SegmentInfo{ID: 200, CollectionID: 1, State: commonpb.SegmentState_Flushed}, EmptyBfsFactory) - fg.metacache.AddSegment(&datapb.SegmentInfo{ID: 201, CollectionID: 1, State: commonpb.SegmentState_Flushed}, EmptyBfsFactory) - fg.metacache.AddSegment(&datapb.SegmentInfo{ID: 300, CollectionID: 1, State: commonpb.SegmentState_Flushed}, EmptyBfsFactory) - - s.Run("empty compactedFrom", func() { - req := &datapb.SyncSegmentsRequest{ - CompactedTo: 400, - NumOfRows: 100, - } - - req.CompactedFrom = []UniqueID{} - status, err := s.node.SyncSegments(s.ctx, req) - s.Assert().NoError(err) - s.Assert().True(merr.Ok(status)) - }) - - s.Run("invalid compacted from", func() { - req := &datapb.SyncSegmentsRequest{ - CompactedTo: 400, - NumOfRows: 100, - CompactedFrom: []UniqueID{101, 201}, - } - - req.CompactedFrom = []UniqueID{101, 201} - status, err := s.node.SyncSegments(s.ctx, req) - s.Assert().NoError(err) - s.Assert().False(merr.Ok(status)) - }) - - s.Run("valid request numRows>0", func() { - req := &datapb.SyncSegmentsRequest{ - CompactedFrom: []UniqueID{100, 200, 101, 201}, - CompactedTo: 102, - NumOfRows: 100, - ChannelName: chanName, - CollectionId: 1, - } - status, err := s.node.SyncSegments(s.ctx, req) - s.Assert().NoError(err) - s.Assert().True(merr.Ok(status)) - - _, result := fg.metacache.GetSegmentByID(req.GetCompactedTo(), metacache.WithSegmentState(commonpb.SegmentState_Flushed)) - s.True(result) - for _, compactFrom := range req.GetCompactedFrom() { - seg, result := fg.metacache.GetSegmentByID(compactFrom, metacache.WithSegmentState(commonpb.SegmentState_Flushed)) - s.True(result) - s.Equal(req.CompactedTo, seg.CompactTo()) - } - - status, err = s.node.SyncSegments(s.ctx, req) - s.Assert().NoError(err) - s.Assert().True(merr.Ok(status)) - }) - - s.Run("without_channel_meta", func() { - 
fg.metacache.UpdateSegments(metacache.UpdateState(commonpb.SegmentState_Flushed), - metacache.WithSegmentIDs(100, 200, 300)) - - req := &datapb.SyncSegmentsRequest{ - CompactedFrom: []int64{100, 200}, - CompactedTo: 101, - NumOfRows: 0, - } - status, err := s.node.SyncSegments(s.ctx, req) - s.Assert().NoError(err) - s.Assert().False(merr.Ok(status)) - }) - - s.Run("valid_request_with_meta_num=0", func() { - fg.metacache.UpdateSegments(metacache.UpdateState(commonpb.SegmentState_Flushed), - metacache.WithSegmentIDs(100, 200, 300)) - - req := &datapb.SyncSegmentsRequest{ - CompactedFrom: []int64{100, 200}, - CompactedTo: 301, - NumOfRows: 0, - ChannelName: chanName, - CollectionId: 1, - } - status, err := s.node.SyncSegments(s.ctx, req) - s.Assert().NoError(err) - s.Assert().True(merr.Ok(status)) - - seg, result := fg.metacache.GetSegmentByID(100, metacache.WithSegmentState(commonpb.SegmentState_Flushed)) - s.True(result) - s.Equal(metacache.NullSegment, seg.CompactTo()) - seg, result = fg.metacache.GetSegmentByID(200, metacache.WithSegmentState(commonpb.SegmentState_Flushed)) - s.True(result) - s.Equal(metacache.NullSegment, seg.CompactTo()) - _, result = fg.metacache.GetSegmentByID(301, metacache.WithSegmentState(commonpb.SegmentState_Flushed)) - s.False(result) - }) -} - func (s *DataNodeServicesSuite) TestResendSegmentStats() { req := &datapb.ResendSegmentStatsRequest{ Base: &commonpb.MsgBase{}, diff --git a/internal/datanode/syncmgr/mock_sync_manager.go b/internal/datanode/syncmgr/mock_sync_manager.go index 34c69ac6b011d..259d09b2da542 100644 --- a/internal/datanode/syncmgr/mock_sync_manager.go +++ b/internal/datanode/syncmgr/mock_sync_manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.32.4. DO NOT EDIT. +// Code generated by mockery v2.30.1. DO NOT EDIT. 
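// This file is generated by mockery and should be regenerated rather than
// edited by hand. A sketch of the regeneration workflow, assuming the mockery
// v2 CLI matching the header version above is installed (the //go:generate
// directive it reads is added to sync_manager.go later in this diff):
//
//	go install github.com/vektra/mockery/v2@v2.30.1
//	go generate ./internal/datanode/syncmgr/...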
package syncmgr @@ -25,39 +25,6 @@ func (_m *MockSyncManager) EXPECT() *MockSyncManager_Expecter { return &MockSyncManager_Expecter{mock: &_m.Mock} } -// Block provides a mock function with given fields: segmentID -func (_m *MockSyncManager) Block(segmentID int64) { - _m.Called(segmentID) -} - -// MockSyncManager_Block_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Block' -type MockSyncManager_Block_Call struct { - *mock.Call -} - -// Block is a helper method to define mock.On call -// - segmentID int64 -func (_e *MockSyncManager_Expecter) Block(segmentID interface{}) *MockSyncManager_Block_Call { - return &MockSyncManager_Block_Call{Call: _e.mock.On("Block", segmentID)} -} - -func (_c *MockSyncManager_Block_Call) Run(run func(segmentID int64)) *MockSyncManager_Block_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(int64)) - }) - return _c -} - -func (_c *MockSyncManager_Block_Call) Return() *MockSyncManager_Block_Call { - _c.Call.Return() - return _c -} - -func (_c *MockSyncManager_Block_Call) RunAndReturn(run func(int64)) *MockSyncManager_Block_Call { - _c.Call.Return(run) - return _c -} - // GetEarliestPosition provides a mock function with given fields: channel func (_m *MockSyncManager) GetEarliestPosition(channel string) (int64, *msgpb.MsgPosition) { ret := _m.Called(channel) @@ -90,7 +57,7 @@ type MockSyncManager_GetEarliestPosition_Call struct { } // GetEarliestPosition is a helper method to define mock.On call -// - channel string +// - channel string func (_e *MockSyncManager_Expecter) GetEarliestPosition(channel interface{}) *MockSyncManager_GetEarliestPosition_Call { return &MockSyncManager_GetEarliestPosition_Call{Call: _e.mock.On("GetEarliestPosition", channel)} } @@ -134,8 +101,8 @@ type MockSyncManager_SyncData_Call struct { } // SyncData is a helper method to define mock.On call -// - ctx context.Context -// - task Task +// - ctx context.Context +// - task Task func (_e *MockSyncManager_Expecter) SyncData(ctx interface{}, task interface{}) *MockSyncManager_SyncData_Call { return &MockSyncManager_SyncData_Call{Call: _e.mock.On("SyncData", ctx, task)} } @@ -157,39 +124,6 @@ func (_c *MockSyncManager_SyncData_Call) RunAndReturn(run func(context.Context, return _c } -// Unblock provides a mock function with given fields: segmentID -func (_m *MockSyncManager) Unblock(segmentID int64) { - _m.Called(segmentID) -} - -// MockSyncManager_Unblock_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Unblock' -type MockSyncManager_Unblock_Call struct { - *mock.Call -} - -// Unblock is a helper method to define mock.On call -// - segmentID int64 -func (_e *MockSyncManager_Expecter) Unblock(segmentID interface{}) *MockSyncManager_Unblock_Call { - return &MockSyncManager_Unblock_Call{Call: _e.mock.On("Unblock", segmentID)} -} - -func (_c *MockSyncManager_Unblock_Call) Run(run func(segmentID int64)) *MockSyncManager_Unblock_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(int64)) - }) - return _c -} - -func (_c *MockSyncManager_Unblock_Call) Return() *MockSyncManager_Unblock_Call { - _c.Call.Return() - return _c -} - -func (_c *MockSyncManager_Unblock_Call) RunAndReturn(run func(int64)) *MockSyncManager_Unblock_Call { - _c.Call.Return(run) - return _c -} - // NewMockSyncManager creates a new instance of MockSyncManager. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. 
// The first argument is typically a *testing.T value. func NewMockSyncManager(t interface { diff --git a/internal/datanode/syncmgr/sync_manager.go b/internal/datanode/syncmgr/sync_manager.go index 190a8e7655be2..6a564c01345a4 100644 --- a/internal/datanode/syncmgr/sync_manager.go +++ b/internal/datanode/syncmgr/sync_manager.go @@ -40,19 +40,15 @@ type SyncMeta struct { metacache metacache.MetaCache } -// SyncMangger is the interface for sync manager. +// SyncManager is the interface for sync manager. // it processes the sync tasks inside and changes the meta. +// +//go:generate mockery --name=SyncManager --structname=MockSyncManager --output=./ --filename=mock_sync_manager.go --with-expecter --inpackage type SyncManager interface { // SyncData is the method to submit sync task. SyncData(ctx context.Context, task Task) *conc.Future[struct{}] // GetEarliestPosition returns the earliest position (normally start position) of the processing sync task of provided channel. GetEarliestPosition(channel string) (int64, *msgpb.MsgPosition) - // Block allows caller to block tasks of provided segment id. - // normally used by compaction task. - // if levelzero delta policy is enabled, this shall be an empty operation. - Block(segmentID int64) - // Unblock is the reverse method for `Block`. - Unblock(segmentID int64) } type syncManager struct { @@ -184,11 +180,3 @@ func (mgr *syncManager) GetEarliestPosition(channel string) (int64, *msgpb.MsgPo }) return segmentID, cp } - -func (mgr *syncManager) Block(segmentID int64) { - mgr.keyLock.Lock(segmentID) -} - -func (mgr *syncManager) Unblock(segmentID int64) { - mgr.keyLock.Unlock(segmentID) -} diff --git a/internal/datanode/syncmgr/sync_manager_test.go b/internal/datanode/syncmgr/sync_manager_test.go index 6f12a98df4d15..515e1266479d3 100644 --- a/internal/datanode/syncmgr/sync_manager_test.go +++ b/internal/datanode/syncmgr/sync_manager_test.go @@ -208,52 +208,6 @@ func (s *SyncManagerSuite) TestCompacted() { s.EqualValues(1001, segmentID.Load()) } -func (s *SyncManagerSuite) TestBlock() { - sig := make(chan struct{}) - counter := atomic.NewInt32(0) - s.broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Return(nil) - bfs := metacache.NewBloomFilterSet() - seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{ - ID: s.segmentID, - }, bfs) - metacache.UpdateNumOfRows(1000)(seg) - s.metacache.EXPECT().GetSegmentByID(s.segmentID).Return(seg, true) - s.metacache.EXPECT().GetSegmentsBy(mock.Anything). 
- RunAndReturn(func(...metacache.SegmentFilter) []*metacache.SegmentInfo { - return []*metacache.SegmentInfo{seg} - }) - s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Run(func(_ metacache.SegmentAction, filters ...metacache.SegmentFilter) { - if counter.Inc() == 2 { - close(sig) - } - }) - - manager, err := NewSyncManager(s.chunkManager, s.allocator) - s.NoError(err) - - // block - manager.Block(s.segmentID) - - task := s.getSuiteSyncTask() - task.WithMetaWriter(BrokerMetaWriter(s.broker, 1)) - task.WithTimeRange(50, 100) - task.WithCheckpoint(&msgpb.MsgPosition{ - ChannelName: s.channelName, - MsgID: []byte{1, 2, 3, 4}, - Timestamp: 100, - }) - go manager.SyncData(context.Background(), task) - - select { - case <-sig: - s.FailNow("sync task done during block") - case <-time.After(time.Second): - } - - manager.Unblock(s.segmentID) - <-sig -} - func (s *SyncManagerSuite) TestResizePool() { manager, err := NewSyncManager(s.chunkManager, s.allocator) s.NoError(err) diff --git a/internal/proto/data_coord.proto b/internal/proto/data_coord.proto index ecb29e3be162c..38781e958fa8d 100644 --- a/internal/proto/data_coord.proto +++ b/internal/proto/data_coord.proto @@ -528,6 +528,7 @@ message CompactionPlan { string channel = 7; int64 collection_ttl = 8; int64 total_rows = 9; + schema.CollectionSchema schema = 10; } message CompactionSegment { diff --git a/tests/integration/compaction/compaction_test.go b/tests/integration/compaction/compaction_test.go new file mode 100644 index 0000000000000..2e738e00fb6c8 --- /dev/null +++ b/tests/integration/compaction/compaction_test.go @@ -0,0 +1,47 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package compaction + +import ( + "testing" + + "github.com/stretchr/testify/suite" + + "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/tests/integration" +) + +type CompactionSuite struct { + integration.MiniClusterSuite +} + +func (s *CompactionSuite) SetupSuite() { + s.MiniClusterSuite.SetupSuite() + + paramtable.Init() + paramtable.Get().Save(paramtable.Get().DataCoordCfg.GlobalCompactionInterval.Key, "1") +} + +func (s *CompactionSuite) TearDownSuite() { + s.MiniClusterSuite.TearDownSuite() + + paramtable.Get().Reset(paramtable.Get().DataCoordCfg.GlobalCompactionInterval.Key) +} + +func TestCompaction(t *testing.T) { + suite.Run(t, new(CompactionSuite)) +} diff --git a/tests/integration/compaction/l0_compaction_test.go b/tests/integration/compaction/l0_compaction_test.go new file mode 100644 index 0000000000000..984e8eb3ce5e5 --- /dev/null +++ b/tests/integration/compaction/l0_compaction_test.go @@ -0,0 +1,238 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package compaction + +import ( + "context" + "fmt" + "time" + + "github.com/golang/protobuf/proto" + "github.com/samber/lo" + "go.uber.org/zap" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/pkg/common" + "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/funcutil" + "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/metric" + "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/tests/integration" +) + +func (s *CompactionSuite) TestL0Compaction() { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute*10) + defer cancel() + c := s.Cluster + + const ( + dim = 128 + dbName = "" + rowNum = 100000 + deleteCnt = 50000 + + indexType = integration.IndexFaissIvfFlat + metricType = metric.L2 + vecType = schemapb.DataType_FloatVector + ) + + paramtable.Get().Save(paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerDeltalogMinNum.Key, "1") + defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.LevelZeroCompactionTriggerDeltalogMinNum.Key) + + collectionName := "TestCompaction_" + funcutil.GenRandomStr() + + schema := integration.ConstructSchemaOfVecDataType(collectionName, dim, false, vecType) + marshaledSchema, err := proto.Marshal(schema) + s.NoError(err) + + // create collection + createCollectionStatus, err := c.Proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{ + DbName: dbName, + CollectionName: collectionName, + Schema: marshaledSchema, + ShardsNum: common.DefaultShardsNum, + ConsistencyLevel: commonpb.ConsistencyLevel_Strong, + }) + err = merr.CheckRPCCall(createCollectionStatus, err) + s.NoError(err) + log.Info("CreateCollection result", zap.Any("createCollectionStatus", createCollectionStatus)) + + // show collection + showCollectionsResp, err := c.Proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{}) + err = merr.CheckRPCCall(showCollectionsResp, err) + s.NoError(err) + log.Info("ShowCollections result", zap.Any("showCollectionsResp", showCollectionsResp)) + + // insert + pkColumn := integration.NewInt64FieldData(integration.Int64Field, rowNum) + fVecColumn := integration.NewFloatVectorFieldData(integration.FloatVecField, rowNum, dim) + hashKeys := integration.GenerateHashKeys(rowNum) + insertResult, err := c.Proxy.Insert(ctx, &milvuspb.InsertRequest{ + DbName: dbName, + CollectionName: collectionName, + FieldsData: []*schemapb.FieldData{pkColumn, fVecColumn}, + HashKeys: hashKeys, + NumRows: uint32(rowNum), + }) + err = merr.CheckRPCCall(insertResult, err) + s.NoError(err) + s.Equal(int64(rowNum), insertResult.GetInsertCnt()) + + // flush + flushResp, err := 
c.Proxy.Flush(ctx, &milvuspb.FlushRequest{ + DbName: dbName, + CollectionNames: []string{collectionName}, + }) + err = merr.CheckRPCCall(flushResp, err) + s.NoError(err) + segmentIDs, has := flushResp.GetCollSegIDs()[collectionName] + ids := segmentIDs.GetData() + s.Require().NotEmpty(segmentIDs) + s.Require().True(has) + flushTs, has := flushResp.GetCollFlushTs()[collectionName] + s.True(has) + s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName) + + // create index + createIndexStatus, err := c.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{ + CollectionName: collectionName, + FieldName: integration.FloatVecField, + IndexName: "_default", + ExtraParams: integration.ConstructIndexParam(dim, indexType, metricType), + }) + err = merr.CheckRPCCall(createIndexStatus, err) + s.NoError(err) + s.WaitForIndexBuilt(ctx, collectionName, integration.FloatVecField) + + segments, err := c.MetaWatcher.ShowSegments() + s.NoError(err) + s.NotEmpty(segments) + s.Equal(1, len(segments)) + s.Equal(int64(rowNum), segments[0].GetNumOfRows()) + + // load + loadStatus, err := c.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{ + DbName: dbName, + CollectionName: collectionName, + }) + err = merr.CheckRPCCall(loadStatus, err) + s.NoError(err) + s.WaitForLoad(ctx, collectionName) + + // delete + deleteResult, err := c.Proxy.Delete(ctx, &milvuspb.DeleteRequest{ + DbName: dbName, + CollectionName: collectionName, + Expr: fmt.Sprintf("%s < %d", integration.Int64Field, deleteCnt), + }) + err = merr.CheckRPCCall(deleteResult, err) + s.NoError(err) + + // flush l0 + flushResp, err = c.Proxy.Flush(ctx, &milvuspb.FlushRequest{ + DbName: dbName, + CollectionNames: []string{collectionName}, + }) + err = merr.CheckRPCCall(flushResp, err) + s.NoError(err) + flushTs, has = flushResp.GetCollFlushTs()[collectionName] + s.True(has) + s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName) + + // query + queryResult, err := c.Proxy.Query(ctx, &milvuspb.QueryRequest{ + DbName: dbName, + CollectionName: collectionName, + Expr: "", + OutputFields: []string{"count(*)"}, + }) + err = merr.CheckRPCCall(queryResult, err) + s.NoError(err) + s.Equal(int64(rowNum-deleteCnt), queryResult.GetFieldsData()[0].GetScalars().GetLongData().GetData()[0]) + + // wait for l0 compaction completed + showSegments := func() bool { + segments, err = c.MetaWatcher.ShowSegments() + s.NoError(err) + s.NotEmpty(segments) + log.Info("ShowSegments result", zap.Any("segments", segments)) + flushed := lo.Filter(segments, func(segment *datapb.SegmentInfo, _ int) bool { + return segment.GetState() == commonpb.SegmentState_Flushed + }) + if len(flushed) == 1 && + flushed[0].GetLevel() == datapb.SegmentLevel_L1 && + flushed[0].GetNumOfRows() == rowNum { + log.Info("l0 compaction done, wait for single compaction") + } + return len(flushed) == 1 && + flushed[0].GetLevel() == datapb.SegmentLevel_L1 && + flushed[0].GetNumOfRows() == rowNum-deleteCnt + } + for !showSegments() { + select { + case <-ctx.Done(): + s.Fail("waiting for compaction timeout") + return + case <-time.After(1 * time.Second): + } + } + + // search + expr := fmt.Sprintf("%s > 0", integration.Int64Field) + nq := 10 + topk := 10 + roundDecimal := -1 + params := integration.GetSearchParams(indexType, metricType) + searchReq := integration.ConstructSearchRequest("", collectionName, expr, + integration.FloatVecField, vecType, nil, metricType, params, nq, dim, topk, roundDecimal) + + searchResult, err := c.Proxy.Search(ctx, searchReq) + err = merr.CheckRPCCall(searchResult, err) + 
s.NoError(err) + s.Equal(nq*topk, len(searchResult.GetResults().GetScores())) + + // query + queryResult, err = c.Proxy.Query(ctx, &milvuspb.QueryRequest{ + DbName: dbName, + CollectionName: collectionName, + Expr: "", + OutputFields: []string{"count(*)"}, + }) + err = merr.CheckRPCCall(queryResult, err) + s.NoError(err) + s.Equal(int64(rowNum-deleteCnt), queryResult.GetFieldsData()[0].GetScalars().GetLongData().GetData()[0]) + + // release collection + status, err := c.Proxy.ReleaseCollection(ctx, &milvuspb.ReleaseCollectionRequest{ + CollectionName: collectionName, + }) + err = merr.CheckRPCCall(status, err) + s.NoError(err) + + // drop collection + status, err = c.Proxy.DropCollection(ctx, &milvuspb.DropCollectionRequest{ + CollectionName: collectionName, + }) + err = merr.CheckRPCCall(status, err) + s.NoError(err) + + log.Info("Test compaction succeeded") +} diff --git a/tests/integration/compaction/mix_compaction_test.go b/tests/integration/compaction/mix_compaction_test.go new file mode 100644 index 0000000000000..b51636be5fd1e --- /dev/null +++ b/tests/integration/compaction/mix_compaction_test.go @@ -0,0 +1,205 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.
+ +package compaction + +import ( + "context" + "fmt" + "time" + + "github.com/golang/protobuf/proto" + "github.com/samber/lo" + "go.uber.org/zap" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/pkg/common" + "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/funcutil" + "github.com/milvus-io/milvus/pkg/util/merr" + "github.com/milvus-io/milvus/pkg/util/metric" + "github.com/milvus-io/milvus/tests/integration" +) + +func (s *CompactionSuite) TestMixCompaction() { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute*10) + defer cancel() + c := s.Cluster + + const ( + dim = 128 + dbName = "" + rowNum = 10000 + batch = 1000 + + indexType = integration.IndexFaissIvfFlat + metricType = metric.L2 + vecType = schemapb.DataType_FloatVector + ) + + collectionName := "TestCompaction_" + funcutil.GenRandomStr() + + schema := integration.ConstructSchemaOfVecDataType(collectionName, dim, true, vecType) + marshaledSchema, err := proto.Marshal(schema) + s.NoError(err) + + // create collection + createCollectionStatus, err := c.Proxy.CreateCollection(ctx, &milvuspb.CreateCollectionRequest{ + DbName: dbName, + CollectionName: collectionName, + Schema: marshaledSchema, + ShardsNum: common.DefaultShardsNum, + ConsistencyLevel: commonpb.ConsistencyLevel_Strong, + }) + err = merr.CheckRPCCall(createCollectionStatus, err) + s.NoError(err) + log.Info("CreateCollection result", zap.Any("createCollectionStatus", createCollectionStatus)) + + // show collection + showCollectionsResp, err := c.Proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{}) + err = merr.CheckRPCCall(showCollectionsResp, err) + s.NoError(err) + log.Info("ShowCollections result", zap.Any("showCollectionsResp", showCollectionsResp)) + + for i := 0; i < rowNum/batch; i++ { + // insert + fVecColumn := integration.NewFloatVectorFieldData(integration.FloatVecField, batch, dim) + hashKeys := integration.GenerateHashKeys(batch) + insertResult, err := c.Proxy.Insert(ctx, &milvuspb.InsertRequest{ + DbName: dbName, + CollectionName: collectionName, + FieldsData: []*schemapb.FieldData{fVecColumn}, + HashKeys: hashKeys, + NumRows: uint32(batch), + }) + err = merr.CheckRPCCall(insertResult, err) + s.NoError(err) + s.Equal(int64(batch), insertResult.GetInsertCnt()) + + // flush + flushResp, err := c.Proxy.Flush(ctx, &milvuspb.FlushRequest{ + DbName: dbName, + CollectionNames: []string{collectionName}, + }) + err = merr.CheckRPCCall(flushResp, err) + s.NoError(err) + segmentIDs, has := flushResp.GetCollSegIDs()[collectionName] + ids := segmentIDs.GetData() + s.Require().NotEmpty(segmentIDs) + s.Require().True(has) + flushTs, has := flushResp.GetCollFlushTs()[collectionName] + s.True(has) + s.WaitForFlush(ctx, ids, flushTs, dbName, collectionName) + + log.Info("insert done", zap.Int("i", i)) + } + + // create index + createIndexStatus, err := c.Proxy.CreateIndex(ctx, &milvuspb.CreateIndexRequest{ + CollectionName: collectionName, + FieldName: integration.FloatVecField, + IndexName: "_default", + ExtraParams: integration.ConstructIndexParam(dim, indexType, metricType), + }) + err = merr.CheckRPCCall(createIndexStatus, err) + s.NoError(err) + s.WaitForIndexBuilt(ctx, collectionName, integration.FloatVecField) + + segments, err := c.MetaWatcher.ShowSegments() + s.NoError(err) + s.NotEmpty(segments) + 
s.Equal(rowNum/batch, len(segments)) + for _, segment := range segments { + log.Info("show segment result", zap.String("segment", segment.String())) + } + + // wait for compaction completed + showSegments := func() bool { + segments, err = c.MetaWatcher.ShowSegments() + s.NoError(err) + s.NotEmpty(segments) + compactFromSegments := lo.Filter(segments, func(segment *datapb.SegmentInfo, _ int) bool { + return segment.GetState() == commonpb.SegmentState_Dropped + }) + compactToSegments := lo.Filter(segments, func(segment *datapb.SegmentInfo, _ int) bool { + return segment.GetState() == commonpb.SegmentState_Flushed + }) + log.Info("ShowSegments result", zap.Int("len(compactFromSegments)", len(compactFromSegments)), + zap.Int("len(compactToSegments)", len(compactToSegments))) + return len(compactToSegments) == 1 + } + for !showSegments() { + select { + case <-ctx.Done(): + s.Fail("waiting for compaction timeout") + return + case <-time.After(1 * time.Second): + } + } + + // load + loadStatus, err := c.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{ + DbName: dbName, + CollectionName: collectionName, + }) + err = merr.CheckRPCCall(loadStatus, err) + s.NoError(err) + s.WaitForLoad(ctx, collectionName) + + // search + expr := fmt.Sprintf("%s > 0", integration.Int64Field) + nq := 10 + topk := 10 + roundDecimal := -1 + params := integration.GetSearchParams(indexType, metricType) + searchReq := integration.ConstructSearchRequest("", collectionName, expr, + integration.FloatVecField, vecType, nil, metricType, params, nq, dim, topk, roundDecimal) + + searchResult, err := c.Proxy.Search(ctx, searchReq) + err = merr.CheckRPCCall(searchResult, err) + s.NoError(err) + s.Equal(nq*topk, len(searchResult.GetResults().GetScores())) + + // query + queryResult, err := c.Proxy.Query(ctx, &milvuspb.QueryRequest{ + DbName: dbName, + CollectionName: collectionName, + Expr: "", + OutputFields: []string{"count(*)"}, + }) + err = merr.CheckRPCCall(queryResult, err) + s.NoError(err) + s.Equal(int64(rowNum), queryResult.GetFieldsData()[0].GetScalars().GetLongData().GetData()[0]) + + // release collection + status, err := c.Proxy.ReleaseCollection(ctx, &milvuspb.ReleaseCollectionRequest{ + CollectionName: collectionName, + }) + err = merr.CheckRPCCall(status, err) + s.NoError(err) + + // drop collection + status, err = c.Proxy.DropCollection(ctx, &milvuspb.DropCollectionRequest{ + CollectionName: collectionName, + }) + err = merr.CheckRPCCall(status, err) + s.NoError(err) + + log.Info("Test compaction succeeded") +}
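Taken together, the plan becomes self-describing: DataCoord attaches the collection schema (the new schema = 10 field in data_coord.proto) plus per-segment collection and partition IDs, and DataNode reads everything back from the plan instead of consulting a flowgraph metacache. A minimal sketch of both sides under those assumptions (hypothetical IDs and a simplified field set):

package datanode

import (
	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"

	"github.com/milvus-io/milvus/internal/proto/datapb"
)

// buildSelfDescribingPlan shows the shape DataCoord now produces: the schema
// travels with the plan and every segment entry carries its own IDs.
func buildSelfDescribingPlan(schema *schemapb.CollectionSchema) *datapb.CompactionPlan {
	return &datapb.CompactionPlan{
		PlanID: 1, // hypothetical plan ID
		Type:   datapb.CompactionType_Level0DeleteCompaction,
		Schema: schema,
		SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
			{SegmentID: 100, CollectionID: 1, PartitionID: 10, Level: datapb.SegmentLevel_L0},
		},
	}
}

// collectionOf mirrors levelZeroCompactionTask.GetCollection: SegmentBinlogs
// is assumed non-empty because its length is checked before task enqueueing.
func collectionOf(plan *datapb.CompactionPlan) int64 {
	return plan.GetSegmentBinlogs()[0].GetCollectionID()
}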