diff --git a/es68s_ce/src/main/scala/es68/ESStreamSupportFS2.scala b/es68s_ce/src/main/scala/es68/ESStreamSupportFS2.scala new file mode 100644 index 00000000..ab2c979d --- /dev/null +++ b/es68s_ce/src/main/scala/es68/ESStreamSupportFS2.scala @@ -0,0 +1,35 @@ +package es68 + +import cats.Functor +import cats.implicits.{catsSyntaxOptionId, toFunctorOps} +import com.sksamuel.elastic4s.searches.SearchRequest +import com.sksamuel.elastic4s.searches.sort.Sort +import fs2.{Chunk, Stream} + +trait ESStreamSupportFS2 { + + def mkFs2Stream[F[_]: Functor, A]( + request: SearchRequest, + sortFields: Seq[Sort], // we are about to paginate, so we need consistent sorting + afterFn: A => Seq[Any], // we need the extractor function to be used in `es.searchAfter(...)` + bufferSize: Int = 1024 // page size + )(esExecute: SearchRequest => F[Seq[A]] + ): Stream[F, A] = { + + val request0: SearchRequest = request + .sortBy(sortFields) + .size(bufferSize) + + Stream.unfoldChunkEval(request0) { request: SearchRequest => + esExecute(request) + .map { + case xs if xs.isEmpty => None // no data + case xs => // data + val afterClause = afterFn(xs.last) + val afterQuery = request.searchAfter(afterClause) + (Chunk.from(xs), afterQuery).some + } + } + } + +}