From 784fb0eafa80621f2f1b8b3a2b89cec64eb1057b Mon Sep 17 00:00:00 2001 From: Daniel Cadenas Date: Mon, 12 Feb 2024 15:08:19 -0300 Subject: [PATCH 1/2] Rate limit from notice warnings --- cmd/send-all-events-to-relay/main.go | 3 + go.mod | 2 +- service/adapters/mocks/metrics.go | 4 + service/adapters/prometheus/prometheus.go | 20 +- service/domain/downloader/scheduler.go | 2 +- service/domain/relays/relay_connection.go | 178 +++++++++++++++--- .../domain/relays/relay_connection_test.go | 3 + service/domain/relays/relay_connections.go | 2 + 8 files changed, 181 insertions(+), 33 deletions(-) diff --git a/cmd/send-all-events-to-relay/main.go b/cmd/send-all-events-to-relay/main.go index ceee879..c9aa850 100644 --- a/cmd/send-all-events-to-relay/main.go +++ b/cmd/send-all-events-to-relay/main.go @@ -339,3 +339,6 @@ func (m mockMetrics) ReportMessageReceived(address domain.RelayAddress, messageT func (m mockMetrics) ReportRelayDisconnection(address domain.RelayAddress, err error) { } + +func (m mockMetrics) ReportNotice(address domain.RelayAddress, noticeType relays.NoticeType) { +} diff --git a/go.mod b/go.mod index 3069dcd..158b0e5 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,6 @@ require ( github.com/ThreeDotsLabs/watermill-googlecloud v1.0.13 github.com/boreq/errors v0.1.0 github.com/boreq/rest v0.1.0 - github.com/davecgh/go-spew v1.1.1 github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 github.com/google/wire v0.5.0 github.com/gorilla/mux v1.8.1 @@ -37,6 +36,7 @@ require ( github.com/btcsuite/btcd/chaincfg/chainhash v1.0.2 // indirect github.com/cenkalti/backoff/v3 v3.2.2 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect github.com/decred/dcrd/crypto/blake256 v1.0.1 // indirect github.com/dgraph-io/ristretto v0.1.1 // indirect github.com/dustin/go-humanize v1.0.1 // indirect diff --git a/service/adapters/mocks/metrics.go b/service/adapters/mocks/metrics.go index e411f6c..030f335 100644 --- 
a/service/adapters/mocks/metrics.go +++ b/service/adapters/mocks/metrics.go @@ -5,6 +5,7 @@ import ( "github.com/planetary-social/nos-event-service/service/app" "github.com/planetary-social/nos-event-service/service/domain" + "github.com/planetary-social/nos-event-service/service/domain/relays" ) type Metrics struct { @@ -39,6 +40,9 @@ func (m Metrics) ReportNumberOfStoredEvents(n int) { func (m Metrics) ReportEventSentToRelay(address domain.RelayAddress, decision app.SendEventToRelayDecision, result app.SendEventToRelayResult) { } +func (m Metrics) ReportNotice(address domain.RelayAddress, noticeType relays.NoticeType) { +} + type ApplicationCall struct { } diff --git a/service/adapters/prometheus/prometheus.go b/service/adapters/prometheus/prometheus.go index a1e5e3c..208b3a0 100644 --- a/service/adapters/prometheus/prometheus.go +++ b/service/adapters/prometheus/prometheus.go @@ -32,10 +32,12 @@ const ( labelReason = "reason" - labelDecision = "decision" + labelDecision = "decision" + labelNoticeType = "noticeType" ) type Prometheus struct { + noticeTypeCounter *prometheus.CounterVec applicationHandlerCallsCounter *prometheus.CounterVec applicationHandlerCallDurationHistogram *prometheus.HistogramVec relayDownloadersGauge prometheus.Gauge @@ -56,6 +58,13 @@ type Prometheus struct { } func NewPrometheus(logger logging.Logger) (*Prometheus, error) { + noticeTypeCounter := prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "notice_type_total", + Help: "Total number of notices per notice type.", + }, + []string{labelAddress, labelNoticeType}, + ) applicationHandlerCallsCounter := prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "application_handler_calls_total", @@ -154,6 +163,7 @@ func NewPrometheus(logger logging.Logger) (*Prometheus, error) { reg := prometheus.NewRegistry() for _, v := range []prometheus.Collector{ + noticeTypeCounter, applicationHandlerCallsCounter, applicationHandlerCallDurationHistogram, versionGague, @@ -191,6 +201,7 @@ func 
NewPrometheus(logger logging.Logger) (*Prometheus, error) { } return &Prometheus{ + noticeTypeCounter: noticeTypeCounter, applicationHandlerCallsCounter: applicationHandlerCallsCounter, applicationHandlerCallDurationHistogram: applicationHandlerCallDurationHistogram, relayDownloadersGauge: relayDownloadersGauge, @@ -281,6 +292,13 @@ func (p *Prometheus) ReportEventSentToRelay(address domain.RelayAddress, decisio }).Inc() } +func (p *Prometheus) ReportNotice(address domain.RelayAddress, noticeType relays.NoticeType) { + p.noticeTypeCounter.With(prometheus.Labels{ + labelAddress: address.String(), + labelNoticeType: string(noticeType), + }).Inc() +} + func (p *Prometheus) Registry() *prometheus.Registry { return p.registry } diff --git a/service/domain/downloader/scheduler.go b/service/domain/downloader/scheduler.go index 4c6c0e3..045c93a 100644 --- a/service/domain/downloader/scheduler.go +++ b/service/domain/downloader/scheduler.go @@ -372,7 +372,7 @@ func (t *TimeWindowTaskGenerator) Generate(ctx context.Context, kinds []domain.E func (t *TimeWindowTaskGenerator) maybeGenerateNewTracker(ctx context.Context) (*TimeWindowTaskTracker, bool, error) { nextWindow := t.lastWindow.Advance() now := t.currentTimeProvider.GetCurrentTime() - if nextWindow.End().After(now.Add(-time.Minute)) { + if nextWindow.End().After(now.Add(-windowSize)) { return nil, false, nil } t.lastWindow = nextWindow diff --git a/service/domain/relays/relay_connection.go b/service/domain/relays/relay_connection.go index 942ec81..09ba261 100644 --- a/service/domain/relays/relay_connection.go +++ b/service/domain/relays/relay_connection.go @@ -5,7 +5,10 @@ import ( "context" "fmt" "math" + "regexp" + "strings" "sync" + "sync/atomic" "time" "github.com/boreq/errors" @@ -18,6 +21,7 @@ import ( "github.com/planetary-social/nos-event-service/service/domain/relays/transport" ) +const relayAddress = "wss://relay.nostr.com.au" const ( ReconnectionBackoff = 5 * time.Minute MaxDialReconnectionBackoff = 30 * 
time.Minute @@ -51,6 +55,69 @@ type ConnectionFactory interface { Address() domain.RelayAddress } +type RateLimitNoticeBackoffManager struct { + address domain.RelayAddress + rateLimitNoticeCount int32 + lastBumpTime atomic.Value // Use atomic.Value for time.Time +} + +func (r *RateLimitNoticeBackoffManager) updateLastBumpTime() { + r.lastBumpTime.Store(time.Now()) +} + +func (r *RateLimitNoticeBackoffManager) getLastBumpTime() time.Time { + return r.lastBumpTime.Load().(time.Time) +} + +func NewRateLimitNoticeBackoffManager(address domain.RelayAddress) *RateLimitNoticeBackoffManager { + r := &RateLimitNoticeBackoffManager{ + address: address, + rateLimitNoticeCount: 0, + } + + r.updateLastBumpTime() + return r +} + +func (r *RateLimitNoticeBackoffManager) Bump() { + timeSinceLastBump := time.Since(r.getLastBumpTime()) + if timeSinceLastBump < 500*time.Millisecond { + // Give some time for the rate limit to be lifted before increasing the counter + return + } + + atomic.AddInt32(&r.rateLimitNoticeCount, 1) + r.updateLastBumpTime() +} + +// logPeriodically executes the passed action function if the current time in milliseconds +// modulo logInterval equals zero. This approach allows executing the action periodically, +// approximating the execution to happen once every `logInterval` milliseconds. 
+func logPeriodically(action func(), logInterval int64) { + currentTimeMillis := time.Now().UnixNano() / int64(time.Millisecond) + if currentTimeMillis%logInterval == 0 { + action() + } +} + +func (r *RateLimitNoticeBackoffManager) Wait() { + if r.rateLimitNoticeCount <= 0 { + return + } + + backoffMs := int(math.Pow(2, float64(r.rateLimitNoticeCount))) * 100 + + timeSinceLastBump := time.Since(r.getLastBumpTime()) + if timeSinceLastBump > 5*time.Second { + atomic.AddInt32(&r.rateLimitNoticeCount, -1) + r.updateLastBumpTime() + } + + if backoffMs > 0 { + time.Sleep(time.Duration(backoffMs) * time.Millisecond) + } +} + type RelayConnection struct { connectionFactory ConnectionFactory logger logging.Logger @@ -60,14 +127,14 @@ type RelayConnection struct { state RelayConnectionState stateMutex sync.Mutex - subscriptions map[transport.SubscriptionID]subscription - subscriptionsUpdatedCh chan struct{} - subscriptionsUpdatedChClosed bool - subscriptionsMutex sync.Mutex + subscriptions map[transport.SubscriptionID]subscription + subscriptionsUpdatedCh chan struct{} + subscriptionsMutex sync.Mutex - eventsToSend map[domain.EventId]*eventToSend - eventsToSendMutex sync.Mutex - newEventsCh chan domain.Event + eventsToSend map[domain.EventId]*eventToSend + eventsToSendMutex sync.Mutex + newEventsCh chan domain.Event + rateLimitNoticeBackoffManager *RateLimitNoticeBackoffManager } func NewRelayConnection( @@ -76,15 +143,16 @@ func NewRelayConnection( metrics Metrics, ) *RelayConnection { return &RelayConnection{ - connectionFactory: connectionFactory, - logger: logger.New(fmt.Sprintf("relayConnection(%s)", connectionFactory.Address().String())), - metrics: metrics, - backoffManager: NewDefaultBackoffManager(), - state: RelayConnectionStateInitializing, - subscriptions: make(map[transport.SubscriptionID]subscription), - subscriptionsUpdatedCh: make(chan struct{}), - eventsToSend: make(map[domain.EventId]*eventToSend), - newEventsCh: make(chan domain.Event), + 
connectionFactory: connectionFactory, + logger: logger.New(fmt.Sprintf("relayConnection(%s)", connectionFactory.Address().String())), + metrics: metrics, + backoffManager: NewDefaultBackoffManager(), + state: RelayConnectionStateInitializing, + subscriptions: make(map[transport.SubscriptionID]subscription), + subscriptionsUpdatedCh: make(chan struct{}), + eventsToSend: make(map[domain.EventId]*eventToSend), + newEventsCh: make(chan domain.Event), + rateLimitNoticeBackoffManager: NewRateLimitNoticeBackoffManager(connectionFactory.Address()), } } @@ -136,6 +204,7 @@ func (r *RelayConnection) GetEvents(ctx context.Context, filter domain.Filter) ( r.triggerSubscriptionUpdate() go func() { + // Context is cancelled from a task.CheckIfDoneAndEnd() triggered from EOSE messages <-ctx.Done() if err := r.removeSubscriptionChannel(ch); err != nil { panic(err) @@ -145,12 +214,13 @@ func (r *RelayConnection) GetEvents(ctx context.Context, filter domain.Filter) ( return ch, nil } +// SendEvent schedules the event for sending, signalling newEventsCh when it is scheduled for the first time, and blocks until a sendEventResponse is received func (r *RelayConnection) SendEvent(ctx context.Context, event domain.Event) error { ctx, cancel := context.WithTimeout(ctx, sendEventTimeout) defer cancel() ch := make(chan sendEventResponse) - shouldNotify := r.scheduleSendingEvent(ctx, event, ch) + isFirstSchedule := r.scheduleSendingEvent(ctx, event, ch) defer func() { if err := r.removeEventChannel(event, ch); err != nil { @@ -158,7 +228,7 @@ func (r *RelayConnection) SendEvent(ctx context.Context, event domain.Event) err } }() - if shouldNotify { + if isFirstSchedule { select { case r.newEventsCh <- event: case <-ctx.Done(): @@ -259,17 +329,12 @@ func (r *RelayConnection) removeEventChannel(event domain.Event, chToRemove chan } func (r *RelayConnection) triggerSubscriptionUpdate() { - if !r.subscriptionsUpdatedChClosed { - r.subscriptionsUpdatedChClosed = true - close(r.subscriptionsUpdatedCh) + select { + case r.subscriptionsUpdatedCh <- struct{}{}: 
+ default: } } -func (r *RelayConnection) resetSubscriptionUpdateCh() { - r.subscriptionsUpdatedChClosed = false - r.subscriptionsUpdatedCh = make(chan struct{}) -} - func (r *RelayConnection) run(ctx context.Context) error { ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -341,12 +406,54 @@ func parseMessage(messageBytes []byte) nostr.Envelope { containsClose := bytes.Contains(label, []byte("CLOSE")) if containsClose { ce := nostr.CloseEnvelope("") - return &ce + if err := ce.UnmarshalJSON(messageBytes); err == nil { + return &ce + } } return nostr.ParseMessage(messageBytes) } +type NoticeType string + +const ( + NoticeTypeEmpty NoticeType = "EMPTY" + NoticeOnlyWithSearch NoticeType = "ONLY_WITH_SEARCH" + NoticeRateLimit NoticeType = "RATE_LIMIT" + NoticeAuthRequired NoticeType = "AUTH_REQUIRED" + NoticeUnknownFeed NoticeType = "UNKNOWN_FEED" + NoticeInvalid NoticeType = "INVALID" + NoticeUnknownError NoticeType = "UNKNOWN_ERROR" + NoticeUnknown NoticeType = "UNKNOWN" +) + +var unknownFeedRegexp = regexp.MustCompile(`unknown.*feed`) +var rateLimitRegexp = regexp.MustCompile(`rate.*limit|too fast`) + +func GetNoticeType(content string) NoticeType { + content = strings.ToLower(content) + content = strings.TrimSpace(content) + + switch { + case content == "": + return NoticeTypeEmpty + case strings.HasSuffix(content, "only filter with search is supported"): + return NoticeOnlyWithSearch + case rateLimitRegexp.MatchString(content): + return NoticeRateLimit + case strings.Contains(content, "auth"): + return NoticeAuthRequired + case unknownFeedRegexp.MatchString(content): + return NoticeUnknownFeed + case strings.Contains(content, "invalid"): + return NoticeInvalid + case strings.Contains(content, "error"): + return NoticeUnknownError + default: + return NoticeUnknown + } +} + func (r *RelayConnection) handleMessage(messageBytes []byte) (err error) { address := r.connectionFactory.Address() @@ -391,10 +498,19 @@ func (r *RelayConnection) 
handleMessage(messageBytes []byte) (err error) { return nil case *nostr.NoticeEnvelope: defer r.metrics.ReportMessageReceived(address, MessageTypeNotice, &err) + noticeType := GetNoticeType(string(*v)) + r.logger. Debug(). WithField("message", string(messageBytes)). + WithField("noticeType", noticeType). Message("received a message (notice)") + + defer r.metrics.ReportNotice(address, noticeType) + + if noticeType == NoticeRateLimit { + r.rateLimitNoticeBackoffManager.Bump() + } return nil case *nostr.AuthEnvelope: defer r.metrics.ReportMessageReceived(address, MessageTypeAuth, &err) @@ -482,7 +598,7 @@ func (r *RelayConnection) manageSubs(ctx context.Context, conn Connection) error select { case <-r.subscriptionsUpdatedCh: - continue + // A subscription update has been triggered; continue to reevaluate subscriptions. case <-ctx.Done(): return ctx.Err() } @@ -496,8 +612,6 @@ func (r *RelayConnection) updateSubs( r.subscriptionsMutex.Lock() defer r.subscriptionsMutex.Unlock() - r.resetSubscriptionUpdateCh() - for _, subscriptionID := range activeSubscriptions.List() { if _, ok := r.subscriptions[subscriptionID]; !ok { msg := transport.NewMessageClose(subscriptionID) @@ -506,6 +620,7 @@ func (r *RelayConnection) updateSubs( WithField("subscriptionID", subscriptionID). Message("closing subscription") + r.rateLimitNoticeBackoffManager.Wait() if err := conn.SendMessage(msg); err != nil { return errors.Wrap(err, "writing close message error") } @@ -522,6 +637,7 @@ func (r *RelayConnection) updateSubs( WithField("subscriptionID", subscriptionID). 
Message("opening subscription") + r.rateLimitNoticeBackoffManager.Wait() if err := conn.SendMessage(msg); err != nil { return errors.Wrap(err, "writing req message error") } @@ -548,6 +664,7 @@ func (r *RelayConnection) copyAllPastEvents() []domain.Event { func (r *RelayConnection) sendEvents(ctx context.Context, conn Connection) error { for _, event := range r.copyAllPastEvents() { msg := transport.NewMessageEvent(event) + r.rateLimitNoticeBackoffManager.Wait() if err := conn.SendMessage(msg); err != nil { return errors.Wrap(err, "error writing a message") } @@ -557,6 +674,7 @@ func (r *RelayConnection) sendEvents(ctx context.Context, conn Connection) error select { case event := <-r.newEventsCh: msg := transport.NewMessageEvent(event) + r.rateLimitNoticeBackoffManager.Wait() if err := conn.SendMessage(msg); err != nil { return errors.Wrap(err, "error writing a message") } diff --git a/service/domain/relays/relay_connection_test.go b/service/domain/relays/relay_connection_test.go index 0209d14..8fcdded 100644 --- a/service/domain/relays/relay_connection_test.go +++ b/service/domain/relays/relay_connection_test.go @@ -257,3 +257,6 @@ func (m2 mockMetrics) ReportMessageReceived(address domain.RelayAddress, message func (m2 mockMetrics) ReportRelayDisconnection(address domain.RelayAddress, err error) { } + +func (m2 mockMetrics) ReportNotice(address domain.RelayAddress, noticeType relays.NoticeType) { +} diff --git a/service/domain/relays/relay_connections.go b/service/domain/relays/relay_connections.go index d16c757..b7c97ae 100644 --- a/service/domain/relays/relay_connections.go +++ b/service/domain/relays/relay_connections.go @@ -14,6 +14,7 @@ type Metrics interface { ReportNumberOfSubscriptions(address domain.RelayAddress, n int) ReportMessageReceived(address domain.RelayAddress, messageType MessageType, err *error) ReportRelayDisconnection(address domain.RelayAddress, err error) + ReportNotice(address domain.RelayAddress, noticeType NoticeType) } type 
MessageType struct { @@ -93,6 +94,7 @@ func (d *RelayConnections) storeMetrics() { d.metrics.ReportRelayConnectionsState(m) } +// Note that a single connection can serve multiple REQ subscriptions. This can cause a "too many concurrent requests" error if not throttled. func (r *RelayConnections) getConnection(relayAddress domain.RelayAddress) *RelayConnection { r.connectionsLock.Lock() defer r.connectionsLock.Unlock() From dd8515ddeaf45894c0e24b3a916d599fa3dc0c1f Mon Sep 17 00:00:00 2001 From: Daniel Cadenas Date: Mon, 12 Feb 2024 15:16:33 -0300 Subject: [PATCH 2/2] Move func, remove unused var --- go.mod | 2 +- internal/logging/log.go | 11 +++++++++++ service/domain/relays/relay_connection.go | 11 ----------- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/go.mod b/go.mod index 158b0e5..3069dcd 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/ThreeDotsLabs/watermill-googlecloud v1.0.13 github.com/boreq/errors v0.1.0 github.com/boreq/rest v0.1.0 + github.com/davecgh/go-spew v1.1.1 github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 github.com/google/wire v0.5.0 github.com/gorilla/mux v1.8.1 @@ -36,7 +37,6 @@ require ( github.com/btcsuite/btcd/chaincfg/chainhash v1.0.2 // indirect github.com/cenkalti/backoff/v3 v3.2.2 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect - github.com/davecgh/go-spew v1.1.1 // indirect github.com/decred/dcrd/crypto/blake256 v1.0.1 // indirect github.com/dgraph-io/ristretto v0.1.1 // indirect github.com/dustin/go-humanize v1.0.1 // indirect diff --git a/internal/logging/log.go b/internal/logging/log.go index 06c5397..b405276 100644 --- a/internal/logging/log.go +++ b/internal/logging/log.go @@ -7,6 +7,7 @@ import ( "os/exec" "reflect" "runtime" + "time" "github.com/davecgh/go-spew/spew" ) @@ -202,6 +203,16 @@ func (d devNullLoggerEntry) WithField(key string, v any) Entry { func (d devNullLoggerEntry) Message(msg string) { } +// LogPeriodically executes the passed action function if the current time in 
milliseconds +// modulo logInterval equals zero. This approach allows executing the action periodically, +// approximating the execution to happen once every `logInterval` milliseconds. +func LogPeriodically(action func(), logInterval int64) { + currentTimeMillis := time.Now().UnixNano() / int64(time.Millisecond) + if currentTimeMillis%logInterval == 0 { + action() + } +} + func Inspect(args ...interface{}) { for _, arg := range args { val := reflect.ValueOf(arg) diff --git a/service/domain/relays/relay_connection.go b/service/domain/relays/relay_connection.go index 09ba261..14cfe23 100644 --- a/service/domain/relays/relay_connection.go +++ b/service/domain/relays/relay_connection.go @@ -21,7 +21,6 @@ import ( "github.com/planetary-social/nos-event-service/service/domain/relays/transport" ) -const relayAddress = "wss://relay.nostr.com.au" const ( ReconnectionBackoff = 5 * time.Minute MaxDialReconnectionBackoff = 30 * time.Minute @@ -90,16 +89,6 @@ func (r *RateLimitNoticeBackoffManager) Bump() { r.updateLastBumpTime() } -// logPeriodically executes the passed action function if the current time in milliseconds -// modulo logInterval equals zero. This approach allows executing the action periodically, -// approximating the execution to happen once every `logInterval` milliseconds. -func logPeriodically(action func(), logInterval int64) { - currentTimeMillis := time.Now().UnixNano() / int64(time.Millisecond) - if currentTimeMillis%logInterval == 0 { - action() - } -} - func (r *RateLimitNoticeBackoffManager) Wait() { if r.rateLimitNoticeCount <= 0 { return