diff --git a/core/src/worker/workflow/machines/workflow_machines.rs b/core/src/worker/workflow/machines/workflow_machines.rs index c6e7e6408..3888d6c22 100644 --- a/core/src/worker/workflow/machines/workflow_machines.rs +++ b/core/src/worker/workflow/machines/workflow_machines.rs @@ -368,6 +368,10 @@ impl WorkflowMachines { self.drive_me.get_started_info() } + pub(crate) fn get_last_wft_started_id(&self) -> i64 { + self.current_started_event_id + } + pub(crate) fn prepare_for_wft_response(&mut self) -> MachinesWFTResponseContent { MachinesWFTResponseContent { replaying: self.replaying, diff --git a/core/src/worker/workflow/managed_run.rs b/core/src/worker/workflow/managed_run.rs index 3a1837092..fd4503275 100644 --- a/core/src/worker/workflow/managed_run.rs +++ b/core/src/worker/workflow/managed_run.rs @@ -78,7 +78,8 @@ pub(super) struct ManagedRun { /// * Lang takes too long to complete a task and the task times out /// * Many queries are submitted concurrently and reach this worker (in this case, multiple /// tasks can be outstanding) - /// * Multiple speculative tasks (ex: for updates) may also exist at once + /// * Multiple speculative tasks (ex: for updates) may also exist at once (but only the + /// latest one will matter). task_buffer: BufferedTasks, /// Is set if an eviction has been requested for this run trying_to_evict: Option, @@ -194,6 +195,7 @@ impl ManagedRun { complete_resp_chan: None, }); } + let was_legacy_query = legacy_query_from_poll.is_some(); if let Some(lq) = legacy_query_from_poll { pending_queries.push(lq); } @@ -206,6 +208,16 @@ impl ManagedRun { permit: pwft.permit, }); + if was_legacy_query + && work.update.wft_started_id == 0 + && work.update.previous_wft_started_id < self.wfm.machines.get_last_wft_started_id() + { + return Ok(Some(ActivationOrAuto::AutoFail { + run_id: self.run_id().to_string(), + machines_err: WFMachinesError::Fatal("Query expired".to_string()), + })); + } + // The update field is only populated in the event we hit the cache let activation = if work.update.is_real() { self.metrics.sticky_cache_hit(); @@ -500,14 +512,15 @@ impl ManagedRun { } /// Called whenever either core lang cannot complete a workflow activation. EX: Nondeterminism - /// or user code threw/panicked, respectively. The `cause` and `reason` fields are determined - /// inside core always. The `failure` field may come from lang. `resp_chan` will be used to - /// unblock the completion call when everything we need to do to fulfill it has happened. + /// or user code threw/panicked. The `cause` and `reason` fields are determined inside core + /// always. The `failure` field may come from lang. `resp_chan` will be used to unblock the + /// completion call when everything we need to do to fulfill it has happened. pub(super) fn failed_completion( &mut self, cause: WorkflowTaskFailedCause, reason: EvictionReason, failure: workflow_completion::Failure, + is_auto_fail: bool, resp_chan: Option>, ) -> RunUpdateAct { let tt = if let Some(tt) = self.wft.as_ref().map(|t| t.info.task_token.clone()) { @@ -538,9 +551,17 @@ impl ManagedRun { let rur = evict_req_outcome.into_run_update_resp(); // If the outstanding WFT is a legacy query task, report that we need to fail it let outcome = if self.pending_work_is_legacy_query() { - ActivationCompleteOutcome::ReportWFTFail( - FailedActivationWFTReport::ReportLegacyQueryFailure(tt, failure), - ) + // We don't want to fail queries that could otherwise be retried + let failtype = if is_auto_fail + && matches!( + reason, + EvictionReason::Unspecified | EvictionReason::PaginationOrHistoryFetch + ) { + FailedActivationWFTReport::Report(tt, cause, failure) + } else { + FailedActivationWFTReport::ReportLegacyQueryFailure(tt, failure) + }; + ActivationCompleteOutcome::ReportWFTFail(failtype) } else if should_report { ActivationCompleteOutcome::ReportWFTFail(FailedActivationWFTReport::Report( tt, cause, failure, @@ -796,6 +817,7 @@ impl ManagedRun { WorkflowTaskFailedCause::Unspecified, info.reason, Failure::application_failure(info.message, false).into(), + true, c.resp_chan, ); return EvictionRequestResult::EvictionRequested(attempts, run_upd); @@ -920,6 +942,7 @@ impl ManagedRun { fail_cause, fail.source.evict_reason(), Failure::application_failure(wft_fail_str, false).into(), + true, Some(resp_chan), ) } else { diff --git a/core/src/worker/workflow/mod.rs b/core/src/worker/workflow/mod.rs index 2065aad7d..cd9cda39f 100644 --- a/core/src/worker/workflow/mod.rs +++ b/core/src/worker/workflow/mod.rs @@ -314,7 +314,7 @@ impl Workflows { post_activate_hook: Option, ) -> Result<(), CompleteWfError> { let is_empty_completion = completion.is_empty(); - let completion = validate_completion(completion)?; + let completion = validate_completion(completion, is_autocomplete)?; let run_id = completion.run_id().to_string(); let (tx, rx) = oneshot::channel(); let was_sent = self.send_local(WFActCompleteMsg { @@ -1056,6 +1056,7 @@ impl BufferedTasks { fn validate_completion( completion: WorkflowActivationCompletion, + is_autocomplete: bool, ) -> Result { match completion.status { Some(workflow_activation_completion::Status::Successful(success)) => { @@ -1067,7 +1068,7 @@ fn validate_completion( .collect::, EmptyWorkflowCommandErr>>() .map_err(|_| CompleteWfError::MalformedWorkflowCompletion { reason: "At least one workflow command in the completion contained \ - an empty variant" + an empty variant" .to_owned(), run_id: completion.run_id.clone(), })?; @@ -1107,6 +1108,7 @@ fn validate_completion( Ok(ValidatedCompletion::Fail { run_id: completion.run_id, failure, + is_autocomplete, }) } None => Err(CompleteWfError::MalformedWorkflowCompletion { @@ -1131,6 +1133,7 @@ enum ValidatedCompletion { Fail { run_id: String, failure: Failure, + is_autocomplete: bool, }, } diff --git a/core/src/worker/workflow/workflow_stream.rs b/core/src/worker/workflow/workflow_stream.rs index 6a8a4edd5..618ca1072 100644 --- a/core/src/worker/workflow/workflow_stream.rs +++ b/core/src/worker/workflow/workflow_stream.rs @@ -301,10 +301,19 @@ impl WFStream { None } }, - ValidatedCompletion::Fail { failure, .. } => rh.failed_completion( + ValidatedCompletion::Fail { + failure, + is_autocomplete, + .. + } => rh.failed_completion( failure.force_cause(), - EvictionReason::LangFail, + if is_autocomplete { + EvictionReason::Unspecified + } else { + EvictionReason::LangFail + }, failure, + is_autocomplete, complete.response_tx, ), }, diff --git a/tests/integ_tests/queries_tests.rs b/tests/integ_tests/queries_tests.rs index 9425ce725..fd3a86789 100644 --- a/tests/integ_tests/queries_tests.rs +++ b/tests/integ_tests/queries_tests.rs @@ -104,7 +104,7 @@ async fn simple_query_legacy() { let task = core.poll_workflow_activation().await.unwrap(); core.complete_execution(&task.run_id).await; }; - let (q_resp, _) = tokio::join!(query_fut, workflow_completions_future); + let (q_resp, _) = join!(query_fut, workflow_completions_future); // Ensure query response is as expected assert_eq!(&q_resp.unwrap()[0].data, query_resp); } @@ -310,7 +310,7 @@ async fn fail_legacy_query() { let task = core.poll_workflow_activation().await.unwrap(); core.complete_execution(&task.run_id).await; }; - let (q_resp, _) = tokio::join!(query_fut, workflow_completions_future); + let (q_resp, _) = join!(query_fut, workflow_completions_future); // Ensure query response is a failure and has the right message assert_eq!(q_resp.message(), query_err); } @@ -382,3 +382,104 @@ async fn multiple_concurrent_queries_no_new_history() { panic!("Should not have taken this long"); } } + +#[tokio::test] +async fn query_superseded_by_newer_wft_is_discarded() { + let mut starter = init_core_and_create_wf("query_superseded_by_newer_wft_is_discarded").await; + let core = starter.get_worker().await; + let workflow_id = starter.get_task_queue().to_string(); + let task = core.poll_workflow_activation().await.unwrap(); + core.complete_workflow_activation(WorkflowActivationCompletion::from_cmds( + task.run_id.clone(), + vec![], // complete with no commands so that there will be no new history + )) + .await + .unwrap(); + let client = starter.get_client().await; + // Send two queries so that one of them is buffered + let query_futs = (1..=2).map(|_| async { + client + .query_workflow_execution( + workflow_id.to_string(), + task.run_id.to_string(), + WorkflowQuery { + query_type: "myquery".to_string(), + query_args: Some(b"hi".into()), + header: None, + }, + ) + .await + .unwrap(); + }); + let complete_fut = async { + let task = core.poll_workflow_activation().await.unwrap(); + let query = assert_matches!( + task.jobs.as_slice(), + [WorkflowActivationJob { + variant: Some(workflow_activation_job::Variant::QueryWorkflow(q)), + }] => q + ); + // While handling the first query, signal the workflow so a new WFT is generated and the + // second query is still in the buffer + client + .signal_workflow_execution( + workflow_id.to_string(), + task.run_id.to_string(), + "blah".to_string(), + None, + None, + ) + .await + .unwrap(); + tokio::time::sleep(Duration::from_secs(1)).await; + core.complete_workflow_activation(WorkflowActivationCompletion::from_cmd( + task.run_id, + QueryResult { + query_id: query.query_id.clone(), + variant: Some( + QuerySuccess { + response: Some("done".into()), + } + .into(), + ), + } + .into(), + )) + .await + .unwrap(); + // We should get the signal activation since the in-buffer query should've been failed + let task = core.poll_workflow_activation().await.unwrap(); + assert_matches!( + task.jobs.as_slice(), + [WorkflowActivationJob { + variant: Some(workflow_activation_job::Variant::SignalWorkflow(_)), + }] + ); + core.complete_execution(&task.run_id).await; + // Query will get retried by server since we fail the task w/ the stale query + let task = core.poll_workflow_activation().await.unwrap(); + let query = assert_matches!( + task.jobs.as_slice(), + [WorkflowActivationJob { + variant: Some(workflow_activation_job::Variant::QueryWorkflow(q)), + }] => q + ); + core.complete_workflow_activation(WorkflowActivationCompletion::from_cmd( + task.run_id, + QueryResult { + query_id: query.query_id.clone(), + variant: Some( + QuerySuccess { + response: Some("done".into()), + } + .into(), + ), + } + .into(), + )) + .await + .unwrap(); + }; + join!(join_all(query_futs), complete_fut); + drain_pollers_and_shutdown(&core).await; +}