initial implementation #1
Conversation
I don't think a long name now is a problem.
I believe you should return a A builder might be nicer to find references for. For example, one that is used like
If you don't use
This should not affect performance significantly. In any interesting case both branches should be perfectly predictable.
…input rather than panic, add concurrency control + customizable channel size to secondary runtime
…der method naming
…s of custom executor closure input to include `Send + static`, abstract cancellation logic, change multithreaded tokio default strategy from block_in_place -> spawn_blocking, stash the default strategy in its own oncelock
@arielb1 - all comments should be addressed, thanks again for the close read The main thing I want to call out is, I ended up adding a secondary I needed this because I started having lifetime issues on the I figured it was probably fine since it still avoids the sharp edge of libraries calling Otherwise, I tried to break changes into commits, though some got a bit bigger. Here's the rundown: Changes since last review
… since it requires polling
Additional feedback from ariel - better to keep log messages on setting strategy inside a shared function where the once lock is actually set. Similarly, only error there rather than sprinkled throughout the constructor.
…` call, consolidate tests, rename `run_compute_heavy_future()` to `execute_compute_heavy_future()`
src/lib.rs (Outdated)

```rust
    }
}

#[must_use = "doesn't do anything unless used"]#[derive(Default)]
```
cargo fmt missing
thanks
LGTM but plz rustfmt
Appreciate the review @arielb1. I'll follow up with separate issues on:
My main feedback here is that I'm not sure about the mental model / use cases. It seems like the strategy to use isn't necessarily application-global (or at least, there isn't a sensible default). Instead, it may make more sense to pick it based on the individual future? The properties of the future (or the number of futures) might dictate that choice. Or perhaps put another way: I am a customer. How should I pick between these different options? Is there one that is the best?
Thanks for that @rcoh. Well stated. I do have a few issues open that touch on this:
I think we need some clearer recommendations around defaults. My knee-jerk guess, without profiling, is that spawn blocking + concurrency control is probably the right 'default behavior' to handle assorted calls to this by libraries. And then, if you know you have an issue with frequently blocking futures, the secondary tokio runtime might be better. I want to validate those assumptions a bit more via profiling, and then intend to flesh out the defaults as well as provide better explanations + recommendations in the module docs.

With respect to selective handling per future, I agree with that as well. I see two parts of this:
```rust
#[tokio::test(flavor = "multi_thread", worker_threads = 10)]
async fn block_in_place_concurrency() {
    initialize();

    let start = std::time::Instant::now();

    let mut futures = Vec::new();

    for _ in 0..5 {
        let future = async move { std::thread::sleep(Duration::from_millis(15)) };
        // we need to spawn here since otherwise block in place will cancel other futures inside the same task,
        // ref https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html
        let future =
            tokio::task::spawn(async move { execute_compute_heavy_future(future).await });
        futures.push(future);
    }

    join_all(futures).await;

    let elapsed_millis = start.elapsed().as_millis();
    assert!(elapsed_millis < 50, "futures did not run concurrently");

    assert!(elapsed_millis > 20, "futures exceeded max concurrency");
}
```
if there are 10 workers, I'm not sure if this actually tests that `execute_compute_heavy_future` is working? Seems like the number of futures would need to exceed the number of workers?
yeah, good point, let me update these tests
Specifically, I think we want 4 workers here, so that we can validate both that the concurrency limit is firing and that the futures are playing nicely with worker threads blocking.
```rust
pub(crate) struct SpawnBlockingExecutor {
    concurrency_limit: ConcurrencyLimit,
}
```
I wonder if this should actually hold onto a handle - you could initialize it from `Runtime::current` at creation time.
The benefit of that being, it would ensure that the `spawn_blocking` happens in the originating runtime's context instead of the calling one, in case of multiple runtimes? Or something else?
```rust
if let Err(err) = tokio::task::spawn_blocking(move || {
    tokio::runtime::Handle::current().block_on(wrapped_future)
})
```
does this actually work? I think once a bad future is in the runtime, it's going to potentially tie up a worker
Hmm, I think I found that approach from something Alice posted somewhere, let me dig it up.
It's currently tested e2e with a `thread::sleep`, so it seems to work as expected?
https://github.com/jlizen/compute-heavy-future-executor/blob/main/tests/spawn_blocking_strategy.rs#L36
I think it was from here, which isn't particularly authoritative
https://stackoverflow.com/questions/76965631/how-do-i-spawn-possibly-blocking-async-tasks-in-tokio
Going to poke around this more closely, thanks for the callout
Ah, Alice also discusses it in Discord, e.g. https://discord.com/channels/500028886025895936/500336333500448798/1189652644881514586
But that doesn't address it being a potentially blocking future.
To summarize offline discussion - the behavior is kind of surprising after doing some tracing of polls vs threads. Calling `Handle::block_on()` from inside `spawn_blocking()` just spawns a task onto a regular worker thread for that runtime, which makes it pretty useless for our purposes.
Which kind of makes sense - how else would tokio know how to schedule the sleeping futures without a runtime, and it doesn't want to lazily spin up a current-thread runtime for every spawn_blocking thread. At that point you're probably better off managing your own threadpool full of current-thread runtimes, or using a delegated secondary multithreaded runtime.
Anyway, PR coming to rip spawn_blocking out and default to a secondary tokio runtime in case the tokio feature is enabled. I think we probably need to tear out `block_in_place` as well, for similar reasons.
Then meanwhile we have our custom executor as an escape hatch for the 'pool of current thread runtimes' case, other async runtimes, etc.
I'm also wondering if we should have a (still async) `execute_sync()` function that accepts a sync input function, for the 'definitely this is blocking and not containing futures' segments of a compute-heavy workload. Since, for those, there is a simpler use case that might just involve `spawn_blocking` along with concurrency control.
Today, library authors can pick between calling `spawn_blocking()` on them (pretty opinionated, also tokio-specific) or just intermingling them with futures (no caller ability to slice them apart from futures that will need to sleep). Whereas ideally they could configure a strategy for handling sync inside of async, which gives the consumer an escape hatch to avoid `spawn_blocking()` (perhaps prefer `block_in_place()`, perhaps sending to rayon, etc).
cut #8
Sharing initial commit for design feedback. Please pick it to pieces. Glad to split into smaller commits if it'd be helpful. I figured it was short enough that seeing the whole picture might be useful.
Some specific questions:
Crate name

Is `compute-heavy-future-executor` too long? Most crates are like 6 letters long. I was trying to make it clear what it does, but it's pretty clunky. Open to alternatives.

Strategy-setting API

I don't love the current ergonomics of initializing a strategy. We have all these calls like `initialize_block_in_place_strategy()` which either initialize the oncelock or panic.

I was considering something like a builder or an enum, but it felt ridiculous given that there is only really one step for these, and then we stash any output in the once lock. And then they are more verbose to call.

Other ideas welcome.
Custom executor

I wanted to put in an escape hatch to allow alternative async runtimes / customizing existing strategies with extra metrics or whatever / etc. But, the current form isn't particularly pleasant.

Ideally the caller would be able to implement the `ComputeHeavyFutureExecutor` trait themselves, but there were a few issues: we can't use a `Box<dyn ComputeHeavyFutureExecutor>` because the trait isn't object safe, due to the generic bounds on its `execute(fut: F)` method.

Instead I stored a closure inside of the `CustomExecutor` struct. That closure takes a future with `Any` output on the way in, and returns one with `Any` output. Because our `execute` call does have concrete typing, we can temporarily type erase the future's output before sending it into the closure, and then downcast it back on the flip side.

I think it's sound (please correct me, of course). Unfortunately we are only validating that the closure we get doesn't mangle the type at initialization, not at compile time.

In addition to complexity, this also adds quite a few extra allocations due to all the added vtables and such.

I'm open to alternative approaches. Or, if it's better just to drop this functionality from the library entirely and force people to stay on the rails, I'm open to that too.
Using `get_or_init()` inside `spawn_compute_heavy_future()`

Feedback on some dummy code in rustls/tokio-rustls#94 (comment) was that `get_or_init()` was overkill to call from inside the spawn handling.

As best as I can tell, using `get_or_init()` rather than matching against the runtime flavor every time this function is called is strictly more efficient - since it anyway starts off with `OnceCell::get()`, and then only calls the secondary logic if it was uninitialized - ref https://doc.rust-lang.org/beta/src/std/sync/once_lock.rs.html#387

This means that with `get_or_init()`, in case we need to use defaults, we have the single get branch on every call, plus the additional initialization logic on the first call only.

Meanwhile with just a naked `get().unwrap_or_else()`, we still have the single get branch every call, and then the additional match branch against runtime flavor.

Probably a minor point either way, just calling out my reasoning so that it can be corrected as needed :)
Use of log crate

It felt strange depending on `tracing` when tokio itself is optional. I was browsing other libraries and was seeing that many of them just use the simple `log` crate, which has interop with tracing anyway.

Is that the right choice? Or are libraries just using `log` because they were written before `tracing` was widely used?

Testing

Unit tests + doc tests succeed locally, with and without the tokio cfg flag enabled. I didn't want to overload this PR even further by setting up some GitHub Actions, but I'll get to it!