Skip to content

Commit

Permalink
Cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
Sushisource committed Sep 1, 2023
1 parent 9f21897 commit 341ff1e
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 50 deletions.
37 changes: 15 additions & 22 deletions core/src/worker/workflow/managed_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ impl ManagedRun {
/// Called whenever a new workflow task is obtained for this run
pub(super) fn incoming_wft(&mut self, pwft: PermittedWFT) -> RunUpdateAct {
let res = self._incoming_wft(pwft);
self.update_to_acts(res.map(Into::into), true)
self.update_to_acts(res.map(Into::into))
}

fn _incoming_wft(
Expand Down Expand Up @@ -282,7 +282,7 @@ impl ManagedRun {
/// Checks if any further activations need to go out for this run and produces them if so.
pub(super) fn check_more_activations(&mut self) -> RunUpdateAct {
let res = self._check_more_activations();
self.update_to_acts(res.map(Into::into), false)
self.update_to_acts(res.map(Into::into))
}

fn _check_more_activations(&mut self) -> Result<Option<ActivationOrAuto>, RunUpdateErr> {
Expand Down Expand Up @@ -443,17 +443,14 @@ impl ManagedRun {
span: Span::current(),
})
} else {
Ok(self.update_to_acts(
Err(RunUpdateErr {
source: WFMachinesError::Fatal(
"Run's paginator was absent when attempting to fetch next history \
Ok(self.update_to_acts(Err(RunUpdateErr {
source: WFMachinesError::Fatal(
"Run's paginator was absent when attempting to fetch next history \
page. This is a Core SDK bug."
.to_string(),
),
complete_resp_chan: rac.resp_chan,
}),
false,
))
.to_string(),
),
complete_resp_chan: rac.resp_chan,
})))
};
}

