diff --git a/broker/nats.go b/broker/nats.go index 005f5cee..cf387433 100644 --- a/broker/nats.go +++ b/broker/nats.go @@ -413,13 +413,8 @@ createConsumer: }) if cerr != nil { - if context.DeadlineExceeded == cerr { - if attempts > 0 { - attempts-- - n.log.Warnf("failed to create consumer for stream %s, retrying in 500ms...", stream) - time.Sleep(500 * time.Millisecond) - goto createConsumer - } + if n.shouldRetryOnError(cerr, &attempts, 500*time.Millisecond) { + goto createConsumer } } } @@ -469,13 +464,8 @@ createStream: }, func(stream string) {}) if err != nil { - if context.DeadlineExceeded == err { - if attempts > 0 { - attempts-- - n.log.Warnf("failed to create stream %s, retrying in 500ms...", stream) - time.Sleep(500 * time.Millisecond) - goto createStream - } + if n.shouldRetryOnError(err, &attempts, 500*time.Millisecond) { + goto createStream } } @@ -508,14 +498,10 @@ fetchEpoch: return string(entry.Value()), nil } else if err != nil { - if context.DeadlineExceeded == err { - if attempts > 0 { - attempts-- - n.log.Warnf("failed to retrieve epoch, retrying in 1s...") - time.Sleep(1 * time.Second) - goto fetchEpoch - } + if n.shouldRetryOnError(err, &attempts, 1*time.Second) { + goto fetchEpoch } + return "", errorx.Decorate(err, "failed to create key: %s", epochKey) } @@ -767,3 +753,16 @@ func (s *streamSync) idle() { s.active = false close(s.cv) } + +func (n *NATS) shouldRetryOnError(err error, attempts *int, cooldown time.Duration) bool { + if context.DeadlineExceeded == err || jetstream.ErrNoStreamResponse == err { + if *attempts > 0 { + (*attempts)-- + n.log.Warnf("operation failed with %s, retrying in %s...", err.Error(), cooldown.String()) + time.Sleep(cooldown) + return true + } + } + + return false +}