From 3711c35067fc03002eaa958e4ca0c22b6fe3674c Mon Sep 17 00:00:00 2001 From: He-Pin Date: Tue, 17 Dec 2024 01:31:01 +0800 Subject: [PATCH] feat: Add SchedulerTask which will be notified once cancelled. --- .../apache/pekko/actor/SchedulerSpec.scala | 28 +++++++++++++++++++ .../actor/LightArrayRevolverScheduler.scala | 20 +++++++++++-- .../org/apache/pekko/actor/Scheduler.scala | 19 ++++++++++++- 3 files changed, 64 insertions(+), 3 deletions(-) diff --git a/actor-tests/src/test/scala/org/apache/pekko/actor/SchedulerSpec.scala b/actor-tests/src/test/scala/org/apache/pekko/actor/SchedulerSpec.scala index bb9eef056bd..cb1fd0b9bc9 100644 --- a/actor-tests/src/test/scala/org/apache/pekko/actor/SchedulerSpec.scala +++ b/actor-tests/src/test/scala/org/apache/pekko/actor/SchedulerSpec.scala @@ -131,6 +131,17 @@ trait SchedulerSpec extends BeforeAndAfterEach with DefaultTimeout with Implicit task.isCancelled should ===(true) } + "notify callback if cancel is performed before execution" taggedAs TimingTest in { + val latch = new CountDownLatch(1) + val task = system.scheduler.scheduleOnce(100 millis, + new SchedulerTask { + override def run(): Unit = () + override def cancelled(): Unit = latch.countDown() + }) + task.cancel() + latch.await(100, TimeUnit.MILLISECONDS) should ===(true) + } + "not be canceled if cancel is performed after execution" taggedAs TimingTest in { val latch = TestLatch(1) val task = collectCancellable(system.scheduler.scheduleOnce(10.millis)(latch.countDown())) @@ -334,6 +345,23 @@ trait SchedulerSpec extends BeforeAndAfterEach with DefaultTimeout with Implicit ticks.get should ===(1) } + "notify callback if cancel is performed after initial delay" taggedAs TimingTest in { + val latch = new CountDownLatch(1) + val initialDelay = 90.millis.dilated + val delay = 500.millis.dilated + val task = system.scheduler.scheduleWithFixedDelay( + initialDelay, + delay)( + new SchedulerTask { + override def run(): Unit = () + override def cancelled(): Unit = latch.countDown() + }) + + Thread.sleep((initialDelay + 200.millis.dilated).toMillis) + task.cancel() + latch.await(100, TimeUnit.MILLISECONDS) should ===(true) + } + /** * ticket #307 */ diff --git a/actor/src/main/scala/org/apache/pekko/actor/LightArrayRevolverScheduler.scala b/actor/src/main/scala/org/apache/pekko/actor/LightArrayRevolverScheduler.scala index c82969ba592..b012cf979d2 100644 --- a/actor/src/main/scala/org/apache/pekko/actor/LightArrayRevolverScheduler.scala +++ b/actor/src/main/scala/org/apache/pekko/actor/LightArrayRevolverScheduler.scala @@ -139,7 +139,7 @@ class LightArrayRevolverScheduler(config: Config, log: LoggingAdapter, threadFac final override protected def scheduledFirst(): Cancellable = schedule( executor, - new AtomicLong(clock() + initialDelay.toNanos) with Runnable { + new AtomicLong(clock() + initialDelay.toNanos) with SchedulerTask { override def run(): Unit = { try { runnable.run() @@ -150,6 +150,11 @@ class LightArrayRevolverScheduler(config: Config, log: LoggingAdapter, threadFac case _: SchedulerException => // ignore failure to enqueue or terminated target actor } } + + override def cancelled(): Unit = runnable match { + case task: SchedulerTask => task.cancelled() + case _ => + } }, roundUp(initialDelay)) } @@ -390,7 +395,18 @@ object LightArrayRevolverScheduler { override def cancel(): Boolean = extractTask(CancelledTask) match { case ExecutedTask | CancelledTask => false - case _ => true + case task: SchedulerTask => + notifyCancellation(task) + true + case _ => true + } + + private def notifyCancellation(task: SchedulerTask): Unit = { + try { + executionContext.execute(() => task.cancelled()) + } catch { + case NonFatal(e) => executionContext.reportFailure(e) + } } override def isCancelled: Boolean = task eq CancelledTask diff --git a/actor/src/main/scala/org/apache/pekko/actor/Scheduler.scala b/actor/src/main/scala/org/apache/pekko/actor/Scheduler.scala index 5095c9ed7db..10d53daac8b 100644 --- a/actor/src/main/scala/org/apache/pekko/actor/Scheduler.scala +++ b/actor/src/main/scala/org/apache/pekko/actor/Scheduler.scala @@ -85,7 +85,7 @@ trait Scheduler { final override protected def scheduledFirst(): Cancellable = scheduleOnce( initialDelay, - new Runnable { + new SchedulerTask { override def run(): Unit = { try { runnable.run() @@ -97,6 +97,11 @@ trait Scheduler { case e: IllegalStateException if e.getCause != null && e.getCause.isInstanceOf[SchedulerException] => } } + + override def cancelled(): Unit = runnable match { + case task: SchedulerTask => task.cancelled() + case _ => + } }) } @@ -498,6 +503,18 @@ trait Scheduler { // this one is just here so we can present a nice AbstractScheduler for Java abstract class AbstractSchedulerBase extends Scheduler +/** + * A Task that will be notified when it is cancelled. + */ +trait SchedulerTask extends Runnable { + + /** + * Called for [[SchedulerTask]]s that are successfully canceled via [[Cancellable#cancel]]. + * Overriding this method allows to for example run some cleanup. + */ + def cancelled(): Unit = () +} + /** * Signifies something that can be cancelled * There is no strict guarantee that the implementation is thread-safe,