Skip to content

Commit

Permalink
Publish selected event kinds to relay.nos.social
Browse files Browse the repository at this point in the history
The idea is to make it work like https://purplepag.es/what.
  • Loading branch information
boreq committed Nov 24, 2023
1 parent 6b51f07 commit 735fb36
Show file tree
Hide file tree
Showing 9 changed files with 391 additions and 52 deletions.
4 changes: 2 additions & 2 deletions cmd/event-service/di/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions service/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/boreq/errors"
"github.com/planetary-social/nos-event-service/service/domain"
"github.com/planetary-social/nos-event-service/service/domain/relays"
)

var (
Expand Down Expand Up @@ -121,3 +122,8 @@ type ContactsExtractor interface {
type Subscriber interface {
EventSavedQueueLength(ctx context.Context) (int, error)
}

type RelayConnections interface {
GetEvents(ctx context.Context, relayAddress domain.RelayAddress, filter domain.Filter) (<-chan relays.EventOrEndOfSavedEvents, error)
SendEvent(ctx context.Context, relayAddress domain.RelayAddress, event domain.Event) error
}
5 changes: 0 additions & 5 deletions service/app/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/planetary-social/nos-event-service/internal"
"github.com/planetary-social/nos-event-service/internal/logging"
"github.com/planetary-social/nos-event-service/service/domain"
"github.com/planetary-social/nos-event-service/service/domain/relays"
)

const (
Expand Down Expand Up @@ -42,10 +41,6 @@ type RelaySource interface {
GetRelays(ctx context.Context) ([]domain.RelayAddress, error)
}

type RelayConnections interface {
GetEvents(ctx context.Context, relayAddress domain.RelayAddress, filter domain.Filter) (<-chan relays.EventOrEndOfSavedEvents, error)
}

type PublicKeySource interface {
GetPublicKeys(ctx context.Context) ([]domain.PublicKey, error)
}
Expand Down
54 changes: 54 additions & 0 deletions service/app/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package app

import (
"time"

"github.com/planetary-social/nos-event-service/service/domain"
)

type EventFilter struct {
filterNewerThan *time.Duration
filterOlderThan *time.Duration
filterLargerThan *int
filterWithMoreTagsThan *int
}

func NewEventFilter(
filterNewerThan *time.Duration,
filterOlderThan *time.Duration,
filterLargerThan *int,
filterWithMoreTagsThan *int,
) *EventFilter {
return &EventFilter{
filterNewerThan: filterNewerThan,
filterOlderThan: filterOlderThan,
filterLargerThan: filterLargerThan,
filterWithMoreTagsThan: filterWithMoreTagsThan,
}
}

func (f EventFilter) IsOk(event domain.Event) bool {
if f.filterOlderThan != nil {
maxPastAllowed := time.Now().Add(-*f.filterOlderThan)
if event.CreatedAt().Before(maxPastAllowed) {
return false
}
}

if f.filterNewerThan != nil {
maxFutureAllowed := time.Now().Add(*f.filterNewerThan)
if event.CreatedAt().After(maxFutureAllowed) {
return false
}
}

if f.filterLargerThan != nil && len(event.Raw()) > *f.filterLargerThan {
return false
}

if f.filterWithMoreTagsThan != nil && len(event.Tags()) > *f.filterWithMoreTagsThan {
return false
}

return true
}
38 changes: 38 additions & 0 deletions service/app/handler_process_saved_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,31 @@ package app

import (
"context"
"time"

"github.com/boreq/errors"
"github.com/planetary-social/nos-event-service/internal"
"github.com/planetary-social/nos-event-service/internal/logging"
"github.com/planetary-social/nos-event-service/service/domain"
)

const year = 365 * 24 * time.Hour

Check failure on line 13 in service/app/handler_process_saved_event.go

View workflow job for this annotation

GitHub Actions / Run CI (1.21)

const `year` is unused (unused)

var (
nosRelayAddress = domain.MustNewRelayAddress("wss://relay.nos.social")
eventKindsWhichShouldBeSentToRelay = internal.NewSetVariadic(
domain.EventKindMetadata,
domain.EventKindContacts,
domain.EventKindRelayListMetadata,
)
pushToRelayFilter = NewEventFilter(
internal.Pointer(900*time.Second),
nil,
internal.Pointer(65536),
internal.Pointer(1024),
)
)

type ProcessSavedEvent struct {
id domain.EventId
}
Expand All @@ -21,6 +40,7 @@ type ProcessSavedEventHandler struct {
relaysExtractor RelaysExtractor
contactsExtractor ContactsExtractor
externalEventPublisher ExternalEventPublisher
relayConnections RelayConnections
logger logging.Logger
metrics Metrics
}
Expand All @@ -30,6 +50,7 @@ func NewProcessSavedEventHandler(
relaysExtractor RelaysExtractor,
contactsExtractor ContactsExtractor,
externalEventPublisher ExternalEventPublisher,
relayConnections RelayConnections,
logger logging.Logger,
metrics Metrics,
) *ProcessSavedEventHandler {
Expand All @@ -38,6 +59,7 @@ func NewProcessSavedEventHandler(
relaysExtractor: relaysExtractor,
contactsExtractor: contactsExtractor,
externalEventPublisher: externalEventPublisher,
relayConnections: relayConnections,
logger: logger.New("processSavedEventHandler"),
metrics: metrics,
}
Expand All @@ -59,6 +81,10 @@ func (h *ProcessSavedEventHandler) Handle(ctx context.Context, cmd ProcessSavedE
return errors.Wrap(err, "error publishing the external event")
}

if err := h.maybeSendEventToRelay(ctx, event); err != nil {
return errors.Wrap(err, "error sending the event to relay")
}

return nil
}

Expand Down Expand Up @@ -138,3 +164,15 @@ func (h *ProcessSavedEventHandler) shouldReplaceContacts(ctx context.Context, ad

return domain.ShouldReplaceContactsEvent(oldEvent, newEvent)
}

func (h *ProcessSavedEventHandler) maybeSendEventToRelay(ctx context.Context, event domain.Event) error {
if !eventKindsWhichShouldBeSentToRelay.Contains(event.Kind()) {
return nil
}

if !pushToRelayFilter.IsOk(event) {
return nil
}

return h.relayConnections.SendEvent(ctx, nosRelayAddress, event)
}
15 changes: 15 additions & 0 deletions service/app/handler_save_received_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,23 @@ package app

import (
"context"
"time"

"github.com/boreq/errors"
"github.com/planetary-social/nos-event-service/internal"
"github.com/planetary-social/nos-event-service/internal/logging"
"github.com/planetary-social/nos-event-service/service/domain"
)

var (
saveFilter = NewEventFilter(
internal.Pointer(12*time.Hour),
nil,
internal.Pointer(1*1000*1000),
internal.Pointer(10000),
)
)

type SaveReceivedEvent struct {
relay domain.RelayAddress
event domain.Event
Expand Down Expand Up @@ -46,6 +57,10 @@ func (h *SaveReceivedEventHandler) Handle(ctx context.Context, cmd SaveReceivedE
WithField("event.kind", cmd.event.Kind().Int()).
Message("saving received event")

if !saveFilter.IsOk(cmd.event) {
return nil
}

if err := h.transactionProvider.Transact(ctx, func(ctx context.Context, adapters Adapters) error {
exists, err := h.eventAlreadyExists(ctx, adapters, cmd.event)
if err != nil {
Expand Down
Loading

0 comments on commit 735fb36

Please sign in to comment.