Skip to content

Commit

Permalink
Publish selected event kinds to relay.nos.social (#30)
Browse files Browse the repository at this point in the history
The idea is to make it work like https://purplepag.es/what.
  • Loading branch information
boreq authored Nov 24, 2023
1 parent 6b51f07 commit a973cd2
Show file tree
Hide file tree
Showing 10 changed files with 437 additions and 52 deletions.
4 changes: 2 additions & 2 deletions cmd/event-service/di/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions service/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/boreq/errors"
"github.com/planetary-social/nos-event-service/service/domain"
"github.com/planetary-social/nos-event-service/service/domain/relays"
)

var (
Expand Down Expand Up @@ -121,3 +122,8 @@ type ContactsExtractor interface {
type Subscriber interface {
EventSavedQueueLength(ctx context.Context) (int, error)
}

type RelayConnections interface {
GetEvents(ctx context.Context, relayAddress domain.RelayAddress, filter domain.Filter) (<-chan relays.EventOrEndOfSavedEvents, error)
SendEvent(ctx context.Context, relayAddress domain.RelayAddress, event domain.Event) error
}
5 changes: 0 additions & 5 deletions service/app/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/planetary-social/nos-event-service/internal"
"github.com/planetary-social/nos-event-service/internal/logging"
"github.com/planetary-social/nos-event-service/service/domain"
"github.com/planetary-social/nos-event-service/service/domain/relays"
)

const (
Expand Down Expand Up @@ -42,10 +41,6 @@ type RelaySource interface {
GetRelays(ctx context.Context) ([]domain.RelayAddress, error)
}

type RelayConnections interface {
GetEvents(ctx context.Context, relayAddress domain.RelayAddress, filter domain.Filter) (<-chan relays.EventOrEndOfSavedEvents, error)
}

type PublicKeySource interface {
GetPublicKeys(ctx context.Context) ([]domain.PublicKey, error)
}
Expand Down
54 changes: 54 additions & 0 deletions service/app/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package app

import (
"time"

"github.com/planetary-social/nos-event-service/service/domain"
)

type EventFilter struct {
filterNewerThan *time.Duration
filterOlderThan *time.Duration
filterLargerThan *int
filterWithMoreTagsThan *int
}

func NewEventFilter(
filterNewerThan *time.Duration,
filterOlderThan *time.Duration,
filterLargerThan *int,
filterWithMoreTagsThan *int,
) *EventFilter {
return &EventFilter{
filterNewerThan: filterNewerThan,
filterOlderThan: filterOlderThan,
filterLargerThan: filterLargerThan,
filterWithMoreTagsThan: filterWithMoreTagsThan,
}
}

func (f EventFilter) IsOk(event domain.Event) bool {
if f.filterOlderThan != nil {
maxPastAllowed := time.Now().Add(-*f.filterOlderThan)
if event.CreatedAt().Before(maxPastAllowed) {
return false
}
}

if f.filterNewerThan != nil {
maxFutureAllowed := time.Now().Add(*f.filterNewerThan)
if event.CreatedAt().After(maxFutureAllowed) {
return false
}
}

if f.filterLargerThan != nil && len(event.Raw()) > *f.filterLargerThan {
return false
}

if f.filterWithMoreTagsThan != nil && len(event.Tags()) > *f.filterWithMoreTagsThan {
return false
}

return true
}
58 changes: 58 additions & 0 deletions service/app/handler_process_saved_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,28 @@ package app

import (
"context"
"time"

"github.com/boreq/errors"
"github.com/planetary-social/nos-event-service/internal"
"github.com/planetary-social/nos-event-service/internal/logging"
"github.com/planetary-social/nos-event-service/service/domain"
"github.com/planetary-social/nos-event-service/service/domain/relays"
)

var (
nosRelayAddress = domain.MustNewRelayAddress("wss://relay.nos.social")
eventKindsWhichShouldBeSentToRelay = internal.NewSetVariadic(
domain.EventKindMetadata,
domain.EventKindContacts,
domain.EventKindRelayListMetadata,
)
pushToRelayFilter = NewEventFilter(
internal.Pointer(900*time.Second),
nil,
internal.Pointer(65536),
internal.Pointer(1024),
)
)

type ProcessSavedEvent struct {
Expand All @@ -21,6 +39,7 @@ type ProcessSavedEventHandler struct {
relaysExtractor RelaysExtractor
contactsExtractor ContactsExtractor
externalEventPublisher ExternalEventPublisher
relayConnections RelayConnections
logger logging.Logger
metrics Metrics
}
Expand All @@ -30,6 +49,7 @@ func NewProcessSavedEventHandler(
relaysExtractor RelaysExtractor,
contactsExtractor ContactsExtractor,
externalEventPublisher ExternalEventPublisher,
relayConnections RelayConnections,
logger logging.Logger,
metrics Metrics,
) *ProcessSavedEventHandler {
Expand All @@ -38,6 +58,7 @@ func NewProcessSavedEventHandler(
relaysExtractor: relaysExtractor,
contactsExtractor: contactsExtractor,
externalEventPublisher: externalEventPublisher,
relayConnections: relayConnections,
logger: logger.New("processSavedEventHandler"),
metrics: metrics,
}
Expand All @@ -59,6 +80,10 @@ func (h *ProcessSavedEventHandler) Handle(ctx context.Context, cmd ProcessSavedE
return errors.Wrap(err, "error publishing the external event")
}

if err := h.maybeSendEventToRelay(ctx, event); err != nil {
return errors.Wrapf(err, "error sending the event '%s' to relay", event.Id().Hex())
}

return nil
}

Expand Down Expand Up @@ -138,3 +163,36 @@ func (h *ProcessSavedEventHandler) shouldReplaceContacts(ctx context.Context, ad

return domain.ShouldReplaceContactsEvent(oldEvent, newEvent)
}

func (h *ProcessSavedEventHandler) maybeSendEventToRelay(ctx context.Context, event domain.Event) error {
if !eventKindsWhichShouldBeSentToRelay.Contains(event.Kind()) {
return nil
}

if !pushToRelayFilter.IsOk(event) {
return nil
}

if err := h.relayConnections.SendEvent(ctx, nosRelayAddress, event); err != nil {
if h.shouldDisregardSendEventErr(err) {
return nil
}
return errors.Wrap(err, "error sending event to relay")
}

return nil
}

func (h *ProcessSavedEventHandler) shouldDisregardSendEventErr(err error) bool {
var okResponseErr relays.OKResponseError
if errors.As(err, &okResponseErr) {
switch okResponseErr.Reason() {
case "replaced: have newer event":
return true
default:
return false
}
}

return false
}
15 changes: 15 additions & 0 deletions service/app/handler_save_received_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,23 @@ package app

import (
"context"
"time"

"github.com/boreq/errors"
"github.com/planetary-social/nos-event-service/internal"
"github.com/planetary-social/nos-event-service/internal/logging"
"github.com/planetary-social/nos-event-service/service/domain"
)

var (
saveFilter = NewEventFilter(
internal.Pointer(12*time.Hour),
nil,
internal.Pointer(1*1000*1000),
internal.Pointer(10000),
)
)

type SaveReceivedEvent struct {
relay domain.RelayAddress
event domain.Event
Expand Down Expand Up @@ -46,6 +57,10 @@ func (h *SaveReceivedEventHandler) Handle(ctx context.Context, cmd SaveReceivedE
WithField("event.kind", cmd.event.Kind().Int()).
Message("saving received event")

if !saveFilter.IsOk(cmd.event) {
return nil
}

if err := h.transactionProvider.Transact(ctx, func(ctx context.Context, adapters Adapters) error {
exists, err := h.eventAlreadyExists(ctx, adapters, cmd.event)
if err != nil {
Expand Down
Loading

0 comments on commit a973cd2

Please sign in to comment.