Skip to content

Commit

Permalink
wrote tests
Browse files Browse the repository at this point in the history
  • Loading branch information
yuandrew committed Nov 21, 2024
1 parent 4bda68a commit 6da7a69
Show file tree
Hide file tree
Showing 6 changed files with 192 additions and 29 deletions.
63 changes: 58 additions & 5 deletions core/src/core_tests/activity_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ use crate::{
advance_fut, job_assert, prost_dur,
test_help::{
build_fake_worker, build_mock_pollers, canned_histories, gen_assert_and_reply,
mock_manual_poller, mock_poller, mock_poller_from_resps, mock_worker, poll_and_reply,
single_hist_mock_sg, test_worker_cfg, MockPollCfg, MockWorkerInputs, MocksHolder,
QueueResponse, ResponseType, WorkerExt, WorkflowCachingPolicy, TEST_Q,
mock_manual_poller, mock_poller, mock_poller_from_resps, mock_sdk_cfg, mock_worker,
poll_and_reply, single_hist_mock_sg, test_worker_cfg, MockPollCfg, MockWorkerInputs,
MocksHolder, QueueResponse, ResponseType, WorkerExt, WorkflowCachingPolicy, TEST_Q,
},
worker::client::mocks::{mock_manual_workflow_client, mock_workflow_client},
ActivityHeartbeat, Worker,
Expand Down Expand Up @@ -45,17 +45,18 @@ use temporal_sdk_core_protos::{
},
temporal::api::{
command::v1::{command::Attributes, ScheduleActivityTaskCommandAttributes},
enums::v1::EventType,
enums::v1::{CommandType, EventType},
history::v1::{
history_event::Attributes as EventAttributes, ActivityTaskScheduledEventAttributes,
},
sdk::v1::UserMetadata,
workflowservice::v1::{
PollActivityTaskQueueResponse, RecordActivityTaskHeartbeatResponse,
RespondActivityTaskCanceledResponse, RespondActivityTaskCompletedResponse,
RespondActivityTaskFailedResponse, RespondWorkflowTaskCompletedResponse,
},
},
TestHistoryBuilder, DEFAULT_WORKFLOW_TYPE,
TestHistoryBuilder, DEFAULT_ACTIVITY_TYPE, DEFAULT_WORKFLOW_TYPE,
};
use temporal_sdk_core_test_utils::{fanout_tasks, start_timer_cmd, TestWorker};
use tokio::{join, sync::Barrier, time::sleep};
Expand Down Expand Up @@ -1187,3 +1188,55 @@ async fn activities_must_be_flushed_to_server_on_shutdown(#[values(true, false)]
};
join!(shutdown_task, complete_task);
}

