Skip to content
This repository has been archived by the owner on Dec 28, 2024. It is now read-only.

Commit

Permalink
refactor: nats broker retrying logic
Browse files Browse the repository at this point in the history
  • Loading branch information
palkan committed Nov 2, 2023
1 parent fd96a6b commit d7e7184
Showing 1 changed file with 20 additions and 21 deletions.
41 changes: 20 additions & 21 deletions broker/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,13 +413,8 @@ createConsumer:
})

if cerr != nil {
if context.DeadlineExceeded == cerr {
if attempts > 0 {
attempts--
n.log.Warnf("failed to create consumer for stream %s, retrying in 500ms...", stream)
time.Sleep(500 * time.Millisecond)
goto createConsumer
}
if n.shouldRetryOnError(cerr, &attempts, 500*time.Millisecond) {
goto createConsumer
}
}
}
Expand Down Expand Up @@ -469,13 +464,8 @@ createStream:
}, func(stream string) {})

if err != nil {
if context.DeadlineExceeded == err {
if attempts > 0 {
attempts--
n.log.Warnf("failed to create stream %s, retrying in 500ms...", stream)
time.Sleep(500 * time.Millisecond)
goto createStream
}
if n.shouldRetryOnError(err, &attempts, 500*time.Millisecond) {
goto createStream
}
}

Expand Down Expand Up @@ -508,14 +498,10 @@ fetchEpoch:

return string(entry.Value()), nil
} else if err != nil {
if context.DeadlineExceeded == err {
if attempts > 0 {
attempts--
n.log.Warnf("failed to retrieve epoch, retrying in 1s...")
time.Sleep(1 * time.Second)
goto fetchEpoch
}
if n.shouldRetryOnError(err, &attempts, 1*time.Second) {
goto fetchEpoch
}

return "", errorx.Decorate(err, "failed to create key: %s", epochKey)
}

Expand Down Expand Up @@ -767,3 +753,16 @@ func (s *streamSync) idle() {
s.active = false
close(s.cv)
}

func (n *NATS) shouldRetryOnError(err error, attempts *int, cooldown time.Duration) bool {
if context.DeadlineExceeded == err || jetstream.ErrNoStreamResponse == err {
if *attempts > 0 {
(*attempts)--
n.log.Warnf("operation failed with %s, retrying in %s...", err.Error(), cooldown.String())
time.Sleep(cooldown)
return true
}
}

return false
}

0 comments on commit d7e7184

Please sign in to comment.