-
Notifications
You must be signed in to change notification settings - Fork 71
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement simple atomic stream select #585
base: main
Are you sure you want to change the base?
Conversation
25a9d99
to
468126d
Compare
468126d
to
11c0c6e
Compare
I'm currently also trying to understand the 0-capacity stream mechanics, and hopefully will come up with a way to (ideally) select out of a mixed set of streams. |
lib_eio/stream.ml
Outdated
then ( | ||
cancel_all (); | ||
Mutex.unlock t.mutex; | ||
enqueue (Ok (f (Queue.take t.items)))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another domain could still have set finished
to true using a different stream by the time this runs.
(also, you can't use Queue.take
after unlocking)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Apologies for missing this, it should have been obvious! Please take another look, it seemed not too difficult to fix.
Otherwise, `Stdlib.Effect.Continuation_already_resumed` will be raised.
I've added a small benchmark (f733045) which helped fix two bugs (the two preceding commits). |
for #577, to supersede #578.
I want to emphasize that this change is based on my likely too superficial understanding of the library. It is likely that there exists a more elegant way to implement it.
Waiters
module slightly, which is what I had hoped to avoid by using multiple fibers previously. The way I modified it makes sense for this kind of application, but it could be that there is a race condition or logical trap I overlooked.select_of_many
does, but maybe it helps you while reviewing it :-)Locking
streams are handled at the moment, with anassert
in place to enforce this. Mostly for the reason that I've wanted to get an initial implementation going before trying to handle everything.Again, I'm not at all offended if a tighter implementation ends up replacing this PR, but my fingers were itching to try and implement it myself :-)