From c5b6445fc688ca1c6caa0e1e01b2b99d1c5b7371 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Tue, 9 Jan 2024 10:32:05 -0800 Subject: [PATCH] Make sure current task build id works with queries / caching (#665) --- client/src/lib.rs | 6 +- core/src/core_tests/activity_tasks.rs | 6 +- core/src/protosext/mod.rs | 2 +- core/src/telemetry/mod.rs | 38 ++--- core/src/test_help/mod.rs | 2 +- core/src/worker/mod.rs | 24 +-- core/src/worker/workflow/history_update.rs | 2 +- .../machines/signal_external_state_machine.rs | 2 +- .../upsert_search_attributes_state_machine.rs | 2 +- .../workflow/machines/workflow_machines.rs | 34 +++-- core/src/worker/workflow/mod.rs | 20 +-- core/src/worker/workflow/run_cache.rs | 24 ++- core/src/worker/workflow/workflow_stream.rs | 8 +- .../workflow_stream/saved_wf_inputs.rs | 4 +- sdk-core-protos/src/history_info.rs | 2 +- sdk-core-protos/src/lib.rs | 2 +- sdk/src/workflow_future.rs | 4 +- tests/integ_tests/visibility_tests.rs | 8 +- tests/integ_tests/workflow_tests.rs | 144 +++++++++++++++++- 19 files changed, 228 insertions(+), 106 deletions(-) diff --git a/client/src/lib.rs b/client/src/lib.rs index b634c6087..622955e24 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -1020,7 +1020,7 @@ impl WorkflowClientTrait for Client { .and_then(|d| d.try_into().ok()), workflow_run_timeout: options.run_timeout.and_then(|d| d.try_into().ok()), workflow_task_timeout: options.task_timeout.and_then(|d| d.try_into().ok()), - search_attributes: options.search_attributes.and_then(|d| d.try_into().ok()), + search_attributes: options.search_attributes.map(|d| d.into()), cron_schedule: options.cron_schedule.unwrap_or_default(), request_eager_execution: options.enable_eager_workflow_start, ..Default::default() @@ -1186,9 +1186,7 @@ impl WorkflowClientTrait for Client { workflow_task_timeout: workflow_options .task_timeout .and_then(|d| d.try_into().ok()), - search_attributes: workflow_options - .search_attributes - .and_then(|d| d.try_into().ok()), + search_attributes: workflow_options.search_attributes.map(|d| d.into()), cron_schedule: workflow_options.cron_schedule.unwrap_or_default(), header: options.signal_header, ..Default::default() diff --git a/core/src/core_tests/activity_tasks.rs b/core/src/core_tests/activity_tasks.rs index 6ba4239b9..eaea6cc26 100644 --- a/core/src/core_tests/activity_tasks.rs +++ b/core/src/core_tests/activity_tasks.rs @@ -906,8 +906,10 @@ async fn activity_tasks_from_completion_reserve_slots() { mh.completion_asserts = Some(Box::new(|wftc| { // Make sure when we see the completion with the schedule act command that it does // not have the eager execution flag set the first time, and does the second. - if let Some(Attributes::ScheduleActivityTaskCommandAttributes(attrs)) = - wftc.commands.get(0).and_then(|cmd| cmd.attributes.as_ref()) + if let Some(Attributes::ScheduleActivityTaskCommandAttributes(attrs)) = wftc + .commands + .first() + .and_then(|cmd| cmd.attributes.as_ref()) { if attrs.activity_id == "1" { assert!(!attrs.request_eager_execution); diff --git a/core/src/protosext/mod.rs b/core/src/protosext/mod.rs index 0ea748122..730e78479 100644 --- a/core/src/protosext/mod.rs +++ b/core/src/protosext/mod.rs @@ -82,7 +82,7 @@ impl Debug for ValidPollWFTQResponse { self.previous_started_event_id, self.started_event_id, self.history.events.len(), - self.history.events.get(0).map(|e| e.event_id), + self.history.events.first().map(|e| e.event_id), self.legacy_query, self.query_requests ) diff --git a/core/src/telemetry/mod.rs b/core/src/telemetry/mod.rs index b6d87f650..9f12216f9 100644 --- a/core/src/telemetry/mod.rs +++ b/core/src/telemetry/mod.rs @@ -279,25 +279,6 @@ fn metric_temporality_to_selector( MetricTemporality::Delta => ConstantTemporality(Temporality::Delta), } } - -#[cfg(test)] -pub mod test_initters { - use super::*; - use temporal_sdk_core_api::telemetry::TelemetryOptionsBuilder; - - #[allow(dead_code)] // Not always used, called to enable for debugging when needed - pub fn test_telem_console() { - telemetry_init_global( - TelemetryOptionsBuilder::default() - .logging(Logger::Console { - filter: construct_filter_string(Level::DEBUG, Level::WARN), - }) - .build() - .unwrap(), - ) - .unwrap(); - } -} #[cfg(test)] pub use test_initters::*; @@ -326,3 +307,22 @@ where format!("[{}]", self.iter().format(",")) } } + +#[cfg(test)] +pub mod test_initters { + use super::*; + use temporal_sdk_core_api::telemetry::TelemetryOptionsBuilder; + + #[allow(dead_code)] // Not always used, called to enable for debugging when needed + pub fn test_telem_console() { + telemetry_init_global( + TelemetryOptionsBuilder::default() + .logging(Logger::Console { + filter: construct_filter_string(Level::DEBUG, Level::WARN), + }) + .build() + .unwrap(), + ) + .unwrap(); + } +} diff --git a/core/src/test_help/mod.rs b/core/src/test_help/mod.rs index 6089e15b9..4eb073fe6 100644 --- a/core/src/test_help/mod.rs +++ b/core/src/test_help/mod.rs @@ -591,7 +591,7 @@ pub(crate) fn build_mock_pollers(mut cfg: MockPollCfg) -> MocksHolder { for (_, tasks) in &mut resp_iter { // Must extract run id from a workflow task associated with this workflow // TODO: Case where run id changes for same workflow id is not handled here - if let Some(t) = tasks.get(0) { + if let Some(t) = tasks.front() { let rid = t.workflow_execution.as_ref().unwrap().run_id.clone(); if !outstanding.has_run(&rid) { let t = tasks.pop_front().unwrap(); diff --git a/core/src/worker/mod.rs b/core/src/worker/mod.rs index 639c0c36e..32121c51c 100644 --- a/core/src/worker/mod.rs +++ b/core/src/worker/mod.rs @@ -102,15 +102,7 @@ pub struct Worker { #[async_trait::async_trait] impl WorkerTrait for Worker { async fn poll_workflow_activation(&self) -> Result { - self.next_workflow_activation().await.map(|mut a| { - // Attach this worker's Build ID to the activation if appropriate. This is done here - // to avoid cloning the ID for every workflow instance. Can be lowered when - // https://github.com/temporalio/sdk-core/issues/567 is done - if !a.is_replaying { - a.build_id_for_current_task = self.config.worker_build_id.clone(); - } - a - }) + self.next_workflow_activation().await } #[instrument(skip(self))] @@ -227,7 +219,7 @@ impl Worker { #[allow(clippy::too_many_arguments)] // Not much worth combining here pub(crate) fn new_with_pollers( - mut config: WorkerConfig, + config: WorkerConfig, sticky_queue_name: Option, client: Arc, task_pollers: TaskPollers, @@ -384,7 +376,7 @@ impl Worker { wf_client: client.clone(), workflows: Workflows::new( build_wf_basics( - &mut config, + config.clone(), metrics, shutdown_token.child_token(), client.capabilities().cloned().unwrap_or_default(), @@ -669,22 +661,16 @@ pub struct PostActivateHookData<'a> { } fn build_wf_basics( - config: &mut WorkerConfig, + config: WorkerConfig, metrics: MetricsContext, shutdown_token: CancellationToken, server_capabilities: get_system_info_response::Capabilities, ) -> WorkflowBasics { WorkflowBasics { - max_cached_workflows: config.max_cached_workflows, + worker_config: Arc::new(config), shutdown_token, metrics, - namespace: config.namespace.clone(), - task_queue: config.task_queue.clone(), - ignore_evicts_on_shutdown: config.ignore_evicts_on_shutdown, - fetching_concurrency: config.fetching_concurrency, server_capabilities, - #[cfg(feature = "save_wf_inputs")] - wf_state_inputs: config.wf_state_inputs.take(), } } diff --git a/core/src/worker/workflow/history_update.rs b/core/src/worker/workflow/history_update.rs index 819c8e3a2..06467a1cd 100644 --- a/core/src/worker/workflow/history_update.rs +++ b/core/src/worker/workflow/history_update.rs @@ -464,7 +464,7 @@ impl HistoryUpdate { self.previous_wft_started_id >= 0 } pub fn first_event_id(&self) -> Option { - self.events.get(0).map(|e| e.event_id) + self.events.first().map(|e| e.event_id) } #[cfg(debug_assertions)] diff --git a/core/src/worker/workflow/machines/signal_external_state_machine.rs b/core/src/worker/workflow/machines/signal_external_state_machine.rs index 59dae3c9e..5049df484 100644 --- a/core/src/worker/workflow/machines/signal_external_state_machine.rs +++ b/core/src/worker/workflow/machines/signal_external_state_machine.rs @@ -275,7 +275,7 @@ impl Cancellable for SignalExternalMachine { fn cancel(&mut self) -> Result, MachineError> { let res = OnEventWrapper::on_event_mut(self, SignalExternalMachineEvents::Cancel)?; let mut ret = vec![]; - match res.get(0) { + match res.first() { Some(SignalExternalCommand::Cancelled) => { ret = vec![ResolveSignalExternalWorkflow { seq: self.shared_state.seq, diff --git a/core/src/worker/workflow/machines/upsert_search_attributes_state_machine.rs b/core/src/worker/workflow/machines/upsert_search_attributes_state_machine.rs index 67bc5c7c2..cf15c4ba6 100644 --- a/core/src/worker/workflow/machines/upsert_search_attributes_state_machine.rs +++ b/core/src/worker/workflow/machines/upsert_search_attributes_state_machine.rs @@ -344,7 +344,7 @@ mod tests { // Ensure the upsert command has an empty map when not using the patched command if !with_patched_cmd { mp.completion_asserts = Some(Box::new(|wftc| { - assert_matches!(wftc.commands.get(0).and_then(|c| c.attributes.as_ref()).unwrap(), + assert_matches!(wftc.commands.first().and_then(|c| c.attributes.as_ref()).unwrap(), Attributes::UpsertWorkflowSearchAttributesCommandAttributes(attrs) if attrs.search_attributes.as_ref().unwrap().indexed_fields.is_empty()) })); diff --git a/core/src/worker/workflow/machines/workflow_machines.rs b/core/src/worker/workflow/machines/workflow_machines.rs index 3888d6c22..4481df11c 100644 --- a/core/src/worker/workflow/machines/workflow_machines.rs +++ b/core/src/worker/workflow/machines/workflow_machines.rs @@ -46,8 +46,10 @@ use std::{ hash::{Hash, Hasher}, iter::Peekable, rc::Rc, + sync::Arc, time::{Duration, Instant, SystemTime}, }; +use temporal_sdk_core_api::worker::WorkerConfig; use temporal_sdk_core_protos::{ coresdk::{ common::{NamespacedWorkflowExecution, VersioningIntent}, @@ -93,16 +95,12 @@ pub(crate) struct WorkflowMachines { pub last_processed_event: i64, /// True if the workflow is replaying from history pub replaying: bool, - /// Namespace this workflow exists in - namespace: String, /// Workflow identifier pub workflow_id: String, /// Workflow type identifier. (Function name, class, etc) pub workflow_type: String, /// Identifies the current run pub run_id: String, - /// The task queue this workflow is operating within - task_queue: String, /// Is set to true once we've seen the final event in workflow history, to avoid accidentally /// re-applying the final workflow task. pub have_seen_terminal_event: bool, @@ -160,6 +158,7 @@ pub(crate) struct WorkflowMachines { /// Metrics context pub metrics: MetricsContext, + worker_config: Arc, } #[derive(Debug, derive_more::Display)] @@ -246,11 +245,9 @@ impl WorkflowMachines { Self { last_history_from_server: basics.history, protocol_msgs: vec![], - namespace: basics.namespace, workflow_id: basics.workflow_id, workflow_type: basics.workflow_type, run_id: basics.run_id, - task_queue: basics.task_queue, drive_me: driven_wf, replaying, metrics: basics.metrics, @@ -277,6 +274,7 @@ impl WorkflowMachines { encountered_patch_markers: Default::default(), local_activity_data: LocalActivityData::default(), have_seen_terminal_event: false, + worker_config: basics.worker_config, } } @@ -420,9 +418,15 @@ impl WorkflowMachines { Some(workflow_activation_job::Variant::QueryWorkflow(_)) ) }); + let is_replaying = self.replaying || all_query; + let build_id_for_current_task = if is_replaying { + self.current_wft_build_id.clone().unwrap_or_default() + } else { + self.worker_config.worker_build_id.clone() + }; WorkflowActivation { timestamp: self.current_wf_time.map(Into::into), - is_replaying: self.replaying || all_query, + is_replaying, run_id: self.run_id.clone(), history_length: self.last_processed_event as u32, jobs, @@ -432,7 +436,7 @@ impl WorkflowMachines { .collect(), history_size_bytes: self.history_size_bytes, continue_as_new_suggested: self.continue_as_new_suggested, - build_id_for_current_task: self.current_wft_build_id.clone().unwrap_or_default(), + build_id_for_current_task, } } @@ -447,7 +451,13 @@ impl WorkflowMachines { .any(|v| v.is_la_resolution) } - pub(crate) fn get_metadata_for_wft_complete(&self) -> WorkflowTaskCompletedMetadata { + pub(crate) fn get_metadata_for_wft_complete(&mut self) -> WorkflowTaskCompletedMetadata { + // If this worker has a build ID and we're completing the task, we want to say our ID is the + // current build ID, so that if we get a query before any new history, we properly can + // report that our ID was the one used for the completion. + if !self.worker_config.worker_build_id.is_empty() { + self.current_wft_build_id = Some(self.worker_config.worker_build_id.clone()); + } (*self.observed_internal_flags) .borrow_mut() .gather_for_wft_complete() @@ -1371,7 +1381,7 @@ impl WorkflowMachines { } Some(cancel_we::Target::ChildWorkflowId(wfid)) => ( NamespacedWorkflowExecution { - namespace: self.namespace.clone(), + namespace: self.worker_config.namespace.clone(), workflow_id: wfid, run_id: "".to_string(), }, @@ -1392,7 +1402,7 @@ impl WorkflowMachines { WFCommand::SignalExternalWorkflow(attrs) => { let seq = attrs.seq; self.add_cmd_to_wf_task( - new_external_signal(attrs, &self.namespace)?, + new_external_signal(attrs, &self.worker_config.namespace)?, CommandID::SignalExternal(seq).into(), ); } @@ -1539,7 +1549,7 @@ impl WorkflowMachines { VersioningIntent::Unspecified => { // If the target TQ is empty, that means use same TQ. // When TQs match, use compat by default - target_tq.is_empty() || target_tq == self.task_queue + target_tq.is_empty() || target_tq == self.worker_config.task_queue } } } diff --git a/core/src/worker/workflow/mod.rs b/core/src/worker/workflow/mod.rs index 0f8e838c6..c668f3b5f 100644 --- a/core/src/worker/workflow/mod.rs +++ b/core/src/worker/workflow/mod.rs @@ -58,7 +58,10 @@ use std::{ thread, time::{Duration, Instant}, }; -use temporal_sdk_core_api::errors::{CompleteWfError, PollWfError}; +use temporal_sdk_core_api::{ + errors::{CompleteWfError, PollWfError}, + worker::WorkerConfig, +}; use temporal_sdk_core_protos::{ coresdk::{ workflow_activation::{ @@ -127,24 +130,17 @@ pub(crate) struct Workflows { } pub(crate) struct WorkflowBasics { - pub max_cached_workflows: usize, + pub worker_config: Arc, pub shutdown_token: CancellationToken, pub metrics: MetricsContext, - pub namespace: String, - pub task_queue: String, - pub ignore_evicts_on_shutdown: bool, - pub fetching_concurrency: usize, pub server_capabilities: get_system_info_response::Capabilities, - #[cfg(feature = "save_wf_inputs")] - pub wf_state_inputs: Option>>, } pub(crate) struct RunBasics<'a> { - pub namespace: String, + pub worker_config: Arc, pub workflow_id: String, pub workflow_type: String, pub run_id: String, - pub task_queue: String, pub history: HistoryUpdate, pub metrics: MetricsContext, pub capabilities: &'a get_system_info_response::Capabilities, @@ -167,10 +163,10 @@ impl Workflows { let (local_tx, local_rx) = unbounded_channel(); let (fetch_tx, fetch_rx) = unbounded_channel(); let shutdown_tok = basics.shutdown_token.clone(); - let task_queue = basics.task_queue.clone(); + let task_queue = basics.worker_config.task_queue.clone(); let extracted_wft_stream = WFTExtractor::build( client.clone(), - basics.fetching_concurrency, + basics.worker_config.fetching_concurrency, wft_stream, UnboundedReceiverStream::new(fetch_rx), ); diff --git a/core/src/worker/workflow/run_cache.rs b/core/src/worker/workflow/run_cache.rs index 59384d9bc..fd970f475 100644 --- a/core/src/worker/workflow/run_cache.rs +++ b/core/src/worker/workflow/run_cache.rs @@ -7,13 +7,12 @@ use crate::{ MetricsContext, }; use lru::LruCache; -use std::{num::NonZeroUsize, rc::Rc}; +use std::{num::NonZeroUsize, rc::Rc, sync::Arc}; +use temporal_sdk_core_api::worker::WorkerConfig; use temporal_sdk_core_protos::temporal::api::workflowservice::v1::get_system_info_response; pub(super) struct RunCache { - max: usize, - namespace: String, - task_queue: String, + worker_config: Arc, server_capabilities: get_system_info_response::Capabilities, /// Run id -> Data runs: LruCache, @@ -24,24 +23,20 @@ pub(super) struct RunCache { impl RunCache { pub fn new( - max_cache_size: usize, - namespace: String, - task_queue: String, + worker_config: Arc, server_capabilities: get_system_info_response::Capabilities, local_activity_request_sink: impl LocalActivityRequestSink, metrics: MetricsContext, ) -> Self { // The cache needs room for at least one run, otherwise we couldn't do anything. In // "0" size mode, the run is evicted once the workflow task is complete. - let lru_size = if max_cache_size > 0 { - max_cache_size + let lru_size = if worker_config.max_cached_workflows > 0 { + worker_config.max_cached_workflows } else { 1 }; Self { - max: max_cache_size, - namespace, - task_queue, + worker_config, server_capabilities, runs: LruCache::new( NonZeroUsize::new(lru_size).expect("LRU size is guaranteed positive"), @@ -68,11 +63,10 @@ impl RunCache { .with_new_attrs([workflow_type(pwft.work.workflow_type.clone())]); let (mrh, rur) = ManagedRun::new( RunBasics { - namespace: self.namespace.clone(), + worker_config: self.worker_config.clone(), workflow_id: pwft.work.execution.workflow_id.clone(), workflow_type: pwft.work.workflow_type.clone(), run_id: run_id.clone(), - task_queue: self.task_queue.clone(), history: HistoryUpdate::dummy(), metrics, capabilities: &self.server_capabilities, @@ -124,6 +118,6 @@ impl RunCache { self.runs.len() } pub fn cache_capacity(&self) -> usize { - self.max + self.worker_config.max_cached_workflows } } diff --git a/core/src/worker/workflow/workflow_stream.rs b/core/src/worker/workflow/workflow_stream.rs index 7716de938..3c90d4369 100644 --- a/core/src/worker/workflow/workflow_stream.rs +++ b/core/src/worker/workflow/workflow_stream.rs @@ -95,21 +95,19 @@ impl WFStream { let mut state = WFStream { buffered_polls_need_cache_slot: Default::default(), runs: RunCache::new( - basics.max_cached_workflows, - basics.namespace.clone(), - basics.task_queue.clone(), + basics.worker_config.clone(), basics.server_capabilities.clone(), local_activity_request_sink, basics.metrics.clone(), ), shutdown_token: basics.shutdown_token, - ignore_evicts_on_shutdown: basics.ignore_evicts_on_shutdown, + ignore_evicts_on_shutdown: basics.worker_config.ignore_evicts_on_shutdown, metrics: basics.metrics, runs_needing_fetching: Default::default(), history_fetch_refcounter: Arc::new(HistfetchRC {}), #[cfg(feature = "save_wf_inputs")] - wf_state_inputs: basics.wf_state_inputs, + wf_state_inputs: basics.worker_config.wf_state_inputs.clone(), }; all_inputs .map(move |action: WFStreamInput| { diff --git a/core/src/worker/workflow/workflow_stream/saved_wf_inputs.rs b/core/src/worker/workflow/workflow_stream/saved_wf_inputs.rs index e455078d2..d9f035149 100644 --- a/core/src/worker/workflow/workflow_stream/saved_wf_inputs.rs +++ b/core/src/worker/workflow/workflow_stream/saved_wf_inputs.rs @@ -23,7 +23,7 @@ use tokio_util::sync::CancellationToken; /// /// Use `CoreWfStarter::enable_wf_state_input_recording` from the integration test utilities to /// activate saving the data to disk, and use the `wf_input_replay` example binary to replay. -pub async fn replay_wf_state_inputs(mut config: WorkerConfig, inputs: impl Stream>) { +pub async fn replay_wf_state_inputs(config: WorkerConfig, inputs: impl Stream>) { use crate::worker::build_wf_basics; let la_resp_q = Arc::new(SegQueue::new()); @@ -43,7 +43,7 @@ pub async fn replay_wf_state_inputs(mut config: WorkerConfig, inputs: impl Strea }) }); let basics = build_wf_basics( - &mut config, + config, MetricsContext::no_op(), CancellationToken::new(), DEFAULT_TEST_CAPABILITIES.clone(), diff --git a/sdk-core-protos/src/history_info.rs b/sdk-core-protos/src/history_info.rs index 5bdd565c7..62086cbf6 100644 --- a/sdk-core-protos/src/history_info.rs +++ b/sdk-core-protos/src/history_info.rs @@ -37,7 +37,7 @@ impl HistoryInfo { let mut workflow_task_started_event_id = 0; let mut wf_task_count = 0; let mut history = events.iter().peekable(); - let started_attrs = match &events.get(0).unwrap().attributes { + let started_attrs = match &events.first().unwrap().attributes { Some(history_event::Attributes::WorkflowExecutionStartedEventAttributes(attrs)) => { attrs.clone() } diff --git a/sdk-core-protos/src/lib.rs b/sdk-core-protos/src/lib.rs index 8d2e4d0f7..151838aae 100644 --- a/sdk-core-protos/src/lib.rs +++ b/sdk-core-protos/src/lib.rs @@ -336,7 +336,7 @@ pub mod coresdk { ) -> Option { details .get("data") - .and_then(|p| p.payloads.get(0)) + .and_then(|p| p.payloads.first()) .and_then(|p| std::str::from_utf8(&p.data).ok()) .and_then(|s| serde_json::from_str(s).ok()) } diff --git a/sdk/src/workflow_future.rs b/sdk/src/workflow_future.rs index ed1a85dc6..c5f6d97a1 100644 --- a/sdk/src/workflow_future.rs +++ b/sdk/src/workflow_future.rs @@ -236,7 +236,7 @@ impl WorkflowFuture { let defp = Payload::default(); let val_res = if u.run_validator { match panic::catch_unwind(AssertUnwindSafe(|| { - (impls.validator)(&info, u.input.get(0).unwrap_or(&defp)) + (impls.validator)(&info, u.input.first().unwrap_or(&defp)) })) { Ok(r) => r, Err(e) => { @@ -257,7 +257,7 @@ impl WorkflowFuture { wf_ctx: self.wf_ctx.clone(), info, }, - u.input.get(0).unwrap_or(&defp), + u.input.first().unwrap_or(&defp), ); self.update_futures .push((u.protocol_instance_id, handler_fut)); diff --git a/tests/integ_tests/visibility_tests.rs b/tests/integ_tests/visibility_tests.rs index c7840c8d7..4df85ef6f 100644 --- a/tests/integ_tests/visibility_tests.rs +++ b/tests/integ_tests/visibility_tests.rs @@ -35,8 +35,8 @@ async fn client_list_open_closed_workflow_executions() { // List above OPEN workflow let start_time_filter = StartTimeFilter { - earliest_time: Some(earliest).and_then(|t| t.try_into().ok()), - latest_time: Some(latest).and_then(|t| t.try_into().ok()), + earliest_time: Some(earliest).map(|t| t.into()), + latest_time: Some(latest).map(|t| t.into()), }; let filter = ListOpenFilters::ExecutionFilter(WorkflowExecutionFilter { workflow_id: wf_name.clone(), @@ -63,8 +63,8 @@ async fn client_list_open_closed_workflow_executions() { 1, Default::default(), Some(StartTimeFilter { - earliest_time: Some(earliest).and_then(|t| t.try_into().ok()), - latest_time: Some(latest).and_then(|t| t.try_into().ok()), + earliest_time: Some(earliest).map(|t| t.into()), + latest_time: Some(latest).map(|t| t.into()), }), Some(ListClosedFilters::ExecutionFilter( WorkflowExecutionFilter { diff --git a/tests/integ_tests/workflow_tests.rs b/tests/integ_tests/workflow_tests.rs index aa958c62e..ff3392c53 100644 --- a/tests/integ_tests/workflow_tests.rs +++ b/tests/integ_tests/workflow_tests.rs @@ -35,17 +35,19 @@ use temporal_sdk_core_protos::{ coresdk::{ activity_result::ActivityExecutionResult, workflow_activation::{workflow_activation_job, WorkflowActivation, WorkflowActivationJob}, - workflow_commands::{ActivityCancellationType, FailWorkflowExecution, StartTimer}, + workflow_commands::{ + ActivityCancellationType, FailWorkflowExecution, QueryResult, QuerySuccess, StartTimer, + }, workflow_completion::WorkflowActivationCompletion, ActivityTaskCompletion, AsJsonPayloadExt, IntoCompletion, }, - temporal::api::{failure::v1::Failure, history::v1::history_event}, + temporal::api::{failure::v1::Failure, history::v1::history_event, query::v1::WorkflowQuery}, }; use temporal_sdk_core_test_utils::{ drain_pollers_and_shutdown, history_from_proto_binary, init_core_and_create_wf, init_core_replay_preloaded, schedule_activity_cmd, CoreWfStarter, WorkerTestHelpers, }; -use tokio::time::sleep; +use tokio::{join, time::sleep}; use uuid::Uuid; // TODO: We should get expected histories for these tests and confirm that the history at the end @@ -592,3 +594,139 @@ async fn slow_completes_with_small_cache() { .await .unwrap(); } + +#[tokio::test] +async fn build_id_correct_in_wf_info() { + let wf_type = "build_id_correct_in_wf_info"; + let mut starter = CoreWfStarter::new(wf_type); + starter + .worker_config + .worker_build_id("1.0") + .no_remote_activities(true); + let core = starter.get_worker().await; + starter.start_wf().await; + let client = starter.get_client().await; + let workflow_id = starter.get_task_queue().to_string(); + + let res = core.poll_workflow_activation().await.unwrap(); + assert_eq!(res.build_id_for_current_task, "1.0"); + core.complete_workflow_activation(WorkflowActivationCompletion::from_cmds( + res.run_id.clone(), + vec![], + )) + .await + .unwrap(); + + // Ensure a query on first wft also sees the correct id + let query_fut = async { + client + .query_workflow_execution( + workflow_id.clone(), + res.run_id.to_string(), + WorkflowQuery { + query_type: "q1".to_string(), + ..Default::default() + }, + ) + .await + .unwrap() + }; + let complete_fut = async { + let task = core.poll_workflow_activation().await.unwrap(); + let query = assert_matches!( + task.jobs.as_slice(), + [WorkflowActivationJob { + variant: Some(workflow_activation_job::Variant::QueryWorkflow(q)), + }] => q + ); + assert_eq!(task.build_id_for_current_task, "1.0"); + core.complete_workflow_activation(WorkflowActivationCompletion::from_cmd( + task.run_id, + QueryResult { + query_id: query.query_id.clone(), + variant: Some( + QuerySuccess { + response: Some("done".into()), + } + .into(), + ), + } + .into(), + )) + .await + .unwrap(); + }; + join!(query_fut, complete_fut); + starter.shutdown().await; + client + .reset_sticky_task_queue(workflow_id.clone(), "".to_string()) + .await + .unwrap(); + + let mut starter = starter.clone_no_worker(); + starter.worker_config.worker_build_id("2.0"); + let core = starter.get_worker().await; + + let query_fut = async { + client + .query_workflow_execution( + workflow_id.clone(), + res.run_id.to_string(), + WorkflowQuery { + query_type: "q2".to_string(), + ..Default::default() + }, + ) + .await + .unwrap() + }; + let complete_fut = async { + let res = core.poll_workflow_activation().await.unwrap(); + assert_eq!(res.build_id_for_current_task, "1.0"); + core.complete_workflow_activation(WorkflowActivationCompletion::from_cmds( + res.run_id.clone(), + vec![], + )) + .await + .unwrap(); + let task = core.poll_workflow_activation().await.unwrap(); + let query = assert_matches!( + task.jobs.as_slice(), + [WorkflowActivationJob { + variant: Some(workflow_activation_job::Variant::QueryWorkflow(q)), + }] => q + ); + assert_eq!(task.build_id_for_current_task, "1.0"); + core.complete_workflow_activation(WorkflowActivationCompletion::from_cmd( + task.run_id, + QueryResult { + query_id: query.query_id.clone(), + variant: Some( + QuerySuccess { + response: Some("done".into()), + } + .into(), + ), + } + .into(), + )) + .await + .unwrap(); + }; + join!(query_fut, complete_fut); + + client + .signal_workflow_execution( + workflow_id.clone(), + "".to_string(), + "whatever".to_string(), + None, + None, + ) + .await + .unwrap(); + + let res = core.poll_workflow_activation().await.unwrap(); + assert_eq!(res.build_id_for_current_task, "2.0"); + core.complete_execution(&res.run_id).await; +}