Skip to content

Commit

Permalink
Fix tracking incorrect context
Browse files Browse the repository at this point in the history
  • Loading branch information
boreq committed Dec 15, 2023
1 parent b63b44a commit 6c45d1e
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 51 deletions.
2 changes: 1 addition & 1 deletion service/domain/downloader/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ func (t *TimeWindowTaskGenerator) maybeGenerateNewTracker(ctx context.Context) (
return nil, false, nil
}
t.lastWindow = nextWindow
v, err := NewTimeWindowTaskTracker(ctx, nextWindow)
v, err := NewTimeWindowTaskTracker(nextWindow)
if err != nil {
return nil, false, errors.Wrap(err, "error creating a task")
}
Expand Down
86 changes: 53 additions & 33 deletions service/domain/downloader/time_window_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,36 +29,14 @@ type TimeWindowTaskTracker struct {
lock sync.Mutex
}

func NewTimeWindowTaskTracker(
ctx context.Context,
window TimeWindow,
) (*TimeWindowTaskTracker, error) {
ctx, cancel := context.WithCancel(ctx)
func NewTimeWindowTaskTracker(window TimeWindow) (*TimeWindowTaskTracker, error) {
t := &TimeWindowTaskTracker{
ctx: ctx,
cancel: cancel,
state: TimeWindowTaskStateNew,
window: window,
}
return t, nil
}

func (t *TimeWindowTaskTracker) MarkAsDone() {
t.lock.Lock()
defer t.lock.Unlock()

t.state = TimeWindowTaskStateDone
}

func (t *TimeWindowTaskTracker) MarkAsFailed() {
t.lock.Lock()
defer t.lock.Unlock()

if t.state != TimeWindowTaskStateDone {
t.state = TimeWindowTaskStateError
}
}

func (t *TimeWindowTaskTracker) CheckIfDoneAndEnd() bool {
t.lock.Lock()
defer t.lock.Unlock()
Expand Down Expand Up @@ -95,45 +73,87 @@ func (t *TimeWindowTaskTracker) MaybeStart(ctx context.Context, kinds []domain.E
return nil, false, errors.Wrap(err, "error creating a filter")
}

t.cancel()
if t.cancel != nil {
t.cancel()
}

ctx, cancel := context.WithCancel(ctx)

t.ctx = ctx
t.cancel = cancel
t.state = TimeWindowTaskStateStarted

task := NewTimeWindowTask(filter, t)
task := NewTimeWindowTask(ctx, filter, t)
return task, true, nil
}

func (t *TimeWindowTaskTracker) markAsDone() {
t.lock.Lock()
defer t.lock.Unlock()

if t.state == TimeWindowTaskStateStarted {
t.state = TimeWindowTaskStateDone
}
}

func (t *TimeWindowTaskTracker) markAsFailed() {
t.lock.Lock()
defer t.lock.Unlock()

if t.state != TimeWindowTaskStateDone {
t.state = TimeWindowTaskStateError
}
}

func (t *TimeWindowTaskTracker) isDead() bool {
return t.ctxIsDead() || t.state == TimeWindowTaskStateError || t.state == TimeWindowTaskStateNew
}

func (t *TimeWindowTaskTracker) ctxIsDead() bool {
if t.ctx == nil {
return true
}
if err := t.ctx.Err(); err != nil {
return true
}
return t.state == TimeWindowTaskStateError || t.state == TimeWindowTaskStateNew
return false
}

type TimeWindowTask struct {
filter domain.Filter
t *TimeWindowTaskTracker
ctx context.Context
filter domain.Filter
tracker tracker
}

func NewTimeWindowTask(filter domain.Filter, t *TimeWindowTaskTracker) *TimeWindowTask {
return &TimeWindowTask{filter: filter, t: t}
func NewTimeWindowTask(ctx context.Context, filter domain.Filter, tracker tracker) *TimeWindowTask {
return &TimeWindowTask{
ctx: ctx,
filter: filter,
tracker: tracker,
}
}

func (t *TimeWindowTask) Ctx() context.Context {
return t.t.ctx
return t.ctx
}

func (t *TimeWindowTask) Filter() domain.Filter {
return t.filter
}

func (t *TimeWindowTask) OnReceivedEOSE() {
t.t.MarkAsDone()
if t.ctx.Err() == nil {
t.tracker.markAsDone()
}
}

func (t *TimeWindowTask) OnError(err error) {
t.t.MarkAsFailed()
if t.ctx.Err() == nil {
t.tracker.markAsFailed()
}
}

type tracker interface {
markAsDone()
markAsFailed()
}
48 changes: 31 additions & 17 deletions service/domain/downloader/time_window_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,65 +11,79 @@ import (
func TestTimeWindowTaskTracker_ReportingErrorsAfterTaskIsConsideredToBeDoneShouldBeIgnored(t *testing.T) {
ctx := fixtures.TestContext(t)

task, err := downloader.NewTimeWindowTaskTracker(ctx, fixtures.SomeTimeWindow())
tracker, err := downloader.NewTimeWindowTaskTracker(fixtures.SomeTimeWindow())
require.NoError(t, err)

task.MarkAsDone()
task.MarkAsFailed()
task, ok, err := tracker.MaybeStart(ctx, nil, nil, nil)
require.NoError(t, err)
require.True(t, ok)

ok := task.CheckIfDoneAndEnd()
task.OnReceivedEOSE()

ok = tracker.CheckIfDoneAndEnd()
require.True(t, ok)

task.OnError(fixtures.SomeError())

ok = tracker.CheckIfDoneAndEnd()
require.True(t, ok)
}

func TestTimeWindowTaskTracker_NewTasksCanBeStarted(t *testing.T) {
ctx := fixtures.TestContext(t)

task, err := downloader.NewTimeWindowTaskTracker(ctx, fixtures.SomeTimeWindow())
tracker, err := downloader.NewTimeWindowTaskTracker(fixtures.SomeTimeWindow())
require.NoError(t, err)

_, ok, err := task.MaybeStart(ctx, nil, nil, nil)
_, ok, err := tracker.MaybeStart(ctx, nil, nil, nil)
require.NoError(t, err)
require.True(t, ok)
}

func TestTimeWindowTaskTracker_FailedTasksCanBeStarted(t *testing.T) {
ctx := fixtures.TestContext(t)

task, err := downloader.NewTimeWindowTaskTracker(ctx, fixtures.SomeTimeWindow())
tracker, err := downloader.NewTimeWindowTaskTracker(fixtures.SomeTimeWindow())
require.NoError(t, err)

task.MarkAsFailed()
task, ok, err := tracker.MaybeStart(ctx, nil, nil, nil)
require.NoError(t, err)
require.True(t, ok)

task.OnError(fixtures.SomeError())

_, ok, err := task.MaybeStart(ctx, nil, nil, nil)
_, ok, err = tracker.MaybeStart(ctx, nil, nil, nil)
require.NoError(t, err)
require.True(t, ok)
}

func TestTimeWindowTaskTracker_StartedTasksCanNotBeStarted(t *testing.T) {
ctx := fixtures.TestContext(t)

task, err := downloader.NewTimeWindowTaskTracker(ctx, fixtures.SomeTimeWindow())
tracker, err := downloader.NewTimeWindowTaskTracker(fixtures.SomeTimeWindow())
require.NoError(t, err)

task.MarkAsFailed()

_, ok, err := task.MaybeStart(ctx, nil, nil, nil)
_, ok, err := tracker.MaybeStart(ctx, nil, nil, nil)
require.NoError(t, err)
require.True(t, ok)

_, ok, err = task.MaybeStart(ctx, nil, nil, nil)
_, ok, err = tracker.MaybeStart(ctx, nil, nil, nil)
require.NoError(t, err)
require.False(t, ok)
}

func TestTimeWindowTaskTracker_DoneTasksCanNotBeStarted(t *testing.T) {
ctx := fixtures.TestContext(t)

task, err := downloader.NewTimeWindowTaskTracker(ctx, fixtures.SomeTimeWindow())
tracker, err := downloader.NewTimeWindowTaskTracker(fixtures.SomeTimeWindow())
require.NoError(t, err)

task.MarkAsDone()
task, ok, err := tracker.MaybeStart(ctx, nil, nil, nil)
require.NoError(t, err)
require.True(t, ok)

task.OnReceivedEOSE()

_, _, err = task.MaybeStart(ctx, nil, nil, nil)
_, _, err = tracker.MaybeStart(ctx, nil, nil, nil)
require.EqualError(t, err, "why are we trying to reset a completed task?")
}

0 comments on commit 6c45d1e

Please sign in to comment.