Skip to content

Commit

Permalink
Add cats.effect.Effect instance for Task
Browse files Browse the repository at this point in the history
  • Loading branch information
Luka Jacobowitz committed Aug 26, 2018
1 parent 1affa4d commit 883844f
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 49 deletions.
103 changes: 55 additions & 48 deletions arrows-stdlib-cats/src/main/scala/arrows/stdlib/cats.scala
Original file line number Diff line number Diff line change
@@ -1,64 +1,22 @@
package arrows.stdlib

import cats.{ Applicative, StackSafeMonad }
import cats.effect.{ Async, ExitCase }
import cats.effect._
import cats.arrow.ArrowChoice
import cats.kernel.{ Monoid, Semigroup }
import cats.mtl.ApplicativeAsk

import scala.concurrent.Promise
import scala.concurrent.{ ExecutionContext, Promise }
import scala.util.control.NonFatal
import scala.util.{ Failure, Success }

object Cats extends ArrowInstances

sealed abstract class ArrowInstances extends ArrowInstances1 {

implicit def catsMonadForArrow[E]: Async[Arrow[E, ?]] = new Async[Arrow[E, ?]] with StackSafeMonad[Arrow[E, ?]] {

def flatMap[A, B](fa: Arrow[E, A])(f: A => Arrow[E, B]): Arrow[E, B] =
Arrow[E].flatMap(e => fa(e).flatMap(a => f(a)(e)))

def raiseError[A](e: Throwable): Arrow[E, A] = Arrow.failed[A](e)

def handleErrorWith[A](fa: Arrow[E, A])(f: Throwable => Arrow[E, A]): Arrow[E, A] =
Arrow[E].flatMap(e => fa.recoverWith { case t: Throwable => f(t)(e) }(e))

def pure[A](x: A): Arrow[E, A] = Arrow.successful(x)

def suspend[A](thunk: => Arrow[E, A]): Arrow[E, A] =
Arrow[E].flatMap(e => try { thunk(e) } catch { case NonFatal(t) => Arrow.failed[A](t)(e) })

override def delay[A](thunk: => A): Arrow[E, A] =
Arrow[E].flatMap(e => try { Arrow.successful(thunk) } catch { case NonFatal(t) => Arrow.failed[A](t)(e) })

def bracketCase[A, B](acquire: Arrow[E, A])(use: A => Arrow[E, B])(release: (A, ExitCase[Throwable]) => Arrow[E, Unit]): Arrow[E, B] = Arrow[E].flatMap(e =>
acquire.flatMap(a => use(a).transformWith {
case Success(b) => release(a, ExitCase.complete)(e).map(_ => b)
case Failure(t) => release(a, ExitCase.error(t))(e).flatMap(_ => Task.failed[B](t))
}(e))(e))

def async[A](k: (Either[Throwable, A] => Unit) => Unit): Arrow[E, A] = Arrow[E].flatMap { _ =>
val promise = Promise[A]

k {
case Left(t) => promise.failure(t)
case Right(a) => promise.success(a)
}

Task.async(promise.future)
}

def asyncF[A](k: (Either[Throwable, A] => Unit) => Arrow[E, Unit]): Arrow[E, A] = Arrow[E].flatMap { e =>
val promise = Promise[A]

k {
case Left(t) => promise.failure(t)
case Right(a) => promise.success(a)
}.flatMap(_ => Task.async(promise.future))(e)
}

override def map[A, B](fa: Arrow[E, A])(f: A => B): Arrow[E, B] = fa.map(f)
implicit def catsEffectForTask(implicit ec: ExecutionContext): Effect[Task] = new ArrowAsync[Unit] with Effect[Task] {
def runAsync[A](fa: Task[A])(cb: Either[Throwable, A] => IO[Unit]): SyncIO[Unit] =
SyncIO { fa.run(())(ec); () }
}

implicit val catsArrowChoiceForArrow: ArrowChoice[Arrow] = new ArrowChoice[Arrow] {
Expand All @@ -77,7 +35,7 @@ sealed abstract class ArrowInstances extends ArrowInstances1 {
}

implicit def catsApplicativeAskForArrow[E]: ApplicativeAsk[Arrow[E, ?], E] = new ApplicativeAsk[Arrow[E, ?], E] {
val applicative: Applicative[Arrow[E, ?]] = catsMonadForArrow[E]
val applicative: Applicative[Arrow[E, ?]] = catsAsyncForArrow[E]

def ask: Arrow[E, E] = Arrow[E]

Expand All @@ -94,11 +52,60 @@ sealed abstract class ArrowInstances extends ArrowInstances1 {
}

sealed abstract class ArrowInstances1 {

implicit def catsAsyncForArrow[E]: Async[Arrow[E, ?]] = new ArrowAsync[E] {}

implicit def catsSemigroupForArrow[A, B](implicit B0: Semigroup[B]): Semigroup[Arrow[A, B]] = new ArrowSemigroup[A, B] {
implicit def B: Semigroup[B] = B0
}
}

trait ArrowAsync[E] extends StackSafeMonad[Arrow[E, ?]] with Async[Arrow[E, ?]] {
def flatMap[A, B](fa: Arrow[E, A])(f: A => Arrow[E, B]): Arrow[E, B] =
Arrow[E].flatMap(e => fa(e).flatMap(a => f(a)(e)))

def raiseError[A](e: Throwable): Arrow[E, A] = Arrow.failed[A](e)

def handleErrorWith[A](fa: Arrow[E, A])(f: Throwable => Arrow[E, A]): Arrow[E, A] =
Arrow[E].flatMap(e => fa.recoverWith { case t: Throwable => f(t)(e) }(e))

def pure[A](x: A): Arrow[E, A] = Arrow.successful(x)

def suspend[A](thunk: => Arrow[E, A]): Arrow[E, A] =
Arrow[E].flatMap(e => try { thunk(e) } catch { case NonFatal(t) => Arrow.failed[A](t)(e) })

override def delay[A](thunk: => A): Arrow[E, A] =
Arrow[E].flatMap(e => try { Arrow.successful(thunk) } catch { case NonFatal(t) => Arrow.failed[A](t)(e) })

def bracketCase[A, B](acquire: Arrow[E, A])(use: A => Arrow[E, B])(release: (A, ExitCase[Throwable]) => Arrow[E, Unit]): Arrow[E, B] = Arrow[E].flatMap(e =>
acquire.flatMap(a => use(a).transformWith {
case Success(b) => release(a, ExitCase.complete)(e).map(_ => b)
case Failure(t) => release(a, ExitCase.error(t))(e).flatMap(_ => Task.failed[B](t))
}(e))(e))

def async[A](k: (Either[Throwable, A] => Unit) => Unit): Arrow[E, A] = Arrow[E].flatMap { _ =>
val promise = Promise[A]

k {
case Left(t) => promise.failure(t)
case Right(a) => promise.success(a)
}

Task.async(promise.future)
}

def asyncF[A](k: (Either[Throwable, A] => Unit) => Arrow[E, Unit]): Arrow[E, A] = Arrow[E].flatMap { e =>
val promise = Promise[A]

k {
case Left(t) => promise.failure(t)
case Right(a) => promise.success(a)
}.flatMap(_ => Task.async(promise.future))(e)
}

override def map[A, B](fa: Arrow[E, A])(f: A => B): Arrow[E, B] = fa.map(f)
}

trait ArrowSemigroup[A, B] extends Semigroup[Arrow[A, B]] {
implicit def B: Semigroup[B]

Expand Down
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ lazy val `arrows-stdlib-cats` =
"org.typelevel" %%% "cats-effect-laws" % "1.0.0-RC3",
"org.typelevel" %%% "cats-mtl-core" % "0.2.3",
"org.typelevel" %%% "cats-mtl-laws" % "0.2.3",
compilerPlugin("org.spire-math" %%% "kind-projector" % "0.9.7"),
compilerPlugin("org.spire-math" %% "kind-projector" % "0.9.7"),
"org.typelevel" %%% "cats-testkit" % "1.2.0" % "test",
),
scoverage.ScoverageKeys.coverageMinimum := 60,
Expand Down

0 comments on commit 883844f

Please sign in to comment.