From 163c14eb3c141a919fd5d99f45be957e9397b9cf Mon Sep 17 00:00:00 2001 From: Alexey Rykhalskiy Date: Wed, 20 Sep 2023 15:23:41 +0300 Subject: [PATCH] -- es - streaming --- .../main/scala/es68/ESStreamSupportAkka.scala | 22 ++++---------- .../main/scala/es68/ESStreamSupportFs2.scala | 29 ++++++++----------- .../main/scala/es68/ESStreamSupportZio.scala | 23 ++++----------- 3 files changed, 24 insertions(+), 50 deletions(-) diff --git a/es68s_akka/src/main/scala/es68/ESStreamSupportAkka.scala b/es68s_akka/src/main/scala/es68/ESStreamSupportAkka.scala index 566bbd53..815687bb 100644 --- a/es68s_akka/src/main/scala/es68/ESStreamSupportAkka.scala +++ b/es68s_akka/src/main/scala/es68/ESStreamSupportAkka.scala @@ -2,15 +2,14 @@ package es68 import akka.NotUsed import akka.stream.scaladsl.Source -import cats.implicits.catsSyntaxOptionId import com.sksamuel.elastic4s.searches.SearchRequest import com.sksamuel.elastic4s.searches.sort.Sort import scala.concurrent.ExecutionContext import scala.concurrent.Future -trait ESStreamSupportAkka { +trait ESStreamSupportAkka extends StreamSupportAkka { - def mkAkkaStream[A]( + def mkEsAkkaStream[A]( request: SearchRequest, sortFields: Seq[Sort], // we are about to paginate, so we need consistent sorting afterFn: A => Seq[Any], // we need the extractor function to be used in `es.searchAfter(...)` @@ -19,22 +18,13 @@ trait ESStreamSupportAkka { )(implicit ec: ExecutionContext ): Source[A, NotUsed] = { - val request0: SearchRequest = request + val initial: SearchRequest = request .sortBy(sortFields) .size(bufferSize) - Source - .unfoldAsync(request0) { request: SearchRequest => - esExecute(request) - .map { - case xs if xs.isEmpty => None // no data - case xs => // data - val afterClause = afterFn(xs.last) - val afterQuery = request.searchAfter(afterClause) - (afterQuery, xs).some - } - } - .mapConcat(identity) + def reqModifyFn(q: SearchRequest, last: A): SearchRequest = q.searchAfter(afterFn(last)) + + mkAkkaStream(initial, reqModifyFn)(esExecute) } } diff --git a/es68s_ce/src/main/scala/es68/ESStreamSupportFs2.scala b/es68s_ce/src/main/scala/es68/ESStreamSupportFs2.scala index 55502d9a..1a7ec239 100644 --- a/es68s_ce/src/main/scala/es68/ESStreamSupportFs2.scala +++ b/es68s_ce/src/main/scala/es68/ESStreamSupportFs2.scala @@ -1,35 +1,30 @@ package es68 import cats.Functor -import cats.implicits.{catsSyntaxOptionId, toFunctorOps} +import cats.implicits.catsSyntaxOptionId +import cats.implicits.toFunctorOps import com.sksamuel.elastic4s.searches.SearchRequest import com.sksamuel.elastic4s.searches.sort.Sort -import fs2.{Chunk, Stream} +import fs2.Chunk +import fs2.Stream -trait ESStreamSupportFs2 { +trait ESStreamSupportFs2 extends StreamSupportFs2 { - def mkFs2Stream[F[_]: Functor, A]( + def mkEsFs2Stream[F[_]: Functor, A]( request: SearchRequest, sortFields: Seq[Sort], // we are about to paginate, so we need consistent sorting afterFn: A => Seq[Any], // we need the extractor function to be used in `es.searchAfter(...)` - bufferSize: Int = 1024 // page size + pageSize: Int = 1024 // page size )(esExecute: SearchRequest => F[Seq[A]] ): Stream[F, A] = { - val request0: SearchRequest = request + val initial: SearchRequest = request .sortBy(sortFields) - .size(bufferSize) + .size(pageSize) - Stream.unfoldChunkEval(request0) { request: SearchRequest => - esExecute(request) - .map { - case xs if xs.isEmpty => None // no data - case xs => // data - val afterClause = afterFn(xs.last) - val afterQuery = request.searchAfter(afterClause) - (Chunk.from(xs), afterQuery).some - } - } + def reqModifyFn(q: SearchRequest, last: A): SearchRequest = q.searchAfter(afterFn(last)) + + mkFs2Stream(initial, reqModifyFn, pageSize)(esExecute) } } diff --git a/es68s_zio/src/main/scala/es68/ESStreamSupportZio.scala b/es68s_zio/src/main/scala/es68/ESStreamSupportZio.scala index 8bdc8000..b9855c2d 100644 --- a/es68s_zio/src/main/scala/es68/ESStreamSupportZio.scala +++ b/es68s_zio/src/main/scala/es68/ESStreamSupportZio.scala @@ -1,14 +1,13 @@ package es68 -import cats.implicits.catsSyntaxOptionId import com.sksamuel.elastic4s.searches.SearchRequest import com.sksamuel.elastic4s.searches.sort.Sort import zio.ZIO import zio.stream.ZStream -trait ESStreamSupportZio { +trait ESStreamSupportZio extends StreamSupportZio { - def mkZioStream[A]( + def mkEsZioStream[A]( request: SearchRequest, sortFields: Seq[Sort], // we are about to paginate, so we need consistent sorting afterFn: A => Seq[Any], // we need the extractor function to be used in `es.searchAfter(...)` @@ -17,23 +16,13 @@ trait ESStreamSupportZio { ): ZStream[Any, Throwable, A] = { /** initial request we start from */ - val request0: SearchRequest = request + val initial: SearchRequest = request .sortBy(sortFields) .size(bufferSize) - ZStream - .paginateZIO(request0) { request: SearchRequest => - esExecute(request) - .map { - case xs if xs.isEmpty => (xs, None) // no data - case xs if xs.length < bufferSize => (xs, None) // last page - case xs => - val afterClause = afterFn(xs.last) - val afterQuery = request.searchAfter(afterClause) - (xs, afterQuery.some) // normal page - } - } - .flatMap(xs => ZStream.fromIterable(xs)) + def reqModifyFn(q: SearchRequest, last: A): SearchRequest = q.searchAfter(afterFn(last)) + + mkZioStream(initial, reqModifyFn, bufferSize)(esExecute) } }