Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Eager Workflow Start (#606) #622

Merged
merged 22 commits into from
Nov 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
643f0e8
First complete implementation
antlai-temporal Oct 20, 2023
fdc0d73
Merge remote-tracking branch 'upstream/master' into eager-wf-start
antlai-temporal Oct 20, 2023
a491b7a
Draft
antlai-temporal Oct 24, 2023
656df5a
Exclude mock from merging streams
antlai-temporal Oct 26, 2023
f4c5ce7
Adding unit tests
antlai-temporal Oct 30, 2023
829bb35
Add integration test
antlai-temporal Oct 30, 2023
3addfb0
Merge remote-tracking branch 'upstream/master' into eager-wf-start
antlai-temporal Oct 30, 2023
6d2eb19
Adding recover integration test
antlai-temporal Oct 31, 2023
6d26d35
Lint the test code
antlai-temporal Oct 31, 2023
833610a
Enable dynamic config for buildkite
antlai-temporal Oct 31, 2023
0a5c48e
Many review comments addressed
antlai-temporal Nov 1, 2023
9ec20a1
Fix macro needing dummy string
Sushisource Nov 1, 2023
fd0de7c
Consume slot in shedule_wft
antlai-temporal Nov 2, 2023
415c48a
Adding Spencer's macro trick
antlai-temporal Nov 2, 2023
7f10e09
Hide the RwLock for SlotManager
antlai-temporal Nov 2, 2023
3e03980
Forcing one provider per namespace+task_queue+client
antlai-temporal Nov 2, 2023
cbb18bb
Remove quiet flag
antlai-temporal Nov 3, 2023
62db6cf
Replace uuids by SlotMap keys
antlai-temporal Nov 3, 2023
ff6015b
Merge remote-tracking branch 'upstream/master' into eager-wf-start
antlai-temporal Nov 3, 2023
27e5626
Remove WorkerRegistry trait
antlai-temporal Nov 3, 2023
9e8eb40
Merge remote-tracking branch 'upstream/master' into eager-wf-start
antlai-temporal Nov 3, 2023
f7a6e8d
Make try_reserve_wft_slot only visible in the client crate
antlai-temporal Nov 3, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ Cargo.lock

/.idea/
*.iml
*~

# Ignore generated protobuf files
src/protos/*.rs
Expand Down
1 change: 1 addition & 0 deletions client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ once_cell = "1.13"
opentelemetry = { workspace = true, features = ["metrics"] }
parking_lot = "0.12"
prost-types = "0.11"
slotmap = "1.0"
thiserror = "1.0"
tokio = "1.1"
tonic = { workspace = true, features = ["tls", "tls-roots"] }
Expand Down
23 changes: 17 additions & 6 deletions client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@

#[macro_use]
extern crate tracing;

mod metrics;
mod raw;
mod retry;
mod worker_registry;
mod workflow_handle;

pub use crate::retry::{CallType, RetryClient, RETRYABLE_ERROR_CODES};
Expand All @@ -23,6 +23,7 @@ pub use temporal_sdk_core_protos::temporal::api::{
},
};
pub use tonic;
pub use worker_registry::{Slot, SlotManager, SlotProvider, WorkerKey};
pub use workflow_handle::{WorkflowExecutionInfo, WorkflowExecutionResult};

use crate::{
Expand Down Expand Up @@ -280,6 +281,7 @@ pub struct ConfiguredClient<C> {
headers: Arc<RwLock<HashMap<String, String>>>,
/// Capabilities as read from the `get_system_info` RPC call made on client connection
capabilities: Option<get_system_info_response::Capabilities>,
workers: Arc<SlotManager>,
}

impl<C> ConfiguredClient<C> {
Expand All @@ -299,6 +301,11 @@ impl<C> ConfiguredClient<C> {
pub fn capabilities(&self) -> Option<&get_system_info_response::Capabilities> {
self.capabilities.as_ref()
}

/// Returns a cloned reference to a registry with workers using this client instance
pub fn workers(&self) -> Arc<SlotManager> {
self.workers.clone()
}
}

// The configured client is effectively a "smart" (dumb) pointer
Expand Down Expand Up @@ -377,6 +384,7 @@ impl ClientOptions {
client: TemporalServiceClient::new(svc),
options: Arc::new(self.clone()),
capabilities: None,
workers: Arc::new(SlotManager::new()),
};
match client
.get_system_info(GetSystemInfoRequest::default())
Expand Down Expand Up @@ -974,6 +982,10 @@ pub struct WorkflowOptions {

/// Optionally associate extra search attributes with a workflow
pub search_attributes: Option<HashMap<String, Payload>>,

/// Optionally enable Eager Workflow Start, a latency optimization using local workers
/// NOTE: Experimental and incompatible with versioning with BuildIDs
pub enable_eager_workflow_start: bool,
}

#[async_trait::async_trait]
Expand All @@ -988,7 +1000,7 @@ impl WorkflowClientTrait for Client {
options: WorkflowOptions,
) -> Result<StartWorkflowExecutionResponse> {
Ok(WorkflowService::start_workflow_execution(
&mut self.inner.client.clone(),
&mut self.inner.clone(),
StartWorkflowExecutionRequest {
namespace: self.namespace.clone(),
input: input.into_payloads(),
Expand All @@ -1006,10 +1018,11 @@ impl WorkflowClientTrait for Client {
workflow_execution_timeout: options
.execution_timeout
.and_then(|d| d.try_into().ok()),
workflow_run_timeout: options.execution_timeout.and_then(|d| d.try_into().ok()),
workflow_run_timeout: options.run_timeout.and_then(|d| d.try_into().ok()),
workflow_task_timeout: options.task_timeout.and_then(|d| d.try_into().ok()),
search_attributes: options.search_attributes.and_then(|d| d.try_into().ok()),
cron_schedule: options.cron_schedule.unwrap_or_default(),
request_eager_execution: options.enable_eager_workflow_start,
..Default::default()
},
)
Expand Down Expand Up @@ -1169,9 +1182,7 @@ impl WorkflowClientTrait for Client {
workflow_execution_timeout: workflow_options
.execution_timeout
.and_then(|d| d.try_into().ok()),
workflow_run_timeout: workflow_options
.execution_timeout
.and_then(|d| d.try_into().ok()),
workflow_run_timeout: workflow_options.run_timeout.and_then(|d| d.try_into().ok()),
workflow_task_timeout: workflow_options
.task_timeout
.and_then(|d| d.try_into().ok()),
Expand Down
76 changes: 73 additions & 3 deletions client/src/raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
use crate::{
metrics::{namespace_kv, task_queue_kv},
raw::sealed::RawClientLike,
worker_registry::{Slot, SlotManager},
Client, ConfiguredClient, InterceptedMetricsSvc, RetryClient, TemporalServiceClient,
LONG_POLL_TIMEOUT,
};
use futures::{future::BoxFuture, FutureExt, TryFutureExt};
use std::sync::Arc;
use temporal_sdk_core_api::telemetry::metrics::MetricKeyValue;
use temporal_sdk_core_protos::{
grpc::health::v1::{health_client::HealthClient, *},
Expand Down Expand Up @@ -55,6 +57,9 @@ pub(super) mod sealed {
/// Return a mutable ref to the health service client instance
fn health_client_mut(&mut self) -> &mut HealthClient<Self::SvcType>;

/// Return a registry with workers using this client instance
fn get_workers_info(&self) -> Option<Arc<SlotManager>>;

async fn call<F, Req, Resp>(
&mut self,
_call_name: &'static str,
Expand Down Expand Up @@ -111,6 +116,10 @@ where
self.get_client_mut().health_client_mut()
}

fn get_workers_info(&self) -> Option<Arc<SlotManager>> {
self.get_client().get_workers_info()
}

async fn call<F, Req, Resp>(
&mut self,
call_name: &'static str,
Expand Down Expand Up @@ -173,6 +182,10 @@ where
fn health_client_mut(&mut self) -> &mut HealthClient<Self::SvcType> {
self.health_svc_mut()
}

fn get_workers_info(&self) -> Option<Arc<SlotManager>> {
None
}
}

impl<T> RawClientLike for ConfiguredClient<TemporalServiceClient<T>>
Expand Down Expand Up @@ -216,6 +229,10 @@ where
fn health_client_mut(&mut self) -> &mut HealthClient<Self::SvcType> {
self.client.health_client_mut()
}

fn get_workers_info(&self) -> Option<Arc<SlotManager>> {
Some(self.workers())
}
}

impl RawClientLike for Client {
Expand Down Expand Up @@ -252,6 +269,10 @@ impl RawClientLike for Client {
fn health_client_mut(&mut self) -> &mut HealthClient<Self::SvcType> {
self.inner.health_client_mut()
}

fn get_workers_info(&self) -> Option<Arc<SlotManager>> {
self.inner.get_workers_info()
}
}

/// Helper for cloning a tonic request as long as the inner message may be cloned.
Expand Down Expand Up @@ -356,10 +377,29 @@ macro_rules! proxy {
self.call(stringify!($method), fact, request.into_request())
}
};
($client_type:tt, $client_meth:ident, $method:ident, $req:ty, $resp:ty,
$closure_before:expr, $closure_after:expr) => {
#[doc = concat!("See [", stringify!($client_type), "::", stringify!($method), "]")]
fn $method(
&mut self,
request: impl tonic::IntoRequest<$req>,
) -> BoxFuture<Result<tonic::Response<$resp>, tonic::Status>> {
#[allow(unused_mut)]
let fact = |c: &mut Self, mut req: tonic::Request<$req>| {
let data = type_closure_two_arg(&mut req, c.get_workers_info().unwrap(),
$closure_before);
let mut c = c.$client_meth().clone();
async move {
type_closure_two_arg(c.$method(req).await, data, $closure_after)
}.boxed()
};
self.call(stringify!($method), fact, request.into_request())
}
};
}
macro_rules! proxier {
( $trait_name:ident; $impl_list_name:ident; $client_type:tt; $client_meth:ident;
$(($method:ident, $req:ty, $resp:ty $(, $closure:expr)? );)* ) => {
$(($method:ident, $req:ty, $resp:ty $(, $closure:expr $(, $closure_after:expr)?)? );)* ) => {
#[cfg(test)]
const $impl_list_name: &'static [&'static str] = &[$(stringify!($method)),*];
/// Trait version of the generated client with modifications to attach appropriate metric
Expand All @@ -377,7 +417,8 @@ macro_rules! proxier {
as tonic::codegen::Body>::Error: Into<tonic::codegen::StdError> + Send,
{
$(
proxy!($client_type, $client_meth, $method, $req, $resp $(,$closure)*);
proxy!($client_type, $client_meth, $method, $req, $resp
$(,$closure $(,$closure_after)*)*);
)*
}
};
Expand All @@ -388,6 +429,10 @@ fn type_closure_arg<T, R>(arg: T, f: impl FnOnce(T) -> R) -> R {
f(arg)
}

fn type_closure_two_arg<T, R, S>(arg1: R, arg2: T, f: impl FnOnce(R, T) -> S) -> S {
f(arg1, arg2)
}

proxier! {
WorkflowService; ALL_IMPLEMENTED_WORKFLOW_SERVICE_RPCS; WorkflowServiceClient; workflow_client_mut;
(
Expand Down Expand Up @@ -435,10 +480,35 @@ proxier! {
start_workflow_execution,
StartWorkflowExecutionRequest,
StartWorkflowExecutionResponse,
|r| {
|r, workers| {
let mut slot: Option<Box<dyn Slot + Send>> = None;
let mut labels = AttachMetricLabels::namespace(r.get_ref().namespace.clone());
labels.task_q(r.get_ref().task_queue.clone());
r.extensions_mut().insert(labels);
let req_mut = r.get_mut();
if req_mut.request_eager_execution {
let namespace = req_mut.namespace.clone();
let task_queue = req_mut.task_queue.clone().unwrap().name.clone();
match workers.try_reserve_wft_slot(namespace, task_queue) {
Some(s) => slot = Some(s),
None => req_mut.request_eager_execution = false
}
}
slot
},
|resp, slot| {
if let Some(mut s) = slot {
cretz marked this conversation as resolved.
Show resolved Hide resolved
if let Ok(response) = resp.as_ref() {
if let Some(task) = response.get_ref().clone().eager_workflow_task {
if let Err(e) = s.schedule_wft(task) {
// This is a latency issue, i.e., the client does not need to handle
// this error, because the WFT will be retried after a timeout.
warn!(details = ?e, "Eager workflow task rejected by worker.");
}
}
}
}
resp
}
);
(
Expand Down
Loading
Loading