From a47b683d61ccbe7b4e6458066c931eed47ee0af5 Mon Sep 17 00:00:00 2001 From: Filip Borkiewicz Date: Fri, 24 Nov 2023 12:02:10 +0000 Subject: [PATCH] Suport registering users using nostr events (#32) The events have type `6666` and the payload looks like this: { "relays": [ { "address": "wss://example.com" } ] } --- go.mod | 1 - go.sum | 2 - service/domain/event_kind.go | 1 + service/domain/registration.go | 63 +++++++++++ service/domain/registration_test.go | 18 +++ service/domain/relays/transport/messages.go | 38 ++++++- service/ports/http/http.go | 115 +++++++++++++++----- 7 files changed, 207 insertions(+), 31 deletions(-) create mode 100644 service/domain/registration.go create mode 100644 service/domain/registration_test.go diff --git a/go.mod b/go.mod index 5910585..93f0028 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,6 @@ require ( github.com/ThreeDotsLabs/watermill v1.3.1 github.com/ThreeDotsLabs/watermill-googlecloud v1.0.13 github.com/boreq/errors v0.1.0 - github.com/boreq/rest v0.1.0 github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 github.com/google/wire v0.5.0 github.com/gorilla/websocket v1.5.0 diff --git a/go.sum b/go.sum index c5498e6..a243d00 100644 --- a/go.sum +++ b/go.sum @@ -25,8 +25,6 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/boreq/errors v0.1.0 h1:aJIXv9JnyR5KtxFpQ8/AiblH3nfYmr1e1yoTze/5A1k= github.com/boreq/errors v0.1.0/go.mod h1:B3dsXzhYvfgUXp7ViU/moPYM4PojgQ9MiQ21uvY6qqQ= -github.com/boreq/rest v0.1.0 h1:bAx31Rp1KrXHkCOlzqAtLKdh74xbly2SHkv9k3vX3iA= -github.com/boreq/rest v0.1.0/go.mod h1:Ckfx0qLDdPbS081820aWkkqvwhlrbv0SDu8UBDY4k7w= github.com/btcsuite/btcd v0.20.1-beta/go.mod h1:wVuoA8VJLEcwgqHBwHmzLRazpKxTv13Px/pDuV7OomQ= github.com/btcsuite/btcd v0.22.0-beta.0.20220111032746-97732e52810c/go.mod h1:tjmYdS6MLJ5/s0Fj4DbLgSbDHbEqLJrtnHecBFkdz5M= github.com/btcsuite/btcd v0.23.0/go.mod h1:0QJIIN1wwIXF/3G/m87gIwGniDMDQqjVn4SZgnFpsYY= diff --git a/service/domain/event_kind.go b/service/domain/event_kind.go index e57cbbd..3e05448 100644 --- a/service/domain/event_kind.go +++ b/service/domain/event_kind.go @@ -12,6 +12,7 @@ var ( EventKindEncryptedDirectMessage = MustNewEventKind(4) EventKindReaction = MustNewEventKind(7) EventKindRelayListMetadata = MustNewEventKind(10002) + EventKindRegistration = MustNewEventKind(6666) ) type EventKind struct { diff --git a/service/domain/registration.go b/service/domain/registration.go new file mode 100644 index 0000000..995b14c --- /dev/null +++ b/service/domain/registration.go @@ -0,0 +1,63 @@ +package domain + +import ( + "encoding/json" + + "github.com/boreq/errors" + "github.com/planetary-social/nos-event-service/internal" +) + +type Registration struct { + publicKey PublicKey + relays []RelayAddress +} + +func NewRegistrationFromEvent(event Event) (Registration, error) { + var v registrationContent + if err := json.Unmarshal([]byte(event.Content()), &v); err != nil { + return Registration{}, errors.Wrap(err, "error unmarshaling content") + } + + relays, err := newRelays(v) + if err != nil { + return Registration{}, errors.Wrap(err, "error creating relay addresses") + } + + return Registration{ + publicKey: event.PubKey(), + relays: relays, + }, nil +} + +func (p Registration) PublicKey() PublicKey { + return p.publicKey +} + +func (p Registration) Relays() []RelayAddress { + return internal.CopySlice(p.relays) +} + +type registrationContent struct { + Relays []relayTransport `json:"relays"` +} + +type relayTransport struct { + Address string `json:"address"` +} + +func newRelays(v registrationContent) ([]RelayAddress, error) { + var relays []RelayAddress + for _, relayTransport := range v.Relays { + address, err := NewRelayAddress(relayTransport.Address) + if err != nil { + return nil, errors.Wrap(err, "error creating relay address") + } + relays = append(relays, address) + } + + if len(relays) == 0 { + return nil, errors.New("missing relays") + } + + return relays, nil +} diff --git a/service/domain/registration_test.go b/service/domain/registration_test.go new file mode 100644 index 0000000..36532a1 --- /dev/null +++ b/service/domain/registration_test.go @@ -0,0 +1,18 @@ +package domain_test + +import ( + "testing" + + "github.com/planetary-social/nos-event-service/internal/fixtures" + "github.com/planetary-social/nos-event-service/service/domain" + "github.com/stretchr/testify/require" +) + +func TestNewRegistrationFromEvent(t *testing.T) { + event := fixtures.Event(domain.EventKindRegistration, nil, `{"relays": [ {"address": "wss://example.com"} ]}`) + + registration, err := domain.NewRegistrationFromEvent(event) + require.NoError(t, err) + require.Equal(t, []domain.RelayAddress{domain.MustNewRelayAddress("wss://example.com")}, registration.Relays()) + require.Equal(t, event.PubKey(), registration.PublicKey()) +} diff --git a/service/domain/relays/transport/messages.go b/service/domain/relays/transport/messages.go index 5e650c2..0440012 100644 --- a/service/domain/relays/transport/messages.go +++ b/service/domain/relays/transport/messages.go @@ -1,6 +1,7 @@ package transport import ( + "github.com/boreq/errors" "github.com/nbd-wtf/go-nostr" "github.com/planetary-social/nos-event-service/service/domain" ) @@ -42,8 +43,8 @@ type MessageEvent struct { event domain.Event } -func NewMessageEvent(event domain.Event) *MessageEvent { - return &MessageEvent{event: event} +func NewMessageEvent(event domain.Event) MessageEvent { + return MessageEvent{event: event} } func (m MessageEvent) MarshalJSON() ([]byte, error) { @@ -53,3 +54,36 @@ func (m MessageEvent) MarshalJSON() ([]byte, error) { } return env.MarshalJSON() } + +type MessageOK struct { + eventID string + err error +} + +func NewMessageOKWithSuccess(eventId string) MessageOK { + return MessageOK{ + eventID: eventId, + err: nil, + } +} + +func NewMessageOKWithError(eventId string, message string) MessageOK { + return MessageOK{ + eventID: eventId, + err: errors.New(message), + } +} + +func (m MessageOK) MarshalJSON() ([]byte, error) { + env := nostr.OKEnvelope{ + EventID: m.eventID, + } + if m.err == nil { + env.OK = true + env.Reason = "" + } else { + env.OK = false + env.Reason = m.err.Error() + } + return env.MarshalJSON() +} diff --git a/service/ports/http/http.go b/service/ports/http/http.go index 390de9f..eb1590e 100644 --- a/service/ports/http/http.go +++ b/service/ports/http/http.go @@ -2,17 +2,18 @@ package http import ( "context" - "encoding/json" "net" "net/http" "github.com/boreq/errors" - "github.com/boreq/rest" + "github.com/gorilla/websocket" + "github.com/nbd-wtf/go-nostr" "github.com/planetary-social/nos-event-service/internal/logging" prometheusadapters "github.com/planetary-social/nos-event-service/service/adapters/prometheus" "github.com/planetary-social/nos-event-service/service/app" "github.com/planetary-social/nos-event-service/service/config" "github.com/planetary-social/nos-event-service/service/domain" + "github.com/planetary-social/nos-event-service/service/domain/relays/transport" "github.com/prometheus/client_golang/prometheus/promhttp" ) @@ -58,42 +59,104 @@ func (s *Server) ListenAndServe(ctx context.Context) error { func (s *Server) createMux() *http.ServeMux { mux := http.NewServeMux() mux.Handle("/metrics", promhttp.HandlerFor(s.prometheus.Registry(), promhttp.HandlerOpts{})) - mux.Handle("/public-keys-to-monitor", rest.Wrap(s.publicKeysToMonitor)) + mux.HandleFunc("/", s.serveWs) return mux } -func (s *Server) publicKeysToMonitor(r *http.Request) rest.RestResponse { - switch r.Method { - case http.MethodPost: - return s.postPublicKeyToMonitor(r) - default: - return rest.ErrMethodNotAllowed - } -} +func (s *Server) serveWs(rw http.ResponseWriter, r *http.Request) { + ctx := r.Context() -func (s *Server) postPublicKeyToMonitor(r *http.Request) rest.RestResponse { - var t postPublicKeyToMonitorInput - if err := json.NewDecoder(r.Body).Decode(&t); err != nil { - s.logger.Error().WithError(err).Message("error decoding post public key to monitor input") - return rest.ErrBadRequest + var upgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, } - publicKey, err := domain.NewPublicKeyFromHex(t.PublicKey) + conn, err := upgrader.Upgrade(rw, r, nil) if err != nil { - s.logger.Error().WithError(err).Message("error creating a public key") - return rest.ErrBadRequest + s.logger.Error().WithError(err).Message("error upgrading the connection") + return } - cmd := app.NewAddPublicKeyToMonitor(publicKey) + defer func() { + if err := conn.Close(); err != nil { + s.logger.Error().WithError(err).Message("error closing the connection") + } + }() - if err := s.app.AddPublicKeyToMonitor.Handle(r.Context(), cmd); err != nil { - s.logger.Error().WithError(err).Message("error calling the add public key to monitor handler") - return rest.ErrInternalServerError + if err := s.handleConnection(ctx, conn); err != nil { + closeErr := &websocket.CloseError{} + if !errors.As(err, &closeErr) || closeErr.Code != websocket.CloseNormalClosure { + s.logger.Error().WithError(err).Message("error handling the connection") + } } +} + +func (s *Server) handleConnection(ctx context.Context, conn *websocket.Conn) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() - return rest.NewResponse(nil) + for { + _, messageBytes, err := conn.ReadMessage() + if err != nil { + return errors.Wrap(err, "error reading the websocket message") + } + + message := nostr.ParseMessage(messageBytes) + if message == nil { + s.logger. + Error(). + WithError(err). + WithField("message", string(messageBytes)). + Message("error parsing a message") + return errors.New("failed to parse a message") + } + + switch v := message.(type) { + case *nostr.EventEnvelope: + msg := s.processEventReturningOK(ctx, v.Event) + + msgJSON, err := msg.MarshalJSON() + if err != nil { + return errors.Wrap(err, "error marshaling a message") + } + + if err := conn.WriteMessage(websocket.TextMessage, msgJSON); err != nil { + return errors.Wrap(err, "error writing a message") + } + default: + s.logger.Error().WithField("message", message).Message("received an unknown message") + return errors.New("unknown message received") + } + } +} + +func (s *Server) processEventReturningOK(ctx context.Context, event nostr.Event) transport.MessageOK { + if err := s.processEvent(ctx, event); err != nil { + s.logger. + Error(). + WithError(err). + Message("error processing an event") + return transport.NewMessageOKWithError(event.ID, err.Error()) + } + return transport.NewMessageOKWithSuccess(event.ID) } -type postPublicKeyToMonitorInput struct { - PublicKey string `json:"publicKey"` +func (s *Server) processEvent(ctx context.Context, libevent nostr.Event) error { + event, err := domain.NewEvent(libevent) + if err != nil { + return errors.Wrap(err, "error creating an event") + } + + registration, err := domain.NewRegistrationFromEvent(event) + if err != nil { + return errors.Wrap(err, "error creating a registration") + } + + cmd := app.NewAddPublicKeyToMonitor(registration.PublicKey()) + + if err := s.app.AddPublicKeyToMonitor.Handle(ctx, cmd); err != nil { + return errors.Wrap(err, "error calling the handler") + } + + return nil }