Skip to content

Commit

Permalink
fix: sync task still running after DataNode has stopped
Browse files Browse the repository at this point in the history
Signed-off-by: jaime <[email protected]>
  • Loading branch information
jaime0815 committed Dec 16, 2024
1 parent 8794ec9 commit 69fe00b
Show file tree
Hide file tree
Showing 14 changed files with 158 additions and 31 deletions.
2 changes: 2 additions & 0 deletions configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -877,6 +877,8 @@ common:
useVectorAsClusteringKey: false # if true, do clustering compaction and segment prune on vector field
enableVectorClusteringKey: false # if true, enable vector clustering key and vector clustering compaction
localRPCEnabled: false # enable local rpc for internal communication when mix or standalone mode.
sync:
taskPoolReleaseTimeoutSeconds: 60 # The maximum time to wait for the task to finish and release resources in the pool

# QuotaConfig, configurations of Milvus quota and limits.
# By default, we enable:
Expand Down
7 changes: 7 additions & 0 deletions internal/datanode/data_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,13 @@ func (node *DataNode) Stop() error {
node.writeBufferManager.Stop()
}

if node.syncMgr != nil {
err := node.syncMgr.Close()
if err != nil {
log.Error("sync manager close failed", zap.Error(err))
}

Check warning on line 409 in internal/datanode/data_node.go

View check run for this annotation

Codecov / codecov/patch

internal/datanode/data_node.go#L408-L409

Added lines #L408 - L409 were not covered by tests
}

if node.allocator != nil {
log.Ctx(node.ctx).Info("close id allocator", zap.String("role", typeutil.DataNodeRole))
node.allocator.Close()
Expand Down
12 changes: 6 additions & 6 deletions internal/datanode/importv2/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,11 +246,11 @@ func (s *SchedulerSuite) TestScheduler_Start_Import() {
cm.EXPECT().Reader(mock.Anything, mock.Anything).Return(&mockReader{Reader: ioReader}, nil)
s.cm = cm

s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task, callbacks ...func(error) error) *conc.Future[struct{}] {
s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task, callbacks ...func(error) error) (*conc.Future[struct{}], error) {
future := conc.Go(func() (struct{}, error) {
return struct{}{}, nil
})
return future
return future, nil
})
importReq := &datapb.ImportRequest{
JobID: 10,
Expand Down Expand Up @@ -307,11 +307,11 @@ func (s *SchedulerSuite) TestScheduler_Start_Import_Failed() {
cm.EXPECT().Reader(mock.Anything, mock.Anything).Return(&mockReader{Reader: ioReader}, nil)
s.cm = cm

s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task, callbacks ...func(error) error) *conc.Future[struct{}] {
s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task, callbacks ...func(error) error) (*conc.Future[struct{}], error) {
future := conc.Go(func() (struct{}, error) {
return struct{}{}, errors.New("mock err")
})
return future
return future, nil
})
importReq := &datapb.ImportRequest{
JobID: 10,
Expand Down Expand Up @@ -384,11 +384,11 @@ func (s *SchedulerSuite) TestScheduler_ReadFileStat() {
}

func (s *SchedulerSuite) TestScheduler_ImportFile() {
s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task, callbacks ...func(error) error) *conc.Future[struct{}] {
s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task, callbacks ...func(error) error) (*conc.Future[struct{}], error) {
future := conc.Go(func() (struct{}, error) {
return struct{}{}, nil
})
return future
return future, nil
})
var once sync.Once
data, err := testutil.CreateInsertData(s.schema, s.numRows)
Expand Down
8 changes: 6 additions & 2 deletions internal/datanode/importv2/task_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func (t *ImportTask) importFile(reader importutilv2.Reader) error {
}

