Skip to content

Commit

Permalink
code written, need to write tests
Browse files Browse the repository at this point in the history
  • Loading branch information
yuandrew committed Nov 20, 2024
1 parent dd37e29 commit 4bda68a
Show file tree
Hide file tree
Showing 9 changed files with 65 additions and 4 deletions.
1 change: 1 addition & 0 deletions core/src/core_tests/updates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ async fn replay_with_signal_and_update_same_task() {
StartTimer {
seq: 1,
start_to_fire_timeout: Some(prost_dur!(from_secs(1))),
summary: None,
}
.into(),
UpdateResponse {
Expand Down
2 changes: 2 additions & 0 deletions core/src/core_tests/workers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ async fn after_shutdown_of_worker_get_shutdown_err() {
workflow_command::Variant::StartTimer(StartTimer {
seq: 1,
start_to_fire_timeout: Some(prost_dur!(from_secs(1))),
summary: None,
}),
))
.await
Expand Down Expand Up @@ -352,6 +353,7 @@ async fn worker_shutdown_api(#[case] use_cache: bool, #[case] api_success: bool)
workflow_command::Variant::StartTimer(StartTimer {
seq: 1,
start_to_fire_timeout: Some(prost_dur!(from_secs(1))),
summary: None,
}),
))
.await
Expand Down
7 changes: 6 additions & 1 deletion core/src/worker/workflow/machines/activity_state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use temporal_sdk_core_protos::{
ActivityTaskCompletedEventAttributes, ActivityTaskFailedEventAttributes,
ActivityTaskTimedOutEventAttributes,
},
sdk::v1::UserMetadata,
},
};

Expand Down Expand Up @@ -114,6 +115,10 @@ impl ActivityMachine {
internal_flags: InternalFlagsRef,
use_compatible_version: bool,
) -> NewMachineWithCommand {
let user_metadata = attrs.summary.clone().map(|x| UserMetadata {
summary: Some(x),
details: None,
});
let mut s = Self::from_parts(
Created {}.into(),
SharedState {
Expand All @@ -134,7 +139,7 @@ impl ActivityMachine {
s.shared_state().attrs.clone(),
use_compatible_version,
)),
user_metadata: Default::default(),
user_metadata,
};
NewMachineWithCommand {
command,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use temporal_sdk_core_protos::{
ChildWorkflowExecutionStartedEventAttributes,
StartChildWorkflowExecutionFailedEventAttributes,
},
sdk::v1::UserMetadata,
},
};

Expand Down Expand Up @@ -449,6 +450,15 @@ impl ChildWorkflowMachine {
cancelled_before_sent: false,
},
);
let user_metadata = if attribs.static_summary.is_some() || attribs.static_details.is_some()
{
Some(UserMetadata {
summary: attribs.static_summary.clone(),
details: attribs.static_details.clone(),
})
} else {
None
};
OnEventWrapper::on_event_mut(&mut s, ChildWorkflowMachineEvents::Schedule)
.expect("Scheduling child workflows doesn't fail");
let cmd = Command {
Expand All @@ -457,7 +467,7 @@ impl ChildWorkflowMachine {
attribs,
use_compatible_version,
)),
user_metadata: Default::default(),
user_metadata,
};
NewMachineWithCommand {
command: cmd,
Expand Down
9 changes: 7 additions & 2 deletions core/src/worker/workflow/machines/timer_state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use temporal_sdk_core_protos::{
command::v1::Command,
enums::v1::{CommandType, EventType},
history::v1::{history_event, TimerFiredEventAttributes},
sdk::v1::UserMetadata,
},
};

