This is a Pekko Streams extension for ReactiveMongo cursors.
In your project/Build.scala:
resolvers += "Sonatype Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots/"
libraryDependencies ++= Seq(
  "org.reactivemongo" %% "reactivemongo" % VERSION,
  "org.reactivemongo" %% "reactivemongo-pekkostream" % VERSION)
Java 1.8+ is required, and VERSION must include the -pekko qualifier.
Then in your code:
import scala.concurrent.Future
import reactivemongo.api.bson.{ BSONDocument, BSONDocumentReader }
import reactivemongo.api.bson.collection.BSONCollection
// Reactive streams imports
import org.reactivestreams.Publisher
import org.apache.pekko.stream.scaladsl.Source
// ReactiveMongo extensions
import reactivemongo.pekkostream.{ PekkoStreamCursor, cursorProducer, State }
implicit def materializer: org.apache.pekko.stream.Materializer = ???
val ageReader = BSONDocumentReader.field[Int]("age")
def foo(collection: BSONCollection): (Source[Int, Future[State]], Publisher[Int]) = {
  implicit def reader: BSONDocumentReader[Int] = ageReader
  val cursor: PekkoStreamCursor[Int] =
    collection.find(BSONDocument.empty/* findAll */).
      sort(BSONDocument("id" -> 1)).cursor[Int]()
  val src: Source[Int, Future[State]] = cursor.documentSource()
  val pub: Publisher[Int] = cursor.documentPublisher()
  src -> pub
}
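Once you have such a source, it can be consumed like any other Pekko Streams `Source`. The following is a minimal sketch, not part of this library: `ConsumeAges` and `totalAge` are hypothetical names, and an in-memory `Source(List(...))` stands in for `cursor.documentSource()` so the example runs without a MongoDB instance. It assumes only `pekko-stream` on the classpath.

```scala
import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._

import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{ Sink, Source }

object ConsumeAges {
  // Hypothetical helper: sums every Int emitted by the source,
  // whatever its materialized value type M is
  // (e.g. Future[State] for a ReactiveMongo document source).
  def totalAge[M](src: Source[Int, M])(implicit system: ActorSystem): Future[Int] =
    src.runWith(Sink.fold(0)(_ + _)) // materializer is derived from the implicit system

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("example")

    // Stand-in for `cursor.documentSource()` (assumption for this sketch)
    val ages: Source[Int, _] = Source(List(25, 31, 42))

    try {
      val total = Await.result(totalAge(ages), 5.seconds)
      println(s"Total age: $total")
    } finally {
      system.terminate() // release the actor system threads
    }
  }
}
```

Blocking with `Await.result` is only for the sake of a short runnable example; in application code you would usually compose the returned `Future` instead.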
More examples
The developer guide is available online.
The API documentation is available online.