Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: sync task still running after DataNode has stopped #38441

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -850,6 +850,8 @@ common:
useVectorAsClusteringKey: false # if true, do clustering compaction and segment prune on vector field
enableVectorClusteringKey: false # if true, enable vector clustering key and vector clustering compaction
localRPCEnabled: false # enable local rpc for internal communication when mix or standalone mode.
sync:
taskPoolReleaseTimeoutSeconds: 60 # The maximum time to wait for the task to finish and release resources in the pool

# QuotaConfig, configurations of Milvus quota and limits.
# By default, we enable:
Expand Down
7 changes: 7 additions & 0 deletions internal/datanode/data_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,13 @@
node.writeBufferManager.Stop()
}

if node.syncMgr != nil {
err := node.syncMgr.Close()
if err != nil {
log.Error("sync manager close failed", zap.Error(err))
}

Check warning on line 471 in internal/datanode/data_node.go

View check run for this annotation

Codecov / codecov/patch

internal/datanode/data_node.go#L470-L471

Added lines #L470 - L471 were not covered by tests
}

if node.allocator != nil {
log.Info("close id allocator", zap.String("role", typeutil.DataNodeRole))
node.allocator.Close()
Expand Down
12 changes: 6 additions & 6 deletions internal/datanode/importv2/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,11 +246,11 @@ func (s *SchedulerSuite) TestScheduler_Start_Import() {
cm.EXPECT().Reader(mock.Anything, mock.Anything).Return(&mockReader{Reader: ioReader}, nil)
s.cm = cm

s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task, callbacks ...func(error) error) *conc.Future[struct{}] {
s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task, callbacks ...func(error) error) (*conc.Future[struct{}], error) {
future := conc.Go(func() (struct{}, error) {
return struct{}{}, nil
})
return future
return future, nil
})
importReq := &datapb.ImportRequest{
JobID: 10,
Expand Down Expand Up @@ -307,11 +307,11 @@ func (s *SchedulerSuite) TestScheduler_Start_Import_Failed() {
cm.EXPECT().Reader(mock.Anything, mock.Anything).Return(&mockReader{Reader: ioReader}, nil)
s.cm = cm

s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task, callbacks ...func(error) error) *conc.Future[struct{}] {
s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task, callbacks ...func(error) error) (*conc.Future[struct{}], error) {
future := conc.Go(func() (struct{}, error) {
return struct{}{}, errors.New("mock err")
})
return future
return future, nil
})
importReq := &datapb.ImportRequest{
JobID: 10,
Expand Down Expand Up @@ -384,11 +384,11 @@ func (s *SchedulerSuite) TestScheduler_ReadFileStat() {
}

func (s *SchedulerSuite) TestScheduler_ImportFile() {
s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task, callbacks ...func(error) error) *conc.Future[struct{}] {
s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything).RunAndReturn(func(ctx context.Context, task syncmgr.Task, callbacks ...func(error) error) (*conc.Future[struct{}], error) {
future := conc.Go(func() (struct{}, error) {
return struct{}{}, nil
})
return future
return future, nil
})
var once sync.Once
data, err := testutil.CreateInsertData(s.schema, s.numRows)
Expand Down
6 changes: 5 additions & 1 deletion internal/datanode/importv2/task_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,11 @@
if err != nil {
return nil, nil, err
}
future := t.syncMgr.SyncData(t.ctx, syncTask)
future, err := t.syncMgr.SyncData(t.ctx, syncTask)
if err != nil {
log.Error("sync data failed", WrapLogFields(t, zap.Error(err))...)
return nil, nil, err
}

Check warning on line 242 in internal/datanode/importv2/task_import.go

View check run for this annotation

Codecov / codecov/patch

internal/datanode/importv2/task_import.go#L240-L242

Added lines #L240 - L242 were not covered by tests
futures = append(futures, future)
syncTasks = append(syncTasks, syncTask)
}
Expand Down
6 changes: 5 additions & 1 deletion internal/datanode/importv2/task_l0_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,11 @@
if err != nil {
return nil, nil, err
}
future := t.syncMgr.SyncData(t.ctx, syncTask)
future, err := t.syncMgr.SyncData(t.ctx, syncTask)
if err != nil {
log.Error("failed to sync l0 delete data", WrapLogFields(t, zap.Error(err))...)
return nil, nil, err
}

Check warning on line 230 in internal/datanode/importv2/task_l0_import.go

View check run for this annotation

Codecov / codecov/patch

