diff --git a/core-api/src/worker.rs b/core-api/src/worker.rs index 40e938d3..66c05b1f 100644 --- a/core-api/src/worker.rs +++ b/core-api/src/worker.rs @@ -80,6 +80,8 @@ pub struct WorkerConfig { /// server-side. Note that this only takes effect upon an activity poll request. If multiple /// workers on the same queue have different values set, they will thrash with the last poller /// winning. + /// + /// Setting this to a nonzero value will also disable eager activity execution. #[builder(default)] pub max_task_queue_activities_per_second: Option, diff --git a/core/src/core_tests/activity_tasks.rs b/core/src/core_tests/activity_tasks.rs index b67f94e7..69c8c123 100644 --- a/core/src/core_tests/activity_tasks.rs +++ b/core/src/core_tests/activity_tasks.rs @@ -633,11 +633,11 @@ async fn max_tq_acts_set_passed_to_poll_properly() { worker.poll_activity_task().await.unwrap(); } -/// This test doesn't test the real worker config since [mock_worker] bypasses the worker -/// constructor, [mock_worker] will not pass an activity poller to the worker when -/// `no_remote_activities` is set to `true`. +#[rstest::rstest] #[tokio::test] -async fn no_eager_activities_requested_when_worker_options_disable_remote_activities() { +async fn no_eager_activities_requested_when_worker_options_disable_it( + #[values("no_remote", "throttle")] reason: &'static str, +) { let wfid = "fake_wf_id"; let mut t = TestHistoryBuilder::default(); t.add_by_type(EventType::WorkflowExecutionStarted); @@ -648,7 +648,6 @@ async fn no_eager_activities_requested_when_worker_options_disable_remote_activi t.add_full_wf_task(); t.add_workflow_execution_completed(); let num_eager_requested = Arc::new(AtomicUsize::new(0)); - // Clone it to move into the callback below let num_eager_requested_clone = num_eager_requested.clone(); let mut mock = mock_workflow_client(); @@ -677,14 +676,13 @@ async fn no_eager_activities_requested_when_worker_options_disable_remote_activi }) }); let mut mock = single_hist_mock_sg(wfid, t, [1], mock, true); - let mut mock_poller = mock_manual_poller(); - mock_poller - .expect_poll() - .returning(|| futures_util::future::pending().boxed()); - mock.set_act_poller(Box::new(mock_poller)); mock.worker_cfg(|wc| { wc.max_cached_workflows = 2; - wc.no_remote_activities = true; + if reason == "no_remote" { + wc.no_remote_activities = true; + } else { + wc.max_task_queue_activities_per_second = Some(1.0); + } }); let core = mock_worker(mock); diff --git a/core/src/worker/mod.rs b/core/src/worker/mod.rs index 6112dc3b..fac1a56a 100644 --- a/core/src/worker/mod.rs +++ b/core/src/worker/mod.rs @@ -477,9 +477,12 @@ impl Worker { la_sink, local_act_mgr.clone(), hb_rx, - at_task_mgr - .as_ref() - .map(|mgr| mgr.get_handle_for_workflows()), + at_task_mgr.as_ref().and_then(|mgr| { + match config.max_task_queue_activities_per_second { + Some(persec) if persec > 0.0 => None, + _ => Some(mgr.get_handle_for_workflows()), + } + }), telem_instance, ), at_task_mgr,