Skip to content

Commit

Permalink
es fs2 streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
djnzx committed Sep 20, 2023
1 parent 0e1810d commit 1b33fee
Showing 1 changed file with 35 additions and 0 deletions.
35 changes: 35 additions & 0 deletions es68s_ce/src/main/scala/es68/ESStreamSupportFS2.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package es68

import cats.Functor
import cats.implicits.{catsSyntaxOptionId, toFunctorOps}
import com.sksamuel.elastic4s.searches.SearchRequest
import com.sksamuel.elastic4s.searches.sort.Sort
import fs2.{Chunk, Stream}

trait ESStreamSupportFS2 {

def mkFs2Stream[F[_]: Functor, A](
request: SearchRequest,
sortFields: Seq[Sort], // we are about to paginate, so we need consistent sorting
afterFn: A => Seq[Any], // we need the extractor function to be used in `es.searchAfter(...)`
bufferSize: Int = 1024 // page size
)(esExecute: SearchRequest => F[Seq[A]]
): Stream[F, A] = {

val request0: SearchRequest = request
.sortBy(sortFields)
.size(bufferSize)

Stream.unfoldChunkEval(request0) { request: SearchRequest =>
esExecute(request)
.map {
case xs if xs.isEmpty => None // no data
case xs => // data
val afterClause = afterFn(xs.last)
val afterQuery = request.searchAfter(afterClause)
(Chunk.from(xs), afterQuery).some
}
}
}

}

0 comments on commit 1b33fee

Please sign in to comment.