Skip to content

Commit

Permalink
Correctly identify CLOSED messages and rate limit on them too
Browse files Browse the repository at this point in the history
  • Loading branch information
dcadenas committed Feb 14, 2024
1 parent 6281d72 commit 2527887
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 28 deletions.
24 changes: 18 additions & 6 deletions internal/logging/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os/exec"
"reflect"
"runtime"
"sync"
"time"

"github.com/davecgh/go-spew/spew"
Expand Down Expand Up @@ -203,12 +204,23 @@ func (d devNullLoggerEntry) WithField(key string, v any) Entry {
func (d devNullLoggerEntry) Message(msg string) {
}

// logPeriodically executes the passed action function if the current time in milliseconds
// modulo logInterval equals zero. This approach allows executing the action periodically,
// approximating the execution to happen once every `logInterval` milliseconds.
func LogPeriodically(action func(), logInterval int64) {
currentTimeMillis := time.Now().UnixNano() / int64(time.Millisecond)
if currentTimeMillis%logInterval == 0 {
// ConfigureTimeThrottler returns a throttled version of the provided action function.
// The returned function will execute the action at most once every specified duration.
// TODO: Maybe this should be moved to a separate package.
func ConfigureTimeThrottler(duration time.Duration) func(action func()) {
var (
lastExecution time.Time
mu sync.Mutex
)
return func(action func()) {
mu.Lock()
defer mu.Unlock()

if time.Since(lastExecution) < duration {
return
}

lastExecution = time.Now()
action()
}
}
Expand Down
8 changes: 6 additions & 2 deletions service/domain/relays/rate_limit_notice_backoff_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,16 @@ func (r *RateLimitNoticeBackoffManager) Bump() {
r.updateLastBumpTime()
}

func (r *RateLimitNoticeBackoffManager) IsSet() bool {
rateLimitNoticeCount := atomic.LoadInt32(&r.rateLimitNoticeCount)
return rateLimitNoticeCount > 0
}

const maxBackoffMs = 10000
const secondsToDecreaseRateLimitNoticeCount = 60 * 5 // 5 minutes = 300 seconds

func (r *RateLimitNoticeBackoffManager) Wait() {
rateLimitNoticeCount := atomic.LoadInt32(&r.rateLimitNoticeCount)
if rateLimitNoticeCount <= 0 {
if !r.IsSet() {
return
}

Expand Down
65 changes: 45 additions & 20 deletions service/domain/relays/relay_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,21 +333,19 @@ func (r *RelayConnection) writeErrorShouldNotBeLogged(err error) bool {
return errors.Is(err, context.Canceled) || errors.Is(err, websocket.ErrCloseSent)
}

func parseMessage(messageBytes []byte) nostr.Envelope {
func parseEnvelope(messageBytes []byte) (nostr.Envelope, bool) {
firstComma := bytes.Index(messageBytes, []byte{','})
if firstComma == -1 {
return nil
return nil, false
}
label := messageBytes[0:firstComma]
containsClose := bytes.Contains(label, []byte("CLOSE"))
if containsClose {
ce := nostr.CloseEnvelope("")
if err := ce.UnmarshalJSON(messageBytes); err == nil {
return &ce
}
containsClosed := bytes.Contains(label, []byte("CLOSED"))
if containsClosed {
return nil, true
}

return nostr.ParseMessage(messageBytes)
nostrEnvelope := nostr.ParseMessage(messageBytes)
return nostrEnvelope, false
}

type NoticeType string
Expand Down Expand Up @@ -390,12 +388,20 @@ func GetNoticeType(content string) NoticeType {
}
}

var everyTenSeconds func(func()) = logging.ConfigureTimeThrottler(10 * time.Second)

func (r *RelayConnection) handleMessage(messageBytes []byte) (err error) {
address := r.connectionFactory.Address()

envelope := parseMessage(messageBytes)
if envelope == nil {
envelope, isClosed := parseEnvelope(messageBytes)
if envelope == nil && !isClosed {
defer r.metrics.ReportMessageReceived(address, MessageTypeUnknown, &err)
everyTenSeconds(func() {
r.logger.
Debug().
WithField("message", string(messageBytes)).
Message("received an unrecognized envelope message")
})
return errors.New("error parsing message, we are never going to find out what error unfortunately due to the design of this library")
}

Expand Down Expand Up @@ -474,16 +480,35 @@ func (r *RelayConnection) handleMessage(messageBytes []byte) (err error) {

r.passSendEventResponseToChannel(eventID, response)
return nil
case *nostr.CloseEnvelope:
defer r.metrics.ReportMessageReceived(address, MessageTypeClose, &err)
r.logger.
Debug().
WithField("message", string(messageBytes)).
Message("received a message (close)")
return nil
default:
defer r.metrics.ReportMessageReceived(address, MessageTypeUnknown, &err)
return errors.New("unknown message type")
if isClosed {
defer r.metrics.ReportMessageReceived(address, MessageTypeClose, &err)
r.logger.
Debug().
WithField("message", string(messageBytes)).
Message("received a message (closed)")

if r.rateLimitNoticeBackoffManager.rateLimitNoticeCount <= 0 {
// Let's add a rate limit for close messages too, just in case some
// relays close from rate limiting without sending a notice. It
// makes also sense event if it's not for rate limit reasons but
// just to avoid spamming relays that we know are closing often
r.rateLimitNoticeBackoffManager.Bump()
}
return nil
} else {
defer r.metrics.ReportMessageReceived(address, MessageTypeUnknown, &err)

// Add some traces to investigate these unknown messages
everyTenSeconds(func() {
r.logger.
Debug().
WithField("message", string(messageBytes)).
Message("received an unknown message")
})
return errors.New("unknown message type")

}
}
}

Expand Down

0 comments on commit 2527887

Please sign in to comment.