Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Not started Future is now cancellable (#2750) #2763

Merged
merged 1 commit into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 16 additions & 6 deletions src/main/java/io/vavr/concurrent/FutureImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.concurrent.*;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Consumer;
import java.util.function.Supplier;

/**
* <strong>INTERNAL API - This class is subject to change.</strong>
Expand Down Expand Up @@ -100,7 +101,12 @@ private FutureImpl(Executor executor, Option<Try<T>> value, Queue<Consumer<Try<T
this.actions = actions;
this.waiters = waiters;
try {
computation.execute(this::tryComplete, this::updateThread);
computation.execute(this::tryComplete, this::updateThread, () -> {
// Synchronize as the future could be in the process of cancelling
synchronized (lock) {
return isCompleted();
}
});
} catch (Throwable x) {
tryComplete(Try.failure(x));
}
Expand All @@ -115,7 +121,7 @@ private FutureImpl(Executor executor, Option<Try<T>> value, Queue<Consumer<Try<T
* @return a new {@code FutureImpl} instance
*/
static <T> FutureImpl<T> of(Executor executor) {
return new FutureImpl<>(executor, Option.none(), Queue.empty(), Queue.empty(), (complete, updateThread) -> {});
return new FutureImpl<>(executor, Option.none(), Queue.empty(), Queue.empty(), (complete, updateThread, isCompleted) -> {});
}

/**
Expand All @@ -127,7 +133,7 @@ static <T> FutureImpl<T> of(Executor executor) {
* @return a new {@code FutureImpl} instance
*/
static <T> FutureImpl<T> of(Executor executor, Try<? extends T> value) {
return new FutureImpl<>(executor, Option.some(Try.narrow(value)), null, null, (complete, updateThread) -> {});
return new FutureImpl<>(executor, Option.some(Try.narrow(value)), null, null, (complete, updateThread, isCompleted) -> {});
}

/**
Expand All @@ -140,7 +146,7 @@ static <T> FutureImpl<T> of(Executor executor, Try<? extends T> value) {
* @return a new {@code FutureImpl} instance
*/
static <T> FutureImpl<T> sync(Executor executor, Task<? extends T> task) {
return new FutureImpl<>(executor, Option.none(), Queue.empty(), Queue.empty(), (complete, updateThread) ->
return new FutureImpl<>(executor, Option.none(), Queue.empty(), Queue.empty(), (complete, updateThread, isCompleted) ->
task.run(complete::with)
);
}
Expand All @@ -156,8 +162,12 @@ static <T> FutureImpl<T> sync(Executor executor, Task<? extends T> task) {
*/
static <T> FutureImpl<T> async(Executor executor, Task<? extends T> task) {
// In a single-threaded context this Future may already have been completed during initialization.
return new FutureImpl<>(executor, Option.none(), Queue.empty(), Queue.empty(), (complete, updateThread) ->
return new FutureImpl<>(executor, Option.none(), Queue.empty(), Queue.empty(), (complete, updateThread, isCompleted) ->
executor.execute(() -> {
// Avoid performing work, if future is already complete (normally by cancellation)
if (isCompleted.get()) {
return;
}
updateThread.run();
try {
task.run(complete::with);
Expand Down Expand Up @@ -414,6 +424,6 @@ private void handleUncaughtException(Throwable x) {
}

private interface Computation<T> {
void execute(Task.Complete<T> complete, Runnable updateThread) throws Throwable;
void execute(Task.Complete<T> complete, Runnable updateThread, Supplier<Boolean> isCompleted) throws Throwable;
}
}
26 changes: 26 additions & 0 deletions src/test/java/io/vavr/concurrent/FutureTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import static io.vavr.concurrent.Concurrent.waitUntil;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static io.vavr.concurrent.Concurrent.zZz;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.fail;
import static org.junit.jupiter.api.Assertions.assertThrows;

Expand Down Expand Up @@ -146,6 +147,31 @@ protected int getPeekNonNilPerformingAnAction() {

// -- static failed()


@Test
public void shouldNotExecuteFutureThatHasBeenCancelledBeforeItStarted() throws InterruptedException {
ExecutorService es = Executors.newSingleThreadExecutor();

AtomicBoolean future2Executed = new AtomicBoolean(false);

// Submit f1 to the executor first
Future<Void> f = Future.run(es, () -> Thread.sleep(1000));
// Submit f2 next, it will have to wait to be executed
Future<Void> f2 = Future.run(es, () -> {
// Should never run this
future2Executed.set(true);
});

// Cancel f2 BEFORE it runs on the executor
f2.cancel(true);
f.cancel(true);
es.shutdown();
boolean terminated = es.awaitTermination(2, SECONDS);
assertThat(terminated).isTrue();
// f2 should never have run
assertThat(future2Executed.get()).isFalse();
}

@Test
public void shouldCreateFailureThatFailsWithRuntimeException() {
final Future<Object> failed = Future.failed(new RuntimeException("ooops")).await();
Expand Down
Loading