Skip to content

Commit

Permalink
Use new helper in more places
Browse files Browse the repository at this point in the history
  • Loading branch information
Sushisource committed Nov 22, 2024
1 parent f91a13b commit 2628aa7
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 89 deletions.
20 changes: 2 additions & 18 deletions core/src/core_tests/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,7 @@ async fn legacy_query(#[case] include_history: bool) {
.unwrap();
};
let clear_eviction = || async {
let t = worker.poll_workflow_activation().await.unwrap();
assert_matches!(
t.jobs[0].variant,
Some(workflow_activation_job::Variant::RemoveFromCache(_))
);
worker
.complete_workflow_activation(WorkflowActivationCompletion::empty(t.run_id))
.await
.unwrap();
worker.handle_eviction().await;
};

first_wft().await;
Expand Down Expand Up @@ -324,15 +316,7 @@ async fn query_failure_because_nondeterminism(#[values(true, false)] legacy: boo
core.complete_workflow_activation(WorkflowActivationCompletion::empty(task.run_id))
.await
.unwrap();
let task = core.poll_workflow_activation().await.unwrap();
assert_matches!(
task.jobs[0].variant,
Some(workflow_activation_job::Variant::RemoveFromCache(_))
);
core.complete_workflow_activation(WorkflowActivationCompletion::empty(task.run_id))
.await
.unwrap();

core.handle_eviction().await;
core.shutdown().await;
}

Expand Down
8 changes: 2 additions & 6 deletions core/src/core_tests/workers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use temporal_sdk_core_protos::{
PollWorkflowTaskQueueResponse, RespondWorkflowTaskCompletedResponse, ShutdownWorkerResponse,
},
};
use temporal_sdk_core_test_utils::start_timer_cmd;
use temporal_sdk_core_test_utils::{start_timer_cmd, WorkerTestHelpers};
use tokio::sync::{watch, Barrier};

#[tokio::test]
Expand Down Expand Up @@ -259,11 +259,7 @@ async fn worker_does_not_panic_on_retry_exhaustion_of_nonfatal_net_err() {
.await
.unwrap();
// We should see an eviction
let res = core.poll_workflow_activation().await.unwrap();
assert_matches!(
res.jobs[0].variant,
Some(workflow_activation_job::Variant::RemoveFromCache(_))
);
core.handle_eviction().await;
}