func (t *ImportTask) sync(hashedData HashedData) ([]*conc.Future[struct{}], []syncmgr.Task, error) {
log.Info("start to sync import data", WrapLogFields(t)...)
log.Ctx(context.TODO()).Info("start to sync import data", WrapLogFields(t)...)
futures := make([]*conc.Future[struct{}], 0)
syncTasks := make([]syncmgr.Task, 0)
for channelIdx, datas := range hashedData {
Expand Down Expand Up @@ -256,7 +256,11 @@ func (t *ImportTask) sync(hashedData HashedData) ([]*conc.Future[struct{}], []sy
if err != nil {
return nil, nil, err
}
future := t.syncMgr.SyncData(t.ctx, syncTask)
future, err := t.syncMgr.SyncData(t.ctx, syncTask)
if err != nil {
log.Ctx(context.TODO()).Error("sync data failed", WrapLogFields(t, zap.Error(err))...)
return nil, nil, err
}

Check warning on line 263 in internal/datanode/importv2/task_import.go

View check run for this annotation

Codecov / codecov/patch

internal/datanode/importv2/task_import.go#L261-L263

Added lines #L261 - L263 were not covered by tests
futures = append(futures, future)
syncTasks = append(syncTasks, syncTask)
}
Expand Down
8 changes: 6 additions & 2 deletions internal/datanode/importv2/task_l0_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func (t *L0ImportTask) importL0(reader binlog.L0Reader) error {
}

func (t *L0ImportTask) syncDelete(delData []*storage.DeleteData) ([]*conc.Future[struct{}], []syncmgr.Task, error) {
log.Info("start to sync l0 delete data", WrapLogFields(t)...)
log.Ctx(context.TODO()).Info("start to sync l0 delete data", WrapLogFields(t)...)
futures := make([]*conc.Future[struct{}], 0)
syncTasks := make([]syncmgr.Task, 0)
for channelIdx, data := range delData {
Expand All @@ -231,7 +231,11 @@ func (t *L0ImportTask) syncDelete(delData []*storage.DeleteData) ([]*conc.Future
if err != nil {
return nil, nil, err
}
future := t.syncMgr.SyncData(t.ctx, syncTask)
future, err := t.syncMgr.SyncData(t.ctx, syncTask)
if err != nil {
log.Ctx(context.TODO()).Error("failed to sync l0 delete data", WrapLogFields(t, zap.Error(err))...)
return nil, nil, err
}

Check warning on line 238 in internal/datanode/importv2/task_l0_import.go

View check run for this annotation

Codecov / codecov/patch

internal/datanode/importv2/task_l0_import.go#L236-L238

Added lines #L236 - L238 were not covered by tests
futures = append(futures, future)
syncTasks = append(syncTasks, syncTask)
}
Expand Down
4 changes: 2 additions & 2 deletions internal/datanode/importv2/task_l0_import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (s *L0ImportSuite) TestL0PreImport() {

func (s *L0ImportSuite) TestL0Import() {
s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).
RunAndReturn(func(ctx context.Context, task syncmgr.Task, callbacks ...func(error) error) *conc.Future[struct{}] {
RunAndReturn(func(ctx context.Context, task syncmgr.Task, callbacks ...func(error) error) (*conc.Future[struct{}], error) {
alloc := allocator.NewMockAllocator(s.T())
alloc.EXPECT().Alloc(mock.Anything).Return(1, int64(s.delCnt)+1, nil)
task.(*syncmgr.SyncTask).WithAllocator(alloc)
Expand All @@ -147,7 +147,7 @@ func (s *L0ImportSuite) TestL0Import() {
future := conc.Go(func() (struct{}, error) {
return struct{}{}, nil
})
return future
return future, nil
})

req := &datapb.ImportRequest{
Expand Down
65 changes: 60 additions & 5 deletions internal/flushcommon/syncmgr/mock_sync_manager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 15 additions & 3 deletions internal/flushcommon/syncmgr/sync_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,10 @@ type SyncMeta struct {
//go:generate mockery --name=SyncManager --structname=MockSyncManager --output=./ --filename=mock_sync_manager.go --with-expecter --inpackage
type SyncManager interface {
// SyncData is the method to submit sync task.
SyncData(ctx context.Context, task Task, callbacks ...func(error) error) *conc.Future[struct{}]
SyncData(ctx context.Context, task Task, callbacks ...func(error) error) (*conc.Future[struct{}], error)

// Close waits for the task to finish and then shuts down the sync manager.
Close() error
TaskStatsJSON() string
}

Expand Down Expand Up @@ -97,13 +99,17 @@ func (mgr *syncManager) resizeHandler(evt *config.Event) {
}
}

func (mgr *syncManager) SyncData(ctx context.Context, task Task, callbacks ...func(error) error) *conc.Future[struct{}] {
func (mgr *syncManager) SyncData(ctx context.Context, task Task, callbacks ...func(error) error) (*conc.Future[struct{}], error) {
if mgr.workerPool.IsClosed() {
return nil, fmt.Errorf("sync manager is closed")
}

switch t := task.(type) {
case *SyncTask:
t.WithChunkManager(mgr.chunkManager)
}

return mgr.safeSubmitTask(ctx, task, callbacks...)
return mgr.safeSubmitTask(ctx, task, callbacks...), nil
}

// safeSubmitTask submits task to SyncManager
Expand All @@ -126,6 +132,7 @@ func (mgr *syncManager) submit(ctx context.Context, key int64, task Task, callba
}
callbacks = append([]func(error) error{handler}, callbacks...)
log.Info("sync mgr sumbit task with key", zap.Int64("key", key))

return mgr.Submit(ctx, key, task, callbacks...)
}

Expand All @@ -142,3 +149,8 @@ func (mgr *syncManager) TaskStatsJSON() string {
}
return string(ret)
}

func (mgr *syncManager) Close() error {
timeout := paramtable.Get().CommonCfg.SyncTaskPoolReleaseTimeoutSeconds.GetAsDuration(time.Second)
return mgr.workerPool.ReleaseTimeout(timeout)
}
24 changes: 18 additions & 6 deletions internal/flushcommon/syncmgr/sync_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,11 +173,22 @@ func (s *SyncManagerSuite) TestSubmit() {
Timestamp: 100,
})

f := manager.SyncData(context.Background(), task)
f, err := manager.SyncData(context.Background(), task)
s.NoError(err)
s.NotNil(f)

_, err := f.Await()
_, err = f.Await()
s.NoError(err)
}

func (s *SyncManagerSuite) TestClose() {
manager := NewSyncManager(s.chunkManager)
err := manager.Close()
s.NoError(err)

f, err := manager.SyncData(context.Background(), nil)
s.Error(err)
s.Nil(f)
}

func (s *SyncManagerSuite) TestCompacted() {
Expand All @@ -202,10 +213,11 @@ func (s *SyncManagerSuite) TestCompacted() {
Timestamp: 100,
})

f := manager.SyncData(context.Background(), task)
f, err := manager.SyncData(context.Background(), task)
s.NoError(err)
s.NotNil(f)

_, err := f.Await()
_, err = f.Await()
s.NoError(err)
s.EqualValues(1001, segmentID.Load())
}
Expand Down Expand Up @@ -254,7 +266,7 @@ func (s *SyncManagerSuite) TestUnexpectedError() {
task.EXPECT().Run(mock.Anything).Return(merr.WrapErrServiceInternal("mocked")).Once()
task.EXPECT().HandleError(mock.Anything)

f := manager.SyncData(context.Background(), task)
f, _ := manager.SyncData(context.Background(), task)
_, err := f.Await()
s.Error(err)
}
Expand All @@ -268,7 +280,7 @@ func (s *SyncManagerSuite) TestTargetUpdateSameID() {
task.EXPECT().Run(mock.Anything).Return(errors.New("mock err")).Once()
task.EXPECT().HandleError(mock.Anything)

f := manager.SyncData(context.Background(), task)
f, _ := manager.SyncData(context.Background(), task)
_, err := f.Await()
s.Error(err)
}
Expand Down
13 changes: 10 additions & 3 deletions internal/flushcommon/writebuffer/write_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ func (wb *writeBufferBase) syncSegments(ctx context.Context, segmentIDs []int64)
}
}

result = append(result, wb.syncMgr.SyncData(ctx, syncTask, func(err error) error {
future, err := wb.syncMgr.SyncData(ctx, syncTask, func(err error) error {
if wb.taskObserverCallback != nil {
wb.taskObserverCallback(syncTask, err)
}
Expand All @@ -342,7 +342,11 @@ func (wb *writeBufferBase) syncSegments(ctx context.Context, segmentIDs []int64)
}
}
return nil
}))
})
if err != nil {
log.Fatal("failed to sync data", zap.Int64("segmentID", segmentID), zap.Error(err))
}

Check warning on line 348 in internal/flushcommon/writebuffer/write_buffer.go

View check run for this annotation

Codecov / codecov/patch

internal/flushcommon/writebuffer/write_buffer.go#L347-L348

Added lines #L347 - L348 were not covered by tests
result = append(result, future)
}
return result
}
Expand Down Expand Up @@ -643,7 +647,7 @@ func (wb *writeBufferBase) Close(ctx context.Context, drop bool) {
t.WithDrop()
}

