Skip to content

Commit

Permalink
Tidy up a few compiler warnings
Browse files Browse the repository at this point in the history
  • Loading branch information
dsyer committed Sep 6, 2019
1 parent ba0b6ed commit bf8dadb
Show file tree
Hide file tree
Showing 13 changed files with 314 additions and 394 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@

import java.util.function.Supplier;

public interface LastBackoffPeriodSupplier extends Supplier<Long> {
public interface BackoffPeriodSupplier extends Supplier<Long> {

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,21 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class RememberPeriodSleeper implements Sleeper, LastBackoffPeriodSupplier {
public class RememberPeriodSleeper implements Sleeper, BackoffPeriodSupplier {

private static final Log logger = LogFactory.getLog(RememberPeriodSleeper.class);
private static final Log logger = LogFactory.getLog(RememberPeriodSleeper.class);

private volatile Long lastBackoffPeriod;
private volatile Long lastBackoffPeriod;

@Override
public void sleep(long backOffPeriod) {
logger.debug("Remembering a sleeping period instead of sleeping: " + backOffPeriod);
lastBackoffPeriod = backOffPeriod;
}
@Override
public void sleep(long backOffPeriod) {
logger.debug("Remembering a sleeping period instead of sleeping: " + backOffPeriod);
lastBackoffPeriod = backOffPeriod;
}

@Override
public Long get() {
return lastBackoffPeriod;
}

@Override
public Long get() {
return lastBackoffPeriod;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@

import org.springframework.classify.Classifier;
import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.RetryOperations;
import org.springframework.retry.RetryState;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,69 +16,64 @@

package org.springframework.retry.support;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.retry.RetryContext;
import org.springframework.retry.RetryException;
import org.springframework.retry.backoff.LastBackoffPeriodSupplier;
import org.springframework.retry.backoff.BackoffPeriodSupplier;

/**
* @author Dave Syer
* @param <T> The result type
*/
public abstract class AsyncRetryResultProcessor<T> implements RetryResultProcessor<T> {
private static final Log logger = LogFactory.getLog(AsyncRetryResultProcessor.class);

protected T doNewAttempt(Supplier<Result<T>> supplier) throws Throwable {
logger.debug("Performing the next async callback invocation...");
return supplier.get().getOrThrow();
}
private static final Log logger = LogFactory.getLog(AsyncRetryResultProcessor.class);

protected T doNewAttempt(Supplier<Result<T>> supplier) throws Throwable {
logger.debug("Performing the next async callback invocation...");
return supplier.get().getOrThrow();
}

protected abstract T scheduleNewAttemptAfterDelay(Supplier<Result<T>> supplier,
ScheduledExecutorService reschedulingExecutor, long rescheduleAfterMillis, RetryContext ctx)
throws Throwable;

protected abstract T scheduleNewAttemptAfterDelay(
Supplier<Result<T>> supplier,
ScheduledExecutorService reschedulingExecutor,
long rescheduleAfterMillis,
RetryContext ctx
) throws Throwable;
protected T handleException(Supplier<Result<T>> supplier, Consumer<Throwable> handler, Throwable throwable,
ScheduledExecutorService reschedulingExecutor, BackoffPeriodSupplier lastBackoffPeriodSupplier,
RetryContext ctx) {
try {
handler.accept(unwrapIfNeed(throwable));

protected T handleException(Supplier<Result<T>> supplier,
Consumer<Throwable> handler,
Throwable throwable,
ScheduledExecutorService reschedulingExecutor,
LastBackoffPeriodSupplier lastBackoffPeriodSupplier,
RetryContext ctx) {
try {
handler.accept(unwrapIfNeed(throwable));
if (reschedulingExecutor == null || lastBackoffPeriodSupplier == null) {
return doNewAttempt(supplier);
}
else {
long rescheduleAfterMillis = lastBackoffPeriodSupplier.get();
logger.debug("Scheduling a next retry with a delay = " + rescheduleAfterMillis + " millis...");
return scheduleNewAttemptAfterDelay(supplier, reschedulingExecutor, rescheduleAfterMillis, ctx);
}
}
catch (Throwable t) {
throw RetryTemplate.runtimeException(unwrapIfNeed(t));
}
}

if (reschedulingExecutor == null || lastBackoffPeriodSupplier == null) {
return doNewAttempt(supplier);
} else {
long rescheduleAfterMillis = lastBackoffPeriodSupplier.get();
logger.debug("Scheduling a next retry with a delay = " + rescheduleAfterMillis + " millis...");
return scheduleNewAttemptAfterDelay(supplier, reschedulingExecutor, rescheduleAfterMillis, ctx);
}
}
catch (Throwable t) {
throw RetryTemplate.runtimeException(unwrapIfNeed(t));
}
}
static Throwable unwrapIfNeed(Throwable throwable) {
if (throwable instanceof ExecutionException || throwable instanceof CompletionException
|| throwable instanceof RetryException) {
return throwable.getCause();
}
else {
return throwable;
}
}

static Throwable unwrapIfNeed(Throwable throwable) {
if (throwable instanceof ExecutionException
|| throwable instanceof CompletionException
|| throwable instanceof RetryException) {
return throwable.getCause();
} else {
return throwable;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
package org.springframework.retry.support;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
Expand All @@ -27,59 +25,56 @@

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.RetryException;
import org.springframework.retry.backoff.LastBackoffPeriodSupplier;
import org.springframework.retry.backoff.BackoffPeriodSupplier;

/**
* A {@link RetryResultProcessor} for a {@link CompletableFuture}. If a
* {@link RetryCallback} returns a <code>CompletableFuture</code> this processor can be
* used internally by the {@link RetryTemplate} to wrap it and process the result.
*
* @author Dave Syer
* @param <V> The result type
*/
public class CompletableFutureRetryResultProcessor<V>
extends AsyncRetryResultProcessor<CompletableFuture<V>> {
public class CompletableFutureRetryResultProcessor<V> extends AsyncRetryResultProcessor<CompletableFuture<V>> {

protected final Log logger = LogFactory.getLog(getClass());

@Override
public Result<CompletableFuture<V>> process(CompletableFuture<V> completable,
Supplier<Result<CompletableFuture<V>>> supplier,
Consumer<Throwable> handler, ScheduledExecutorService reschedulingExecutor,
LastBackoffPeriodSupplier lastBackoffPeriodSupplier,
Supplier<Result<CompletableFuture<V>>> supplier, Consumer<Throwable> handler,
ScheduledExecutorService reschedulingExecutor, BackoffPeriodSupplier lastBackoffPeriodSupplier,
RetryContext ctx) {

CompletableFuture<V> handle = completable
.thenApply(CompletableFuture::completedFuture)
.exceptionally(throwable -> handleException(
supplier, handler, throwable, reschedulingExecutor, lastBackoffPeriodSupplier, ctx)
)
.thenApply(CompletableFuture::completedFuture).exceptionally(throwable -> handleException(supplier,
handler, throwable, reschedulingExecutor, lastBackoffPeriodSupplier, ctx))
.thenCompose(Function.identity());

return new Result<>(handle);
}

protected CompletableFuture<V> scheduleNewAttemptAfterDelay(
Supplier<Result<CompletableFuture<V>>> supplier,
ScheduledExecutorService reschedulingExecutor, long rescheduleAfterMillis,
RetryContext ctx)
{
protected CompletableFuture<V> scheduleNewAttemptAfterDelay(Supplier<Result<CompletableFuture<V>>> supplier,
ScheduledExecutorService reschedulingExecutor, long rescheduleAfterMillis, RetryContext ctx) {
CompletableFuture<CompletableFuture<V>> futureOfFurtherScheduling = new CompletableFuture<>();

reschedulingExecutor.schedule(() -> {
try {
RetrySynchronizationManager.register(ctx);
futureOfFurtherScheduling.complete(doNewAttempt(supplier));
} catch (Throwable t) {
}
catch (Throwable t) {
futureOfFurtherScheduling.completeExceptionally(t);
throw RetryTemplate.runtimeException(t);
} finally {
}
finally {
RetrySynchronizationManager.clear();
}
}, rescheduleAfterMillis, TimeUnit.MILLISECONDS);

return futureOfFurtherScheduling.thenCompose(Function.identity());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,42 +23,41 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.backoff.LastBackoffPeriodSupplier;
import org.springframework.retry.backoff.BackoffPeriodSupplier;

/**
* todo: check or remove after discussion
*
* A {@link RetryResultProcessor} for a plain {@link Future}. If a {@link RetryCallback}
* returns a <code>Future</code> this processor can be used internally by the
* {@link RetryTemplate} to wrap it and process the result.
*
*
* @author Dave Syer
* @param <V> The result type
*/
public class FutureRetryResultProcessor<V> extends AsyncRetryResultProcessor<Future<V>> {

@Override
public Result<Future<V>> process(Future<V> future,
Supplier<Result<Future<V>>> supplier, Consumer<Throwable> handler,
ScheduledExecutorService reschedulingExecutor, LastBackoffPeriodSupplier lastBackoffPeriodSupplier,
RetryContext ctx) {
public Result<Future<V>> process(Future<V> future, Supplier<Result<Future<V>>> supplier,
Consumer<Throwable> handler, ScheduledExecutorService reschedulingExecutor,
BackoffPeriodSupplier lastBackoffPeriodSupplier, RetryContext ctx) {
return new Result<>(new FutureWrapper(future, supplier, handler, this, reschedulingExecutor,
lastBackoffPeriodSupplier, ctx));
}

@Override
protected Future<V> scheduleNewAttemptAfterDelay(Supplier<Result<Future<V>>> supplier,
ScheduledExecutorService reschedulingExecutor, long rescheduleAfterMillis, RetryContext ctx)
throws Throwable
{
throws Throwable {
ScheduledFuture<Future<V>> scheduledFuture = reschedulingExecutor.schedule(() -> {
try {
return doNewAttempt(supplier);
} catch (Throwable t) {
}
catch (Throwable t) {
throw RetryTemplate.runtimeException(t);
}
}, rescheduleAfterMillis, TimeUnit.MILLISECONDS);
Expand All @@ -73,15 +72,18 @@ private class FutureWrapper implements Future<V> {
private Supplier<Result<Future<V>>> supplier;

private Consumer<Throwable> handler;

private AsyncRetryResultProcessor<Future<V>> processor;

private final ScheduledExecutorService reschedulingExecutor;
private final LastBackoffPeriodSupplier lastBackoffPeriodSupplier;

private final BackoffPeriodSupplier lastBackoffPeriodSupplier;

private RetryContext ctx;

FutureWrapper(Future<V> delegate, Supplier<Result<Future<V>>> supplier,
Consumer<Throwable> handler, AsyncRetryResultProcessor<Future<V>> processor,
ScheduledExecutorService reschedulingExecutor, LastBackoffPeriodSupplier lastBackoffPeriodSupplier,
RetryContext ctx) {
FutureWrapper(Future<V> delegate, Supplier<Result<Future<V>>> supplier, Consumer<Throwable> handler,
AsyncRetryResultProcessor<Future<V>> processor, ScheduledExecutorService reschedulingExecutor,
BackoffPeriodSupplier lastBackoffPeriodSupplier, RetryContext ctx) {
this.delegate = delegate;
this.supplier = supplier;
this.handler = handler;
Expand Down Expand Up @@ -112,19 +114,20 @@ public V get() throws InterruptedException, ExecutionException {
return this.delegate.get();
}
catch (Throwable e) {
return processor.handleException(supplier, handler, e, reschedulingExecutor, lastBackoffPeriodSupplier, ctx)
return processor
.handleException(supplier, handler, e, reschedulingExecutor, lastBackoffPeriodSupplier, ctx)
.get();
}
}

@Override
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
try {
return this.delegate.get(timeout, unit);
}
catch (Throwable e) {
return processor.handleException(supplier, handler, e, reschedulingExecutor, lastBackoffPeriodSupplier, ctx)
return processor
.handleException(supplier, handler, e, reschedulingExecutor, lastBackoffPeriodSupplier, ctx)
.get(timeout, unit);
}
}
Expand All @@ -142,22 +145,25 @@ private class FutureFlatter implements Future<V> {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
try {
if (this.nestedFuture.isDone()) {
return this.nestedFuture.get().cancel(mayInterruptIfRunning);
} else {
return this.nestedFuture.cancel(mayInterruptIfRunning);
if (this.nestedFuture.isDone()) {
return this.nestedFuture.get().cancel(mayInterruptIfRunning);
}
else {
return this.nestedFuture.cancel(mayInterruptIfRunning);
}
}
} catch (Throwable t) {
catch (Throwable t) {
throw RetryTemplate.runtimeException(t);
}
}
}

@Override
public boolean isCancelled() {
try {
return this.nestedFuture.isCancelled()
|| (this.nestedFuture.isDone() && this.nestedFuture.get().isCancelled());
} catch (Throwable t) {
}
catch (Throwable t) {
throw RetryTemplate.runtimeException(t);
}
}
Expand All @@ -166,7 +172,8 @@ public boolean isCancelled() {
public boolean isDone() {
try {
return this.nestedFuture.isDone() && this.nestedFuture.get().isDone();
} catch (Throwable t) {
}
catch (Throwable t) {
throw RetryTemplate.runtimeException(t);
}
}
Expand All @@ -177,8 +184,7 @@ public V get() throws InterruptedException, ExecutionException {
}

@Override
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return this.nestedFuture.get(timeout, unit).get(timeout, unit);
}

Expand Down
Loading

0 comments on commit bf8dadb

Please sign in to comment.