Skip to content

Commit

Permalink
Fix #503 implement MonadCancel#canceled by sending an external inte…
Browse files Browse the repository at this point in the history
…rrupt to current fiber via `Fiber.unsafeCurrentFiber` (#544)

* Workaround zio/zio#6911 Only consider the *current* fiber being interrupted as `Outcome.Canceled`, consider internal interruption & Die to be `Outcome.Errored`
Trigger `MonadCancel#onCancel` only if the *current* fiber is being interrupted
Implement `MonadCancel#bracketFull` & `bracketCase`

* Fix #503 Find a way to send interrupt to self via `Fiber.unsafeCurrentFiber`, implement external-only Cancel semantic for `start` and `raceWith`

* make `MonadError#onError` consistent with `guaranteeCase` starting from `MonadCancel` instance

* Add tests for zio/zio#6911 (require #543)

* Fix js implementation of `async`

* arbitrary instance: convert from cats-effect

* rebase

* Use throwable instead of generic instance in `ZManaged#toResource[F]`

* Remove exceptions for inner interruption in generic error instances & generate Interrupted Causes for generic tests

* In tests, add more generators + cats conversion generators - disabled by default, because they surface further law failures still

* Fix `calls finalizers when using resource is canceled` test, fix unreliability of `canceled` when used with `unsafeRunToFuture`

* Change `toEffect` implementation to avoid getting a `BoxedException` when the underlying fiber is interrupted.

* Relax condition for signalling non-interruption, because ZIO sometimes creates invalid Causes without Interrupt node when interrupted

* Add `genNever` to random generators

* Change the implementation of `Async#async` to follow `async left is uncancelable sequenced raiseError` law.

Consider the law code:

```scala
  // format: off
  def asyncLeftIsUncancelableSequencedRaiseError[A](e: Throwable, fu: F[Unit]) =
    (F.async[A](k => F.delay(k(Left(e))) >> fu.as(None)) <* F.unit) <-> (F.uncancelable(_ => fu) >> F.raiseError(e))
  // format: on
```

if `fu` is `F.never`, then an implmentation based on ZIO.effectAsyncM can never satisfy this law,
because it runs the register code on a separate fiber and ignores its effect.
The law clearly states that register effect must run first and must run on the same fiber
as the callback listener, so as to supercede it.

* 2.12 build

* Remove incorrect `onError` definition. (should not be uncancelable)

* Remove redundant `resetForkScope`

* Acknowledge `onError`/`Outcome.Errored` incoherence in tests

* Restore no-op F.onError in `parTraverse + ZIO.die` test
  • Loading branch information
neko-kai authored Jun 29, 2022
1 parent 9992c56 commit d745d7e
Show file tree
Hide file tree
Showing 10 changed files with 530 additions and 139 deletions.
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package zio.interop

import cats.effect.{ Async, IO as CIO, LiftIO }
import cats.effect.{ Async, IO as CIO, LiftIO, Outcome }
import cats.effect.kernel.{ Concurrent, Resource }
import zio.interop.catz.*
import zio.test.*
import zio.{ Promise, Task }
import zio.{ Promise, Task, ZIO }

object CatsInteropSpec extends CatsRunnableSpec {
def spec = suite("Cats interop")(
Expand Down Expand Up @@ -54,6 +54,97 @@ object CatsInteropSpec extends CatsRunnableSpec {
sanityCheckCIO <- fromEffect(test[CIO])
zioResult <- test[Task]
} yield zioResult && sanityCheckCIO
},
testM("onCancel is not triggered by ZIO.parTraverse + ZIO.fail https://github.com/zio/zio/issues/6911") {
val F = Concurrent[Task]

for {
counter <- F.ref("")
_ <- F.guaranteeCase(
F.onError(
F.onCancel(
ZIO.collectAllPar(
List(
ZIO.unit.forever,
counter.update(_ + "A") *> ZIO.fail(new RuntimeException("x")).unit
)
),
counter.update(_ + "1")
)
) { case _ => counter.update(_ + "B") }
) {
case Outcome.Errored(_) => counter.update(_ + "C")
case Outcome.Canceled() => counter.update(_ + "2")
case Outcome.Succeeded(_) => counter.update(_ + "3")
}.run
res <- counter.get
} yield assertTrue(!res.contains("1")) && assertTrue(res == "ABC")
},
testM("onCancel is not triggered by ZIO.parTraverse + ZIO.die https://github.com/zio/zio/issues/6911") {
val F = Concurrent[Task]

for {
counter <- F.ref("")
_ <- F.guaranteeCase(
F.onError(
F.onCancel(
ZIO.collectAllPar(
List(
ZIO.unit.forever,
counter.update(_ + "A") *> ZIO.die(new RuntimeException("x")).unit
)
),
counter.update(_ + "1")
)
) { case _ => counter.update(_ + "B") }
) {
case Outcome.Errored(_) => counter.update(_ + "C")
case Outcome.Canceled() => counter.update(_ + "2")
case Outcome.Succeeded(_) => counter.update(_ + "3")
}.run
res <- counter.get
} yield assertTrue(!res.contains("1")) && assertTrue(res == "AC")
},
testM("onCancel is not triggered by ZIO.parTraverse + ZIO.interrupt https://github.com/zio/zio/issues/6911") {
val F = Concurrent[Task]

for {
counter <- F.ref("")
_ <- F.guaranteeCase(
F.onError(
F.onCancel(
ZIO.collectAllPar(
List(
ZIO.unit.forever,
counter.update(_ + "A") *> ZIO.interrupt.unit
)
),
counter.update(_ + "1")
)
) { case _ => counter.update(_ + "B") }
) {
case Outcome.Errored(_) => counter.update(_ + "C")
case Outcome.Canceled() => counter.update(_ + "2")
case Outcome.Succeeded(_) => counter.update(_ + "3")
}.run
res <- counter.get
} yield assertTrue(!res.contains("1")) && assertTrue(res == "AC")
},
test("F.canceled.toEffect results in CancellationException, not BoxedException") {
val F = Concurrent[Task]

val exception: Option[Throwable] =
try {
F.canceled.toEffect[cats.effect.IO].unsafeRunSync()
None
} catch {
case t: Throwable => Some(t)
}

assertTrue(
!exception.get.getMessage.contains("Boxed Exception") &&
exception.get.getMessage.contains("The fiber was canceled")
)
}
)
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package zio.interop

import cats.effect.kernel.Resource
import cats.effect.kernel.{ Concurrent, Resource }
import cats.effect.IO as CIO
import zio.*
import zio.interop.catz.*
Expand All @@ -17,13 +17,37 @@ object CatsZManagedSyntaxSpec extends CatsRunnableSpec {
def spec =
suite("CatsZManagedSyntaxSpec")(
suite("toManaged")(
test("calls finalizers correctly when use is interrupted") {
test("calls finalizers correctly when use is externally interrupted") {
val effects = new mutable.ListBuffer[Int]
def res(x: Int): Resource[CIO, Unit] =
Resource.makeCase(CIO.delay(effects += x).void) {
case (_, Resource.ExitCase.Canceled) =>
CIO.delay(effects += x + 1).void
case _ => CIO.unit
case (_, _) =>
CIO.unit
}

val testCase = {
val managed: ZManaged[Any, Throwable, Unit] = res(1).toManaged
Promise.make[Nothing, Unit].flatMap { latch =>
managed
.use(_ => latch.succeed(()) *> ZIO.never)
.forkDaemon
.flatMap(latch.await *> _.interrupt)
}
}

unsafeRun(testCase)
assert(effects.toList)(equalTo(List(1, 2)))
},
test("calls finalizers correctly when use is internally interrupted") {
val effects = new mutable.ListBuffer[Int]
def res(x: Int): Resource[CIO, Unit] =
Resource.makeCase(CIO.delay(effects += x).void) {
case (_, Resource.ExitCase.Errored(_)) =>
CIO.delay(effects += x + 1).void
case (_, _) =>
CIO.unit
}

val testCase = {
Expand Down Expand Up @@ -118,7 +142,7 @@ object CatsZManagedSyntaxSpec extends CatsRunnableSpec {
}
),
suite("toManagedZIO")(
test("calls finalizers correctly when use is interrupted") {
test("calls finalizers correctly when use is externally interrupted") {
val effects = new mutable.ListBuffer[Int]
def res(x: Int): Resource[Task, Unit] =
Resource.makeCase(Task(effects += x).unit) {
Expand All @@ -127,6 +151,28 @@ object CatsZManagedSyntaxSpec extends CatsRunnableSpec {
case _ => Task.unit
}

val testCase = {
val managed: ZManaged[Any, Throwable, Unit] = res(1).toManagedZIO
Promise.make[Nothing, Unit].flatMap { latch =>
managed
.use(_ => latch.succeed(()) *> ZIO.never)
.forkDaemon
.flatMap(latch.await *> _.interrupt)
}
}

unsafeRun(testCase)
assert(effects.toList)(equalTo(List(1, 2)))
},
test("calls finalizers correctly when use is internally interrupted") {
val effects = new mutable.ListBuffer[Int]
def res(x: Int): Resource[Task, Unit] =
Resource.makeCase(Task(effects += x).unit) {
case (_, Resource.ExitCase.Errored(_)) =>
Task(effects += x + 1).unit
case _ => Task.unit
}

val testCase = {
val managed: ZManaged[Any, Throwable, Unit] = res(1).toManagedZIO
managed.use(_ => ZIO.interrupt.unit)
Expand Down Expand Up @@ -242,7 +288,22 @@ object CatsZManagedSyntaxSpec extends CatsRunnableSpec {
unsafeRun(testCase.orElse(ZIO.unit))
assert(effects.toList)(equalTo(List(1, 2)))
},
test("calls finalizers when using resource is canceled") {
test("calls finalizers when using resource is internally interrupted") {
val effects = new mutable.ListBuffer[Int]
def man(x: Int): ZManaged[Any, Throwable, Unit] =
ZManaged.makeExit(ZIO.effectTotal(effects += x).unit) {
case (_, Exit.Failure(c)) if !c.interrupted && c.failureOption.nonEmpty =>
ZIO.effectTotal(effects += x + 1)
case _ =>
ZIO.unit
}

val testCase = man(1).toResource[RIO[ZEnv, _]].use(_ => ZIO.interrupt)
try unsafeRun(testCase)
catch { case _: Throwable => }
assert(effects.toList)(equalTo(List(1, 2)))
},
test("calls finalizers when using resource is externally interrupted") {
val effects = new mutable.ListBuffer[Int]
def man(x: Int): ZManaged[Any, Throwable, Unit] =
ZManaged.makeExit(ZIO.effectTotal(effects += x).unit) {
Expand All @@ -252,8 +313,9 @@ object CatsZManagedSyntaxSpec extends CatsRunnableSpec {
ZIO.unit
}

val testCase = man(1).toResource[RIO[ZEnv, _]].use(_ => ZIO.interrupt)
unsafeRun(testCase.orElse(ZIO.unit))
val testCase = man(1).toResource[RIO[ZEnv, _]].use(_ => Concurrent[RIO[ZEnv, _]].canceled)
try unsafeRun(testCase)
catch { case _: Throwable => }
assert(effects.toList)(equalTo(List(1, 2)))
},
test("acquisition of Reservation preserves cancellability in new F") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ private[zio] trait CatsSpecBase
implicit val eqForNothing: Eq[Nothing] =
Eq.allEqual

// workaround for laws `evalOn local pure` & `executionContext commutativity`
// (ZIO cannot implement them at all due to `.executor.asEC` losing the original executionContext)
implicit val eqForExecutionContext: Eq[ExecutionContext] =
Eq.allEqual

Expand All @@ -114,6 +116,7 @@ private[zio] trait CatsSpecBase
implicit def eqForUIO[A: Eq](implicit ticker: Ticker): Eq[UIO[A]] = { (uio1, uio2) =>
val exit1 = unsafeRun(uio1)
val exit2 = unsafeRun(uio2)
// println(s"comparing $exit1 $exit2")
(exit1 eqv exit2) || {
println(s"$exit1 was not equal to $exit2")
false
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,22 @@
package zio.interop

import cats.effect.GenConcurrent
import org.scalacheck.*
import zio.*

/**
* Temporary fork of zio.GenIO that overrides `genParallel` with ZManaged-based code
* instead of `io.zipPar(parIo).map(_._1)`
* because ZIP-PAR IS NON-DETERMINISTIC IN ITS SPAWNED EC TASKS (required for TestContext equality)
*/
trait GenIOInteropCats {

// FIXME generating anything but success (even genFail)
// surfaces multiple further unaddressed law failures
def betterGenerators: Boolean = false

// FIXME cats conversion surfaces failures in the following laws:
// `async left is uncancelable sequenced raiseError`
// `async right is uncancelable sequenced pure`
// `applicativeError onError raise`
// `canceled sequences onCanceled in order`
def catsConversionGenerator: Boolean = false

/**
* Given a generator for `A`, produces a generator for `IO[E, A]` using the `IO.point` constructor.
*/
Expand All @@ -26,8 +33,35 @@ trait GenIOInteropCats {
*/
def genSuccess[E, A: Arbitrary]: Gen[IO[E, A]] = Gen.oneOf(genSyncSuccess[E, A], genAsyncSuccess[E, A])

def genIO[E, A: Arbitrary]: Gen[IO[E, A]] =
genSuccess[E, A]
def genFail[E: Arbitrary, A]: Gen[IO[E, A]] = Arbitrary.arbitrary[E].map(IO.fail[E](_))

def genDie(implicit arbThrowable: Arbitrary[Throwable]): Gen[UIO[Nothing]] = arbThrowable.arbitrary.map(IO.die(_))

def genInternalInterrupt: Gen[UIO[Nothing]] = ZIO.interrupt

def genCancel[E, A: Arbitrary](implicit F: GenConcurrent[IO[E, _], ?]): Gen[IO[E, A]] =
Arbitrary.arbitrary[A].map(F.canceled.as(_))

def genNever: Gen[UIO[Nothing]] = ZIO.never

def genIO[E: Arbitrary, A: Arbitrary](implicit
arbThrowable: Arbitrary[Throwable],
F: GenConcurrent[IO[E, _], ?]
): Gen[IO[E, A]] =
if (betterGenerators)
Gen.oneOf(
genSuccess[E, A],
genFail[E, A],
genDie,
genInternalInterrupt,
genCancel[E, A],
genNever
)
else
Gen.oneOf(
genSuccess[E, A],
genNever
)

def genUIO[A: Arbitrary]: Gen[UIO[A]] =
Gen.oneOf(genSuccess[Nothing, A], genIdentityTrans(genSuccess[Nothing, A]))
Expand Down Expand Up @@ -98,17 +132,8 @@ trait GenIOInteropCats {
Gen.const(io.flatMap(a => IO.succeed(a)))

private def genOfRace[E, A](io: IO[E, A]): Gen[IO[E, A]] =
Gen.const(io.raceFirst(ZIO.never.interruptible))
Gen.const(io.interruptible.raceFirst(ZIO.never.interruptible))

private def genOfParallel[E, A](io: IO[E, A])(gen: Gen[IO[E, A]]): Gen[IO[E, A]] =
gen.map { parIo =>
// this should work, but generates more random failures on CI
// io.interruptible.zipPar(parIo.interruptible).map(_._1)
Promise.make[Nothing, Unit].flatMap { p =>
ZManaged
.fromEffect(parIo *> p.succeed(()))
.fork
.use_(p.await *> io)
}
}
gen.map(parIo => io.interruptible.zipPar(parIo.interruptible).map(_._1))
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ private[interop] trait ZioSpecBase extends CatsSpecBase with ZioSpecBaseLowPrior
Gen.oneOf(
e.arbitrary.map(Cause.Fail(_)),
Arbitrary.arbitrary[Throwable].map(Cause.Die(_)),
// Generating interrupt failures causes law failures (`canceled`/`Outcome.Canceled` are ill-defined as of now https://github.com/zio/interop-cats/issues/503#issuecomment-1157101175=)
// Gen.long.flatMap(l1 => Gen.long.map(l2 => Cause.Interrupt(Fiber.Id(l1, l2)))),
Gen.long.flatMap(l1 => Gen.long.map(l2 => Cause.Interrupt(Fiber.Id(l1, l2)))),
Gen.delay(self.map(Cause.Traced(_, ZTrace(Fiber.Id.None, Nil, Nil, None)))),
Gen.delay(self.map(Cause.stackless)),
Gen.delay(self.flatMap(e1 => self.map(e2 => Cause.Both(e1, e2)))),
Expand All @@ -54,17 +53,41 @@ private[interop] trait ZioSpecBaseLowPriority { self: ZioSpecBase =>

implicit def arbitraryIO[E: CanFail: Arbitrary: Cogen, A: Arbitrary: Cogen]: Arbitrary[IO[E, A]] = {
implicitly[CanFail[E]]
Arbitrary(Gen.oneOf(genIO[E, A], genLikeTrans(genIO[E, A]), genIdentityTrans(genIO[E, A])))
import zio.interop.catz.generic.concurrentInstanceCause
Arbitrary(
Gen.oneOf(
genIO[E, A],
genLikeTrans(genIO[E, A]),
genIdentityTrans(genIO[E, A])
)
)
}

implicit def arbitraryZIO[R: Cogen, E: CanFail: Arbitrary: Cogen, A: Arbitrary: Cogen]: Arbitrary[ZIO[R, E, A]] =
Arbitrary(Gen.function1[R, IO[E, A]](arbitraryIO[E, A].arbitrary).map(ZIO.environment[R].flatMap))

implicit def arbitraryRIO[R: Cogen, A: Arbitrary: Cogen]: Arbitrary[RIO[R, A]] =
arbitraryZIO[R, Throwable, A]
implicit def arbitraryTask[A: Arbitrary: Cogen](implicit ticker: Ticker): Arbitrary[Task[A]] = {
val arbIO = arbitraryIO[Throwable, A]
if (catsConversionGenerator)
Arbitrary(Gen.oneOf(arbIO.arbitrary, genCatsConversionTask[A]))
else
arbIO
}

implicit def arbitraryTask[A: Arbitrary: Cogen]: Arbitrary[Task[A]] =
arbitraryIO[Throwable, A]
def genCatsConversionTask[A: Arbitrary: Cogen](implicit ticker: Ticker): Gen[Task[A]] =
arbitraryIO[A].arbitrary.map(liftIO(_))

def liftIO[A](io: cats.effect.IO[A])(implicit ticker: Ticker): zio.Task[A] =
ZIO.effectAsyncInterrupt { k =>
val (result, cancel) = io.unsafeToFutureCancelable()
k(ZIO.fromFuture(_ => result).tapError {
case c: scala.concurrent.CancellationException if c.getMessage == "The fiber was canceled" =>
zio.interop.catz.concurrentInstance.canceled *> ZIO.interrupt
case _ =>
ZIO.unit
})
Left(ZIO.fromFuture(_ => cancel()).orDie)
}

def zManagedArbitrary[R, E, A](implicit zio: Arbitrary[ZIO[R, E, A]]): Arbitrary[ZManaged[R, E, A]] =
Arbitrary(zio.arbitrary.map(ZManaged.fromEffect))
Expand Down
Loading

0 comments on commit d745d7e

Please sign in to comment.