Skip to content

Commit

Permalink
fix: sync task still running after DataNode has stopped
Browse files Browse the repository at this point in the history
Signed-off-by: jaime <[email protected]>
  • Loading branch information
jaime0815 committed Dec 16, 2024
1 parent 4ebd6e4 commit 6acf9a6
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 10 deletions.
4 changes: 2 additions & 2 deletions internal/datanode/importv2/task_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,8 @@ func (t *ImportTask) sync(hashedData HashedData) ([]*conc.Future[struct{}], []sy
}
future, err := t.syncMgr.SyncData(t.ctx, syncTask)
if err != nil {
log.Warn("sync data failed", WrapLogFields(t, zap.Error(err))...)
continue
log.Error("sync data failed", WrapLogFields(t, zap.Error(err))...)
return nil, nil, err
}

Check warning on line 242 in internal/datanode/importv2/task_import.go

View check run for this annotation

Codecov / codecov/patch

internal/datanode/importv2/task_import.go#L240-L242

Added lines #L240 - L242 were not covered by tests
futures = append(futures, future)
syncTasks = append(syncTasks, syncTask)
Expand Down
4 changes: 2 additions & 2 deletions internal/datanode/importv2/task_l0_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,8 @@ func (t *L0ImportTask) syncDelete(delData []*storage.DeleteData) ([]*conc.Future
}
future, err := t.syncMgr.SyncData(t.ctx, syncTask)
if err != nil {
log.Warn("failed to sync l0 delete data", WrapLogFields(t, zap.Error(err))...)
continue
log.Error("failed to sync l0 delete data", WrapLogFields(t, zap.Error(err))...)
return nil, nil, err
}

Check warning on line 230 in internal/datanode/importv2/task_l0_import.go

View check run for this annotation

Codecov / codecov/patch

internal/datanode/importv2/task_l0_import.go#L228-L230

Added lines #L228 - L230 were not covered by tests
futures = append(futures, future)
syncTasks = append(syncTasks, syncTask)
Expand Down
3 changes: 3 additions & 0 deletions internal/datanode/syncmgr/sync_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ type SyncMeta struct {
type SyncManager interface {
// SyncData is the method to submit sync task.
SyncData(ctx context.Context, task Task, callbacks ...func(error) error) (*conc.Future[struct{}], error)

// Close waits for the task to finish and then shuts down the sync manager.
Close() error
}

type syncManager struct {
Expand Down
7 changes: 5 additions & 2 deletions internal/datanode/syncmgr/sync_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,8 @@ func (s *SyncManagerSuite) TestUnexpectedError() {
task.EXPECT().Run().Return(merr.WrapErrServiceInternal("mocked")).Once()
task.EXPECT().HandleError(mock.Anything)

f, _ := manager.SyncData(context.Background(), task)
f, err := manager.SyncData(context.Background(), task)
s.NoError(err)
_, err = f.Await()
s.Error(err)
}
Expand All @@ -300,7 +301,9 @@ func (s *SyncManagerSuite) TestTargetUpdateSameID() {
task.EXPECT().Run().Return(errors.New("mock err")).Once()
task.EXPECT().HandleError(mock.Anything)

f, _ := manager.SyncData(context.Background(), task)
f, err := manager.SyncData(context.Background(), task)
s.NoError(err)

_, err = f.Await()
s.Error(err)
}
Expand Down
6 changes: 2 additions & 4 deletions internal/datanode/writebuffer/write_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,8 +347,7 @@ func (wb *writeBufferBase) syncSegments(ctx context.Context, segmentIDs []int64)
return nil
})
if err != nil {
log.Warn("failed to sync data", zap.Int64("segmentID", segmentID), zap.Error(err))
continue
log.Fatal("failed to sync data", zap.Int64("segmentID", segmentID), zap.Error(err))
}

Check warning on line 351 in internal/datanode/writebuffer/write_buffer.go

View check run for this annotation

Codecov / codecov/patch

internal/datanode/writebuffer/write_buffer.go#L350-L351

Added lines #L350 - L351 were not covered by tests
result = append(result, future)
}
Expand Down Expand Up @@ -667,8 +666,7 @@ func (wb *writeBufferBase) Close(drop bool) {
return nil
})
if err != nil {
log.Warn("failed to sync segment", zap.Int64("segmentID", id), zap.Error(err))
continue
log.Fatal("failed to sync segment", zap.Int64("segmentID", id), zap.Error(err))
}

Check warning on line 670 in internal/datanode/writebuffer/write_buffer.go

View check run for this annotation

Codecov / codecov/patch

internal/datanode/writebuffer/write_buffer.go#L668-L670

Added lines #L668 - L670 were not covered by tests
futures = append(futures, f)
}
Expand Down

0 comments on commit 6acf9a6

Please sign in to comment.