diff --git a/service/domain/downloader/scheduler.go b/service/domain/downloader/scheduler.go index 5d37676..0d6e025 100644 --- a/service/domain/downloader/scheduler.go +++ b/service/domain/downloader/scheduler.go @@ -172,7 +172,6 @@ func NewRelayTaskGenerator( logger logging.Logger, ) (*RelayTaskGenerator, error) { globalTask, err := NewTimeWindowTaskGenerator( - globalEventKindsToDownload, currentTimeProvider, logger, ) @@ -180,7 +179,6 @@ func NewRelayTaskGenerator( return nil, errors.Wrap(err, "error creating the global task") } authorTask, err := NewTimeWindowTaskGenerator( - nil, currentTimeProvider, logger, ) @@ -188,7 +186,6 @@ func NewRelayTaskGenerator( return nil, errors.Wrap(err, "error creating the author task") } tagTask, err := NewTimeWindowTaskGenerator( - nil, currentTimeProvider, logger, ) @@ -267,42 +264,42 @@ func (t *RelayTaskGenerator) pushTasks(ctx context.Context, ch chan<- Task, auth func (t *RelayTaskGenerator) getTasksToPush(ctx context.Context, authors []domain.PublicKey, tags []domain.FilterTag) ([]Task, error) { var result []Task - for _, generator := range t.generators(authors, tags) { - tasks, err := generator.Generate(ctx, authors, tags) + + tasks, err := t.globalTask.Generate(ctx, globalEventKindsToDownload, nil, nil) + if err != nil { + return nil, errors.Wrap(err, "error calling one of the generators") + } + result = append(result, tasks...) + + if len(authors) > 0 { + tasks, err := t.authorTask.Generate(ctx, nil, authors, nil) if err != nil { return nil, errors.Wrap(err, "error calling one of the generators") } result = append(result, tasks...) } - return result, nil -} - -func (t *RelayTaskGenerator) generators(authors []domain.PublicKey, tags []domain.FilterTag) []*TimeWindowTaskGenerator { - generators := []*TimeWindowTaskGenerator{t.globalTask} - - if len(authors) > 0 { - generators = append(generators, t.authorTask) - } if len(tags) > 0 { - generators = append(generators, t.tagTask) + tasks, err := t.tagTask.Generate(ctx, nil, nil, tags) + if err != nil { + return nil, errors.Wrap(err, "error calling one of the generators") + } + result = append(result, tasks...) } - return generators + return result, nil } type TimeWindowTaskGenerator struct { lastWindow TimeWindow taskTrackers []*TimeWindowTaskTracker lock sync.Mutex - eventKinds []domain.EventKind currentTimeProvider CurrentTimeProvider logger logging.Logger } func NewTimeWindowTaskGenerator( - eventKinds []domain.EventKind, currentTimeProvider CurrentTimeProvider, logger logging.Logger, ) (*TimeWindowTaskGenerator, error) { @@ -314,14 +311,13 @@ func NewTimeWindowTaskGenerator( } return &TimeWindowTaskGenerator{ - eventKinds: eventKinds, lastWindow: startingWindow, currentTimeProvider: currentTimeProvider, logger: logger.New("timeWindowTaskGenerator"), }, nil } -func (t *TimeWindowTaskGenerator) Generate(ctx context.Context, authors []domain.PublicKey, tags []domain.FilterTag) ([]Task, error) { +func (t *TimeWindowTaskGenerator) Generate(ctx context.Context, kinds []domain.EventKind, authors []domain.PublicKey, tags []domain.FilterTag) ([]Task, error) { t.lock.Lock() defer t.lock.Unlock() @@ -341,7 +337,7 @@ func (t *TimeWindowTaskGenerator) Generate(ctx context.Context, authors []domain var result []Task for _, tracker := range t.taskTrackers { - task, ok, err := tracker.MaybeStart(ctx, t.eventKinds, authors, tags) + task, ok, err := tracker.MaybeStart(ctx, kinds, authors, tags) if err != nil { return nil, errors.Wrap(err, "error resetting a task") }