Skip to content

Commit

Permalink
-- es whatever version - akka streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
djnzx committed Sep 20, 2023
1 parent e993fad commit 0e1810d
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 30 deletions.
31 changes: 1 addition & 30 deletions es68/src/main/scala/es68/ExploreElastic68Tests.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package es68

import com.sksamuel.elastic4s.IndexAndType
import com.sksamuel.elastic4s.RefreshPolicy
import com.sksamuel.elastic4s.http.{ElasticClient, HttpResponse, RequestFailure, RequestSuccess, Response}
import com.sksamuel.elastic4s.http.{ElasticClient, RequestFailure, RequestSuccess, Response}
import com.sksamuel.elastic4s.http.ElasticDsl._
import com.sksamuel.elastic4s.http.index.CreateIndexResponse
import com.sksamuel.elastic4s.http.index.IndexResponse
Expand All @@ -14,13 +14,9 @@ import com.sksamuel.elastic4s.indexes.IndexRequest
import com.sksamuel.elastic4s.mappings.MappingDefinition
import com.sksamuel.elastic4s.mappings.dynamictemplate.DynamicMapping
import com.sksamuel.elastic4s.searches.SearchRequest
import com.sksamuel.elastic4s.searches.queries.BoolQuery
import com.sksamuel.elastic4s.searches.queries.term.TermQuery
import es68.campaign.model.TargetType.{Instant, Registration, TargetList}
import io.circe.generic.AutoDerivation
import org.scalatest.BeforeAndAfterAll
import org.scalatest.funsuite.AnyFunSuite
import org.slf4j.{Logger, LoggerFactory}

class ExploreElastic68Tests extends AnyFunSuite with BeforeAndAfterAll {

Expand Down Expand Up @@ -102,30 +98,5 @@ class ExploreElastic68Tests extends AnyFunSuite with BeforeAndAfterAll {
resp.foreach((x: SearchResponse) => println("There were" -> x.totalHits))
}

test("client123") {
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import es68.IndexableDerivation.hitReaderWithCirce
import es68.campaign.model.CampaignEntity
import ESClient._
import es68.campaign.model.ChronoUnitInstances._

val client = ESClient.apply[Future]("campaign.conf")
implicit val logger: Logger = LoggerFactory.getLogger(this.getClass)

val tQuery: TermQuery = termQuery("targetType", Instant.entryName)
val bQuery: BoolQuery = must(tQuery)

val indexAndType = "paridirect_campaign/campaign"
val q: SearchRequest = search(indexAndType)//.query(bQuery)
val xs: TotalResult[CampaignEntity] = client.execute0[SearchRequest, SearchResponse](q)
.map(_.toTotalResult[CampaignEntity]).await

pprint.pprintln(xs.total)
pprint.pprintln(xs.data.size)
// xs.foreach(x => pprint.pprintln(x))
// client.close()

}

}
40 changes: 40 additions & 0 deletions es68s_akka/src/main/scala/es68/ESStreamSupportAkka.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package es68

import akka.NotUsed
import akka.stream.scaladsl.Source
import cats.implicits.catsSyntaxOptionId
import com.sksamuel.elastic4s.searches.SearchRequest
import com.sksamuel.elastic4s.searches.sort.Sort
import scala.concurrent.ExecutionContext
import scala.concurrent.Future

trait ESStreamSupportAkka {

def mkAkkaStream[A](
request: SearchRequest,
sortFields: Seq[Sort], // we are about to paginate, so we need consistent sorting
afterFn: A => Seq[Any], // we need the extractor function to be used in `es.searchAfter(...)`
bufferSize: Int = 1024 // page size
)(esExecute: SearchRequest => Future[Seq[A]]
)(implicit ec: ExecutionContext
): Source[A, NotUsed] = {

val request0: SearchRequest = request
.sortBy(sortFields)
.size(bufferSize)

Source
.unfoldAsync(request0) { request: SearchRequest =>
esExecute(request)
.map {
case xs if xs.isEmpty => None // no data
case xs => // data
val afterClause = afterFn(xs.last)
val afterQuery = request.searchAfter(afterClause)
(afterQuery, xs).some
}
}
.mapConcat(identity)
}

}

0 comments on commit 0e1810d

Please sign in to comment.