Skip to content

Commit

Permalink
Ensure stale queries can get retried
Browse files Browse the repository at this point in the history
  • Loading branch information
Sushisource committed Nov 17, 2023
1 parent 67a7be2 commit 2424f43
Show file tree
Hide file tree
Showing 5 changed files with 153 additions and 13 deletions.
4 changes: 4 additions & 0 deletions core/src/worker/workflow/machines/workflow_machines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,10 @@ impl WorkflowMachines {
self.drive_me.get_started_info()
}

/// Returns the current value of `current_started_event_id`.
///
/// NOTE(review): judging by the method name, this is the event id of the most recent workflow
/// task started event these machines have processed -- confirm against where the field is set.
pub(crate) fn get_last_wft_started_id(&self) -> i64 {
self.current_started_event_id
}

pub(crate) fn prepare_for_wft_response(&mut self) -> MachinesWFTResponseContent {
MachinesWFTResponseContent {
replaying: self.replaying,
Expand Down
37 changes: 30 additions & 7 deletions core/src/worker/workflow/managed_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ pub(super) struct ManagedRun {
/// * Lang takes too long to complete a task and the task times out
/// * Many queries are submitted concurrently and reach this worker (in this case, multiple
/// tasks can be outstanding)
/// * Multiple speculative tasks (ex: for updates) may also exist at once
/// * Multiple speculative tasks (ex: for updates) may also exist at once (but only the
/// latest one will matter).
task_buffer: BufferedTasks,
/// Is set if an eviction has been requested for this run
trying_to_evict: Option<RequestEvictMsg>,
Expand Down Expand Up @@ -194,6 +195,7 @@ impl ManagedRun {
complete_resp_chan: None,
});
}
let was_legacy_query = legacy_query_from_poll.is_some();
if let Some(lq) = legacy_query_from_poll {
pending_queries.push(lq);
}
Expand All @@ -206,6 +208,16 @@ impl ManagedRun {
permit: pwft.permit,
});

if was_legacy_query
&& work.update.wft_started_id == 0
&& work.update.previous_wft_started_id < self.wfm.machines.get_last_wft_started_id()
{
return Ok(Some(ActivationOrAuto::AutoFail {
run_id: self.run_id().to_string(),
machines_err: WFMachinesError::Fatal("Query expired".to_string()),
}));
}

