Skip to content

Commit

Permalink
fix: Milvus panic when compaction disabled and dropping a collection (#…
Browse files Browse the repository at this point in the history
…34206)

See also: #31059
pr: #34103

---------

Signed-off-by: yangxuan <[email protected]>
  • Loading branch information
XuanYang-cn authored Jul 11, 2024
1 parent d6fc6a9 commit 7e2a9d6
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 50 deletions.
23 changes: 12 additions & 11 deletions internal/datacoord/compaction_trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
"github.com/milvus-io/milvus/pkg/util/lifetime"
"github.com/milvus-io/milvus/pkg/util/lock"
"github.com/milvus-io/milvus/pkg/util/logutil"
"github.com/milvus-io/milvus/pkg/util/paramtable"
Expand Down Expand Up @@ -77,8 +78,8 @@ type compactionTrigger struct {
compactionHandler compactionPlanContext
globalTrigger *time.Ticker
forceMu lock.Mutex
quit chan struct{}
wg sync.WaitGroup
closeCh lifetime.SafeChan
closeWaiter sync.WaitGroup

indexEngineVersionManager IndexEngineVersionManager

Expand All @@ -105,20 +106,20 @@ func newCompactionTrigger(
estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex,
estimateNonDiskSegmentPolicy: calBySchemaPolicy,
handler: handler,
closeCh: lifetime.NewSafeChan(),
}
}

