Skip to content

Commit

Permalink
Evict workflows when they complete (#847)
Browse files (browse the repository at this point in the history)
  • Loading branch information
Sushisource authored Nov 22, 2024
1 parent bde0afb commit b207660
Show file tree
Hide file tree
Showing 23 changed files with 248 additions and 282 deletions.
22 changes: 6 additions & 16 deletions core/src/core_tests/local_activities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -573,9 +573,6 @@ async fn la_resolve_during_legacy_query_does_not_combine(#[case] impossible_quer
// then next task is incremental w/ legacy query (for impossible query case)
t.add_full_wf_task();

let barr = Arc::new(Barrier::new(2));
let barr_c = barr.clone();

let tasks = [
hist_to_poll_resp(&t, wfid.to_owned(), ResponseType::ToTaskNum(1)),
{
Expand All @@ -592,17 +589,7 @@ async fn la_resolve_during_legacy_query_does_not_combine(#[case] impossible_quer
pr
},
{
let mut pr = hist_to_poll_resp(
&t,
wfid.to_owned(),
ResponseType::UntilResolved(
async move {
barr_c.wait().await;
}
.boxed(),
2,
),
);
let mut pr = hist_to_poll_resp(&t, wfid.to_owned(), ResponseType::ToTaskNum(2));
// Strip beginning of history so the only events are WFT sched/started, we need to look
// like we hit the cache
{
Expand All @@ -629,6 +616,7 @@ async fn la_resolve_during_legacy_query_does_not_combine(#[case] impossible_quer
}
let mut mock = single_hist_mock_sg(wfid, t, tasks, mock, true);
mock.worker_cfg(|wc| wc.max_cached_workflows = 1);
let taskmap = mock.outstanding_task_map.clone().unwrap();
let core = mock_worker(mock);

let wf_fut = async {
Expand All @@ -653,6 +641,9 @@ async fn la_resolve_during_legacy_query_does_not_combine(#[case] impossible_quer
variant: Some(workflow_activation_job::Variant::FireTimer(_)),
},]
);
// We want to make sure the weird-looking query gets received while we're working on other
// stuff, so that we don't see the workflow complete and choose to evict.
taskmap.release_run(&task.run_id);
core.complete_workflow_activation(WorkflowActivationCompletion::from_cmd(
task.run_id,
schedule_local_activity_cmd(
Expand Down Expand Up @@ -691,7 +682,6 @@ async fn la_resolve_during_legacy_query_does_not_combine(#[case] impossible_quer
))
.await
.unwrap();
barr.wait().await;

if impossible_query_in_task {
// finish last query
Expand Down Expand Up @@ -873,7 +863,7 @@ async fn start_to_close_timeout_allows_retries(#[values(true, false)] la_complet
1,
"1",
None,
Some(Failure::application_failure("la failed".to_string(), false)),
Some(Failure::timeout(TimeoutType::StartToClose)),
|_| {},
);
}
Expand Down
136 changes: 68 additions & 68 deletions core/src/core_tests/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,7 @@ async fn legacy_query(#[case] include_history: bool) {
.unwrap();
};
let clear_eviction = || async {
let t = worker.poll_workflow_activation().await.unwrap();
assert_matches!(
t.jobs[0].variant,
Some(workflow_activation_job::Variant::RemoveFromCache(_))
);
worker
.complete_workflow_activation(WorkflowActivationCompletion::empty(t.run_id))
.await
.unwrap();
worker.handle_eviction().await;
};

first_wft().await;
Expand Down Expand Up @@ -324,15 +316,7 @@ async fn query_failure_because_nondeterminism(#[values(true, false)] legacy: boo
core.complete_workflow_activation(WorkflowActivationCompletion::empty(task.run_id))
.await
.unwrap();
let task = core.poll_workflow_activation().await.unwrap();
assert_matches!(
task.jobs[0].variant,
Some(workflow_activation_job::Variant::RemoveFromCache(_))
);
core.complete_workflow_activation(WorkflowActivationCompletion::empty(task.run_id))
.await
.unwrap();

core.handle_eviction().await;
core.shutdown().await;
}

Expand Down Expand Up @@ -372,23 +356,31 @@ async fn legacy_query_after_complete(#[values(false, true)] full_history: bool)
mock.worker_cfg(|wc| wc.max_cached_workflows = 10);
let core = mock_worker(mock);

let task = core.poll_workflow_activation().await.unwrap();
core.complete_workflow_activation(WorkflowActivationCompletion::from_cmd(
task.run_id,
start_timer_cmd(1, Duration::from_secs(1)),
))
.await
.unwrap();
let task = core.poll_workflow_activation().await.unwrap();
core.complete_workflow_activation(WorkflowActivationCompletion::from_cmds(
task.run_id,
vec![CompleteWorkflowExecution { result: None }.into()],
))
.await
.unwrap();
let activations = || async {
let task = core.poll_workflow_activation().await.unwrap();
core.complete_workflow_activation(WorkflowActivationCompletion::from_cmd(
task.run_id,
start_timer_cmd(1, Duration::from_secs(1)),
))
.await
.unwrap();
let task = core.poll_workflow_activation().await.unwrap();
core.complete_workflow_activation(WorkflowActivationCompletion::from_cmds(
task.run_id,
vec![CompleteWorkflowExecution { result: None }.into()],
))
.await
.unwrap();
};
activations().await;

if !full_history {
core.handle_eviction().await;
activations().await;
}

// We should get queries two times
for _ in 1..=2 {
for i in 1..=2 {
let task = core.poll_workflow_activation().await.unwrap();
let query = assert_matches!(
task.jobs.as_slice(),
Expand All @@ -402,6 +394,10 @@ async fn legacy_query_after_complete(#[values(false, true)] full_history: bool)
))
.await
.unwrap();
if i == 1 {
core.handle_eviction().await;
activations().await;
}
}

core.shutdown().await;
Expand Down Expand Up @@ -770,8 +766,6 @@ async fn legacy_query_combined_with_timer_fire_repro() {
},
{
let mut pr = hist_to_poll_resp(&t, wfid.to_owned(), ResponseType::ToTaskNum(2));
// Strip history, we need to look like we hit the cache for a legacy query
pr.history = Some(History { events: vec![] });
pr.query = Some(WorkflowQuery {
query_type: "query-type".to_string(),
query_args: Some(b"hi".into()),
Expand All @@ -788,41 +782,44 @@ async fn legacy_query_combined_with_timer_fire_repro() {
mock.worker_cfg(|wc| wc.max_cached_workflows = 1);
let core = mock_worker(mock);

let task = core.poll_workflow_activation().await.unwrap();
core.complete_workflow_activation(WorkflowActivationCompletion::from_cmds(
task.run_id,
vec![
schedule_activity_cmd(
1,
"whatever",
"1",
ActivityCancellationType::TryCancel,
Duration::from_secs(60),
Duration::from_secs(60),
),
start_timer_cmd(1, Duration::from_secs(1)),
],
))
.await
.unwrap();
let activations = || async {
let task = core.poll_workflow_activation().await.unwrap();
core.complete_workflow_activation(WorkflowActivationCompletion::from_cmds(
task.run_id,
vec![
schedule_activity_cmd(
1,
"whatever",
"1",
ActivityCancellationType::TryCancel,
Duration::from_secs(60),
Duration::from_secs(60),
),
start_timer_cmd(1, Duration::from_secs(1)),
],
))
.await
.unwrap();

let task = core.poll_workflow_activation().await.unwrap();
core.complete_workflow_activation(WorkflowActivationCompletion::from_cmd(
task.run_id,
RequestCancelActivity { seq: 1 }.into(),
))
.await
.unwrap();
let task = core.poll_workflow_activation().await.unwrap();
core.complete_workflow_activation(WorkflowActivationCompletion::from_cmd(
task.run_id,
RequestCancelActivity { seq: 1 }.into(),
))
.await
.unwrap();

// First should get the activity resolve
let task = core.poll_workflow_activation().await.unwrap();
assert_matches!(
task.jobs.as_slice(),
[WorkflowActivationJob {
variant: Some(workflow_activation_job::Variant::ResolveActivity(_)),
}]
);
core.complete_execution(&task.run_id).await;
// First should get the activity resolve
let task = core.poll_workflow_activation().await.unwrap();
assert_matches!(
task.jobs.as_slice(),
[WorkflowActivationJob {
variant: Some(workflow_activation_job::Variant::ResolveActivity(_)),
}]
);
core.complete_execution(&task.run_id).await;
};
activations().await;

// Then the queries
let task = core.poll_workflow_activation().await.unwrap();
Expand All @@ -840,6 +837,9 @@ async fn legacy_query_combined_with_timer_fire_repro() {
.await
.unwrap();

core.handle_eviction().await;
activations().await;

let task = core.poll_workflow_activation().await.unwrap();
assert_matches!(
task.jobs.as_slice(),
Expand Down
8 changes: 2 additions & 6 deletions core/src/core_tests/workers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use temporal_sdk_core_protos::{
PollWorkflowTaskQueueResponse, RespondWorkflowTaskCompletedResponse, ShutdownWorkerResponse,
},
};
use temporal_sdk_core_test_utils::start_timer_cmd;
use temporal_sdk_core_test_utils::{start_timer_cmd, WorkerTestHelpers};
use tokio::sync::{watch, Barrier};

#[tokio::test]
Expand Down Expand Up @@ -260,11 +260,7 @@ async fn worker_does_not_panic_on_retry_exhaustion_of_nonfatal_net_err() {
.await
.unwrap();
// We should see an eviction
let res = core.poll_workflow_activation().await.unwrap();
assert_matches!(
res.jobs[0].variant,
Some(workflow_activation_job::Variant::RemoveFromCache(_))
);
core.handle_eviction().await;
}

#[rstest::rstest]
Expand Down
27 changes: 4 additions & 23 deletions core/src/core_tests/workflow_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1330,17 +1330,7 @@ async fn fail_wft_then_recover() {
.await
.unwrap();
// We must handle an eviction now
let evict_act = core.poll_workflow_activation().await.unwrap();
assert_eq!(evict_act.run_id, act.run_id);
assert_matches!(
evict_act.jobs.as_slice(),
[WorkflowActivationJob {
variant: Some(workflow_activation_job::Variant::RemoveFromCache(_)),
}]
);
core.complete_workflow_activation(WorkflowActivationCompletion::empty(evict_act.run_id))
.await
.unwrap();
core.handle_eviction().await;

// Workflow starting over, this time issue the right command
let act = core.poll_workflow_activation().await.unwrap();
Expand Down Expand Up @@ -1531,6 +1521,7 @@ async fn failing_wft_doesnt_eat_permit_forever() {
// row because we purposefully time out rather than spamming.
for _ in 1..=2 {
let activation = worker.poll_workflow_activation().await.unwrap();
run_id.clone_from(&activation.run_id);
// Issue a nonsense completion that will trigger a WFT failure
worker
.complete_workflow_activation(WorkflowActivationCompletion::from_cmd(
Expand All @@ -1539,18 +1530,7 @@ async fn failing_wft_doesnt_eat_permit_forever() {
))
.await
.unwrap();
let activation = worker.poll_workflow_activation().await.unwrap();
assert_matches!(
activation.jobs.as_slice(),
[WorkflowActivationJob {
variant: Some(workflow_activation_job::Variant::RemoveFromCache(_)),
},]
);
run_id.clone_from(&activation.run_id);
worker
.complete_workflow_activation(WorkflowActivationCompletion::empty(activation.run_id))
.await
.unwrap();
worker.handle_eviction().await;
}
assert_eq!(worker.outstanding_workflow_tasks().await, 0);
// We should be "out of work" because the mock service thinks we didn't complete the last task,
Expand Down Expand Up @@ -2601,6 +2581,7 @@ async fn _do_post_terminal_commands_test(

let act = core.poll_workflow_activation().await.unwrap();

core.initiate_shutdown();
core.complete_workflow_activation(WorkflowActivationCompletion::from_cmds(
act.run_id,
commands_sent_by_lang,
Expand Down
12 changes: 7 additions & 5 deletions core/src/telemetry/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ struct Instruments {
sticky_cache_hit: Arc<dyn Counter>,
sticky_cache_miss: Arc<dyn Counter>,
sticky_cache_size: Arc<dyn Gauge>,
sticky_cache_evictions: Arc<dyn Counter>,
sticky_cache_forced_evictions: Arc<dyn Counter>,
}

impl MetricsContext {
Expand Down Expand Up @@ -263,8 +263,10 @@ impl MetricsContext {
}

/// Count a workflow being forcibly evicted from the cache
pub(crate) fn cache_eviction(&self) {
self.instruments.sticky_cache_evictions.add(1, &self.kvs);
pub(crate) fn forced_cache_eviction(&self) {
self.instruments
.sticky_cache_forced_evictions
.add(1, &self.kvs);
}
}

Expand Down Expand Up @@ -423,7 +425,7 @@ impl Instruments {
description: "Current number of cached workflows".into(),
unit: "".into(),
}),
sticky_cache_evictions: meter.counter(MetricParameters {
sticky_cache_forced_evictions: meter.counter(MetricParameters {
name: "sticky_cache_total_forced_eviction".into(),
description: "Count of evictions of cached workflows".into(),
unit: "".into(),
Expand Down Expand Up @@ -867,7 +869,7 @@ mod tests {
true,
);
let mc = MetricsContext::top_level("foo".to_string(), "q".to_string(), &telem_instance);
mc.cache_eviction();
mc.forced_cache_eviction();
let events = call_buffer.retrieve();
let a1 = assert_matches!(
&events[0],
Expand Down
2 changes: 1 addition & 1 deletion core/src/test_help/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,7 @@ pub(crate) fn build_mock_pollers(mut cfg: MockPollCfg) -> MocksHolder {
tokio::select! {
_ = outstanding_wakeup.notified() => {}
_ = tokio::time::sleep(Duration::from_secs(60)) => {}
};
}
}
}
});
Expand Down
7 changes: 6 additions & 1 deletion core/src/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,12 @@ impl Worker {
if let Some(name) = self.workflows.get_sticky_queue_name() {
// This is a best effort call and we can still shutdown the worker if it fails
match self.client.shutdown_worker(name).await {
Err(err) if err.code() != tonic::Code::Unavailable => {
Err(err)
if !matches!(
err.code(),
tonic::Code::Unimplemented | tonic::Code::Unavailable
) =>
{
warn!("Failed to shutdown sticky queue {:?}", err);
}
_ => {}
Expand Down
Loading

0 comments on commit b207660

Please sign in to comment.