internal/datanode/importv2/task_l0_import.go#L228-L230

Added lines #L228 - L230 were not covered by tests
futures = append(futures, future)
syncTasks = append(syncTasks, syncTask)
}
Expand Down
4 changes: 2 additions & 2 deletions internal/datanode/importv2/task_l0_import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (s *L0ImportSuite) TestL0PreImport() {

func (s *L0ImportSuite) TestL0Import() {
s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything, mock.Anything).
RunAndReturn(func(ctx context.Context, task syncmgr.Task, callbacks ...func(error) error) *conc.Future[struct{}] {
RunAndReturn(func(ctx context.Context, task syncmgr.Task, callbacks ...func(error) error) (*conc.Future[struct{}], error) {
alloc := allocator.NewMockAllocator(s.T())
alloc.EXPECT().Alloc(mock.Anything).Return(1, int64(s.delCnt)+1, nil)
task.(*syncmgr.SyncTask).WithAllocator(alloc)
Expand All @@ -147,7 +147,7 @@ func (s *L0ImportSuite) TestL0Import() {
future := conc.Go(func() (struct{}, error) {
return struct{}{}, nil
})
return future
return future, nil
})

req := &datapb.ImportRequest{
Expand Down
65 changes: 60 additions & 5 deletions internal/datanode/syncmgr/mock_sync_manager.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 16 additions & 3 deletions internal/datanode/syncmgr/sync_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"strconv"
"time"

"go.uber.org/zap"

Expand Down Expand Up @@ -45,7 +46,10 @@ type SyncMeta struct {
//go:generate mockery --name=SyncManager --structname=MockSyncManager --output=./ --filename=mock_sync_manager.go --with-expecter --inpackage
type SyncManager interface {
// SyncData is the method to submit sync task.
SyncData(ctx context.Context, task Task, callbacks ...func(error) error) *conc.Future[struct{}]
SyncData(ctx context.Context, task Task, callbacks ...func(error) error) (*conc.Future[struct{}], error)

// Close waits for the task to finish and then shuts down the sync manager.
Close() error
}

type syncManager struct {
Expand Down Expand Up @@ -97,15 +101,19 @@ func (mgr *syncManager) resizeHandler(evt *config.Event) {
}
}

func (mgr *syncManager) SyncData(ctx context.Context, task Task, callbacks ...func(error) error) *conc.Future[struct{}] {
func (mgr *syncManager) SyncData(ctx context.Context, task Task, callbacks ...func(error) error) (*conc.Future[struct{}], error) {
if mgr.workerPool.IsClosed() {
return nil, fmt.Errorf("sync manager is closed")
}

switch t := task.(type) {
case *SyncTask:
t.WithAllocator(mgr.allocator).WithChunkManager(mgr.chunkManager)
case *SyncTaskV2:
t.WithAllocator(mgr.allocator)
}

return mgr.safeSubmitTask(task, callbacks...)
return mgr.safeSubmitTask(task, callbacks...), nil
}

// safeSubmitTask submits task to SyncManager
Expand Down Expand Up @@ -147,3 +155,8 @@ func (mgr *syncManager) GetEarliestPosition(channel string) (int64, *msgpb.MsgPo
})
return segmentID, cp
}

func (mgr *syncManager) Close() error {
timeout := paramtable.Get().CommonCfg.SyncTaskPoolReleaseTimeoutSeconds.GetAsDuration(time.Second)
return mgr.workerPool.ReleaseTimeout(timeout)
}
25 changes: 21 additions & 4 deletions internal/datanode/syncmgr/sync_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,13 +170,26 @@ func (s *SyncManagerSuite) TestSubmit() {
Timestamp: 100,
})

f := manager.SyncData(context.Background(), task)
f, err := manager.SyncData(context.Background(), task)
s.NoError(err)
s.NotNil(f)

_, err = f.Await()
s.NoError(err)
}

func (s *SyncManagerSuite) TestClose() {
manager, err := NewSyncManager(s.chunkManager, s.allocator)
s.NoError(err)

err = manager.Close()
s.NoError(err)

f, err := manager.SyncData(context.Background(), nil)
s.Error(err)
s.Nil(f)
}

func (s *SyncManagerSuite) TestCompacted() {
var segmentID atomic.Int64
s.broker.EXPECT().SaveBinlogPaths(mock.Anything, mock.Anything).Run(func(_ context.Context, req *datapb.SaveBinlogPathsRequest) {
Expand All @@ -200,7 +213,8 @@ func (s *SyncManagerSuite) TestCompacted() {
Timestamp: 100,
})

f := manager.SyncData(context.Background(), task)
f, err := manager.SyncData(context.Background(), task)
s.NoError(err)
s.NotNil(f)

_, err = f.Await()
Expand Down Expand Up @@ -271,7 +285,8 @@ func (s *SyncManagerSuite) TestUnexpectedError() {
task.EXPECT().Run().Return(merr.WrapErrServiceInternal("mocked")).Once()
task.EXPECT().HandleError(mock.Anything)

f := manager.SyncData(context.Background(), task)
f, err := manager.SyncData(context.Background(), task)
s.NoError(err)
_, err = f.Await()
s.Error(err)
}
Expand All @@ -286,7 +301,9 @@ func (s *SyncManagerSuite) TestTargetUpdateSameID() {
task.EXPECT().Run().Return(errors.New("mock err")).Once()
task.EXPECT().HandleError(mock.Anything)

f := manager.SyncData(context.Background(), task)
f, err := manager.SyncData(context.Background(), task)
s.NoError(err)

_, err = f.Await()
s.Error(err)
}
Expand Down
13 changes: 10 additions & 3 deletions internal/datanode/writebuffer/write_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@
}
}

result = append(result, wb.syncMgr.SyncData(ctx, syncTask, func(err error) error {
future, err := wb.syncMgr.SyncData(ctx, syncTask, func(err error) error {
if err != nil {
return err
}
Expand All @@ -345,7 +345,11 @@
wb.syncCheckpoint.Remove(syncTask.SegmentID(), syncTask.StartPosition().GetTimestamp())
}
return nil
}))
})
if err != nil {
log.Fatal("failed to sync data", zap.Int64("segmentID", segmentID), zap.Error(err))
}

Check warning on line 351 in internal/datanode/writebuffer/write_buffer.go

View check run for this annotation

Codecov / codecov/patch

internal/datanode/writebuffer/write_buffer.go#L350-L351

Added lines #L350 - L351 were not covered by tests
result = append(result, future)
}
return result
}
Expand Down Expand Up @@ -652,7 +656,7 @@
t.WithDrop()
}

