Skip to content

Commit

Permalink
Merge pull request #88 from planetary-social/load-shedding
Browse files Browse the repository at this point in the history
Load shedding instead of disconnection under backpressure
  • Loading branch information
dcadenas authored May 20, 2024
2 parents 27b9057 + 1d8df00 commit ea4e456
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 19 deletions.
19 changes: 3 additions & 16 deletions service/domain/relays/relay_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ type RelayConnection struct {
eventsToSendMutex sync.Mutex
newEventsCh chan domain.Event
rateLimitNoticeBackoffManager *RateLimitNoticeBackoffManager
cancelRun context.CancelFunc
cancelBackPressure context.CancelFunc
}

Expand All @@ -91,7 +90,6 @@ func NewRelayConnection(
eventsToSend: make(map[domain.EventId]*eventToSend),
newEventsCh: make(chan domain.Event),
rateLimitNoticeBackoffManager: rateLimitNoticeBackoffManager,
cancelRun: nil,
cancelBackPressure: nil,
}
}
Expand Down Expand Up @@ -311,8 +309,6 @@ func (r *RelayConnection) triggerSubscriptionUpdate() {

func (r *RelayConnection) run(ctx context.Context) error {
ctx, cancel := context.WithCancel(ctx)
backPressureCtx, cancelFromBackPressure := context.WithCancel(ctx)
r.cancelRun = cancelFromBackPressure
defer cancel()

defer r.setState(RelayConnectionStateDisconnected)
Expand Down Expand Up @@ -359,17 +355,9 @@ func (r *RelayConnection) run(ctx context.Context) error {
return NewReadMessageError(err)
}

select {
case <-backPressureCtx.Done():
// The backpressure handling code is to avoid overwhelming the queue
// that writes to relay.nos.social so we skip that signal for it or
// the queue will never shrink
if r.Address().HostWithoutPort() == "relay.nos.social" {
continue
}

return BackPressureError
default:
if r.state == RelayConnectionStateBackPressured {
// Load shedding under backpressure
continue
}

if err := r.handleMessage(messageBytes); err != nil {
Expand All @@ -378,7 +366,6 @@ func (r *RelayConnection) run(ctx context.Context) error {
WithError(err).
WithField("message", string(messageBytes)).
Message("error handling an incoming message")
continue
}
}
}
Expand Down
5 changes: 2 additions & 3 deletions service/domain/relays/relay_connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,8 @@ func (d *RelayConnections) SendEvent(ctx context.Context, relayAddress domain.Re

func (d *RelayConnections) NotifyBackPressure() {
for _, connection := range d.connections {
if connection.cancelRun != nil && connection.Address().HostWithoutPort() != "relay.nos.social" {
connection.cancelRun()
connection.cancelRun = nil
if connection.Address().HostWithoutPort() != "relay.nos.social" {
connection.setState(RelayConnectionStateBackPressured)
}
}
}
Expand Down

0 comments on commit ea4e456

Please sign in to comment.