func (t *compactionTrigger) start() {
t.quit = make(chan struct{})
t.globalTrigger = time.NewTicker(Params.DataCoordCfg.GlobalCompactionInterval.GetAsDuration(time.Second))
t.wg.Add(2)
t.closeWaiter.Add(2)
go func() {
defer logutil.LogPanic()
defer t.wg.Done()
defer t.closeWaiter.Done()

for {
select {
case <-t.quit:
case <-t.closeCh.CloseCh():
log.Info("compaction trigger quit")
return
case signal := <-t.signals:
Expand All @@ -145,7 +146,7 @@ func (t *compactionTrigger) start() {

func (t *compactionTrigger) startGlobalCompactionLoop() {
defer logutil.LogPanic()
defer t.wg.Done()
defer t.closeWaiter.Done()

// If AutoCompaction disabled, global loop will not start
if !Params.DataCoordCfg.EnableAutoCompaction.GetAsBool() {
Expand All @@ -154,7 +155,7 @@ func (t *compactionTrigger) startGlobalCompactionLoop() {

for {
select {
case <-t.quit:
case <-t.closeCh.CloseCh():
t.globalTrigger.Stop()
log.Info("global compaction loop exit")
return
Expand All @@ -168,8 +169,8 @@ func (t *compactionTrigger) startGlobalCompactionLoop() {
}

func (t *compactionTrigger) stop() {
close(t.quit)
t.wg.Wait()
t.closeCh.Close()
t.closeWaiter.Wait()
}

func (t *compactionTrigger) getCollection(collectionID UniqueID) (*collectionInfo, error) {
Expand Down Expand Up @@ -241,7 +242,7 @@ func (t *compactionTrigger) triggerCompaction() error {
// triggerSingleCompaction trigger a compaction bundled with collection-partition-channel-segment
func (t *compactionTrigger) triggerSingleCompaction(collectionID, partitionID, segmentID int64, channel string, blockToSendSignal bool) error {
// If AutoCompaction disabled, flush request will not trigger compaction
if !Params.DataCoordCfg.EnableAutoCompaction.GetAsBool() {
if !paramtable.Get().DataCoordCfg.EnableAutoCompaction.GetAsBool() && !paramtable.Get().DataCoordCfg.EnableCompaction.GetAsBool() {
return nil
}

Expand Down
10 changes: 10 additions & 0 deletions internal/datacoord/compaction_trigger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/lifetime"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/tsoutil"
)
Expand Down Expand Up @@ -494,6 +495,7 @@ func Test_compactionTrigger_force(t *testing.T) {
globalTrigger: tt.fields.globalTrigger,
estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex,
estimateNonDiskSegmentPolicy: calBySchemaPolicy,
closeCh: lifetime.NewSafeChan(),
testingOnly: true,
}
_, err := tr.triggerManualCompaction(tt.collectionID)
Expand All @@ -519,6 +521,7 @@ func Test_compactionTrigger_force(t *testing.T) {
globalTrigger: tt.fields.globalTrigger,
estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex,
estimateNonDiskSegmentPolicy: calBySchemaPolicy,
closeCh: lifetime.NewSafeChan(),
testingOnly: true,
}
tt.collectionID = 1000
Expand All @@ -543,6 +546,7 @@ func Test_compactionTrigger_force(t *testing.T) {
globalTrigger: tt.fields.globalTrigger,
estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex,
estimateNonDiskSegmentPolicy: calBySchemaPolicy,
closeCh: lifetime.NewSafeChan(),
testingOnly: true,
}

Expand Down Expand Up @@ -781,6 +785,7 @@ func Test_compactionTrigger_force_maxSegmentLimit(t *testing.T) {
globalTrigger: tt.fields.globalTrigger,
estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex,
estimateNonDiskSegmentPolicy: calBySchemaPolicy,
closeCh: lifetime.NewSafeChan(),
testingOnly: true,
}
_, err := tr.triggerManualCompaction(tt.args.collectionID)
Expand Down Expand Up @@ -932,6 +937,7 @@ func Test_compactionTrigger_noplan(t *testing.T) {
globalTrigger: tt.fields.globalTrigger,
estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex,
estimateNonDiskSegmentPolicy: calBySchemaPolicy,
closeCh: lifetime.NewSafeChan(),
testingOnly: true,
}
tr.start()
Expand Down Expand Up @@ -1119,6 +1125,7 @@ func Test_compactionTrigger_PrioritizedCandi(t *testing.T) {
signals: tt.fields.signals,
compactionHandler: tt.fields.compactionHandler,
globalTrigger: tt.fields.globalTrigger,
closeCh: lifetime.NewSafeChan(),
testingOnly: true,
}
tr.start()
Expand Down Expand Up @@ -1312,6 +1319,7 @@ func Test_compactionTrigger_SmallCandi(t *testing.T) {
indexEngineVersionManager: newMockVersionManager(),
estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex,
estimateNonDiskSegmentPolicy: calBySchemaPolicy,
closeCh: lifetime.NewSafeChan(),
testingOnly: true,
}
tr.start()
Expand Down Expand Up @@ -1501,6 +1509,7 @@ func Test_compactionTrigger_SqueezeNonPlannedSegs(t *testing.T) {
indexEngineVersionManager: newMockVersionManager(),
estimateDiskSegmentPolicy: calBySchemaPolicyWithDiskIndex,
estimateNonDiskSegmentPolicy: calBySchemaPolicy,
closeCh: lifetime.NewSafeChan(),
testingOnly: true,
}
tr.start()
Expand Down Expand Up @@ -1675,6 +1684,7 @@ func Test_compactionTrigger_noplan_random_size(t *testing.T) {
compactionHandler: tt.fields.compactionHandler,
globalTrigger: tt.fields.globalTrigger,
indexEngineVersionManager: newMockVersionManager(),
closeCh: lifetime.NewSafeChan(),
testingOnly: true,
}
tr.start()
Expand Down
59 changes: 26 additions & 33 deletions internal/datacoord/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,11 +375,8 @@ func (s *Server) initDataCoord() error {
log.Info("init service discovery done")

s.initTaskScheduler(storageCli)
if Params.DataCoordCfg.EnableCompaction.GetAsBool() {
s.createCompactionHandler()
s.createCompactionTrigger()
log.Info("init compaction scheduler done")
}
s.initCompaction()
log.Info("init compaction done")

if err = s.initSegmentManager(); err != nil {
return err
Expand Down Expand Up @@ -420,12 +417,6 @@ func (s *Server) Start() error {
}

func (s *Server) startDataCoord() {
s.taskScheduler.Start()
if Params.DataCoordCfg.EnableCompaction.GetAsBool() {
s.compactionHandler.start()
s.compactionTrigger.start()
s.compactionTriggerManager.Start()
}
s.startServerLoop()

// http.Register(&http.Handler{
Expand Down Expand Up @@ -527,24 +518,6 @@ func (s *Server) SetIndexNodeCreator(f func(context.Context, string, int64) (typ
s.indexNodeCreator = f
}

func (s *Server) createCompactionHandler() {
s.compactionHandler = newCompactionPlanHandler(s.cluster, s.sessionManager, s.channelManager, s.meta, s.allocator, s.taskScheduler, s.handler)
s.compactionTriggerManager = NewCompactionTriggerManager(s.allocator, s.handler, s.compactionHandler, s.meta)
}

func (s *Server) stopCompactionHandler() {
s.compactionHandler.stop()
s.compactionTriggerManager.Close()
}

func (s *Server) createCompactionTrigger() {
s.compactionTrigger = newCompactionTrigger(s.meta, s.compactionHandler, s.allocator, s.handler, s.indexEngineVersionManager)
}

func (s *Server) stopCompactionTrigger() {
s.compactionTrigger.stop()
}

func (s *Server) newChunkManagerFactory() (storage.ChunkManager, error) {
chunkManagerFactory := storage.NewChunkManagerFactoryWithParam(Params)
cli, err := chunkManagerFactory.NewPersistentStorageChunkManager(s.ctx)
Expand Down Expand Up @@ -703,7 +676,30 @@ func (s *Server) initIndexNodeManager() {
}
}

func (s *Server) initCompaction() {
s.compactionHandler = newCompactionPlanHandler(s.cluster, s.sessionManager, s.channelManager, s.meta, s.allocator, s.taskScheduler, s.handler)
s.compactionTriggerManager = NewCompactionTriggerManager(s.allocator, s.handler, s.compactionHandler, s.meta)
s.compactionTrigger = newCompactionTrigger(s.meta, s.compactionHandler, s.allocator, s.handler, s.indexEngineVersionManager)
}

func (s *Server) stopCompaction() {
s.compactionHandler.stop()
s.compactionTrigger.stop()
s.compactionTriggerManager.Close()
}

func (s *Server) startCompaction() {
s.compactionHandler.start()
s.compactionTrigger.start()
s.compactionTriggerManager.Start()
}

func (s *Server) startServerLoop() {
s.taskScheduler.Start()
if Params.DataCoordCfg.EnableCompaction.GetAsBool() {
s.startCompaction()
}

if !Params.DataNodeCfg.DataNodeTimeTickByRPC.GetAsBool() {
s.serverLoopWg.Add(1)
s.startDataNodeTtLoop(s.serverLoopCtx)
Expand Down Expand Up @@ -1109,10 +1105,7 @@ func (s *Server) Stop() error {
s.importChecker.Close()
s.syncSegmentsScheduler.Stop()

if Params.DataCoordCfg.EnableCompaction.GetAsBool() {
s.stopCompactionTrigger()
s.stopCompactionHandler()
}
s.stopCompaction()
logutil.Logger(s.ctx).Info("datacoord compaction stopped")

s.taskScheduler.Stop()
Expand Down
10 changes: 4 additions & 6 deletions internal/datacoord/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,12 +549,10 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath
s.flushCh <- req.SegmentID

// notify compaction
if paramtable.Get().DataCoordCfg.EnableCompaction.GetAsBool() {
err := s.compactionTrigger.triggerSingleCompaction(req.GetCollectionID(), req.GetPartitionID(),
req.GetSegmentID(), req.GetChannel(), false)
if err != nil {
log.Warn("failed to trigger single compaction")
}
err := s.compactionTrigger.triggerSingleCompaction(req.GetCollectionID(), req.GetPartitionID(),
req.GetSegmentID(), req.GetChannel(), false)
if err != nil {
log.Warn("failed to trigger single compaction")
}
}

Expand Down

0 comments on commit 7e2a9d6

Please sign in to comment.