Skip to content

Commit

Permalink
Make sure current task build id works with queries / caching (#665)
Browse files Browse the repository at this point in the history
  • Loading branch information
Sushisource authored Jan 9, 2024
1 parent a764405 commit c5b6445
Show file tree
Hide file tree
Showing 19 changed files with 228 additions and 106 deletions.
6 changes: 2 additions & 4 deletions client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1020,7 +1020,7 @@ impl WorkflowClientTrait for Client {
.and_then(|d| d.try_into().ok()),
workflow_run_timeout: options.run_timeout.and_then(|d| d.try_into().ok()),
workflow_task_timeout: options.task_timeout.and_then(|d| d.try_into().ok()),
search_attributes: options.search_attributes.and_then(|d| d.try_into().ok()),
search_attributes: options.search_attributes.map(|d| d.into()),
cron_schedule: options.cron_schedule.unwrap_or_default(),
request_eager_execution: options.enable_eager_workflow_start,
..Default::default()
Expand Down Expand Up @@ -1186,9 +1186,7 @@ impl WorkflowClientTrait for Client {
workflow_task_timeout: workflow_options
.task_timeout
.and_then(|d| d.try_into().ok()),
search_attributes: workflow_options
.search_attributes
.and_then(|d| d.try_into().ok()),
search_attributes: workflow_options.search_attributes.map(|d| d.into()),
cron_schedule: workflow_options.cron_schedule.unwrap_or_default(),
header: options.signal_header,
..Default::default()
Expand Down
6 changes: 4 additions & 2 deletions core/src/core_tests/activity_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -906,8 +906,10 @@ async fn activity_tasks_from_completion_reserve_slots() {
mh.completion_asserts = Some(Box::new(|wftc| {
// Make sure when we see the completion with the schedule act command that it does
// not have the eager execution flag set the first time, and does the second.
if let Some(Attributes::ScheduleActivityTaskCommandAttributes(attrs)) =
wftc.commands.get(0).and_then(|cmd| cmd.attributes.as_ref())
if let Some(Attributes::ScheduleActivityTaskCommandAttributes(attrs)) = wftc
.commands
.first()
.and_then(|cmd| cmd.attributes.as_ref())
{
if attrs.activity_id == "1" {
assert!(!attrs.request_eager_execution);
Expand Down
2 changes: 1 addition & 1 deletion core/src/protosext/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl Debug for ValidPollWFTQResponse {
self.previous_started_event_id,
self.started_event_id,
self.history.events.len(),
self.history.events.get(0).map(|e| e.event_id),
self.history.events.first().map(|e| e.event_id),
self.legacy_query,
self.query_requests
)
Expand Down
38 changes: 19 additions & 19 deletions core/src/telemetry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,25 +279,6 @@ fn metric_temporality_to_selector(
MetricTemporality::Delta => ConstantTemporality(Temporality::Delta),
}
}

#[cfg(test)]
pub mod test_initters {
use super::*;
use temporal_sdk_core_api::telemetry::TelemetryOptionsBuilder;

#[allow(dead_code)] // Not always used, called to enable for debugging when needed
pub fn test_telem_console() {
telemetry_init_global(
TelemetryOptionsBuilder::default()
.logging(Logger::Console {
filter: construct_filter_string(Level::DEBUG, Level::WARN),
})
.build()
.unwrap(),
)
.unwrap();
}
}
#[cfg(test)]
pub use test_initters::*;

Expand Down Expand Up @@ -326,3 +307,22 @@ where
format!("[{}]", self.iter().format(","))
}
}

#[cfg(test)]
pub mod test_initters {
use super::*;
use temporal_sdk_core_api::telemetry::TelemetryOptionsBuilder;

#[allow(dead_code)] // Not always used, called to enable for debugging when needed
pub fn test_telem_console() {
telemetry_init_global(
TelemetryOptionsBuilder::default()
.logging(Logger::Console {
filter: construct_filter_string(Level::DEBUG, Level::WARN),
})
.build()
.unwrap(),
)
.unwrap();
}
}
2 changes: 1 addition & 1 deletion core/src/test_help/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,7 @@ pub(crate) fn build_mock_pollers(mut cfg: MockPollCfg) -> MocksHolder {
for (_, tasks) in &mut resp_iter {
// Must extract run id from a workflow task associated with this workflow
// TODO: Case where run id changes for same workflow id is not handled here
if let Some(t) = tasks.get(0) {
if let Some(t) = tasks.front() {
let rid = t.workflow_execution.as_ref().unwrap().run_id.clone();
if !outstanding.has_run(&rid) {
let t = tasks.pop_front().unwrap();
Expand Down
24 changes: 5 additions & 19 deletions core/src/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,15 +102,7 @@ pub struct Worker {
#[async_trait::async_trait]
impl WorkerTrait for Worker {
async fn poll_workflow_activation(&self) -> Result<WorkflowActivation, PollWfError> {
self.next_workflow_activation().await.map(|mut a| {
// Attach this worker's Build ID to the activation if appropriate. This is done here
// to avoid cloning the ID for every workflow instance. Can be lowered when
// https://github.com/temporalio/sdk-core/issues/567 is done
if !a.is_replaying {
a.build_id_for_current_task = self.config.worker_build_id.clone();
}
a
})
self.next_workflow_activation().await
}

#[instrument(skip(self))]
Expand Down Expand Up @@ -227,7 +219,7 @@ impl Worker {

#[allow(clippy::too_many_arguments)] // Not much worth combining here
pub(crate) fn new_with_pollers(
mut config: WorkerConfig,
config: WorkerConfig,
sticky_queue_name: Option<String>,
client: Arc<dyn WorkerClient>,
task_pollers: TaskPollers,
Expand Down Expand Up @@ -384,7 +376,7 @@ impl Worker {
wf_client: client.clone(),
workflows: Workflows::new(
build_wf_basics(
&mut config,
config.clone(),
metrics,
shutdown_token.child_token(),
client.capabilities().cloned().unwrap_or_default(),
Expand Down Expand Up @@ -669,22 +661,16 @@ pub struct PostActivateHookData<'a> {
}

fn build_wf_basics(
config: &mut WorkerConfig,
config: WorkerConfig,
metrics: MetricsContext,
shutdown_token: CancellationToken,
server_capabilities: get_system_info_response::Capabilities,
) -> WorkflowBasics {
WorkflowBasics {
max_cached_workflows: config.max_cached_workflows,
worker_config: Arc::new(config),
shutdown_token,
metrics,
namespace: config.namespace.clone(),
task_queue: config.task_queue.clone(),
ignore_evicts_on_shutdown: config.ignore_evicts_on_shutdown,
fetching_concurrency: config.fetching_concurrency,
server_capabilities,
#[cfg(feature = "save_wf_inputs")]
wf_state_inputs: config.wf_state_inputs.take(),
}
}

Expand Down
2 changes: 1 addition & 1 deletion core/src/worker/workflow/history_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ impl HistoryUpdate {
self.previous_wft_started_id >= 0
}
pub fn first_event_id(&self) -> Option<i64> {
self.events.get(0).map(|e| e.event_id)
self.events.first().map(|e| e.event_id)
}

#[cfg(debug_assertions)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ impl Cancellable for SignalExternalMachine {
fn cancel(&mut self) -> Result<Vec<MachineResponse>, MachineError<Self::Error>> {
let res = OnEventWrapper::on_event_mut(self, SignalExternalMachineEvents::Cancel)?;
let mut ret = vec![];
match res.get(0) {
match res.first() {
Some(SignalExternalCommand::Cancelled) => {
ret = vec![ResolveSignalExternalWorkflow {
seq: self.shared_state.seq,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ mod tests {
// Ensure the upsert command has an empty map when not using the patched command
if !with_patched_cmd {
mp.completion_asserts = Some(Box::new(|wftc| {
assert_matches!(wftc.commands.get(0).and_then(|c| c.attributes.as_ref()).unwrap(),
assert_matches!(wftc.commands.first().and_then(|c| c.attributes.as_ref()).unwrap(),
Attributes::UpsertWorkflowSearchAttributesCommandAttributes(attrs)
if attrs.search_attributes.as_ref().unwrap().indexed_fields.is_empty())
}));
Expand Down
34 changes: 22 additions & 12 deletions core/src/worker/workflow/machines/workflow_machines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,10 @@ use std::{
hash::{Hash, Hasher},
iter::Peekable,
rc::Rc,
sync::Arc,
time::{Duration, Instant, SystemTime},
};
use temporal_sdk_core_api::worker::WorkerConfig;
use temporal_sdk_core_protos::{
coresdk::{
common::{NamespacedWorkflowExecution, VersioningIntent},
Expand Down Expand Up @@ -93,16 +95,12 @@ pub(crate) struct WorkflowMachines {
pub last_processed_event: i64,
/// True if the workflow is replaying from history
pub replaying: bool,
/// Namespace this workflow exists in
namespace: String,
/// Workflow identifier
pub workflow_id: String,
/// Workflow type identifier. (Function name, class, etc)
pub workflow_type: String,
/// Identifies the current run
pub run_id: String,
/// The task queue this workflow is operating within
task_queue: String,
/// Is set to true once we've seen the final event in workflow history, to avoid accidentally
/// re-applying the final workflow task.
pub have_seen_terminal_event: bool,
Expand Down Expand Up @@ -160,6 +158,7 @@ pub(crate) struct WorkflowMachines {

/// Metrics context
pub metrics: MetricsContext,
worker_config: Arc<WorkerConfig>,
}

#[derive(Debug, derive_more::Display)]
Expand Down Expand Up @@ -246,11 +245,9 @@ impl WorkflowMachines {
Self {
last_history_from_server: basics.history,
protocol_msgs: vec![],
namespace: basics.namespace,
workflow_id: basics.workflow_id,
workflow_type: basics.workflow_type,
run_id: basics.run_id,
task_queue: basics.task_queue,
drive_me: driven_wf,
replaying,
metrics: basics.metrics,
Expand All @@ -277,6 +274,7 @@ impl WorkflowMachines {
encountered_patch_markers: Default::default(),
local_activity_data: LocalActivityData::default(),
have_seen_terminal_event: false,
worker_config: basics.worker_config,
}
}

Expand Down Expand Up @@ -420,9 +418,15 @@ impl WorkflowMachines {
Some(workflow_activation_job::Variant::QueryWorkflow(_))
)
});
let is_replaying = self.replaying || all_query;
let build_id_for_current_task = if is_replaying {
self.current_wft_build_id.clone().unwrap_or_default()
} else {
self.worker_config.worker_build_id.clone()
};
WorkflowActivation {
timestamp: self.current_wf_time.map(Into::into),
is_replaying: self.replaying || all_query,
is_replaying,
run_id: self.run_id.clone(),
history_length: self.last_processed_event as u32,
jobs,
Expand All @@ -432,7 +436,7 @@ impl WorkflowMachines {
.collect(),
history_size_bytes: self.history_size_bytes,
continue_as_new_suggested: self.continue_as_new_suggested,
build_id_for_current_task: self.current_wft_build_id.clone().unwrap_or_default(),
build_id_for_current_task,
}
}

Expand All @@ -447,7 +451,13 @@ impl WorkflowMachines {
.any(|v| v.is_la_resolution)
}

pub(crate) fn get_metadata_for_wft_complete(&self) -> WorkflowTaskCompletedMetadata {
pub(crate) fn get_metadata_for_wft_complete(&mut self) -> WorkflowTaskCompletedMetadata {
// If this worker has a build ID and we're completing the task, we want to say our ID is the
// current build ID, so that if we get a query before any new history, we properly can
// report that our ID was the one used for the completion.
if !self.worker_config.worker_build_id.is_empty() {
self.current_wft_build_id = Some(self.worker_config.worker_build_id.clone());
}
(*self.observed_internal_flags)
.borrow_mut()
.gather_for_wft_complete()
Expand Down Expand Up @@ -1371,7 +1381,7 @@ impl WorkflowMachines {
}
Some(cancel_we::Target::ChildWorkflowId(wfid)) => (
NamespacedWorkflowExecution {
namespace: self.namespace.clone(),
namespace: self.worker_config.namespace.clone(),
workflow_id: wfid,
run_id: "".to_string(),
},
Expand All @@ -1392,7 +1402,7 @@ impl WorkflowMachines {
WFCommand::SignalExternalWorkflow(attrs) => {
let seq = attrs.seq;
self.add_cmd_to_wf_task(
new_external_signal(attrs, &self.namespace)?,
new_external_signal(attrs, &self.worker_config.namespace)?,
CommandID::SignalExternal(seq).into(),
);
}
Expand Down Expand Up @@ -1539,7 +1549,7 @@ impl WorkflowMachines {
VersioningIntent::Unspecified => {
// If the target TQ is empty, that means use same TQ.
// When TQs match, use compat by default
target_tq.is_empty() || target_tq == self.task_queue
target_tq.is_empty() || target_tq == self.worker_config.task_queue
}
}
}
Expand Down
20 changes: 8 additions & 12 deletions core/src/worker/workflow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@ use std::{
thread,
time::{Duration, Instant},
};
use temporal_sdk_core_api::errors::{CompleteWfError, PollWfError};
use temporal_sdk_core_api::{
errors::{CompleteWfError, PollWfError},
worker::WorkerConfig,
};
use temporal_sdk_core_protos::{
coresdk::{
workflow_activation::{
Expand Down Expand Up @@ -127,24 +130,17 @@ pub(crate) struct Workflows {
}

pub(crate) struct WorkflowBasics {
pub max_cached_workflows: usize,
pub worker_config: Arc<WorkerConfig>,
pub shutdown_token: CancellationToken,
pub metrics: MetricsContext,
pub namespace: String,
pub task_queue: String,
pub ignore_evicts_on_shutdown: bool,
pub fetching_concurrency: usize,
pub server_capabilities: get_system_info_response::Capabilities,
#[cfg(feature = "save_wf_inputs")]
pub wf_state_inputs: Option<UnboundedSender<Vec<u8>>>,
}

pub(crate) struct RunBasics<'a> {
pub namespace: String,
pub worker_config: Arc<WorkerConfig>,
pub workflow_id: String,
pub workflow_type: String,
pub run_id: String,
pub task_queue: String,
pub history: HistoryUpdate,
pub metrics: MetricsContext,
pub capabilities: &'a get_system_info_response::Capabilities,
Expand All @@ -167,10 +163,10 @@ impl Workflows {
let (local_tx, local_rx) = unbounded_channel();
let (fetch_tx, fetch_rx) = unbounded_channel();
let shutdown_tok = basics.shutdown_token.clone();
let task_queue = basics.task_queue.clone();
let task_queue = basics.worker_config.task_queue.clone();
let extracted_wft_stream = WFTExtractor::build(
client.clone(),
basics.fetching_concurrency,
basics.worker_config.fetching_concurrency,
wft_stream,
UnboundedReceiverStream::new(fetch_rx),
);
Expand Down
Loading

0 comments on commit c5b6445

Please sign in to comment.