Skip to content

Commit

Permalink
Don't request eager activities when task queue throttle is set (#845)
Browse files Browse the repository at this point in the history
  • Loading branch information
Sushisource authored Nov 21, 2024
1 parent d31c105 commit 6155b09
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 14 deletions.
2 changes: 2 additions & 0 deletions core-api/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ pub struct WorkerConfig {
/// server-side. Note that this only takes effect upon an activity poll request. If multiple
/// workers on the same queue have different values set, they will thrash with the last poller
/// winning.
///
/// Setting this to a nonzero value will also disable eager activity execution.
#[builder(default)]
pub max_task_queue_activities_per_second: Option<f64>,

Expand Down
20 changes: 9 additions & 11 deletions core/src/core_tests/activity_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -633,11 +633,11 @@ async fn max_tq_acts_set_passed_to_poll_properly() {
worker.poll_activity_task().await.unwrap();
}

/// This test doesn't test the real worker config since [mock_worker] bypasses the worker
/// constructor, [mock_worker] will not pass an activity poller to the worker when
/// `no_remote_activities` is set to `true`.
#[rstest::rstest]
#[tokio::test]
async fn no_eager_activities_requested_when_worker_options_disable_remote_activities() {
async fn no_eager_activities_requested_when_worker_options_disable_it(
#[values("no_remote", "throttle")] reason: &'static str,
) {
let wfid = "fake_wf_id";
let mut t = TestHistoryBuilder::default();
t.add_by_type(EventType::WorkflowExecutionStarted);
Expand All @@ -648,7 +648,6 @@ async fn no_eager_activities_requested_when_worker_options_disable_remote_activi
t.add_full_wf_task();
t.add_workflow_execution_completed();
let num_eager_requested = Arc::new(AtomicUsize::new(0));
// Clone it to move into the callback below
let num_eager_requested_clone = num_eager_requested.clone();

let mut mock = mock_workflow_client();
Expand Down Expand Up @@ -677,14 +676,13 @@ async fn no_eager_activities_requested_when_worker_options_disable_remote_activi
})
});
let mut mock = single_hist_mock_sg(wfid, t, [1], mock, true);
let mut mock_poller = mock_manual_poller();
mock_poller
.expect_poll()
.returning(|| futures_util::future::pending().boxed());
mock.set_act_poller(Box::new(mock_poller));
mock.worker_cfg(|wc| {
wc.max_cached_workflows = 2;
wc.no_remote_activities = true;
if reason == "no_remote" {
wc.no_remote_activities = true;
} else {
wc.max_task_queue_activities_per_second = Some(1.0);
}
});
let core = mock_worker(mock);

Expand Down
9 changes: 6 additions & 3 deletions core/src/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -477,9 +477,12 @@ impl Worker {
la_sink,
local_act_mgr.clone(),
hb_rx,
at_task_mgr
.as_ref()
.map(|mgr| mgr.get_handle_for_workflows()),
at_task_mgr.as_ref().and_then(|mgr| {
match config.max_task_queue_activities_per_second {
Some(persec) if persec > 0.0 => None,
_ => Some(mgr.get_handle_for_workflows()),
}
}),
telem_instance,
),
at_task_mgr,
Expand Down

0 comments on commit 6155b09

Please sign in to comment.