#[tokio::test]
async fn pass_activity_summary_to_metadata() {
crate::telemetry::test_telem_console();
let t = canned_histories::single_activity("1");
let mut mock_cfg = MockPollCfg::from_hist_builder(t);
let wf_id = mock_cfg.hists[0].wf_id.clone();
let wf_type = DEFAULT_WORKFLOW_TYPE;
let expected_user_metadata = Some(UserMetadata {
summary: Some(b"activity summary".into()),
details: None,
});
mock_cfg.completion_asserts_from_expectations(|mut asserts| {
asserts
.then(move |wft| {
assert_eq!(wft.commands.len(), 1);
assert_eq!(
wft.commands[0].command_type(),
CommandType::ScheduleActivityTask
);
assert_eq!(wft.commands[0].user_metadata, expected_user_metadata)
})
.then(move |wft| {
assert_eq!(wft.commands.len(), 1);
assert_eq!(
wft.commands[0].command_type(),
CommandType::CompleteWorkflowExecution
);
});
});

let mut worker = mock_sdk_cfg(mock_cfg, |_| {});
worker.register_wf(wf_type, |ctx: WfContext| async move {
ctx.activity(ActivityOptions {
activity_type: DEFAULT_ACTIVITY_TYPE.to_string(),
summary: Some("activity summary".to_string()),
..Default::default()
})
.await;
Ok(().into())
});
worker
.submit_wf(
wf_id.to_owned(),
wf_type.to_owned(),
vec![],
WorkflowOptions::default(),
)
.await
.unwrap();
worker.run_until_done().await.unwrap();
}
73 changes: 65 additions & 8 deletions core/src/core_tests/child_workflows.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,24 @@
use crate::{
replay::DEFAULT_WORKFLOW_TYPE,
test_help::{
build_fake_sdk, canned_histories, mock_sdk, mock_worker, single_hist_mock_sg, MockPollCfg,
ResponseType,
build_fake_sdk, canned_histories, mock_sdk, mock_sdk_cfg, mock_worker, single_hist_mock_sg,
MockPollCfg, ResponseType,
},
worker::client::mocks::mock_workflow_client,
};
use temporal_client::WorkflowOptions;
use temporal_sdk::{ChildWorkflowOptions, Signal, WfContext, WorkflowResult};
use temporal_sdk_core_api::Worker;
use temporal_sdk_core_protos::coresdk::{
child_workflow::{child_workflow_result, ChildWorkflowCancellationType},
workflow_activation::{workflow_activation_job, WorkflowActivationJob},
workflow_commands::{
CancelChildWorkflowExecution, CompleteWorkflowExecution, StartChildWorkflowExecution,
use temporal_sdk_core_protos::{
coresdk::{
child_workflow::{child_workflow_result, ChildWorkflowCancellationType},
workflow_activation::{workflow_activation_job, WorkflowActivationJob},
workflow_commands::{
CancelChildWorkflowExecution, CompleteWorkflowExecution, StartChildWorkflowExecution,
},
workflow_completion::WorkflowActivationCompletion,
},
workflow_completion::WorkflowActivationCompletion,
temporal::api::{enums::v1::CommandType, sdk::v1::UserMetadata},
};
use tokio::join;

Expand Down Expand Up @@ -220,3 +223,57 @@ async fn cancel_already_complete_child_ignored() {
.await
.unwrap();
}

#[tokio::test]
async fn pass_child_workflow_summary_to_metadata() {
let wf_id = "1";
let wf_type = DEFAULT_WORKFLOW_TYPE;
let t = canned_histories::single_child_workflow(wf_id);
let mut mock_cfg = MockPollCfg::from_hist_builder(t);
let expected_user_metadata = Some(UserMetadata {
summary: Some(b"child summary".into()),
details: Some(b"child details".into()),
});
mock_cfg.completion_asserts_from_expectations(|mut asserts| {
asserts
.then(move |wft| {
assert_eq!(wft.commands.len(), 1);
assert_eq!(
wft.commands[0].command_type(),
CommandType::StartChildWorkflowExecution
);
assert_eq!(wft.commands[0].user_metadata, expected_user_metadata)
})
.then(move |wft| {
assert_eq!(wft.commands.len(), 1);
assert_eq!(
wft.commands[0].command_type(),
CommandType::CompleteWorkflowExecution
);
});
});

let mut worker = mock_sdk_cfg(mock_cfg, |_| {});
worker.register_wf(wf_type, move |ctx: WfContext| async move {
ctx.child_workflow(ChildWorkflowOptions {
workflow_id: wf_id.to_string(),
workflow_type: "child".to_string(),
static_summary: Some("child summary".to_string()),
static_details: Some("child details".to_string()),
..Default::default()
})
.start(&ctx)
.await;
Ok(().into())
});
worker
.submit_wf(
wf_id.to_owned(),
wf_type.to_owned(),
vec![],
WorkflowOptions::default(),
)
.await
.unwrap();
worker.run_until_done().await.unwrap();
}
50 changes: 49 additions & 1 deletion core/src/core_tests/workflow_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use std::{
time::Duration,
};
use temporal_client::WorkflowOptions;
use temporal_sdk::{ActivityOptions, CancellableFuture, WfContext};
use temporal_sdk::{ActivityOptions, CancellableFuture, TimerOptions, WfContext};
use temporal_sdk_core_api::{
errors::PollWfError,
worker::{
Expand Down Expand Up @@ -64,6 +64,7 @@ use temporal_sdk_core_protos::{
history_event, TimerFiredEventAttributes,
WorkflowPropertiesModifiedExternallyEventAttributes,
},
sdk::v1::UserMetadata,
workflowservice::v1::{
GetWorkflowExecutionHistoryResponse, RespondWorkflowTaskCompletedResponse,
},
Expand Down Expand Up @@ -3086,3 +3087,50 @@ async fn slot_provider_cant_hand_out_more_permits_than_cache_size() {
// polling is not exceeding the task limit
assert_eq!(popped_tasks.load(Ordering::Relaxed), 10);
}

#[tokio::test]
async fn pass_timer_summary_to_metadata() {
let t = canned_histories::single_timer("1");
let mut mock_cfg = MockPollCfg::from_hist_builder(t);
let wf_id = mock_cfg.hists[0].wf_id.clone();
let wf_type = DEFAULT_WORKFLOW_TYPE;
let expected_user_metadata = Some(UserMetadata {
summary: Some(b"timer summary".into()),
details: None,
});
mock_cfg.completion_asserts_from_expectations(|mut asserts| {
asserts
.then(move |wft| {
assert_eq!(wft.commands.len(), 1);
assert_eq!(wft.commands[0].command_type(), CommandType::StartTimer);
assert_eq!(wft.commands[0].user_metadata, expected_user_metadata)
})
.then(move |wft| {
assert_eq!(wft.commands.len(), 1);
assert_eq!(
wft.commands[0].command_type(),
CommandType::CompleteWorkflowExecution
);
});
});

let mut worker = mock_sdk_cfg(mock_cfg, |_| {});
worker.register_wf(wf_type, |ctx: WfContext| async move {
ctx.timer(TimerOptions {
duration: Duration::from_secs(1),
summary: Some("timer summary".to_string()),
})
.await;
Ok(().into())
});
worker
.submit_wf(
wf_id.to_owned(),
wf_type.to_owned(),
vec![],
WorkflowOptions::default(),
)
.await
.unwrap();
worker.run_until_done().await.unwrap();
}
2 changes: 1 addition & 1 deletion sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ use tracing::{field, Instrument, Span};
pub use workflow_context::{
ActivityOptions, CancellableFuture, ChildWorkflow, ChildWorkflowOptions, LocalActivityOptions,
PendingChildWorkflow, Signal, SignalData, SignalWorkflowOptions, StartedChildWorkflow,
WfContext,
TimerOptions, WfContext,
};

use crate::{interceptors::WorkerInterceptor, workflow_context::ChildWfCommon};
Expand Down
19 changes: 5 additions & 14 deletions sdk/src/workflow_context.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
mod options;

use options::TimerOptions;
pub use options::{
ActivityOptions, ChildWorkflowOptions, LocalActivityOptions, Signal, SignalData,
SignalWorkflowOptions,
SignalWorkflowOptions, TimerOptions,
};

use crate::{
Expand Down Expand Up @@ -148,24 +147,16 @@ impl WfContext {
}

/// Request to create a timer
pub fn timer(&self, duration: Duration) -> impl CancellableFuture<TimerResult> {
self.timer_with_options(duration, TimerOptions::default())
}

/// Request to create a timer with optioons
pub fn timer_with_options(
&self,
duration: Duration,
opts: TimerOptions,
) -> impl CancellableFuture<TimerResult> {
pub fn timer<T: Into<TimerOptions>>(&self, opts: T) -> impl CancellableFuture<TimerResult> {
let opts: TimerOptions = opts.into();
let seq = self.seq_nums.write().next_timer_seq();
let (cmd, unblocker) = CancellableWFCommandFut::new(CancellableID::Timer(seq));
self.send(
CommandCreateRequest {
cmd: StartTimer {
seq,
start_to_fire_timeout: Some(
duration
opts.duration
.try_into()
.expect("Durations must fit into 64 bits"),
),
Expand Down Expand Up @@ -630,7 +621,7 @@ impl<'a> Future for LATimerBackoffFut<'a> {
});
}

let timer_f = self.ctx.timer(
let timer_f = self.ctx.timer::<Duration>(
b.backoff_duration
.expect("Duration is set")
.try_into()
Expand Down
14 changes: 14 additions & 0 deletions sdk/src/workflow_context/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ impl IntoWorkflowCommand for ActivityOptions {
cancellation_type: self.cancellation_type as i32,
arguments: vec![self.input],
retry_policy: self.retry_policy,
summary: self.summary.map(Into::into),
..Default::default()
}
}
Expand Down Expand Up @@ -226,6 +227,8 @@ impl IntoWorkflowCommand for ChildWorkflowOptions {
search_attributes: self.options.search_attributes.unwrap_or_default(),
cron_schedule: self.options.cron_schedule.unwrap_or_default(),
parent_close_policy: self.parent_close_policy as i32,
static_summary: self.static_summary.map(Into::into),
static_details: self.static_details.map(Into::into),
..Default::default()
}
}
Expand Down Expand Up @@ -320,7 +323,18 @@ impl SignalData {
/// Options for timer
#[derive(Default, Debug, Clone)]
pub struct TimerOptions {
/// Duration for the timer
pub duration: Duration,
/// Summary of the timer
/// NOTE: Experimental
pub summary: Option<String>,
}

impl From<Duration> for TimerOptions {
fn from(duration: Duration) -> Self {
TimerOptions {
duration,
..Default::default()
}
}
}

0 comments on commit 6da7a69

Please sign in to comment.