fix: Fix a leak in PrefixAndTail operator. #1623

Open: wants to merge 1 commit into main
@@ -237,7 +237,9 @@ import pekko.util.ccompat.JavaConverters._
       override def onUpstreamFinish(): Unit = {
         if (!prefixComplete) {
           // This handles the unpulled out case as well
-          emit(out, (builder.result(), Source.empty), () => completeStage())
+          val prefix = builder.result();
+          builder = null // free for GC
+          emit(out, (prefix, Source.empty), () => completeStage())
Member Author

Without this change, the builder leaks here: it stays referenced after the prefix has been emitted.
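
The pattern in this hunk can be sketched outside of Pekko. The class below is a hypothetical stand-in (`PrefixCollector`, `offer`, `finish` and `isReleased` are illustrative names, not part of the Pekko API) and only assumes that a long-lived object holds a collection builder in a field. A builder may keep referencing its internal buffers after `result()`, so the field is set to `null` once the prefix has been produced:

```scala
import scala.collection.immutable
import scala.collection.mutable

// Hypothetical stand-in for the stage logic; this is not the Pekko implementation.
final class PrefixCollector[T](n: Int) {
  require(n > 0, "n must be positive")

  // A nullable var on purpose, so the reference can be dropped after use.
  private var builder: mutable.Builder[T, immutable.Vector[T]] = Vector.newBuilder[T]
  private var left = n

  /** True once the builder reference has been dropped. */
  def isReleased: Boolean = builder eq null

  /** Adds one element; returns the prefix once `n` elements have been collected. */
  def offer(elem: T): Option[immutable.Seq[T]] = {
    builder += elem
    left -= 1
    if (left == 0) Some(finish()) else None
  }

  /** Upstream ended early: hand out whatever was collected so far. */
  def finish(): immutable.Seq[T] = {
    val prefix = builder.result()
    builder = null // drop the reference so the builder's buffers can be collected
    prefix
  }
}

object PrefixCollectorDemo extends App {
  val collector = new PrefixCollector[Int](3)
  collector.offer(1)
  collector.offer(2)
  val prefix = collector.finish() // upstream finished before the third element
  println(s"prefix = $prefix, released = ${collector.isReleased}")
}
```

The sketch does not guard against calling `offer` after `finish`. In the real stage that role is presumably played by the `prefixComplete` flag visible in the diff.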

Contributor

Looks good. If @queimadus can try this change too, that would be very useful.

I actually can't seem to reproduce the issue from the flatMapPrefix example I mentioned when using prefixAndTail.

For example, the following runs fine both with and without these changes:

  val s = Source
    .repeat(())
    .map(_ => ByteString('a' * 4000000))
    .take(1000000)
    .prefixAndTail(50000)
    .flatMapConcat { case (prefix, tail) => tail }

  Source.empty
    .concatAllLazy(List.tabulate(30000)(_ => s): _*)
    .runWith(Sink.ignore).onComplete(println(_))

What doesn't run fine is the following snippet, though that may be a different problem:

  def myLogic(prefix: Seq[ByteString]): Flow[ByteString, ByteString, NotUsed] =
    Flow[ByteString]

  val s = Source
    .repeat(())
    .map(_ => ByteString('a' * 400000))
    .take(1000000)
    .prefixAndTail(50000)
    .flatMapConcat { case (prefix, tail) =>
      Source(prefix).concatLazy(tail).via(myLogic(prefix))
    }

  Source.empty
    .concatAllLazy(List.tabulate(30000)(_ => s): _*)
    .runWith(Sink.ignore).onComplete(println(_))

That said, these changes still seem to make sense.
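
A side note on the two reproducers above, offered as something to check rather than as a diagnosis: in Scala, `'a' * n` multiplies the character code by `n` and yields an `Int`, whereas `"a" * n` repeats the string. If the intent was a payload of several megabytes per element, `ByteString('a' * 4000000)` may be building something much smaller. Which `ByteString.apply` overload gets picked for an `Int` argument is an assumption here and would need verifying. The standard-library part can be checked in isolation:

```scala
object CharTimesInt extends App {
  // Char is promoted to Int, so this is the arithmetic 97 * 3.
  val fromChar: Int = 'a' * 3

  // StringOps.* repeats the string instead.
  val fromString: String = "a" * 3

  assert(fromChar == 291)
  assert(fromString == "aaa")
  assert(fromString.getBytes("UTF-8").length == 3)
}
```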

Member Author

@He-Pin, Dec 22, 2024

This problem is a kind of AsyncCallback leak (see #408), where a large number of instances consume memory.

To fix this completely, I think we would lose some debugging support, e.g. org.apache.pekko.stream.impl.fusing.GraphInterpreter#toSnapshot.

Member Author

@queimadus Yes, that is a different problem: the interpreter keeps many things in memory.

         } else {
           if (!tailSource.isClosed) tailSource.complete()
           completeStage()