diff --git a/build.sbt b/build.sbt index 1d5db0fc..7e5026d0 100644 --- a/build.sbt +++ b/build.sbt @@ -310,9 +310,10 @@ lazy val typesafe = (project in file("typesafe")) libraryDependencies ++= Seq( Libraries.cats, Libraries.sqlPostgres, - "com.typesafe.akka" %% "akka-actor" % "2.6.17", - "com.typesafe.akka" %% "akka-actor-typed" % "2.6.17", - "com.typesafe.akka" %% "akka-stream" % "2.6.17", + "com.typesafe.akka" %% "akka-actor" % "2.6.21", + "com.typesafe.akka" %% "akka-actor-typed" % "2.6.21", + "com.typesafe.akka" %% "akka-stream" % "2.6.21", + "org.reactivestreams" % "reactive-streams" % "1.0.4", "com.typesafe.akka" %% "akka-http" % "10.5.0", "com.typesafe.akka" %% "akka-http-spray-json" % "10.5.0", "com.typesafe.slick" %% "slick" % "3.4.1", diff --git a/ce3/src/main/scala/fs2x/Fs2ReactivePlayground.scala b/ce3/src/main/scala/fs2x/Fs2ReactivePlayground.scala new file mode 100644 index 00000000..4936b069 --- /dev/null +++ b/ce3/src/main/scala/fs2x/Fs2ReactivePlayground.scala @@ -0,0 +1,30 @@ +package fs2x + +import cats.effect.Async +import fs2.interop.reactivestreams._ +import org.reactivestreams.Publisher +import org.reactivestreams.Subscriber + +/** ReactivePlayground + * + * - consuming from org.reactivestreams.Publisher[A] + * to fs2.Stream[F, A] + * + * - publishing to org.reactivestreams.Publisher[A] + * via fs2Stream.subscribe(elasticSearchSubscriber) + */ +class Fs2ReactivePlayground[F[_]: Async, A] { + + /** having any library providing source as a [[org.reactivestreams.Publisher]] */ + val elasticSearchPublisher: Publisher[A] = ??? + + /** we can easily convert it to fs2Stream */ + val fs2Stream: fs2.Stream[F, A] = fromPublisher[F, A](elasticSearchPublisher, 1000) + + /** having any library providing source as a [[org.reactivestreams.Subscriber]] */ + val elasticSearchSubscriber: Subscriber[A] = ??? + + /** we can sink top it */ + val r: F[Unit] = fs2Stream.subscribe(elasticSearchSubscriber) + +} diff --git a/typesafe/src/main/scala/streaming/AkkaStreamsReactivePlayground.scala b/typesafe/src/main/scala/streaming/AkkaStreamsReactivePlayground.scala new file mode 100644 index 00000000..bee884ca --- /dev/null +++ b/typesafe/src/main/scala/streaming/AkkaStreamsReactivePlayground.scala @@ -0,0 +1,36 @@ +package streaming + +import akka.NotUsed +import akka.stream.Materializer +import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.Source +import org.reactivestreams.Publisher +import org.reactivestreams.Subscriber + +/** ReactivePlayground + * + * - consuming from org.reactivestreams.Publisher[A] + * to Akka Source + * + * - publishing to org.reactivestreams.Publisher[A] + * Akka Sink implementation + */ +class AkkaStreamsReactivePlayground[A] { + + implicit val materializer: Materializer = ??? + + /** having any library providing source as a [[org.reactivestreams.Publisher]] */ + val elasticSearchPublisher: Publisher[A] = ??? + + /** we can easily convert it to Akka Stream */ + val akkaStream: Source[A, NotUsed] = Source.fromPublisher(elasticSearchPublisher) + + /** having any library providing source as a [[org.reactivestreams.Subscriber]] */ + val elasticSearchSubscriber: Subscriber[A] = ??? + + /** we can create a [[akka.stream.scaladsl.Sink]] */ + val sink: Sink[A, NotUsed] = Sink.fromSubscriber(elasticSearchSubscriber) + + val r: NotUsed = akkaStream.to(sink).run + +}