diff --git a/core/src/core_tests/local_activities.rs b/core/src/core_tests/local_activities.rs index 26948695b..766cc5d40 100644 --- a/core/src/core_tests/local_activities.rs +++ b/core/src/core_tests/local_activities.rs @@ -32,9 +32,7 @@ use temporal_sdk_core_protos::{ coresdk::{ activity_result::ActivityExecutionResult, workflow_activation::{workflow_activation_job, WorkflowActivationJob}, - workflow_commands::{ - ActivityCancellationType, QueryResult, QuerySuccess, ScheduleLocalActivity, - }, + workflow_commands::{ActivityCancellationType, ScheduleLocalActivity}, workflow_completion::WorkflowActivationCompletion, ActivityTaskCompletion, AsJsonPayloadExt, }, @@ -47,7 +45,7 @@ use temporal_sdk_core_protos::{ DEFAULT_ACTIVITY_TYPE, }; use temporal_sdk_core_test_utils::{ - schedule_local_activity_cmd, start_timer_cmd, WorkerTestHelpers, + query_ok, schedule_local_activity_cmd, start_timer_cmd, WorkerTestHelpers, }; use tokio::{join, select, sync::Barrier}; @@ -527,16 +525,7 @@ async fn query_during_wft_heartbeat_doesnt_accidentally_fail_to_continue_heartbe barrier.wait().await; core.complete_workflow_activation(WorkflowActivationCompletion::from_cmd( task.run_id, - QueryResult { - query_id: query.query_id.clone(), - variant: Some( - QuerySuccess { - response: Some("whatever".into()), - } - .into(), - ), - } - .into(), + query_ok(&query.query_id, "whatev"), )) .await .unwrap(); @@ -699,16 +688,7 @@ async fn la_resolve_during_legacy_query_does_not_combine(#[case] impossible_quer ); core.complete_workflow_activation(WorkflowActivationCompletion::from_cmd( task.run_id, - QueryResult { - query_id: "q1".to_string(), - variant: Some( - QuerySuccess { - response: Some("whatev".into()), - } - .into(), - ), - } - .into(), + query_ok("q1", "whatev"), )) .await .unwrap(); @@ -717,18 +697,9 @@ async fn la_resolve_during_legacy_query_does_not_combine(#[case] impossible_quer if impossible_query_in_task { // finish last query let task = core.poll_workflow_activation().await.unwrap(); - core.complete_workflow_activation(WorkflowActivationCompletion::from_cmds( + core.complete_workflow_activation(WorkflowActivationCompletion::from_cmd( task.run_id, - vec![QueryResult { - query_id: LEGACY_QUERY_ID.to_string(), - variant: Some( - QuerySuccess { - response: Some("whatev".into()), - } - .into(), - ), - } - .into()], + query_ok(LEGACY_QUERY_ID, "whatev"), )) .await .unwrap(); @@ -1221,3 +1192,106 @@ async fn local_activities_can_be_delivered_during_shutdown() { assert_matches!(wf_r.unwrap_err(), PollWfError::ShutDown); assert_matches!(act_r.unwrap_err(), PollActivityError::ShutDown); } + +#[tokio::test] +async fn queries_can_be_received_while_heartbeating() { + let wfid = "fake_wf_id"; + let mut t = TestHistoryBuilder::default(); + t.add_wfe_started_with_wft_timeout(Duration::from_millis(200)); + t.add_full_wf_task(); + t.add_full_wf_task(); + t.add_full_wf_task(); + + let tasks = [ + hist_to_poll_resp(&t, wfid.to_owned(), ResponseType::ToTaskNum(1)), + { + let mut pr = hist_to_poll_resp(&t, wfid.to_owned(), ResponseType::OneTask(2)); + pr.queries = HashMap::new(); + pr.queries.insert( + "q1".to_string(), + WorkflowQuery { + query_type: "query-type".to_string(), + query_args: Some(b"hi".into()), + header: None, + }, + ); + pr + }, + { + let mut pr = hist_to_poll_resp(&t, wfid.to_owned(), ResponseType::OneTask(3)); + pr.query = Some(WorkflowQuery { + query_type: "query-type".to_string(), + query_args: Some(b"hi".into()), + header: None, + }); + pr + }, + ]; + let mut mock = mock_workflow_client(); + mock.expect_respond_legacy_query() + .times(1) + .returning(move |_, _| Ok(Default::default())); + let mut mock = single_hist_mock_sg(wfid, t, tasks, mock, true); + mock.worker_cfg(|wc| wc.max_cached_workflows = 1); + let core = mock_worker(mock); + + let task = core.poll_workflow_activation().await.unwrap(); + assert_matches!( + task.jobs.as_slice(), + &[WorkflowActivationJob { + variant: Some(workflow_activation_job::Variant::StartWorkflow(_)), + },] + ); + core.complete_workflow_activation(WorkflowActivationCompletion::from_cmd( + task.run_id, + schedule_local_activity_cmd( + 1, + "act-id", + ActivityCancellationType::TryCancel, + Duration::from_secs(60), + ), + )) + .await + .unwrap(); + + let task = core.poll_workflow_activation().await.unwrap(); + assert_matches!( + task.jobs.as_slice(), + &[WorkflowActivationJob { + variant: Some(workflow_activation_job::Variant::QueryWorkflow(ref q)), + }] + if q.query_id == "q1" + ); + core.complete_workflow_activation(WorkflowActivationCompletion::from_cmd( + task.run_id, + query_ok("q1", "whatev"), + )) + .await + .unwrap(); + + let task = core.poll_workflow_activation().await.unwrap(); + assert_matches!( + task.jobs.as_slice(), + &[WorkflowActivationJob { + variant: Some(workflow_activation_job::Variant::QueryWorkflow(ref q)), + }] + if q.query_id == LEGACY_QUERY_ID + ); + core.complete_workflow_activation(WorkflowActivationCompletion::from_cmd( + task.run_id, + query_ok(LEGACY_QUERY_ID, "whatev"), + )) + .await + .unwrap(); + + // Handle the activity so we can shut down cleanly + let act_task = core.poll_activity_task().await.unwrap(); + core.complete_activity_task(ActivityTaskCompletion { + task_token: act_task.task_token, + result: Some(ActivityExecutionResult::ok(vec![1].into())), + }) + .await + .unwrap(); + + core.drain_pollers_and_shutdown().await; +} diff --git a/core/src/core_tests/queries.rs b/core/src/core_tests/queries.rs index 7eab99556..18a1fd9fd 100644 --- a/core/src/core_tests/queries.rs +++ b/core/src/core_tests/queries.rs @@ -17,7 +17,7 @@ use temporal_sdk_core_protos::{ }, workflow_commands::{ query_result, ActivityCancellationType, CompleteWorkflowExecution, - ContinueAsNewWorkflowExecution, QueryResult, QuerySuccess, RequestCancelActivity, + ContinueAsNewWorkflowExecution, QueryResult, RequestCancelActivity, }, workflow_completion::WorkflowActivationCompletion, }, @@ -33,7 +33,9 @@ use temporal_sdk_core_protos::{ }, TestHistoryBuilder, }; -use temporal_sdk_core_test_utils::{schedule_activity_cmd, start_timer_cmd, WorkerTestHelpers}; +use temporal_sdk_core_test_utils::{ + query_ok, schedule_activity_cmd, start_timer_cmd, WorkerTestHelpers, +}; #[rstest::rstest] #[case::with_history(true)] @@ -111,16 +113,7 @@ async fn legacy_query(#[case] include_history: bool) { worker .complete_workflow_activation(WorkflowActivationCompletion::from_cmd( task.run_id, - QueryResult { - query_id: query.query_id.clone(), - variant: Some( - QuerySuccess { - response: Some(query_resp.into()), - } - .into(), - ), - } - .into(), + query_ok(&query.query_id, query_resp), )) .await .unwrap(); @@ -223,18 +216,7 @@ async fn new_queries(#[values(1, 3)] num_queries: usize) { let mut commands = vec![]; for i in 1..=num_queries { - commands.push( - QueryResult { - query_id: format!("q{i}"), - variant: Some( - QuerySuccess { - response: Some(query_resp.into()), - } - .into(), - ), - } - .into(), - ); + commands.push(query_ok(format!("q{i}"), query_resp)); } core.complete_workflow_activation(WorkflowActivationCompletion::from_cmds( task.run_id, @@ -410,16 +392,7 @@ async fn legacy_query_after_complete(#[values(false, true)] full_history: bool) ); core.complete_workflow_activation(WorkflowActivationCompletion::from_cmd( task.run_id, - QueryResult { - query_id: query.query_id.clone(), - variant: Some( - QuerySuccess { - response: Some("whatever".into()), - } - .into(), - ), - } - .into(), + query_ok(query.query_id.clone(), "whatever"), )) .await .unwrap(); @@ -533,16 +506,7 @@ async fn query_cache_miss_causes_page_fetch_dont_reply_wft_too_early( ); core.complete_workflow_activation(WorkflowActivationCompletion::from_cmd( task.run_id, - QueryResult { - query_id: "the-query".to_string(), - variant: Some( - QuerySuccess { - response: Some(query_resp.into()), - } - .into(), - ), - } - .into(), + query_ok("the-query".to_string(), query_resp), )) .await .unwrap(); @@ -631,16 +595,7 @@ async fn query_replay_with_continue_as_new_doesnt_reply_empty_command() { core.complete_workflow_activation(WorkflowActivationCompletion::from_cmd( task.run_id, - QueryResult { - query_id: query.query_id.clone(), - variant: Some( - QuerySuccess { - response: Some("whatever".into()), - } - .into(), - ), - } - .into(), + query_ok(query.query_id.clone(), "whatever"), )) .await .unwrap(); @@ -689,16 +644,7 @@ async fn legacy_query_response_gets_not_found_not_fatal() { // Fail wft which should result in query being failed core.complete_workflow_activation(WorkflowActivationCompletion::from_cmd( task.run_id, - QueryResult { - query_id: LEGACY_QUERY_ID.to_string(), - variant: Some( - QuerySuccess { - response: Some("hi".into()), - } - .into(), - ), - } - .into(), + query_ok(LEGACY_QUERY_ID.to_string(), "hi"), )) .await .unwrap(); @@ -883,16 +829,7 @@ async fn legacy_query_combined_with_timer_fire_repro() { ); core.complete_workflow_activation(WorkflowActivationCompletion::from_cmd( task.run_id, - QueryResult { - query_id: "the-query".to_string(), - variant: Some( - QuerySuccess { - response: Some("whatever".into()), - } - .into(), - ), - } - .into(), + query_ok("the-query".to_string(), "whatever"), )) .await .unwrap(); @@ -907,16 +844,7 @@ async fn legacy_query_combined_with_timer_fire_repro() { ); core.complete_workflow_activation(WorkflowActivationCompletion::from_cmd( task.run_id, - QueryResult { - query_id: LEGACY_QUERY_ID.to_string(), - variant: Some( - QuerySuccess { - response: Some("whatever".into()), - } - .into(), - ), - } - .into(), + query_ok(LEGACY_QUERY_ID.to_string(), "whatever"), )) .await .unwrap(); diff --git a/test-utils/src/lib.rs b/test-utils/src/lib.rs index 6964b71d6..623f8a826 100644 --- a/test-utils/src/lib.rs +++ b/test-utils/src/lib.rs @@ -47,8 +47,8 @@ use temporal_sdk_core_api::{ use temporal_sdk_core_protos::{ coresdk::{ workflow_commands::{ - workflow_command, ActivityCancellationType, CompleteWorkflowExecution, - ScheduleActivity, ScheduleLocalActivity, StartTimer, + workflow_command, ActivityCancellationType, CompleteWorkflowExecution, QueryResult, + QuerySuccess, ScheduleActivity, ScheduleLocalActivity, StartTimer, }, workflow_completion::WorkflowActivationCompletion, }, @@ -674,6 +674,19 @@ pub fn start_timer_cmd(seq: u32, duration: Duration) -> workflow_command::Varian .into() } +pub fn query_ok(id: impl Into, response: impl Into) -> workflow_command::Variant { + QueryResult { + query_id: id.into(), + variant: Some( + QuerySuccess { + response: Some(response.into()), + } + .into(), + ), + } + .into() +} + /// Given a desired number of concurrent executions and a provided function that produces a future, /// run that many instances of the future concurrently. /// diff --git a/tests/integ_tests/workflow_tests/local_activities.rs b/tests/integ_tests/workflow_tests/local_activities.rs index 76b522afc..b2eba762a 100644 --- a/tests/integ_tests/workflow_tests/local_activities.rs +++ b/tests/integ_tests/workflow_tests/local_activities.rs @@ -589,7 +589,6 @@ async fn repro_nondeterminism_with_timer_bug() { #[rstest::rstest] #[tokio::test] async fn weird_la_nondeterminism_repro(#[values(true, false)] fix_hist: bool) { - init_integ_telem(); let mut hist = history_from_proto_binary( "histories/evict_while_la_running_no_interference-85_history.bin", ) @@ -618,7 +617,6 @@ async fn weird_la_nondeterminism_repro(#[values(true, false)] fix_hist: bool) { #[tokio::test] async fn second_weird_la_nondeterminism_repro() { - init_integ_telem(); let mut hist = history_from_proto_binary( "histories/evict_while_la_running_no_interference-23_history.bin", ) @@ -644,7 +642,6 @@ async fn second_weird_la_nondeterminism_repro() { #[tokio::test] async fn third_weird_la_nondeterminism_repro() { - init_integ_telem(); let mut hist = history_from_proto_binary( "histories/evict_while_la_running_no_interference-16_history.bin", )