Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Publish selected event kinds to relay.nos.social #30

Merged
merged 1 commit into from
Nov 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/event-service/di/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions service/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/boreq/errors"
"github.com/planetary-social/nos-event-service/service/domain"
"github.com/planetary-social/nos-event-service/service/domain/relays"
)

var (
Expand Down Expand Up @@ -121,3 +122,8 @@ type ContactsExtractor interface {
type Subscriber interface {
EventSavedQueueLength(ctx context.Context) (int, error)
}

type RelayConnections interface {
GetEvents(ctx context.Context, relayAddress domain.RelayAddress, filter domain.Filter) (<-chan relays.EventOrEndOfSavedEvents, error)
SendEvent(ctx context.Context, relayAddress domain.RelayAddress, event domain.Event) error
}
5 changes: 0 additions & 5 deletions service/app/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/planetary-social/nos-event-service/internal"
"github.com/planetary-social/nos-event-service/internal/logging"
"github.com/planetary-social/nos-event-service/service/domain"
"github.com/planetary-social/nos-event-service/service/domain/relays"
)

const (
Expand Down Expand Up @@ -42,10 +41,6 @@ type RelaySource interface {
GetRelays(ctx context.Context) ([]domain.RelayAddress, error)
}

type RelayConnections interface {
GetEvents(ctx context.Context, relayAddress domain.RelayAddress, filter domain.Filter) (<-chan relays.EventOrEndOfSavedEvents, error)
}

type PublicKeySource interface {
GetPublicKeys(ctx context.Context) ([]domain.PublicKey, error)
}
Expand Down
54 changes: 54 additions & 0 deletions service/app/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package app

import (
"time"

"github.com/planetary-social/nos-event-service/service/domain"
)

type EventFilter struct {
filterNewerThan *time.Duration
filterOlderThan *time.Duration
filterLargerThan *int
filterWithMoreTagsThan *int
}

func NewEventFilter(
filterNewerThan *time.Duration,
filterOlderThan *time.Duration,
filterLargerThan *int,
filterWithMoreTagsThan *int,
) *EventFilter {
return &EventFilter{
filterNewerThan: filterNewerThan,
filterOlderThan: filterOlderThan,
filterLargerThan: filterLargerThan,
filterWithMoreTagsThan: filterWithMoreTagsThan,
}
}

func (f EventFilter) IsOk(event domain.Event) bool {
if f.filterOlderThan != nil {
maxPastAllowed := time.Now().Add(-*f.filterOlderThan)
if event.CreatedAt().Before(maxPastAllowed) {
return false
}
}

if f.filterNewerThan != nil {
maxFutureAllowed := time.Now().Add(*f.filterNewerThan)
if event.CreatedAt().After(maxFutureAllowed) {
return false
}
}

if f.filterLargerThan != nil && len(event.Raw()) > *f.filterLargerThan {
return false
}

if f.filterWithMoreTagsThan != nil && len(event.Tags()) > *f.filterWithMoreTagsThan {
return false
}

return true
}
58 changes: 58 additions & 0 deletions service/app/handler_process_saved_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,28 @@ package app

import (
"context"
"time"

"github.com/boreq/errors"
"github.com/planetary-social/nos-event-service/internal"
"github.com/planetary-social/nos-event-service/internal/logging"
"github.com/planetary-social/nos-event-service/service/domain"
"github.com/planetary-social/nos-event-service/service/domain/relays"
)

var (
nosRelayAddress = domain.MustNewRelayAddress("wss://relay.nos.social")
eventKindsWhichShouldBeSentToRelay = internal.NewSetVariadic(
domain.EventKindMetadata,
domain.EventKindContacts,
domain.EventKindRelayListMetadata,
)
pushToRelayFilter = NewEventFilter(
internal.Pointer(900*time.Second),
nil,
internal.Pointer(65536),
internal.Pointer(1024),
)
)

type ProcessSavedEvent struct {
Expand All @@ -21,6 +39,7 @@ type ProcessSavedEventHandler struct {
relaysExtractor RelaysExtractor
contactsExtractor ContactsExtractor
externalEventPublisher ExternalEventPublisher
relayConnections RelayConnections
logger logging.Logger
metrics Metrics
}
Expand All @@ -30,6 +49,7 @@ func NewProcessSavedEventHandler(
relaysExtractor RelaysExtractor,
contactsExtractor ContactsExtractor,
externalEventPublisher ExternalEventPublisher,
relayConnections RelayConnections,
logger logging.Logger,
metrics Metrics,
) *ProcessSavedEventHandler {
Expand All @@ -38,6 +58,7 @@ func NewProcessSavedEventHandler(
relaysExtractor: relaysExtractor,
contactsExtractor: contactsExtractor,
externalEventPublisher: externalEventPublisher,
relayConnections: relayConnections,
logger: logger.New("processSavedEventHandler"),
metrics: metrics,
}
Expand All @@ -59,6 +80,10 @@ func (h *ProcessSavedEventHandler) Handle(ctx context.Context, cmd ProcessSavedE
return errors.Wrap(err, "error publishing the external event")
}

if err := h.maybeSendEventToRelay(ctx, event); err != nil {
return errors.Wrapf(err, "error sending the event '%s' to relay", event.Id().Hex())
}

return nil
}

Expand Down Expand Up @@ -138,3 +163,36 @@ func (h *ProcessSavedEventHandler) shouldReplaceContacts(ctx context.Context, ad

return domain.ShouldReplaceContactsEvent(oldEvent, newEvent)
}

func (h *ProcessSavedEventHandler) maybeSendEventToRelay(ctx context.Context, event domain.Event) error {
if !eventKindsWhichShouldBeSentToRelay.Contains(event.Kind()) {
return nil
}

if !pushToRelayFilter.IsOk(event) {
return nil
}

if err := h.relayConnections.SendEvent(ctx, nosRelayAddress, event); err != nil {
if h.shouldDisregardSendEventErr(err) {
return nil
}
return errors.Wrap(err, "error sending event to relay")
}

return nil
}

func (h *ProcessSavedEventHandler) shouldDisregardSendEventErr(err error) bool {
var okResponseErr relays.OKResponseError
if errors.As(err, &okResponseErr) {
switch okResponseErr.Reason() {
case "replaced: have newer event":
return true
default:
return false
}
}

return false
}
15 changes: 15 additions & 0 deletions service/app/handler_save_received_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,23 @@ package app

import (
"context"
"time"

"github.com/boreq/errors"
"github.com/planetary-social/nos-event-service/internal"
"github.com/planetary-social/nos-event-service/internal/logging"
"github.com/planetary-social/nos-event-service/service/domain"
)

var (
saveFilter = NewEventFilter(
internal.Pointer(12*time.Hour),
nil,
internal.Pointer(1*1000*1000),
internal.Pointer(10000),
)
)

type SaveReceivedEvent struct {
relay domain.RelayAddress
event domain.Event
Expand Down Expand Up @@ -46,6 +57,10 @@ func (h *SaveReceivedEventHandler) Handle(ctx context.Context, cmd SaveReceivedE
WithField("event.kind", cmd.event.Kind().Int()).
Message("saving received event")

if !saveFilter.IsOk(cmd.event) {
return nil
}

if err := h.transactionProvider.Transact(ctx, func(ctx context.Context, adapters Adapters) error {
exists, err := h.eventAlreadyExists(ctx, adapters, cmd.event)
if err != nil {
Expand Down
Loading