#[rstest::rstest]
Expand Down
27 changes: 4 additions & 23 deletions core/src/core_tests/workflow_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1329,17 +1329,7 @@ async fn fail_wft_then_recover() {
.await
.unwrap();
// We must handle an eviction now
let evict_act = core.poll_workflow_activation().await.unwrap();
assert_eq!(evict_act.run_id, act.run_id);
assert_matches!(
evict_act.jobs.as_slice(),
[WorkflowActivationJob {
variant: Some(workflow_activation_job::Variant::RemoveFromCache(_)),
}]
);
core.complete_workflow_activation(WorkflowActivationCompletion::empty(evict_act.run_id))
.await
.unwrap();
core.handle_eviction().await;

// Workflow starting over, this time issue the right command
let act = core.poll_workflow_activation().await.unwrap();
Expand Down Expand Up @@ -1530,6 +1520,7 @@ async fn failing_wft_doesnt_eat_permit_forever() {
// row because we purposefully time out rather than spamming.
for _ in 1..=2 {
let activation = worker.poll_workflow_activation().await.unwrap();
run_id.clone_from(&activation.run_id);
// Issue a nonsense completion that will trigger a WFT failure
worker
.complete_workflow_activation(WorkflowActivationCompletion::from_cmd(
Expand All @@ -1538,18 +1529,7 @@ async fn failing_wft_doesnt_eat_permit_forever() {
))
.await
.unwrap();
let activation = worker.poll_workflow_activation().await.unwrap();
assert_matches!(
activation.jobs.as_slice(),
[WorkflowActivationJob {
variant: Some(workflow_activation_job::Variant::RemoveFromCache(_)),
},]
);
run_id.clone_from(&activation.run_id);
worker
.complete_workflow_activation(WorkflowActivationCompletion::empty(activation.run_id))
.await
.unwrap();
worker.handle_eviction().await;
}
assert_eq!(worker.outstanding_workflow_tasks().await, 0);
// We should be "out of work" because the mock service thinks we didn't complete the last task,
Expand Down Expand Up @@ -2505,6 +2485,7 @@ async fn core_internal_flags() {
core.shutdown().await;
}

// TODO: Flakes sometimes
#[tokio::test]
async fn post_terminal_commands_are_retained_when_not_replaying() {
// History contains a non-terminal command (N) followed by the terminal
Expand Down
11 changes: 1 addition & 10 deletions tests/integ_tests/queries_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -472,17 +472,8 @@ async fn query_should_not_be_sent_if_wft_about_to_fail() {
))
.await
.unwrap();
let task = core.poll_workflow_activation().await.unwrap();
// Should *not* get a query here. If the bug wasn't fixed, this job would have a query.
assert_matches!(
task.jobs.as_slice(),
[WorkflowActivationJob {
variant: Some(workflow_activation_job::Variant::RemoveFromCache(_)),
}]
);
core.complete_workflow_activation(WorkflowActivationCompletion::empty(task.run_id))
.await
.unwrap();
core.handle_eviction().await;

// We can still service the query by trying again
let task = core.poll_workflow_activation().await.unwrap();
Expand Down
36 changes: 4 additions & 32 deletions tests/integ_tests/workflow_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,17 +191,8 @@ async fn fail_wf_task(#[values(true, false)] replay: bool) {

// The server will want to retry the task. This time we finish the workflow -- but we need
// to poll a couple of times as there will be more than one required workflow activation.
let task = core.poll_workflow_activation().await.unwrap();
// The first poll response will tell us to evict
assert_matches!(
task.jobs.as_slice(),
[WorkflowActivationJob {
variant: Some(workflow_activation_job::Variant::RemoveFromCache(_)),
}]
);
core.complete_workflow_activation(WorkflowActivationCompletion::empty(task.run_id))
.await
.unwrap();
// The first poll response will tell us to evict.
core.handle_eviction().await;

let task = core.poll_workflow_activation().await.unwrap();
core.complete_workflow_activation(WorkflowActivationCompletion::from_cmds(
Expand Down Expand Up @@ -369,18 +360,8 @@ async fn signal_workflow_signal_not_handled_on_workflow_completion() {
// Send completion - not having seen a poll response with a signal in it yet (unhandled
// command error will be logged as a warning and an eviction will be issued)
core.complete_execution(&run_id).await;

// We should be told to evict
let res = core.poll_workflow_activation().await.unwrap();
assert_matches!(
res.jobs.as_slice(),
[WorkflowActivationJob {
variant: Some(workflow_activation_job::Variant::RemoveFromCache(_)),
}]
);
core.complete_workflow_activation(WorkflowActivationCompletion::empty(res.run_id))
.await
.unwrap();
core.handle_eviction().await;
// Loop to the top to handle wf from the beginning
continue;
}
Expand Down Expand Up @@ -476,16 +457,7 @@ async fn wft_timeout_doesnt_create_unsolvable_autocomplete() {
.await
.unwrap();
// Now poll again, it will be an eviction b/c non-sticky mode.
let wf_task = core.poll_workflow_activation().await.unwrap();
assert_matches!(
wf_task.jobs.as_slice(),
[WorkflowActivationJob {
variant: Some(workflow_activation_job::Variant::RemoveFromCache(_)),
}]
);
core.complete_workflow_activation(WorkflowActivationCompletion::empty(wf_task.run_id))
.await
.unwrap();
core.handle_eviction().await;
// Start from the beginning
poll_sched_act().await;
let wf_task = core.poll_workflow_activation().await.unwrap();
Expand Down

0 comments on commit 2628aa7

Please sign in to comment.