Skip to content

Commit

Permalink
Move user metadata to command from variant (#851)
Browse files Browse the repository at this point in the history
  • Loading branch information
Sushisource authored Dec 10, 2024
1 parent 4a2368d commit 75800f1
Show file tree
Hide file tree
Showing 36 changed files with 574 additions and 486 deletions.
2 changes: 1 addition & 1 deletion core/src/abstractions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ struct UseCtx<'a, SK: SlotKind> {
permit: &'a SlotSupplierPermit,
}

impl<'a, SK: SlotKind> SlotMarkUsedContext for UseCtx<'a, SK> {
impl<SK: SlotKind> SlotMarkUsedContext for UseCtx<'_, SK> {
type SlotKind = SK;

fn permit(&self) -> &SlotSupplierPermit {
Expand Down
1 change: 0 additions & 1 deletion core/src/core_tests/updates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,6 @@ async fn replay_with_signal_and_update_same_task() {
StartTimer {
seq: 1,
start_to_fire_timeout: Some(prost_dur!(from_secs(1))),
summary: None,
}
.into(),
UpdateResponse {
Expand Down
2 changes: 0 additions & 2 deletions core/src/core_tests/workers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ async fn after_shutdown_of_worker_get_shutdown_err() {
workflow_command::Variant::StartTimer(StartTimer {
seq: 1,
start_to_fire_timeout: Some(prost_dur!(from_secs(1))),
summary: None,
}),
))
.await
Expand Down Expand Up @@ -349,7 +348,6 @@ async fn worker_shutdown_api(#[case] use_cache: bool, #[case] api_success: bool)
workflow_command::Variant::StartTimer(StartTimer {
seq: 1,
start_to_fire_timeout: Some(prost_dur!(from_secs(1))),
summary: None,
}),
))
.await
Expand Down
2 changes: 1 addition & 1 deletion core/src/telemetry/log_export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ impl fmt::Debug for CoreLogStreamConsumer {

struct JsonVisitor<'a>(&'a mut HashMap<String, serde_json::Value>);

impl<'a> tracing::field::Visit for JsonVisitor<'a> {
impl tracing::field::Visit for JsonVisitor<'_> {
fn record_f64(&mut self, field: &tracing::field::Field, value: f64) {
self.0
.insert(field.name().to_string(), serde_json::json!(value));
Expand Down
2 changes: 2 additions & 0 deletions core/src/telemetry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,11 +276,13 @@ where
}
}

/// Helpers for test initialization
#[cfg(test)]
pub mod test_initters {
use super::*;
use temporal_sdk_core_api::telemetry::TelemetryOptionsBuilder;

/// Turn on logging to the console
#[allow(dead_code)] // Not always used, called to enable for debugging when needed
pub fn test_telem_console() {
telemetry_init_global(
Expand Down
1 change: 1 addition & 0 deletions core/src/test_help/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1026,6 +1026,7 @@ macro_rules! advance_fut {
};
}

/// Helps easily construct prost proto durations from stdlib duration constructors
#[macro_export]
macro_rules! prost_dur {
($dur_call:ident $args:tt) => {
Expand Down
9 changes: 7 additions & 2 deletions core/src/worker/workflow/driven_workflow.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
telemetry::VecDisplayer,
worker::workflow::{OutgoingJob, WFCommand, WorkflowStartedInfo},
worker::workflow::{OutgoingJob, WFCommand, WFCommandVariant, WorkflowStartedInfo},
};
use prost_types::Timestamp;
use std::{
Expand Down Expand Up @@ -86,7 +86,12 @@ impl DrivenWorkflow {
/// from a buffer that the language side sinks into when it calls [crate::Core::complete_task]
pub(super) fn fetch_workflow_iteration_output(&mut self) -> Vec<WFCommand> {
let in_cmds = self.incoming_commands.try_recv();
let in_cmds = in_cmds.unwrap_or_else(|_| vec![WFCommand::NoCommandsFromLang]);
let in_cmds = in_cmds.unwrap_or_else(|_| {
vec![WFCommand {
variant: WFCommandVariant::NoCommandsFromLang,
metadata: None,
}]
});
debug!(in_cmds = %in_cmds.display(), "wf bridge iteration fetch");
in_cmds
}
Expand Down
16 changes: 3 additions & 13 deletions core/src/worker/workflow/machines/activity_state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ use temporal_sdk_core_protos::{
ActivityTaskCompletedEventAttributes, ActivityTaskFailedEventAttributes,
ActivityTaskTimedOutEventAttributes,
},
sdk::v1::UserMetadata,
},
};

Expand Down Expand Up @@ -115,10 +114,6 @@ impl ActivityMachine {
internal_flags: InternalFlagsRef,
use_compatible_version: bool,
) -> NewMachineWithCommand {
let user_metadata = attrs.summary.clone().map(|x| UserMetadata {
summary: Some(x),
details: None,
});
let mut s = Self::from_parts(
Created {}.into(),
SharedState {
Expand All @@ -133,16 +128,11 @@ impl ActivityMachine {
);
OnEventWrapper::on_event_mut(&mut s, ActivityMachineEvents::Schedule)
.expect("Scheduling activities doesn't fail");
let command = Command {
command_type: CommandType::ScheduleActivityTask as i32,
attributes: Some(schedule_activity_cmd_to_api(
NewMachineWithCommand {
command: schedule_activity_cmd_to_api(
s.shared_state().attrs.clone(),
use_compatible_version,
)),
user_metadata,
};
NewMachineWithCommand {
command,
),
machine: s.into(),
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use temporal_sdk_core_protos::{
workflow_activation::ResolveRequestCancelExternalWorkflow,
},
temporal::api::{
command::v1::{command, Command, RequestCancelExternalWorkflowExecutionCommandAttributes},
command::v1::{command, RequestCancelExternalWorkflowExecutionCommandAttributes},
enums::v1::{CancelExternalWorkflowExecutionFailedCause, CommandType, EventType},
failure::v1::{failure::FailureInfo, ApplicationFailureInfo, Failure},
history::v1::history_event,
Expand Down Expand Up @@ -75,13 +75,8 @@ pub(super) fn new_external_cancel(
reason,
},
);
let cmd = Command {
command_type: CommandType::RequestCancelExternalWorkflowExecution as i32,
attributes: Some(cmd_attrs),
user_metadata: Default::default(),
};
NewMachineWithCommand {
command: cmd,
command: cmd_attrs,
machine: s.into(),
}
}
Expand Down
12 changes: 2 additions & 10 deletions core/src/worker/workflow/machines/cancel_workflow_state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,7 @@ use rustfsm::{fsm, StateMachine, TransitionResult};
use std::convert::TryFrom;
use temporal_sdk_core_protos::{
coresdk::workflow_commands::CancelWorkflowExecution,
temporal::api::{
command::v1::Command,
enums::v1::{CommandType, EventType},
},
temporal::api::enums::v1::{CommandType, EventType},
};

fsm! {
Expand All @@ -34,13 +31,8 @@ pub(super) fn cancel_workflow(attribs: CancelWorkflowExecution) -> NewMachineWit
let mut machine = CancelWorkflowMachine::from_parts(Created {}.into(), ());
OnEventWrapper::on_event_mut(&mut machine, CancelWorkflowMachineEvents::Schedule)
.expect("Scheduling continue as new machine doesn't fail");
let command = Command {
command_type: CommandType::CancelWorkflowExecution as i32,
attributes: Some(attribs.into()),
user_metadata: Default::default(),
};
NewMachineWithCommand {
command,
command: attribs.into(),
machine: machine.into(),
}
}
Expand Down
24 changes: 2 additions & 22 deletions core/src/worker/workflow/machines/child_workflow_state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use temporal_sdk_core_protos::{
},
temporal::api::{
command::v1::{
start_child_workflow_cmd_to_api, Command,
start_child_workflow_cmd_to_api,
RequestCancelExternalWorkflowExecutionCommandAttributes,
},
common::v1::{Payload, Payloads, WorkflowExecution, WorkflowType},
Expand All @@ -41,7 +41,6 @@ use temporal_sdk_core_protos::{
ChildWorkflowExecutionStartedEventAttributes,
StartChildWorkflowExecutionFailedEventAttributes,
},
sdk::v1::UserMetadata,
},
};

Expand Down Expand Up @@ -450,29 +449,10 @@ impl ChildWorkflowMachine {
cancelled_before_sent: false,
},
);
let mut attribs = attribs;
let user_metadata = if attribs.static_summary.is_some() || attribs.static_details.is_some()
{
Some(UserMetadata {
summary: attribs.static_summary.take(),
details: attribs.static_details.take(),
})
} else {
None
};
let attribs = attribs;
OnEventWrapper::on_event_mut(&mut s, ChildWorkflowMachineEvents::Schedule)
.expect("Scheduling child workflows doesn't fail");
let cmd = Command {
command_type: CommandType::StartChildWorkflowExecution as i32,
attributes: Some(start_child_workflow_cmd_to_api(
attribs,
use_compatible_version,
)),
user_metadata,
};
NewMachineWithCommand {
command: cmd,
command: start_child_workflow_cmd_to_api(attribs, use_compatible_version),
machine: s.into(),
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::convert::TryFrom;
use temporal_sdk_core_protos::{
coresdk::workflow_commands::CompleteWorkflowExecution,
temporal::api::{
command::v1::Command,
command::v1::command,
enums::v1::{CommandType, EventType},
},
};
Expand All @@ -30,34 +30,26 @@ fsm! {

#[derive(Debug, derive_more::Display)]
pub(super) enum CompleteWFCommand {
AddCommand(Command),
AddCommand(command::Attributes),
}

/// Complete a workflow
pub(super) fn complete_workflow(attribs: CompleteWorkflowExecution) -> NewMachineWithCommand {
let (machine, add_cmd) = CompleteWorkflowMachine::new_scheduled(attribs);
let mut machine = CompleteWorkflowMachine::from_parts(Created { attribs }.into(), ());
let add_cmd =
match OnEventWrapper::on_event_mut(&mut machine, CompleteWorkflowMachineEvents::Schedule)
.expect("Scheduling complete wf machines doesn't fail")
.pop()
{
Some(CompleteWFCommand::AddCommand(c)) => c,
_ => panic!("complete wf machine on_schedule must produce command"),
};
NewMachineWithCommand {
command: add_cmd,
machine: machine.into(),
}
}

impl CompleteWorkflowMachine {
/// Create a new WF machine and schedule it
pub(crate) fn new_scheduled(attribs: CompleteWorkflowExecution) -> (Self, Command) {
let mut s = Self::from_parts(Created { attribs }.into(), ());
let cmd =
match OnEventWrapper::on_event_mut(&mut s, CompleteWorkflowMachineEvents::Schedule)
.expect("Scheduling complete wf machines doesn't fail")
.pop()
{
Some(CompleteWFCommand::AddCommand(c)) => c,
_ => panic!("complete wf machine on_schedule must produce command"),
};
(s, cmd)
}
}

impl TryFrom<HistEventData> for CompleteWorkflowMachineEvents {
type Error = WFMachinesError;

Expand Down Expand Up @@ -94,12 +86,7 @@ impl Created {
pub(super) fn on_schedule(
self,
) -> CompleteWorkflowMachineTransition<CompleteWorkflowCommandCreated> {
let cmd = Command {
command_type: CommandType::CompleteWorkflowExecution as i32,
attributes: Some(self.attribs.into()),
user_metadata: Default::default(),
};
TransitionResult::commands(vec![CompleteWFCommand::AddCommand(cmd)])
TransitionResult::commands(vec![CompleteWFCommand::AddCommand(self.attribs.into())])
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::convert::TryFrom;
use temporal_sdk_core_protos::{
coresdk::workflow_commands::ContinueAsNewWorkflowExecution,
temporal::api::{
command::v1::{continue_as_new_cmd_to_api, Command},
command::v1::continue_as_new_cmd_to_api,
enums::v1::{CommandType, EventType},
},
};
Expand Down Expand Up @@ -37,13 +37,8 @@ pub(super) fn continue_as_new(
let mut machine = ContinueAsNewWorkflowMachine::from_parts(Created {}.into(), ());
OnEventWrapper::on_event_mut(&mut machine, ContinueAsNewWorkflowMachineEvents::Schedule)
.expect("Scheduling continue as new machine doesn't fail");
let command = Command {
command_type: CommandType::ContinueAsNewWorkflowExecution as i32,
attributes: Some(continue_as_new_cmd_to_api(attribs, use_compatible_version)),
user_metadata: Default::default(),
};
NewMachineWithCommand {
command,
command: continue_as_new_cmd_to_api(attribs, use_compatible_version),
machine: machine.into(),
}
}
Expand Down
13 changes: 4 additions & 9 deletions core/src/worker/workflow/machines/fail_workflow_state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::convert::TryFrom;
use temporal_sdk_core_protos::{
coresdk::workflow_commands::FailWorkflowExecution,
temporal::api::{
command::v1::Command as ProtoCommand,
command::v1::command,
enums::v1::{CommandType, EventType},
},
};
Expand All @@ -27,7 +27,7 @@ fsm! {

#[derive(Debug, derive_more::Display)]
pub(super) enum FailWFCommand {
AddCommand(ProtoCommand),
AddCommand(command::Attributes),
}

/// Fail a workflow
Expand All @@ -41,7 +41,7 @@ pub(super) fn fail_workflow(attribs: FailWorkflowExecution) -> NewMachineWithCom

impl FailWorkflowMachine {
/// Create a new WF machine and schedule it
pub(crate) fn new_scheduled(attribs: FailWorkflowExecution) -> (Self, ProtoCommand) {
pub(crate) fn new_scheduled(attribs: FailWorkflowExecution) -> (Self, command::Attributes) {
let mut s = Self::from_parts(Created { attribs }.into(), ());
let cmd = match OnEventWrapper::on_event_mut(&mut s, FailWorkflowMachineEvents::Schedule)
.expect("Scheduling fail wf machines doesn't fail")
Expand All @@ -61,12 +61,7 @@ pub(super) struct Created {

impl Created {
pub(super) fn on_schedule(self) -> FailWorkflowMachineTransition<FailWorkflowCommandCreated> {
let cmd = ProtoCommand {
command_type: CommandType::FailWorkflowExecution as i32,
attributes: Some(self.attribs.into()),
user_metadata: Default::default(),
};
TransitionResult::commands(vec![FailWFCommand::AddCommand(cmd)])
TransitionResult::commands(vec![FailWFCommand::AddCommand(self.attribs.into())])
}
}

Expand Down
3 changes: 1 addition & 2 deletions core/src/worker/workflow/machines/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ use std::{
fmt::{Debug, Display},
};
use temporal_sdk_core_protos::temporal::api::{
command::v1::Command as ProtoCommand,
enums::v1::{CommandType, EventType},
history::v1::HistoryEvent,
};
Expand Down Expand Up @@ -310,7 +309,7 @@ where
}

struct NewMachineWithCommand {
command: ProtoCommand,
command: temporal_sdk_core_protos::temporal::api::command::v1::command::Attributes,
machine: Machines,
}

Expand Down
Loading

0 comments on commit 75800f1

Please sign in to comment.