Skip to content

Commit

Permalink
fix: Milvus panic when compaction disabled and dropping a collection
Browse files Browse the repository at this point in the history
See also: milvus-io#31059
pr: milvus-io#34103

Signed-off-by: yangxuan <[email protected]>
  • Loading branch information
XuanYang-cn committed Jun 26, 2024
1 parent a9a0981 commit 2e9ab6d
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 35 deletions.
66 changes: 33 additions & 33 deletions internal/datacoord/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,11 +372,8 @@ func (s *Server) initDataCoord() error {
}
log.Info("init service discovery done")

if Params.DataCoordCfg.EnableCompaction.GetAsBool() {
s.createCompactionHandler()
s.createCompactionTrigger()
log.Info("init compaction scheduler done")
}
s.initCompaction()
log.Info("init compaction done")

if err = s.initSegmentManager(); err != nil {
return err
Expand Down Expand Up @@ -416,11 +413,6 @@ func (s *Server) Start() error {
}

func (s *Server) startDataCoord() {
if Params.DataCoordCfg.EnableCompaction.GetAsBool() {
s.compactionHandler.start()
s.compactionTrigger.start()
s.compactionViewManager.Start()
}
s.startServerLoop()

// http.Register(&http.Handler{
Expand Down Expand Up @@ -522,25 +514,6 @@ func (s *Server) SetIndexNodeCreator(f func(context.Context, string, int64) (typ
s.indexNodeCreator = f
}

func (s *Server) createCompactionHandler() {
s.compactionHandler = newCompactionPlanHandler(s.cluster, s.sessionManager, s.channelManager, s.meta, s.allocator)
triggerv2 := NewCompactionTriggerManager(s.allocator, s.handler, s.compactionHandler)
s.compactionViewManager = NewCompactionViewManager(s.meta, triggerv2, s.allocator)
}

func (s *Server) stopCompactionHandler() {
s.compactionHandler.stop()
s.compactionViewManager.Close()
}

func (s *Server) createCompactionTrigger() {
s.compactionTrigger = newCompactionTrigger(s.meta, s.compactionHandler, s.allocator, s.handler, s.indexEngineVersionManager)
}

func (s *Server) stopCompactionTrigger() {
s.compactionTrigger.stop()
}

func (s *Server) newChunkManagerFactory() (storage.ChunkManager, error) {
chunkManagerFactory := storage.NewChunkManagerFactoryWithParam(Params)
cli, err := chunkManagerFactory.NewPersistentStorageChunkManager(s.ctx)
Expand Down Expand Up @@ -699,7 +672,37 @@ func (s *Server) initIndexNodeManager() {
}
}

func (s *Server) initCompaction() {
s.compactionHandler = newCompactionPlanHandler(s.cluster, s.sessionManager, s.channelManager, s.meta, s.allocator)
triggerv2 := NewCompactionTriggerManager(s.allocator, s.handler, s.compactionHandler)
s.compactionViewManager = NewCompactionViewManager(s.meta, triggerv2, s.allocator)
}

func (s *Server) stopCompaction() {
if s.compactionTrigger != nil {
s.compactionTrigger.stop()
}

if s.compactionHandler != nil {
s.compactionHandler.stop()
}
}

func (s *Server) startCompaction() {
if s.compactionHandler != nil {
s.compactionHandler.start()
}

if s.compactionTrigger != nil {
s.compactionTrigger.start()
}
}

func (s *Server) startServerLoop() {
if Params.DataCoordCfg.EnableCompaction.GetAsBool() {
s.startCompaction()
}

if !Params.DataNodeCfg.DataNodeTimeTickByRPC.GetAsBool() {
s.serverLoopWg.Add(1)
s.startDataNodeTtLoop(s.serverLoopCtx)
Expand Down Expand Up @@ -1103,10 +1106,7 @@ func (s *Server) Stop() error {
s.importScheduler.Close()
s.importChecker.Close()

if Params.DataCoordCfg.EnableCompaction.GetAsBool() {
s.stopCompactionTrigger()
s.stopCompactionHandler()
}
s.stopCompaction()
logutil.Logger(s.ctx).Info("datacoord compaction stopped")

s.indexBuilder.Stop()
Expand Down
2 changes: 1 addition & 1 deletion internal/datanode/syncmgr/mock_task.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/mq/mqimpl/rocksmq/server/rocksmq_retention.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
"github.com/tecbot/gorocksdb"
"go.uber.org/zap"

"github.com/milvus-io/milvus/pkg/kv/rocksdb"
rocksdbkv "github.com/milvus-io/milvus/pkg/kv/rocksdb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
Expand Down

0 comments on commit 2e9ab6d

Please sign in to comment.