Skip to content

Commit

Permalink
Not started Future is now cancellable (#2750)
Browse files Browse the repository at this point in the history
  • Loading branch information
Egor Tsinko committed May 8, 2024
1 parent 26181f1 commit 5962a4c
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 6 deletions.
22 changes: 16 additions & 6 deletions src/main/java/io/vavr/concurrent/FutureImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.concurrent.*;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Consumer;
import java.util.function.Supplier;

/**
* <strong>INTERNAL API - This class is subject to change.</strong>
Expand Down Expand Up @@ -100,7 +101,12 @@ private FutureImpl(Executor executor, Option<Try<T>> value, Queue<Consumer<Try<T
this.actions = actions;
this.waiters = waiters;
try {
computation.execute(this::tryComplete, this::updateThread);
computation.execute(this::tryComplete, this::updateThread, () -> {
// Synchronize as the future could be in the process of cancelling
synchronized (lock) {
return isCompleted();
}
});
} catch (Throwable x) {
tryComplete(Try.failure(x));
}
Expand All @@ -115,7 +121,7 @@ private FutureImpl(Executor executor, Option<Try<T>> value, Queue<Consumer<Try<T
* @return a new {@code FutureImpl} instance
*/
static <T> FutureImpl<T> of(Executor executor) {
return new FutureImpl<>(executor, Option.none(), Queue.empty(), Queue.empty(), (complete, updateThread) -> {});
return new FutureImpl<>(executor, Option.none(), Queue.empty(), Queue.empty(), (complete, updateThread, isCompleted) -> {});
}

/**
Expand All @@ -127,7 +133,7 @@ static <T> FutureImpl<T> of(Executor executor) {
* @return a new {@code FutureImpl} instance
*/
static <T> FutureImpl<T> of(Executor executor, Try<? extends T> value) {
return new FutureImpl<>(executor, Option.some(Try.narrow(value)), null, null, (complete, updateThread) -> {});
return new FutureImpl<>(executor, Option.some(Try.narrow(value)), null, null, (complete, updateThread, isCompleted) -> {});
}

/**
Expand All @@ -140,7 +146,7 @@ static <T> FutureImpl<T> of(Executor executor, Try<? extends T> value) {
* @return a new {@code FutureImpl} instance
*/
static <T> FutureImpl<T> sync(Executor executor, Task<? extends T> task) {
return new FutureImpl<>(executor, Option.none(), Queue.empty(), Queue.empty(), (complete, updateThread) ->
return new FutureImpl<>(executor, Option.none(), Queue.empty(), Queue.empty(), (complete, updateThread, isCompleted) ->
task.run(complete::with)
);
}
Expand All @@ -156,8 +162,12 @@ static <T> FutureImpl<T> sync(Executor executor, Task<? extends T> task) {
*/
static <T> FutureImpl<T> async(Executor executor, Task<? extends T> task) {
// In a single-threaded context this Future may already have been completed during initialization.
return new FutureImpl<>(executor, Option.none(), Queue.empty(), Queue.empty(), (complete, updateThread) ->
return new FutureImpl<>(executor, Option.none(), Queue.empty(), Queue.empty(), (complete, updateThread, isCompleted) ->
executor.execute(() -> {
// Avoid performing work, if future is already complete (normally by cancellation)
if (isCompleted.get()) {
return;
}
updateThread.run();
try {
task.run(complete::with);
Expand Down Expand Up @@ -414,6 +424,6 @@ private void handleUncaughtException(Throwable x) {
}

private interface Computation<T> {
void execute(Task.Complete<T> complete, Runnable updateThread) throws Throwable;
void execute(Task.Complete<T> complete, Runnable updateThread, Supplier<Boolean> isCompleted) throws Throwable;
}
}
26 changes: 26 additions & 0 deletions src/test/java/io/vavr/concurrent/FutureTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import static io.vavr.concurrent.Concurrent.waitUntil;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static io.vavr.concurrent.Concurrent.zZz;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.fail;

@SuppressWarnings("deprecation")
Expand Down Expand Up @@ -143,6 +144,31 @@ protected int getPeekNonNilPerformingAnAction() {

// -- static failed()


@Test
public void shouldNotExecuteFutureThatHasBeenCancelledBeforeItStarted() throws InterruptedException {
ExecutorService es = Executors.newSingleThreadExecutor();

AtomicBoolean future2Executed = new AtomicBoolean(false);

// Submit f1 to the executor first
Future<Void> f = Future.run(es, () -> Thread.sleep(1000));
// Submit f2 next, it will have to wait to be executed
Future<Void> f2 = Future.run(es, () -> {
// Should never run this
future2Executed.set(true);
});

// Cancel f2 BEFORE it runs on the executor
f2.cancel(true);
f.cancel(true);
es.shutdown();
boolean terminated = es.awaitTermination(2, SECONDS);
assertThat(terminated).isTrue();
// f2 should never have run
assertThat(future2Executed.get()).isFalse();
}

@Test
public void shouldCreateFailureThatFailsWithRuntimeException() {
final Future<Object> failed = Future.failed(new RuntimeException("ooops")).await();
Expand Down

0 comments on commit 5962a4c

Please sign in to comment.