Skip to content

Commit

Permalink
* Change Outcome conversion to always treat typed failure as `Outcome…
Browse files Browse the repository at this point in the history
….Erorred` (typed failure excludes possibility of external interruption) (#549)

* Treat `Cause.Empty` + external interruption as `Outcome.Canceled`, since this combination manifests sometimes due to a bug in ZIO runtime
* Fix Cause comparison in tests, fix Cogen[Cause] instance
* Compare Cause by converting to Outcome first to ignore Cause tree details not important to cats-effect
* Enable `genFail` and `genCancel` generators since with above fixes the laws pass with them now
* Replace `genRace` and `genParallel` with cats-effect based impls to preserve Outcome when F.canceled is generated
* Add test that `Cause.Fail` cannot be present after external interruption
* Delegate `race` & `both` to default implementations, because `raceFirst` & `zipPar` semantics do not match them
  • Loading branch information
neko-kai authored Jul 11, 2022
1 parent d745d7e commit 5f77316
Show file tree
Hide file tree
Showing 6 changed files with 223 additions and 121 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import cats.effect.{ Async, IO as CIO, LiftIO, Outcome }
import cats.effect.kernel.{ Concurrent, Resource }
import zio.interop.catz.*
import zio.test.*
import zio.{ Promise, Task, ZIO }
import zio.{ Cause, Exit, Promise, Task, ZIO }

object CatsInteropSpec extends CatsRunnableSpec {
def spec = suite("Cats interop")(
Expand Down Expand Up @@ -130,6 +130,58 @@ object CatsInteropSpec extends CatsRunnableSpec {
res <- counter.get
} yield assertTrue(!res.contains("1")) && assertTrue(res == "AC")
},
testM(
"onCancel is triggered when a fiber executing ZIO.parTraverse + ZIO.fail is interrupted and the inner typed" +
" error is lost in final Cause (Fail & Interrupt nodes cannot both exist in Cause after external interruption)"
) {
val F = Concurrent[Task]

for {
latch1 <- F.deferred[Unit]
latch2 <- F.deferred[Unit]
latch3 <- F.deferred[Unit]
counter <- F.ref("")
cause <- F.ref(Option.empty[Cause[Throwable]])
outerScope <- ZIO.forkScope
fiber <- F.guaranteeCase(
F.onError(
F.onCancel(
ZIO
.collectAllPar(
List(
F.onCancel(
ZIO.never,
latch2.complete(()).unit
),
(latch1.complete(()) *> latch3.get).uninterruptible,
counter.update(_ + "A") *>
latch1.get *>
ZIO.fail(new RuntimeException("The_Error")).unit
)
)
.overrideForkScope(outerScope)
.onExit {
case Exit.Success(_) => ZIO.unit
case Exit.Failure(c) => cause.set(Some(c)).orDie
},
counter.update(_ + "B")
)
) { case _ => counter.update(_ + "1") }
) {
case Outcome.Errored(_) => counter.update(_ + "2")
case Outcome.Canceled() => counter.update(_ + "C")
case Outcome.Succeeded(_) => counter.update(_ + "3")
}.fork
_ <- latch2.get
_ <- fiber.interrupt
_ <- latch3.complete(())
res <- counter.get
cause <- cause.get
} yield assertTrue(!res.contains("1")) &&
assertTrue(res == "ABC") &&
assertTrue(cause.isDefined) &&
assertTrue(!cause.get.prettyPrint.contains("The_Error"))
},
test("F.canceled.toEffect results in CancellationException, not BoxedException") {
val F = Concurrent[Task]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import cats.effect.testkit.TestInstances
import cats.effect.kernel.Outcome
import cats.effect.IO as CIO
import cats.syntax.all.*
import cats.{ Eq, Order }
import cats.{ Eq, Id, Order }
import org.scalacheck.{ Arbitrary, Cogen, Gen, Prop }
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.prop.Configuration
Expand Down Expand Up @@ -69,12 +69,15 @@ private[zio] trait CatsSpecBase
ZEnv.Services.live ++ Has(testClock) ++ Has(testBlocking)
}

def unsafeRun[E, A](io: IO[E, A])(implicit ticker: Ticker): Exit[E, Option[A]] =
def unsafeRun[E, A](io: IO[E, A])(implicit ticker: Ticker): (Exit[E, Option[A]], Boolean) =
try {
var exit: Exit[E, Option[A]] = Exit.succeed(Option.empty[A])
runtime.unsafeRunAsync[E, Option[A]](io.asSome)(exit = _)
var interrupted: Boolean = true
runtime.unsafeRunAsync[E, Option[A]] {
signalOnNoExternalInterrupt(io)(ZIO.effectTotal { interrupted = false }).asSome
}(exit = _)
ticker.ctx.tickAll(FiniteDuration(1, TimeUnit.SECONDS))
exit
(exit, interrupted)
} catch {
case error: Throwable =>
error.printStackTrace()
Expand Down Expand Up @@ -102,23 +105,15 @@ private[zio] trait CatsSpecBase
Eq.allEqual

implicit val eqForCauseOfNothing: Eq[Cause[Nothing]] =
eqForCauseOf[Nothing]

implicit def eqForCauseOf[E]: Eq[Cause[E]] =
(x, y) => (x.interrupted && y.interrupted) || x == y

implicit def eqForExitOfNothing[A: Eq]: Eq[Exit[Nothing, A]] = {
case (Exit.Success(x), Exit.Success(y)) => x eqv y
case (Exit.Failure(x), Exit.Failure(y)) => x eqv y
case _ => false
}
(x, y) => (x.interrupted && y.interrupted && x.failureOption.isEmpty && y.failureOption.isEmpty) || x == y

implicit def eqForUIO[A: Eq](implicit ticker: Ticker): Eq[UIO[A]] = { (uio1, uio2) =>
val exit1 = unsafeRun(uio1)
val exit2 = unsafeRun(uio2)
// println(s"comparing $exit1 $exit2")
(exit1 eqv exit2) || {
println(s"$exit1 was not equal to $exit2")
val (exit1, i1) = unsafeRun(uio1)
val (exit2, i2) = unsafeRun(uio2)
val out1 = toOutcomeCauseOtherFiber[Id, Nothing, Option[A]](i1)(identity, exit1)
val out2 = toOutcomeCauseOtherFiber[Id, Nothing, Option[A]](i2)(identity, exit2)
(out1 eqv out2) || {
println(s"$out1 was not equal to $out2")
false
}
}
Expand All @@ -136,7 +131,7 @@ private[zio] trait CatsSpecBase
.toEffect[CIO]

implicit def orderForUIOofFiniteDuration(implicit ticker: Ticker): Order[UIO[FiniteDuration]] =
Order.by(unsafeRun(_).toEither.toOption)
Order.by(unsafeRun(_)._1.toEither.toOption)

implicit def orderForRIOofFiniteDuration[R: Arbitrary](implicit ticker: Ticker): Order[RIO[R, FiniteDuration]] =
(x, y) =>
Expand All @@ -149,7 +144,7 @@ private[zio] trait CatsSpecBase
ticker: Ticker
): Order[ZIO[R, E, FiniteDuration]] = {
implicit val orderForIOofFiniteDuration: Order[IO[E, FiniteDuration]] =
Order.by(unsafeRun(_) match {
Order.by(unsafeRun(_)._1 match {
case Exit.Success(value) => Right(value)
case Exit.Failure(cause) => Left(cause.failureOption)
})
Expand All @@ -167,11 +162,11 @@ private[zio] trait CatsSpecBase
Cogen[Outcome[Option, E, A]].contramap { (zio: ZIO[R, E, A]) =>
Arbitrary.arbitrary[R].sample match {
case Some(r) =>
val result = unsafeRun(zio.provide(r))
val (result, extInterrupted) = unsafeRun(zio.provide(r))

result match {
case Exit.Failure(cause) =>
if (cause.interrupted) Outcome.canceled[Option, E, A]
if (cause.interrupted && extInterrupted) Outcome.canceled[Option, E, A]
else Outcome.errored(cause.failureOption.get)
case Exit.Success(value) => Outcome.succeeded(value)
}
Expand All @@ -181,8 +176,8 @@ private[zio] trait CatsSpecBase

implicit def cogenOutcomeZIO[R, A](implicit
cogen: Cogen[ZIO[R, Throwable, A]]
): Cogen[Outcome[ZIO[R, Throwable, *], Throwable, A]] =
cogenOutcome[RIO[R, *], Throwable, A]
): Cogen[Outcome[ZIO[R, Throwable, _], Throwable, A]] =
cogenOutcome[RIO[R, _], Throwable, A]
}

private[interop] sealed trait CatsSpecBaseLowPriority { this: CatsSpecBase =>
Expand Down Expand Up @@ -213,4 +208,21 @@ private[interop] sealed trait CatsSpecBaseLowPriority { this: CatsSpecBase =>

implicit def eqForTaskManaged[A: Eq](implicit ticker: Ticker): Eq[TaskManaged[A]] =
zManagedEq[Any, Throwable, A]

implicit def eqForCauseOf[E: Eq]: Eq[Cause[E]] = { (exit1, exit2) =>
val out1 =
toOutcomeOtherFiber0[Id, E, Either[E, Cause[Nothing]], Unit](true)(identity, Exit.Failure(exit1))(
(e, _) => Left(e),
Right(_)
)
val out2 =
toOutcomeOtherFiber0[Id, E, Either[E, Cause[Nothing]], Unit](true)(identity, Exit.Failure(exit2))(
(e, _) => Left(e),
Right(_)
)
(out1 eqv out2) || {
println(s"cause $out1 was not equal to cause $out2")
false
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,17 @@ import zio.*

trait GenIOInteropCats {

// FIXME generating anything but success (even genFail)
// surfaces multiple further unaddressed law failures
// FIXME `genDie` and `genInternalInterrupt` surface multiple further unaddressed law failures
// See `genDie` scaladoc
def betterGenerators: Boolean = false

// FIXME cats conversion surfaces failures in the following laws:
// `async left is uncancelable sequenced raiseError`
// `async right is uncancelable sequenced pure`
// `applicativeError onError raise`
// `canceled sequences onCanceled in order`
// FIXME cats conversion generator works most of the time
// but generates rare law failures in
// - `canceled sequences onCanceled in order`
// - `uncancelable eliminates onCancel`
// - `fiber join is guarantee case`
// possibly coming from the `GenSpawnGenerators#genRacePair` generator + `F.canceled`.
// Errors occur more often when combined with `genOfRace` or `genOfParallel`
def catsConversionGenerator: Boolean = false

/**
Expand All @@ -35,9 +37,28 @@ trait GenIOInteropCats {

def genFail[E: Arbitrary, A]: Gen[IO[E, A]] = Arbitrary.arbitrary[E].map(IO.fail[E](_))

/**
* We can't pass laws like `cats.effect.laws.GenSpawnLaws#fiberJoinIsGuaranteeCase`
* with either `genDie` or `genInternalInterrupt` because
* we are forced to rethrow an `Outcome.Errored` using
* `raiseError` in `Outcome#embed` which converts the
* specific state into a typed error.
*
* While we consider both states to be `Outcome.Errored`,
* they aren't really 'equivalent' even if we massage them
* into having the same `Outcome`, because `handleErrorWith`
* can't recover from these states.
*
* Now, we could make ZIO Throwable instances recover from
* all errors via [[zio.Cause#squashTraceWith]], but
* this would make Throwable instances contradict the
* generic MonadError instance.
* (Which I believe is acceptable, if confusing, as long
* as the generic instances are moved to a separate `generic`
* object.)
*/
def genDie(implicit arbThrowable: Arbitrary[Throwable]): Gen[UIO[Nothing]] = arbThrowable.arbitrary.map(IO.die(_))

def genInternalInterrupt: Gen[UIO[Nothing]] = ZIO.interrupt
def genInternalInterrupt: Gen[UIO[Nothing]] = ZIO.interrupt

def genCancel[E, A: Arbitrary](implicit F: GenConcurrent[IO[E, _], ?]): Gen[IO[E, A]] =
Arbitrary.arbitrary[A].map(F.canceled.as(_))
Expand All @@ -60,18 +81,22 @@ trait GenIOInteropCats {
else
Gen.oneOf(
genSuccess[E, A],
genFail[E, A],
genCancel[E, A],
genNever
)

def genUIO[A: Arbitrary]: Gen[UIO[A]] =
def genUIO[A: Arbitrary](implicit F: GenConcurrent[UIO, ?]): Gen[UIO[A]] =
Gen.oneOf(genSuccess[Nothing, A], genIdentityTrans(genSuccess[Nothing, A]))

/**
* Given a generator for `IO[E, A]`, produces a sized generator for `IO[E, A]` which represents a transformation,
* by using some random combination of the methods `map`, `flatMap`, `mapError`, and any other method that does not change
* the success/failure of the value, but may change the value itself.
*/
def genLikeTrans[E: Arbitrary: Cogen, A: Arbitrary: Cogen](gen: Gen[IO[E, A]]): Gen[IO[E, A]] = {
def genLikeTrans[E: Arbitrary: Cogen, A: Arbitrary: Cogen](
gen: Gen[IO[E, A]]
)(implicit F: GenConcurrent[IO[E, _], ?]): Gen[IO[E, A]] = {
val functions: IO[E, A] => Gen[IO[E, A]] = io =>
Gen.oneOf(
genOfFlatMaps[E, A](io)(genSuccess[E, A]),
Expand All @@ -87,7 +112,8 @@ trait GenIOInteropCats {
* Given a generator for `IO[E, A]`, produces a sized generator for `IO[E, A]` which represents a transformation,
* by using methods that can have no effect on the resulting value (e.g. `map(identity)`, `io.race(never)`, `io.par(io2).map(_._1)`).
*/
def genIdentityTrans[E, A: Arbitrary](gen: Gen[IO[E, A]]): Gen[IO[E, A]] = {
def genIdentityTrans[E, A: Arbitrary](gen: Gen[IO[E, A]])(implicit F: GenConcurrent[IO[E, _], ?]): Gen[IO[E, A]] = {
implicitly[Arbitrary[A]]
val functions: IO[E, A] => Gen[IO[E, A]] = io =>
Gen.oneOf(
genOfIdentityFlatMaps[E, A](io),
Expand Down Expand Up @@ -131,9 +157,13 @@ trait GenIOInteropCats {
private def genOfIdentityFlatMaps[E, A](io: IO[E, A]): Gen[IO[E, A]] =
Gen.const(io.flatMap(a => IO.succeed(a)))

private def genOfRace[E, A](io: IO[E, A]): Gen[IO[E, A]] =
Gen.const(io.interruptible.raceFirst(ZIO.never.interruptible))
private def genOfRace[E, A](io: IO[E, A])(implicit F: GenConcurrent[IO[E, _], ?]): Gen[IO[E, A]] =
// Gen.const(io.interruptible.raceFirst(ZIO.never.interruptible))
Gen.const(F.race(io, ZIO.never).map(_.merge)) // we must use cats version for Outcome preservation in F.canceled

private def genOfParallel[E, A](io: IO[E, A])(gen: Gen[IO[E, A]]): Gen[IO[E, A]] =
gen.map(parIo => io.interruptible.zipPar(parIo.interruptible).map(_._1))
private def genOfParallel[E, A](io: IO[E, A])(
gen: Gen[IO[E, A]]
)(implicit F: GenConcurrent[IO[E, _], ?]): Gen[IO[E, A]] =
// gen.map(parIo => io.interruptible.zipPar(parIo.interruptible).map(_._1))
gen.map(parIO => F.both(io, parIO).map(_._1)) // we must use cats version for Outcome preservation in F.canceled
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package zio.interop

import cats.effect.kernel.Outcome
import org.scalacheck.{ Arbitrary, Cogen, Gen }
import zio.*
import zio.clock.Clock

private[interop] trait ZioSpecBase extends CatsSpecBase with ZioSpecBaseLowPriority with GenIOInteropCats {

implicit def arbitraryUIO[A: Arbitrary]: Arbitrary[UIO[A]] =
implicit def arbitraryUIO[A: Arbitrary]: Arbitrary[UIO[A]] = {
import zio.interop.catz.generic.concurrentInstanceCause
Arbitrary(genUIO[A])
}

implicit def arbitraryURIO[R: Cogen, A: Arbitrary]: Arbitrary[URIO[R, A]] =
Arbitrary(Arbitrary.arbitrary[R => UIO[A]].map(ZIO.environment[R].flatMap))
Expand Down Expand Up @@ -39,8 +42,13 @@ private[interop] trait ZioSpecBase extends CatsSpecBase with ZioSpecBaseLowPrior
Arbitrary(self)
}

implicit def cogenCause[E]: Cogen[Cause[E]] =
Cogen(_.hashCode.toLong)
implicit def cogenCause[E: Cogen]: Cogen[Cause[E]] =
Cogen[Outcome[Option, Either[E, Int], Unit]].contramap { cause =>
toOutcomeOtherFiber0[Option, E, Either[E, Int], Unit](true)(Option(_), Exit.Failure(cause))(
(e, _) => Left(e),
c => Right(c.hashCode())
)
}
}

private[interop] trait ZioSpecBaseLowPriority { self: ZioSpecBase =>
Expand Down
9 changes: 7 additions & 2 deletions zio-interop-cats/shared/src/main/scala/zio/interop/cats.scala
Original file line number Diff line number Diff line change
Expand Up @@ -335,8 +335,9 @@ private abstract class ZioConcurrent[R, E, E1]
)
} yield res

override final def both[A, B](fa: F[A], fb: F[B]): F[(A, B)] =
fa.interruptible zipPar fb.interruptible
// delegate race & both to default implementations, because `raceFirst` & `zipPar` semantics do not match them
override final def race[A, B](fa: F[A], fb: F[B]): F[Either[A, B]] = super.race(fa, fb)
override final def both[A, B](fa: F[A], fb: F[B]): F[(A, B)] = super.both(fa, fb)

override final def guarantee[A](fa: F[A], fin: F[Unit]): F[A] =
fa.ensuring(fin.orDieWith(toThrowableOrFiberFailure))
Expand Down Expand Up @@ -580,17 +581,21 @@ private abstract class ZioMonadErrorExit[R, E, E1] extends ZioMonadError[R, E, E
private trait ZioMonadErrorExitThrowable[R]
extends ZioMonadErrorExit[R, Throwable, Throwable]
with ZioMonadErrorE[R, Throwable] {

override final protected def toOutcomeThisFiber[A](exit: Exit[Throwable, A]): UIO[Outcome[F, Throwable, A]] =
toOutcomeThrowableThisFiber(exit)

protected final def toOutcomeOtherFiber[A](interruptedHandle: zio.Ref[Boolean])(
exit: Exit[Throwable, A]
): UIO[Outcome[F, Throwable, A]] =
interruptedHandle.get.map(toOutcomeThrowableOtherFiber(_)(ZIO.succeedNow, exit))
}

private trait ZioMonadErrorExitCause[R, E] extends ZioMonadErrorExit[R, E, Cause[E]] with ZioMonadErrorCause[R, E] {

override protected def toOutcomeThisFiber[A](exit: Exit[E, A]): UIO[Outcome[F, Cause[E], A]] =
toOutcomeCauseThisFiber(exit)

protected final def toOutcomeOtherFiber[A](interruptedHandle: zio.Ref[Boolean])(
exit: Exit[E, A]
): UIO[Outcome[F, Cause[E], A]] =
Expand Down
Loading

0 comments on commit 5f77316

Please sign in to comment.