fix: Milvus panic when compaction disabled and dropping a collection #34103

Merged: 10 commits, Jul 11, 2024
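Background for the diffs below: with compaction disabled, the compaction components were never constructed or started, so shutdown and drop-collection paths could touch a nil trigger or close a nil quit channel. The exact panic stack is not part of this diff, but the Go failure modes the fix defends against are easy to demonstrate (illustrative snippet, not PR code):

```go
package main

import "fmt"

func main() {
	// Failure mode 1: close of a nil channel — what a bare
	// `quit chan struct{}` risks when stop() runs before start().
	var quit chan struct{}
	func() {
		defer func() { fmt.Println("recovered:", recover()) }()
		close(quit) // panic: close of nil channel
	}()

	// Failure mode 2: double close — what happens if stop() runs twice.
	q := make(chan struct{})
	close(q)
	func() {
		defer func() { fmt.Println("recovered:", recover()) }()
		close(q) // panic: close of closed channel
	}()
}
```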
23 changes: 12 additions & 11 deletions internal/datacoord/compaction_trigger.go
@@ -33,6 +33,7 @@ import (
 	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/util/indexparamcheck"
+	"github.com/milvus-io/milvus/pkg/util/lifetime"
 	"github.com/milvus-io/milvus/pkg/util/lock"
 	"github.com/milvus-io/milvus/pkg/util/logutil"
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
@@ -77,8 +78,8 @@ type compactionTrigger struct {
 	compactionHandler compactionPlanContext
 	globalTrigger     *time.Ticker
 	forceMu           lock.Mutex
-	quit              chan struct{}
-	wg                sync.WaitGroup
+	closeCh           lifetime.SafeChan
+	closeWaiter       sync.WaitGroup
 
 	indexEngineVersionManager IndexEngineVersionManager

@@ -105,20 +106,20 @@ func newCompactionTrigger(
 		estimateDiskSegmentPolicy:    calBySchemaPolicyWithDiskIndex,
 		estimateNonDiskSegmentPolicy: calBySchemaPolicy,
 		handler:                      handler,
+		closeCh:                      lifetime.NewSafeChan(),
 	}
 }
 
 func (t *compactionTrigger) start() {
-	t.quit = make(chan struct{})
 	t.globalTrigger = time.NewTicker(Params.DataCoordCfg.GlobalCompactionInterval.GetAsDuration(time.Second))
-	t.wg.Add(2)
+	t.closeWaiter.Add(2)
 	go func() {
 		defer logutil.LogPanic()
-		defer t.wg.Done()
+		defer t.closeWaiter.Done()
 
 		for {
 			select {
-			case <-t.quit:
+			case <-t.closeCh.CloseCh():
 				log.Info("compaction trigger quit")
 				return
 			case signal := <-t.signals:
@@ -145,7 +146,7 @@
 
 func (t *compactionTrigger) startGlobalCompactionLoop() {
 	defer logutil.LogPanic()
-	defer t.wg.Done()
+	defer t.closeWaiter.Done()
 
 	// If AutoCompaction disabled, global loop will not start
 	if !Params.DataCoordCfg.EnableAutoCompaction.GetAsBool() {
@@ -154,7 +155,7 @@
 
 	for {
 		select {
-		case <-t.quit:
+		case <-t.closeCh.CloseCh():
 			t.globalTrigger.Stop()
 			log.Info("global compaction loop exit")
 			return
@@ -168,8 +169,8 @@
 }
 
 func (t *compactionTrigger) stop() {
-	close(t.quit)
-	t.wg.Wait()
+	t.closeCh.Close()
+	t.closeWaiter.Wait()
 }
 
 func (t *compactionTrigger) getCollection(collectionID UniqueID) (*collectionInfo, error) {
@@ -241,7 +242,7 @@ func (t *compactionTrigger) triggerCompaction() error {
 // triggerSingleCompaction trigger a compaction bundled with collection-partition-channel-segment
 func (t *compactionTrigger) triggerSingleCompaction(collectionID, partitionID, segmentID int64, channel string, blockToSendSignal bool) error {
 	// If AutoCompaction disabled, flush request will not trigger compaction
-	if !Params.DataCoordCfg.EnableAutoCompaction.GetAsBool() {
+	if !paramtable.Get().DataCoordCfg.EnableAutoCompaction.GetAsBool() && !paramtable.Get().DataCoordCfg.EnableCompaction.GetAsBool() {
 		return nil
 	}
 
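The thread running through this file: the bare `quit chan struct{}` and `sync.WaitGroup` pair is replaced by `lifetime.SafeChan` plus a renamed `closeWaiter`, making `stop()` safe to call in any lifecycle state. The sketch below shows the idea behind a SafeChan-style type; the real `pkg/util/lifetime` implementation may differ in detail:

```go
package lifetime

import "sync"

// SafeChan is a close-once shutdown signal. Unlike a bare channel, it
// is usable immediately after construction and Close never panics.
type SafeChan interface {
	CloseCh() <-chan struct{}
	Close()
}

type safeChan struct {
	ch   chan struct{}
	once sync.Once
}

// NewSafeChan returns a ready-to-use signal, so the field no longer
// depends on start() running first (the old t.quit = make(...) step).
func NewSafeChan() SafeChan {
	return &safeChan{ch: make(chan struct{})}
}

// CloseCh exposes the channel for select-based shutdown loops.
func (s *safeChan) CloseCh() <-chan struct{} { return s.ch }

// Close is idempotent: sync.Once guarantees the underlying close runs
// exactly once, so repeated or early stop() calls cannot panic.
func (s *safeChan) Close() { s.once.Do(func() { close(s.ch) }) }
```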
10 changes: 10 additions & 0 deletions internal/datacoord/compaction_trigger_test.go
@@ -38,6 +38,7 @@ import (
 	"github.com/milvus-io/milvus/internal/proto/datapb"
 	"github.com/milvus-io/milvus/pkg/common"
 	"github.com/milvus-io/milvus/pkg/log"
+	"github.com/milvus-io/milvus/pkg/util/lifetime"
 	"github.com/milvus-io/milvus/pkg/util/paramtable"
 	"github.com/milvus-io/milvus/pkg/util/tsoutil"
 )
@@ -494,6 +495,7 @@ func Test_compactionTrigger_force(t *testing.T) {
 				globalTrigger:                tt.fields.globalTrigger,
 				estimateDiskSegmentPolicy:    calBySchemaPolicyWithDiskIndex,
 				estimateNonDiskSegmentPolicy: calBySchemaPolicy,
+				closeCh:                      lifetime.NewSafeChan(),
 				testingOnly:                  true,
 			}
 			_, err := tr.triggerManualCompaction(tt.collectionID)
@@ -519,6 +521,7 @@
 				globalTrigger:                tt.fields.globalTrigger,
 				estimateDiskSegmentPolicy:    calBySchemaPolicyWithDiskIndex,
 				estimateNonDiskSegmentPolicy: calBySchemaPolicy,
+				closeCh:                      lifetime.NewSafeChan(),
 				testingOnly:                  true,
 			}
 			tt.collectionID = 1000
@@ -543,6 +546,7 @@
 				globalTrigger:                tt.fields.globalTrigger,
 				estimateDiskSegmentPolicy:    calBySchemaPolicyWithDiskIndex,
 				estimateNonDiskSegmentPolicy: calBySchemaPolicy,
+				closeCh:                      lifetime.NewSafeChan(),
 				testingOnly:                  true,
 			}
 
@@ -781,6 +785,7 @@ func Test_compactionTrigger_force_maxSegmentLimit(t *testing.T) {
 				globalTrigger:                tt.fields.globalTrigger,
 				estimateDiskSegmentPolicy:    calBySchemaPolicyWithDiskIndex,
 				estimateNonDiskSegmentPolicy: calBySchemaPolicy,
+				closeCh:                      lifetime.NewSafeChan(),
 				testingOnly:                  true,
 			}
 			_, err := tr.triggerManualCompaction(tt.args.collectionID)
@@ -932,6 +937,7 @@ func Test_compactionTrigger_noplan(t *testing.T) {
 				globalTrigger:                tt.fields.globalTrigger,
 				estimateDiskSegmentPolicy:    calBySchemaPolicyWithDiskIndex,
 				estimateNonDiskSegmentPolicy: calBySchemaPolicy,
+				closeCh:                      lifetime.NewSafeChan(),
 				testingOnly:                  true,
 			}
 			tr.start()
@@ -1119,6 +1125,7 @@ func Test_compactionTrigger_PrioritizedCandi(t *testing.T) {
 				signals:           tt.fields.signals,
 				compactionHandler: tt.fields.compactionHandler,
 				globalTrigger:     tt.fields.globalTrigger,
+				closeCh:           lifetime.NewSafeChan(),
 				testingOnly:       true,
 			}
 			tr.start()
@@ -1312,6 +1319,7 @@ func Test_compactionTrigger_SmallCandi(t *testing.T) {
 				indexEngineVersionManager:    newMockVersionManager(),
 				estimateDiskSegmentPolicy:    calBySchemaPolicyWithDiskIndex,
 				estimateNonDiskSegmentPolicy: calBySchemaPolicy,
+				closeCh:                      lifetime.NewSafeChan(),
 				testingOnly:                  true,
 			}
 			tr.start()
@@ -1501,6 +1509,7 @@ func Test_compactionTrigger_SqueezeNonPlannedSegs(t *testing.T) {
 				indexEngineVersionManager:    newMockVersionManager(),
 				estimateDiskSegmentPolicy:    calBySchemaPolicyWithDiskIndex,
 				estimateNonDiskSegmentPolicy: calBySchemaPolicy,
+				closeCh:                      lifetime.NewSafeChan(),
 				testingOnly:                  true,
 			}
 			tr.start()
@@ -1675,6 +1684,7 @@ func Test_compactionTrigger_noplan_random_size(t *testing.T) {
 				compactionHandler:         tt.fields.compactionHandler,
 				globalTrigger:             tt.fields.globalTrigger,
 				indexEngineVersionManager: newMockVersionManager(),
+				closeCh:                   lifetime.NewSafeChan(),
 				testingOnly:               true,
 			}
 			tr.start()
4 changes: 3 additions & 1 deletion internal/datacoord/compaction_trigger_v2.go
@@ -45,6 +45,8 @@ type TriggerManager interface {
 	ManualTrigger(ctx context.Context, collectionID int64, clusteringCompaction bool) (UniqueID, error)
 }
 
+var _ TriggerManager = (*CompactionTriggerManager)(nil)
+
 // CompactionTriggerManager registers Triggers to TriggerType
 // so that when the certain TriggerType happens, the corresponding triggers can
 // trigger the correct compaction plans.
@@ -93,7 +95,7 @@ func (m *CompactionTriggerManager) Start() {
 	go m.startLoop()
 }
 
-func (m *CompactionTriggerManager) Close() {
+func (m *CompactionTriggerManager) Stop() {
 	close(m.closeSig)
 	m.closeWg.Wait()
 }
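Two small but deliberate changes here: `Close` becomes `Stop` so the manager matches the naming of the other compaction components, and the new blank-identifier line is Go's compile-time interface assertion, which turns future signature drift into a build error instead of a runtime surprise. A generic illustration of the idiom (hypothetical names, not Milvus code):

```go
package main

// Stopper stands in for the TriggerManager interface.
type Stopper interface {
	Stop()
}

type manager struct{}

func (m *manager) Stop() {}

// Compile-time assertion: converting a typed nil costs nothing at
// runtime, but compilation fails if *manager ever stops satisfying
// Stopper — e.g., if Stop were renamed back to Close.
var _ Stopper = (*manager)(nil)

func main() {}
```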
76 changes: 43 additions & 33 deletions internal/datacoord/server.go
@@ -122,7 +122,7 @@ type Server struct {
 
 	compactionTrigger        trigger
 	compactionHandler        compactionPlanContext
-	compactionTriggerManager *CompactionTriggerManager
+	compactionTriggerManager TriggerManager
 
 	syncSegmentsScheduler *SyncSegmentsScheduler
 	metricsCacheManager   *metricsinfo.MetricsCacheManager
@@ -352,11 +352,10 @@ func (s *Server) initDataCoord() error {
 	log.Info("init service discovery done")
 
 	s.initTaskScheduler(storageCli)
-	if Params.DataCoordCfg.EnableCompaction.GetAsBool() {
-		s.createCompactionHandler()
-		s.createCompactionTrigger()
-		log.Info("init compaction scheduler done")
-	}
+	log.Info("init task scheduler done")
+
+	s.initCompaction()
+	log.Info("init compaction done")
 
 	if err = s.initSegmentManager(); err != nil {
 		return err
@@ -398,11 +397,6 @@ func (s *Server) Start() error {
 
 func (s *Server) startDataCoord() {
 	s.taskScheduler.Start()
-	if Params.DataCoordCfg.EnableCompaction.GetAsBool() {
-		s.compactionHandler.start()
-		s.compactionTrigger.start()
-		s.compactionTriggerManager.Start()
-	}
 	s.startServerLoop()
 
 	// http.Register(&http.Handler{
@@ -497,24 +491,6 @@ func (s *Server) SetIndexNodeCreator(f func(context.Context, string, int64) (typ
 	s.indexNodeCreator = f
 }
 
-func (s *Server) createCompactionHandler() {
-	s.compactionHandler = newCompactionPlanHandler(s.cluster, s.sessionManager, s.channelManager, s.meta, s.allocator, s.taskScheduler, s.handler)
-	s.compactionTriggerManager = NewCompactionTriggerManager(s.allocator, s.handler, s.compactionHandler, s.meta)
-}
-
-func (s *Server) stopCompactionHandler() {
-	s.compactionHandler.stop()
-	s.compactionTriggerManager.Close()
-}
-
-func (s *Server) createCompactionTrigger() {
-	s.compactionTrigger = newCompactionTrigger(s.meta, s.compactionHandler, s.allocator, s.handler, s.indexEngineVersionManager)
-}
-
-func (s *Server) stopCompactionTrigger() {
-	s.compactionTrigger.stop()
-}
-
 func (s *Server) newChunkManagerFactory() (storage.ChunkManager, error) {
 	chunkManagerFactory := storage.NewChunkManagerFactoryWithParam(Params)
 	cli, err := chunkManagerFactory.NewPersistentStorageChunkManager(s.ctx)
@@ -673,7 +649,44 @@ func (s *Server) initIndexNodeManager() {
 	}
 }
 
+func (s *Server) initCompaction() {
+	s.compactionHandler = newCompactionPlanHandler(s.cluster, s.sessionManager, s.channelManager, s.meta, s.allocator, s.taskScheduler, s.handler)
+	s.compactionTriggerManager = NewCompactionTriggerManager(s.allocator, s.handler, s.compactionHandler, s.meta)
+	s.compactionTrigger = newCompactionTrigger(s.meta, s.compactionHandler, s.allocator, s.handler, s.indexEngineVersionManager)
+}
+
+func (s *Server) stopCompaction() {
+	if s.compactionTrigger != nil {
+		s.compactionTrigger.stop()
+	}
+	if s.compactionTriggerManager != nil {
+		s.compactionTriggerManager.Stop()
+	}
+
+	if s.compactionHandler != nil {
+		s.compactionHandler.stop()
+	}
+}
+
+func (s *Server) startCompaction() {
+	if s.compactionHandler != nil {
+		s.compactionHandler.start()
+	}
+
+	if s.compactionTrigger != nil {
+		s.compactionTrigger.start()
+	}
+
+	if s.compactionTriggerManager != nil {
+		s.compactionTriggerManager.Start()
+	}
+}
+
 func (s *Server) startServerLoop() {
+	if Params.DataCoordCfg.EnableCompaction.GetAsBool() {
+		s.startCompaction()
+	}
+
 	s.serverLoopWg.Add(2)
 	s.startWatchService(s.serverLoopCtx)
 	s.startFlushLoop(s.serverLoopCtx)
@@ -1002,10 +1015,7 @@ func (s *Server) Stop() error {
 	s.importChecker.Close()
 	s.syncSegmentsScheduler.Stop()
 
-	if Params.DataCoordCfg.EnableCompaction.GetAsBool() {
-		s.stopCompactionTrigger()
-		s.stopCompactionHandler()
-	}
+	s.stopCompaction()
 	logutil.Logger(s.ctx).Info("datacoord compaction stopped")
 
 	s.taskScheduler.Stop()
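The net effect in server.go is a split between construction and activation: initCompaction always builds the three components, startCompaction runs only when `EnableCompaction` is true, and stopCompaction nil-guards every call so shutdown is safe on either path. A self-contained sketch of that lifecycle, using hypothetical stand-in types rather than the real DataCoord structs:

```go
package main

import "fmt"

// trigger is a stand-in for the compaction components.
type trigger struct{ started bool }

func (t *trigger) start() { t.started = true }
func (t *trigger) stop()  { fmt.Println("stopped (started =", t.started, ")") }

type server struct {
	compactionTrigger *trigger
}

func (s *server) initCompaction() { s.compactionTrigger = &trigger{} }

func (s *server) startCompaction() {
	if s.compactionTrigger != nil {
		s.compactionTrigger.start()
	}
}

// stopCompaction mirrors the PR's nil-guarded shutdown: safe whether
// compaction was enabled, disabled, or never initialized at all.
func (s *server) stopCompaction() {
	if s.compactionTrigger != nil {
		s.compactionTrigger.stop()
	}
}

func main() {
	const enableCompaction = false // pretend dataCoord.enableCompaction=false

	s := &server{}
	s.initCompaction() // always construct
	if enableCompaction {
		s.startCompaction() // activate only when enabled
	}
	s.stopCompaction() // no panic on either path
}
```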
10 changes: 4 additions & 6 deletions internal/datacoord/services.go
@@ -549,12 +549,10 @@
 	s.flushCh <- req.SegmentID
 
 	// notify compaction
-	if paramtable.Get().DataCoordCfg.EnableCompaction.GetAsBool() {
-		err := s.compactionTrigger.triggerSingleCompaction(req.GetCollectionID(), req.GetPartitionID(),
-			req.GetSegmentID(), req.GetChannel(), false)
-		if err != nil {
-			log.Warn("failed to trigger single compaction")
-		}
-	}
+	err := s.compactionTrigger.triggerSingleCompaction(req.GetCollectionID(), req.GetPartitionID(),
+		req.GetSegmentID(), req.GetChannel(), false)
+	if err != nil {
+		log.Warn("failed to trigger single compaction")
+	}
 
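One last pattern worth calling out: the `EnableCompaction` check moved from this call site into `triggerSingleCompaction` itself (see the compaction_trigger.go hunk above), so future callers cannot forget the guard. A minimal illustration of that callee-side-guard style, with hypothetical names:

```go
package main

import "fmt"

type trigger struct {
	enableCompaction     bool
	enableAutoCompaction bool
}

// triggerSingleCompaction guards itself: when both flags are off it is
// a silent no-op, so call sites need no flag checks and a forgotten
// check can no longer reach disabled compaction machinery.
func (t *trigger) triggerSingleCompaction(segmentID int64) error {
	if !t.enableAutoCompaction && !t.enableCompaction {
		return nil
	}
	fmt.Println("compaction signal sent for segment", segmentID)
	return nil
}

func main() {
	t := &trigger{}                   // both flags false
	_ = t.triggerSingleCompaction(42) // safe no-op when disabled
}
```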