diff --git a/core/src/core_tests/updates.rs b/core/src/core_tests/updates.rs index 2c565525b..727dbb367 100644 --- a/core/src/core_tests/updates.rs +++ b/core/src/core_tests/updates.rs @@ -291,6 +291,7 @@ async fn replay_with_signal_and_update_same_task() { StartTimer { seq: 1, start_to_fire_timeout: Some(prost_dur!(from_secs(1))), + summary: None, } .into(), UpdateResponse { diff --git a/core/src/core_tests/workers.rs b/core/src/core_tests/workers.rs index 3a5ea2519..b4107632f 100644 --- a/core/src/core_tests/workers.rs +++ b/core/src/core_tests/workers.rs @@ -45,6 +45,7 @@ async fn after_shutdown_of_worker_get_shutdown_err() { workflow_command::Variant::StartTimer(StartTimer { seq: 1, start_to_fire_timeout: Some(prost_dur!(from_secs(1))), + summary: None, }), )) .await @@ -352,6 +353,7 @@ async fn worker_shutdown_api(#[case] use_cache: bool, #[case] api_success: bool) workflow_command::Variant::StartTimer(StartTimer { seq: 1, start_to_fire_timeout: Some(prost_dur!(from_secs(1))), + summary: None, }), )) .await diff --git a/core/src/worker/workflow/machines/activity_state_machine.rs b/core/src/worker/workflow/machines/activity_state_machine.rs index ee60f0089..45e79b8a8 100644 --- a/core/src/worker/workflow/machines/activity_state_machine.rs +++ b/core/src/worker/workflow/machines/activity_state_machine.rs @@ -30,6 +30,7 @@ use temporal_sdk_core_protos::{ ActivityTaskCompletedEventAttributes, ActivityTaskFailedEventAttributes, ActivityTaskTimedOutEventAttributes, }, + sdk::v1::UserMetadata, }, }; @@ -114,6 +115,10 @@ impl ActivityMachine { internal_flags: InternalFlagsRef, use_compatible_version: bool, ) -> NewMachineWithCommand { + let user_metadata = attrs.summary.clone().map(|x| UserMetadata { + summary: Some(x), + details: None, + }); let mut s = Self::from_parts( Created {}.into(), SharedState { @@ -134,7 +139,7 @@ impl ActivityMachine { s.shared_state().attrs.clone(), use_compatible_version, )), - user_metadata: Default::default(), + user_metadata, }; NewMachineWithCommand { command, diff --git a/core/src/worker/workflow/machines/child_workflow_state_machine.rs b/core/src/worker/workflow/machines/child_workflow_state_machine.rs index 79067c304..2e34900ff 100644 --- a/core/src/worker/workflow/machines/child_workflow_state_machine.rs +++ b/core/src/worker/workflow/machines/child_workflow_state_machine.rs @@ -41,6 +41,7 @@ use temporal_sdk_core_protos::{ ChildWorkflowExecutionStartedEventAttributes, StartChildWorkflowExecutionFailedEventAttributes, }, + sdk::v1::UserMetadata, }, }; @@ -449,6 +450,15 @@ impl ChildWorkflowMachine { cancelled_before_sent: false, }, ); + let user_metadata = if attribs.static_summary.is_some() || attribs.static_details.is_some() + { + Some(UserMetadata { + summary: attribs.static_summary.clone(), + details: attribs.static_details.clone(), + }) + } else { + None + }; OnEventWrapper::on_event_mut(&mut s, ChildWorkflowMachineEvents::Schedule) .expect("Scheduling child workflows doesn't fail"); let cmd = Command { @@ -457,7 +467,7 @@ impl ChildWorkflowMachine { attribs, use_compatible_version, )), - user_metadata: Default::default(), + user_metadata, }; NewMachineWithCommand { command: cmd, diff --git a/core/src/worker/workflow/machines/timer_state_machine.rs b/core/src/worker/workflow/machines/timer_state_machine.rs index 65674dfef..74060e237 100644 --- a/core/src/worker/workflow/machines/timer_state_machine.rs +++ b/core/src/worker/workflow/machines/timer_state_machine.rs @@ -17,6 +17,7 @@ use temporal_sdk_core_protos::{ command::v1::Command, enums::v1::{CommandType, EventType}, history::v1::{history_event, TimerFiredEventAttributes}, + sdk::v1::UserMetadata, }, }; @@ -73,13 +74,17 @@ pub(super) fn new_timer(attribs: StartTimer) -> NewMachineWithCommand { impl TimerMachine { /// Create a new timer and immediately schedule it fn new_scheduled(attribs: StartTimer) -> (Self, Command) { + let user_metadata = attribs.summary.clone().map(|x| UserMetadata { + summary: Some(x), + details: None, + }); let mut s = Self::new(attribs); OnEventWrapper::on_event_mut(&mut s, TimerMachineEvents::Schedule) .expect("Scheduling timers doesn't fail"); let cmd = Command { command_type: CommandType::StartTimer as i32, - attributes: Some(s.shared_state().attrs.into()), - user_metadata: Default::default(), + attributes: Some(s.shared_state().attrs.clone().into()), + user_metadata, }; (s, cmd) } diff --git a/sdk-core-protos/protos/local/temporal/sdk/core/workflow_commands/workflow_commands.proto b/sdk-core-protos/protos/local/temporal/sdk/core/workflow_commands/workflow_commands.proto index da0e397a4..aa55226df 100644 --- a/sdk-core-protos/protos/local/temporal/sdk/core/workflow_commands/workflow_commands.proto +++ b/sdk-core-protos/protos/local/temporal/sdk/core/workflow_commands/workflow_commands.proto @@ -46,6 +46,8 @@ message StartTimer { // Lang's incremental sequence number, used as the operation identifier uint32 seq = 1; google.protobuf.Duration start_to_fire_timeout = 2; + // Summary that is stored as user_metadata + temporal.api.common.v1.Payload summary = 3; } message CancelTimer { @@ -88,6 +90,8 @@ message ScheduleActivity { bool do_not_eagerly_execute = 14; // Whether this activity should run on a worker with a compatible build id or not. coresdk.common.VersioningIntent versioning_intent = 15; + // Summary that is stored as user_metadata + temporal.api.common.v1.Payload summary = 16; } message ScheduleLocalActivity { @@ -253,6 +257,10 @@ message StartChildWorkflowExecution { child_workflow.ChildWorkflowCancellationType cancellation_type = 18; // Whether this child should run on a worker with a compatible build id or not. coresdk.common.VersioningIntent versioning_intent = 19; + // Static summary of the child workflow + temporal.api.common.v1.Payload static_summary = 20; + // Static details of the child workflow + temporal.api.common.v1.Payload static_details = 21; } // Cancel a child workflow diff --git a/sdk/src/workflow_context.rs b/sdk/src/workflow_context.rs index 622a026e9..73e20afc1 100644 --- a/sdk/src/workflow_context.rs +++ b/sdk/src/workflow_context.rs @@ -1,5 +1,6 @@ mod options; +use options::TimerOptions; pub use options::{ ActivityOptions, ChildWorkflowOptions, LocalActivityOptions, Signal, SignalData, SignalWorkflowOptions, @@ -148,6 +149,15 @@ impl WfContext { /// Request to create a timer pub fn timer(&self, duration: Duration) -> impl CancellableFuture { + self.timer_with_options(duration, TimerOptions::default()) + } + + /// Request to create a timer with optioons + pub fn timer_with_options( + &self, + duration: Duration, + opts: TimerOptions, + ) -> impl CancellableFuture { let seq = self.seq_nums.write().next_timer_seq(); let (cmd, unblocker) = CancellableWFCommandFut::new(CancellableID::Timer(seq)); self.send( @@ -159,6 +169,7 @@ impl WfContext { .try_into() .expect("Durations must fit into 64 bits"), ), + summary: opts.summary.map(|x| x.as_bytes().into()), } .into(), unblocker, diff --git a/sdk/src/workflow_context/options.rs b/sdk/src/workflow_context/options.rs index d23b85293..06f6fda29 100644 --- a/sdk/src/workflow_context/options.rs +++ b/sdk/src/workflow_context/options.rs @@ -68,6 +68,9 @@ pub struct ActivityOptions { pub cancellation_type: ActivityCancellationType, /// Activity retry policy pub retry_policy: Option, + /// Summary of the activity + /// NOTE: Experimental + pub summary: Option, } impl IntoWorkflowCommand for ActivityOptions { @@ -192,6 +195,12 @@ pub struct ChildWorkflowOptions { pub options: WorkflowOptions, /// How to respond to parent workflow ending pub parent_close_policy: ParentClosePolicy, + /// Static summary of the child workflow + /// NOTE: Experimental + pub static_summary: Option, + /// Static details of the child workflow + /// NOTE: Experimental + pub static_details: Option, } impl IntoWorkflowCommand for ChildWorkflowOptions { @@ -307,3 +316,11 @@ impl SignalData { self } } + +/// Options for timer +#[derive(Default, Debug, Clone)] +pub struct TimerOptions { + /// Summary of the timer + /// NOTE: Experimental + pub summary: Option, +} diff --git a/test-utils/src/lib.rs b/test-utils/src/lib.rs index 1bb401fc9..39ff40771 100644 --- a/test-utils/src/lib.rs +++ b/test-utils/src/lib.rs @@ -716,6 +716,7 @@ pub fn start_timer_cmd(seq: u32, duration: Duration) -> workflow_command::Varian StartTimer { seq, start_to_fire_timeout: Some(duration.try_into().expect("duration fits")), + summary: None, } .into() } @@ -779,6 +780,7 @@ where vec![StartTimer { seq, start_to_fire_timeout: Some(duration.try_into().expect("duration fits")), + summary: None, } .into()], ))