Skip to content

Commit

Permalink
-- es - streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
djnzx committed Sep 20, 2023
1 parent eec7995 commit 163c14e
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 50 deletions.
22 changes: 6 additions & 16 deletions es68s_akka/src/main/scala/es68/ESStreamSupportAkka.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,14 @@ package es68

import akka.NotUsed
import akka.stream.scaladsl.Source
import cats.implicits.catsSyntaxOptionId
import com.sksamuel.elastic4s.searches.SearchRequest
import com.sksamuel.elastic4s.searches.sort.Sort
import scala.concurrent.ExecutionContext
import scala.concurrent.Future

trait ESStreamSupportAkka {
trait ESStreamSupportAkka extends StreamSupportAkka {

def mkAkkaStream[A](
def mkEsAkkaStream[A](
request: SearchRequest,
sortFields: Seq[Sort], // we are about to paginate, so we need consistent sorting
afterFn: A => Seq[Any], // we need the extractor function to be used in `es.searchAfter(...)`
Expand All @@ -19,22 +18,13 @@ trait ESStreamSupportAkka {
)(implicit ec: ExecutionContext
): Source[A, NotUsed] = {

val request0: SearchRequest = request
val initial: SearchRequest = request
.sortBy(sortFields)
.size(bufferSize)

Source
.unfoldAsync(request0) { request: SearchRequest =>
esExecute(request)
.map {
case xs if xs.isEmpty => None // no data
case xs => // data
val afterClause = afterFn(xs.last)
val afterQuery = request.searchAfter(afterClause)
(afterQuery, xs).some
}
}
.mapConcat(identity)
def reqModifyFn(q: SearchRequest, last: A): SearchRequest = q.searchAfter(afterFn(last))

mkAkkaStream(initial, reqModifyFn)(esExecute)
}

}
29 changes: 12 additions & 17 deletions es68s_ce/src/main/scala/es68/ESStreamSupportFs2.scala
Original file line number Diff line number Diff line change
@@ -1,35 +1,30 @@
package es68

import cats.Functor
import cats.implicits.{catsSyntaxOptionId, toFunctorOps}
import cats.implicits.catsSyntaxOptionId
import cats.implicits.toFunctorOps
import com.sksamuel.elastic4s.searches.SearchRequest
import com.sksamuel.elastic4s.searches.sort.Sort
import fs2.{Chunk, Stream}
import fs2.Chunk
import fs2.Stream

trait ESStreamSupportFs2 {
trait ESStreamSupportFs2 extends StreamSupportFs2 {

def mkFs2Stream[F[_]: Functor, A](
def mkEsFs2Stream[F[_]: Functor, A](
request: SearchRequest,
sortFields: Seq[Sort], // we are about to paginate, so we need consistent sorting
afterFn: A => Seq[Any], // we need the extractor function to be used in `es.searchAfter(...)`
bufferSize: Int = 1024 // page size
pageSize: Int = 1024 // page size
)(esExecute: SearchRequest => F[Seq[A]]
): Stream[F, A] = {

val request0: SearchRequest = request
val initial: SearchRequest = request
.sortBy(sortFields)
.size(bufferSize)
.size(pageSize)

Stream.unfoldChunkEval(request0) { request: SearchRequest =>
esExecute(request)
.map {
case xs if xs.isEmpty => None // no data
case xs => // data
val afterClause = afterFn(xs.last)
val afterQuery = request.searchAfter(afterClause)
(Chunk.from(xs), afterQuery).some
}
}
def reqModifyFn(q: SearchRequest, last: A): SearchRequest = q.searchAfter(afterFn(last))

mkFs2Stream(initial, reqModifyFn, pageSize)(esExecute)
}

}
23 changes: 6 additions & 17 deletions es68s_zio/src/main/scala/es68/ESStreamSupportZio.scala
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
package es68

import cats.implicits.catsSyntaxOptionId
import com.sksamuel.elastic4s.searches.SearchRequest
import com.sksamuel.elastic4s.searches.sort.Sort
import zio.ZIO
import zio.stream.ZStream

trait ESStreamSupportZio {
trait ESStreamSupportZio extends StreamSupportZio {

def mkZioStream[A](
def mkEsZioStream[A](
request: SearchRequest,
sortFields: Seq[Sort], // we are about to paginate, so we need consistent sorting
afterFn: A => Seq[Any], // we need the extractor function to be used in `es.searchAfter(...)`
Expand All @@ -17,23 +16,13 @@ trait ESStreamSupportZio {
): ZStream[Any, Throwable, A] = {

/** initial request we start from */
val request0: SearchRequest = request
val initial: SearchRequest = request
.sortBy(sortFields)
.size(bufferSize)

ZStream
.paginateZIO(request0) { request: SearchRequest =>
esExecute(request)
.map {
case xs if xs.isEmpty => (xs, None) // no data
case xs if xs.length < bufferSize => (xs, None) // last page
case xs =>
val afterClause = afterFn(xs.last)
val afterQuery = request.searchAfter(afterClause)
(xs, afterQuery.some) // normal page
}
}
.flatMap(xs => ZStream.fromIterable(xs))
def reqModifyFn(q: SearchRequest, last: A): SearchRequest = q.searchAfter(afterFn(last))

mkZioStream(initial, reqModifyFn, bufferSize)(esExecute)
}

}

0 comments on commit 163c14e

Please sign in to comment.