Skip to content

Commit

Permalink
Optimize ZQuery.foreachBatched (#457)
Browse files Browse the repository at this point in the history
* optimize zquery foreachbatched

* fix version specific issue
  • Loading branch information
adamgfraser authored Dec 12, 2023
1 parent 5efeae9 commit 7cdd3fe
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 77 deletions.
103 changes: 100 additions & 3 deletions zio-query/shared/src/main/scala/zio/query/ZQuery.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1093,9 +1093,106 @@ object ZQuery {
if (as.isEmpty) ZQuery.succeed(bf.newBuilder(as).result())
else
ZQuery(
ZIO
.foreach[R, Nothing, A, Result[R, E, B], Iterable](as)(f(_).step)
.map(Result.collectAllBatched(_).map(bf.fromSpecific(as)))
ZIO.suspendSucceed {
var blockedRequests: BlockedRequests[R] = BlockedRequests.empty
val doneBuilder: Builder[B, Collection[B]] = bf.newBuilder(as)
val doneIndicesBuilder: ChunkBuilder[Int] = new ChunkBuilder.Int
val effectBuilder: ChunkBuilder[ZQuery[R, E, B]] = ChunkBuilder.make[ZQuery[R, E, B]]()
val effectIndicesBuilder: ChunkBuilder[Int] = new ChunkBuilder.Int
val failBuilder: ChunkBuilder[Cause[E]] = ChunkBuilder.make[Cause[E]]()
val getBuilder: ChunkBuilder[IO[E, B]] = ChunkBuilder.make[IO[E, B]]()
val getIndicesBuilder: ChunkBuilder[Int] = new ChunkBuilder.Int
var index: Int = 0
val iterator: Iterator[A] = as.iterator

ZIO.whileLoop {
iterator.hasNext
} {
f(iterator.next()).step
} {
case Result.Blocked(blockedRequest, Continue.Effect(query)) =>
blockedRequests = blockedRequests && blockedRequest
effectBuilder += query
effectIndicesBuilder += index
index += 1
case Result.Blocked(blockedRequest, Continue.Get(io)) =>
blockedRequests = blockedRequests && blockedRequest
getBuilder += io
getIndicesBuilder += index
index += 1
case Result.Done(b) =>
doneBuilder += b
doneIndicesBuilder += index
index += 1
case Result.Fail(e) =>
failBuilder += e
index += 1
}.as {
val dones = doneBuilder.result()
val doneIndices = doneIndicesBuilder.result()
val effects = effectBuilder.result()
val effectIndices = effectIndicesBuilder.result()
val fails = failBuilder.result()
val gets = getBuilder.result()
val getIndices = getIndicesBuilder.result()
if (gets.isEmpty && effects.isEmpty && fails.isEmpty)
Result.done(bf.fromSpecific(as)(dones))
else if (fails.isEmpty) {
val continue = if (effects.isEmpty) {
val io = ZIO.collectAll(gets).map { gets =>
val array = Array.ofDim[AnyRef](index)
val getsIterator = gets.iterator
val getIndicesIterator = getIndices.iterator
while (getsIterator.hasNext) {
val get = getsIterator.next()
val index = getIndicesIterator.next()
array(index) = get.asInstanceOf[AnyRef]
}
val donesIterator = dones.iterator
val doneIndicesIterator = doneIndices.iterator
while (donesIterator.hasNext) {
val done = donesIterator.next()
val index = doneIndicesIterator.next()
array(index) = done.asInstanceOf[AnyRef]
}
bf.fromSpecific(as)(array.asInstanceOf[Array[B]])
}
Continue.get(io)
} else {
val query = ZQuery.collectAllBatched(effects).flatMap { effects =>
ZQuery.fromZIO(ZIO.collectAll(gets).map { gets =>
val array = Array.ofDim[AnyRef](index)
val effectsIterator = effects.iterator
val effectIndicesIterator = effectIndices.iterator
while (effectsIterator.hasNext) {
val effect = effectsIterator.next()
val index = effectIndicesIterator.next()
array(index) = effect.asInstanceOf[AnyRef]
}
val getsIterator = gets.iterator
val getIndicesIterator = getIndices.iterator
while (getsIterator.hasNext) {
val get = getsIterator.next()
val index = getIndicesIterator.next()
array(index) = get.asInstanceOf[AnyRef]
}
val donesIterator = dones.iterator
val doneIndicesIterator = doneIndices.iterator
while (donesIterator.hasNext) {
val done = donesIterator.next()
val index = doneIndicesIterator.next()
array(index) = done.asInstanceOf[AnyRef]
}
bf.fromSpecific(as)(array.asInstanceOf[Array[B]])
})
}
Continue.effect(query)
}
Result.blocked(blockedRequests, continue)
} else
Result.fail(fails.foldLeft[Cause[E]](Cause.empty)(_ && _))
}
}
)

final def foreachBatched[R, E, A, B](as: Set[A])(fn: A => ZQuery[R, E, B])(implicit
Expand Down
36 changes: 0 additions & 36 deletions zio-query/shared/src/main/scala/zio/query/internal/Continue.scala
Original file line number Diff line number Diff line change
Expand Up @@ -169,42 +169,6 @@ private[query] object Continue {
): Continue[R, E, B] =
Continue.get(promise.await)

/**
* Collects a collection of continuation into a continuation returning a
* collection of their results, batching requests to data sources.
*/
def collectAllBatched[R, E, A, Collection[+Element] <: Iterable[Element]](
continues: Collection[Continue[R, E, A]]
)(implicit
bf: BuildFrom[Collection[Continue[R, E, A]], A, Collection[A]],
trace: Trace
): Continue[R, E, Collection[A]] =
continues.zipWithIndex
.foldLeft[(Chunk[(ZQuery[R, E, A], Int)], Chunk[(IO[E, A], Int)])]((Chunk.empty, Chunk.empty)) {
case ((queries, ios), (continue, index)) =>
continue match {
case Effect(query) => (queries :+ ((query, index)), ios)
case Get(io) => (queries, ios :+ ((io, index)))
}
} match {
case (Chunk(), ios) =>
get(ZIO.collectAll(ios.map(_._1)).map(bf.fromSpecific(continues)))
case (queries, ios) =>
val query = ZQuery.collectAllBatched(queries.map(_._1)).flatMap { as =>
val array = Array.ofDim[AnyRef](continues.size)
as.zip(queries.map(_._2)).foreach { case (a, i) =>
array(i) = a.asInstanceOf[AnyRef]
}
ZQuery.fromZIO(ZIO.collectAll(ios.map(_._1))).map { as =>
as.zip(ios.map(_._2)).foreach { case (a, i) =>
array(i) = a.asInstanceOf[AnyRef]
}
bf.fromSpecific(continues)(array.asInstanceOf[Array[A]])
}
}
effect(query)
}

/**
* Collects a collection of continuation into a continuation returning a
* collection of their results, in parallel.
Expand Down
38 changes: 0 additions & 38 deletions zio-query/shared/src/main/scala/zio/query/internal/Result.scala
Original file line number Diff line number Diff line change
Expand Up @@ -111,44 +111,6 @@ private[query] object Result {
def blocked[R, E, A](blockedRequests: BlockedRequests[R], continue: Continue[R, E, A]): Result[R, E, A] =
Blocked(blockedRequests, continue)

/**
* Collects a collection of results into a single result. Blocked requests
* will be batched.
*/
def collectAllBatched[R, E, A, Collection[+Element] <: Iterable[Element]](results: Collection[Result[R, E, A]])(
implicit
bf: BuildFrom[Collection[Result[R, E, A]], A, Collection[A]],
trace: Trace
): Result[R, E, Collection[A]] =
results.zipWithIndex
.foldLeft[(Chunk[((BlockedRequests[R], Continue[R, E, A]), Int)], Chunk[(A, Int)], Chunk[(Cause[E], Int)])](
(Chunk.empty, Chunk.empty, Chunk.empty)
) { case ((blocked, done, fails), (result, index)) =>
result match {
case Blocked(br, c) => (blocked :+ (((br, c), index)), done, fails)
case Done(a) => (blocked, done :+ ((a, index)), fails)
case Fail(e) => (blocked, done, fails :+ ((e, index)))
}
} match {
case (Chunk(), done, Chunk()) =>
Result.done(bf.fromSpecific(results)(done.map(_._1)))
case (blocked, done, Chunk()) =>
val blockedRequests = blocked.map(_._1._1).foldLeft[BlockedRequests[R]](BlockedRequests.empty)(_ && _)
val continue = Continue.collectAllBatched(blocked.map(_._1._2)).map { as =>
val array = Array.ofDim[AnyRef](results.size)
as.zip(blocked.map(_._2)).foreach { case (a, i) =>
array(i) = a.asInstanceOf[AnyRef]
}
done.foreach { case (a, i) =>
array(i) = a.asInstanceOf[AnyRef]
}
bf.fromSpecific(results)(array.asInstanceOf[Array[A]])
}
Result.blocked(blockedRequests, continue)
case (_, _, fail) =>
Result.fail(fail.map(_._1).foldLeft[Cause[E]](Cause.empty)(_ && _))
}

/**
* Collects a collection of results into a single result. Blocked requests and
* their continuations will be executed in parallel.
Expand Down

0 comments on commit 7cdd3fe

Please sign in to comment.