diff --git a/cmd/event-service/di/wire_gen.go b/cmd/event-service/di/wire_gen.go index 9ae3f00..c5be0ba 100644 --- a/cmd/event-service/di/wire_gen.go +++ b/cmd/event-service/di/wire_gen.go @@ -62,7 +62,8 @@ func BuildService(contextContext context.Context, configConfig config.Config) (S cleanup() return Service{}, nil, err } - processSavedEventHandler := app.NewProcessSavedEventHandler(genericTransactionProvider, relaysExtractor, contactsExtractor, externalEventPublisher, logger, prometheusPrometheus) + relayConnections := relays.NewRelayConnections(contextContext, logger, prometheusPrometheus) + processSavedEventHandler := app.NewProcessSavedEventHandler(genericTransactionProvider, relaysExtractor, contactsExtractor, externalEventPublisher, relayConnections, logger, prometheusPrometheus) sqliteGenericTransactionProvider := sqlite.NewPubSubTxTransactionProvider(db, databaseMutex) pubSub := sqlite.NewPubSub(sqliteGenericTransactionProvider, logger) subscriber := sqlite.NewSubscriber(pubSub, db) @@ -78,7 +79,6 @@ func BuildService(contextContext context.Context, configConfig config.Config) (S bootstrapRelaySource := relays.NewBootstrapRelaySource() databaseRelaySource := app.NewDatabaseRelaySource(genericTransactionProvider, logger) databasePublicKeySource := app.NewDatabasePublicKeySource(genericTransactionProvider, logger) - relayConnections := relays.NewRelayConnections(contextContext, logger, prometheusPrometheus) receivedEventPubSub := memorypubsub.NewReceivedEventPubSub() relayDownloaderFactory := app.NewRelayDownloaderFactory(databasePublicKeySource, relayConnections, receivedEventPubSub, logger, prometheusPrometheus) downloader := app.NewDownloader(bootstrapRelaySource, databaseRelaySource, logger, prometheusPrometheus, relayDownloaderFactory) diff --git a/service/app/app.go b/service/app/app.go index de8269d..4014b8c 100644 --- a/service/app/app.go +++ b/service/app/app.go @@ -5,6 +5,7 @@ import ( "github.com/boreq/errors" "github.com/planetary-social/nos-event-service/service/domain" + "github.com/planetary-social/nos-event-service/service/domain/relays" ) var ( @@ -121,3 +122,8 @@ type ContactsExtractor interface { type Subscriber interface { EventSavedQueueLength(ctx context.Context) (int, error) } + +type RelayConnections interface { + GetEvents(ctx context.Context, relayAddress domain.RelayAddress, filter domain.Filter) (<-chan relays.EventOrEndOfSavedEvents, error) + SendEvent(ctx context.Context, relayAddress domain.RelayAddress, event domain.Event) error +} diff --git a/service/app/downloader.go b/service/app/downloader.go index e1e5ee3..10c8408 100644 --- a/service/app/downloader.go +++ b/service/app/downloader.go @@ -10,7 +10,6 @@ import ( "github.com/planetary-social/nos-event-service/internal" "github.com/planetary-social/nos-event-service/internal/logging" "github.com/planetary-social/nos-event-service/service/domain" - "github.com/planetary-social/nos-event-service/service/domain/relays" ) const ( @@ -42,10 +41,6 @@ type RelaySource interface { GetRelays(ctx context.Context) ([]domain.RelayAddress, error) } -type RelayConnections interface { - GetEvents(ctx context.Context, relayAddress domain.RelayAddress, filter domain.Filter) (<-chan relays.EventOrEndOfSavedEvents, error) -} - type PublicKeySource interface { GetPublicKeys(ctx context.Context) ([]domain.PublicKey, error) } diff --git a/service/app/filter.go b/service/app/filter.go new file mode 100644 index 0000000..8c1f026 --- /dev/null +++ b/service/app/filter.go @@ -0,0 +1,54 @@ +package app + +import ( + "time" + + "github.com/planetary-social/nos-event-service/service/domain" +) + +type EventFilter struct { + filterNewerThan *time.Duration + filterOlderThan *time.Duration + filterLargerThan *int + filterWithMoreTagsThan *int +} + +func NewEventFilter( + filterNewerThan *time.Duration, + filterOlderThan *time.Duration, + filterLargerThan *int, + filterWithMoreTagsThan *int, +) *EventFilter { + return &EventFilter{ + filterNewerThan: filterNewerThan, + filterOlderThan: filterOlderThan, + filterLargerThan: filterLargerThan, + filterWithMoreTagsThan: filterWithMoreTagsThan, + } +} + +func (f EventFilter) IsOk(event domain.Event) bool { + if f.filterOlderThan != nil { + maxPastAllowed := time.Now().Add(-*f.filterOlderThan) + if event.CreatedAt().Before(maxPastAllowed) { + return false + } + } + + if f.filterNewerThan != nil { + maxFutureAllowed := time.Now().Add(*f.filterNewerThan) + if event.CreatedAt().After(maxFutureAllowed) { + return false + } + } + + if f.filterLargerThan != nil && len(event.Raw()) > *f.filterLargerThan { + return false + } + + if f.filterWithMoreTagsThan != nil && len(event.Tags()) > *f.filterWithMoreTagsThan { + return false + } + + return true +} diff --git a/service/app/handler_process_saved_event.go b/service/app/handler_process_saved_event.go index ba8d866..5f5e962 100644 --- a/service/app/handler_process_saved_event.go +++ b/service/app/handler_process_saved_event.go @@ -2,12 +2,31 @@ package app import ( "context" + "time" "github.com/boreq/errors" + "github.com/planetary-social/nos-event-service/internal" "github.com/planetary-social/nos-event-service/internal/logging" "github.com/planetary-social/nos-event-service/service/domain" ) +const year = 365 * 24 * time.Hour + +var ( + nosRelayAddress = domain.MustNewRelayAddress("wss://relay.nos.social") + eventKindsWhichShouldBeSentToRelay = internal.NewSetVariadic( + domain.EventKindMetadata, + domain.EventKindContacts, + domain.EventKindRelayListMetadata, + ) + pushToRelayFilter = NewEventFilter( + internal.Pointer(900*time.Second), + nil, + internal.Pointer(65536), + internal.Pointer(1024), + ) +) + type ProcessSavedEvent struct { id domain.EventId } @@ -21,6 +40,7 @@ type ProcessSavedEventHandler struct { relaysExtractor RelaysExtractor contactsExtractor ContactsExtractor externalEventPublisher ExternalEventPublisher + relayConnections RelayConnections logger logging.Logger metrics Metrics } @@ -30,6 +50,7 @@ func NewProcessSavedEventHandler( relaysExtractor RelaysExtractor, contactsExtractor ContactsExtractor, externalEventPublisher ExternalEventPublisher, + relayConnections RelayConnections, logger logging.Logger, metrics Metrics, ) *ProcessSavedEventHandler { @@ -38,6 +59,7 @@ func NewProcessSavedEventHandler( relaysExtractor: relaysExtractor, contactsExtractor: contactsExtractor, externalEventPublisher: externalEventPublisher, + relayConnections: relayConnections, logger: logger.New("processSavedEventHandler"), metrics: metrics, } @@ -59,6 +81,10 @@ func (h *ProcessSavedEventHandler) Handle(ctx context.Context, cmd ProcessSavedE return errors.Wrap(err, "error publishing the external event") } + if err := h.maybeSendEventToRelay(ctx, event); err != nil { + return errors.Wrap(err, "error sending the event to relay") + } + return nil } @@ -138,3 +164,15 @@ func (h *ProcessSavedEventHandler) shouldReplaceContacts(ctx context.Context, ad return domain.ShouldReplaceContactsEvent(oldEvent, newEvent) } + +func (h *ProcessSavedEventHandler) maybeSendEventToRelay(ctx context.Context, event domain.Event) error { + if !eventKindsWhichShouldBeSentToRelay.Contains(event.Kind()) { + return nil + } + + if !pushToRelayFilter.IsOk(event) { + return nil + } + + return h.relayConnections.SendEvent(ctx, nosRelayAddress, event) +} diff --git a/service/app/handler_save_received_event.go b/service/app/handler_save_received_event.go index 3c608e0..df727d6 100644 --- a/service/app/handler_save_received_event.go +++ b/service/app/handler_save_received_event.go @@ -2,12 +2,23 @@ package app import ( "context" + "time" "github.com/boreq/errors" + "github.com/planetary-social/nos-event-service/internal" "github.com/planetary-social/nos-event-service/internal/logging" "github.com/planetary-social/nos-event-service/service/domain" ) +var ( + saveFilter = NewEventFilter( + internal.Pointer(12*time.Hour), + nil, + internal.Pointer(1*1000*1000), + internal.Pointer(10000), + ) +) + type SaveReceivedEvent struct { relay domain.RelayAddress event domain.Event @@ -46,6 +57,10 @@ func (h *SaveReceivedEventHandler) Handle(ctx context.Context, cmd SaveReceivedE WithField("event.kind", cmd.event.Kind().Int()). Message("saving received event") + if !saveFilter.IsOk(cmd.event) { + return nil + } + if err := h.transactionProvider.Transact(ctx, func(ctx context.Context, adapters Adapters) error { exists, err := h.eventAlreadyExists(ctx, adapters, cmd.event) if err != nil { diff --git a/service/domain/relays/relay_connection.go b/service/domain/relays/relay_connection.go index 9aafc93..d8f5ae1 100644 --- a/service/domain/relays/relay_connection.go +++ b/service/domain/relays/relay_connection.go @@ -17,6 +17,10 @@ import ( "github.com/planetary-social/nos-event-service/service/domain/relays/transport" ) +const ( + sendEventTimeout = 30 * time.Second +) + type BackoffManager interface { GetReconnectionBackoff(err error) time.Duration } @@ -34,6 +38,10 @@ type RelayConnection struct { subscriptionsUpdatedCh chan struct{} subscriptionsUpdatedChClosed bool subscriptionsMutex sync.Mutex + + eventsToSend map[domain.EventId][]eventToSend + eventsToSendMutex sync.Mutex + newEventsCh chan domain.Event } func NewRelayConnection( @@ -49,6 +57,8 @@ func NewRelayConnection( state: RelayConnectionStateInitializing, subscriptions: make(map[transport.SubscriptionID]subscription), subscriptionsUpdatedCh: make(chan struct{}), + eventsToSend: make(map[domain.EventId][]eventToSend), + newEventsCh: make(chan domain.Event), } } @@ -70,24 +80,14 @@ func (r *RelayConnection) Run(ctx context.Context) { } } -func (r *RelayConnection) logRunErr(err error) { - l := r.logger.Error() - if r.errorIsCommonAndShouldNotBeLoggedOnErrorLevel(err) { - l = r.logger.Debug() - } - l.WithError(err).Message("encountered an error") +func (r *RelayConnection) State() RelayConnectionState { + r.stateMutex.Lock() + defer r.stateMutex.Unlock() + return r.state } -func (r *RelayConnection) errorIsCommonAndShouldNotBeLoggedOnErrorLevel(err error) bool { - if errors.Is(err, DialError{}) { - return true - } - - if errors.Is(err, ReadMessageError{}) { - return true - } - - return false +func (r *RelayConnection) Address() domain.RelayAddress { + return r.address } func (r *RelayConnection) GetEvents(ctx context.Context, filter domain.Filter) (<-chan EventOrEndOfSavedEvents, error) { @@ -111,7 +111,7 @@ func (r *RelayConnection) GetEvents(ctx context.Context, filter domain.Filter) ( go func() { <-ctx.Done() - if err := r.removeChannel(ch); err != nil { + if err := r.removeSubscriptionChannel(ch); err != nil { panic(err) } }() @@ -119,17 +119,74 @@ func (r *RelayConnection) GetEvents(ctx context.Context, filter domain.Filter) ( return ch, nil } -func (r *RelayConnection) State() RelayConnectionState { - r.stateMutex.Lock() - defer r.stateMutex.Unlock() - return r.state +func (r *RelayConnection) SendEvent(ctx context.Context, event domain.Event) error { + ctx, cancel := context.WithTimeout(ctx, sendEventTimeout) + defer cancel() + + ch := make(chan sendEventResponse) + r.scheduleSendingEvent(ctx, event, ch) + + select { + case <-ctx.Done(): + return ctx.Err() + case response := <-ch: + if !response.ok { + if response.message != "" { + return fmt.Errorf("received a message: '%s'", response.message) + } else { + return errors.New("relay rejected the event") + } + } + return nil + } } -func (r *RelayConnection) Address() domain.RelayAddress { - return r.address +func (r *RelayConnection) scheduleSendingEvent(ctx context.Context, event domain.Event, ch chan sendEventResponse) { + r.eventsToSendMutex.Lock() + defer r.eventsToSendMutex.Unlock() + + r.eventsToSend[event.Id()] = append(r.eventsToSend[event.Id()], eventToSend{ + ctx: ctx, + ch: ch, + event: event, + }) + + go func() { + <-ctx.Done() + if err := r.removeEventChannel(event, ch); err != nil { + panic(err) + } + }() + + go func() { + select { + case r.newEventsCh <- event: + case <-ctx.Done(): + } + }() +} + +func (r *RelayConnection) logRunErr(err error) { + l := r.logger.Error() + if r.errorIsCommonAndShouldNotBeLoggedOnErrorLevel(err) { + l = r.logger.Debug() + } + l.WithError(err).Message("encountered an error") } -func (r *RelayConnection) removeChannel(chToRemove chan EventOrEndOfSavedEvents) error { +func (r *RelayConnection) errorIsCommonAndShouldNotBeLoggedOnErrorLevel(err error) bool { + if errors.Is(err, DialError{}) { + return true + } + + if errors.Is(err, ReadMessageError{}) { + return true + } + + return false +} + +func (r *RelayConnection) removeSubscriptionChannel(chToRemove chan EventOrEndOfSavedEvents) error { r.subscriptionsMutex.Lock() defer r.subscriptionsMutex.Unlock() @@ -145,6 +202,24 @@ func (r *RelayConnection) removeChannel(chToRemove chan EventOrEndOfSavedEvents) return errors.New("somehow the channel was already removed") } +func (r *RelayConnection) removeEventChannel(event domain.Event, chToRemove chan sendEventResponse) error { + r.eventsToSendMutex.Lock() + defer r.eventsToSendMutex.Unlock() + + for i, eventToSend := range r.eventsToSend[event.Id()] { + if chToRemove == eventToSend.ch { + r.eventsToSend[event.Id()] = append(r.eventsToSend[event.Id()][:i], r.eventsToSend[event.Id()][i+1:]...) + if len(r.eventsToSend[event.Id()]) == 0 { + delete(r.eventsToSend, event.Id()) + } + return nil + } + } + + return errors.New("somehow the channel was already removed") + +} + func (r *RelayConnection) triggerSubscriptionUpdate() { if !r.subscriptionsUpdatedChClosed { r.subscriptionsUpdatedChClosed = true @@ -165,9 +240,9 @@ func (r *RelayConnection) run(ctx context.Context) error { r.logger.Trace().Message("connecting") - conn, _, err := websocket.DefaultDialer.DialContext(ctx, r.address.String(), nil) + conn, err := NewConnection(ctx, r.address) if err != nil { - return NewDialError(err) + return errors.Wrap(err, "error creating a connection") } r.setState(RelayConnectionStateConnected) @@ -180,9 +255,13 @@ func (r *RelayConnection) run(ctx context.Context) error { } }() + if err := r.queueResendingAllEvents(ctx); err != nil { + return errors.Wrap(err, "error queueing events") + } + go func() { if err := r.manageSubs(ctx, conn); err != nil { - if !errors.Is(err, context.Canceled) && !errors.Is(err, websocket.ErrCloseSent) { + if !r.writeErrorShouldNotBeLogged(err) { r.logger.Error(). WithError(err). Message("error managing subs") @@ -190,8 +269,18 @@ func (r *RelayConnection) run(ctx context.Context) error { } }() + go func() { + if err := r.sendNewEvents(ctx, conn); err != nil { + if !r.writeErrorShouldNotBeLogged(err) { + r.logger.Error(). + WithError(err). + Message("error resending events") + } + } + }() + for { - _, messageBytes, err := conn.ReadMessage() + messageBytes, err := conn.ReadMessage() if err != nil { return NewReadMessageError(err) } @@ -207,6 +296,10 @@ func (r *RelayConnection) run(ctx context.Context) error { } } +func (r *RelayConnection) writeErrorShouldNotBeLogged(err error) bool { + return errors.Is(err, context.Canceled) || errors.Is(err, websocket.ErrCloseSent) +} + func (r *RelayConnection) handleMessage(messageBytes []byte) (err error) { envelope := nostr.ParseMessage(messageBytes) if envelope == nil { @@ -226,7 +319,7 @@ func (r *RelayConnection) handleMessage(messageBytes []byte) (err error) { if err != nil { return errors.Wrapf(err, "error creating subscription id from '%s'", string(*v)) } - r.passValueToChannel(subscriptionID, NewEventOrEndOfSavedEventsWithEOSE()) + r.passEventOrEOSEToChannel(subscriptionID, NewEventOrEndOfSavedEventsWithEOSE()) return nil case *nostr.EventEnvelope: defer r.metrics.ReportMessageReceived(r.address, MessageTypeEvent, &err) @@ -245,7 +338,7 @@ func (r *RelayConnection) handleMessage(messageBytes []byte) (err error) { return errors.Wrap(err, "error creating an event") } - r.passValueToChannel(subscriptionID, NewEventOrEndOfSavedEventsWithEvent(event)) + r.passEventOrEOSEToChannel(subscriptionID, NewEventOrEndOfSavedEventsWithEvent(event)) return nil case *nostr.NoticeEnvelope: defer r.metrics.ReportMessageReceived(r.address, MessageTypeNotice, &err) @@ -261,13 +354,32 @@ func (r *RelayConnection) handleMessage(messageBytes []byte) (err error) { WithField("message", string(messageBytes)). Message("received a message (auth)") return nil + case *nostr.OKEnvelope: + defer r.metrics.ReportMessageReceived(r.address, MessageTypeOK, &err) + r.logger. + Trace(). + WithField("message", string(messageBytes)). + Message("received a message (ok)") + + eventID, err := domain.NewEventId(v.EventID) + if err != nil { + return errors.Wrap(err, "error creating an event") + } + + response := sendEventResponse{ + ok: v.OK, + message: v.Reason, + } + + r.passSendEventResponseToChannel(eventID, response) + return nil default: defer r.metrics.ReportMessageReceived(r.address, MessageTypeUnknown, &err) return errors.New("unknown message type") } } -func (r *RelayConnection) passValueToChannel(id transport.SubscriptionID, value EventOrEndOfSavedEvents) { +func (r *RelayConnection) passEventOrEOSEToChannel(id transport.SubscriptionID, value EventOrEndOfSavedEvents) { r.subscriptionsMutex.Lock() defer r.subscriptionsMutex.Unlock() @@ -279,6 +391,18 @@ func (r *RelayConnection) passValueToChannel(id transport.SubscriptionID, value } } +func (r *RelayConnection) passSendEventResponseToChannel(eventID domain.EventId, response sendEventResponse) { + r.eventsToSendMutex.Lock() + defer r.eventsToSendMutex.Unlock() + + for _, eventToSend := range r.eventsToSend[eventID] { + select { + case <-eventToSend.ctx.Done(): + case eventToSend.ch <- response: + } + } +} + func (r *RelayConnection) setState(state RelayConnectionState) { r.stateMutex.Lock() defer r.stateMutex.Unlock() @@ -287,7 +411,7 @@ func (r *RelayConnection) setState(state RelayConnectionState) { func (r *RelayConnection) manageSubs( ctx context.Context, - conn *websocket.Conn, + conn *Connection, ) error { defer conn.Close() @@ -308,7 +432,7 @@ func (r *RelayConnection) manageSubs( } func (r *RelayConnection) updateSubs( - conn *websocket.Conn, + conn *Connection, activeSubscriptions *internal.Set[transport.SubscriptionID], ) error { r.subscriptionsMutex.Lock() @@ -319,17 +443,12 @@ func (r *RelayConnection) updateSubs( for _, subscriptionID := range activeSubscriptions.List() { if _, ok := r.subscriptions[subscriptionID]; !ok { msg := transport.NewMessageClose(subscriptionID) - msgJSON, err := msg.MarshalJSON() - if err != nil { - return errors.Wrap(err, "marshaling close message failed") - } r.logger.Trace(). WithField("subscriptionID", subscriptionID). - WithField("payload", string(msgJSON)). Message("closing subscription") - if err := conn.WriteMessage(websocket.TextMessage, msgJSON); err != nil { + if err := conn.SendMessage(msg); err != nil { return errors.Wrap(err, "writing close message error") } @@ -340,17 +459,12 @@ func (r *RelayConnection) updateSubs( for subscriptionID, subscription := range r.subscriptions { if ok := activeSubscriptions.Contains(subscriptionID); !ok { msg := transport.NewMessageReq(subscription.id, []domain.Filter{subscription.filter}) - msgJSON, err := msg.MarshalJSON() - if err != nil { - return errors.Wrap(err, "marshaling req message failed") - } r.logger.Trace(). WithField("subscriptionID", subscriptionID). - WithField("payload", string(msgJSON)). Message("opening subscription") - if err := conn.WriteMessage(websocket.TextMessage, msgJSON); err != nil { + if err := conn.SendMessage(msg); err != nil { return errors.Wrap(err, "writing req message error") } @@ -362,6 +476,41 @@ func (r *RelayConnection) updateSubs( return nil } +func (r *RelayConnection) queueResendingAllEvents(ctx context.Context) error { + r.eventsToSendMutex.Lock() + defer r.eventsToSendMutex.Unlock() + + for _, eventsToSend := range r.eventsToSend { + if len(eventsToSend) == 0 { + return errors.New("empty list which shouldn't happen as map entries get removed when list is empty") + } + + event := eventsToSend[0].event + go func() { + select { + case <-ctx.Done(): + case r.newEventsCh <- event: + } + }() + } + + return nil +} + +func (r *RelayConnection) sendNewEvents(ctx context.Context, conn *Connection) error { + for { + select { + case event := <-r.newEventsCh: + msg := transport.NewMessageEvent(event) + if err := conn.SendMessage(msg); err != nil { + return errors.Wrap(err, "error writing a message") + } + case <-ctx.Done(): + return ctx.Err() + } + } +} + type subscription struct { ctx context.Context ch chan EventOrEndOfSavedEvents @@ -370,6 +519,17 @@ type subscription struct { filter domain.Filter } +type eventToSend struct { + ctx context.Context + ch chan sendEventResponse + event domain.Event +} + +type sendEventResponse struct { + ok bool + message string +} + type DialError struct { underlying error } @@ -448,3 +608,52 @@ func (d *DefaultBackoffManager) dialBackoff(n int) time.Duration { func (d *DefaultBackoffManager) isDialError(err error) bool { return errors.Is(err, DialError{}) } + +type NostrMessage interface { + MarshalJSON() ([]byte, error) +} + +type Connection struct { + conn *websocket.Conn + writeLock sync.Mutex +} + +func NewConnection(ctx context.Context, address domain.RelayAddress) (*Connection, error) { + conn, _, err := websocket.DefaultDialer.DialContext(ctx, address.String(), nil) + if err != nil { + return nil, NewDialError(err) + } + + return &Connection{ + conn: conn, + }, nil +} + +func (c *Connection) SendMessage(msg NostrMessage) error { + msgJSON, err := msg.MarshalJSON() + if err != nil { + return errors.Wrap(err, "error marshaling message") + } + + c.writeLock.Lock() + defer c.writeLock.Unlock() + + if err := c.conn.WriteMessage(websocket.TextMessage, msgJSON); err != nil { + return errors.Wrap(err, "error writing message") + } + + return nil +} + +func (c *Connection) ReadMessage() ([]byte, error) { + _, messageBytes, err := c.conn.ReadMessage() + if err != nil { + return nil, NewReadMessageError(err) + } + + return messageBytes, nil +} + +func (c *Connection) Close() error { + return c.conn.Close() +} diff --git a/service/domain/relays/relay_connections.go b/service/domain/relays/relay_connections.go index 37fd658..86aad89 100644 --- a/service/domain/relays/relay_connections.go +++ b/service/domain/relays/relay_connections.go @@ -29,6 +29,7 @@ var ( MessageTypeEOSE = MessageType{"eose"} MessageTypeEvent = MessageType{"event"} MessageTypeAuth = MessageType{"auth"} + MessageTypeOK = MessageType{"ok"} MessageTypeUnknown = MessageType{"unknown"} ) @@ -63,6 +64,11 @@ func (d *RelayConnections) GetEvents(ctx context.Context, relayAddress domain.Re return connection.GetEvents(ctx, filter) } +func (d *RelayConnections) SendEvent(ctx context.Context, relayAddress domain.RelayAddress, event domain.Event) error { + connection := d.getConnection(relayAddress) + return connection.SendEvent(ctx, event) +} + func (d *RelayConnections) storeMetricsLoop(ctx context.Context) { for { d.storeMetrics() diff --git a/service/domain/relays/transport/messages.go b/service/domain/relays/transport/messages.go index f07d1cc..5e650c2 100644 --- a/service/domain/relays/transport/messages.go +++ b/service/domain/relays/transport/messages.go @@ -37,3 +37,19 @@ func (m MessageClose) MarshalJSON() ([]byte, error) { env := nostr.CloseEnvelope(m.subscriptionID.String()) return env.MarshalJSON() } + +type MessageEvent struct { + event domain.Event +} + +func NewMessageEvent(event domain.Event) *MessageEvent { + return &MessageEvent{event: event} +} + +func (m MessageEvent) MarshalJSON() ([]byte, error) { + env := nostr.EventEnvelope{ + SubscriptionID: nil, + Event: m.event.Libevent(), + } + return env.MarshalJSON() +}