From 416330ddf27d8b1c3032c2be7958e474b5215c02 Mon Sep 17 00:00:00 2001
From: Daniel Cadenas
Date: Wed, 17 Jan 2024 15:59:15 -0300
Subject: [PATCH] Add more comments

---
 cmd/event-service/di/service.go             | 15 +++++++++++++
 service/app/handler_process_saved_event.go  |  2 ++
 service/domain/downloader/downloader.go     |  9 ++++++++
 service/domain/downloader/scheduler.go      | 25 +++++++++++++++++++--
 service/domain/downloader/scheduler_test.go | 12 +++++++---
 service/domain/event_kind.go                |  5 ++++-
 6 files changed, 62 insertions(+), 6 deletions(-)

diff --git a/cmd/event-service/di/service.go b/cmd/event-service/di/service.go
index 9836e7f..35ec2cb 100644
--- a/cmd/event-service/di/service.go
+++ b/cmd/event-service/di/service.go
@@ -77,35 +77,50 @@ func (s Service) Run(ctx context.Context) error {
     runners := 0

     runners++
+    // Serve HTTP.
     go func() {
         errCh <- errors.Wrap(s.server.ListenAndServe(ctx), "server error")
     }()

+    // Fetch events from the relays stored in the database and send them to the in-memory pubsub.
     runners++
     go func() {
         errCh <- errors.Wrap(s.downloader.Run(ctx), "downloader error")
     }()

+    // Subscribe to the in-memory pubsub of events and emit a NewSaveReceivedEvent
+    // command that runs some checks on the event, saves it if the checks
+    // pass and emits an EventSavedEvent to the sqlite pubsub.
     runners++
     go func() {
         errCh <- errors.Wrap(s.receivedEventSubscriber.Run(ctx), "received event subscriber error")
     }()

+    // Subscribe to saved events in the database. This uses the sqlite pubsub. This triggers:
+    // - analysis that extracts new relays and stores them in the db. They will be used by the downloader.
+    // - analysis that extracts pubkeys and stores them in the db (contacts_followees, pubkeys, contacts_events). These will also be used by the downloader.
+    // - publishing to the watermill pubsub
+    // - possibly publishing the event to wss://relay.nos.social if it is metadata related
    runners++
     go func() {
         errCh <- errors.Wrap(s.eventSavedEventSubscriber.Run(ctx), "event saved subscriber error")
     }()

+    // The metrics timer collects metrics from the app.
     runners++
     go func() {
         errCh <- errors.Wrap(s.metricsTimer.Run(ctx), "metrics timer error")
     }()

+    // SQLite transaction runner.
     runners++
     go func() {
         errCh <- errors.Wrap(s.transactionRunner.Run(ctx), "transaction runner error")
     }()

+    // The task scheduler creates sequential, time-window-based tasks that
+    // contain the filters applied to each relay to fetch the events we want.
+    // Event downloaders subscribe to this.
     runners++
     go func() {
         errCh <- errors.Wrap(s.taskScheduler.Run(ctx), "task scheduler error")
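// Illustrative sketch, not part of the patch: the shape of the runner pattern
// used by Run above. Each dependency runs in its own goroutine and reports
// exactly one error on errCh, and the runners counter records how many results
// to wait for. The runner type and the two inline runners in main are
// hypothetical stand-ins for the real server, downloader, etc.
package main

import (
	"context"
	"fmt"
	"time"
)

type runner func(ctx context.Context) error

// runAll starts every runner and blocks until the first one returns an error
// or all of them finish.
func runAll(ctx context.Context, runners ...runner) error {
	errCh := make(chan error, len(runners)) // buffered so finished runners never block
	for _, r := range runners {
		r := r
		go func() {
			errCh <- r(ctx)
		}()
	}
	for i := 0; i < len(runners); i++ {
		if err := <-errCh; err != nil {
			return err
		}
	}
	return nil
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	err := runAll(ctx,
		func(ctx context.Context) error { <-ctx.Done(); return nil },       // e.g. the HTTP server
		func(ctx context.Context) error { <-ctx.Done(); return ctx.Err() }, // e.g. the downloader
	)
	fmt.Println("runAll returned:", err)
}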
diff --git a/service/app/handler_process_saved_event.go b/service/app/handler_process_saved_event.go
index a36688e..e81d5f1 100644
--- a/service/app/handler_process_saved_event.go
+++ b/service/app/handler_process_saved_event.go
@@ -81,6 +81,7 @@ func (h *ProcessSavedEventHandler) Handle(ctx context.Context, cmd ProcessSavedE
         return errors.Wrap(err, "error saving relays and contacts")
     }

+    // Publish to the nostr-events google pubsub topic.
     if err := h.externalEventPublisher.PublishNewEventReceived(ctx, event); err != nil {
         return errors.Wrap(err, "error publishing the external event")
     }
@@ -200,6 +201,7 @@ func ShouldSendEventToRelay(event Event) bool {
         return false
     }

+    // Skip events that are too old.
     if !pushToRelayFilter.IsOk(event) {
         return false
     }
diff --git a/service/domain/downloader/downloader.go b/service/domain/downloader/downloader.go
index ebcb0be..dca7be8 100644
--- a/service/domain/downloader/downloader.go
+++ b/service/domain/downloader/downloader.go
@@ -129,6 +129,10 @@ func NewDownloader(
     }
 }

+// Run fetches pubkeys from the database and a hardcoded list of kinds, for each relay found in our database.
+// These are used to create tasks that specify nostr filters and contain a time window controlling the since and until filter keys.
+// These filters are used to start queries against each relay. Events found this
+// way are published to all subscribers of the downloader publisher.
 func (d *Downloader) Run(ctx context.Context) error {
     go d.storeMetricsLoop(ctx)

@@ -167,6 +171,7 @@ func (d *Downloader) storeMetrics() {
     d.metrics.ReportNumberOfRelayDownloaders(len(d.relayDownloaders))
 }

+// For each relay returned by getRelays() start a downloader, and kill the downloaders of relays that are no longer on that list.
 func (d *Downloader) updateDownloaders(ctx context.Context) error {
     relays, err := d.getRelays(ctx)
     if err != nil {
@@ -216,6 +221,7 @@ func (d *Downloader) updateDownloaders(ctx context.Context) error {
     return nil
 }

+// Get the bootstrap relays plus those already stored in the database.
 func (d *Downloader) getRelays(ctx context.Context) (*internal.Set[domain.RelayAddress], error) {
     result := internal.NewEmptySet[domain.RelayAddress]()

@@ -268,6 +274,7 @@ func NewRelayDownloader(
     return v
 }

+// Start fetches tasks for this relay, uses them to query it and publishes the events it finds to a pubsub.
 func (d *RelayDownloader) Start(ctx context.Context) error {
     ch, err := d.scheduler.GetTasks(ctx, d.address)
     if err != nil {
@@ -276,6 +283,7 @@ func (d *RelayDownloader) Start(ctx context.Context) error {

     go func() {
         for task := range ch {
+            // Use the task's filter to fetch events from the relay within the task's time window.
             go d.performTask(task)
         }
     }()
@@ -290,6 +298,7 @@ func (d *RelayDownloader) performTask(task Task) {
     }
 }

+// Run the filter specified by the task and publish each event found to all subscribers.
 func (d *RelayDownloader) performTaskWithErr(task Task) error {
     ch, err := d.relayConnections.GetEvents(task.Ctx(), d.address, task.Filter())
     if err != nil {
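// Illustrative sketch, not part of the patch: the shape of the RelayDownloader
// flow described above. Tasks arrive from the scheduler, each task's filter is
// run against the relay, and every event that comes back is published. Task,
// Event, getEvents and publish are hypothetical stand-ins for the real
// scheduler, relay connection and pubsub types.
package main

import (
	"context"
	"fmt"
)

type Task struct {
	Filter string // the real Task carries a nostr filter plus a since/until time window
}

type Event struct{ ID string }

// getEvents stands in for relayConnections.GetEvents: it streams the events
// matching the task's filter from a single relay.
func getEvents(ctx context.Context, filter string) <-chan Event {
	out := make(chan Event)
	go func() {
		defer close(out)
		for i := 0; i < 3; i++ {
			select {
			case out <- Event{ID: fmt.Sprintf("%s-%d", filter, i)}:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

// performTask runs one task and publishes everything it finds, mirroring the
// performTask/performTaskWithErr pair above.
func performTask(ctx context.Context, task Task, publish func(Event)) {
	for evt := range getEvents(ctx, task.Filter) {
		publish(evt)
	}
}

func main() {
	ctx := context.Background()

	tasks := make(chan Task, 2)
	tasks <- Task{Filter: "kinds"}
	tasks <- Task{Filter: "authors"}
	close(tasks)

	// Mirrors the loop in RelayDownloader.Start, run sequentially here for a
	// deterministic output.
	for task := range tasks {
		performTask(ctx, task, func(e Event) { fmt.Println("publishing", e.ID) })
	}
}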
diff --git a/service/domain/downloader/scheduler.go b/service/domain/downloader/scheduler.go
index 0d6e025..4c6c0e3 100644
--- a/service/domain/downloader/scheduler.go
+++ b/service/domain/downloader/scheduler.go
@@ -36,6 +36,7 @@ type Scheduler interface {
     GetTasks(ctx context.Context, relay domain.RelayAddress) (<-chan Task, error)
 }

+// A TaskScheduler is responsible for generating tasks for all relays by maintaining a list of RelayTaskGenerators, one per relay.
 type TaskScheduler struct {
     taskGeneratorsLock sync.Mutex
     taskGenerators     map[domain.RelayAddress]*RelayTaskGenerator
@@ -65,6 +66,7 @@ func (t *TaskScheduler) GetTasks(ctx context.Context, relay domain.RelayAddress)
     }

     ch := make(chan Task)
+    // The subscription for the relay will generate a sequence of tasks mapped to consecutive time windows.
     generator.AddSubscription(ctx, ch)
     return ch, nil
 }
@@ -155,6 +157,16 @@ func newTaskSubscription(ctx context.Context, ch chan Task) *taskSubscription {
     }
 }

+// A RelayTaskGenerator is responsible for generating tasks for a single relay.
+// Each task provides the time window for each query (since and until). The
+// generator also keeps track of how many queries are currently running against
+// the relay: we only allow 3 running queries at a time, one each for the
+// global, author and tag queries, across all subscription channels.
+//
+// RelayTaskGenerator maintains 3 TimeWindowTaskGenerators, one for each query
+// type. Each TimeWindowTaskGenerator maintains a list of TimeWindowTaskTrackers,
+// one for each time window. Each TimeWindowTaskTracker maintains a list of
+// runningRelayDownloaders, one for each concurrency setting, and covers a single TimeWindow.
 type RelayTaskGenerator struct {
     lock sync.Mutex

@@ -213,6 +225,7 @@ func (t *RelayTaskGenerator) SendOutTasks(publicKeys *PublicKeysToReplicate) (bo
     t.lock.Lock()
     defer t.lock.Unlock()

+    // Create profile tags for the public keys.
     var pTags []domain.FilterTag
     for _, publicKey := range publicKeys.Tagged() {
         tag, err := domain.NewFilterTag(domain.TagProfile, publicKey.Hex())
@@ -222,6 +235,7 @@ func (t *RelayTaskGenerator) SendOutTasks(publicKeys *PublicKeysToReplicate) (bo
         pTags = append(pTags, tag)
     }

+    // Remove subscriptions whose context is already done.
     slices.DeleteFunc(t.taskSubscriptions, func(subscription *taskSubscription) bool {
         select {
         case <-subscription.ctx.Done():
@@ -233,6 +247,7 @@

     sentTasksForAtLeastOneSubscription := false
     for _, taskSubscription := range t.taskSubscriptions {
+        // Send tasks to each subscription.
         numberOfSentTasks, err := t.pushTasks(taskSubscription.ctx, taskSubscription.ch, publicKeys.Authors(), pTags)
         if err != nil {
             return false, errors.Wrap(err, "error sending out generators")
@@ -245,6 +260,7 @@ func (t *RelayTaskGenerator) SendOutTasks(publicKeys *PublicKeysToReplicate) (bo
     return sentTasksForAtLeastOneSubscription, nil
 }

+// Push tasks to the task channel. If the previously generated tasks are not done yet, nothing is pushed.
 func (t *RelayTaskGenerator) pushTasks(ctx context.Context, ch chan<- Task, authors []domain.PublicKey, tags []domain.FilterTag) (int, error) {
     tasks, err := t.getTasksToPush(ctx, authors, tags)
     if err != nil {
@@ -317,12 +333,17 @@ func NewTimeWindowTaskGenerator(
     }, nil
 }

+// A task generator creates a task tracker per concurrency setting. The tracker
+// is used to return the corresponding task; if that task is still running
+// it returns no task. If the task is done it discards the current
+// tracker, creates a new one and returns a new task.
+// Each task generated is pushed to all subscribers of the scheduler.
 func (t *TimeWindowTaskGenerator) Generate(ctx context.Context, kinds []domain.EventKind, authors []domain.PublicKey, tags []domain.FilterTag) ([]Task, error) {
     t.lock.Lock()
     defer t.lock.Unlock()

-    t.taskTrackers = slices.DeleteFunc(t.taskTrackers, func(task *TimeWindowTaskTracker) bool {
-        return task.CheckIfDoneAndEnd()
+    t.taskTrackers = slices.DeleteFunc(t.taskTrackers, func(tracker *TimeWindowTaskTracker) bool {
+        return tracker.CheckIfDoneAndEnd()
     })

     for i := len(t.taskTrackers); i < timeWindowTaskConcurrency; i++ {
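// Illustrative sketch, not part of the patch: the idea behind the time-window
// comments above. Time is split into consecutive, fixed-size windows leading up
// to now, and each window becomes the since/until pair of one query. TimeWindow
// and windowSize here are hypothetical simplifications of the real types.
package main

import (
	"fmt"
	"time"
)

type TimeWindow struct {
	Start    time.Time
	Duration time.Duration
}

func (w TimeWindow) Since() time.Time { return w.Start }
func (w TimeWindow) Until() time.Time { return w.Start.Add(w.Duration) }

// Next returns the window that immediately follows this one.
func (w TimeWindow) Next() TimeWindow {
	return TimeWindow{Start: w.Until(), Duration: w.Duration}
}

func main() {
	const windowSize = time.Hour

	// Start a few windows in the past and walk forward until we reach now,
	// the way the generator produces sequential tasks leading up to now.
	window := TimeWindow{Start: time.Now().Add(-3 * windowSize).Truncate(windowSize), Duration: windowSize}
	for !window.Until().After(time.Now()) {
		fmt.Printf("query with since=%s until=%s\n",
			window.Since().Format(time.RFC3339), window.Until().Format(time.RFC3339))
		window = window.Next()
	}
}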
diff --git a/service/domain/downloader/scheduler_test.go b/service/domain/downloader/scheduler_test.go
index 35a2926..13455af 100644
--- a/service/domain/downloader/scheduler_test.go
+++ b/service/domain/downloader/scheduler_test.go
@@ -42,7 +42,8 @@ func TestTaskScheduler_SchedulerWaitsForTasksToCompleteBeforeProducingMore(t *te
     ch, err := ts.Scheduler.GetTasks(ctx, fixtures.SomeRelayAddress())
     require.NoError(t, err)

-    tasks := collectAllTasks(ctx, ch, false)
+    completeTasks := false
+    tasks := collectAllTasks(ctx, ch, completeTasks)

     require.EventuallyWithT(t, func(t *assert.CollectT) {
         require.Equal(t, numberOfTaskTypes, len(tasks.Tasks()))
@@ -67,9 +68,12 @@ func TestTaskScheduler_SchedulerDoesNotProduceEmptyTasks(t *testing.T) {
     ch, err := ts.Scheduler.GetTasks(ctx, fixtures.SomeRelayAddress())
     require.NoError(t, err)

-    tasks := collectAllTasks(ctx, ch, false)
+    completeTasks := false
+    tasks := collectAllTasks(ctx, ch, completeTasks)

     <-time.After(5 * time.Second)
+
+    // No public keys, so no tasks for authors or p-tags, but still one for event kinds.
     require.Len(t, tasks.Tasks(), 1)
 }

@@ -91,7 +95,8 @@ func TestTaskScheduler_SchedulerProducesTasksFromSequentialTimeWindowsLeadingUpT
     ch, err := ts.Scheduler.GetTasks(ctx, fixtures.SomeRelayAddress())
     require.NoError(t, err)

-    tasks := collectAllTasks(ctx, ch, true)
+    completeTasks := true
+    tasks := collectAllTasks(ctx, ch, completeTasks)

     require.EventuallyWithT(t, func(t *assert.CollectT) {
         filters := make(map[downloader.TimeWindow][]domain.Filter)
@@ -99,6 +104,7 @@ func TestTaskScheduler_SchedulerProducesTasksFromSequentialTimeWindowsLeadingUpT
             start := task.Filter().Since()
             duration := task.Filter().Until().Sub(*start)
             window := downloader.MustNewTimeWindow(*start, duration)
+            // There will be 3 tasks per window: one for the kind filter, one for the authors filter and one for the tags filter.
             filters[window] = append(filters[window], task.Filter())
         }

diff --git a/service/domain/event_kind.go b/service/domain/event_kind.go
index 3e05448..33cd9ae 100644
--- a/service/domain/event_kind.go
+++ b/service/domain/event_kind.go
@@ -12,7 +12,10 @@ var (
     EventKindEncryptedDirectMessage = MustNewEventKind(4)
     EventKindReaction               = MustNewEventKind(7)
     EventKindRelayListMetadata      = MustNewEventKind(10002)
-    EventKindRegistration           = MustNewEventKind(6666)
+    // TODO: This should be changed to 30078.
+    // See https://github.com/nostr-protocol/nips/blob/master/78.md; 6666 is
+    // reserved by NIP-90.
+    EventKindRegistration = MustNewEventKind(6666)
 )

 type EventKind struct {
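// Illustrative sketch, not part of the patch: the value-object pattern behind
// MustNewEventKind used in event_kind.go above, where kinds are validated once
// at construction and invalid constants fail loudly at startup. The validation
// rule shown here (non-negative) is a guess, not the service's actual rule.
package main

import (
	"errors"
	"fmt"
)

type EventKind struct {
	k int
}

func NewEventKind(k int) (EventKind, error) {
	if k < 0 {
		return EventKind{}, errors.New("event kind must not be negative")
	}
	return EventKind{k: k}, nil
}

// MustNewEventKind panics on invalid input; it is meant for package-level
// constants such as EventKindRegistration where a bad value should abort startup.
func MustNewEventKind(k int) EventKind {
	kind, err := NewEventKind(k)
	if err != nil {
		panic(err)
	}
	return kind
}

var (
	EventKindMetadata     = MustNewEventKind(0)
	EventKindRegistration = MustNewEventKind(6666) // the patch's TODO: move to 30078 (NIP-78)
)

func main() {
	fmt.Println(EventKindMetadata, EventKindRegistration)
}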