From 25a504676cf180929a2243e526408a5242d0b5ed Mon Sep 17 00:00:00 2001 From: hepin Date: Sun, 22 Dec 2024 18:47:01 +0800 Subject: [PATCH] chore: Fix leak in FlatMapPrefix operator. --- .../stream/impl/fusing/FlatMapPrefix.scala | 25 ++++++++++--------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/FlatMapPrefix.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/FlatMapPrefix.scala index c3672ae1eaa..254b9318e9e 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/FlatMapPrefix.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/FlatMapPrefix.scala @@ -43,8 +43,10 @@ import pekko.util.OptionVal .mandatoryAttribute[Attributes.NestedMaterializationCancellationPolicy] .propagateToNestedMaterialization val matPromise = Promise[M]() - val logic = new GraphStageLogic(shape) with InHandler with OutHandler { - val accumulated = collection.mutable.Buffer.empty[In] + object logic extends GraphStageLogic(shape) with InHandler with OutHandler { + private var left = if (n < 0) 0 else n + private var builder = Vector.newBuilder[In] + builder.sizeHint(left) private var subSource = OptionVal.none[SubSourceOutlet[In]] private var subSink = OptionVal.none[SubSinkInlet[Out]] @@ -65,11 +67,12 @@ import pekko.util.OptionVal subSource match { case OptionVal.Some(s) => s.push(grab(in)) case _ => - accumulated.append(grab(in)) - if (accumulated.size == n) { + builder += grab(in) + left -= 1 + if (left == 0) { materializeFlow() } else { - // gi'me some more! + // give me some more! pull(in) } } @@ -98,12 +101,10 @@ import pekko.util.OptionVal // delegate to subSink s.pull() case _ => - if (accumulated.size < n) pull(in) - else if (accumulated.size == n) { + if (left > 0) pull(in) + else if (left == 0) { // corner case for n = 0, can be handled in FlowOps materializeFlow() - } else { - throw new IllegalStateException(s"Unexpected accumulated size: ${accumulated.size} (n: $n)") } } } @@ -114,7 +115,7 @@ import pekko.util.OptionVal case _ => if (propagateToNestedMaterialization) { downstreamCause = OptionVal.Some(cause) - if (accumulated.size == n) { + if (left == 0) { // corner case for n = 0, can be handled in FlowOps materializeFlow() } else if (!hasBeenPulled(in)) { // if in was already closed, nested flow would have already been materialized @@ -128,8 +129,8 @@ import pekko.util.OptionVal def materializeFlow(): Unit = try { - val prefix = accumulated.toVector - accumulated.clear() + val prefix = builder.result() + builder = null // free for GC subSource = OptionVal.Some(new SubSourceOutlet[In]("FlatMapPrefix.subSource")) val theSubSource = subSource.get subSink = OptionVal.Some(new SubSinkInlet[Out]("FlatMapPrefix.subSink"))