From eb7f2649d4ac501536d3a05133430d1a48b9cc37 Mon Sep 17 00:00:00 2001 From: Avasil Date: Wed, 15 May 2019 10:19:28 +0200 Subject: [PATCH] [#1078] Streaming with Monix's Observable --- build.sbt | 18 +- .../main/scala/io/finch/circe/Decoders.scala | 16 ++ .../src/main/scala/io/finch/monix/Main.scala | 84 ++++++++++ .../main/scala/io/finch/monix/package.scala | 154 ++++++++++++++++++ .../monix/MonixObservableStreamingSpec.scala | 29 ++++ 5 files changed, 298 insertions(+), 3 deletions(-) create mode 100644 examples/src/main/scala/io/finch/monix/Main.scala create mode 100644 monix/src/main/scala/io/finch/monix/package.scala create mode 100644 monix/src/test/scala/io/finch/monix/MonixObservableStreamingSpec.scala diff --git a/build.sbt b/build.sbt index 7b0e3e6b7..9092971d2 100644 --- a/build.sbt +++ b/build.sbt @@ -18,6 +18,7 @@ lazy val iterateeVersion = "0.18.0" lazy val refinedVersion = "0.9.5" lazy val catsEffectVersion = "1.3.0" lazy val fs2Version = "1.0.4" +lazy val monixVersion = "3.0.0-RC2" lazy val compilerOptions = Seq( "-deprecation", @@ -236,7 +237,7 @@ lazy val finch = project.in(file(".")) "io.circe" %% "circe-generic" % circeVersion )) .aggregate( - core, fs2, iteratee, generic, argonaut, circe, benchmarks, test, jsonTest, examples, refined + core, fs2, iteratee, monix, generic, argonaut, circe, benchmarks, test, jsonTest, examples, refined ) .dependsOn(core, iteratee, generic, circe) @@ -264,6 +265,16 @@ lazy val fs2 = project ) .dependsOn(core % "compile->compile;test->test") +lazy val monix = project + .settings(moduleName := "finchx-monix") + .settings(allSettings) + .settings( + libraryDependencies ++= Seq( + "io.monix" %% "monix-reactive" % monixVersion + ) + ) + .dependsOn(core % "compile->compile;test->test") + lazy val generic = project .settings(moduleName := "finchx-generic") .settings(allSettings) @@ -306,10 +317,11 @@ lazy val circe = project "io.circe" %% "circe-iteratee" % circeIterateeVersion, "io.circe" %% "circe-fs2" % circeFs2Version, "io.circe" %% "circe-jawn" % circeVersion, + "io.monix" %% "monix-circe" % "0.0.1", "io.circe" %% "circe-generic" % circeVersion % "test" ) ) - .dependsOn(core, jsonTest % "test") + .dependsOn(core, monix, jsonTest % "test") lazy val refined = project .settings(moduleName := "finchx-refined") @@ -357,7 +369,7 @@ lazy val examples = project "com.twitter" %% "twitter-server" % twitterVersion ) ) - .dependsOn(core, circe, iteratee) + .dependsOn(core, circe, iteratee, monix) lazy val benchmarks = project .settings(moduleName := "finchx-benchmarks") diff --git a/circe/src/main/scala/io/finch/circe/Decoders.scala b/circe/src/main/scala/io/finch/circe/Decoders.scala index 0e978afcd..7b534754f 100644 --- a/circe/src/main/scala/io/finch/circe/Decoders.scala +++ b/circe/src/main/scala/io/finch/circe/Decoders.scala @@ -9,6 +9,7 @@ import io.circe.iteratee import io.circe.jawn._ import io.finch.{Application, Decode, DecodeStream} import io.finch.internal.HttpContent +import io.finch.monix.ObservableF import io.iteratee.Enumerator import java.nio.charset.StandardCharsets @@ -53,4 +54,19 @@ trait Decoders { } parsed.through(fs2.decoder[F, A]) }) + + implicit def monixCirce[F[_], A: Decoder]: DecodeStream.Json[ObservableF, F, A] = + DecodeStream.instance[ObservableF, F, A, Application.Json]((stream, cs) => { + val parsed = cs match { + case StandardCharsets.UTF_8 => + stream + .map(_.asByteArray) + .liftByOperator(monix.circe.byteStreamParser) + case _ => + stream + .map(_.asString(cs)) + .liftByOperator(monix.circe.stringStreamParser) + } + monix.circe.decoder(parsed) + }) } diff --git a/examples/src/main/scala/io/finch/monix/Main.scala b/examples/src/main/scala/io/finch/monix/Main.scala new file mode 100644 index 000000000..36a2c8031 --- /dev/null +++ b/examples/src/main/scala/io/finch/monix/Main.scala @@ -0,0 +1,84 @@ +package io.finch.monix + +import _root_.monix.eval.{Task, TaskApp} +import _root_.monix.execution.Scheduler +import _root_.monix.reactive.Observable +import cats.effect.{ExitCode, Resource} +import cats.implicits._ +import com.twitter.finagle.{Http, ListeningServer} +import com.twitter.util.Future +import io.circe.generic.auto._ +import io.finch._ +import io.finch.circe._ +import scala.util.Random + +/** + * A Finch application featuring Monix Observable-based streaming support. + * This approach is more advanced and performant then basic [[com.twitter.concurrent.AsyncStream]] + * + * There are three endpoints in this example: + * + * 1. `sumJson` - streaming request + * 2. `streamJson` - streaming response + * 3. `isPrime` - end-to-end (request - response) streaming + * + * Use the following sbt command to run the application. + * + * {{{ + * $ sbt 'examples/runMain io.finch.monix.Main' + * }}} + * + * Use the following HTTPie/curl commands to test endpoints. + * + * {{{ + * $ curl -X POST --header "Transfer-Encoding: chunked" -d '{"i": 40} {"i": 2}' localhost:8081/sumJson + * + * $ http --stream GET :8081/streamJson + * + * $ curl -X POST --header "Transfer-Encoding: chunked" -d '{"i": 40} {"i": 42}' localhost:8081/streamPrime + * }}} + */ +object Main extends TaskApp with EndpointModule[Task] { + + override implicit def scheduler: Scheduler = super.scheduler + + final case class Result(result: Int) { + def add(n: Number): Result = copy(result = result + n.i) + } + + final case class Number(i: Int) { + def isPrime: IsPrime = IsPrime(!(2 :: (3 to Math.sqrt(i.toDouble).toInt by 2).toList exists (i % _ == 0))) + } + + final case class IsPrime(isPrime: Boolean) + + private def stream: Stream[Int] = Stream.continually(Random.nextInt()) + + val sumJson: Endpoint[Task, Result] = post("sumJson" :: jsonBodyStream[ObservableF, Number]) { + o: Observable[Number] => + o.foldLeftL(Result(0))(_ add _).map(Ok) + } + + val streamJson: Endpoint[Task, ObservableF[Task, Number]] = get("streamJson") { + Ok(Observable.fromIterable(stream).map(Number.apply)) + } + + val isPrime: Endpoint[Task, ObservableF[Task, IsPrime]] = + post("streamPrime" :: jsonBodyStream[ObservableF, Number]) { o: Observable[Number] => + Ok(o.map(_.isPrime)) + } + + def serve: Task[ListeningServer] = Task( + Http.server + .withStreaming(enabled = true) + .serve(":8081", (sumJson :+: streamJson :+: isPrime).toServiceAs[Application.Json]) + ) + + def run(args: List[String]): Task[ExitCode] = { + val server = Resource.make(serve)(s => + Task.suspend(implicitly[ToAsync[Future, Task]].apply(s.close())) + ) + + server.use(_ => Task.never).as(ExitCode.Success) + } +} diff --git a/monix/src/main/scala/io/finch/monix/package.scala b/monix/src/main/scala/io/finch/monix/package.scala new file mode 100644 index 000000000..350608f5e --- /dev/null +++ b/monix/src/main/scala/io/finch/monix/package.scala @@ -0,0 +1,154 @@ +package io.finch + +import java.nio.charset.Charset + +import _root_.monix.eval.TaskLift +import _root_.monix.reactive.Observable +import cats.effect._ +import com.twitter.io.{Buf, Pipe, Reader} +import com.twitter.util.Future +import io.finch.internal.newLine +import io.finch.monix.ObservableF + +package object monix extends ObservableConcurrentEffectInstances { + + type ObservableF[F[_], A] = Observable[A] + + implicit def aliasResponseToRealResponse[F[_], A, CT <: Application.Json](implicit + tr: ToResponse.Aux[F, ObservableF[F, A], CT] + ): ToResponse.Aux[F, Observable[A], CT] = tr + + implicit def observableLiftReader[F[_]](implicit + F: Effect[F], + TA: ToAsync[Future, F] + ): LiftReader[ObservableF, F] = + new LiftReader[ObservableF, F] { + final def apply[A](reader: Reader[Buf], process: Buf => A): ObservableF[F, A] = { + Observable + .repeatEvalF(F.suspend(TA(reader.read()))) + .takeWhile(_.isDefined) + .collect { case Some(buf) => process(buf) } + .guaranteeF(F.delay(reader.discard())) + } + } + + implicit def encodeBufConcurrentEffectObservable[F[_] : ConcurrentEffect : TaskLift, CT <: String]: EncodeStream.Aux[F, ObservableF, Buf, CT] = + new EncodeConcurrentEffectObservable[F, Buf, CT] { + protected def encodeChunk(chunk: Buf, cs: Charset): Buf = chunk + } +} + +trait ObservableConcurrentEffectInstances extends ObservableEffectInstances { + + implicit def encodeJsonConcurrentObservable[F[_] : ConcurrentEffect : TaskLift, A](implicit + A: Encode.Json[A] + ): EncodeStream.Json[F, ObservableF, A] = + new EncodeNewLineDelimitedConcurrentEffectObservable[F, A, Application.Json] + + implicit def encodeSseConcurrentEffectObservable[F[_] : ConcurrentEffect : TaskLift, A](implicit + A: Encode.Aux[A, Text.EventStream] + ): EncodeStream.Aux[F, ObservableF, A, Text.EventStream] = + new EncodeNewLineDelimitedConcurrentEffectObservable[F, A, Text.EventStream] + + implicit def encodeTextConcurrentEffectObservable[F[_] : ConcurrentEffect : TaskLift, A](implicit + A: Encode.Text[A] + ): EncodeStream.Text[F, ObservableF, A] = + new EncodeConcurrentEffectObservable[F, A, Text.Plain] { + override protected def encodeChunk(chunk: A, cs: Charset): Buf = + A(chunk, cs) + } + + implicit def encodeBufEffectObservable[F[_] : Effect : TaskLift, CT <: String]: EncodeStream.Aux[F, ObservableF, Buf, CT] = + new EncodeEffectObservable[F, Buf, CT] { + protected def encodeChunk(chunk: Buf, cs: Charset): Buf = chunk + } +} + +trait ObservableEffectInstances extends ObservableInstances { + + implicit def encodeJsonEffectObservable[F[_] : Effect : TaskLift, A](implicit + A: Encode.Json[A] + ): EncodeStream.Json[F, ObservableF, A] = + new EncodeNewLineDelimitedEffectObservable[F, A, Application.Json] + + implicit def encodeSseEffectObservable[F[_] : Effect : TaskLift, A](implicit + A: Encode.Aux[A, Text.EventStream] + ): EncodeStream.Aux[F, ObservableF, A, Text.EventStream] = + new EncodeNewLineDelimitedEffectObservable[F, A, Text.EventStream] + + implicit def encodeTextEffectObservable[F[_] : Effect : TaskLift, A](implicit + A: Encode.Text[A] + ): EncodeStream.Text[F, ObservableF, A] = + new EncodeEffectObservable[F, A, Text.Plain] { + override protected def encodeChunk(chunk: A, cs: Charset): Buf = + A(chunk, cs) + } +} + +trait ObservableInstances { + + protected final class EncodeNewLineDelimitedConcurrentEffectObservable[F[_] : ConcurrentEffect : TaskLift, A, CT <: String](implicit + A: Encode.Aux[A, CT] + ) extends EncodeConcurrentEffectObservable[F, A, CT] { + protected def encodeChunk(chunk: A, cs: Charset): Buf = + A(chunk, cs).concat(newLine(cs)) + } + + protected final class EncodeNewLineDelimitedEffectObservable[F[_] : Effect : TaskLift, A, CT <: String](implicit + A: Encode.Aux[A, CT] + ) extends EncodeEffectObservable[F, A, CT] { + protected def encodeChunk(chunk: A, cs: Charset): Buf = + A(chunk, cs).concat(newLine(cs)) + } + + protected abstract class EncodeConcurrentEffectObservable[F[_] : TaskLift, A, CT <: String](implicit + F: ConcurrentEffect[F], + TA: ToAsync[Future, F] + ) extends EncodeObservable[F, A, CT] { + protected def dispatch(reader: Reader[Buf], run: F[Unit]): F[Reader[Buf]] = + F.bracketCase(F.start(run))(_ => F.pure(reader)) { + case (f, ExitCase.Canceled) => f.cancel + case _ => F.unit + } + } + + protected abstract class EncodeEffectObservable[F[_]: TaskLift, A, CT <: String](implicit + F: Effect[F], + TA: ToAsync[Future, F] + ) extends EncodeObservable[F, A, CT] with (Either[Throwable, Unit] => IO[Unit]) { + + def apply(cb: Either[Throwable, Unit]): IO[Unit] = IO.unit + + protected def dispatch(reader: Reader[Buf], run: F[Unit]): F[Reader[Buf]] = + F.productR(F.runAsync(run)(this).to[F])(F.pure(reader)) + } + + protected abstract class EncodeObservable[F[_]: TaskLift, A, CT <: String](implicit + F: Effect[F], + TA: ToAsync[Future, F] + ) extends EncodeStream[F, ObservableF, A] { + + type ContentType = CT + + protected def encodeChunk(chunk: A, cs: Charset): Buf + + protected def dispatch(reader: Reader[Buf], run: F[Unit]): F[Reader[Buf]] + + override def apply(s: ObservableF[F, A], cs: Charset): F[Reader[Buf]] = + F.suspend { + val p = new Pipe[Buf] + + val run = s + .map(chunk => encodeChunk(chunk, cs)) + .mapEvalF(chunk => TA(p.write(chunk))) + .guaranteeF(F.suspend(TA(p.close()))) + .completedL + .to[F] + + dispatch(p, run) + + } + } + +} + diff --git a/monix/src/test/scala/io/finch/monix/MonixObservableStreamingSpec.scala b/monix/src/test/scala/io/finch/monix/MonixObservableStreamingSpec.scala new file mode 100644 index 000000000..cf385ec08 --- /dev/null +++ b/monix/src/test/scala/io/finch/monix/MonixObservableStreamingSpec.scala @@ -0,0 +1,29 @@ +package io.finch.monix + +import cats.effect.{ConcurrentEffect, Effect, IO} +import com.twitter.io.Buf +import io.finch.{FinchSpec, StreamingLaws} +import monix.eval.TaskLift +import monix.execution.Scheduler +import monix.reactive.Observable +import org.scalatestplus.scalacheck.ScalaCheckDrivenPropertyChecks + +class MonixObservableStreamingSpec extends FinchSpec with ScalaCheckDrivenPropertyChecks { + + implicit val s = Scheduler.global + + checkEffect[IO] + checkConcurrentEffect[IO] + + def checkEffect[F[_]: TaskLift](implicit F: Effect[F]): Unit = + checkAll("monixObservable.streamBody[F[_]: Effect]", StreamingLaws[ObservableF, F]( + list => Observable(list:_*), + stream => F.toIO(stream.map(array => Buf.ByteArray.Owned(array)).toListL.to[F]).unsafeRunSync() + ).all) + + def checkConcurrentEffect[F[_]: TaskLift](implicit F: ConcurrentEffect[F]): Unit = + checkAll("monixObservable.streamBody[F[_]: ConcurrentEffect]", StreamingLaws[ObservableF, F]( + list => Observable(list:_*), + stream => F.toIO(stream.map(array => Buf.ByteArray.Owned(array)).toListL.to[F]).unsafeRunSync() + ).all) +}