Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Fix a leak in PrefixAndTail operator. #1623

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

He-Pin
Copy link
Member

@He-Pin He-Pin commented Dec 22, 2024

Motivation:
Fix @queimadus's report in #1566

Modification:
When the upstream finishes quickly but the downstream is not pulled, a leak can happen, so clean up that.

Result:
I think the leak should be fixed now

code from @queimadus

import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.scaladsl.{Sink, Source}
import org.apache.pekko.util.ByteString

import scala.concurrent.Await

object PekkoQuickstart extends App {
  private implicit val system: ActorSystem = ActorSystem()

  val s = Source
    .repeat(())
    .map(_ => ByteString('a' * 400000))
    .take(1000000)
    .prefixAndTail(50000)
    .flatMapConcat { case (prefix, tail) => Source(prefix).concatLazy(tail) }

  val r = Source.empty
    .concatAllLazy(List.tabulate(30000)(_ => s): _*)
    .runWith(Sink.ignore)

  Await.result(r, scala.concurrent.duration.Duration.Inf)
  println(r.value)

//  Source
//    .repeat(s)
//    .take(30000)
//    .flatMapConcat(x => x)
//    .runWith(Sink.ignore)
//    .onComplete(println(_))

//  Source.empty
//    .concatAllLazy(List.tabulate(30000)(_ => Source.lazySource(() => s)): _*)
//    .runWith(Sink.ignore).onComplete(println(_))
}

before:
image
after:
image

Seems there are more leak points.

image

I think the problem is the current logic design where arrays are been used to track and then with the current deign, there will be many many instances of the logic.

So That problem is harder to fix than we expected, but this pr is still valid.

@He-Pin He-Pin added t:stream Pekko Streams bug Something isn't working backport labels Dec 22, 2024
@He-Pin He-Pin added this to the 1.1.3 milestone Dec 22, 2024
@He-Pin
Copy link
Member Author

He-Pin commented Dec 31, 2024

@queimadus continue here #1652

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
backport bug Something isn't working t:stream Pekko Streams
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants