Skip to content

Commit

Permalink
Add CyclicBarrier (#2857)
Browse files Browse the repository at this point in the history
* Remove @throws CountDownLatch
  • Loading branch information
nomisRev authored Dec 5, 2022
1 parent be63562 commit 5f17a0b
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 1 deletion.
6 changes: 6 additions & 0 deletions arrow-libs/fx/arrow-fx-coroutines/api/arrow-fx-coroutines.api
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ public final class arrow/fx/coroutines/CountDownLatch {
public final fun countDown ()V
}

public final class arrow/fx/coroutines/CyclicBarrier {
public fun <init> (I)V
public final fun await (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public final fun getCapacity ()I
}

public abstract class arrow/fx/coroutines/ExitCase {
public static final field Companion Larrow/fx/coroutines/ExitCase$Companion;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import kotlinx.coroutines.CompletableDeferred
* Must be initialised with an [initial] value of 1 or higher,
* if constructed with 0 or negative value then it throws [IllegalArgumentException].
*/
public class CountDownLatch @Throws(IllegalArgumentException::class) constructor(private val initial: Long) {
public class CountDownLatch(private val initial: Long) {
private val signal = CompletableDeferred<Unit>()
private val count = AtomicRef(initial)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package arrow.fx.coroutines

import arrow.core.continuations.AtomicRef
import arrow.core.continuations.loop
import arrow.core.continuations.update
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CompletableDeferred

/**
* A [CyclicBarrier] is a synchronization mechanism that allows a set of coroutines to wait for each other
* to reach a certain point before continuing execution.
* It is called a "cyclic" barrier because it can be reused after all coroutines have reached the barrier and released.
*
* To use a CyclicBarrier, each coroutine must call the [await] method on the barrier object,
* which will cause the coroutine to suspend until the required number of coroutines have reached the barrier.
* Once all coroutines have reached the barrier they will _resume_ execution.
*
* Models the behavior of java.util.concurrent.CyclicBarrier in Kotlin with `suspend`.
*/
public class CyclicBarrier(public val capacity: Int) {
init {
require(capacity > 0) {
"Cyclic barrier must be constructed with positive non-zero capacity $capacity but was $capacity > 0"
}
}

private data class State(val awaiting: Int, val epoch: Long, val unblock: CompletableDeferred<Unit>)

private val state: AtomicRef<State> = AtomicRef(State(capacity, 0, CompletableDeferred()))

/**
* When [await] is called the function will suspend until the required number of coroutines have reached the barrier.
* Once the [capacity] of the barrier has been reached, the coroutine will be released and continue execution.
*/
public suspend fun await() {
state.loop { original ->
val (awaiting, epoch, unblock) = original
val awaitingNow = awaiting - 1
if (awaitingNow == 0 && state.compareAndSet(original, State(capacity, epoch + 1, CompletableDeferred()))) {
unblock.complete(Unit)
return
} else if (state.compareAndSet(original, State(awaitingNow, epoch, unblock))) {
return try {
unblock.await()
} catch (cancelled: CancellationException) {
state.update { s -> if (s.epoch == epoch) s.copy(awaiting = s.awaiting + 1) else s }
throw cancelled
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package arrow.fx.coroutines

import arrow.core.Either
import io.kotest.assertions.throwables.shouldThrow
import io.kotest.core.spec.style.StringSpec
import io.kotest.matchers.shouldBe
import io.kotest.matchers.types.shouldBeTypeOf
import io.kotest.property.Arb
import io.kotest.property.arbitrary.constant
import io.kotest.property.arbitrary.int
import io.kotest.property.checkAll
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.launch

class CyclicBarrierSpec : StringSpec({
"should raise an exception when constructed with a negative or zero capacity" {
checkAll(Arb.int(Int.MIN_VALUE, 0)) { i ->
shouldThrow<IllegalArgumentException> { CyclicBarrier(i) }.message shouldBe
"Cyclic barrier must be constructed with positive non-zero capacity $i but was $i > 0"
}
}

"barrier of capacity 1 is a no op" {
checkAll(Arb.constant(Unit)) {
val barrier = CyclicBarrier(1)
barrier.await()
}
}

"awaiting all in parallel resumes all coroutines" {
checkAll(Arb.int(1, 100)) { i ->
val barrier = CyclicBarrier(i)
(0 until i).parTraverse { barrier.await() }
}
}

"should reset once full" {
checkAll(Arb.constant(Unit)) {
val barrier = CyclicBarrier(2)
parZip({ barrier.await() }, { barrier.await() }) { _, _ -> }
barrier.capacity shouldBe 2
}
}

"await is cancelable" {
checkAll(Arb.int(2, Int.MAX_VALUE)) { i ->
val barrier = CyclicBarrier(i)
val exitCase = CompletableDeferred<ExitCase>()

val job =
launch(start = CoroutineStart.UNDISPATCHED) {
guaranteeCase({ barrier.await() }, exitCase::complete)
}

job.cancelAndJoin()
exitCase.isCompleted shouldBe true
exitCase.await().shouldBeTypeOf<ExitCase.Cancelled>()
}
}

"should clean up upon cancelation of await" {
checkAll(Arb.constant(Unit)) {
val barrier = CyclicBarrier(2)
launch(start = CoroutineStart.UNDISPATCHED) { barrier.await() }.cancelAndJoin()

barrier.capacity shouldBe 2
}
}

"race fiber cancel and barrier full" {
checkAll(Arb.constant(Unit)) {
val barrier = CyclicBarrier(2)
val job = launch(start = CoroutineStart.UNDISPATCHED) { barrier.await() }
when (raceN({ barrier.await() }, { job.cancelAndJoin() })) {
// without the epoch check in CyclicBarrier, a late cancellation would increment the count
// after the barrier has already reset, causing this code to never terminate (test times out)
is Either.Left -> parZip({ barrier.await() }, { barrier.await() }) { _, _ -> }
is Either.Right -> Unit
}
}
}
})

0 comments on commit 5f17a0b

Please sign in to comment.