Merge pull request #73 from planetary-social/more-comments
Add more comments
dcadenas authored Jan 19, 2024
2 parents 523185e + 416330d commit dc8a851
Showing 6 changed files with 62 additions and 6 deletions.
15 changes: 15 additions & 0 deletions cmd/event-service/di/service.go
@@ -77,35 +77,50 @@ func (s Service) Run(ctx context.Context) error {
runners := 0

runners++
// Serve HTTP.
go func() {
errCh <- errors.Wrap(s.server.ListenAndServe(ctx), "server error")
}()

// Fetch events from the relays stored in the database and send them to the in-memory pubsub.
runners++
go func() {
errCh <- errors.Wrap(s.downloader.Run(ctx), "downloader error")
}()

// Subscribe to the in-memory pubsub of events and emit a NewSaveReceivedEvent
// command that performs some checks on the event, saves it if the checks
// pass, and emits an EventSavedEvent to the sqlite pubsub.
runners++
go func() {
errCh <- errors.Wrap(s.receivedEventSubscriber.Run(ctx), "received event subscriber error")
}()

// Subscribe to saved events in the database. This uses the sqlite pubsub. This triggers:
// - analysis to extract new relays and store them in the db. They will be used by the downloader.
// - analysis to extract pubkeys and store them in the db (contacts_followees, pubkeys, contacts_events). These will be used by the downloader.
// - publishing to the watermill pubsub
// - possibly publishing the event to wss://relay.nos.social if it is metadata related
runners++
go func() {
errCh <- errors.Wrap(s.eventSavedEventSubscriber.Run(ctx), "event saved subscriber error")
}()

// The metrics timer collects metrics from the app.
runners++
go func() {
errCh <- errors.Wrap(s.metricsTimer.Run(ctx), "metrics timer error")
}()

// Sqlite transaction runner
runners++
go func() {
errCh <- errors.Wrap(s.transactionRunner.Run(ctx), "transaction runner error")
}()

// The task scheduler creates sequential time-window-based tasks that contain
// filters to be applied to each relay to fetch the events we want. Event
// downloaders subscribe to this.
runners++
go func() {
errCh <- errors.Wrap(s.taskScheduler.Run(ctx), "task scheduler error")
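For context, a minimal, self-contained sketch (not part of this commit) of the runner pattern used in Service.Run above: each component runs in its own goroutine, reports a wrapped error on a shared channel, and the caller returns on the first failure. All names here are illustrative and do not come from the repository.

package main

import (
	"context"
	"errors"
	"fmt"
)

// runAll mirrors the runners++ / errCh pattern: start every runner in its own
// goroutine and wait for each one to report, returning the first error seen.
func runAll(ctx context.Context, runners ...func(context.Context) error) error {
	errCh := make(chan error, len(runners))
	for _, run := range runners {
		run := run
		go func() {
			errCh <- run(ctx)
		}()
	}
	for range runners {
		if err := <-errCh; err != nil {
			return err
		}
	}
	return nil
}

func main() {
	err := runAll(context.Background(),
		func(ctx context.Context) error { return nil },                            // e.g. the HTTP server
		func(ctx context.Context) error { return errors.New("downloader error") }, // e.g. the downloader
	)
	fmt.Println(err) // prints "downloader error"
}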
2 changes: 2 additions & 0 deletions service/app/handler_process_saved_event.go
@@ -81,6 +81,7 @@ func (h *ProcessSavedEventHandler) Handle(ctx context.Context, cmd ProcessSavedE
return errors.Wrap(err, "error saving relays and contacts")
}

// Publish the event to the nostr-events google pubsub.
if err := h.externalEventPublisher.PublishNewEventReceived(ctx, event); err != nil {
return errors.Wrap(err, "error publishing the external event")
}
@@ -200,6 +201,7 @@ func ShouldSendEventToRelay(event Event) bool {
return false
}

// Check if the event is too old.
if !pushToRelayFilter.IsOk(event) {
return false
}
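A rough sketch of the kind of age check hinted at by the "too old" comment above. The real pushToRelayFilter lives elsewhere in the repository; the type, method and 24h cutoff below are assumptions made purely for illustration.

package main

import (
	"fmt"
	"time"
)

// ageFilter rejects events whose creation time is older than maxAge.
// The 24h cutoff is illustrative, not the value used by the service.
type ageFilter struct {
	maxAge time.Duration
}

func (f ageFilter) IsOk(createdAt time.Time) bool {
	return time.Since(createdAt) <= f.maxAge
}

func main() {
	filter := ageFilter{maxAge: 24 * time.Hour}
	fmt.Println(filter.IsOk(time.Now().Add(-time.Hour)))      // true: recent event
	fmt.Println(filter.IsOk(time.Now().Add(-48 * time.Hour))) // false: too old
}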
9 changes: 9 additions & 0 deletions service/domain/downloader/downloader.go
@@ -129,6 +129,10 @@ func NewDownloader(
}
}

// Run fetches pubkeys from the database and a hardcoded list of kinds for each
// relay found in our database. These are used to create tasks that specify nostr
// filters and contain a time window controlling the since and until filter keys.
// These filters are used to start queries against each relay. Events found this
// way will be published to all subscribers of the downloader publisher.
func (d *Downloader) Run(ctx context.Context) error {
go d.storeMetricsLoop(ctx)

@@ -167,6 +171,7 @@ func (d *Downloader) storeMetrics() {
d.metrics.ReportNumberOfRelayDownloaders(len(d.relayDownloaders))
}

// For each relay returned by getRelays(), start a downloader and stop those that are no longer part of the list.
func (d *Downloader) updateDownloaders(ctx context.Context) error {
relays, err := d.getRelays(ctx)
if err != nil {
@@ -216,6 +221,7 @@ func (d *Downloader) updateDownloaders(ctx context.Context) error {
return nil
}

// Get the bootstrap relays and those already in the database.
func (d *Downloader) getRelays(ctx context.Context) (*internal.Set[domain.RelayAddress], error) {
result := internal.NewEmptySet[domain.RelayAddress]()

@@ -268,6 +274,7 @@ func NewRelayDownloader(
return v
}

// Start fetches tasks for the current relay, uses them to query it, and then publishes the events found to a pubsub.
func (d *RelayDownloader) Start(ctx context.Context) error {
ch, err := d.scheduler.GetTasks(ctx, d.address)
if err != nil {
@@ -276,6 +283,7 @@ func (d *RelayDownloader) Start(ctx context.Context) error {

go func() {
for task := range ch {
// Uses the filter of the task to fetch events from the relay for the task time window.
go d.performTask(task)
}
}()
@@ -290,6 +298,7 @@ func (d *RelayDownloader) performTask(task Task) {
}
}

// Run the filter specified by the task and for each event found publish it to all subscribers.
func (d *RelayDownloader) performTaskWithErr(task Task) error {
ch, err := d.relayConnections.GetEvents(task.Ctx(), d.address, task.Filter())
if err != nil {
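To make the task/filter flow described in the downloader comments concrete, here is a simplified, self-contained sketch of a relay downloader consuming tasks from a channel and publishing every event it finds. The types and names are invented for illustration and do not match the repository's real API.

package main

import (
	"context"
	"fmt"
	"time"
)

// task carries a filter restricted to a time window (since/until).
type task struct {
	since, until time.Time
	kinds        []int
}

// fetchEvents stands in for querying a relay with the task's filter.
func fetchEvents(ctx context.Context, t task) []string {
	return []string{fmt.Sprintf("event kinds=%v window=[%s, %s]", t.kinds,
		t.since.Format(time.RFC3339), t.until.Format(time.RFC3339))}
}

// runDownloader consumes tasks and publishes every event found via publish.
func runDownloader(ctx context.Context, tasks <-chan task, publish func(string)) {
	for t := range tasks {
		for _, ev := range fetchEvents(ctx, t) {
			publish(ev)
		}
	}
}

func main() {
	tasks := make(chan task, 1)
	now := time.Now()
	tasks <- task{since: now.Add(-time.Hour), until: now, kinds: []int{0, 1, 3}}
	close(tasks)

	runDownloader(context.Background(), tasks, func(ev string) { fmt.Println("published:", ev) })
}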
25 changes: 23 additions & 2 deletions service/domain/downloader/scheduler.go
@@ -36,6 +36,7 @@ type Scheduler interface {
GetTasks(ctx context.Context, relay domain.RelayAddress) (<-chan Task, error)
}

// A TaskScheduler is responsible for generating tasks for all relays by maintaining a list of TaskGenerators.
type TaskScheduler struct {
taskGeneratorsLock sync.Mutex
taskGenerators map[domain.RelayAddress]*RelayTaskGenerator
@@ -65,6 +66,7 @@ func (t *TaskScheduler) GetTasks(ctx context.Context, relay domain.RelayAddress)
}

ch := make(chan Task)
// The subscription for the relay will generate a sequence of tasks mapped to time windows
generator.AddSubscription(ctx, ch)
return ch, nil
}
@@ -155,6 +157,16 @@ func newTaskSubscription(ctx context.Context, ch chan Task) *taskSubscription {
}
}

// A RelayTaskGenerator is responsible for generating tasks for a single relay.
// Each task provides the time window for a query (since and until). The
// generator keeps track of how many queries can run against the relay at once:
// we only allow 3 running queries at a time, one each for global, author and
// tag queries, across all subscription channels.
//
// RelayTaskGenerator maintains 3 TimeWindowTaskGenerators, one for each query
// type. Each TimeWindowTaskGenerator maintains a list of TimeWindowTaskTrackers,
// one for each time window. Each TimeWindowTaskTracker maintains a list of
// runningRelayDownloaders, one for each concurrency setting, and uses a TimeWindow.
type RelayTaskGenerator struct {
lock sync.Mutex

@@ -213,6 +225,7 @@ func (t *RelayTaskGenerator) SendOutTasks(publicKeys *PublicKeysToReplicate) (bo
t.lock.Lock()
defer t.lock.Unlock()

// Create tags for the public keys.
var pTags []domain.FilterTag
for _, publicKey := range publicKeys.Tagged() {
tag, err := domain.NewFilterTag(domain.TagProfile, publicKey.Hex())
@@ -222,6 +235,7 @@
pTags = append(pTags, tag)
}

// Delete each done subscription.
slices.DeleteFunc(t.taskSubscriptions, func(subscription *taskSubscription) bool {
select {
case <-subscription.ctx.Done():
@@ -233,6 +247,7 @@ func (t *RelayTaskGenerator) SendOutTasks(publicKeys *PublicKeysToReplicate) (bo

sentTasksForAtLeastOneSubscription := false
for _, taskSubscription := range t.taskSubscriptions {
// Send a task for each subscription
numberOfSentTasks, err := t.pushTasks(taskSubscription.ctx, taskSubscription.ch, publicKeys.Authors(), pTags)
if err != nil {
return false, errors.Wrap(err, "error sending out generators")
@@ -245,6 +260,7 @@ func (t *RelayTaskGenerator) SendOutTasks(publicKeys *PublicKeysToReplicate) (bo
return sentTasksForAtLeastOneSubscription, nil
}

// Pushes tasks to the task channel. If the current tasks are not done yet, nothing is pushed.
func (t *RelayTaskGenerator) pushTasks(ctx context.Context, ch chan<- Task, authors []domain.PublicKey, tags []domain.FilterTag) (int, error) {
tasks, err := t.getTasksToPush(ctx, authors, tags)
if err != nil {
@@ -317,12 +333,17 @@ func NewTimeWindowTaskGenerator(
}, nil
}

// A task generator creates a task tracker per concurrency setting. The tracker
// is used to return the corresponding task; if the task is still running it
// returns no task. If the task is done, it discards the current tracker,
// creates a new one and returns a new task. Each task generated will be pushed
// to all subscribers of the scheduler.
func (t *TimeWindowTaskGenerator) Generate(ctx context.Context, kinds []domain.EventKind, authors []domain.PublicKey, tags []domain.FilterTag) ([]Task, error) {
t.lock.Lock()
defer t.lock.Unlock()

t.taskTrackers = slices.DeleteFunc(t.taskTrackers, func(task *TimeWindowTaskTracker) bool {
return task.CheckIfDoneAndEnd()
t.taskTrackers = slices.DeleteFunc(t.taskTrackers, func(tracker *TimeWindowTaskTracker) bool {
return tracker.CheckIfDoneAndEnd()
})

for i := len(t.taskTrackers); i < timeWindowTaskConcurrency; i++ {
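The scheduler comments above describe sequential time windows and three query types (kinds, authors, tags) per window. The following is a minimal sketch of how such windows could be generated; the type, the one-hour window length and all names are assumptions for illustration only, not the repository's implementation.

package main

import (
	"fmt"
	"time"
)

// timeWindow is a [since, until) range used to bound a relay query.
type timeWindow struct {
	since time.Time
	until time.Time
}

// next returns the window immediately following this one, so consecutive
// tasks cover sequential, non-overlapping ranges leading up to now.
func (w timeWindow) next(length time.Duration) timeWindow {
	return timeWindow{since: w.until, until: w.until.Add(length)}
}

func main() {
	const windowLength = time.Hour // illustrative; the real value lives in the repository
	queryTypes := []string{"kinds", "authors", "tags"}

	w := timeWindow{since: time.Now().Add(-3 * time.Hour), until: time.Now().Add(-2 * time.Hour)}
	for i := 0; i < 3; i++ {
		for _, q := range queryTypes {
			// One task per query type per window, matching the concurrency of 3
			// described in the RelayTaskGenerator comment.
			fmt.Printf("task: %s query, %s -> %s\n", q,
				w.since.Format(time.Kitchen), w.until.Format(time.Kitchen))
		}
		w = w.next(windowLength)
	}
}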
12 changes: 9 additions & 3 deletions service/domain/downloader/scheduler_test.go
@@ -42,7 +42,8 @@ func TestTaskScheduler_SchedulerWaitsForTasksToCompleteBeforeProducingMore(t *te
ch, err := ts.Scheduler.GetTasks(ctx, fixtures.SomeRelayAddress())
require.NoError(t, err)

tasks := collectAllTasks(ctx, ch, false)
completeTasks := false
tasks := collectAllTasks(ctx, ch, completeTasks)

require.EventuallyWithT(t, func(t *assert.CollectT) {
require.Equal(t, numberOfTaskTypes, len(tasks.Tasks()))
Expand All @@ -67,9 +68,12 @@ func TestTaskScheduler_SchedulerDoesNotProduceEmptyTasks(t *testing.T) {
ch, err := ts.Scheduler.GetTasks(ctx, fixtures.SomeRelayAddress())
require.NoError(t, err)

tasks := collectAllTasks(ctx, ch, false)
completeTasks := false
tasks := collectAllTasks(ctx, ch, completeTasks)

<-time.After(5 * time.Second)

// No public keys, so there are no tasks for authors or ptags, but there is still one for event kinds.
require.Len(t, tasks.Tasks(), 1)
}

@@ -91,14 +95,16 @@ func TestTaskScheduler_SchedulerProducesTasksFromSequentialTimeWindowsLeadingUpT
ch, err := ts.Scheduler.GetTasks(ctx, fixtures.SomeRelayAddress())
require.NoError(t, err)

tasks := collectAllTasks(ctx, ch, true)
completeTasks := true
tasks := collectAllTasks(ctx, ch, completeTasks)

require.EventuallyWithT(t, func(t *assert.CollectT) {
filters := make(map[downloader.TimeWindow][]domain.Filter)
for _, task := range tasks.Tasks() {
start := task.Filter().Since()
duration := task.Filter().Until().Sub(*start)
window := downloader.MustNewTimeWindow(*start, duration)
// There will be 3 tasks per window: one for the kind filter, one for the authors filter and one for the tags filter.
filters[window] = append(filters[window], task.Filter())
}

5 changes: 4 additions & 1 deletion service/domain/event_kind.go
@@ -12,7 +12,10 @@ var (
EventKindEncryptedDirectMessage = MustNewEventKind(4)
EventKindReaction = MustNewEventKind(7)
EventKindRelayListMetadata = MustNewEventKind(10002)
EventKindRegistration = MustNewEventKind(6666)
// TODO: This should be changed to 30078.
// See https://github.com/nostr-protocol/nips/blob/master/78.md; 6666 is
// reserved by NIP-90.
EventKindRegistration = MustNewEventKind(6666)
)

type EventKind struct {