Expand All @@ -471,7 +468,7 @@ impl ManagedRun {
paginator: HistoryPaginator,
) -> RunUpdateAct {
let res = self._fetched_page_completion(update, paginator);
self.update_to_acts(res.map(Into::into), false)
self.update_to_acts(res.map(Into::into))
}
fn _fetched_page_completion(
&mut self,
Expand Down Expand Up @@ -562,12 +559,12 @@ impl ManagedRun {
/// Called when local activities resolve
pub(super) fn local_resolution(&mut self, res: LocalResolution) -> RunUpdateAct {
let res = self._local_resolution(res);
self.update_to_acts(res.map(Into::into), false)
self.update_to_acts(res.map(Into::into))
}

fn process_completion(&mut self, completion: RunActivationCompletion) -> RunUpdateAct {
let res = self._process_completion(completion, None);
self.update_to_acts(res.map(Into::into), false)
self.update_to_acts(res.map(Into::into))
}

fn _process_completion(
Expand Down Expand Up @@ -686,7 +683,7 @@ impl ManagedRun {
} else {
None
};
self.update_to_acts(Ok(maybe_act).map(Into::into), false)
self.update_to_acts(Ok(maybe_act).map(Into::into))
}
/// Returns `true` if autocompletion should be issued, which will actually cause us to end up
/// in [completion] again, at which point we'll start a new heartbeat timeout, which will
Expand Down Expand Up @@ -810,11 +807,7 @@ impl ManagedRun {

/// Take the result of some update to ourselves and turn it into a return value of zero or more
/// actions
fn update_to_acts(
&mut self,
outcome: Result<ActOrFulfill, RunUpdateErr>,
in_response_to_wft: bool,
) -> RunUpdateAct {
fn update_to_acts(&mut self, outcome: Result<ActOrFulfill, RunUpdateErr>) -> RunUpdateAct {
match outcome {
Ok(act_or_fulfill) => {
let (mut maybe_act, maybe_fulfill) = match act_or_fulfill {
Expand All @@ -826,7 +819,7 @@ impl ManagedRun {
match self._check_more_activations() {
Ok(oa) => maybe_act = oa,
Err(e) => {
return self.update_to_acts(Err(e), in_response_to_wft);
return self.update_to_acts(Err(e));
}
}
}
Expand Down
81 changes: 53 additions & 28 deletions core/src/worker/workflow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,25 +270,8 @@ impl Workflows {
match al {
ActivationOrAuto::LangActivation(mut act)
| ActivationOrAuto::ReadyForQueries(mut act) => {
sort_act_jobs(&mut act);
prepare_to_ship_activation(&mut act);
debug!(activation=%act, "Sending activation to lang");
// TODO: REMOVE ======(make into debug assert validation)===============
let any_job_is_query = act.jobs.iter().any(|j| {
matches!(
j.variant,
Some(workflow_activation_job::Variant::QueryWorkflow(_))
)
});
let all_jobs_are_query = act.jobs.iter().all(|j| {
matches!(
j.variant,
Some(workflow_activation_job::Variant::QueryWorkflow(_))
)
});
if any_job_is_query && !all_jobs_are_query {
panic!("Ahhh query job with non query jobs: {:?}", &act);
}
// TODO: REMOVE ===========================================================
break Ok(act);
}
ActivationOrAuto::Autocomplete { run_id } => {
Expand Down Expand Up @@ -1353,9 +1336,33 @@ impl LocalActivityRequestSink for LAReqSink {
}
}

/// Sorts jobs in an activation to be in the order lang expects:
/// `patches -> signals -> other -> queries`
fn sort_act_jobs(wfa: &mut WorkflowActivation) {
/// Sorts jobs in an activation to be in the order lang expects, and confirms any invariants
/// activations must uphold.
///
/// ## Ordering
/// `patches -> signals -> other -X-> queries`
///
/// ## Invariants:
/// * Queries always go in their own activation
fn prepare_to_ship_activation(wfa: &mut WorkflowActivation) {
let any_job_is_query = wfa.jobs.iter().any(|j| {
matches!(
j.variant,
Some(workflow_activation_job::Variant::QueryWorkflow(_))
)
});
let all_jobs_are_query = wfa.jobs.iter().all(|j| {
matches!(
j.variant,
Some(workflow_activation_job::Variant::QueryWorkflow(_))
)
});
if any_job_is_query && !all_jobs_are_query {
dbg_panic!(
"About to issue an activation that contains query jobs with non-query jobs: {:?}",
&wfa
);
}
wfa.jobs.sort_by(|j1, j2| {
// Unwrapping is fine here since we'll never issue empty variants
let j1v = j1.variant.as_ref().unwrap();
Expand All @@ -1367,6 +1374,9 @@ fn sort_act_jobs(wfa: &mut WorkflowActivation) {
match v {
workflow_activation_job::Variant::NotifyHasPatch(_) => 1,
workflow_activation_job::Variant::SignalWorkflow(_) => 2,
// In principle we should never actually need to sort these with the others, since
// queries always get their own activation, but, maintaining the semantic is
// reasonable.
workflow_activation_job::Variant::QueryWorkflow(_) => 4,
_ => 3,
}
Expand Down Expand Up @@ -1394,11 +1404,6 @@ mod tests {
Default::default(),
)),
},
WorkflowActivationJob {
variant: Some(workflow_activation_job::Variant::QueryWorkflow(
Default::default(),
)),
},
WorkflowActivationJob {
variant: Some(workflow_activation_job::Variant::FireTimer(
Default::default(),
Expand All @@ -1412,7 +1417,7 @@ mod tests {
],
..Default::default()
};
sort_act_jobs(&mut act);
prepare_to_ship_activation(&mut act);
let variants = act
.jobs
.into_iter()
Expand All @@ -1425,8 +1430,28 @@ mod tests {
workflow_activation_job::Variant::SignalWorkflow(_),
workflow_activation_job::Variant::FireTimer(_),
workflow_activation_job::Variant::ResolveActivity(_),
workflow_activation_job::Variant::QueryWorkflow(_)
]
)
}

#[test]
#[should_panic]
fn queries_cannot_go_with_other_jobs() {
let mut act = WorkflowActivation {
jobs: vec![
WorkflowActivationJob {
variant: Some(workflow_activation_job::Variant::SignalWorkflow(
Default::default(),
)),
},
WorkflowActivationJob {
variant: Some(workflow_activation_job::Variant::QueryWorkflow(
Default::default(),
)),
},
],
..Default::default()
};
prepare_to_ship_activation(&mut act);
}
}

0 comments on commit 341ff1e

Please sign in to comment.