This reproduces an abrupt stream cancellation issue in Akka-gRPC streaming usage employing MergeHub
. The only way to work-around this issue that I have found thus far is to dramatically throttle thruput, which is simply not an acceptable workaround. Even so, I cannot guarantee that said workaround actually works 100% of the time. The error is generated by the receiving side (sinkTest
):
Error in stage [akka.stream.scaladsl.MergeHub$$anon$2$$anon$3]: Upstream producer failed with exception, removing from MergeHub now
akka.stream.scaladsl.MergeHub$ProducerFailed: Upstream producer failed with exception, removing from MergeHub now
at akka.stream.scaladsl.MergeHub$$anon$2$$anon$3.onUpstreamFailure(Hub.scala:352)
at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:525)
at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:390)
at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:650)
at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:521)
at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:625)
at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:800)
at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:818)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:716)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
at akka.actor.ActorCell.invoke(ActorCell.scala:548)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:295)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1016)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1665)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1598)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
Caused by: akka.http.scaladsl.model.http2.PeerClosedStreamException: Stream with ID [???] was closed by peer with code CANCEL(0x08)
The sink employs a MergeHub
and simply logs each received element. The emitter is more complex, it generates a large amount of events with random data and pushes it thusly:
- Through a queue
- Elements are grouped / chunked within a time window to produce a new
Source
sequence of events - The flow is throttled
- Each chunk is pushed to the gRPC call asynchronously with some parallelism
This behaviour is the end result of permuting many different approaches and patterns, none of which has done anything to mitigate the stream cancellation issue. Because I am unable to influence the frequency of these errors at all regardless of what approach I take I believe this to be an Akka-gRPC bug.
- Generate the protocol buffer sources
- Compile the project
- Run
sinkTest
first viacom.jonathan.troubleshoot.streaming.Server
- Run
emitterTest
viacom.jonathan.troubleshoot.streaming.Server