From 75800f1c399b1d2b1e86b4cfd6fa81f913a085b6 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Tue, 10 Dec 2024 09:06:42 -0800 Subject: [PATCH] Move user metadata to command from variant (#851) --- core/src/abstractions.rs | 2 +- core/src/core_tests/updates.rs | 1 - core/src/core_tests/workers.rs | 2 - core/src/telemetry/log_export.rs | 2 +- core/src/telemetry/mod.rs | 2 + core/src/test_help/mod.rs | 1 + core/src/worker/workflow/driven_workflow.rs | 9 +- .../machines/activity_state_machine.rs | 16 +- .../machines/cancel_external_state_machine.rs | 9 +- .../machines/cancel_workflow_state_machine.rs | 12 +- .../machines/child_workflow_state_machine.rs | 24 +-- .../complete_workflow_state_machine.rs | 37 ++-- .../continue_as_new_workflow_state_machine.rs | 9 +- .../machines/fail_workflow_state_machine.rs | 13 +- core/src/worker/workflow/machines/mod.rs | 3 +- ...odify_workflow_properties_state_machine.rs | 17 +- .../workflow/machines/patch_state_machine.rs | 24 +-- .../machines/signal_external_state_machine.rs | 10 +- .../workflow/machines/timer_state_machine.rs | 34 ++-- .../upsert_search_attributes_state_machine.rs | 22 +-- .../workflow/machines/workflow_machines.rs | 108 +++++++---- core/src/worker/workflow/managed_run.rs | 65 ++++--- core/src/worker/workflow/mod.rs | 86 ++++++--- .../workflow_commands/workflow_commands.proto | 14 +- sdk-core-protos/src/lib.rs | 86 ++++++++- sdk/src/lib.rs | 5 +- sdk/src/workflow_context.rs | 79 +++++--- sdk/src/workflow_context/options.rs | 182 ++++++++++-------- sdk/src/workflow_future.rs | 170 ++++++++-------- test-utils/src/lib.rs | 2 - tests/integ_tests/polling_tests.rs | 1 - tests/integ_tests/queries_tests.rs | 2 - tests/integ_tests/workflow_tests.rs | 2 - .../integ_tests/workflow_tests/activities.rs | 5 - tests/integ_tests/workflow_tests/replay.rs | 1 - tests/integ_tests/workflow_tests/timers.rs | 3 - 36 files changed, 574 insertions(+), 486 deletions(-) diff --git a/core/src/abstractions.rs b/core/src/abstractions.rs index 3bb06ca25..58e15c5af 100644 --- a/core/src/abstractions.rs +++ b/core/src/abstractions.rs @@ -189,7 +189,7 @@ struct UseCtx<'a, SK: SlotKind> { permit: &'a SlotSupplierPermit, } -impl<'a, SK: SlotKind> SlotMarkUsedContext for UseCtx<'a, SK> { +impl SlotMarkUsedContext for UseCtx<'_, SK> { type SlotKind = SK; fn permit(&self) -> &SlotSupplierPermit { diff --git a/core/src/core_tests/updates.rs b/core/src/core_tests/updates.rs index 727dbb367..2c565525b 100644 --- a/core/src/core_tests/updates.rs +++ b/core/src/core_tests/updates.rs @@ -291,7 +291,6 @@ async fn replay_with_signal_and_update_same_task() { StartTimer { seq: 1, start_to_fire_timeout: Some(prost_dur!(from_secs(1))), - summary: None, } .into(), UpdateResponse { diff --git a/core/src/core_tests/workers.rs b/core/src/core_tests/workers.rs index ce2f16335..00b70e184 100644 --- a/core/src/core_tests/workers.rs +++ b/core/src/core_tests/workers.rs @@ -45,7 +45,6 @@ async fn after_shutdown_of_worker_get_shutdown_err() { workflow_command::Variant::StartTimer(StartTimer { seq: 1, start_to_fire_timeout: Some(prost_dur!(from_secs(1))), - summary: None, }), )) .await @@ -349,7 +348,6 @@ async fn worker_shutdown_api(#[case] use_cache: bool, #[case] api_success: bool) workflow_command::Variant::StartTimer(StartTimer { seq: 1, start_to_fire_timeout: Some(prost_dur!(from_secs(1))), - summary: None, }), )) .await diff --git a/core/src/telemetry/log_export.rs b/core/src/telemetry/log_export.rs index 06a3ca992..d4e2afd0f 100644 --- a/core/src/telemetry/log_export.rs +++ b/core/src/telemetry/log_export.rs @@ -168,7 +168,7 @@ impl fmt::Debug for CoreLogStreamConsumer { struct JsonVisitor<'a>(&'a mut HashMap); -impl<'a> tracing::field::Visit for JsonVisitor<'a> { +impl tracing::field::Visit for JsonVisitor<'_> { fn record_f64(&mut self, field: &tracing::field::Field, value: f64) { self.0 .insert(field.name().to_string(), serde_json::json!(value)); diff --git a/core/src/telemetry/mod.rs b/core/src/telemetry/mod.rs index 38e0f304b..b6c432422 100644 --- a/core/src/telemetry/mod.rs +++ b/core/src/telemetry/mod.rs @@ -276,11 +276,13 @@ where } } +/// Helpers for test initialization #[cfg(test)] pub mod test_initters { use super::*; use temporal_sdk_core_api::telemetry::TelemetryOptionsBuilder; + /// Turn on logging to the console #[allow(dead_code)] // Not always used, called to enable for debugging when needed pub fn test_telem_console() { telemetry_init_global( diff --git a/core/src/test_help/mod.rs b/core/src/test_help/mod.rs index 7ae7dddb2..4f0f2a97f 100644 --- a/core/src/test_help/mod.rs +++ b/core/src/test_help/mod.rs @@ -1026,6 +1026,7 @@ macro_rules! advance_fut { }; } +/// Helps easily construct prost proto durations from stdlib duration constructors #[macro_export] macro_rules! prost_dur { ($dur_call:ident $args:tt) => { diff --git a/core/src/worker/workflow/driven_workflow.rs b/core/src/worker/workflow/driven_workflow.rs index 7c9966b25..48f7a40f1 100644 --- a/core/src/worker/workflow/driven_workflow.rs +++ b/core/src/worker/workflow/driven_workflow.rs @@ -1,6 +1,6 @@ use crate::{ telemetry::VecDisplayer, - worker::workflow::{OutgoingJob, WFCommand, WorkflowStartedInfo}, + worker::workflow::{OutgoingJob, WFCommand, WFCommandVariant, WorkflowStartedInfo}, }; use prost_types::Timestamp; use std::{ @@ -86,7 +86,12 @@ impl DrivenWorkflow { /// from a buffer that the language side sinks into when it calls [crate::Core::complete_task] pub(super) fn fetch_workflow_iteration_output(&mut self) -> Vec { let in_cmds = self.incoming_commands.try_recv(); - let in_cmds = in_cmds.unwrap_or_else(|_| vec![WFCommand::NoCommandsFromLang]); + let in_cmds = in_cmds.unwrap_or_else(|_| { + vec![WFCommand { + variant: WFCommandVariant::NoCommandsFromLang, + metadata: None, + }] + }); debug!(in_cmds = %in_cmds.display(), "wf bridge iteration fetch"); in_cmds } diff --git a/core/src/worker/workflow/machines/activity_state_machine.rs b/core/src/worker/workflow/machines/activity_state_machine.rs index 45e79b8a8..a7925e961 100644 --- a/core/src/worker/workflow/machines/activity_state_machine.rs +++ b/core/src/worker/workflow/machines/activity_state_machine.rs @@ -30,7 +30,6 @@ use temporal_sdk_core_protos::{ ActivityTaskCompletedEventAttributes, ActivityTaskFailedEventAttributes, ActivityTaskTimedOutEventAttributes, }, - sdk::v1::UserMetadata, }, }; @@ -115,10 +114,6 @@ impl ActivityMachine { internal_flags: InternalFlagsRef, use_compatible_version: bool, ) -> NewMachineWithCommand { - let user_metadata = attrs.summary.clone().map(|x| UserMetadata { - summary: Some(x), - details: None, - }); let mut s = Self::from_parts( Created {}.into(), SharedState { @@ -133,16 +128,11 @@ impl ActivityMachine { ); OnEventWrapper::on_event_mut(&mut s, ActivityMachineEvents::Schedule) .expect("Scheduling activities doesn't fail"); - let command = Command { - command_type: CommandType::ScheduleActivityTask as i32, - attributes: Some(schedule_activity_cmd_to_api( + NewMachineWithCommand { + command: schedule_activity_cmd_to_api( s.shared_state().attrs.clone(), use_compatible_version, - )), - user_metadata, - }; - NewMachineWithCommand { - command, + ), machine: s.into(), } } diff --git a/core/src/worker/workflow/machines/cancel_external_state_machine.rs b/core/src/worker/workflow/machines/cancel_external_state_machine.rs index c0f232583..bb07d689c 100644 --- a/core/src/worker/workflow/machines/cancel_external_state_machine.rs +++ b/core/src/worker/workflow/machines/cancel_external_state_machine.rs @@ -11,7 +11,7 @@ use temporal_sdk_core_protos::{ workflow_activation::ResolveRequestCancelExternalWorkflow, }, temporal::api::{ - command::v1::{command, Command, RequestCancelExternalWorkflowExecutionCommandAttributes}, + command::v1::{command, RequestCancelExternalWorkflowExecutionCommandAttributes}, enums::v1::{CancelExternalWorkflowExecutionFailedCause, CommandType, EventType}, failure::v1::{failure::FailureInfo, ApplicationFailureInfo, Failure}, history::v1::history_event, @@ -75,13 +75,8 @@ pub(super) fn new_external_cancel( reason, }, ); - let cmd = Command { - command_type: CommandType::RequestCancelExternalWorkflowExecution as i32, - attributes: Some(cmd_attrs), - user_metadata: Default::default(), - }; NewMachineWithCommand { - command: cmd, + command: cmd_attrs, machine: s.into(), } } diff --git a/core/src/worker/workflow/machines/cancel_workflow_state_machine.rs b/core/src/worker/workflow/machines/cancel_workflow_state_machine.rs index c4e6d336e..1ec9a9486 100644 --- a/core/src/worker/workflow/machines/cancel_workflow_state_machine.rs +++ b/core/src/worker/workflow/machines/cancel_workflow_state_machine.rs @@ -7,10 +7,7 @@ use rustfsm::{fsm, StateMachine, TransitionResult}; use std::convert::TryFrom; use temporal_sdk_core_protos::{ coresdk::workflow_commands::CancelWorkflowExecution, - temporal::api::{ - command::v1::Command, - enums::v1::{CommandType, EventType}, - }, + temporal::api::enums::v1::{CommandType, EventType}, }; fsm! { @@ -34,13 +31,8 @@ pub(super) fn cancel_workflow(attribs: CancelWorkflowExecution) -> NewMachineWit let mut machine = CancelWorkflowMachine::from_parts(Created {}.into(), ()); OnEventWrapper::on_event_mut(&mut machine, CancelWorkflowMachineEvents::Schedule) .expect("Scheduling continue as new machine doesn't fail"); - let command = Command { - command_type: CommandType::CancelWorkflowExecution as i32, - attributes: Some(attribs.into()), - user_metadata: Default::default(), - }; NewMachineWithCommand { - command, + command: attribs.into(), machine: machine.into(), } } diff --git a/core/src/worker/workflow/machines/child_workflow_state_machine.rs b/core/src/worker/workflow/machines/child_workflow_state_machine.rs index 60760e3a4..e3505c9f3 100644 --- a/core/src/worker/workflow/machines/child_workflow_state_machine.rs +++ b/core/src/worker/workflow/machines/child_workflow_state_machine.rs @@ -27,7 +27,7 @@ use temporal_sdk_core_protos::{ }, temporal::api::{ command::v1::{ - start_child_workflow_cmd_to_api, Command, + start_child_workflow_cmd_to_api, RequestCancelExternalWorkflowExecutionCommandAttributes, }, common::v1::{Payload, Payloads, WorkflowExecution, WorkflowType}, @@ -41,7 +41,6 @@ use temporal_sdk_core_protos::{ ChildWorkflowExecutionStartedEventAttributes, StartChildWorkflowExecutionFailedEventAttributes, }, - sdk::v1::UserMetadata, }, }; @@ -450,29 +449,10 @@ impl ChildWorkflowMachine { cancelled_before_sent: false, }, ); - let mut attribs = attribs; - let user_metadata = if attribs.static_summary.is_some() || attribs.static_details.is_some() - { - Some(UserMetadata { - summary: attribs.static_summary.take(), - details: attribs.static_details.take(), - }) - } else { - None - }; - let attribs = attribs; OnEventWrapper::on_event_mut(&mut s, ChildWorkflowMachineEvents::Schedule) .expect("Scheduling child workflows doesn't fail"); - let cmd = Command { - command_type: CommandType::StartChildWorkflowExecution as i32, - attributes: Some(start_child_workflow_cmd_to_api( - attribs, - use_compatible_version, - )), - user_metadata, - }; NewMachineWithCommand { - command: cmd, + command: start_child_workflow_cmd_to_api(attribs, use_compatible_version), machine: s.into(), } } diff --git a/core/src/worker/workflow/machines/complete_workflow_state_machine.rs b/core/src/worker/workflow/machines/complete_workflow_state_machine.rs index 3675916bf..af26473b2 100644 --- a/core/src/worker/workflow/machines/complete_workflow_state_machine.rs +++ b/core/src/worker/workflow/machines/complete_workflow_state_machine.rs @@ -8,7 +8,7 @@ use std::convert::TryFrom; use temporal_sdk_core_protos::{ coresdk::workflow_commands::CompleteWorkflowExecution, temporal::api::{ - command::v1::Command, + command::v1::command, enums::v1::{CommandType, EventType}, }, }; @@ -30,34 +30,26 @@ fsm! { #[derive(Debug, derive_more::Display)] pub(super) enum CompleteWFCommand { - AddCommand(Command), + AddCommand(command::Attributes), } /// Complete a workflow pub(super) fn complete_workflow(attribs: CompleteWorkflowExecution) -> NewMachineWithCommand { - let (machine, add_cmd) = CompleteWorkflowMachine::new_scheduled(attribs); + let mut machine = CompleteWorkflowMachine::from_parts(Created { attribs }.into(), ()); + let add_cmd = + match OnEventWrapper::on_event_mut(&mut machine, CompleteWorkflowMachineEvents::Schedule) + .expect("Scheduling complete wf machines doesn't fail") + .pop() + { + Some(CompleteWFCommand::AddCommand(c)) => c, + _ => panic!("complete wf machine on_schedule must produce command"), + }; NewMachineWithCommand { command: add_cmd, machine: machine.into(), } } -impl CompleteWorkflowMachine { - /// Create a new WF machine and schedule it - pub(crate) fn new_scheduled(attribs: CompleteWorkflowExecution) -> (Self, Command) { - let mut s = Self::from_parts(Created { attribs }.into(), ()); - let cmd = - match OnEventWrapper::on_event_mut(&mut s, CompleteWorkflowMachineEvents::Schedule) - .expect("Scheduling complete wf machines doesn't fail") - .pop() - { - Some(CompleteWFCommand::AddCommand(c)) => c, - _ => panic!("complete wf machine on_schedule must produce command"), - }; - (s, cmd) - } -} - impl TryFrom for CompleteWorkflowMachineEvents { type Error = WFMachinesError; @@ -94,12 +86,7 @@ impl Created { pub(super) fn on_schedule( self, ) -> CompleteWorkflowMachineTransition { - let cmd = Command { - command_type: CommandType::CompleteWorkflowExecution as i32, - attributes: Some(self.attribs.into()), - user_metadata: Default::default(), - }; - TransitionResult::commands(vec![CompleteWFCommand::AddCommand(cmd)]) + TransitionResult::commands(vec![CompleteWFCommand::AddCommand(self.attribs.into())]) } } diff --git a/core/src/worker/workflow/machines/continue_as_new_workflow_state_machine.rs b/core/src/worker/workflow/machines/continue_as_new_workflow_state_machine.rs index a010f470c..067d0c3ec 100644 --- a/core/src/worker/workflow/machines/continue_as_new_workflow_state_machine.rs +++ b/core/src/worker/workflow/machines/continue_as_new_workflow_state_machine.rs @@ -8,7 +8,7 @@ use std::convert::TryFrom; use temporal_sdk_core_protos::{ coresdk::workflow_commands::ContinueAsNewWorkflowExecution, temporal::api::{ - command::v1::{continue_as_new_cmd_to_api, Command}, + command::v1::continue_as_new_cmd_to_api, enums::v1::{CommandType, EventType}, }, }; @@ -37,13 +37,8 @@ pub(super) fn continue_as_new( let mut machine = ContinueAsNewWorkflowMachine::from_parts(Created {}.into(), ()); OnEventWrapper::on_event_mut(&mut machine, ContinueAsNewWorkflowMachineEvents::Schedule) .expect("Scheduling continue as new machine doesn't fail"); - let command = Command { - command_type: CommandType::ContinueAsNewWorkflowExecution as i32, - attributes: Some(continue_as_new_cmd_to_api(attribs, use_compatible_version)), - user_metadata: Default::default(), - }; NewMachineWithCommand { - command, + command: continue_as_new_cmd_to_api(attribs, use_compatible_version), machine: machine.into(), } } diff --git a/core/src/worker/workflow/machines/fail_workflow_state_machine.rs b/core/src/worker/workflow/machines/fail_workflow_state_machine.rs index 92665e08f..996456057 100644 --- a/core/src/worker/workflow/machines/fail_workflow_state_machine.rs +++ b/core/src/worker/workflow/machines/fail_workflow_state_machine.rs @@ -8,7 +8,7 @@ use std::convert::TryFrom; use temporal_sdk_core_protos::{ coresdk::workflow_commands::FailWorkflowExecution, temporal::api::{ - command::v1::Command as ProtoCommand, + command::v1::command, enums::v1::{CommandType, EventType}, }, }; @@ -27,7 +27,7 @@ fsm! { #[derive(Debug, derive_more::Display)] pub(super) enum FailWFCommand { - AddCommand(ProtoCommand), + AddCommand(command::Attributes), } /// Fail a workflow @@ -41,7 +41,7 @@ pub(super) fn fail_workflow(attribs: FailWorkflowExecution) -> NewMachineWithCom impl FailWorkflowMachine { /// Create a new WF machine and schedule it - pub(crate) fn new_scheduled(attribs: FailWorkflowExecution) -> (Self, ProtoCommand) { + pub(crate) fn new_scheduled(attribs: FailWorkflowExecution) -> (Self, command::Attributes) { let mut s = Self::from_parts(Created { attribs }.into(), ()); let cmd = match OnEventWrapper::on_event_mut(&mut s, FailWorkflowMachineEvents::Schedule) .expect("Scheduling fail wf machines doesn't fail") @@ -61,12 +61,7 @@ pub(super) struct Created { impl Created { pub(super) fn on_schedule(self) -> FailWorkflowMachineTransition { - let cmd = ProtoCommand { - command_type: CommandType::FailWorkflowExecution as i32, - attributes: Some(self.attribs.into()), - user_metadata: Default::default(), - }; - TransitionResult::commands(vec![FailWFCommand::AddCommand(cmd)]) + TransitionResult::commands(vec![FailWFCommand::AddCommand(self.attribs.into())]) } } diff --git a/core/src/worker/workflow/machines/mod.rs b/core/src/worker/workflow/machines/mod.rs index 6fbc3a522..dcbe9d439 100644 --- a/core/src/worker/workflow/machines/mod.rs +++ b/core/src/worker/workflow/machines/mod.rs @@ -42,7 +42,6 @@ use std::{ fmt::{Debug, Display}, }; use temporal_sdk_core_protos::temporal::api::{ - command::v1::Command as ProtoCommand, enums::v1::{CommandType, EventType}, history::v1::HistoryEvent, }; @@ -310,7 +309,7 @@ where } struct NewMachineWithCommand { - command: ProtoCommand, + command: temporal_sdk_core_protos::temporal::api::command::v1::command::Attributes, machine: Machines, } diff --git a/core/src/worker/workflow/machines/modify_workflow_properties_state_machine.rs b/core/src/worker/workflow/machines/modify_workflow_properties_state_machine.rs index f39646e71..26ce38c80 100644 --- a/core/src/worker/workflow/machines/modify_workflow_properties_state_machine.rs +++ b/core/src/worker/workflow/machines/modify_workflow_properties_state_machine.rs @@ -6,10 +6,7 @@ use crate::worker::workflow::{ use rustfsm::{fsm, StateMachine, TransitionResult}; use temporal_sdk_core_protos::{ coresdk::workflow_commands::ModifyWorkflowProperties, - temporal::api::{ - command::v1::Command, - enums::v1::{CommandType, EventType}, - }, + temporal::api::enums::v1::{CommandType, EventType}, }; fsm! { @@ -28,13 +25,8 @@ pub(super) fn modify_workflow_properties( lang_cmd: ModifyWorkflowProperties, ) -> NewMachineWithCommand { let sm = ModifyWorkflowPropertiesMachine::from_parts(Created {}.into(), ()); - let cmd = Command { - command_type: CommandType::ModifyWorkflowProperties as i32, - attributes: Some(lang_cmd.into()), - user_metadata: Default::default(), - }; NewMachineWithCommand { - command: cmd, + command: lang_cmd.into(), machine: sm.into(), } } @@ -119,7 +111,10 @@ mod tests { }; use temporal_sdk::WfContext; use temporal_sdk_core_protos::{ - temporal::api::{command::v1::command, common::v1::Payload}, + temporal::api::{ + command::v1::{command, Command}, + common::v1::Payload, + }, DEFAULT_WORKFLOW_TYPE, }; diff --git a/core/src/worker/workflow/machines/patch_state_machine.rs b/core/src/worker/workflow/machines/patch_state_machine.rs index 938ed8f98..410d7a210 100644 --- a/core/src/worker/workflow/machines/patch_state_machine.rs +++ b/core/src/worker/workflow/machines/patch_state_machine.rs @@ -42,7 +42,7 @@ use temporal_sdk_core_protos::{ coresdk::{common::build_has_change_marker_details, AsJsonPayloadExt}, temporal::api::{ command::v1::{ - Command, RecordMarkerCommandAttributes, UpsertWorkflowSearchAttributesCommandAttributes, + RecordMarkerCommandAttributes, UpsertWorkflowSearchAttributesCommandAttributes, }, common::v1::SearchAttributes, enums::v1::CommandType, @@ -102,20 +102,14 @@ pub(super) fn has_change<'a>( } else { Executing {}.into() }; - let command = Command { - command_type: CommandType::RecordMarker as i32, - attributes: Some( - RecordMarkerCommandAttributes { - marker_name: PATCH_MARKER_NAME.to_string(), - details: build_has_change_marker_details(&shared_state.patch_id, deprecated) - .context("While encoding patch marker details")?, - header: None, - failure: None, - } - .into(), - ), - user_metadata: Default::default(), - }; + let command = RecordMarkerCommandAttributes { + marker_name: PATCH_MARKER_NAME.to_string(), + details: build_has_change_marker_details(&shared_state.patch_id, deprecated) + .context("While encoding patch marker details")?, + header: None, + failure: None, + } + .into(); let mut machine = PatchMachine::from_parts(initial_state, shared_state); OnEventWrapper::on_event_mut(&mut machine, PatchMachineEvents::Schedule) diff --git a/core/src/worker/workflow/machines/signal_external_state_machine.rs b/core/src/worker/workflow/machines/signal_external_state_machine.rs index 8d4967f59..c827f4d3e 100644 --- a/core/src/worker/workflow/machines/signal_external_state_machine.rs +++ b/core/src/worker/workflow/machines/signal_external_state_machine.rs @@ -15,7 +15,7 @@ use temporal_sdk_core_protos::{ IntoPayloadsExt, }, temporal::api::{ - command::v1::{command, Command, SignalExternalWorkflowExecutionCommandAttributes}, + command::v1::{command, SignalExternalWorkflowExecutionCommandAttributes}, common::v1::WorkflowExecution as UpstreamWE, enums::v1::{CommandType, EventType, SignalExternalWorkflowExecutionFailedCause}, failure::v1::{failure::FailureInfo, ApplicationFailureInfo, CanceledFailureInfo, Failure}, @@ -109,13 +109,8 @@ pub(super) fn new_external_signal( child_workflow_only: only_child, }, ); - let cmd = Command { - command_type: CommandType::SignalExternalWorkflowExecution as i32, - attributes: Some(cmd_attrs), - user_metadata: Default::default(), - }; Ok(NewMachineWithCommand { - command: cmd, + command: cmd_attrs, machine: s.into(), }) } @@ -305,6 +300,7 @@ mod tests { use temporal_sdk::{CancellableFuture, SignalWorkflowOptions, WfContext, WorkflowResult}; use temporal_sdk_core_protos::{ coresdk::workflow_activation::{workflow_activation_job, WorkflowActivationJob}, + temporal::api::command::v1::Command, DEFAULT_WORKFLOW_TYPE, }; use temporal_sdk_core_test_utils::interceptors::ActivationAssertionsInterceptor; diff --git a/core/src/worker/workflow/machines/timer_state_machine.rs b/core/src/worker/workflow/machines/timer_state_machine.rs index 8d992d877..323c787b1 100644 --- a/core/src/worker/workflow/machines/timer_state_machine.rs +++ b/core/src/worker/workflow/machines/timer_state_machine.rs @@ -14,10 +14,9 @@ use temporal_sdk_core_protos::{ HistoryEventId, }, temporal::api::{ - command::v1::Command, + command::v1::command, enums::v1::{CommandType, EventType}, history::v1::{history_event, TimerFiredEventAttributes}, - sdk::v1::UserMetadata, }, }; @@ -50,7 +49,7 @@ fsm! { #[derive(Debug, derive_more::Display)] pub(super) enum TimerMachineCommand { Complete, - IssueCancelCmd(Command), + IssueCancelCmd(command::Attributes), // We don't issue activations for timer cancellations. Lang SDK is expected to cancel // it's own timers when user calls cancel, and they cannot be cancelled by any other // means. @@ -73,21 +72,11 @@ pub(super) fn new_timer(attribs: StartTimer) -> NewMachineWithCommand { impl TimerMachine { /// Create a new timer and immediately schedule it - fn new_scheduled(attribs: StartTimer) -> (Self, Command) { - let mut attribs = attribs; - let user_metadata = attribs.summary.take().map(|x| UserMetadata { - summary: Some(x), - details: None, - }); - let attribs = attribs; + fn new_scheduled(attribs: StartTimer) -> (Self, command::Attributes) { let mut s = Self::new(attribs); OnEventWrapper::on_event_mut(&mut s, TimerMachineEvents::Schedule) .expect("Scheduling timers doesn't fail"); - let cmd = Command { - command_type: CommandType::StartTimer as i32, - attributes: Some(s.shared_state().attrs.clone().into()), - user_metadata, - }; + let cmd = s.shared_state().attrs.into(); (s, cmd) } @@ -215,13 +204,10 @@ impl StartCommandRecorded { self, dat: &mut SharedState, ) -> TimerMachineTransition { - let cmd = Command { - command_type: CommandType::CancelTimer as i32, - attributes: Some(CancelTimer { seq: dat.attrs.seq }.into()), - user_metadata: Default::default(), - }; TransitionResult::ok( - vec![TimerMachineCommand::IssueCancelCmd(cmd)], + vec![TimerMachineCommand::IssueCancelCmd( + CancelTimer { seq: dat.attrs.seq }.into(), + )], CancelTimerCommandCreated::default(), ) } @@ -239,7 +225,9 @@ impl WFMachinesAdapter for TimerMachine { seq: self.shared_state.attrs.seq, } .into()], - TimerMachineCommand::IssueCancelCmd(c) => vec![MachineResponse::IssueNewCommand(c)], + TimerMachineCommand::IssueCancelCmd(c) => { + vec![MachineResponse::IssueNewCommand(c.into())] + } }) } } @@ -249,7 +237,7 @@ impl Cancellable for TimerMachine { Ok( match OnEventWrapper::on_event_mut(self, TimerMachineEvents::Cancel)?.pop() { Some(TimerMachineCommand::IssueCancelCmd(cmd)) => { - vec![MachineResponse::IssueNewCommand(cmd)] + vec![MachineResponse::IssueNewCommand(cmd.into())] } None => vec![], x => panic!("Invalid cancel event response {x:?}"), diff --git a/core/src/worker/workflow/machines/upsert_search_attributes_state_machine.rs b/core/src/worker/workflow/machines/upsert_search_attributes_state_machine.rs index 514badaf8..884122bf0 100644 --- a/core/src/worker/workflow/machines/upsert_search_attributes_state_machine.rs +++ b/core/src/worker/workflow/machines/upsert_search_attributes_state_machine.rs @@ -13,7 +13,7 @@ use rustfsm::{fsm, StateMachine, TransitionResult}; use temporal_sdk_core_protos::{ coresdk::workflow_commands::UpsertWorkflowSearchAttributes, temporal::api::{ - command::v1::{command, Command, UpsertWorkflowSearchAttributesCommandAttributes}, + command::v1::{command, UpsertWorkflowSearchAttributesCommandAttributes}, common::v1::SearchAttributes, enums::v1::CommandType, history::v1::history_event, @@ -77,19 +77,13 @@ pub(super) fn upsert_search_attrs_internal( fn create_new(sa_map: SearchAttributes) -> NewMachineWithCommand { let sm = UpsertSearchAttributesMachine::from_parts(Created {}.into(), SharedState {}); - let cmd = Command { - command_type: CommandType::UpsertWorkflowSearchAttributes as i32, - attributes: Some( - command::Attributes::UpsertWorkflowSearchAttributesCommandAttributes( - UpsertWorkflowSearchAttributesCommandAttributes { - search_attributes: Some(sa_map), - }, - ), - ), - user_metadata: Default::default(), - }; NewMachineWithCommand { - command: cmd, + command: command::Attributes::UpsertWorkflowSearchAttributesCommandAttributes( + UpsertWorkflowSearchAttributesCommandAttributes { + search_attributes: Some(sa_map), + }, + ), + machine: sm.into(), } } @@ -207,7 +201,7 @@ mod tests { AsJsonPayloadExt, }, temporal::api::{ - command::v1::command::Attributes, + command::v1::{command::Attributes, Command}, common::v1::Payload, enums::v1::EventType, history::v1::{HistoryEvent, UpsertWorkflowSearchAttributesEventAttributes}, diff --git a/core/src/worker/workflow/machines/workflow_machines.rs b/core/src/worker/workflow/machines/workflow_machines.rs index 02d11af23..eeaed89ae 100644 --- a/core/src/worker/workflow/machines/workflow_machines.rs +++ b/core/src/worker/workflow/machines/workflow_machines.rs @@ -32,7 +32,8 @@ use crate::{ HistEventData, }, CommandID, DrivenWorkflow, HistoryUpdate, InternalFlagsRef, LocalResolution, - OutgoingJob, RunBasics, WFCommand, WFMachinesError, WorkflowStartedInfo, + OutgoingJob, RunBasics, WFCommand, WFCommandVariant, WFMachinesError, + WorkflowStartedInfo, }, ExecutingLAId, LocalActRequest, LocalActivityExecutionResult, LocalActivityResolution, }, @@ -61,11 +62,13 @@ use temporal_sdk_core_protos::{ workflow_commands::ContinueAsNewWorkflowExecution, }, temporal::api::{ - command::v1::{command::Attributes as ProtoCmdAttrs, Command as ProtoCommand}, + command::v1::{ + command::Attributes as ProtoCmdAttrs, Command as ProtoCommand, CommandAttributesExt, + }, enums::v1::EventType, history::v1::{history_event, HistoryEvent}, protocol::v1::{message::SequencingId, Message as ProtocolMessage}, - sdk::v1::WorkflowTaskCompletedMetadata, + sdk::v1::{UserMetadata, WorkflowTaskCompletedMetadata}, }, }; @@ -1156,6 +1159,7 @@ impl WorkflowMachines { }; self.add_cmd_to_wf_task( new_external_cancel(0, we, attrs.child_workflow_only, attrs.reason), + None, CommandIdKind::CoreInternal, ); } @@ -1165,6 +1169,7 @@ impl WorkflowMachines { // workflows by users (but rather, just for them to search with). self.add_cmd_to_wf_task( upsert_search_attrs_internal(attrs), + None, CommandIdKind::NeverResolves, ); } @@ -1267,12 +1272,16 @@ impl WorkflowMachines { /// server. fn handle_driven_results(&mut self, results: Vec) -> Result<()> { for cmd in results { - match cmd { - WFCommand::AddTimer(attrs) => { + match cmd.variant { + WFCommandVariant::AddTimer(attrs) => { let seq = attrs.seq; - self.add_cmd_to_wf_task(new_timer(attrs), CommandID::Timer(seq).into()); + self.add_cmd_to_wf_task( + new_timer(attrs), + cmd.metadata, + CommandID::Timer(seq).into(), + ); } - WFCommand::UpsertSearchAttributes(attrs) => { + WFCommandVariant::UpsertSearchAttributes(attrs) => { self.drive_me .search_attributes_update(attrs.search_attributes.clone()); self.add_cmd_to_wf_task( @@ -1281,13 +1290,14 @@ impl WorkflowMachines { self.observed_internal_flags.clone(), self.replaying, ), + cmd.metadata, CommandIdKind::NeverResolves, ); } - WFCommand::CancelTimer(attrs) => { + WFCommandVariant::CancelTimer(attrs) => { self.process_cancellation(CommandID::Timer(attrs.seq))?; } - WFCommand::AddActivity(attrs) => { + WFCommandVariant::AddActivity(attrs) => { let seq = attrs.seq; let use_compat = self.determine_use_compatible_flag( attrs.versioning_intent(), @@ -1299,10 +1309,11 @@ impl WorkflowMachines { self.observed_internal_flags.clone(), use_compat, ), + cmd.metadata, CommandID::Activity(seq).into(), ); } - WFCommand::AddLocalActivity(attrs) => { + WFCommandVariant::AddLocalActivity(attrs) => { let seq = attrs.seq; let attrs: ValidScheduleLA = ValidScheduleLA::from_schedule_la(attrs).map_err(|e| { @@ -1322,30 +1333,30 @@ impl WorkflowMachines { .insert(CommandID::LocalActivity(seq), machkey); self.process_machine_responses(machkey, mach_resp)?; } - WFCommand::RequestCancelActivity(attrs) => { + WFCommandVariant::RequestCancelActivity(attrs) => { self.process_cancellation(CommandID::Activity(attrs.seq))?; } - WFCommand::RequestCancelLocalActivity(attrs) => { + WFCommandVariant::RequestCancelLocalActivity(attrs) => { self.process_cancellation(CommandID::LocalActivity(attrs.seq))?; } - WFCommand::CompleteWorkflow(attrs) => { - self.add_terminal_command(complete_workflow(attrs)); + WFCommandVariant::CompleteWorkflow(attrs) => { + self.add_terminal_command(complete_workflow(attrs), cmd.metadata); } - WFCommand::FailWorkflow(attrs) => { - self.add_terminal_command(fail_workflow(attrs)); + WFCommandVariant::FailWorkflow(attrs) => { + self.add_terminal_command(fail_workflow(attrs), cmd.metadata); } - WFCommand::ContinueAsNew(attrs) => { + WFCommandVariant::ContinueAsNew(attrs) => { let attrs = self.augment_continue_as_new_with_current_values(attrs); let use_compat = self.determine_use_compatible_flag( attrs.versioning_intent(), &attrs.task_queue, ); - self.add_terminal_command(continue_as_new(attrs, use_compat)); + self.add_terminal_command(continue_as_new(attrs, use_compat), cmd.metadata); } - WFCommand::CancelWorkflow(attrs) => { - self.add_terminal_command(cancel_workflow(attrs)); + WFCommandVariant::CancelWorkflow(attrs) => { + self.add_terminal_command(cancel_workflow(attrs), cmd.metadata); } - WFCommand::SetPatchMarker(attrs) => { + WFCommandVariant::SetPatchMarker(attrs) => { // Do not create commands for change IDs that we have already created commands // for. let encountered_entry = self.encountered_patch_markers.get(&attrs.patch_id); @@ -1360,8 +1371,11 @@ impl WorkflowMachines { self.encountered_patch_markers.keys().map(|s| s.as_str()), self.observed_internal_flags.clone(), )?; - let mkey = - self.add_cmd_to_wf_task(patch_machine, CommandIdKind::NeverResolves); + let mkey = self.add_cmd_to_wf_task( + patch_machine, + cmd.metadata, + CommandIdKind::NeverResolves, + ); self.process_machine_responses(mkey, other_cmds)?; if let Some(ci) = self.encountered_patch_markers.get_mut(&attrs.patch_id) { @@ -1376,7 +1390,7 @@ impl WorkflowMachines { } } } - WFCommand::AddChildWorkflow(attrs) => { + WFCommandVariant::AddChildWorkflow(attrs) => { let seq = attrs.seq; let use_compat = self.determine_use_compatible_flag( attrs.versioning_intent(), @@ -1388,13 +1402,14 @@ impl WorkflowMachines { self.observed_internal_flags.clone(), use_compat, ), + cmd.metadata, CommandID::ChildWorkflowStart(seq).into(), ); } - WFCommand::CancelChild(attrs) => self.process_cancellation( + WFCommandVariant::CancelChild(attrs) => self.process_cancellation( CommandID::ChildWorkflowStart(attrs.child_workflow_seq), )?, - WFCommand::RequestCancelExternalWorkflow(attrs) => { + WFCommandVariant::RequestCancelExternalWorkflow(attrs) => { let we = attrs.workflow_execution.ok_or_else(|| { WFMachinesError::Fatal( "Cancel external workflow command had no workflow_execution field" @@ -1408,30 +1423,33 @@ impl WorkflowMachines { false, format!("Cancel requested by workflow with run id {}", self.run_id), ), + cmd.metadata, CommandID::CancelExternal(attrs.seq).into(), ); } - WFCommand::SignalExternalWorkflow(attrs) => { + WFCommandVariant::SignalExternalWorkflow(attrs) => { let seq = attrs.seq; self.add_cmd_to_wf_task( new_external_signal(attrs, &self.worker_config.namespace)?, + cmd.metadata, CommandID::SignalExternal(seq).into(), ); } - WFCommand::CancelSignalWorkflow(attrs) => { + WFCommandVariant::CancelSignalWorkflow(attrs) => { self.process_cancellation(CommandID::SignalExternal(attrs.seq))?; } - WFCommand::QueryResponse(_) => { + WFCommandVariant::QueryResponse(_) => { // Nothing to do here, queries are handled above the machine level unimplemented!("Query responses should not make it down into the machines") } - WFCommand::ModifyWorkflowProperties(attrs) => { + WFCommandVariant::ModifyWorkflowProperties(attrs) => { self.add_cmd_to_wf_task( modify_workflow_properties(attrs), + cmd.metadata, CommandIdKind::NeverResolves, ); } - WFCommand::UpdateResponse(ur) => { + WFCommandVariant::UpdateResponse(ur) => { let m_key = self.get_machine_by_msg(&ur.protocol_instance_id)?; let mach = self.machine_mut(m_key); if let Machines::UpdateMachine(m) = mach { @@ -1445,7 +1463,7 @@ impl WorkflowMachines { ))); } } - WFCommand::NoCommandsFromLang => (), + WFCommandVariant::NoCommandsFromLang => (), } } Ok(()) @@ -1479,8 +1497,12 @@ impl WorkflowMachines { })?) } - fn add_terminal_command(&mut self, machine: NewMachineWithCommand) { - let cwfm = self.add_new_command_machine(machine); + fn add_terminal_command( + &mut self, + machine: NewMachineWithCommand, + metadata: Option, + ) { + let cwfm = self.add_new_command_machine(machine, metadata); self.workflow_end_time = Some(SystemTime::now()); self.current_wf_task_commands.push_back(cwfm); // Wipe out any pending / executing local activity data since we're about to terminate @@ -1492,9 +1514,10 @@ impl WorkflowMachines { fn add_cmd_to_wf_task( &mut self, machine: NewMachineWithCommand, + metadata: Option, id: CommandIdKind, ) -> MachineKey { - let mach = self.add_new_command_machine(machine); + let mach = self.add_new_command_machine(machine, metadata); let key = mach.machine; if let CommandIdKind::LangIssued(id) = id { self.id_to_machine.insert(id, key); @@ -1506,10 +1529,19 @@ impl WorkflowMachines { key } - fn add_new_command_machine(&mut self, machine: NewMachineWithCommand) -> CommandAndMachine { + fn add_new_command_machine( + &mut self, + machine: NewMachineWithCommand, + metadata: Option, + ) -> CommandAndMachine { let k = self.all_machines.insert(machine.machine); + let cmd = ProtoCommand { + command_type: machine.command.as_type() as i32, + attributes: Some(machine.command), + user_metadata: metadata, + }; CommandAndMachine { - command: MachineAssociatedCommand::Real(Box::new(machine.command)), + command: MachineAssociatedCommand::Real(Box::new(cmd)), machine: k, } } @@ -1577,7 +1609,7 @@ pub(crate) struct MachinesWFTResponseContent<'a> { pub(crate) have_pending_la_resolutions: bool, } -impl<'a> MachinesWFTResponseContent<'a> { +impl MachinesWFTResponseContent<'_> { pub(crate) fn commands(&self) -> Peekable + '_> { self.me.get_commands().peekable() } diff --git a/core/src/worker/workflow/managed_run.rs b/core/src/worker/workflow/managed_run.rs index 3601f6b34..311b4960d 100644 --- a/core/src/worker/workflow/managed_run.rs +++ b/core/src/worker/workflow/managed_run.rs @@ -12,8 +12,8 @@ use crate::{ FailedActivationWFTReport, HeartbeatTimeoutMsg, HistoryUpdate, LocalActivityRequestSink, LocalResolution, NextPageReq, OutstandingActivation, OutstandingTask, PermittedWFT, RequestEvictMsg, RunBasics, - ServerCommandsWithWorkflowInfo, WFCommand, WFMachinesError, WFTReportStatus, - WorkflowTaskInfo, WFT_HEARTBEAT_TIMEOUT_FRACTION, + ServerCommandsWithWorkflowInfo, WFCommand, WFCommandVariant, WFMachinesError, + WFTReportStatus, WorkflowTaskInfo, WFT_HEARTBEAT_TIMEOUT_FRACTION, }, LocalActRequest, LEGACY_QUERY_ID, }, @@ -410,10 +410,14 @@ impl ManagedRun { // If the only command from the activation is a legacy query response, that means we need // to respond differently than a typical activation. if matches!(&commands.as_slice(), - &[WFCommand::QueryResponse(qr)] if qr.query_id == LEGACY_QUERY_ID) + &[WFCommand {variant: WFCommandVariant::QueryResponse(qr), ..}] + if qr.query_id == LEGACY_QUERY_ID) { let qr = match commands.remove(0) { - WFCommand::QueryResponse(qr) => qr, + WFCommand { + variant: WFCommandVariant::QueryResponse(qr), + .. + } => qr, _ => unreachable!("We just verified this is the only command"), }; self.reply_to_complete( @@ -605,9 +609,12 @@ impl ManagedRun { warn!(failure=?failure, "Failing workflow due to nondeterminism error"); return self .successful_completion( - vec![WFCommand::FailWorkflow(FailWorkflowExecution { - failure: failure.failure, - })], + vec![WFCommand { + variant: WFCommandVariant::FailWorkflow(FailWorkflowExecution { + failure: failure.failure, + }), + metadata: None, + }], vec![], resp_chan, ) @@ -1221,10 +1228,10 @@ fn preprocess_command_sequence(commands: Vec) -> (Vec, Vec let mut commands: Vec<_> = commands .into_iter() .filter_map(|c| { - if let WFCommand::QueryResponse(qr) = c { + if let WFCommandVariant::QueryResponse(qr) = c.variant { query_results.push(qr); None - } else if c.is_terminal() { + } else if c.variant.is_terminal() { terminals.push(c); None } else { @@ -1247,13 +1254,13 @@ fn preprocess_command_sequence_old_behavior( let commands: Vec<_> = commands .into_iter() .filter_map(|c| { - if let WFCommand::QueryResponse(qr) = c { + if let WFCommandVariant::QueryResponse(qr) = c.variant { query_results.push(qr); None } else if seen_terminal { None } else { - if c.is_terminal() { + if c.variant.is_terminal() { seen_terminal = true; } Some(c) @@ -1494,7 +1501,7 @@ impl From for RunUpdateErr { #[cfg(test)] mod tests { - use crate::worker::workflow::WFCommand; + use crate::worker::workflow::{WFCommand, WFCommandVariant}; use std::mem::{discriminant, Discriminant}; use command_utils::*; @@ -1591,25 +1598,39 @@ mod tests { use super::*; pub(crate) fn complete() -> WFCommand { - WFCommand::CompleteWorkflow(CompleteWorkflowExecution { result: None }) + WFCommand { + variant: WFCommandVariant::CompleteWorkflow(CompleteWorkflowExecution { + result: None, + }), + metadata: None, + } } pub(crate) fn cancel() -> WFCommand { - WFCommand::CancelWorkflow(CancelWorkflowExecution {}) + WFCommand { + variant: WFCommandVariant::CancelWorkflow(CancelWorkflowExecution {}), + metadata: None, + } } pub(crate) fn query_response() -> WFCommand { - WFCommand::QueryResponse(QueryResult { - query_id: "".into(), - variant: None, - }) + WFCommand { + variant: WFCommandVariant::QueryResponse(QueryResult { + query_id: "".into(), + variant: None, + }), + metadata: None, + } } pub(crate) fn update_response() -> WFCommand { - WFCommand::UpdateResponse(UpdateResponse { - protocol_instance_id: "".into(), - response: None, - }) + WFCommand { + variant: WFCommandVariant::UpdateResponse(UpdateResponse { + protocol_instance_id: "".into(), + response: None, + }), + metadata: None, + } } pub(crate) fn command_types(commands: &[WFCommand]) -> Vec> { diff --git a/core/src/worker/workflow/mod.rs b/core/src/worker/workflow/mod.rs index 889d7b954..6817e1c9a 100644 --- a/core/src/worker/workflow/mod.rs +++ b/core/src/worker/workflow/mod.rs @@ -76,7 +76,7 @@ use temporal_sdk_core_protos::{ enums::v1::WorkflowTaskFailedCause, protocol::v1::Message as ProtocolMessage, query::v1::WorkflowQuery, - sdk::v1::WorkflowTaskCompletedMetadata, + sdk::v1::{UserMetadata, WorkflowTaskCompletedMetadata}, taskqueue::v1::StickyExecutionAttributes, workflowservice::v1::{get_system_info_response, PollActivityTaskQueueResponse}, }, @@ -1057,9 +1057,10 @@ fn validate_completion( })?; if commands.len() > 1 - && commands.iter().any( - |c| matches!(c, WFCommand::QueryResponse(q) if q.query_id == LEGACY_QUERY_ID), - ) + && commands.iter().any(|c: &WFCommand| { + matches!(&c.variant, + WFCommandVariant::QueryResponse(q) if q.query_id == LEGACY_QUERY_ID) + }) { return Err(CompleteWfError::MalformedWorkflowCompletion { reason: format!( @@ -1135,9 +1136,16 @@ struct EmptyWorkflowCommandErr; /// [DrivenWorkflow]s respond with these when called, to indicate what they want to do next. /// EX: Create a new timer, complete the workflow, etc. +#[derive(Debug, derive_more::From, derive_more::Display)] +#[display("{}", variant)] +struct WFCommand { + variant: WFCommandVariant, + metadata: Option, +} + #[derive(Debug, derive_more::From, derive_more::Display)] #[allow(clippy::large_enum_variant)] -enum WFCommand { +enum WFCommandVariant { /// Returned when we need to wait for the lang sdk to send us something NoCommandsFromLang, AddActivity(ScheduleActivity), @@ -1166,50 +1174,64 @@ impl TryFrom for WFCommand { type Error = EmptyWorkflowCommandErr; fn try_from(c: WorkflowCommand) -> result::Result { - match c.variant.ok_or(EmptyWorkflowCommandErr)? { - workflow_command::Variant::StartTimer(s) => Ok(Self::AddTimer(s)), - workflow_command::Variant::CancelTimer(s) => Ok(Self::CancelTimer(s)), - workflow_command::Variant::ScheduleActivity(s) => Ok(Self::AddActivity(s)), + let variant = match c.variant.ok_or(EmptyWorkflowCommandErr)? { + workflow_command::Variant::StartTimer(s) => WFCommandVariant::AddTimer(s), + workflow_command::Variant::CancelTimer(s) => WFCommandVariant::CancelTimer(s), + workflow_command::Variant::ScheduleActivity(s) => WFCommandVariant::AddActivity(s), workflow_command::Variant::RequestCancelActivity(s) => { - Ok(Self::RequestCancelActivity(s)) + WFCommandVariant::RequestCancelActivity(s) } workflow_command::Variant::CompleteWorkflowExecution(c) => { - Ok(Self::CompleteWorkflow(c)) + WFCommandVariant::CompleteWorkflow(c) + } + workflow_command::Variant::FailWorkflowExecution(s) => { + WFCommandVariant::FailWorkflow(s) } - workflow_command::Variant::FailWorkflowExecution(s) => Ok(Self::FailWorkflow(s)), - workflow_command::Variant::RespondToQuery(s) => Ok(Self::QueryResponse(s)), + workflow_command::Variant::RespondToQuery(s) => WFCommandVariant::QueryResponse(s), workflow_command::Variant::ContinueAsNewWorkflowExecution(s) => { - Ok(Self::ContinueAsNew(s)) + WFCommandVariant::ContinueAsNew(s) } - workflow_command::Variant::CancelWorkflowExecution(s) => Ok(Self::CancelWorkflow(s)), - workflow_command::Variant::SetPatchMarker(s) => Ok(Self::SetPatchMarker(s)), + workflow_command::Variant::CancelWorkflowExecution(s) => { + WFCommandVariant::CancelWorkflow(s) + } + workflow_command::Variant::SetPatchMarker(s) => WFCommandVariant::SetPatchMarker(s), workflow_command::Variant::StartChildWorkflowExecution(s) => { - Ok(Self::AddChildWorkflow(s)) + WFCommandVariant::AddChildWorkflow(s) } workflow_command::Variant::RequestCancelExternalWorkflowExecution(s) => { - Ok(Self::RequestCancelExternalWorkflow(s)) + WFCommandVariant::RequestCancelExternalWorkflow(s) } workflow_command::Variant::SignalExternalWorkflowExecution(s) => { - Ok(Self::SignalExternalWorkflow(s)) + WFCommandVariant::SignalExternalWorkflow(s) + } + workflow_command::Variant::CancelSignalWorkflow(s) => { + WFCommandVariant::CancelSignalWorkflow(s) + } + workflow_command::Variant::CancelChildWorkflowExecution(s) => { + WFCommandVariant::CancelChild(s) + } + workflow_command::Variant::ScheduleLocalActivity(s) => { + WFCommandVariant::AddLocalActivity(s) } - workflow_command::Variant::CancelSignalWorkflow(s) => Ok(Self::CancelSignalWorkflow(s)), - workflow_command::Variant::CancelChildWorkflowExecution(s) => Ok(Self::CancelChild(s)), - workflow_command::Variant::ScheduleLocalActivity(s) => Ok(Self::AddLocalActivity(s)), workflow_command::Variant::RequestCancelLocalActivity(s) => { - Ok(Self::RequestCancelLocalActivity(s)) + WFCommandVariant::RequestCancelLocalActivity(s) } workflow_command::Variant::UpsertWorkflowSearchAttributes(s) => { - Ok(Self::UpsertSearchAttributes(s)) + WFCommandVariant::UpsertSearchAttributes(s) } workflow_command::Variant::ModifyWorkflowProperties(s) => { - Ok(Self::ModifyWorkflowProperties(s)) + WFCommandVariant::ModifyWorkflowProperties(s) } - workflow_command::Variant::UpdateResponse(s) => Ok(Self::UpdateResponse(s)), - } + workflow_command::Variant::UpdateResponse(s) => WFCommandVariant::UpdateResponse(s), + }; + Ok(Self { + variant, + metadata: c.user_metadata, + }) } } -impl WFCommand { +impl WFCommandVariant { /// Returns true if the command is one which ends the workflow: /// * Completed /// * Failed @@ -1218,10 +1240,10 @@ impl WFCommand { fn is_terminal(&self) -> bool { matches!( self, - WFCommand::CompleteWorkflow(_) - | WFCommand::FailWorkflow(_) - | WFCommand::CancelWorkflow(_) - | WFCommand::ContinueAsNew(_) + WFCommandVariant::CompleteWorkflow(_) + | WFCommandVariant::FailWorkflow(_) + | WFCommandVariant::CancelWorkflow(_) + | WFCommandVariant::ContinueAsNew(_) ) } } diff --git a/sdk-core-protos/protos/local/temporal/sdk/core/workflow_commands/workflow_commands.proto b/sdk-core-protos/protos/local/temporal/sdk/core/workflow_commands/workflow_commands.proto index d16608998..482566ea1 100644 --- a/sdk-core-protos/protos/local/temporal/sdk/core/workflow_commands/workflow_commands.proto +++ b/sdk-core-protos/protos/local/temporal/sdk/core/workflow_commands/workflow_commands.proto @@ -14,10 +14,16 @@ import "google/protobuf/empty.proto"; import "temporal/api/common/v1/message.proto"; import "temporal/api/enums/v1/workflow.proto"; import "temporal/api/failure/v1/message.proto"; +import "temporal/api/sdk/v1/user_metadata.proto"; import "temporal/sdk/core/child_workflow/child_workflow.proto"; import "temporal/sdk/core/common/common.proto"; message WorkflowCommand { + // User metadata that may or may not be persisted into history depending on the command type. + // Lang layers are expected to expose the setting of the internals of this metadata on a + // per-command basis where applicable. + temporal.api.sdk.v1.UserMetadata user_metadata = 100; + oneof variant { StartTimer start_timer = 1; ScheduleActivity schedule_activity = 2; @@ -46,8 +52,6 @@ message StartTimer { // Lang's incremental sequence number, used as the operation identifier uint32 seq = 1; google.protobuf.Duration start_to_fire_timeout = 2; - // Summary that is stored as user_metadata - temporal.api.common.v1.Payload summary = 3; } message CancelTimer { @@ -90,8 +94,6 @@ message ScheduleActivity { bool do_not_eagerly_execute = 14; // Whether this activity should run on a worker with a compatible build id or not. coresdk.common.VersioningIntent versioning_intent = 15; - // Summary that is stored as user_metadata - temporal.api.common.v1.Payload summary = 16; } message ScheduleLocalActivity { @@ -257,10 +259,6 @@ message StartChildWorkflowExecution { child_workflow.ChildWorkflowCancellationType cancellation_type = 18; // Whether this child should run on a worker with a compatible build id or not. coresdk.common.VersioningIntent versioning_intent = 19; - // Static summary of the child workflow - temporal.api.common.v1.Payload static_summary = 20; - // Static details of the child workflow - temporal.api.common.v1.Payload static_details = 21; } // Cancel a child workflow diff --git a/sdk-core-protos/src/lib.rs b/sdk-core-protos/src/lib.rs index 92458246c..32bcbfaf0 100644 --- a/sdk-core-protos/src/lib.rs +++ b/sdk-core-protos/src/lib.rs @@ -605,7 +605,7 @@ pub mod coresdk { r.seq, r.result .as_ref() - .unwrap_or_else(|| &ActivityResolution { status: None }) + .unwrap_or(&ActivityResolution { status: None }) ) } workflow_activation_job::Variant::NotifyHasPatch(_) => { @@ -959,16 +959,16 @@ pub mod coresdk { impl From for WorkflowCommand { fn from(v: workflow_command::Variant) -> Self { - Self { variant: Some(v) } + Self { + variant: Some(v), + user_metadata: None, + } } } impl workflow_completion::Success { pub fn from_variants(cmds: Vec) -> Self { - let cmds: Vec<_> = cmds - .into_iter() - .map(|c| WorkflowCommand { variant: Some(c) }) - .collect(); + let cmds: Vec<_> = cmds.into_iter().map(|c| c.into()).collect(); cmds.into() } } @@ -983,7 +983,7 @@ pub mod coresdk { } } - /// Create a successful activation from a list of commands + /// Create a successful activation from a list of command variants pub fn from_cmds(run_id: impl Into, cmds: Vec) -> Self { let success = workflow_completion::Success::from_variants(cmds); Self { @@ -992,7 +992,7 @@ pub mod coresdk { } } - /// Create a successful activation from just one command + /// Create a successful activation from just one command variant pub fn from_cmd(run_id: impl Into, cmd: workflow_command::Variant) -> Self { let success = workflow_completion::Success::from_variants(vec![cmd]); Self { @@ -1034,6 +1034,7 @@ pub mod coresdk { wfc, WorkflowCommand { variant: Some(workflow_command::Variant::FailWorkflowExecution(_)), + .. } ) }); @@ -1049,6 +1050,7 @@ pub mod coresdk { wfc, WorkflowCommand { variant: Some(workflow_command::Variant::CancelWorkflowExecution(_)), + .. } ) }); @@ -1066,6 +1068,7 @@ pub mod coresdk { variant: Some( workflow_command::Variant::ContinueAsNewWorkflowExecution(_) ), + .. } ) }); @@ -1081,7 +1084,8 @@ pub mod coresdk { wfc, WorkflowCommand { variant: Some(workflow_command::Variant::CompleteWorkflowExecution(_)), - } + .. + }, ) }); } @@ -1572,6 +1576,70 @@ pub mod temporal { } } + pub trait CommandAttributesExt { + fn as_type(&self) -> CommandType; + } + + impl CommandAttributesExt for command::Attributes { + fn as_type(&self) -> CommandType { + match self { + Attributes::ScheduleActivityTaskCommandAttributes(_) => { + CommandType::ScheduleActivityTask + } + Attributes::StartTimerCommandAttributes(_) => CommandType::StartTimer, + Attributes::CompleteWorkflowExecutionCommandAttributes(_) => { + CommandType::CompleteWorkflowExecution + } + Attributes::FailWorkflowExecutionCommandAttributes(_) => { + CommandType::FailWorkflowExecution + } + Attributes::RequestCancelActivityTaskCommandAttributes(_) => { + CommandType::RequestCancelActivityTask + } + Attributes::CancelTimerCommandAttributes(_) => CommandType::CancelTimer, + Attributes::CancelWorkflowExecutionCommandAttributes(_) => { + CommandType::CancelWorkflowExecution + } + Attributes::RequestCancelExternalWorkflowExecutionCommandAttributes( + _, + ) => CommandType::RequestCancelExternalWorkflowExecution, + Attributes::RecordMarkerCommandAttributes(_) => { + CommandType::RecordMarker + } + Attributes::ContinueAsNewWorkflowExecutionCommandAttributes(_) => { + CommandType::ContinueAsNewWorkflowExecution + } + Attributes::StartChildWorkflowExecutionCommandAttributes(_) => { + CommandType::StartChildWorkflowExecution + } + Attributes::SignalExternalWorkflowExecutionCommandAttributes(_) => { + CommandType::SignalExternalWorkflowExecution + } + Attributes::UpsertWorkflowSearchAttributesCommandAttributes(_) => { + CommandType::UpsertWorkflowSearchAttributes + } + Attributes::ProtocolMessageCommandAttributes(_) => { + CommandType::ProtocolMessage + } + Attributes::ModifyWorkflowPropertiesCommandAttributes(_) => { + CommandType::ModifyWorkflowProperties + } + Attributes::ScheduleNexusOperationCommandAttributes(_) => { + CommandType::ScheduleNexusOperation + } + Attributes::RequestCancelNexusOperationCommandAttributes(_) => { + CommandType::RequestCancelNexusOperation + } + } + } + } + + impl Display for command::Attributes { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "{:?}", self.as_type()) + } + } + impl From for command::Attributes { fn from(s: workflow_commands::StartTimer) -> Self { Self::StartTimerCommandAttributes(StartTimerCommandAttributes { diff --git a/sdk/src/lib.rs b/sdk/src/lib.rs index a0b9f35af..c47675978 100644 --- a/sdk/src/lib.rs +++ b/sdk/src/lib.rs @@ -91,7 +91,7 @@ use temporal_sdk_core_protos::{ resolve_child_workflow_execution_start::Status as ChildWorkflowStartStatus, workflow_activation_job::Variant, WorkflowActivation, }, - workflow_commands::{workflow_command, ContinueAsNewWorkflowExecution}, + workflow_commands::{workflow_command, ContinueAsNewWorkflowExecution, WorkflowCommand}, workflow_completion::WorkflowActivationCompletion, ActivityTaskCompletion, AsJsonPayloadExt, FromJsonPayloadExt, }, @@ -381,6 +381,7 @@ impl Worker { } impl WorkflowHalf { + #[allow(clippy::type_complexity)] fn workflow_activation_handler( &self, common: &CommonWorker, @@ -747,7 +748,7 @@ enum RustWfCmd { } struct CommandCreateRequest { - cmd: workflow_command::Variant, + cmd: WorkflowCommand, unblocker: oneshot::Sender, } diff --git a/sdk/src/workflow_context.rs b/sdk/src/workflow_context.rs index 74afd4ace..4c51d6fb7 100644 --- a/sdk/src/workflow_context.rs +++ b/sdk/src/workflow_context.rs @@ -38,9 +38,13 @@ use temporal_sdk_core_protos::{ CancelChildWorkflowExecution, ModifyWorkflowProperties, RequestCancelExternalWorkflowExecution, SetPatchMarker, SignalExternalWorkflowExecution, StartTimer, UpsertWorkflowSearchAttributes, + WorkflowCommand, }, }, - temporal::api::common::v1::{Memo, Payload, SearchAttributes}, + temporal::api::{ + common::v1::{Memo, Payload, SearchAttributes}, + sdk::v1::UserMetadata, + }, }; use tokio::sync::{mpsc, oneshot, watch}; use tokio_stream::wrappers::UnboundedReceiverStream; @@ -152,16 +156,23 @@ impl WfContext { let (cmd, unblocker) = CancellableWFCommandFut::new(CancellableID::Timer(seq)); self.send( CommandCreateRequest { - cmd: StartTimer { - seq, - start_to_fire_timeout: Some( - opts.duration - .try_into() - .expect("Durations must fit into 64 bits"), + cmd: WorkflowCommand { + variant: Some( + StartTimer { + seq, + start_to_fire_timeout: Some( + opts.duration + .try_into() + .expect("Durations must fit into 64 bits"), + ), + } + .into(), ), - summary: opts.summary.map(|x| x.as_bytes().into()), - } - .into(), + user_metadata: Some(UserMetadata { + summary: opts.summary.map(|x| x.as_bytes().into()), + details: None, + }), + }, unblocker, } .into(), @@ -181,7 +192,7 @@ impl WfContext { let (cmd, unblocker) = CancellableWFCommandFut::new(CancellableID::Activity(seq)); self.send( CommandCreateRequest { - cmd: opts.into_command(seq).into(), + cmd: opts.into_command(seq), unblocker, } .into(), @@ -206,7 +217,7 @@ impl WfContext { let (cmd, unblocker) = CancellableWFCommandFut::new(CancellableID::LocalActivity(seq)); self.send( CommandCreateRequest { - cmd: opts.into_command(seq).into(), + cmd: opts.into_command(seq), unblocker, } .into(), @@ -314,11 +325,16 @@ impl WfContext { let (cmd, unblocker) = WFCommandFut::new(); self.send( CommandCreateRequest { - cmd: RequestCancelExternalWorkflowExecution { - seq, - workflow_execution: Some(target), - } - .into(), + cmd: WorkflowCommand { + variant: Some( + RequestCancelExternalWorkflowExecution { + seq, + workflow_execution: Some(target), + } + .into(), + ), + user_metadata: None, + }, unblocker, } .into(), @@ -359,14 +375,19 @@ impl WfContext { CancellableWFCommandFut::new(CancellableID::SignalExternalWorkflow(seq)); self.send( CommandCreateRequest { - cmd: SignalExternalWorkflowExecution { - seq, - signal_name: signal.signal_name, - args: signal.data.input, - target: Some(target), - headers: signal.data.headers, - } - .into(), + cmd: WorkflowCommand { + variant: Some( + SignalExternalWorkflowExecution { + seq, + signal_name: signal.signal_name, + args: signal.data.input, + target: Some(target), + headers: signal.data.headers, + } + .into(), + ), + user_metadata: None, + }, unblocker, } .into(), @@ -578,8 +599,8 @@ impl<'a> LATimerBackoffFut<'a> { } } } -impl<'a> Unpin for LATimerBackoffFut<'a> {} -impl<'a> Future for LATimerBackoffFut<'a> { +impl Unpin for LATimerBackoffFut<'_> {} +impl Future for LATimerBackoffFut<'_> { type Output = ActivityResolution; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { @@ -634,7 +655,7 @@ impl<'a> Future for LATimerBackoffFut<'a> { poll_res } } -impl<'a> CancellableFuture for LATimerBackoffFut<'a> { +impl CancellableFuture for LATimerBackoffFut<'_> { fn cancel(&self, ctx: &WfContext) { self.did_cancel.store(true, Ordering::Release); if let Some(tf) = self.timer_fut.as_ref() { @@ -716,7 +737,7 @@ impl ChildWorkflow { CancellableWFCommandFut::new_with_dat(CancellableID::ChildWorkflow(child_seq), common); cx.send( CommandCreateRequest { - cmd: self.opts.into_command(child_seq).into(), + cmd: self.opts.into_command(child_seq), unblocker, } .into(), diff --git a/sdk/src/workflow_context/options.rs b/sdk/src/workflow_context/options.rs index c402bb15f..6dcafceac 100644 --- a/sdk/src/workflow_context/options.rs +++ b/sdk/src/workflow_context/options.rs @@ -6,22 +6,20 @@ use temporal_sdk_core_protos::{ child_workflow::ChildWorkflowCancellationType, workflow_commands::{ ActivityCancellationType, ScheduleActivity, ScheduleLocalActivity, - StartChildWorkflowExecution, + StartChildWorkflowExecution, WorkflowCommand, }, }, temporal::api::{ common::v1::{Payload, RetryPolicy}, enums::v1::ParentClosePolicy, + sdk::v1::UserMetadata, }, }; - // TODO: Before release, probably best to avoid using proto types entirely here. They're awkward. pub(crate) trait IntoWorkflowCommand { - type WFCommandType; - /// Produces a workflow command from some options - fn into_command(self, seq: u32) -> Self::WFCommandType; + fn into_command(self, seq: u32) -> WorkflowCommand; } /// Options for scheduling an activity @@ -73,29 +71,38 @@ pub struct ActivityOptions { } impl IntoWorkflowCommand for ActivityOptions { - type WFCommandType = ScheduleActivity; - fn into_command(self, seq: u32) -> ScheduleActivity { - ScheduleActivity { - seq, - activity_id: match self.activity_id { - None => seq.to_string(), - Some(aid) => aid, - }, - activity_type: self.activity_type, - task_queue: self.task_queue.unwrap_or_default(), - schedule_to_close_timeout: self - .schedule_to_close_timeout - .and_then(|d| d.try_into().ok()), - schedule_to_start_timeout: self - .schedule_to_start_timeout - .and_then(|d| d.try_into().ok()), - start_to_close_timeout: self.start_to_close_timeout.and_then(|d| d.try_into().ok()), - heartbeat_timeout: self.heartbeat_timeout.and_then(|d| d.try_into().ok()), - cancellation_type: self.cancellation_type as i32, - arguments: vec![self.input], - retry_policy: self.retry_policy, - summary: self.summary.map(Into::into), - ..Default::default() + fn into_command(self, seq: u32) -> WorkflowCommand { + WorkflowCommand { + variant: Some( + ScheduleActivity { + seq, + activity_id: match self.activity_id { + None => seq.to_string(), + Some(aid) => aid, + }, + activity_type: self.activity_type, + task_queue: self.task_queue.unwrap_or_default(), + schedule_to_close_timeout: self + .schedule_to_close_timeout + .and_then(|d| d.try_into().ok()), + schedule_to_start_timeout: self + .schedule_to_start_timeout + .and_then(|d| d.try_into().ok()), + start_to_close_timeout: self + .start_to_close_timeout + .and_then(|d| d.try_into().ok()), + heartbeat_timeout: self.heartbeat_timeout.and_then(|d| d.try_into().ok()), + cancellation_type: self.cancellation_type as i32, + arguments: vec![self.input], + retry_policy: self.retry_policy, + ..Default::default() + } + .into(), + ), + user_metadata: self.summary.map(|s| UserMetadata { + summary: Some(s.into()), + details: None, + }), } } } @@ -144,34 +151,43 @@ pub struct LocalActivityOptions { } impl IntoWorkflowCommand for LocalActivityOptions { - type WFCommandType = ScheduleLocalActivity; - fn into_command(mut self, seq: u32) -> ScheduleLocalActivity { + fn into_command(mut self, seq: u32) -> WorkflowCommand { // Allow tests to avoid extra verbosity when they don't care about timeouts // TODO: Builderize LA options self.schedule_to_close_timeout .get_or_insert(Duration::from_secs(100)); - ScheduleLocalActivity { - seq, - attempt: self.attempt.unwrap_or(1), - original_schedule_time: self.original_schedule_time, - activity_id: match self.activity_id { - None => seq.to_string(), - Some(aid) => aid, - }, - activity_type: self.activity_type, - arguments: vec![self.input], - retry_policy: Some(self.retry_policy), - local_retry_threshold: self.timer_backoff_threshold.and_then(|d| d.try_into().ok()), - cancellation_type: self.cancel_type.into(), - schedule_to_close_timeout: self - .schedule_to_close_timeout - .and_then(|d| d.try_into().ok()), - schedule_to_start_timeout: self - .schedule_to_start_timeout - .and_then(|d| d.try_into().ok()), - start_to_close_timeout: self.start_to_close_timeout.and_then(|d| d.try_into().ok()), - ..Default::default() + WorkflowCommand { + variant: Some( + ScheduleLocalActivity { + seq, + attempt: self.attempt.unwrap_or(1), + original_schedule_time: self.original_schedule_time, + activity_id: match self.activity_id { + None => seq.to_string(), + Some(aid) => aid, + }, + activity_type: self.activity_type, + arguments: vec![self.input], + retry_policy: Some(self.retry_policy), + local_retry_threshold: self + .timer_backoff_threshold + .and_then(|d| d.try_into().ok()), + cancellation_type: self.cancel_type.into(), + schedule_to_close_timeout: self + .schedule_to_close_timeout + .and_then(|d| d.try_into().ok()), + schedule_to_start_timeout: self + .schedule_to_start_timeout + .and_then(|d| d.try_into().ok()), + start_to_close_timeout: self + .start_to_close_timeout + .and_then(|d| d.try_into().ok()), + ..Default::default() + } + .into(), + ), + user_metadata: None, } } } @@ -202,31 +218,45 @@ pub struct ChildWorkflowOptions { } impl IntoWorkflowCommand for ChildWorkflowOptions { - type WFCommandType = StartChildWorkflowExecution; - fn into_command(self, seq: u32) -> StartChildWorkflowExecution { - StartChildWorkflowExecution { - seq, - workflow_id: self.workflow_id, - workflow_type: self.workflow_type, - task_queue: self.task_queue.unwrap_or_default(), - input: self.input, - cancellation_type: self.cancel_type as i32, - workflow_id_reuse_policy: self.options.id_reuse_policy as i32, - workflow_execution_timeout: self - .options - .execution_timeout - .and_then(|d| d.try_into().ok()), - workflow_run_timeout: self - .options - .execution_timeout - .and_then(|d| d.try_into().ok()), - workflow_task_timeout: self.options.task_timeout.and_then(|d| d.try_into().ok()), - search_attributes: self.options.search_attributes.unwrap_or_default(), - cron_schedule: self.options.cron_schedule.unwrap_or_default(), - parent_close_policy: self.parent_close_policy as i32, - static_summary: self.static_summary.map(Into::into), - static_details: self.static_details.map(Into::into), - ..Default::default() + fn into_command(self, seq: u32) -> WorkflowCommand { + let user_metadata = if self.static_summary.is_some() || self.static_details.is_some() { + Some(UserMetadata { + summary: self.static_summary.map(Into::into), + details: self.static_details.map(Into::into), + }) + } else { + None + }; + WorkflowCommand { + variant: Some( + StartChildWorkflowExecution { + seq, + workflow_id: self.workflow_id, + workflow_type: self.workflow_type, + task_queue: self.task_queue.unwrap_or_default(), + input: self.input, + cancellation_type: self.cancel_type as i32, + workflow_id_reuse_policy: self.options.id_reuse_policy as i32, + workflow_execution_timeout: self + .options + .execution_timeout + .and_then(|d| d.try_into().ok()), + workflow_run_timeout: self + .options + .execution_timeout + .and_then(|d| d.try_into().ok()), + workflow_task_timeout: self + .options + .task_timeout + .and_then(|d| d.try_into().ok()), + search_attributes: self.options.search_attributes.unwrap_or_default(), + cron_schedule: self.options.cron_schedule.unwrap_or_default(), + parent_close_policy: self.parent_close_policy as i32, + ..Default::default() + } + .into(), + ), + user_metadata, } } } diff --git a/sdk/src/workflow_future.rs b/sdk/src/workflow_future.rs index 783ca79e8..3ae387bcf 100644 --- a/sdk/src/workflow_future.rs +++ b/sdk/src/workflow_future.rs @@ -25,10 +25,11 @@ use temporal_sdk_core_protos::{ update_response, workflow_command, CancelChildWorkflowExecution, CancelSignalWorkflow, CancelTimer, CancelWorkflowExecution, CompleteWorkflowExecution, FailWorkflowExecution, RequestCancelActivity, RequestCancelExternalWorkflowExecution, - RequestCancelLocalActivity, ScheduleActivity, ScheduleLocalActivity, - StartChildWorkflowExecution, StartTimer, UpdateResponse, + RequestCancelLocalActivity, ScheduleActivity, ScheduleLocalActivity, StartTimer, + UpdateResponse, WorkflowCommand, }, - workflow_completion::WorkflowActivationCompletion, + workflow_completion, + workflow_completion::{workflow_activation_completion, WorkflowActivationCompletion}, }, temporal::api::{common::v1::Payload, failure::v1::Failure}, utilities::TryIntoOrNone, @@ -75,7 +76,6 @@ impl WorkflowFunction { incoming_activations, command_status: Default::default(), cancel_sender: cancel_tx, - child_workflow_starts: Default::default(), sig_chans: Default::default(), updates: Default::default(), update_futures: Default::default(), @@ -113,8 +113,6 @@ pub(crate) struct WorkflowFuture { cancel_sender: watch::Sender, /// Copy of the workflow context wf_ctx: WfContext, - /// Mapping of sequence number to a StartChildWorkflowExecution request - child_workflow_starts: HashMap, /// Maps signal IDs to channels to send down when they are signaled sig_chans: HashMap, /// Maps update handlers by name to implementations @@ -152,12 +150,17 @@ impl WorkflowFuture { .expect("Completion channel intact"); } - fn send_completion(&self, run_id: String, activation_cmds: Vec) { + fn send_completion(&self, run_id: String, activation_cmds: Vec) { self.outgoing_completions - .send(WorkflowActivationCompletion::from_cmds( + .send(WorkflowActivationCompletion { run_id, - activation_cmds, - )) + status: Some(workflow_activation_completion::Status::Successful( + workflow_completion::Success { + commands: activation_cmds, + used_internal_flags: vec![], + }, + )), + }) .expect("Completion channel intact"); } @@ -170,7 +173,7 @@ impl WorkflowFuture { fn handle_job( &mut self, variant: Option, - outgoing_cmds: &mut Vec, + outgoing_cmds: &mut Vec, ) -> Result { if let Some(v) = variant { match v { @@ -260,10 +263,13 @@ impl WorkflowFuture { }; match val_res { Ok(_) => { - outgoing_cmds.push(update_response( - u.protocol_instance_id.clone(), - update_response::Response::Accepted(()), - )); + outgoing_cmds.push( + update_response( + u.protocol_instance_id.clone(), + update_response::Response::Accepted(()), + ) + .into(), + ); let handler_fut = (impls.handler)( UpdateContext { wf_ctx: self.wf_ctx.clone(), @@ -275,20 +281,29 @@ impl WorkflowFuture { .push((u.protocol_instance_id, handler_fut)); } Err(e) => { - outgoing_cmds.push(update_response( - u.protocol_instance_id, - update_response::Response::Rejected(e.into()), - )); + outgoing_cmds.push( + update_response( + u.protocol_instance_id, + update_response::Response::Rejected(e.into()), + ) + .into(), + ); } } } else { - outgoing_cmds.push(update_response( - u.protocol_instance_id, - update_response::Response::Rejected( - format!("No update handler registered for update name {}", u.name) + outgoing_cmds.push( + update_response( + u.protocol_instance_id, + update_response::Response::Rejected( + format!( + "No update handler registered for update name {}", + u.name + ) .into(), - ), - )); + ), + ) + .into(), + ); } } @@ -449,7 +464,7 @@ impl WorkflowFuture { &mut self, cx: &mut Context, run_id: &str, - activation_cmds: &mut Vec, + activation_cmds: &mut Vec, ) -> Result { // TODO: Make sure this is *actually* safe before un-prototyping rust sdk let mut res = match AssertUnwindSafe(&mut self.inner) @@ -479,58 +494,52 @@ impl WorkflowFuture { while let Ok(cmd) = self.incoming_commands.try_recv() { match cmd { RustWfCmd::Cancel(cancellable_id) => { - match cancellable_id { + let cmd_variant = match cancellable_id { CancellableID::Timer(seq) => { - activation_cmds - .push(workflow_command::Variant::CancelTimer(CancelTimer { seq })); self.unblock(UnblockEvent::Timer(seq, TimerResult::Cancelled))?; // Re-poll wf future since a timer is now unblocked res = self.inner.poll_unpin(cx); + workflow_command::Variant::CancelTimer(CancelTimer { seq }) } CancellableID::Activity(seq) => { - activation_cmds.push(workflow_command::Variant::RequestCancelActivity( + workflow_command::Variant::RequestCancelActivity( RequestCancelActivity { seq }, - )); + ) } CancellableID::LocalActivity(seq) => { - activation_cmds.push( - workflow_command::Variant::RequestCancelLocalActivity( - RequestCancelLocalActivity { seq }, - ), - ); + workflow_command::Variant::RequestCancelLocalActivity( + RequestCancelLocalActivity { seq }, + ) } CancellableID::ChildWorkflow(seq) => { - activation_cmds.push( - workflow_command::Variant::CancelChildWorkflowExecution( - CancelChildWorkflowExecution { - child_workflow_seq: seq, - }, - ), - ); + workflow_command::Variant::CancelChildWorkflowExecution( + CancelChildWorkflowExecution { + child_workflow_seq: seq, + }, + ) } CancellableID::SignalExternalWorkflow(seq) => { - activation_cmds.push(workflow_command::Variant::CancelSignalWorkflow( - CancelSignalWorkflow { seq }, - )); + workflow_command::Variant::CancelSignalWorkflow(CancelSignalWorkflow { + seq, + }) } CancellableID::ExternalWorkflow { seqnum, execution } => { - activation_cmds.push( - workflow_command::Variant::RequestCancelExternalWorkflowExecution( - RequestCancelExternalWorkflowExecution { - seq: seqnum, - workflow_execution: Some(execution), - }, - ), - ); + workflow_command::Variant::RequestCancelExternalWorkflowExecution( + RequestCancelExternalWorkflowExecution { + seq: seqnum, + workflow_execution: Some(execution), + }, + ) } - } + }; + activation_cmds.push(cmd_variant.into()); } - RustWfCmd::NewCmd(cmd) => { - activation_cmds.push(cmd.cmd.clone()); - let command_id = match cmd.cmd { + RustWfCmd::NewCmd(cmd) => { + let command_id = match cmd.cmd.variant.as_ref().expect("command variant is set") + { workflow_command::Variant::StartTimer(StartTimer { seq, .. }) => { - CommandID::Timer(seq) + CommandID::Timer(*seq) } workflow_command::Variant::ScheduleActivity(ScheduleActivity { seq, @@ -538,14 +547,12 @@ impl WorkflowFuture { }) | workflow_command::Variant::ScheduleLocalActivity( ScheduleLocalActivity { seq, .. }, - ) => CommandID::Activity(seq), + ) => CommandID::Activity(*seq), workflow_command::Variant::SetPatchMarker(_) => { panic!("Set patch marker should be a nonblocking command") } workflow_command::Variant::StartChildWorkflowExecution(req) => { let seq = req.seq; - // Save the start request to support cancellation later - self.child_workflow_starts.insert(seq, req); CommandID::ChildWorkflowStart(seq) } workflow_command::Variant::SignalExternalWorkflowExecution(req) => { @@ -556,6 +563,8 @@ impl WorkflowFuture { } _ => unimplemented!("Command type not implemented"), }; + activation_cmds.push(cmd.cmd); + self.command_status.insert( command_id, WFCommandFutInfo { @@ -564,7 +573,7 @@ impl WorkflowFuture { ); } RustWfCmd::NewNonblockingCmd(cmd) => { - activation_cmds.push(cmd); + activation_cmds.push(cmd.into()); } RustWfCmd::SubscribeChildWorkflowCompletion(sub) => { self.command_status.insert( @@ -598,37 +607,34 @@ impl WorkflowFuture { if let Poll::Ready(res) = res { // TODO: Auto reply with cancel when cancelled (instead of normal exit value) - match res { + let cmd = match res { Ok(exit_val) => match exit_val { // TODO: Generic values WfExitValue::Normal(result) => { - activation_cmds.push(workflow_command::Variant::CompleteWorkflowExecution( + workflow_command::Variant::CompleteWorkflowExecution( CompleteWorkflowExecution { result: Some(result), }, - )); + ) } - WfExitValue::ContinueAsNew(cmd) => activation_cmds.push((*cmd).into()), - WfExitValue::Cancelled => { - activation_cmds.push(workflow_command::Variant::CancelWorkflowExecution( - CancelWorkflowExecution {}, - )); + WfExitValue::ContinueAsNew(cmd) => { + workflow_command::Variant::ContinueAsNewWorkflowExecution(*cmd) } + WfExitValue::Cancelled => workflow_command::Variant::CancelWorkflowExecution( + CancelWorkflowExecution {}, + ), WfExitValue::Evicted => { panic!("Don't explicitly return this") } }, - Err(e) => { - activation_cmds.push(workflow_command::Variant::FailWorkflowExecution( - FailWorkflowExecution { - failure: Some(Failure { - message: e.to_string(), - ..Default::default() - }), - }, - )); - } - } + Err(e) => workflow_command::Variant::FailWorkflowExecution(FailWorkflowExecution { + failure: Some(Failure { + message: e.to_string(), + ..Default::default() + }), + }), + }; + activation_cmds.push(cmd.into()) } Ok(false) } diff --git a/test-utils/src/lib.rs b/test-utils/src/lib.rs index 60bc6d382..3b8975a0d 100644 --- a/test-utils/src/lib.rs +++ b/test-utils/src/lib.rs @@ -717,7 +717,6 @@ pub fn start_timer_cmd(seq: u32, duration: Duration) -> workflow_command::Varian StartTimer { seq, start_to_fire_timeout: Some(duration.try_into().expect("duration fits")), - summary: None, } .into() } @@ -782,7 +781,6 @@ where vec![StartTimer { seq, start_to_fire_timeout: Some(duration.try_into().expect("duration fits")), - summary: None, } .into()], )) diff --git a/tests/integ_tests/polling_tests.rs b/tests/integ_tests/polling_tests.rs index 69af0286e..e40cf5afa 100644 --- a/tests/integ_tests/polling_tests.rs +++ b/tests/integ_tests/polling_tests.rs @@ -41,7 +41,6 @@ async fn out_of_order_completion_doesnt_hang() { StartTimer { seq: 1, start_to_fire_timeout: Some(prost_dur!(from_millis(50))), - summary: None, } .into(), ] diff --git a/tests/integ_tests/queries_tests.rs b/tests/integ_tests/queries_tests.rs index a974ab2ae..eeb3daa20 100644 --- a/tests/integ_tests/queries_tests.rs +++ b/tests/integ_tests/queries_tests.rs @@ -28,13 +28,11 @@ async fn simple_query_legacy() { StartTimer { seq: 0, start_to_fire_timeout: Some(prost_dur!(from_millis(500))), - summary: None, } .into(), StartTimer { seq: 1, start_to_fire_timeout: Some(prost_dur!(from_secs(3))), - summary: None, } .into(), ], diff --git a/tests/integ_tests/workflow_tests.rs b/tests/integ_tests/workflow_tests.rs index 70c642fe4..df173847c 100644 --- a/tests/integ_tests/workflow_tests.rs +++ b/tests/integ_tests/workflow_tests.rs @@ -200,7 +200,6 @@ async fn fail_wf_task(#[values(true, false)] replay: bool) { vec![StartTimer { seq: 0, start_to_fire_timeout: Some(prost_dur!(from_millis(200))), - summary: None, } .into()], )) @@ -325,7 +324,6 @@ async fn signal_workflow_signal_not_handled_on_workflow_completion() { vec![StartTimer { seq: 0, start_to_fire_timeout: Some(prost_dur!(from_millis(10))), - summary: None, } .into()], )) diff --git a/tests/integ_tests/workflow_tests/activities.rs b/tests/integ_tests/workflow_tests/activities.rs index cbf6b043b..10c9a32f4 100644 --- a/tests/integ_tests/workflow_tests/activities.rs +++ b/tests/integ_tests/workflow_tests/activities.rs @@ -360,7 +360,6 @@ async fn activity_cancellation_try_cancel() { StartTimer { seq: 1, start_to_fire_timeout: Some(prost_dur!(from_millis(50))), - summary: None, } .into(), ] @@ -418,7 +417,6 @@ async fn activity_cancellation_plus_complete_doesnt_double_resolve() { StartTimer { seq: 1, start_to_fire_timeout: Some(prost_dur!(from_millis(50))), - summary: None, } .into(), ] @@ -463,7 +461,6 @@ async fn activity_cancellation_plus_complete_doesnt_double_resolve() { vec![StartTimer { seq: 2, start_to_fire_timeout: Some(prost_dur!(from_millis(100))), - summary: None, } .into()], )) @@ -562,7 +559,6 @@ async fn activity_cancellation_wait_cancellation_completed() { StartTimer { seq: 1, start_to_fire_timeout: Some(prost_dur!(from_millis(50))), - summary: None, } .into(), ] @@ -625,7 +621,6 @@ async fn activity_cancellation_abandon() { StartTimer { seq: 1, start_to_fire_timeout: Some(prost_dur!(from_millis(50))), - summary: None, } .into(), ] diff --git a/tests/integ_tests/workflow_tests/replay.rs b/tests/integ_tests/workflow_tests/replay.rs index ceca12e4e..b48893922 100644 --- a/tests/integ_tests/workflow_tests/replay.rs +++ b/tests/integ_tests/workflow_tests/replay.rs @@ -42,7 +42,6 @@ async fn timer_workflow_replay() { vec![StartTimer { seq: 0, start_to_fire_timeout: Some(prost_dur!(from_secs(1))), - summary: None, } .into()], )) diff --git a/tests/integ_tests/workflow_tests/timers.rs b/tests/integ_tests/workflow_tests/timers.rs index 7d39a6b23..dc521def1 100644 --- a/tests/integ_tests/workflow_tests/timers.rs +++ b/tests/integ_tests/workflow_tests/timers.rs @@ -38,7 +38,6 @@ async fn timer_workflow_manual() { vec![StartTimer { seq: 0, start_to_fire_timeout: Some(prost_dur!(from_secs(1))), - summary: None, } .into()], )) @@ -62,13 +61,11 @@ async fn timer_cancel_workflow() { StartTimer { seq: 0, start_to_fire_timeout: Some(prost_dur!(from_millis(50))), - summary: None, } .into(), StartTimer { seq: 1, start_to_fire_timeout: Some(prost_dur!(from_secs(10))), - summary: None, } .into(), ],