-
Notifications
You must be signed in to change notification settings - Fork 78
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Eager Workflow Start (#606) #622
Eager Workflow Start (#606) #622
Conversation
Signed-off-by: Antonio Lain <[email protected]>
Signed-off-by: Antonio Lain <[email protected]>
client/src/worker_registry/mod.rs
Outdated
} | ||
|
||
/// This trait enables local workers to made themselves visible to a shared client instance. | ||
pub trait WorkerRegistry: Send + Sync { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could also have a single register
that returns a WorkerPermit
with unregister
on it. That way you could actually get rid of this trait and change fn workers(&self) -> Arc<RwLock<dyn WorkerRegistry>> {
to something like fn register_slot_provider(&self, Box<dyn SlotProvider>) -> WorkerPermit
. Also it will simplify your unregister logic instead of asking the caller to give you their ID again. Also may be worth choosing to use either "worker" or "slot provider" terminology instead of both here.
(none of my suggestions like these are required, just discussion starters)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With the move to a single worker per namespace+queue+client it helps to have a worker id on register to detect multiple workers claiming the same namespace+queue+client, and ignore the registration. The logic on the caller worker gets easier too, they don't have to deal with whether they got a permit or not. So I think I'll stick to register(provider)/unregister(id)...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can't you just consider a second attempted registration of the same namespace+queue a failure? Why do you have to account for identifier at all?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, I can't, because registration will happen whether the programmer is using eager start or not in their code, and I want to make sure that I don't break anything in existing code... I also worry about a worker trying to register twice, and the id helps to decide to do a warn! or silently ignore it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this kind of registry/registration should be eager-workflow-start specific and don't perform eager-workflow-start-enabled-worker-register if that's disabled. Future needs to tie clients and workers together can be done via their separate needs. Also makes your registered worker map much smaller (not that it matters much).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think that's the goal though, and at least for now it's always disabled. Eager workflow start capability is enabled/disabled at a worker level (but we may allow additional overriding to disable it at the start level if enabled at the worker level) and it will be disabled by default (and we're by no means settled on whether that will change in the future).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think it is enabled/disabled at worker level in neither java or go. It is also an unnecessary complexity. Once we decide that we want to enable eager in the server, it should just work if clients use the flag in start_workflow....
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, my mistake! So I guess you will register every one as eager-capable. I'm now not as big of a fan of erroring on duplicates, heh.
So how would a worker ever call register for itself twice necessitating an identifier (instead of once and given a thing for it to unregister itself)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With the current implementation it won't happen, if the constructor fails you get a new id. But I was thinking that one day the registration may not be just for single process, but for a pool of local processes that can all benefit from early start, and having a unique id for a worker in the trait method will hide that implementation. For debugging it also helps to have a unique id per worker, specially if we are imposing the restriction that only one can register.
And as I just mentioned, the "given a thing to unregister" forces the client to deal with failed registrations, while unregister with id is just transparent.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I personally wouldn't be concerned with a potential future of deduplicating, and having an optional "lease" for a successful registration stored on the worker to unregister is normal. The obvious benefits are 1) you don't need a separate trait compared to a single call, and 2) a worker doesn't have to be responsible for its identifier. But if consensus is that we want workers to have this identifier and we want this separate registry, ok.
wft_semaphore.clone(), | ||
external_wft_tx, | ||
); | ||
client.workers().write().register(Box::new(provider)); | ||
Self { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hrmm, wonder if it's clearer for the worker to just impl SlotProvider
trait directly
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At first I started that way, but I had to Arc workers + Weak refs, and the comparisons with the hashmap+vector were getting unnecessary ugly, and it required to do more intrusive changes because Workers::new() don't return Arc workers refs.
It was also more difficult to debug compared to just using simple uuid strings, so I think the indirection helps...
client/src/worker_registry/mod.rs
Outdated
#[cfg_attr(test, mockall::automock)] | ||
pub trait SlotProvider: Send + Sync + Debug { | ||
/// A unique identifier for the worker. | ||
fn uuid(&self) -> String; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's a lot of string cloning going on, can you have a scalar identifier? If you switch to the WorkerPermit
thing mentioned elsewhere, the worker registry can be responsible for ID creation and therefore can just keep an atomic counter of u64 or something. No need for a worker to provide its ID nor is there a need for a UUID.
Merge branch 'eager-wf-start' of github.com:antlai-temporal/sdk-core into eager-wf-start
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking pretty good to me. Just a few more small changes and I think that's it.
client/src/worker_registry/mod.rs
Outdated
#[cfg(test)] | ||
/// Returns (num_providers, num_buckets), where a bucket key is namespace+task_queue. | ||
/// There is only one provider per bucket so `num_providers` should be equal to `num_buckets`. | ||
pub fn num_providers(&self) -> (usize, usize) { | ||
self.manager.read().num_providers() | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you can just get rid of this entirely now
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was checking the invariant that both maps have the same size in the tests, but yes, not much value...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, or you meant just get rid of the comment?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I meant the test generally
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I use it in all the unit tests in that file, I need to check that things are deallocated/allocated properly, and with the wrapping I can't get to the maps directly...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a few more minor things, but, LGTM. Thanks!
client/Cargo.toml
Outdated
@@ -23,6 +23,8 @@ once_cell = "1.13" | |||
opentelemetry = { workspace = true, features = ["metrics"] } | |||
parking_lot = "0.12" | |||
prost-types = "0.11" | |||
rand = "0.8.3" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't actually need the rand dep any more
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch thanks
client/src/worker_registry/mod.rs
Outdated
p.insert(provider); | ||
Some(self.index.insert(key)) | ||
} else { | ||
warn!("Ignoring registration for worker in bucket {key:?}."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"Bucket" isn't going to mean much for a user seeing the warning.
warn!("Ignoring registration for worker in bucket {key:?}."); | |
warn!("Ignoring registration for worker: {key:?}."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
client/src/worker_registry/mod.rs
Outdated
#[cfg_attr(test, mockall::automock)] | ||
pub trait WorkerRegistry { | ||
/// Register a local worker that can provide WFT processing slots. | ||
fn register(&self, provider: Box<dyn SlotProvider + Send + Sync>) -> Option<WorkerKey>; | ||
/// Unregister a provider, typically when its worker starts shutdown. | ||
fn unregister(&self, id: WorkerKey); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm inclined still to say this trait can just go away and these can be implemented on the SlotManager
directly - and in general I try to avoid using mocks except when mocking things that require external resources (ex: the existing mocks are pretty much only for clients / pollers), but this is all internal so typically there's not a big reason to mock anything.
The other two traits make a bit more sense to me since they're implemented outside of the client crate.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't want "core" to see try_reserve_wft_slot, which can only be called by the client, but I'll remove it...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's what pub(crate) is for
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point, changing the visibility...
core/src/worker/mod.rs
Outdated
@@ -75,7 +81,8 @@ use { | |||
pub struct Worker { | |||
config: WorkerConfig, | |||
wf_client: Arc<dyn WorkerClient>, | |||
|
|||
/// Registration key for this worker |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Worth saying what the registration key is for
What was changed
Adding to clients the ability of tracking workers that are attached to them, so that we can implement Eager Workflow Start,
a latency optimization that allows the client to directly dispatch the first workflow task to the worker, eliminating a server round-trip.
Why?
Significantly reduces the latency of short-lived workflows.
Checklist
[Feature Request] Enable Eager Workflow Start #606