Skip to content

Commit

Permalink
Bugfix for failure to send protocol msg when resuming unfinished upda…
Browse files Browse the repository at this point in the history
…tes (#624)
  • Loading branch information
Sushisource authored Nov 1, 2023
1 parent 35cbbc4 commit ee7f6ca
Show file tree
Hide file tree
Showing 7 changed files with 180 additions and 59 deletions.
1 change: 1 addition & 0 deletions client/src/workflow_handle/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ pub struct WorkflowHandle<ClientT, ResultT> {
}

/// Holds needed information to refer to a specific workflow run, or workflow execution chain
#[derive(Debug)]
pub struct WorkflowExecutionInfo {
/// Namespace the workflow lives in
pub namespace: String,
Expand Down
33 changes: 26 additions & 7 deletions core/src/ephemeral_server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,16 @@ pub struct TemporaliteConfig {
impl TemporaliteConfig {
/// Start a Temporalite server.
pub async fn start_server(&self) -> anyhow::Result<EphemeralServer> {
self.start_server_with_output(Stdio::inherit()).await
self.start_server_with_output(Stdio::inherit(), Stdio::inherit())
.await
}

/// Start a Temporalite server with configurable stdout destination.
pub async fn start_server_with_output(&self, output: Stdio) -> anyhow::Result<EphemeralServer> {
pub async fn start_server_with_output(
&self,
output: Stdio,
err_output: Stdio,
) -> anyhow::Result<EphemeralServer> {
// Get exe path
let exe_path = self
.exe
Expand Down Expand Up @@ -104,6 +109,7 @@ impl TemporaliteConfig {
args,
has_test_service: false,
output,
err_output,
})
.await
}
Expand Down Expand Up @@ -140,11 +146,16 @@ pub struct TemporalDevServerConfig {
impl TemporalDevServerConfig {
/// Start a Temporal CLI dev server.
pub async fn start_server(&self) -> anyhow::Result<EphemeralServer> {
self.start_server_with_output(Stdio::inherit()).await
self.start_server_with_output(Stdio::inherit(), Stdio::inherit())
.await
}

/// Start a Temporal CLI dev server with configurable stdout destination.
pub async fn start_server_with_output(&self, output: Stdio) -> anyhow::Result<EphemeralServer> {
pub async fn start_server_with_output(
&self,
output: Stdio,
err_output: Stdio,
) -> anyhow::Result<EphemeralServer> {
// Get exe path
let exe_path = self
.exe
Expand Down Expand Up @@ -191,6 +202,7 @@ impl TemporalDevServerConfig {
args,
has_test_service: false,
output,
err_output,
})
.await
}
Expand All @@ -212,11 +224,16 @@ pub struct TestServerConfig {
impl TestServerConfig {
/// Start a test server.
pub async fn start_server(&self) -> anyhow::Result<EphemeralServer> {
self.start_server_with_output(Stdio::inherit()).await
self.start_server_with_output(Stdio::inherit(), Stdio::inherit())
.await
}

/// Start a test server with configurable stdout.
pub async fn start_server_with_output(&self, output: Stdio) -> anyhow::Result<EphemeralServer> {
pub async fn start_server_with_output(
&self,
output: Stdio,
err_output: Stdio,
) -> anyhow::Result<EphemeralServer> {
// Get exe path
let exe_path = self
.exe
Expand All @@ -237,6 +254,7 @@ impl TestServerConfig {
args,
has_test_service: true,
output,
err_output,
})
.await
}
Expand All @@ -248,6 +266,7 @@ struct EphemeralServerConfig {
args: Vec<String>,
has_test_service: bool,
output: Stdio,
err_output: Stdio,
}

/// Server that will be stopped when dropped.
Expand All @@ -263,11 +282,11 @@ pub struct EphemeralServer {
impl EphemeralServer {
async fn start(config: EphemeralServerConfig) -> anyhow::Result<EphemeralServer> {
// Start process
// TODO(cretz): Offer stdio suppression?
let child = tokio::process::Command::new(config.exe_path)
.args(config.args)
.stdin(Stdio::null())
.stdout(config.output)
.stderr(config.err_output)
.spawn()?;
let target = format!("127.0.0.1:{}", config.port);
let target_url = format!("http://{target}");
Expand Down
46 changes: 17 additions & 29 deletions core/src/worker/workflow/machines/update_state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ pub(super) struct SharedState {
instance_id: String,
event_seq_id: i64,
request: UpdateRequest,
replaying_when_created: bool,
}

impl UpdateMachine {
Expand All @@ -93,7 +92,6 @@ impl UpdateMachine {
instance_id: instance_id.clone(),
event_seq_id,
request: request.clone(),
replaying_when_created: replaying,
},
);
let do_update = DoUpdate {
Expand Down Expand Up @@ -151,40 +149,34 @@ impl UpdateMachine {
outgoing_id: String,
msg: UpdateMsg,
) -> Result<Vec<MachineResponse>, WFMachinesError> {
let mut responses = vec![];
if let Some(r) = self.build_msg(outgoing_id.clone(), msg)? {
responses.push(r)
};
responses.push(MachineResponse::IssueNewCommand(
command::Attributes::ProtocolMessageCommandAttributes(
ProtocolMessageCommandAttributes {
message_id: outgoing_id,
},
)
.into(),
));
Ok(responses)
Ok(vec![
self.build_msg(outgoing_id.clone(), msg)?,
MachineResponse::IssueNewCommand(
command::Attributes::ProtocolMessageCommandAttributes(
ProtocolMessageCommandAttributes {
message_id: outgoing_id,
},
)
.into(),
),
])
}

/// Build an outgoing protocol message. Returns Ok(None) during replay, since we don't want
/// to re-send message replies in that case
/// Build an outgoing protocol message.
fn build_msg(
&self,
outgoing_id: String,
msg: UpdateMsg,
) -> Result<Option<MachineResponse>, WFMachinesError> {
if self.shared_state.replaying_when_created {
return Ok(None);
}
) -> Result<MachineResponse, WFMachinesError> {
let accept_body = msg.pack().map_err(|e| {
WFMachinesError::Fatal(format!("Failed to serialize update response: {:?}", e))
})?;
Ok(Some(MachineResponse::IssueNewMessage(ProtocolMessage {
Ok(MachineResponse::IssueNewMessage(ProtocolMessage {
id: outgoing_id.clone(),
protocol_instance_id: self.shared_state.instance_id.clone(),
body: Some(accept_body),
..Default::default()
})))
}))
}
}

Expand Down Expand Up @@ -249,19 +241,15 @@ impl WFMachinesAdapter for UpdateMachine {
}),
)?,
UpdateMachineCommand::Reject(fail) => {
if let Some(r) = self.build_msg(
vec![self.build_msg(
format!("{}/reject", self.shared_state.message_id),
UpdateMsg::Reject(Rejection {
rejected_request_message_id: self.shared_state.message_id.clone(),
rejected_request_sequencing_event_id: self.shared_state.event_seq_id,
failure: Some(fail),
..Default::default()
}),
)? {
vec![r]
} else {
vec![]
}
)?]
}
UpdateMachineCommand::Complete(p) => self.build_command_msg(
format!("{}/complete", self.shared_state.message_id),
Expand Down
6 changes: 5 additions & 1 deletion core/src/worker/workflow/machines/workflow_machines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1095,7 +1095,11 @@ impl WorkflowMachines {
})
}
MachineResponse::IssueNewMessage(pm) => {
self.message_outbox.push_back(pm);
// Messages shouldn't be sent back when replaying. This is true for update,
// currently the only user of protocol messages. May eventually change.
if !self.replaying {
self.message_outbox.push_back(pm);
}
}
MachineResponse::NewCoreOriginatedCommand(attrs) => match attrs {
ProtoCmdAttrs::RequestCancelExternalWorkflowExecutionCommandAttributes(
Expand Down
27 changes: 18 additions & 9 deletions test-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,18 @@ impl CoreWfStarter {
}
}

/// Create a new starter with no initialized worker or runtime override. Useful for starting a
/// new worker on the same queue.
pub fn clone_no_worker(&self) -> Self {
Self {
task_queue_name: self.task_queue_name.clone(),
worker_config: self.worker_config.clone(),
workflow_options: self.workflow_options.clone(),
runtime_override: None,
initted_worker: Default::default(),
}
}

pub async fn worker(&mut self) -> TestWorker {
let w = self.get_worker().await;
let tq = w.get_config().task_queue.clone();
Expand Down Expand Up @@ -372,23 +384,17 @@ pub struct TestWorker {
/// If set true (default), and a client is available, we will fetch workflow results to
/// determine when they have all completed.
pub fetch_results: bool,
iceptor: Option<TestWorkerCompletionIceptor>,
}
impl TestWorker {
/// Create a new test worker
pub fn new(core_worker: Arc<dyn CoreWorker>, task_queue: impl Into<String>) -> Self {
let inner = Worker::new_from_core(core_worker.clone(), task_queue);
let iceptor = TestWorkerCompletionIceptor::new(
TestWorkerShutdownCond::NoAutoShutdown,
Arc::new(inner.shutdown_handle()),
);
Self {
inner,
core_worker,
client: None,
started_workflows: Mutex::new(vec![]),
fetch_results: true,
iceptor: Some(iceptor),
}
}

Expand Down Expand Up @@ -458,9 +464,12 @@ impl TestWorker {
/// See [Self::run_until_done], but allows configuration of some low-level interception.
pub async fn run_until_done_intercepted(
&mut self,
interceptor: Option<impl WorkerInterceptor + 'static>,
next_interceptor: Option<impl WorkerInterceptor + 'static>,
) -> Result<(), anyhow::Error> {
let mut iceptor = self.iceptor.take().unwrap();
let mut iceptor = TestWorkerCompletionIceptor::new(
TestWorkerShutdownCond::NoAutoShutdown,
Arc::new(self.inner.shutdown_handle()),
);
// Automatically use results-based complete detection if we have a client
if self.fetch_results {
if let Some(c) = self.client.clone() {
Expand All @@ -470,7 +479,7 @@ impl TestWorker {
);
}
}
iceptor.next = interceptor.map(|i| Box::new(i) as Box<dyn WorkerInterceptor>);
iceptor.next = next_interceptor.map(|i| Box::new(i) as Box<dyn WorkerInterceptor>);
let get_results_waiter = iceptor.wait_all_wfs();
self.inner.set_worker_interceptor(iceptor);
tokio::try_join!(self.inner.run(), get_results_waiter)?;
Expand Down
Loading

0 comments on commit ee7f6ca

Please sign in to comment.