Expand Down Expand Up @@ -73,13 +74,17 @@ pub(super) fn new_timer(attribs: StartTimer) -> NewMachineWithCommand {
impl TimerMachine {
/// Create a new timer and immediately schedule it
fn new_scheduled(attribs: StartTimer) -> (Self, Command) {
let user_metadata = attribs.summary.clone().map(|x| UserMetadata {
summary: Some(x),
details: None,
});
let mut s = Self::new(attribs);
OnEventWrapper::on_event_mut(&mut s, TimerMachineEvents::Schedule)
.expect("Scheduling timers doesn't fail");
let cmd = Command {
command_type: CommandType::StartTimer as i32,
attributes: Some(s.shared_state().attrs.into()),
user_metadata: Default::default(),
attributes: Some(s.shared_state().attrs.clone().into()),
user_metadata,
};
(s, cmd)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ message StartTimer {
// Lang's incremental sequence number, used as the operation identifier
uint32 seq = 1;
google.protobuf.Duration start_to_fire_timeout = 2;
// Summary that is stored as user_metadata
temporal.api.common.v1.Payload summary = 3;
}

message CancelTimer {
Expand Down Expand Up @@ -88,6 +90,8 @@ message ScheduleActivity {
bool do_not_eagerly_execute = 14;
// Whether this activity should run on a worker with a compatible build id or not.
coresdk.common.VersioningIntent versioning_intent = 15;
// Summary that is stored as user_metadata
temporal.api.common.v1.Payload summary = 16;
}

message ScheduleLocalActivity {
Expand Down Expand Up @@ -253,6 +257,10 @@ message StartChildWorkflowExecution {
child_workflow.ChildWorkflowCancellationType cancellation_type = 18;
// Whether this child should run on a worker with a compatible build id or not.
coresdk.common.VersioningIntent versioning_intent = 19;
// Static summary of the child workflow
temporal.api.common.v1.Payload static_summary = 20;
// Static details of the child workflow
temporal.api.common.v1.Payload static_details = 21;
}

// Cancel a child workflow
Expand Down
11 changes: 11 additions & 0 deletions sdk/src/workflow_context.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
mod options;

use options::TimerOptions;
pub use options::{
ActivityOptions, ChildWorkflowOptions, LocalActivityOptions, Signal, SignalData,
SignalWorkflowOptions,
Expand Down Expand Up @@ -148,6 +149,15 @@ impl WfContext {

/// Request to create a timer
pub fn timer(&self, duration: Duration) -> impl CancellableFuture<TimerResult> {
self.timer_with_options(duration, TimerOptions::default())
}

/// Request to create a timer with optioons
pub fn timer_with_options(
&self,
duration: Duration,
opts: TimerOptions,
) -> impl CancellableFuture<TimerResult> {
let seq = self.seq_nums.write().next_timer_seq();
let (cmd, unblocker) = CancellableWFCommandFut::new(CancellableID::Timer(seq));
self.send(
Expand All @@ -159,6 +169,7 @@ impl WfContext {
.try_into()
.expect("Durations must fit into 64 bits"),
),
summary: opts.summary.map(|x| x.as_bytes().into()),
}
.into(),
unblocker,
Expand Down
17 changes: 17 additions & 0 deletions sdk/src/workflow_context/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ pub struct ActivityOptions {
pub cancellation_type: ActivityCancellationType,
/// Activity retry policy
pub retry_policy: Option<RetryPolicy>,
/// Summary of the activity
/// NOTE: Experimental
pub summary: Option<String>,
}

impl IntoWorkflowCommand for ActivityOptions {
Expand Down Expand Up @@ -192,6 +195,12 @@ pub struct ChildWorkflowOptions {
pub options: WorkflowOptions,
/// How to respond to parent workflow ending
pub parent_close_policy: ParentClosePolicy,
/// Static summary of the child workflow
/// NOTE: Experimental
pub static_summary: Option<String>,
/// Static details of the child workflow
/// NOTE: Experimental
pub static_details: Option<String>,
}

impl IntoWorkflowCommand for ChildWorkflowOptions {
Expand Down Expand Up @@ -307,3 +316,11 @@ impl SignalData {
self
}
}

/// Options for timer
#[derive(Default, Debug, Clone)]
pub struct TimerOptions {
/// Summary of the timer
/// NOTE: Experimental
pub summary: Option<String>,
}
2 changes: 2 additions & 0 deletions test-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -716,6 +716,7 @@ pub fn start_timer_cmd(seq: u32, duration: Duration) -> workflow_command::Varian
StartTimer {
seq,
start_to_fire_timeout: Some(duration.try_into().expect("duration fits")),
summary: None,
}
.into()
}
Expand Down Expand Up @@ -779,6 +780,7 @@ where
vec![StartTimer {
seq,
start_to_fire_timeout: Some(duration.try_into().expect("duration fits")),
summary: None,
}
.into()],
))
Expand Down

0 comments on commit 4bda68a

Please sign in to comment.