diff --git a/feign-reactor-core/src/main/java/reactivefeign/publisher/retry/RetryPublisherHttpClient.java b/feign-reactor-core/src/main/java/reactivefeign/publisher/retry/RetryPublisherHttpClient.java index 9862a1eb..b27be7ea 100644 --- a/feign-reactor-core/src/main/java/reactivefeign/publisher/retry/RetryPublisherHttpClient.java +++ b/feign-reactor-core/src/main/java/reactivefeign/publisher/retry/RetryPublisherHttpClient.java @@ -40,6 +40,7 @@ abstract public class RetryPublisherHttpClient implements PublisherHttpClient { protected final String feignMethodKey; protected final PublisherHttpClient publisherClient; private final Retry retry; + private final ReactiveRetryPolicy retryPolicy; private final ExceptionPropagationPolicy exceptionPropagationPolicy; @@ -52,6 +53,7 @@ protected RetryPublisherHttpClient( this.feignMethodKey = methodMetadata.configKey(); this.retry = wrapWithRetryLog(retryPolicy.retry(), feignMethodKey); this.exceptionPropagationPolicy = retryPolicy.exceptionPropagationPolicy(); + this.retryPolicy = retryPolicy; } protected Retry getRetry(ReactiveHttpRequest request){ @@ -64,7 +66,7 @@ private Retry wrapWithOutOfRetriesLog(ReactiveHttpRequest request) { public Publisher generateCompanion(Flux retrySignals) { return Flux.from(retry.generateCompanion(retrySignals)) .onErrorResume(throwable -> Mono.just(new OutOfRetriesWrapper(throwable, request))) - .zipWith(Flux.range(1, Integer.MAX_VALUE), (object, index) -> { + .zipWith(Flux.range(1, retryPolicy.maxAllowedRetries() + 1), (object, index) -> { if(object instanceof OutOfRetriesWrapper){ OutOfRetriesWrapper wrapper = (OutOfRetriesWrapper) object; if(index == 1){ diff --git a/feign-reactor-core/src/main/java/reactivefeign/retry/BasicReactiveRetryPolicy.java b/feign-reactor-core/src/main/java/reactivefeign/retry/BasicReactiveRetryPolicy.java index cab838c7..c7b55b3c 100644 --- a/feign-reactor-core/src/main/java/reactivefeign/retry/BasicReactiveRetryPolicy.java +++ b/feign-reactor-core/src/main/java/reactivefeign/retry/BasicReactiveRetryPolicy.java @@ -77,6 +77,11 @@ public long retryDelay(Throwable error, int attemptNo) { } } + @Override + public int maxAllowedRetries() { + return maxRetries; + } + public static class Builder implements ReactiveRetryPolicy.Builder{ private int maxRetries; private long backoffInMs = 0; diff --git a/feign-reactor-core/src/main/java/reactivefeign/retry/FilteredReactiveRetryPolicy.java b/feign-reactor-core/src/main/java/reactivefeign/retry/FilteredReactiveRetryPolicy.java index 08bc50f3..f81767be 100644 --- a/feign-reactor-core/src/main/java/reactivefeign/retry/FilteredReactiveRetryPolicy.java +++ b/feign-reactor-core/src/main/java/reactivefeign/retry/FilteredReactiveRetryPolicy.java @@ -31,6 +31,11 @@ public Retry retry() { return filter(retryPolicy.retry(), toRetryOn); } + @Override + public int maxAllowedRetries() { + return retryPolicy.maxAllowedRetries(); + } + static Retry filter( Retry retry, Predicate toRetryOn){ diff --git a/feign-reactor-core/src/main/java/reactivefeign/retry/ReactiveRetryPolicy.java b/feign-reactor-core/src/main/java/reactivefeign/retry/ReactiveRetryPolicy.java index e1be2034..e1e32e2b 100644 --- a/feign-reactor-core/src/main/java/reactivefeign/retry/ReactiveRetryPolicy.java +++ b/feign-reactor-core/src/main/java/reactivefeign/retry/ReactiveRetryPolicy.java @@ -10,6 +10,8 @@ public interface ReactiveRetryPolicy { Retry retry(); + int maxAllowedRetries(); + interface Builder { ReactiveRetryPolicy build(); } diff --git a/feign-reactor-core/src/main/java/reactivefeign/retry/SimpleReactiveRetryPolicy.java b/feign-reactor-core/src/main/java/reactivefeign/retry/SimpleReactiveRetryPolicy.java index aff940c0..0bbc9487 100644 --- a/feign-reactor-core/src/main/java/reactivefeign/retry/SimpleReactiveRetryPolicy.java +++ b/feign-reactor-core/src/main/java/reactivefeign/retry/SimpleReactiveRetryPolicy.java @@ -55,7 +55,7 @@ public ExceptionPropagationPolicy exceptionPropagationPolicy(){ @Override public Retry retry() { return Retry.from(errors -> errors - .zipWith(Flux.range(1, Integer.MAX_VALUE), (signal, index) -> { + .zipWith(Flux.range(1, maxAllowedRetries() + 1), (signal, index) -> { long delay = retryDelay(signal.failure(), index); if (delay >= 0) { return Tuples.of(delay, signal);