Unreliable stream interruption #2330

Open
njwilson23 opened this issue Mar 18, 2021 · 5 comments

Comments

@njwilson23

When a stream is interrupted, I expect, based on these docs, that the interruption is final and can only be handled with Stream.bracket (or something built on Stream.bracket).

So, I would not expect the following code, which attempts to restart a stream indefinitely, to work when the provided stream is interrupted:

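```scala
import cats.effect.Concurrent
import fs2.{Pull, RaiseThrowable, Stream}

// Rebuild the stream with `mk` whenever it fails or ends, resuming from the
// checkpoint derived from the last element that was emitted.
def resume[F[_] : Concurrent : RaiseThrowable, A, B](mk: A => Stream[F, B], checkpoint: B => A)(start: A): Stream[F, B] = {
  def go(s: Stream[F, Either[Throwable, B]], watermark: A): Pull[F, B, Unit] = s.pull.uncons1.flatMap {
    case Some((Right(b), rest)) => Pull.output1(b) >> go(rest, checkpoint(b)) // emit and advance the watermark
    case Some((Left(_), _)) => go(mk(watermark).attempt, watermark)          // failed: restart from the watermark
    case None => go(mk(watermark).attempt, watermark)                          // ended: restart from the watermark
  }

  go(mk(start).attempt, start).stream
}
```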

However, I find that it sometimes works and sometimes doesn't. Consider the following test:

```scala
// cats-effect 3 imports; on cats-effect 2 use cats.effect.concurrent.{Deferred, Ref}
import cats.effect.{Deferred, IO, Ref}
import fs2.{Pipe, Stream}

// How many times should we interrupt the stream?
val Interrupts: Int = 1

// Interrupt the stream at the element with index 5, at most `Interrupts` times in total
def interrupter[A](deferred: Deferred[IO, Unit], interruptCount: Ref[IO, Int]): Pipe[IO, A, A] = {
  input: Stream[IO, A] =>
    input.zipWithIndex
      .evalTap {
        case (_, 5) => interruptCount.getAndUpdate(_ + 1).flatMap { i =>
          if (i < Interrupts) {
            deferred.complete(()) // signal interruptWhen below
          } else IO.unit
        }
        case _ => IO.unit
      }
      .map(_._1)
      .interruptWhen(deferred.get.attempt)
}

val stream: Int => Stream[IO, Int] = Stream.iterate(_)(_ + 1)

val assertion = for {
  interruptCount <- Ref.of[IO, Int](0)
  _ <- resume[IO, Int, Int](
    start => Stream.eval(Deferred[IO, Unit]).flatMap(d => stream(start).through(interrupter(d, interruptCount))),
    _ + 1
  )(0)
    .take(1000)
    .compile
    .toList
    .map(lst => assert(lst == List.range(0, 1000)))
} yield ()
```

If I only interrupt the stream a small number of times (e.g. once), the restart usually works and I get all 1000 elements from .take. But if I allow many interruptions (e.g. 10), the resulting stream is typically truncated.

This is very surprising! Is this nondeterminism a bug?

@diesalbla
Contributor

@njwilson23 Good afternoon, Nat. Thanks for reporting this bug. We will try to look at it as soon as we can.

In the meantime, to help us with testing, could you submit this as an executable IOApp in a PR? Also, which version of fs2 were you running this example on? If you could run it on the latest 2.x release as well as the latest 3.x milestone, that would help us look into it.

@njwilson23
Author

Sure, PR opened. I did my testing on 2.5.0, although I've been tracking similar strangeness since at least 2.2.x. I'll try to get it working on 3.x and report back.

@njwilson23
Author

I verified that it exhibits the same surprising behaviour with:

```scala
scalaVersion := "2.13.5"

libraryDependencies += "org.typelevel" %% "cats-core" % "2.4.2"
libraryDependencies += "org.typelevel" %% "cats-effect" % "3.0.0-RC2"
libraryDependencies += "co.fs2" %% "fs2-core" % "3.0.0-M9"
```

@diesalbla
Contributor

I have just run the executable from the PR you opened on top of the current main. It seems that the issue has been fixed in the four weeks since the 3.0.0-M9 release.

@njwilson23
Author

njwilson23 commented Mar 21, 2021

That's interesting and surprising! However, I tried changing the numbers slightly, requiring 100 interruptions (up from 10) and consuming 10,000 values (up from 1,000). That reproduces the flaky behaviour observed with previous versions, so it doesn't seem to be fixed consistently.

I'm also curious: what should we expect to happen? From a (superficial) reading of the code, I expected resume to work with streams that get interrupted, because only the current scope is interrupted. However, based on the documentation I linked in the OP, I would expect it not to, since it doesn't use bracket or onFinalize. (On the main tip commit e4872e6, as well as on other commits I've tried, it sometimes works and sometimes doesn't.)
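For comparison with the bracket/onFinalize route mentioned above, here is a minimal sketch of a bracketed stream that is interrupted from outside. It assumes fs2 3.x and cats-effect 3 (it will not compile unchanged on fs2 2.x / cats-effect 2). The object name `BracketOnInterrupt` and the chosen durations are illustrative only and are not part of the original report. The sketch only shows that the release action of `Stream.bracket` runs when the enclosing stream is interrupted via `interruptAfter`. It does not reproduce or explain the flakiness of `resume`.

```scala
import cats.effect.{IO, IOApp, Ref}
import fs2.Stream
import scala.concurrent.duration._

// Sketch (fs2 3.x / cats-effect 3): a bracketed resource whose stream is
// interrupted from outside still has its release action run.
object BracketOnInterrupt extends IOApp.Simple {

  // Evaluates to true if the release action ran once the stream was interrupted.
  val finalizerRan: IO[Boolean] =
    for {
      released <- Ref.of[IO, Boolean](false)
      _ <- Stream
        .bracket(IO.unit)(_ => released.set(true))      // acquire nothing; record the release
        .flatMap(_ => Stream.awakeEvery[IO](10.millis)) // would otherwise run forever
        .interruptAfter(100.millis)                     // interruption from outside the bracket
        .compile
        .drain
      ran <- released.get
    } yield ran

  val run: IO[Unit] =
    finalizerRan.flatMap(ran => IO.println(s"finalizer ran: $ran"))
}
```

fs2 runs scope finalizers before the compiled effect completes, so the `Ref` is read only after the release action has had its chance to run.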
