diff --git a/configs/milvus.yaml b/configs/milvus.yaml index a16e60061fa06..b575af456d553 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -878,6 +878,8 @@ common: useVectorAsClusteringKey: false # if true, do clustering compaction and segment prune on vector field enableVectorClusteringKey: false # if true, enable vector clustering key and vector clustering compaction localRPCEnabled: false # enable local rpc for internal communication when mix or standalone mode. + sync: + taskPoolReleaseTimeoutSeconds: 60 # The maximum time to wait for the task to finish and release resources in the pool # QuotaConfig, configurations of Milvus quota and limits. # By default, we enable: diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index 9067cb1088c75..7ee1b6a6be46d 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -402,6 +402,13 @@ func (node *DataNode) Stop() error { node.writeBufferManager.Stop() } + if node.syncMgr != nil { + err := node.syncMgr.Close() + if err != nil { + log.Error("sync manager close failed", zap.Error(err)) + } + } + if node.allocator != nil { log.Ctx(node.ctx).Info("close id allocator", zap.String("role", typeutil.DataNodeRole)) node.allocator.Close() diff --git a/internal/datanode/importv2/scheduler_test.go b/internal/datanode/importv2/scheduler_test.go index 62cff3f3ddedc..7752c382187d1 100644 --- a/internal/datanode/importv2/scheduler_test.go +++ b/internal/datanode/importv2/scheduler_test.go @@ -246,11 +246,11 @@ func (s *SchedulerSuite) TestScheduler_Start_Import() { cm.EXPECT().Reader(mock.Anything, mock.Anything).Return(&mockReader{Reader: ioReader}, nil) s.cm = cm - s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task, callbacks ...func(error) error) *conc.Future[struct{}] { + s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task, callbacks ...func(error) error) (*conc.Future[struct{}], error) { future := conc.Go(func() (struct{}, error) { return struct{}{}, nil }) - return future + return future, nil }) importReq := &datapb.ImportRequest{ JobID: 10, @@ -307,11 +307,11 @@ func (s *SchedulerSuite) TestScheduler_Start_Import_Failed() { cm.EXPECT().Reader(mock.Anything, mock.Anything).Return(&mockReader{Reader: ioReader}, nil) s.cm = cm - s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task, callbacks ...func(error) error) *conc.Future[struct{}] { + s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task, callbacks ...func(error) error) (*conc.Future[struct{}], error) { future := conc.Go(func() (struct{}, error) { return struct{}{}, errors.New("mock err") }) - return future + return future, nil }) importReq := &datapb.ImportRequest{ JobID: 10, @@ -384,11 +384,11 @@ func (s *SchedulerSuite) TestScheduler_ReadFileStat() { } func (s *SchedulerSuite) TestScheduler_ImportFile() { - s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task, callbacks ...func(error) error) *conc.Future[struct{}] { + s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task, callbacks ...func(error) error) (*conc.Future[struct{}], error) { future := conc.Go(func() (struct{}, error) { return struct{}{}, nil }) - return future + return future, nil }) var once sync.Once data, err := testutil.CreateInsertData(s.schema, s.numRows) diff --git a/internal/datanode/importv2/task_import.go b/internal/datanode/importv2/task_import.go index a365ec1a0c5c1..cdd71e53f4f4f 100644 --- a/internal/datanode/importv2/task_import.go +++ b/internal/datanode/importv2/task_import.go @@ -228,7 +228,7 @@ func (t *ImportTask) importFile(reader importutilv2.Reader) error { } func (t *ImportTask) sync(hashedData HashedData) ([]*conc.Future[struct{}], []syncmgr.Task, error) { - log.Info("start to sync import data", WrapLogFields(t)...) + log.Ctx(context.TODO()).Info("start to sync import data", WrapLogFields(t)...) futures := make([]*conc.Future[struct{}], 0) syncTasks := make([]syncmgr.Task, 0) for channelIdx, datas := range hashedData { @@ -256,7 +256,11 @@ func (t *ImportTask) sync(hashedData HashedData) ([]*conc.Future[struct{}], []sy if err != nil { return nil, nil, err } - future := t.syncMgr.SyncData(t.ctx, syncTask) + future, err := t.syncMgr.SyncData(t.ctx, syncTask) + if err != nil { + log.Ctx(context.TODO()).Error("sync data failed", WrapLogFields(t, zap.Error(err))...) + return nil, nil, err + } futures = append(futures, future) syncTasks = append(syncTasks, syncTask) } diff --git a/internal/datanode/importv2/task_l0_import.go b/internal/datanode/importv2/task_l0_import.go index ac87d056a7557..5be5452a398c1 100644 --- a/internal/datanode/importv2/task_l0_import.go +++ b/internal/datanode/importv2/task_l0_import.go @@ -213,7 +213,7 @@ func (t *L0ImportTask) importL0(reader binlog.L0Reader) error { } func (t *L0ImportTask) syncDelete(delData []*storage.DeleteData) ([]*conc.Future[struct{}], []syncmgr.Task, error) { - log.Info("start to sync l0 delete data", WrapLogFields(t)...) + log.Ctx(context.TODO()).Info("start to sync l0 delete data", WrapLogFields(t)...) futures := make([]*conc.Future[struct{}], 0) syncTasks := make([]syncmgr.Task, 0) for channelIdx, data := range delData { @@ -231,7 +231,11 @@ func (t *L0ImportTask) syncDelete(delData []*storage.DeleteData) ([]*conc.Future if err != nil { return nil, nil, err } - future := t.syncMgr.SyncData(t.ctx, syncTask) + future, err := t.syncMgr.SyncData(t.ctx, syncTask) + if err != nil { + log.Ctx(context.TODO()).Error("failed to sync l0 delete data", WrapLogFields(t, zap.Error(err))...) + return nil, nil, err + } futures = append(futures, future) syncTasks = append(syncTasks, syncTask) } diff --git a/internal/datanode/importv2/task_l0_import_test.go b/internal/datanode/importv2/task_l0_import_test.go index e08238addbedf..9c2996eeb1d99 100644 --- a/internal/datanode/importv2/task_l0_import_test.go +++ b/internal/datanode/importv2/task_l0_import_test.go @@ -132,7 +132,7 @@ func (s *L0ImportSuite) TestL0PreImport() { func (s *L0ImportSuite) TestL0Import() { s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything). - RunAndReturn(func(ctx context.Context, task syncmgr.Task, callbacks ...func(error) error) *conc.Future[struct{}] { + RunAndReturn(func(ctx context.Context, task syncmgr.Task, callbacks ...func(error) error) (*conc.Future[struct{}], error) { alloc := allocator.NewMockAllocator(s.T()) alloc.EXPECT().Alloc(mock.Anything).Return(1, int64(s.delCnt)+1, nil) task.(*syncmgr.SyncTask).WithAllocator(alloc) @@ -147,7 +147,7 @@ func (s *L0ImportSuite) TestL0Import() { future := conc.Go(func() (struct{}, error) { return struct{}{}, nil }) - return future + return future, nil }) req := &datapb.ImportRequest{ diff --git a/internal/flushcommon/syncmgr/mock_sync_manager.go b/internal/flushcommon/syncmgr/mock_sync_manager.go index dc57aaee95c88..0078c8a23c62c 100644 --- a/internal/flushcommon/syncmgr/mock_sync_manager.go +++ b/internal/flushcommon/syncmgr/mock_sync_manager.go @@ -23,8 +23,53 @@ func (_m *MockSyncManager) EXPECT() *MockSyncManager_Expecter { return &MockSyncManager_Expecter{mock: &_m.Mock} } +// Close provides a mock function with given fields: +func (_m *MockSyncManager) Close() error { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Close") + } + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockSyncManager_Close_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Close' +type MockSyncManager_Close_Call struct { + *mock.Call +} + +// Close is a helper method to define mock.On call +func (_e *MockSyncManager_Expecter) Close() *MockSyncManager_Close_Call { + return &MockSyncManager_Close_Call{Call: _e.mock.On("Close")} +} + +func (_c *MockSyncManager_Close_Call) Run(run func()) *MockSyncManager_Close_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *MockSyncManager_Close_Call) Return(_a0 error) *MockSyncManager_Close_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockSyncManager_Close_Call) RunAndReturn(run func() error) *MockSyncManager_Close_Call { + _c.Call.Return(run) + return _c +} + // SyncData provides a mock function with given fields: ctx, task, callbacks -func (_m *MockSyncManager) SyncData(ctx context.Context, task Task, callbacks ...func(error) error) *conc.Future[struct{}] { +func (_m *MockSyncManager) SyncData(ctx context.Context, task Task, callbacks ...func(error) error) (*conc.Future[struct{}], error) { _va := make([]interface{}, len(callbacks)) for _i := range callbacks { _va[_i] = callbacks[_i] @@ -39,6 +84,10 @@ func (_m *MockSyncManager) SyncData(ctx context.Context, task Task, callbacks .. } var r0 *conc.Future[struct{}] + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, Task, ...func(error) error) (*conc.Future[struct{}], error)); ok { + return rf(ctx, task, callbacks...) + } if rf, ok := ret.Get(0).(func(context.Context, Task, ...func(error) error) *conc.Future[struct{}]); ok { r0 = rf(ctx, task, callbacks...) } else { @@ -47,7 +96,13 @@ func (_m *MockSyncManager) SyncData(ctx context.Context, task Task, callbacks .. } } - return r0 + if rf, ok := ret.Get(1).(func(context.Context, Task, ...func(error) error) error); ok { + r1 = rf(ctx, task, callbacks...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 } // MockSyncManager_SyncData_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'SyncData' @@ -77,12 +132,12 @@ func (_c *MockSyncManager_SyncData_Call) Run(run func(ctx context.Context, task return _c } -func (_c *MockSyncManager_SyncData_Call) Return(_a0 *conc.Future[struct{}]) *MockSyncManager_SyncData_Call { - _c.Call.Return(_a0) +func (_c *MockSyncManager_SyncData_Call) Return(_a0 *conc.Future[struct{}], _a1 error) *MockSyncManager_SyncData_Call { + _c.Call.Return(_a0, _a1) return _c } -func (_c *MockSyncManager_SyncData_Call) RunAndReturn(run func(context.Context, Task, ...func(error) error) *conc.Future[struct{}]) *MockSyncManager_SyncData_Call { +func (_c *MockSyncManager_SyncData_Call) RunAndReturn(run func(context.Context, Task, ...func(error) error) (*conc.Future[struct{}], error)) *MockSyncManager_SyncData_Call { _c.Call.Return(run) return _c } diff --git a/internal/flushcommon/syncmgr/sync_manager.go b/internal/flushcommon/syncmgr/sync_manager.go index eb48de924429b..e02611812ef9a 100644 --- a/internal/flushcommon/syncmgr/sync_manager.go +++ b/internal/flushcommon/syncmgr/sync_manager.go @@ -47,8 +47,10 @@ type SyncMeta struct { //go:generate mockery --name=SyncManager --structname=MockSyncManager --output=./ --filename=mock_sync_manager.go --with-expecter --inpackage type SyncManager interface { // SyncData is the method to submit sync task. - SyncData(ctx context.Context, task Task, callbacks ...func(error) error) *conc.Future[struct{}] + SyncData(ctx context.Context, task Task, callbacks ...func(error) error) (*conc.Future[struct{}], error) + // Close waits for the task to finish and then shuts down the sync manager. + Close() error TaskStatsJSON() string } @@ -97,13 +99,17 @@ func (mgr *syncManager) resizeHandler(evt *config.Event) { } } -func (mgr *syncManager) SyncData(ctx context.Context, task Task, callbacks ...func(error) error) *conc.Future[struct{}] { +func (mgr *syncManager) SyncData(ctx context.Context, task Task, callbacks ...func(error) error) (*conc.Future[struct{}], error) { + if mgr.workerPool.IsClosed() { + return nil, fmt.Errorf("sync manager is closed") + } + switch t := task.(type) { case *SyncTask: t.WithChunkManager(mgr.chunkManager) } - return mgr.safeSubmitTask(ctx, task, callbacks...) + return mgr.safeSubmitTask(ctx, task, callbacks...), nil } // safeSubmitTask submits task to SyncManager @@ -126,6 +132,7 @@ func (mgr *syncManager) submit(ctx context.Context, key int64, task Task, callba } callbacks = append([]func(error) error{handler}, callbacks...) log.Info("sync mgr sumbit task with key", zap.Int64("key", key)) + return mgr.Submit(ctx, key, task, callbacks...) } @@ -142,3 +149,8 @@ func (mgr *syncManager) TaskStatsJSON() string { } return string(ret) } + +func (mgr *syncManager) Close() error { + timeout := paramtable.Get().CommonCfg.SyncTaskPoolReleaseTimeoutSeconds.GetAsDuration(time.Second) + return mgr.workerPool.ReleaseTimeout(timeout) +} diff --git a/internal/flushcommon/syncmgr/sync_manager_test.go b/internal/flushcommon/syncmgr/sync_manager_test.go index 40f8c868ce7a3..0531f481db8bd 100644 --- a/internal/flushcommon/syncmgr/sync_manager_test.go +++ b/internal/flushcommon/syncmgr/sync_manager_test.go @@ -173,11 +173,22 @@ func (s *SyncManagerSuite) TestSubmit() { Timestamp: 100, }) - f := manager.SyncData(context.Background(), task) + f, err := manager.SyncData(context.Background(), task) + s.NoError(err) s.NotNil(f) - _, err := f.Await() + _, err = f.Await() + s.NoError(err) +} + +func (s *SyncManagerSuite) TestClose() { + manager := NewSyncManager(s.chunkManager) + err := manager.Close() s.NoError(err) + + f, err := manager.SyncData(context.Background(), nil) + s.Error(err) + s.Nil(f) } func (s *SyncManagerSuite) TestCompacted() { @@ -202,10 +213,11 @@ func (s *SyncManagerSuite) TestCompacted() { Timestamp: 100, }) - f := manager.SyncData(context.Background(), task) + f, err := manager.SyncData(context.Background(), task) + s.NoError(err) s.NotNil(f) - _, err := f.Await() + _, err = f.Await() s.NoError(err) s.EqualValues(1001, segmentID.Load()) } @@ -254,7 +266,7 @@ func (s *SyncManagerSuite) TestUnexpectedError() { task.EXPECT().Run(mock.Anything).Return(merr.WrapErrServiceInternal("mocked")).Once() task.EXPECT().HandleError(mock.Anything) - f := manager.SyncData(context.Background(), task) + f, _ := manager.SyncData(context.Background(), task) _, err := f.Await() s.Error(err) } @@ -268,7 +280,7 @@ func (s *SyncManagerSuite) TestTargetUpdateSameID() { task.EXPECT().Run(mock.Anything).Return(errors.New("mock err")).Once() task.EXPECT().HandleError(mock.Anything) - f := manager.SyncData(context.Background(), task) + f, _ := manager.SyncData(context.Background(), task) _, err := f.Await() s.Error(err) } diff --git a/internal/flushcommon/writebuffer/write_buffer.go b/internal/flushcommon/writebuffer/write_buffer.go index ec46d23589e90..07aa582546d18 100644 --- a/internal/flushcommon/writebuffer/write_buffer.go +++ b/internal/flushcommon/writebuffer/write_buffer.go @@ -322,7 +322,7 @@ func (wb *writeBufferBase) syncSegments(ctx context.Context, segmentIDs []int64) } } - result = append(result, wb.syncMgr.SyncData(ctx, syncTask, func(err error) error { + future, err := wb.syncMgr.SyncData(ctx, syncTask, func(err error) error { if wb.taskObserverCallback != nil { wb.taskObserverCallback(syncTask, err) } @@ -342,7 +342,11 @@ func (wb *writeBufferBase) syncSegments(ctx context.Context, segmentIDs []int64) } } return nil - })) + }) + if err != nil { + log.Fatal("failed to sync data", zap.Int64("segmentID", segmentID), zap.Error(err)) + } + result = append(result, future) } return result } @@ -643,7 +647,7 @@ func (wb *writeBufferBase) Close(ctx context.Context, drop bool) { t.WithDrop() } - f := wb.syncMgr.SyncData(ctx, syncTask, func(err error) error { + f, err := wb.syncMgr.SyncData(ctx, syncTask, func(err error) error { if wb.taskObserverCallback != nil { wb.taskObserverCallback(syncTask, err) } @@ -656,6 +660,9 @@ func (wb *writeBufferBase) Close(ctx context.Context, drop bool) { } return nil }) + if err != nil { + log.Fatal("failed to sync segment", zap.Int64("segmentID", id), zap.Error(err)) + } futures = append(futures, f) } diff --git a/internal/flushcommon/writebuffer/write_buffer_test.go b/internal/flushcommon/writebuffer/write_buffer_test.go index b40b36720e16b..6f6f20df179a1 100644 --- a/internal/flushcommon/writebuffer/write_buffer_test.go +++ b/internal/flushcommon/writebuffer/write_buffer_test.go @@ -317,7 +317,7 @@ func (s *WriteBufferSuite) TestEvictBuffer() { serializer.EXPECT().EncodeBuffer(mock.Anything, mock.Anything).Return(syncmgr.NewSyncTask(), nil) s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything, mock.Anything).Return(conc.Go[struct{}](func() (struct{}, error) { return struct{}{}, nil - })) + }), nil) defer func() { s.wb.mut.Lock() defer s.wb.mut.Unlock() diff --git a/pkg/util/conc/pool.go b/pkg/util/conc/pool.go index f042dc04b2b35..9376c9113ce2b 100644 --- a/pkg/util/conc/pool.go +++ b/pkg/util/conc/pool.go @@ -20,8 +20,9 @@ import ( "fmt" "strconv" "sync" + "time" - ants "github.com/panjf2000/ants/v2" + "github.com/panjf2000/ants/v2" "github.com/milvus-io/milvus/pkg/util/generic" "github.com/milvus-io/milvus/pkg/util/hardware" @@ -107,10 +108,18 @@ func (pool *Pool[T]) Free() int { return pool.inner.Free() } +func (pool *Pool[T]) IsClosed() bool { + return pool.inner.IsClosed() +} + func (pool *Pool[T]) Release() { pool.inner.Release() } +func (pool *Pool[T]) ReleaseTimeout(timeout time.Duration) error { + return pool.inner.ReleaseTimeout(timeout) +} + func (pool *Pool[T]) Resize(size int) error { if pool.opt.preAlloc { return merr.WrapErrServiceInternal("cannot resize pre-alloc pool") diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 01a6c604149f5..f663cea499c23 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -290,6 +290,8 @@ type commonConfig struct { HealthCheckInterval ParamItem `refreshable:"true"` HealthCheckRPCTimeout ParamItem `refreshable:"true"` + + SyncTaskPoolReleaseTimeoutSeconds ParamItem `refreshable:"true"` } func (p *commonConfig) init(base *BaseTable) { @@ -965,6 +967,15 @@ This helps Milvus-CDC synchronize incremental data`, Doc: `RPC timeout for health check request`, } p.HealthCheckRPCTimeout.Init(base.mgr) + + p.SyncTaskPoolReleaseTimeoutSeconds = ParamItem{ + Key: "common.sync.taskPoolReleaseTimeoutSeconds", + DefaultValue: "60", + Version: "2.4.19", + Doc: "The maximum time to wait for the task to finish and release resources in the pool", + Export: true, + } + p.SyncTaskPoolReleaseTimeoutSeconds.Init(base.mgr) } type gpuConfig struct { diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index 28c86e3ab4bd9..f33efe6fb33f4 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -143,6 +143,10 @@ func TestComponentParam(t *testing.T) { assert.False(t, params.CommonCfg.LocalRPCEnabled.GetAsBool()) params.Save("common.localRPCEnabled", "true") assert.True(t, params.CommonCfg.LocalRPCEnabled.GetAsBool()) + + assert.Equal(t, 60*time.Second, params.CommonCfg.SyncTaskPoolReleaseTimeoutSeconds.GetAsDuration(time.Second)) + params.Save("common.sync.taskPoolReleaseTimeoutSeconds", "100") + assert.Equal(t, 100*time.Second, params.CommonCfg.SyncTaskPoolReleaseTimeoutSeconds.GetAsDuration(time.Second)) }) t.Run("test rootCoordConfig", func(t *testing.T) {