Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Fix a leak in PrefixAndTail operator. #1623

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

He-Pin
Copy link
Member

@He-Pin He-Pin commented Dec 22, 2024

Motivation:
Fix @queimadus's report in #1566

Modification:
When the upstream finishes quickly but the downstream is not pulled, a leak can happen, so clean up that.

Result:
I think the leak should be fixed now

code from @queimadus

import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{Sink, Source}
import org.apache.pekko.util.ByteString

import scala.concurrent.Await

object PekkoQuickstart extends App {
  private implicit val system: ActorSystem = ActorSystem()

  val s = Source
    .repeat(())
    .map(_ => ByteString('a' * 400000))
    .take(1000000)
    .prefixAndTail(50000)
    .flatMapConcat { case (prefix, tail) => Source(prefix).concatLazy(tail) }

  val r = Source.empty
    .concatAllLazy(List.tabulate(30000)(_ => s): _*)
    .runWith(Sink.ignore)

  Await.result(r, scala.concurrent.duration.Duration.Inf)
  println(r.value)

//  Source
//    .repeat(s)
//    .take(30000)
//    .flatMapConcat(x => x)
//    .runWith(Sink.ignore)
//    .onComplete(println(_))

//  Source.empty
//    .concatAllLazy(List.tabulate(30000)(_ => Source.lazySource(() => s)): _*)
//    .runWith(Sink.ignore).onComplete(println(_))
}

before:
image
after:
image

Seems there are more leak points.

image

I think the problem is the current logic design where arrays are been used to track and then with the current deign, there will be many many instances of the logic.

So That problem is harder to fix than we expected, but this pr is still valid.

@He-Pin He-Pin added t:stream Pekko Streams bug Something isn't working backport labels Dec 22, 2024
@He-Pin He-Pin added this to the 1.1.3 milestone Dec 22, 2024
emit(out, (builder.result(), Source.empty), () => completeStage())
val prefix = builder.result();
builder = null // free for GC
emit(out, (prefix, Source.empty), () => completeStage())
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it will leak before.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks good - if @queimadus can try this change too that would be very useful

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually can't seem to reproduce the issue with the example I mentioned with flatMapPrefix with prefixAndTail.

E.g. running the following runs fine with and without these changes

  val s = Source
    .repeat(())
    .map(_ => ByteString('a' * 4000000))
    .take(1000000)
    .prefixAndTail(50000)
    .flatMapConcat { case (prefix, tail) => tail }

  Source.empty
    .concatAllLazy(List.tabulate(30000)(_ => s): _*)
    .runWith(Sink.ignore).onComplete(println(_))

What doesn't run fine is the the following snippet, but it may be a different problem.

 def myLogic(prefix: Seq[ByteString]): Flow[ByteString, ByteString, NotUsed] =
    Flow[ByteString]

  val s = Source
    .repeat(())
    .map(_ => ByteString('a' * 400000))
    .take(1000000)
    .prefixAndTail(50000)
    .flatMapConcat { case (prefix, tail) =>
      Source(prefix).concatLazy(tail).via(myLogic(prefix))
    }

  Source.empty
    .concatAllLazy(List.tabulate(30000)(_ => s): _*)
    .runWith(Sink.ignore).onComplete(println(_))

That said, these changes still seem to make sense.

Copy link
Member Author

@He-Pin He-Pin Dec 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This problem is a kind of AsyncCallback leak, #408
where many instances consume memories.

To totally fix this, we will lose some support for debugging I think, eg the org.apache.pekko.stream.impl.fusing.GraphInterpreter#toSnapshot

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@queimadus Yes, different problem, the new problem is the interpreter keeps many things in memory

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
backport bug Something isn't working t:stream Pekko Streams
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants