Skip to content

Commit

Permalink
feat: Add SchedulerTask which will be notified once cancelled.
Browse files Browse the repository at this point in the history
  • Loading branch information
He-Pin committed Dec 28, 2024
1 parent 2cd8313 commit 3711c35
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,17 @@ trait SchedulerSpec extends BeforeAndAfterEach with DefaultTimeout with Implicit
task.isCancelled should ===(true)
}

"notify callback if cancel is performed before execution" taggedAs TimingTest in {
val latch = new CountDownLatch(1)
val task = system.scheduler.scheduleOnce(100 millis,
new SchedulerTask {
override def run(): Unit = ()
override def cancelled(): Unit = latch.countDown()
})
task.cancel()
latch.await(100, TimeUnit.MILLISECONDS) should ===(true)
}

"not be canceled if cancel is performed after execution" taggedAs TimingTest in {
val latch = TestLatch(1)
val task = collectCancellable(system.scheduler.scheduleOnce(10.millis)(latch.countDown()))
Expand Down Expand Up @@ -334,6 +345,23 @@ trait SchedulerSpec extends BeforeAndAfterEach with DefaultTimeout with Implicit
ticks.get should ===(1)
}

"notify callback if cancel is performed after initial delay" taggedAs TimingTest in {
val latch = new CountDownLatch(1)
val initialDelay = 90.millis.dilated
val delay = 500.millis.dilated
val task = system.scheduler.scheduleWithFixedDelay(
initialDelay,
delay)(
new SchedulerTask {
override def run(): Unit = ()
override def cancelled(): Unit = latch.countDown()
})

Thread.sleep((initialDelay + 200.millis.dilated).toMillis)
task.cancel()
latch.await(100, TimeUnit.MILLISECONDS) should ===(true)
}

/**
* ticket #307
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ class LightArrayRevolverScheduler(config: Config, log: LoggingAdapter, threadFac
final override protected def scheduledFirst(): Cancellable =
schedule(
executor,
new AtomicLong(clock() + initialDelay.toNanos) with Runnable {
new AtomicLong(clock() + initialDelay.toNanos) with SchedulerTask {
override def run(): Unit = {
try {
runnable.run()
Expand All @@ -150,6 +150,11 @@ class LightArrayRevolverScheduler(config: Config, log: LoggingAdapter, threadFac
case _: SchedulerException => // ignore failure to enqueue or terminated target actor
}
}

override def cancelled(): Unit = runnable match {
case task: SchedulerTask => task.cancelled()
case _ =>
}
},
roundUp(initialDelay))
}
Expand Down Expand Up @@ -390,7 +395,18 @@ object LightArrayRevolverScheduler {

override def cancel(): Boolean = extractTask(CancelledTask) match {
case ExecutedTask | CancelledTask => false
case _ => true
case task: SchedulerTask =>
notifyCancellation(task)
true
case _ => true
}

private def notifyCancellation(task: SchedulerTask): Unit = {
try {
executionContext.execute(() => task.cancelled())
} catch {
case NonFatal(e) => executionContext.reportFailure(e)
}
}

override def isCancelled: Boolean = task eq CancelledTask
Expand Down
19 changes: 18 additions & 1 deletion actor/src/main/scala/org/apache/pekko/actor/Scheduler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ trait Scheduler {
final override protected def scheduledFirst(): Cancellable =
scheduleOnce(
initialDelay,
new Runnable {
new SchedulerTask {
override def run(): Unit = {
try {
runnable.run()
Expand All @@ -97,6 +97,11 @@ trait Scheduler {
case e: IllegalStateException if e.getCause != null && e.getCause.isInstanceOf[SchedulerException] =>
}
}

override def cancelled(): Unit = runnable match {
case task: SchedulerTask => task.cancelled()
case _ =>
}
})
}

Expand Down Expand Up @@ -498,6 +503,18 @@ trait Scheduler {
// this one is just here so we can present a nice AbstractScheduler for Java
abstract class AbstractSchedulerBase extends Scheduler

/**
* A Task that will be notified when it is cancelled.
*/
trait SchedulerTask extends Runnable {

/**
* Called for [[SchedulerTask]]s that are successfully canceled via [[Cancellable#cancel]].
* Overriding this method allows to for example run some cleanup.
*/
def cancelled(): Unit = ()
}

/**
* Signifies something that can be cancelled
* There is no strict guarantee that the implementation is thread-safe,
Expand Down

0 comments on commit 3711c35

Please sign in to comment.