Skip to content

Commit

Permalink
-- reactive-streams from/to akka / fs2 as an idea to work with Elasti…
Browse files Browse the repository at this point in the history
…c Search
  • Loading branch information
djnzx committed Sep 18, 2023
1 parent af14642 commit 88cab15
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 3 deletions.
7 changes: 4 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -310,9 +310,10 @@ lazy val typesafe = (project in file("typesafe"))
libraryDependencies ++= Seq(
Libraries.cats,
Libraries.sqlPostgres,
"com.typesafe.akka" %% "akka-actor" % "2.6.17",
"com.typesafe.akka" %% "akka-actor-typed" % "2.6.17",
"com.typesafe.akka" %% "akka-stream" % "2.6.17",
"com.typesafe.akka" %% "akka-actor" % "2.6.21",
"com.typesafe.akka" %% "akka-actor-typed" % "2.6.21",
"com.typesafe.akka" %% "akka-stream" % "2.6.21",
"org.reactivestreams" % "reactive-streams" % "1.0.4",
"com.typesafe.akka" %% "akka-http" % "10.5.0",
"com.typesafe.akka" %% "akka-http-spray-json" % "10.5.0",
"com.typesafe.slick" %% "slick" % "3.4.1",
Expand Down
30 changes: 30 additions & 0 deletions ce3/src/main/scala/fs2x/Fs2ReactivePlayground.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package fs2x

import cats.effect.Async
import fs2.interop.reactivestreams._
import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber

/** ReactivePlayground
*
* - consuming from org.reactivestreams.Publisher[A]
* to fs2.Stream[F, A]
*
* - publishing to org.reactivestreams.Publisher[A]
* via fs2Stream.subscribe(elasticSearchSubscriber)
*/
class Fs2ReactivePlayground[F[_]: Async, A] {

/** having any library providing source as a [[org.reactivestreams.Publisher]] */
val elasticSearchPublisher: Publisher[A] = ???

/** we can easily convert it to fs2Stream */
val fs2Stream: fs2.Stream[F, A] = fromPublisher[F, A](elasticSearchPublisher, 1000)

/** having any library providing source as a [[org.reactivestreams.Subscriber]] */
val elasticSearchSubscriber: Subscriber[A] = ???

/** we can sink top it */
val r: F[Unit] = fs2Stream.subscribe(elasticSearchSubscriber)

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package streaming

import akka.NotUsed
import akka.stream.Materializer
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber

/** ReactivePlayground
*
* - consuming from org.reactivestreams.Publisher[A]
* to Akka Source
*
* - publishing to org.reactivestreams.Publisher[A]
* Akka Sink implementation
*/
class AkkaStreamsReactivePlayground[A] {

implicit val materializer: Materializer = ???

/** having any library providing source as a [[org.reactivestreams.Publisher]] */
val elasticSearchPublisher: Publisher[A] = ???

/** we can easily convert it to Akka Stream */
val akkaStream: Source[A, NotUsed] = Source.fromPublisher(elasticSearchPublisher)

/** having any library providing source as a [[org.reactivestreams.Subscriber]] */
val elasticSearchSubscriber: Subscriber[A] = ???

/** we can create a [[akka.stream.scaladsl.Sink]] */
val sink: Sink[A, NotUsed] = Sink.fromSubscriber(elasticSearchSubscriber)

val r: NotUsed = akkaStream.to(sink).run

}

0 comments on commit 88cab15

Please sign in to comment.