diff --git a/zio-query/shared/src/main/scala/zio/query/ZQuery.scala b/zio-query/shared/src/main/scala/zio/query/ZQuery.scala index b2e6425f..af2e63bd 100644 --- a/zio-query/shared/src/main/scala/zio/query/ZQuery.scala +++ b/zio-query/shared/src/main/scala/zio/query/ZQuery.scala @@ -1093,9 +1093,106 @@ object ZQuery { if (as.isEmpty) ZQuery.succeed(bf.newBuilder(as).result()) else ZQuery( - ZIO - .foreach[R, Nothing, A, Result[R, E, B], Iterable](as)(f(_).step) - .map(Result.collectAllBatched(_).map(bf.fromSpecific(as))) + ZIO.suspendSucceed { + var blockedRequests: BlockedRequests[R] = BlockedRequests.empty + val doneBuilder: Builder[B, Collection[B]] = bf.newBuilder(as) + val doneIndicesBuilder: ChunkBuilder[Int] = new ChunkBuilder.Int + val effectBuilder: ChunkBuilder[ZQuery[R, E, B]] = ChunkBuilder.make[ZQuery[R, E, B]]() + val effectIndicesBuilder: ChunkBuilder[Int] = new ChunkBuilder.Int + val failBuilder: ChunkBuilder[Cause[E]] = ChunkBuilder.make[Cause[E]]() + val getBuilder: ChunkBuilder[IO[E, B]] = ChunkBuilder.make[IO[E, B]]() + val getIndicesBuilder: ChunkBuilder[Int] = new ChunkBuilder.Int + var index: Int = 0 + val iterator: Iterator[A] = as.iterator + + ZIO.whileLoop { + iterator.hasNext + } { + f(iterator.next()).step + } { + case Result.Blocked(blockedRequest, Continue.Effect(query)) => + blockedRequests = blockedRequests && blockedRequest + effectBuilder += query + effectIndicesBuilder += index + index += 1 + case Result.Blocked(blockedRequest, Continue.Get(io)) => + blockedRequests = blockedRequests && blockedRequest + getBuilder += io + getIndicesBuilder += index + index += 1 + case Result.Done(b) => + doneBuilder += b + doneIndicesBuilder += index + index += 1 + case Result.Fail(e) => + failBuilder += e + index += 1 + }.as { + val dones = doneBuilder.result() + val doneIndices = doneIndicesBuilder.result() + val effects = effectBuilder.result() + val effectIndices = effectIndicesBuilder.result() + val fails = failBuilder.result() + val gets = getBuilder.result() + val getIndices = getIndicesBuilder.result() + if (gets.isEmpty && effects.isEmpty && fails.isEmpty) + Result.done(bf.fromSpecific(as)(dones)) + else if (fails.isEmpty) { + val continue = if (effects.isEmpty) { + val io = ZIO.collectAll(gets).map { gets => + val array = Array.ofDim[AnyRef](index) + val getsIterator = gets.iterator + val getIndicesIterator = getIndices.iterator + while (getsIterator.hasNext) { + val get = getsIterator.next() + val index = getIndicesIterator.next() + array(index) = get.asInstanceOf[AnyRef] + } + val donesIterator = dones.iterator + val doneIndicesIterator = doneIndices.iterator + while (donesIterator.hasNext) { + val done = donesIterator.next() + val index = doneIndicesIterator.next() + array(index) = done.asInstanceOf[AnyRef] + } + bf.fromSpecific(as)(array.asInstanceOf[Array[B]]) + } + Continue.get(io) + } else { + val query = ZQuery.collectAllBatched(effects).flatMap { effects => + ZQuery.fromZIO(ZIO.collectAll(gets).map { gets => + val array = Array.ofDim[AnyRef](index) + val effectsIterator = effects.iterator + val effectIndicesIterator = effectIndices.iterator + while (effectsIterator.hasNext) { + val effect = effectsIterator.next() + val index = effectIndicesIterator.next() + array(index) = effect.asInstanceOf[AnyRef] + } + val getsIterator = gets.iterator + val getIndicesIterator = getIndices.iterator + while (getsIterator.hasNext) { + val get = getsIterator.next() + val index = getIndicesIterator.next() + array(index) = get.asInstanceOf[AnyRef] + } + val donesIterator = dones.iterator + val doneIndicesIterator = doneIndices.iterator + while (donesIterator.hasNext) { + val done = donesIterator.next() + val index = doneIndicesIterator.next() + array(index) = done.asInstanceOf[AnyRef] + } + bf.fromSpecific(as)(array.asInstanceOf[Array[B]]) + }) + } + Continue.effect(query) + } + Result.blocked(blockedRequests, continue) + } else + Result.fail(fails.foldLeft[Cause[E]](Cause.empty)(_ && _)) + } + } ) final def foreachBatched[R, E, A, B](as: Set[A])(fn: A => ZQuery[R, E, B])(implicit diff --git a/zio-query/shared/src/main/scala/zio/query/internal/Continue.scala b/zio-query/shared/src/main/scala/zio/query/internal/Continue.scala index 53fa09f4..9dcbe44a 100644 --- a/zio-query/shared/src/main/scala/zio/query/internal/Continue.scala +++ b/zio-query/shared/src/main/scala/zio/query/internal/Continue.scala @@ -169,42 +169,6 @@ private[query] object Continue { ): Continue[R, E, B] = Continue.get(promise.await) - /** - * Collects a collection of continuation into a continuation returning a - * collection of their results, batching requests to data sources. - */ - def collectAllBatched[R, E, A, Collection[+Element] <: Iterable[Element]]( - continues: Collection[Continue[R, E, A]] - )(implicit - bf: BuildFrom[Collection[Continue[R, E, A]], A, Collection[A]], - trace: Trace - ): Continue[R, E, Collection[A]] = - continues.zipWithIndex - .foldLeft[(Chunk[(ZQuery[R, E, A], Int)], Chunk[(IO[E, A], Int)])]((Chunk.empty, Chunk.empty)) { - case ((queries, ios), (continue, index)) => - continue match { - case Effect(query) => (queries :+ ((query, index)), ios) - case Get(io) => (queries, ios :+ ((io, index))) - } - } match { - case (Chunk(), ios) => - get(ZIO.collectAll(ios.map(_._1)).map(bf.fromSpecific(continues))) - case (queries, ios) => - val query = ZQuery.collectAllBatched(queries.map(_._1)).flatMap { as => - val array = Array.ofDim[AnyRef](continues.size) - as.zip(queries.map(_._2)).foreach { case (a, i) => - array(i) = a.asInstanceOf[AnyRef] - } - ZQuery.fromZIO(ZIO.collectAll(ios.map(_._1))).map { as => - as.zip(ios.map(_._2)).foreach { case (a, i) => - array(i) = a.asInstanceOf[AnyRef] - } - bf.fromSpecific(continues)(array.asInstanceOf[Array[A]]) - } - } - effect(query) - } - /** * Collects a collection of continuation into a continuation returning a * collection of their results, in parallel. diff --git a/zio-query/shared/src/main/scala/zio/query/internal/Result.scala b/zio-query/shared/src/main/scala/zio/query/internal/Result.scala index be140491..39a77e79 100644 --- a/zio-query/shared/src/main/scala/zio/query/internal/Result.scala +++ b/zio-query/shared/src/main/scala/zio/query/internal/Result.scala @@ -111,44 +111,6 @@ private[query] object Result { def blocked[R, E, A](blockedRequests: BlockedRequests[R], continue: Continue[R, E, A]): Result[R, E, A] = Blocked(blockedRequests, continue) - /** - * Collects a collection of results into a single result. Blocked requests - * will be batched. - */ - def collectAllBatched[R, E, A, Collection[+Element] <: Iterable[Element]](results: Collection[Result[R, E, A]])( - implicit - bf: BuildFrom[Collection[Result[R, E, A]], A, Collection[A]], - trace: Trace - ): Result[R, E, Collection[A]] = - results.zipWithIndex - .foldLeft[(Chunk[((BlockedRequests[R], Continue[R, E, A]), Int)], Chunk[(A, Int)], Chunk[(Cause[E], Int)])]( - (Chunk.empty, Chunk.empty, Chunk.empty) - ) { case ((blocked, done, fails), (result, index)) => - result match { - case Blocked(br, c) => (blocked :+ (((br, c), index)), done, fails) - case Done(a) => (blocked, done :+ ((a, index)), fails) - case Fail(e) => (blocked, done, fails :+ ((e, index))) - } - } match { - case (Chunk(), done, Chunk()) => - Result.done(bf.fromSpecific(results)(done.map(_._1))) - case (blocked, done, Chunk()) => - val blockedRequests = blocked.map(_._1._1).foldLeft[BlockedRequests[R]](BlockedRequests.empty)(_ && _) - val continue = Continue.collectAllBatched(blocked.map(_._1._2)).map { as => - val array = Array.ofDim[AnyRef](results.size) - as.zip(blocked.map(_._2)).foreach { case (a, i) => - array(i) = a.asInstanceOf[AnyRef] - } - done.foreach { case (a, i) => - array(i) = a.asInstanceOf[AnyRef] - } - bf.fromSpecific(results)(array.asInstanceOf[Array[A]]) - } - Result.blocked(blockedRequests, continue) - case (_, _, fail) => - Result.fail(fail.map(_._1).foldLeft[Cause[E]](Cause.empty)(_ && _)) - } - /** * Collects a collection of results into a single result. Blocked requests and * their continuations will be executed in parallel.