Add streaming API #463

Draft · jpsamaroo wants to merge 57 commits into master
Conversation

@jpsamaroo (Member) commented on Dec 21, 2023

Adds a spawn_streaming task queue to transform tasks into continuously-executing equivalents that automatically take from input streams/channels and put their results to an output stream/channel. Useful for processing tons of individual elements of some large (or infinite) collection.
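A minimal usage sketch, assuming the `spawn_streaming`/`finish_stream` names from this PR (the exact surface and termination semantics may still change):

```julia
using Dagger

# Each task in the block becomes a continuously-executing streaming task:
# on every iteration it takes fresh values from its input streams and puts
# its result into its output stream.
Dagger.spawn_streaming() do
    vals = Dagger.@spawn begin
        x = rand()
        # Assumed termination hook: mark x as the final value and close the stream.
        x > 0.99 ? Dagger.finish_stream(x) : x
    end
    # Downstream task: consumes one value from `vals` per iteration.
    Dagger.@spawn println(vals)
end
```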

Todo:

  • Migrate streams on first use
  • Add per-task input buffering to Stream object
  • Add no-allocation ring buffer for process-local put/take to Stream
  • Make buffering amount configurable
  • Add API for constructing streams based on inferred return type, desired buffer size, and source/destination
  • Allow finish_stream(xyz; return=abc) to return custom value (else nothing)
  • Upstream MemPool migration changes (Add DRef migration support JuliaData/MemPool.jl#80)
  • Add docs
  • Add tests
  • (Optional) Adapt ring buffer to support server-local put/take (use mmap?)
  • (Optional) Make value fetching configurable
  • (Optional) Support a waitany-style input stream, taking inputs from multiple tasks
  • (Optional) take! from input streams concurrently, and waitall on them before continuing
  • (Optional) put! into output streams concurrently, and waitall on them before continuing
  • (Optional) Allow using default or previously-cached value if sender not ready
  • (Optional) Allow dropping stream values (after timeout, receiver not ready, over-pressured, etc.)
  • (Optional) Add utility for tracking stream transfer rates (Add streaming throughput monitor #494)
  • (Optional) Add programmable behavior on upstream/downstream Stream closure (how should errors/finishing propagate?)

@JamesWrigley (Collaborator) commented:
Am I correct in thinking that all the necessary items except for tests are complete?

@jpsamaroo (Member, Author) replied:

Generally yes, I think we're pretty close to this being merge-ready. There are some remaining TODOs that I need to finish, but most are reasonably small. I could definitely use help with writing tests - just validating that we can run various kinds of pipelines and that they work across multiple workers would be really useful.
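A rough sketch of the kind of test being asked for, assuming the API above plus `Dagger.scope` for worker placement; the `RemoteChannel` is only there so the test process can observe the streamed values, and clean pipeline termination is elided:

```julia
using Distributed
addprocs(2)                              # two extra workers to exercise cross-worker streaming
@everywhere using Dagger
using Test

@testset "cross-worker streaming pipeline" begin
    results = RemoteChannel(() -> Channel{Float64}(100))
    Dagger.spawn_streaming() do
        # Pin source and sink to different workers to exercise remote transfer.
        vals = Dagger.@spawn scope=Dagger.scope(worker=workers()[1]) rand()
        Dagger.@spawn scope=Dagger.scope(worker=workers()[2]) put!(results, vals)
    end
    # Pull a handful of streamed values back to the test process and sanity-check them.
    taken = [take!(results) for _ in 1:10]
    @test all(v -> 0 <= v < 1, taken)
end
```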

Commits pushed to this PR include:

  • Instead of taking/putting values sequentially (which may block), runs "pull" and "push" tasks for each input and output, respectively. Uses buffers to communicate values between pullers/pushers and the streaming task, instead of only using one buffer per task-to-task connection. (See the sketch after this list.)
  • Switch from RemoteFetcher to RemoteChannelFetcher
  • Pass object rather than type to `stream_{push,pull}_values!`
  • ProcessRingBuffer: Don't exit on graceful interrupt when non-empty
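A simplified sketch of the pull/push structure described in the first commit above, using plain `Channel`s and `Threads.@spawn` as stand-ins for the PR's `Stream`/`ProcessRingBuffer` internals (all names here are illustrative, not the actual implementation):

```julia
# For each input stream a dedicated "pull" task copies values into a local
# buffer, and for each output stream a "push" task drains a local buffer into
# the stream. The streaming task itself only touches the local buffers, so a
# slow remote peer blocks its puller/pusher rather than the computation.
function run_streaming_task(f, input_streams, output_streams; buffer_size=1024)
    in_bufs  = [Channel{Any}(buffer_size) for _ in input_streams]
    out_bufs = [Channel{Any}(buffer_size) for _ in output_streams]

    map(zip(input_streams, in_bufs)) do (stream, buf)
        Threads.@spawn begin
            for v in stream          # pull from the (possibly remote) input
                put!(buf, v)
            end
            close(buf)               # propagate upstream closure to the local buffer
        end
    end

    pushers = map(zip(output_streams, out_bufs)) do (stream, buf)
        Threads.@spawn for v in buf  # push buffered results to the output
            put!(stream, v)
        end
    end

    try
        while true
            args = map(take!, in_bufs)                 # blocks until every input has a value
            result = f(args...)
            foreach(buf -> put!(buf, result), out_bufs)
        end
    catch err
        # take! throws once an input buffer is closed and drained: graceful shutdown.
        err isa InvalidStateException || rethrow()
    finally
        foreach(close, out_bufs)
        foreach(wait, pushers)       # let pushers flush remaining values before returning
    end
end
```

Per the last commit above, the PR's ring buffer additionally stays alive on a graceful interrupt while it still holds values; that behavior is not modeled in this sketch.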