From 1739c7427193bcc577cb1ee3f21c190fe0c73a20 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Tue, 16 Jan 2024 14:00:56 -0800 Subject: [PATCH 1/2] Add repro test --- test-utils/src/lib.rs | 4 +- tests/integ_tests/workflow_tests/patches.rs | 49 +++++++++++++++++++++ 2 files changed, 52 insertions(+), 1 deletion(-) diff --git a/test-utils/src/lib.rs b/test-utils/src/lib.rs index 17eddc6d7..97cdd076f 100644 --- a/test-utils/src/lib.rs +++ b/test-utils/src/lib.rs @@ -240,7 +240,9 @@ impl CoreWfStarter { /// Start the workflow defined by the builder and return run id pub async fn start_wf(&mut self) -> String { - self.start_wf_with_id(self.task_queue_name.clone()).await + let mut worker = self.worker().await; + self.start_with_worker(self.task_queue_name.clone(), &mut worker) + .await } pub async fn start_with_worker( diff --git a/tests/integ_tests/workflow_tests/patches.rs b/tests/integ_tests/workflow_tests/patches.rs index 12482e1a2..1a15610ba 100644 --- a/tests/integ_tests/workflow_tests/patches.rs +++ b/tests/integ_tests/workflow_tests/patches.rs @@ -5,6 +5,9 @@ use std::{ }, time::Duration, }; +use temporal_client::WorkflowClientTrait; +use tokio::{join, sync::Notify}; +use tokio_stream::StreamExt; use temporal_sdk::{WfContext, WorkflowResult}; use temporal_sdk_core_test_utils::CoreWfStarter; @@ -151,3 +154,49 @@ async fn can_remove_deprecated_patch_near_other_patch() { starter.start_with_worker(wf_name, &mut worker).await; worker.run_until_done().await.unwrap(); } + +#[tokio::test] +async fn deprecated_patch_removal() { + let wf_name = "deprecated_patch_removal"; + let mut starter = CoreWfStarter::new(wf_name); + starter.no_remote_activities(); + let mut worker = starter.worker().await; + let client = starter.get_client().await; + let wf_id = starter.get_task_queue().to_string(); + let did_die = Arc::new(AtomicBool::new(false)); + let send_sig = Arc::new(Notify::new()); + let send_sig_c = send_sig.clone(); + worker.fetch_results = false; + worker.register_wf(wf_id.clone(), move |ctx: WfContext| { + let did_die = did_die.clone(); + let send_sig_c = send_sig_c.clone(); + async move { + if !did_die.load(Ordering::Acquire) { + assert!(ctx.deprecate_patch("getting-deprecated")); + } + send_sig_c.notify_one(); + ctx.make_signal_channel("sig").next().await; + + ctx.timer(Duration::from_millis(1)).await; + + if !did_die.load(Ordering::Acquire) { + did_die.store(true, Ordering::Release); + ctx.force_task_fail(anyhow::anyhow!("i'm ded")); + } + Ok(().into()) + } + }); + + starter.start_wf().await; + let sig_fut = async { + send_sig.notified().await; + client + .signal_workflow_execution(wf_id, "".to_string(), "sig".to_string(), None, None) + .await + .unwrap() + }; + let run_fut = async { + worker.run_until_done().await.unwrap(); + }; + join!(sig_fut, run_fut); +} From 1a3318ccac98c987271387640ad2a3b4cb703f7a Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Tue, 16 Jan 2024 14:31:56 -0800 Subject: [PATCH 2/2] Fix no-new-command causing issues with deprecated patches --- client/src/workflow_handle/mod.rs | 4 +++ .../workflow/machines/workflow_machines.rs | 26 +++++++++---------- test-utils/src/lib.rs | 4 +-- tests/integ_tests/workflow_tests/patches.rs | 5 ++-- 4 files changed, 19 insertions(+), 20 deletions(-) diff --git a/client/src/workflow_handle/mod.rs b/client/src/workflow_handle/mod.rs index 54bbb0bad..db42c2e82 100644 --- a/client/src/workflow_handle/mod.rs +++ b/client/src/workflow_handle/mod.rs @@ -91,6 +91,10 @@ where } } + pub fn info(&self) -> &WorkflowExecutionInfo { + &self.info + } + pub async fn get_workflow_result( &self, opts: GetWorkflowResultOpts, diff --git a/core/src/worker/workflow/machines/workflow_machines.rs b/core/src/worker/workflow/machines/workflow_machines.rs index cd1ca526a..79062ecc2 100644 --- a/core/src/worker/workflow/machines/workflow_machines.rs +++ b/core/src/worker/workflow/machines/workflow_machines.rs @@ -861,16 +861,14 @@ impl WorkflowMachines { let event_id = event.event_id; let consumed_cmd = loop { - if let Some(peek_machine) = self.commands.front() { - let mach = self.machine(peek_machine.machine); - match patch_marker_handling(event, mach, next_event)? { - EventHandlingOutcome::SkipCommand => { - self.commands.pop_front(); - continue; - } - eho @ EventHandlingOutcome::SkipEvent { .. } => return Ok(eho), - EventHandlingOutcome::Normal => {} + let maybe_machine = self.commands.front().map(|mk| self.machine(mk.machine)); + match patch_marker_handling(event, maybe_machine, next_event)? { + EventHandlingOutcome::SkipCommand => { + self.commands.pop_front(); + continue; } + eho @ EventHandlingOutcome::SkipEvent { .. } => return Ok(eho), + EventHandlingOutcome::Normal => {} } let maybe_command = self.commands.pop_front(); @@ -1598,11 +1596,11 @@ enum EventHandlingOutcome { /// [WorkflowMachines::handle_command_event] fn patch_marker_handling( event: &HistoryEvent, - mach: &Machines, + mach: Option<&Machines>, next_event: Option<&HistoryEvent>, ) -> Result { let patch_machine = match mach { - Machines::PatchMachine(pm) => Some(pm), + Some(Machines::PatchMachine(pm)) => Some(pm), _ => None, }; let patch_details = event.get_patch_marker_details(); @@ -1633,9 +1631,9 @@ fn patch_marker_handling( Ok(EventHandlingOutcome::Normal) } } else { - // Version markers can be skipped in the event they are deprecated - // Is deprecated. We can simply ignore this event, as deprecated change - // markers are allowed without matching changed calls. + // Version markers can be skipped in the event they are deprecated. We can simply + // ignore this event, as deprecated change markers are allowed without matching changed + // calls. if deprecated { debug!("Deprecated patch marker tried against non-patch machine, skipping."); skip_one_or_two_events(next_event) diff --git a/test-utils/src/lib.rs b/test-utils/src/lib.rs index 97cdd076f..17eddc6d7 100644 --- a/test-utils/src/lib.rs +++ b/test-utils/src/lib.rs @@ -240,9 +240,7 @@ impl CoreWfStarter { /// Start the workflow defined by the builder and return run id pub async fn start_wf(&mut self) -> String { - let mut worker = self.worker().await; - self.start_with_worker(self.task_queue_name.clone(), &mut worker) - .await + self.start_wf_with_id(self.task_queue_name.clone()).await } pub async fn start_with_worker( diff --git a/tests/integ_tests/workflow_tests/patches.rs b/tests/integ_tests/workflow_tests/patches.rs index 1a15610ba..a992dfd86 100644 --- a/tests/integ_tests/workflow_tests/patches.rs +++ b/tests/integ_tests/workflow_tests/patches.rs @@ -166,8 +166,7 @@ async fn deprecated_patch_removal() { let did_die = Arc::new(AtomicBool::new(false)); let send_sig = Arc::new(Notify::new()); let send_sig_c = send_sig.clone(); - worker.fetch_results = false; - worker.register_wf(wf_id.clone(), move |ctx: WfContext| { + worker.register_wf(wf_name, move |ctx: WfContext| { let did_die = did_die.clone(); let send_sig_c = send_sig_c.clone(); async move { @@ -187,7 +186,7 @@ async fn deprecated_patch_removal() { } }); - starter.start_wf().await; + starter.start_with_worker(wf_name, &mut worker).await; let sig_fut = async { send_sig.notified().await; client