-
Notifications
You must be signed in to change notification settings - Fork 603
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Buffer through #2205
base: main
Are you sure you want to change the base?
Buffer through #2205
Conversation
@@ -346,6 +346,25 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, | |||
go(Nil, false, this).stream | |||
} | |||
|
|||
// A builder for queues. | |||
trait MakeQueue[F[_]] { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not really sure where to put this, but it lets us create a queue-variant chosen by the user but they can't tamper with its contents at all since create
is polymorphic.
Looks good to me. I ran in to the same issue when thinking about a new fs2 queue type. Re: |
Sounds good, I can try to come up with some other names. I also don't think we really need the |
I'm starting to explore some preliminary ideas for an fs2-oriented
Queue
. FS2 provides an integration with the Cats Effect Queue, but it is neither stream- nor chunk-aware, so there are some inefficiencies and workarounds in place to get things to play well. Based on conversation in Gitter, some desirable properties of such fs2-oriented queue are: stream termination, chunk awareness, error propagation, and backpressure.Before I get any hopes up, I haven't actually implement a
Queue
in this PR. I add a new combinator,bufferThrough
, which is implemented in terms of the CE queue and I think satisfies most of the properties laid out above.bufferThrough
is pretty much theidentity
function, but elements are buffered through a queue. Actually, there is another function that already does exactly this:prefetch
. The main difference is thatbufferThrough
gives you more freedom to control backpressure semantics by supplying whatever kind of queue you want, using a trick similar to the one that @SystemFw used forAsync.cont
. Accordingly,prefetch
can be implemented in terms ofbufferThrough
.So I'm not actually introducing anything groundbreaking here! My goal is rather to draw attention to yet another dimension of streaming queues: the topology of the system.
prefetch
andbufferThrough
are both single-producer, single-consumer systems. I'm not sure yet how to generalize over the remaining three corners of the matrix, if that's even possible.An interesting question is how stream termination would work in multiple-producer systems, if it even makes sense at all. The only behavior that makes sense to me is terminating all producers and consumers whenever a single producer terminates, but that doesn't seem very useful. At that point, I think you would just use
enqueueUnterminated
anddequeueUnterminated
freely.I should've probably opened a separate issue or discussion for this, but oh well.