diff --git a/core/src/core_tests/activity_tasks.rs b/core/src/core_tests/activity_tasks.rs index b67f94e7..6c1f1655 100644 --- a/core/src/core_tests/activity_tasks.rs +++ b/core/src/core_tests/activity_tasks.rs @@ -2,9 +2,9 @@ use crate::{ advance_fut, job_assert, prost_dur, test_help::{ build_fake_worker, build_mock_pollers, canned_histories, gen_assert_and_reply, - mock_manual_poller, mock_poller, mock_poller_from_resps, mock_worker, poll_and_reply, - single_hist_mock_sg, test_worker_cfg, MockPollCfg, MockWorkerInputs, MocksHolder, - QueueResponse, ResponseType, WorkerExt, WorkflowCachingPolicy, TEST_Q, + mock_manual_poller, mock_poller, mock_poller_from_resps, mock_sdk_cfg, mock_worker, + poll_and_reply, single_hist_mock_sg, test_worker_cfg, MockPollCfg, MockWorkerInputs, + MocksHolder, QueueResponse, ResponseType, WorkerExt, WorkflowCachingPolicy, TEST_Q, }, worker::client::mocks::{mock_manual_workflow_client, mock_workflow_client}, ActivityHeartbeat, Worker, @@ -45,17 +45,18 @@ use temporal_sdk_core_protos::{ }, temporal::api::{ command::v1::{command::Attributes, ScheduleActivityTaskCommandAttributes}, - enums::v1::EventType, + enums::v1::{CommandType, EventType}, history::v1::{ history_event::Attributes as EventAttributes, ActivityTaskScheduledEventAttributes, }, + sdk::v1::UserMetadata, workflowservice::v1::{ PollActivityTaskQueueResponse, RecordActivityTaskHeartbeatResponse, RespondActivityTaskCanceledResponse, RespondActivityTaskCompletedResponse, RespondActivityTaskFailedResponse, RespondWorkflowTaskCompletedResponse, }, }, - TestHistoryBuilder, DEFAULT_WORKFLOW_TYPE, + TestHistoryBuilder, DEFAULT_ACTIVITY_TYPE, DEFAULT_WORKFLOW_TYPE, }; use temporal_sdk_core_test_utils::{fanout_tasks, start_timer_cmd, TestWorker}; use tokio::{join, sync::Barrier, time::sleep}; @@ -1187,3 +1188,55 @@ async fn activities_must_be_flushed_to_server_on_shutdown(#[values(true, false)] }; join!(shutdown_task, complete_task); } + +#[tokio::test] +async fn pass_activity_summary_to_metadata() { + crate::telemetry::test_telem_console(); + let t = canned_histories::single_activity("1"); + let mut mock_cfg = MockPollCfg::from_hist_builder(t); + let wf_id = mock_cfg.hists[0].wf_id.clone(); + let wf_type = DEFAULT_WORKFLOW_TYPE; + let expected_user_metadata = Some(UserMetadata { + summary: Some(b"activity summary".into()), + details: None, + }); + mock_cfg.completion_asserts_from_expectations(|mut asserts| { + asserts + .then(move |wft| { + assert_eq!(wft.commands.len(), 1); + assert_eq!( + wft.commands[0].command_type(), + CommandType::ScheduleActivityTask + ); + assert_eq!(wft.commands[0].user_metadata, expected_user_metadata) + }) + .then(move |wft| { + assert_eq!(wft.commands.len(), 1); + assert_eq!( + wft.commands[0].command_type(), + CommandType::CompleteWorkflowExecution + ); + }); + }); + + let mut worker = mock_sdk_cfg(mock_cfg, |_| {}); + worker.register_wf(wf_type, |ctx: WfContext| async move { + ctx.activity(ActivityOptions { + activity_type: DEFAULT_ACTIVITY_TYPE.to_string(), + summary: Some("activity summary".to_string()), + ..Default::default() + }) + .await; + Ok(().into()) + }); + worker + .submit_wf( + wf_id.to_owned(), + wf_type.to_owned(), + vec![], + WorkflowOptions::default(), + ) + .await + .unwrap(); + worker.run_until_done().await.unwrap(); +} diff --git a/core/src/core_tests/child_workflows.rs b/core/src/core_tests/child_workflows.rs index 7782bc84..9a3ef525 100644 --- a/core/src/core_tests/child_workflows.rs +++ b/core/src/core_tests/child_workflows.rs @@ -1,21 +1,24 @@ use crate::{ replay::DEFAULT_WORKFLOW_TYPE, test_help::{ - build_fake_sdk, canned_histories, mock_sdk, mock_worker, single_hist_mock_sg, MockPollCfg, - ResponseType, + build_fake_sdk, canned_histories, mock_sdk, mock_sdk_cfg, mock_worker, single_hist_mock_sg, + MockPollCfg, ResponseType, }, worker::client::mocks::mock_workflow_client, }; use temporal_client::WorkflowOptions; use temporal_sdk::{ChildWorkflowOptions, Signal, WfContext, WorkflowResult}; use temporal_sdk_core_api::Worker; -use temporal_sdk_core_protos::coresdk::{ - child_workflow::{child_workflow_result, ChildWorkflowCancellationType}, - workflow_activation::{workflow_activation_job, WorkflowActivationJob}, - workflow_commands::{ - CancelChildWorkflowExecution, CompleteWorkflowExecution, StartChildWorkflowExecution, +use temporal_sdk_core_protos::{ + coresdk::{ + child_workflow::{child_workflow_result, ChildWorkflowCancellationType}, + workflow_activation::{workflow_activation_job, WorkflowActivationJob}, + workflow_commands::{ + CancelChildWorkflowExecution, CompleteWorkflowExecution, StartChildWorkflowExecution, + }, + workflow_completion::WorkflowActivationCompletion, }, - workflow_completion::WorkflowActivationCompletion, + temporal::api::{enums::v1::CommandType, sdk::v1::UserMetadata}, }; use tokio::join; @@ -220,3 +223,57 @@ async fn cancel_already_complete_child_ignored() { .await .unwrap(); } + +#[tokio::test] +async fn pass_child_workflow_summary_to_metadata() { + let wf_id = "1"; + let wf_type = DEFAULT_WORKFLOW_TYPE; + let t = canned_histories::single_child_workflow(wf_id); + let mut mock_cfg = MockPollCfg::from_hist_builder(t); + let expected_user_metadata = Some(UserMetadata { + summary: Some(b"child summary".into()), + details: Some(b"child details".into()), + }); + mock_cfg.completion_asserts_from_expectations(|mut asserts| { + asserts + .then(move |wft| { + assert_eq!(wft.commands.len(), 1); + assert_eq!( + wft.commands[0].command_type(), + CommandType::StartChildWorkflowExecution + ); + assert_eq!(wft.commands[0].user_metadata, expected_user_metadata) + }) + .then(move |wft| { + assert_eq!(wft.commands.len(), 1); + assert_eq!( + wft.commands[0].command_type(), + CommandType::CompleteWorkflowExecution + ); + }); + }); + + let mut worker = mock_sdk_cfg(mock_cfg, |_| {}); + worker.register_wf(wf_type, move |ctx: WfContext| async move { + ctx.child_workflow(ChildWorkflowOptions { + workflow_id: wf_id.to_string(), + workflow_type: "child".to_string(), + static_summary: Some("child summary".to_string()), + static_details: Some("child details".to_string()), + ..Default::default() + }) + .start(&ctx) + .await; + Ok(().into()) + }); + worker + .submit_wf( + wf_id.to_owned(), + wf_type.to_owned(), + vec![], + WorkflowOptions::default(), + ) + .await + .unwrap(); + worker.run_until_done().await.unwrap(); +} diff --git a/core/src/core_tests/workflow_tasks.rs b/core/src/core_tests/workflow_tasks.rs index f80d00ec..d218a29c 100644 --- a/core/src/core_tests/workflow_tasks.rs +++ b/core/src/core_tests/workflow_tasks.rs @@ -29,7 +29,7 @@ use std::{ time::Duration, }; use temporal_client::WorkflowOptions; -use temporal_sdk::{ActivityOptions, CancellableFuture, WfContext}; +use temporal_sdk::{ActivityOptions, CancellableFuture, TimerOptions, WfContext}; use temporal_sdk_core_api::{ errors::PollWfError, worker::{ @@ -64,6 +64,7 @@ use temporal_sdk_core_protos::{ history_event, TimerFiredEventAttributes, WorkflowPropertiesModifiedExternallyEventAttributes, }, + sdk::v1::UserMetadata, workflowservice::v1::{ GetWorkflowExecutionHistoryResponse, RespondWorkflowTaskCompletedResponse, }, @@ -3086,3 +3087,50 @@ async fn slot_provider_cant_hand_out_more_permits_than_cache_size() { // polling is not exceeding the task limit assert_eq!(popped_tasks.load(Ordering::Relaxed), 10); } + +#[tokio::test] +async fn pass_timer_summary_to_metadata() { + let t = canned_histories::single_timer("1"); + let mut mock_cfg = MockPollCfg::from_hist_builder(t); + let wf_id = mock_cfg.hists[0].wf_id.clone(); + let wf_type = DEFAULT_WORKFLOW_TYPE; + let expected_user_metadata = Some(UserMetadata { + summary: Some(b"timer summary".into()), + details: None, + }); + mock_cfg.completion_asserts_from_expectations(|mut asserts| { + asserts + .then(move |wft| { + assert_eq!(wft.commands.len(), 1); + assert_eq!(wft.commands[0].command_type(), CommandType::StartTimer); + assert_eq!(wft.commands[0].user_metadata, expected_user_metadata) + }) + .then(move |wft| { + assert_eq!(wft.commands.len(), 1); + assert_eq!( + wft.commands[0].command_type(), + CommandType::CompleteWorkflowExecution + ); + }); + }); + + let mut worker = mock_sdk_cfg(mock_cfg, |_| {}); + worker.register_wf(wf_type, |ctx: WfContext| async move { + ctx.timer(TimerOptions { + duration: Duration::from_secs(1), + summary: Some("timer summary".to_string()), + }) + .await; + Ok(().into()) + }); + worker + .submit_wf( + wf_id.to_owned(), + wf_type.to_owned(), + vec![], + WorkflowOptions::default(), + ) + .await + .unwrap(); + worker.run_until_done().await.unwrap(); +} diff --git a/sdk/src/lib.rs b/sdk/src/lib.rs index a103aa63..b03c864b 100644 --- a/sdk/src/lib.rs +++ b/sdk/src/lib.rs @@ -57,7 +57,7 @@ use tracing::{field, Instrument, Span}; pub use workflow_context::{ ActivityOptions, CancellableFuture, ChildWorkflow, ChildWorkflowOptions, LocalActivityOptions, PendingChildWorkflow, Signal, SignalData, SignalWorkflowOptions, StartedChildWorkflow, - WfContext, + TimerOptions, WfContext, }; use crate::{interceptors::WorkerInterceptor, workflow_context::ChildWfCommon}; diff --git a/sdk/src/workflow_context.rs b/sdk/src/workflow_context.rs index 73e20afc..809112d8 100644 --- a/sdk/src/workflow_context.rs +++ b/sdk/src/workflow_context.rs @@ -1,9 +1,8 @@ mod options; -use options::TimerOptions; pub use options::{ ActivityOptions, ChildWorkflowOptions, LocalActivityOptions, Signal, SignalData, - SignalWorkflowOptions, + SignalWorkflowOptions, TimerOptions, }; use crate::{ @@ -148,16 +147,8 @@ impl WfContext { } /// Request to create a timer - pub fn timer(&self, duration: Duration) -> impl CancellableFuture { - self.timer_with_options(duration, TimerOptions::default()) - } - - /// Request to create a timer with optioons - pub fn timer_with_options( - &self, - duration: Duration, - opts: TimerOptions, - ) -> impl CancellableFuture { + pub fn timer>(&self, opts: T) -> impl CancellableFuture { + let opts: TimerOptions = opts.into(); let seq = self.seq_nums.write().next_timer_seq(); let (cmd, unblocker) = CancellableWFCommandFut::new(CancellableID::Timer(seq)); self.send( @@ -165,7 +156,7 @@ impl WfContext { cmd: StartTimer { seq, start_to_fire_timeout: Some( - duration + opts.duration .try_into() .expect("Durations must fit into 64 bits"), ), @@ -630,7 +621,7 @@ impl<'a> Future for LATimerBackoffFut<'a> { }); } - let timer_f = self.ctx.timer( + let timer_f = self.ctx.timer::( b.backoff_duration .expect("Duration is set") .try_into() diff --git a/sdk/src/workflow_context/options.rs b/sdk/src/workflow_context/options.rs index 06f6fda2..e24fefd0 100644 --- a/sdk/src/workflow_context/options.rs +++ b/sdk/src/workflow_context/options.rs @@ -95,6 +95,7 @@ impl IntoWorkflowCommand for ActivityOptions { cancellation_type: self.cancellation_type as i32, arguments: vec![self.input], retry_policy: self.retry_policy, + summary: self.summary.map(Into::into), ..Default::default() } } @@ -226,6 +227,8 @@ impl IntoWorkflowCommand for ChildWorkflowOptions { search_attributes: self.options.search_attributes.unwrap_or_default(), cron_schedule: self.options.cron_schedule.unwrap_or_default(), parent_close_policy: self.parent_close_policy as i32, + static_summary: self.static_summary.map(Into::into), + static_details: self.static_details.map(Into::into), ..Default::default() } } @@ -320,7 +323,18 @@ impl SignalData { /// Options for timer #[derive(Default, Debug, Clone)] pub struct TimerOptions { + /// Duration for the timer + pub duration: Duration, /// Summary of the timer /// NOTE: Experimental pub summary: Option, } + +impl From for TimerOptions { + fn from(duration: Duration) -> Self { + TimerOptions { + duration, + ..Default::default() + } + } +}