Skip to content

Commit

Permalink
Add test to ensure queries working during LA heartbeat. Cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
Sushisource committed Sep 1, 2023
1 parent 341ff1e commit 1c66c9d
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 124 deletions.
144 changes: 109 additions & 35 deletions core/src/core_tests/local_activities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,7 @@ use temporal_sdk_core_protos::{
coresdk::{
activity_result::ActivityExecutionResult,
workflow_activation::{workflow_activation_job, WorkflowActivationJob},
workflow_commands::{
ActivityCancellationType, QueryResult, QuerySuccess, ScheduleLocalActivity,
},
workflow_commands::{ActivityCancellationType, ScheduleLocalActivity},
workflow_completion::WorkflowActivationCompletion,
ActivityTaskCompletion, AsJsonPayloadExt,
},
Expand All @@ -47,7 +45,7 @@ use temporal_sdk_core_protos::{
DEFAULT_ACTIVITY_TYPE,
};
use temporal_sdk_core_test_utils::{
schedule_local_activity_cmd, start_timer_cmd, WorkerTestHelpers,
query_ok, schedule_local_activity_cmd, start_timer_cmd, WorkerTestHelpers,
};
use tokio::{join, select, sync::Barrier};

Expand Down Expand Up @@ -527,16 +525,7 @@ async fn query_during_wft_heartbeat_doesnt_accidentally_fail_to_continue_heartbe
barrier.wait().await;
core.complete_workflow_activation(WorkflowActivationCompletion::from_cmd(
task.run_id,
QueryResult {
query_id: query.query_id.clone(),
variant: Some(
QuerySuccess {
response: Some("whatever".into()),
}
.into(),
),
}
.into(),
query_ok(&query.query_id, "whatev"),
))
.await
.unwrap();
Expand Down Expand Up @@ -699,16 +688,7 @@ async fn la_resolve_during_legacy_query_does_not_combine(#[case] impossible_quer
);
core.complete_workflow_activation(WorkflowActivationCompletion::from_cmd(
task.run_id,
QueryResult {
query_id: "q1".to_string(),
variant: Some(
QuerySuccess {
response: Some("whatev".into()),
}
.into(),
),
}
.into(),
query_ok("q1", "whatev"),
))
.await
.unwrap();
Expand All @@ -717,18 +697,9 @@ async fn la_resolve_during_legacy_query_does_not_combine(#[case] impossible_quer
if impossible_query_in_task {
// finish last query
let task = core.poll_workflow_activation().await.unwrap();
core.complete_workflow_activation(WorkflowActivationCompletion::from_cmds(
core.complete_workflow_activation(WorkflowActivationCompletion::from_cmd(
task.run_id,
vec![QueryResult {
query_id: LEGACY_QUERY_ID.to_string(),
variant: Some(
QuerySuccess {
response: Some("whatev".into()),
}
.into(),
),
}
.into()],
query_ok(LEGACY_QUERY_ID, "whatev"),
))
.await
.unwrap();
Expand Down Expand Up @@ -1221,3 +1192,106 @@ async fn local_activities_can_be_delivered_during_shutdown() {
assert_matches!(wf_r.unwrap_err(), PollWfError::ShutDown);
assert_matches!(act_r.unwrap_err(), PollActivityError::ShutDown);
}

#[tokio::test]
async fn queries_can_be_received_while_heartbeating() {
let wfid = "fake_wf_id";
let mut t = TestHistoryBuilder::default();
t.add_wfe_started_with_wft_timeout(Duration::from_millis(200));
t.add_full_wf_task();
t.add_full_wf_task();
t.add_full_wf_task();

let tasks = [
hist_to_poll_resp(&t, wfid.to_owned(), ResponseType::ToTaskNum(1)),
{
let mut pr = hist_to_poll_resp(&t, wfid.to_owned(), ResponseType::OneTask(2));
pr.queries = HashMap::new();
pr.queries.insert(
"q1".to_string(),
WorkflowQuery {
query_type: "query-type".to_string(),
query_args: Some(b"hi".into()),
header: None,
},
);
pr
},
{
let mut pr = hist_to_poll_resp(&t, wfid.to_owned(), ResponseType::OneTask(3));
pr.query = Some(WorkflowQuery {
query_type: "query-type".to_string(),
query_args: Some(b"hi".into()),
header: None,
});
pr
},
];
let mut mock = mock_workflow_client();
mock.expect_respond_legacy_query()
.times(1)
.returning(move |_, _| Ok(Default::default()));
let mut mock = single_hist_mock_sg(wfid, t, tasks, mock, true);
mock.worker_cfg(|wc| wc.max_cached_workflows = 1);
let core = mock_worker(mock);

let task = core.poll_workflow_activation().await.unwrap();
assert_matches!(
task.jobs.as_slice(),
&[WorkflowActivationJob {
variant: Some(workflow_activation_job::Variant::StartWorkflow(_)),
},]
);
core.complete_workflow_activation(WorkflowActivationCompletion::from_cmd(
task.run_id,
schedule_local_activity_cmd(
1,
"act-id",
ActivityCancellationType::TryCancel,
Duration::from_secs(60),
),
))
.await
.unwrap();

let task = core.poll_workflow_activation().await.unwrap();
assert_matches!(
task.jobs.as_slice(),
&[WorkflowActivationJob {
variant: Some(workflow_activation_job::Variant::QueryWorkflow(ref q)),
}]
if q.query_id == "q1"
);
core.complete_workflow_activation(WorkflowActivationCompletion::from_cmd(
task.run_id,
query_ok("q1", "whatev"),
))
.await
.unwrap();

let task = core.poll_workflow_activation().await.unwrap();
assert_matches!(
task.jobs.as_slice(),
&[WorkflowActivationJob {
variant: Some(workflow_activation_job::Variant::QueryWorkflow(ref q)),
}]
if q.query_id == LEGACY_QUERY_ID
);
core.complete_workflow_activation(WorkflowActivationCompletion::from_cmd(
task.run_id,
query_ok(LEGACY_QUERY_ID, "whatev"),
))
.await
.unwrap();

// Handle the activity so we can shut down cleanly
let act_task = core.poll_activity_task().await.unwrap();
core.complete_activity_task(ActivityTaskCompletion {
task_token: act_task.task_token,
result: Some(ActivityExecutionResult::ok(vec![1].into())),
})
.await
.unwrap();

core.drain_pollers_and_shutdown().await;
}
96 changes: 12 additions & 84 deletions core/src/core_tests/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use temporal_sdk_core_protos::{
},
workflow_commands::{
query_result, ActivityCancellationType, CompleteWorkflowExecution,
ContinueAsNewWorkflowExecution, QueryResult, QuerySuccess, RequestCancelActivity,
ContinueAsNewWorkflowExecution, QueryResult, RequestCancelActivity,
},
workflow_completion::WorkflowActivationCompletion,
},
Expand All @@ -33,7 +33,9 @@ use temporal_sdk_core_protos::{
},
TestHistoryBuilder,
};
use temporal_sdk_core_test_utils::{schedule_activity_cmd, start_timer_cmd, WorkerTestHelpers};
use temporal_sdk_core_test_utils::{
query_ok, schedule_activity_cmd, start_timer_cmd, WorkerTestHelpers,
};

#[rstest::rstest]
#[case::with_history(true)]
Expand Down Expand Up @@ -111,16 +113,7 @@ async fn legacy_query(#[case] include_history: bool) {
worker
.complete_workflow_activation(WorkflowActivationCompletion::from_cmd(
task.run_id,
QueryResult {
query_id: query.query_id.clone(),
variant: Some(
QuerySuccess {
response: Some(query_resp.into()),
}
.into(),
),
}
.into(),
query_ok(&query.query_id, query_resp),
))
.await
.unwrap();
Expand Down Expand Up @@ -223,18 +216,7 @@ async fn new_queries(#[values(1, 3)] num_queries: usize) {

let mut commands = vec![];
for i in 1..=num_queries {
commands.push(
QueryResult {
query_id: format!("q{i}"),
variant: Some(
QuerySuccess {
response: Some(query_resp.into()),
}
.into(),
),
}
.into(),
);
commands.push(query_ok(format!("q{i}"), query_resp));
}
core.complete_workflow_activation(WorkflowActivationCompletion::from_cmds(
task.run_id,
Expand Down Expand Up @@ -410,16 +392,7 @@ async fn legacy_query_after_complete(#[values(false, true)] full_history: bool)
);
core.complete_workflow_activation(WorkflowActivationCompletion::from_cmd(
task.run_id,
QueryResult {
query_id: query.query_id.clone(),
variant: Some(
QuerySuccess {
response: Some("whatever".into()),
}
.into(),
),
}
.into(),
query_ok(query.query_id.clone(), "whatever"),
))
.await
.unwrap();
Expand Down Expand Up @@ -533,16 +506,7 @@ async fn query_cache_miss_causes_page_fetch_dont_reply_wft_too_early(
);
core.complete_workflow_activation(WorkflowActivationCompletion::from_cmd(
task.run_id,
QueryResult {
query_id: "the-query".to_string(),
variant: Some(
QuerySuccess {
response: Some(query_resp.into()),
}
.into(),
),
}
.into(),
query_ok("the-query".to_string(), query_resp),
))
.await
.unwrap();
Expand Down Expand Up @@ -631,16 +595,7 @@ async fn query_replay_with_continue_as_new_doesnt_reply_empty_command() {

core.complete_workflow_activation(WorkflowActivationCompletion::from_cmd(
task.run_id,
QueryResult {
query_id: query.query_id.clone(),
variant: Some(
QuerySuccess {
response: Some("whatever".into()),
}
.into(),
),
}
.into(),
query_ok(query.query_id.clone(), "whatever"),
))
.await
.unwrap();
Expand Down Expand Up @@ -689,16 +644,7 @@ async fn legacy_query_response_gets_not_found_not_fatal() {
// Fail wft which should result in query being failed
core.complete_workflow_activation(WorkflowActivationCompletion::from_cmd(
task.run_id,
QueryResult {
query_id: LEGACY_QUERY_ID.to_string(),
variant: Some(
QuerySuccess {
response: Some("hi".into()),
}
.into(),
),
}
.into(),
query_ok(LEGACY_QUERY_ID.to_string(), "hi"),
))
.await
.unwrap();
Expand Down Expand Up @@ -883,16 +829,7 @@ async fn legacy_query_combined_with_timer_fire_repro() {
);
core.complete_workflow_activation(WorkflowActivationCompletion::from_cmd(
task.run_id,
QueryResult {
query_id: "the-query".to_string(),
variant: Some(
QuerySuccess {
response: Some("whatever".into()),
}
.into(),
),
}
.into(),
query_ok("the-query".to_string(), "whatever"),
))
.await
.unwrap();
Expand All @@ -907,16 +844,7 @@ async fn legacy_query_combined_with_timer_fire_repro() {
);
core.complete_workflow_activation(WorkflowActivationCompletion::from_cmd(
task.run_id,
QueryResult {
query_id: LEGACY_QUERY_ID.to_string(),
variant: Some(
QuerySuccess {
response: Some("whatever".into()),
}
.into(),
),
}
.into(),
query_ok(LEGACY_QUERY_ID.to_string(), "whatever"),
))
.await
.unwrap();
Expand Down
17 changes: 15 additions & 2 deletions test-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ use temporal_sdk_core_api::{
use temporal_sdk_core_protos::{
coresdk::{
workflow_commands::{
workflow_command, ActivityCancellationType, CompleteWorkflowExecution,
ScheduleActivity, ScheduleLocalActivity, StartTimer,
workflow_command, ActivityCancellationType, CompleteWorkflowExecution, QueryResult,
QuerySuccess, ScheduleActivity, ScheduleLocalActivity, StartTimer,
},
workflow_completion::WorkflowActivationCompletion,
},
Expand Down Expand Up @@ -674,6 +674,19 @@ pub fn start_timer_cmd(seq: u32, duration: Duration) -> workflow_command::Varian
.into()
}

pub fn query_ok(id: impl Into<String>, response: impl Into<Payload>) -> workflow_command::Variant {
QueryResult {
query_id: id.into(),
variant: Some(
QuerySuccess {
response: Some(response.into()),
}
.into(),
),
}
.into()
}

/// Given a desired number of concurrent executions and a provided function that produces a future,
/// run that many instances of the future concurrently.
///
Expand Down
Loading

0 comments on commit 1c66c9d

Please sign in to comment.