f := wb.syncMgr.SyncData(ctx, syncTask, func(err error) error {
f, err := wb.syncMgr.SyncData(ctx, syncTask, func(err error) error {
if wb.taskObserverCallback != nil {
wb.taskObserverCallback(syncTask, err)
}
Expand All @@ -656,6 +660,9 @@ func (wb *writeBufferBase) Close(ctx context.Context, drop bool) {
}
return nil
})
if err != nil {
log.Fatal("failed to sync segment", zap.Int64("segmentID", id), zap.Error(err))
}

Check warning on line 665 in internal/flushcommon/writebuffer/write_buffer.go

View check run for this annotation

Codecov / codecov/patch

internal/flushcommon/writebuffer/write_buffer.go#L664-L665

Added lines #L664 - L665 were not covered by tests
futures = append(futures, f)
}

Expand Down
2 changes: 1 addition & 1 deletion internal/flushcommon/writebuffer/write_buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ func (s *WriteBufferSuite) TestEvictBuffer() {
serializer.EXPECT().EncodeBuffer(mock.Anything, mock.Anything).Return(syncmgr.NewSyncTask(), nil)
s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything, mock.Anything).Return(conc.Go[struct{}](func() (struct{}, error) {
return struct{}{}, nil
}))
}), nil)
defer func() {
s.wb.mut.Lock()
defer s.wb.mut.Unlock()
Expand Down
Loading

0 comments on commit 69fe00b

Please sign in to comment.