f := wb.syncMgr.SyncData(context.Background(), syncTask, func(err error) error {
f, err := wb.syncMgr.SyncData(context.Background(), syncTask, func(err error) error {
if err != nil {
return err
}
Expand All @@ -661,6 +665,9 @@
}
return nil
})
if err != nil {
log.Fatal("failed to sync segment", zap.Int64("segmentID", id), zap.Error(err))
}

Check warning on line 670 in internal/datanode/writebuffer/write_buffer.go

View check run for this annotation

Codecov / codecov/patch

internal/datanode/writebuffer/write_buffer.go#L669-L670

Added lines #L669 - L670 were not covered by tests
futures = append(futures, f)
}

Expand Down
2 changes: 1 addition & 1 deletion internal/datanode/writebuffer/write_buffer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ func (s *WriteBufferSuite) TestEvictBuffer() {
serializer.EXPECT().EncodeBuffer(mock.Anything, mock.Anything).Return(syncmgr.NewSyncTask(), nil)
s.syncMgr.EXPECT().SyncData(mock.Anything, mock.Anything, mock.Anything).Return(conc.Go[struct{}](func() (struct{}, error) {
return struct{}{}, nil
}))
}), nil)
defer func() {
s.wb.mut.Lock()
defer s.wb.mut.Unlock()
Expand Down
11 changes: 10 additions & 1 deletion pkg/util/conc/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ import (
"fmt"
"strconv"
"sync"
"time"

ants "github.com/panjf2000/ants/v2"
"github.com/panjf2000/ants/v2"

"github.com/milvus-io/milvus/pkg/util/generic"
"github.com/milvus-io/milvus/pkg/util/hardware"
Expand Down Expand Up @@ -107,10 +108,18 @@ func (pool *Pool[T]) Free() int {
return pool.inner.Free()
}

func (pool *Pool[T]) IsClosed() bool {
return pool.inner.IsClosed()
}

func (pool *Pool[T]) Release() {
pool.inner.Release()
}

func (pool *Pool[T]) ReleaseTimeout(timeout time.Duration) error {
return pool.inner.ReleaseTimeout(timeout)
}

func (pool *Pool[T]) Resize(size int) error {
if pool.opt.preAlloc {
return merr.WrapErrServiceInternal("cannot resize pre-alloc pool")
Expand Down
Loading
Loading