Skip to content

Commit

Permalink
Fix all generators getting the same parameters
Browse files Browse the repository at this point in the history
  • Loading branch information
boreq committed Dec 15, 2023
1 parent 6c45d1e commit 4f3cd1d
Showing 1 changed file with 17 additions and 21 deletions.
38 changes: 17 additions & 21 deletions service/domain/downloader/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,23 +172,20 @@ func NewRelayTaskGenerator(
logger logging.Logger,
) (*RelayTaskGenerator, error) {
globalTask, err := NewTimeWindowTaskGenerator(
globalEventKindsToDownload,
currentTimeProvider,
logger,
)
if err != nil {
return nil, errors.Wrap(err, "error creating the global task")
}
authorTask, err := NewTimeWindowTaskGenerator(
nil,
currentTimeProvider,
logger,
)
if err != nil {
return nil, errors.Wrap(err, "error creating the author task")
}
tagTask, err := NewTimeWindowTaskGenerator(
nil,
currentTimeProvider,
logger,
)
Expand Down Expand Up @@ -267,42 +264,42 @@ func (t *RelayTaskGenerator) pushTasks(ctx context.Context, ch chan<- Task, auth

func (t *RelayTaskGenerator) getTasksToPush(ctx context.Context, authors []domain.PublicKey, tags []domain.FilterTag) ([]Task, error) {
var result []Task
for _, generator := range t.generators(authors, tags) {
tasks, err := generator.Generate(ctx, authors, tags)

tasks, err := t.globalTask.Generate(ctx, globalEventKindsToDownload, nil, nil)
if err != nil {
return nil, errors.Wrap(err, "error calling one of the generators")
}
result = append(result, tasks...)

if len(authors) > 0 {
tasks, err := t.authorTask.Generate(ctx, nil, authors, nil)
if err != nil {
return nil, errors.Wrap(err, "error calling one of the generators")
}
result = append(result, tasks...)
}
return result, nil
}

func (t *RelayTaskGenerator) generators(authors []domain.PublicKey, tags []domain.FilterTag) []*TimeWindowTaskGenerator {
generators := []*TimeWindowTaskGenerator{t.globalTask}

if len(authors) > 0 {
generators = append(generators, t.authorTask)
}

if len(tags) > 0 {
generators = append(generators, t.tagTask)
tasks, err := t.tagTask.Generate(ctx, nil, nil, tags)
if err != nil {
return nil, errors.Wrap(err, "error calling one of the generators")
}
result = append(result, tasks...)
}

return generators
return result, nil
}

type TimeWindowTaskGenerator struct {
lastWindow TimeWindow
taskTrackers []*TimeWindowTaskTracker
lock sync.Mutex
eventKinds []domain.EventKind

currentTimeProvider CurrentTimeProvider
logger logging.Logger
}

func NewTimeWindowTaskGenerator(
eventKinds []domain.EventKind,
currentTimeProvider CurrentTimeProvider,
logger logging.Logger,
) (*TimeWindowTaskGenerator, error) {
Expand All @@ -314,14 +311,13 @@ func NewTimeWindowTaskGenerator(
}

return &TimeWindowTaskGenerator{
eventKinds: eventKinds,
lastWindow: startingWindow,
currentTimeProvider: currentTimeProvider,
logger: logger.New("timeWindowTaskGenerator"),
}, nil
}

func (t *TimeWindowTaskGenerator) Generate(ctx context.Context, authors []domain.PublicKey, tags []domain.FilterTag) ([]Task, error) {
func (t *TimeWindowTaskGenerator) Generate(ctx context.Context, kinds []domain.EventKind, authors []domain.PublicKey, tags []domain.FilterTag) ([]Task, error) {
t.lock.Lock()
defer t.lock.Unlock()

Expand All @@ -341,7 +337,7 @@ func (t *TimeWindowTaskGenerator) Generate(ctx context.Context, authors []domain

var result []Task
for _, tracker := range t.taskTrackers {
task, ok, err := tracker.MaybeStart(ctx, t.eventKinds, authors, tags)
task, ok, err := tracker.MaybeStart(ctx, kinds, authors, tags)
if err != nil {
return nil, errors.Wrap(err, "error resetting a task")
}
Expand Down

0 comments on commit 4f3cd1d

Please sign in to comment.