Skip to content

Commit

Permalink
Fix worker shutdown during replay test (#631)
Browse files Browse the repository at this point in the history
  • Loading branch information
antlai-temporal authored Nov 14, 2023
1 parent a1351da commit 8afcc50
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 5 deletions.
5 changes: 5 additions & 0 deletions core/src/worker/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ pub(crate) trait WorkerClient: Sync + Send {
#[allow(clippy::needless_lifetimes)] // Clippy is wrong here
fn capabilities<'a>(&'a self) -> Option<&'a get_system_info_response::Capabilities>;
fn workers(&self) -> Arc<SlotManager>;
fn is_mock(&self) -> bool;
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -387,6 +388,10 @@ impl WorkerClient for WorkerClientBag {
fn workers(&self) -> Arc<SlotManager> {
self.client.get_client().inner().workers()
}

fn is_mock(&self) -> bool {
false
}
}

/// A version of [RespondWorkflowTaskCompletedRequest] that will finish being filled out by the
Expand Down
4 changes: 4 additions & 0 deletions core/src/worker/client/mocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ pub(crate) fn mock_workflow_client() -> MockWorkerClient {
.returning(|| Some(DEFAULT_TEST_CAPABILITIES));
r.expect_workers()
.returning(|| DEFAULT_WORKERS_REGISTRY.clone());
r.expect_is_mock().returning(|| true);
r
}

Expand All @@ -38,6 +39,7 @@ pub(crate) fn mock_manual_workflow_client() -> MockManualWorkerClient {
.returning(|| Some(DEFAULT_TEST_CAPABILITIES));
r.expect_workers()
.returning(|| DEFAULT_WORKERS_REGISTRY.clone());
r.expect_is_mock().returning(|| true);
r
}

Expand Down Expand Up @@ -116,5 +118,7 @@ mockall::mock! {
fn capabilities(&self) -> Option<&'static get_system_info_response::Capabilities>;

fn workers(&self) -> Arc<SlotManager>;

fn is_mock(&self) -> bool;
}
}
14 changes: 10 additions & 4 deletions core/src/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use crate::{
ActivityHeartbeat, CompleteActivityError, PollActivityError, PollWfError, WorkerTrait,
};
use activities::WorkerActivityTasks;
use futures_util::stream;
use futures_util::{stream, StreamExt};
use slot_provider::SlotProvider;
use std::{
convert::TryInto,
Expand Down Expand Up @@ -73,7 +73,6 @@ use {
protosext::ValidPollWFTQResponse,
},
futures::stream::BoxStream,
futures_util::StreamExt,
temporal_sdk_core_protos::temporal::api::workflowservice::v1::PollActivityTaskQueueResponse,
};

Expand Down Expand Up @@ -313,8 +312,15 @@ impl Worker {
sticky_queue_poller,
));
let wft_stream = new_wft_poller(wf_task_poll_buffer, metrics.clone());
let wft_stream =
stream::select(wft_stream, UnboundedReceiverStream::new(external_wft_rx));
let wft_stream = if !client.is_mock() {
// Some replay tests combine a mock client with real pollers,
// and they don't need to use the external stream
stream::select(wft_stream, UnboundedReceiverStream::new(external_wft_rx))
.left_stream()
} else {
wft_stream.right_stream()
};

#[cfg(test)]
let wft_stream = wft_stream.left_stream();
(wft_stream, act_poll_buffer)
Expand Down
29 changes: 28 additions & 1 deletion tests/integ_tests/workflow_tests/replay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::integ_tests::workflow_tests::patches::changes_wf;
use assert_matches::assert_matches;
use parking_lot::Mutex;
use std::{collections::HashSet, sync::Arc, time::Duration};
use temporal_sdk::{interceptors::WorkerInterceptor, WfContext, WorkflowFunction};
use temporal_sdk::{interceptors::WorkerInterceptor, WfContext, Worker, WorkflowFunction};
use temporal_sdk_core::replay::{HistoryFeeder, HistoryForReplay};
use temporal_sdk_core_api::errors::{PollActivityError, PollWfError};
use temporal_sdk_core_protos::{
Expand Down Expand Up @@ -167,6 +167,19 @@ async fn replay_ok_ending_with_timed_out() {
replay_abrupt_ending(t2).await;
}

#[tokio::test]
async fn replay_shutdown_worker() {
let t = canned_histories::single_timer("1");
let func = timers_wf(1);
let mut worker = replay_sdk_worker([test_hist_to_replay(t)]);
worker.register_wf(DEFAULT_WORKFLOW_TYPE, func);
let shutdown_ctr_i = UniqueShutdownWorker::default();
let shutdown_ctr = shutdown_ctr_i.runs.clone();
worker.set_worker_interceptor(shutdown_ctr_i);
worker.run().await.unwrap();
assert_eq!(shutdown_ctr.lock().len(), 1);
}

#[rstest::rstest]
#[tokio::test]
async fn multiple_histories_replay(#[values(false, true)] use_feeder: bool) {
Expand Down Expand Up @@ -288,3 +301,17 @@ impl WorkerInterceptor for UniqueRunsCounter {
self.runs.lock().insert(completion.run_id.clone());
}
}

#[derive(Default)]
struct UniqueShutdownWorker {
runs: Arc<Mutex<HashSet<String>>>,
}
#[async_trait::async_trait(?Send)]
impl WorkerInterceptor for UniqueShutdownWorker {
fn on_shutdown(&self, _sdk_worker: &Worker) {
// Assumed one worker per task queue.
self.runs
.lock()
.insert(_sdk_worker.task_queue().to_string());
}
}

0 comments on commit 8afcc50

Please sign in to comment.