// The update field is only populated in the event we hit the cache
let activation = if work.update.is_real() {
self.metrics.sticky_cache_hit();
Expand Down Expand Up @@ -500,14 +512,15 @@ impl ManagedRun {
}

/// Called whenever either core or lang cannot complete a workflow activation. EX: Nondeterminism
/// or user code threw/panicked, respectively. The `cause` and `reason` fields are determined
/// inside core always. The `failure` field may come from lang. `resp_chan` will be used to
/// unblock the completion call when everything we need to do to fulfill it has happened.
/// or user code threw/panicked. The `cause` and `reason` fields are determined inside core
/// always. The `failure` field may come from lang. `resp_chan` will be used to unblock the
/// completion call when everything we need to do to fulfill it has happened.
pub(super) fn failed_completion(
&mut self,
cause: WorkflowTaskFailedCause,
reason: EvictionReason,
failure: workflow_completion::Failure,
is_auto_fail: bool,
resp_chan: Option<oneshot::Sender<ActivationCompleteResult>>,
) -> RunUpdateAct {
let tt = if let Some(tt) = self.wft.as_ref().map(|t| t.info.task_token.clone()) {
Expand Down Expand Up @@ -538,9 +551,17 @@ impl ManagedRun {
let rur = evict_req_outcome.into_run_update_resp();
// If the outstanding WFT is a legacy query task, report that we need to fail it
let outcome = if self.pending_work_is_legacy_query() {
ActivationCompleteOutcome::ReportWFTFail(
FailedActivationWFTReport::ReportLegacyQueryFailure(tt, failure),
)
// We don't want to fail queries that could otherwise be retried
let failtype = if is_auto_fail
&& matches!(
reason,
EvictionReason::Unspecified | EvictionReason::PaginationOrHistoryFetch
) {
FailedActivationWFTReport::Report(tt, cause, failure)
} else {
FailedActivationWFTReport::ReportLegacyQueryFailure(tt, failure)
};
ActivationCompleteOutcome::ReportWFTFail(failtype)
} else if should_report {
ActivationCompleteOutcome::ReportWFTFail(FailedActivationWFTReport::Report(
tt, cause, failure,
Expand Down Expand Up @@ -796,6 +817,7 @@ impl ManagedRun {
WorkflowTaskFailedCause::Unspecified,
info.reason,
Failure::application_failure(info.message, false).into(),
true,
c.resp_chan,
);
return EvictionRequestResult::EvictionRequested(attempts, run_upd);
Expand Down Expand Up @@ -920,6 +942,7 @@ impl ManagedRun {
fail_cause,
fail.source.evict_reason(),
Failure::application_failure(wft_fail_str, false).into(),
true,
Some(resp_chan),
)
} else {
Expand Down
7 changes: 5 additions & 2 deletions core/src/worker/workflow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ impl Workflows {
post_activate_hook: Option<impl Fn(PostActivateHookData)>,
) -> Result<(), CompleteWfError> {
let is_empty_completion = completion.is_empty();
let completion = validate_completion(completion)?;
let completion = validate_completion(completion, is_autocomplete)?;
let run_id = completion.run_id().to_string();
let (tx, rx) = oneshot::channel();
let was_sent = self.send_local(WFActCompleteMsg {
Expand Down Expand Up @@ -1056,6 +1056,7 @@ impl BufferedTasks {

fn validate_completion(
completion: WorkflowActivationCompletion,
is_autocomplete: bool,
) -> Result<ValidatedCompletion, CompleteWfError> {
match completion.status {
Some(workflow_activation_completion::Status::Successful(success)) => {
Expand All @@ -1067,7 +1068,7 @@ fn validate_completion(
.collect::<Result<Vec<_>, EmptyWorkflowCommandErr>>()
.map_err(|_| CompleteWfError::MalformedWorkflowCompletion {
reason: "At least one workflow command in the completion contained \
an empty variant"
an empty variant"
.to_owned(),
run_id: completion.run_id.clone(),
})?;
Expand Down Expand Up @@ -1107,6 +1108,7 @@ fn validate_completion(
Ok(ValidatedCompletion::Fail {
run_id: completion.run_id,
failure,
is_autocomplete,
})
}
None => Err(CompleteWfError::MalformedWorkflowCompletion {
Expand All @@ -1131,6 +1133,7 @@ enum ValidatedCompletion {
Fail {
run_id: String,
failure: Failure,
is_autocomplete: bool,
},
}

Expand Down
13 changes: 11 additions & 2 deletions core/src/worker/workflow/workflow_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,10 +301,19 @@ impl WFStream {
None
}
},
ValidatedCompletion::Fail { failure, .. } => rh.failed_completion(
ValidatedCompletion::Fail {
failure,
is_autocomplete,
..
} => rh.failed_completion(
failure.force_cause(),
EvictionReason::LangFail,
if is_autocomplete {
EvictionReason::Unspecified
} else {
EvictionReason::LangFail
},
failure,
is_autocomplete,
complete.response_tx,
),
},
Expand Down
105 changes: 103 additions & 2 deletions tests/integ_tests/queries_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ async fn simple_query_legacy() {
let task = core.poll_workflow_activation().await.unwrap();
core.complete_execution(&task.run_id).await;
};
let (q_resp, _) = tokio::join!(query_fut, workflow_completions_future);
let (q_resp, _) = join!(query_fut, workflow_completions_future);
// Ensure query response is as expected
assert_eq!(&q_resp.unwrap()[0].data, query_resp);
}
Expand Down Expand Up @@ -310,7 +310,7 @@ async fn fail_legacy_query() {
let task = core.poll_workflow_activation().await.unwrap();
core.complete_execution(&task.run_id).await;
};
let (q_resp, _) = tokio::join!(query_fut, workflow_completions_future);
let (q_resp, _) = join!(query_fut, workflow_completions_future);
// Ensure query response is a failure and has the right message
assert_eq!(q_resp.message(), query_err);
}
Expand Down Expand Up @@ -382,3 +382,104 @@ async fn multiple_concurrent_queries_no_new_history() {
panic!("Should not have taken this long");
}
}

/// Integration test: a legacy query that is still buffered when a newer workflow task is
/// produced (here, by a signal) must not be answered from stale state.
///
/// Sequence exercised below:
/// 1. Complete the first activation with no commands.
/// 2. Issue two queries concurrently so that one of them ends up buffered.
/// 3. While the first query activation is outstanding, signal the workflow, then answer the
///    first query.
/// 4. Expect the signal activation next, then expect a query activation again (the retried
///    query) and answer it.
#[tokio::test]
async fn query_superseded_by_newer_wft_is_discarded() {
let mut starter = init_core_and_create_wf("query_superseded_by_newer_wft_is_discarded").await;
let core = starter.get_worker().await;
// The task queue name is reused as the workflow id for the client calls below.
// NOTE(review): presumably the test helper starts the workflow with id == task queue; confirm.
let workflow_id = starter.get_task_queue().to_string();
let task = core.poll_workflow_activation().await.unwrap();
core.complete_workflow_activation(WorkflowActivationCompletion::from_cmds(
task.run_id.clone(),
vec![], // complete with no commands so that there will be no new history
))
.await
.unwrap();
let client = starter.get_client().await;
// Send two queries so that one of them is buffered
let query_futs = (1..=2).map(|_| async {
client
.query_workflow_execution(
workflow_id.to_string(),
task.run_id.to_string(),
WorkflowQuery {
query_type: "myquery".to_string(),
query_args: Some(b"hi".into()),
header: None,
},
)
.await
.unwrap();
});
// Worker-side half of the test; driven concurrently with the query futures via `join!` below.
let complete_fut = async {
let task = core.poll_workflow_activation().await.unwrap();
// The first activation must consist of exactly one query job.
let query = assert_matches!(
task.jobs.as_slice(),
[WorkflowActivationJob {
variant: Some(workflow_activation_job::Variant::QueryWorkflow(q)),
}] => q
);
// While handling the first query, signal the workflow so a new WFT is generated and the
// second query is still in the buffer
client
.signal_workflow_execution(
workflow_id.to_string(),
task.run_id.to_string(),
"blah".to_string(),
None,
None,
)
.await
.unwrap();
// NOTE(review): presumably this pause lets the new workflow task and the second query reach
// the worker before the first query is answered -- confirm the timing assumption.
tokio::time::sleep(Duration::from_secs(1)).await;
// Answer the first query successfully.
core.complete_workflow_activation(WorkflowActivationCompletion::from_cmd(
task.run_id,
QueryResult {
query_id: query.query_id.clone(),
variant: Some(
QuerySuccess {
response: Some("done".into()),
}
.into(),
),
}
.into(),
))
.await
.unwrap();
// We should get the signal activation since the in-buffer query should have been failed
let task = core.poll_workflow_activation().await.unwrap();
assert_matches!(
task.jobs.as_slice(),
[WorkflowActivationJob {
variant: Some(workflow_activation_job::Variant::SignalWorkflow(_)),
}]
);
core.complete_execution(&task.run_id).await;
// Query will get retried by server since we fail the task w/ the stale query
let task = core.poll_workflow_activation().await.unwrap();
let query = assert_matches!(
task.jobs.as_slice(),
[WorkflowActivationJob {
variant: Some(workflow_activation_job::Variant::QueryWorkflow(q)),
}] => q
);
// Answer the retried query so the second client call can resolve.
core.complete_workflow_activation(WorkflowActivationCompletion::from_cmd(
task.run_id,
QueryResult {
query_id: query.query_id.clone(),
variant: Some(
QuerySuccess {
response: Some("done".into()),
}
.into(),
),
}
.into(),
))
.await
.unwrap();
};
// Drive both client queries and the worker-side handling to completion together.
join!(join_all(query_futs), complete_fut);
drain_pollers_and_shutdown(&core).await;
}

0 comments on commit 2424f43

Please sign in to comment.