diff --git a/internal/datanode/importv2/task_import.go b/internal/datanode/importv2/task_import.go index 02e239061fb3e..f0216f3d0e1be 100644 --- a/internal/datanode/importv2/task_import.go +++ b/internal/datanode/importv2/task_import.go @@ -237,8 +237,8 @@ func (t *ImportTask) sync(hashedData HashedData) ([]*conc.Future[struct{}], []sy } future, err := t.syncMgr.SyncData(t.ctx, syncTask) if err != nil { - log.Warn("sync data failed", WrapLogFields(t, zap.Error(err))...) - continue + log.Error("sync data failed", WrapLogFields(t, zap.Error(err))...) + return nil, nil, err } futures = append(futures, future) syncTasks = append(syncTasks, syncTask) diff --git a/internal/datanode/importv2/task_l0_import.go b/internal/datanode/importv2/task_l0_import.go index 07d48337e9c2b..358b46b283aab 100644 --- a/internal/datanode/importv2/task_l0_import.go +++ b/internal/datanode/importv2/task_l0_import.go @@ -225,8 +225,8 @@ func (t *L0ImportTask) syncDelete(delData []*storage.DeleteData) ([]*conc.Future } future, err := t.syncMgr.SyncData(t.ctx, syncTask) if err != nil { - log.Warn("failed to sync l0 delete data", WrapLogFields(t, zap.Error(err))...) - continue + log.Error("failed to sync l0 delete data", WrapLogFields(t, zap.Error(err))...) + return nil, nil, err } futures = append(futures, future) syncTasks = append(syncTasks, syncTask) diff --git a/internal/datanode/syncmgr/sync_manager.go b/internal/datanode/syncmgr/sync_manager.go index 3c1896400fb1a..84915f4575797 100644 --- a/internal/datanode/syncmgr/sync_manager.go +++ b/internal/datanode/syncmgr/sync_manager.go @@ -47,6 +47,9 @@ type SyncMeta struct { type SyncManager interface { // SyncData is the method to submit sync task. SyncData(ctx context.Context, task Task, callbacks ...func(error) error) (*conc.Future[struct{}], error) + + // Close waits for the task to finish and then shuts down the sync manager. + Close() error } type syncManager struct { diff --git a/internal/datanode/syncmgr/sync_manager_test.go b/internal/datanode/syncmgr/sync_manager_test.go index d2908b923e844..919ec39dad129 100644 --- a/internal/datanode/syncmgr/sync_manager_test.go +++ b/internal/datanode/syncmgr/sync_manager_test.go @@ -285,7 +285,8 @@ func (s *SyncManagerSuite) TestUnexpectedError() { task.EXPECT().Run().Return(merr.WrapErrServiceInternal("mocked")).Once() task.EXPECT().HandleError(mock.Anything) - f, _ := manager.SyncData(context.Background(), task) + f, err := manager.SyncData(context.Background(), task) + s.NoError(err) _, err = f.Await() s.Error(err) } @@ -300,7 +301,9 @@ func (s *SyncManagerSuite) TestTargetUpdateSameID() { task.EXPECT().Run().Return(errors.New("mock err")).Once() task.EXPECT().HandleError(mock.Anything) - f, _ := manager.SyncData(context.Background(), task) + f, err := manager.SyncData(context.Background(), task) + s.NoError(err) + _, err = f.Await() s.Error(err) } diff --git a/internal/datanode/writebuffer/write_buffer.go b/internal/datanode/writebuffer/write_buffer.go index 8352f5ad57f5d..2b22b6311f8ca 100644 --- a/internal/datanode/writebuffer/write_buffer.go +++ b/internal/datanode/writebuffer/write_buffer.go @@ -347,8 +347,7 @@ func (wb *writeBufferBase) syncSegments(ctx context.Context, segmentIDs []int64) return nil }) if err != nil { - log.Warn("failed to sync data", zap.Int64("segmentID", segmentID), zap.Error(err)) - continue + log.Fatal("failed to sync data", zap.Int64("segmentID", segmentID), zap.Error(err)) } result = append(result, future) } @@ -667,8 +666,7 @@ func (wb *writeBufferBase) Close(drop bool) { return nil }) if err != nil { - log.Warn("failed to sync segment", zap.Int64("segmentID", id), zap.Error(err)) - continue + log.Fatal("failed to sync segment", zap.Int64("segmentID", id), zap.Error(err)) } futures = append(futures, f) }