From 5962a4ce721df1e3cbbc1835d19a8fd010427e39 Mon Sep 17 00:00:00 2001 From: Egor Tsinko Date: Wed, 8 May 2024 08:30:41 -0600 Subject: [PATCH] Not started Future is now cancellable (#2750) --- .../java/io/vavr/concurrent/FutureImpl.java | 22 +++++++++++----- .../java/io/vavr/concurrent/FutureTest.java | 26 +++++++++++++++++++ 2 files changed, 42 insertions(+), 6 deletions(-) diff --git a/src/main/java/io/vavr/concurrent/FutureImpl.java b/src/main/java/io/vavr/concurrent/FutureImpl.java index ba7c62aad..af836c701 100644 --- a/src/main/java/io/vavr/concurrent/FutureImpl.java +++ b/src/main/java/io/vavr/concurrent/FutureImpl.java @@ -34,6 +34,7 @@ import java.util.concurrent.*; import java.util.concurrent.locks.LockSupport; import java.util.function.Consumer; +import java.util.function.Supplier; /** * INTERNAL API - This class is subject to change. @@ -100,7 +101,12 @@ private FutureImpl(Executor executor, Option> value, Queue { + // Synchronize as the future could be in the process of cancelling + synchronized (lock) { + return isCompleted(); + } + }); } catch (Throwable x) { tryComplete(Try.failure(x)); } @@ -115,7 +121,7 @@ private FutureImpl(Executor executor, Option> value, Queue FutureImpl of(Executor executor) { - return new FutureImpl<>(executor, Option.none(), Queue.empty(), Queue.empty(), (complete, updateThread) -> {}); + return new FutureImpl<>(executor, Option.none(), Queue.empty(), Queue.empty(), (complete, updateThread, isCompleted) -> {}); } /** @@ -127,7 +133,7 @@ static FutureImpl of(Executor executor) { * @return a new {@code FutureImpl} instance */ static FutureImpl of(Executor executor, Try value) { - return new FutureImpl<>(executor, Option.some(Try.narrow(value)), null, null, (complete, updateThread) -> {}); + return new FutureImpl<>(executor, Option.some(Try.narrow(value)), null, null, (complete, updateThread, isCompleted) -> {}); } /** @@ -140,7 +146,7 @@ static FutureImpl of(Executor executor, Try value) { * @return a new {@code FutureImpl} instance */ static FutureImpl sync(Executor executor, Task task) { - return new FutureImpl<>(executor, Option.none(), Queue.empty(), Queue.empty(), (complete, updateThread) -> + return new FutureImpl<>(executor, Option.none(), Queue.empty(), Queue.empty(), (complete, updateThread, isCompleted) -> task.run(complete::with) ); } @@ -156,8 +162,12 @@ static FutureImpl sync(Executor executor, Task task) { */ static FutureImpl async(Executor executor, Task task) { // In a single-threaded context this Future may already have been completed during initialization. - return new FutureImpl<>(executor, Option.none(), Queue.empty(), Queue.empty(), (complete, updateThread) -> + return new FutureImpl<>(executor, Option.none(), Queue.empty(), Queue.empty(), (complete, updateThread, isCompleted) -> executor.execute(() -> { + // Avoid performing work, if future is already complete (normally by cancellation) + if (isCompleted.get()) { + return; + } updateThread.run(); try { task.run(complete::with); @@ -414,6 +424,6 @@ private void handleUncaughtException(Throwable x) { } private interface Computation { - void execute(Task.Complete complete, Runnable updateThread) throws Throwable; + void execute(Task.Complete complete, Runnable updateThread, Supplier isCompleted) throws Throwable; } } diff --git a/src/test/java/io/vavr/concurrent/FutureTest.java b/src/test/java/io/vavr/concurrent/FutureTest.java index f60f45586..537c3e3e8 100644 --- a/src/test/java/io/vavr/concurrent/FutureTest.java +++ b/src/test/java/io/vavr/concurrent/FutureTest.java @@ -53,6 +53,7 @@ import static io.vavr.concurrent.Concurrent.waitUntil; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static io.vavr.concurrent.Concurrent.zZz; +import static java.util.concurrent.TimeUnit.SECONDS; import static org.assertj.core.api.Assertions.fail; @SuppressWarnings("deprecation") @@ -143,6 +144,31 @@ protected int getPeekNonNilPerformingAnAction() { // -- static failed() + + @Test + public void shouldNotExecuteFutureThatHasBeenCancelledBeforeItStarted() throws InterruptedException { + ExecutorService es = Executors.newSingleThreadExecutor(); + + AtomicBoolean future2Executed = new AtomicBoolean(false); + + // Submit f1 to the executor first + Future f = Future.run(es, () -> Thread.sleep(1000)); + // Submit f2 next, it will have to wait to be executed + Future f2 = Future.run(es, () -> { + // Should never run this + future2Executed.set(true); + }); + + // Cancel f2 BEFORE it runs on the executor + f2.cancel(true); + f.cancel(true); + es.shutdown(); + boolean terminated = es.awaitTermination(2, SECONDS); + assertThat(terminated).isTrue(); + // f2 should never have run + assertThat(future2Executed.get()).isFalse(); + } + @Test public void shouldCreateFailureThatFailsWithRuntimeException() { final Future failed = Future.failed(new RuntimeException("ooops")).await();