From 258e6c9082db2066106e17b716d5eaf3c0541f0d Mon Sep 17 00:00:00 2001 From: Rain Date: Sun, 15 Dec 2024 01:22:57 +0000 Subject: [PATCH] [nextest-runner] remove cancellation AtomicBool We've changed things such that the executor always checks in with the dispatcher before starting a unit. This makes the bool unnecessary. The cancel receiver is currently necessary for the handle_delay_between_attempts step, which should cancel the run no matter what. We'll remove this shortly by moving this functionality into req_rx. --- nextest-runner/src/runner/dispatcher.rs | 12 ++------- nextest-runner/src/runner/executor.rs | 34 +++++-------------------- nextest-runner/src/runner/imp.rs | 15 ++--------- 3 files changed, 10 insertions(+), 51 deletions(-) diff --git a/nextest-runner/src/runner/dispatcher.rs b/nextest-runner/src/runner/dispatcher.rs index ebe2a76243d..da31ed618f3 100644 --- a/nextest-runner/src/runner/dispatcher.rs +++ b/nextest-runner/src/runner/dispatcher.rs @@ -23,11 +23,7 @@ use crate::{ use chrono::Local; use debug_ignore::DebugIgnore; use quick_junit::ReportUuid; -use std::{ - collections::BTreeMap, - sync::atomic::{AtomicBool, Ordering}, - time::Duration, -}; +use std::{collections::BTreeMap, time::Duration}; use tokio::sync::{ broadcast, mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, @@ -102,7 +98,6 @@ where signal_handler: &mut SignalHandler, input_handler: &mut InputHandler, report_cancel_rx: oneshot::Receiver<()>, - cancelled_ref: &AtomicBool, cancellation_sender: broadcast::Sender<()>, ) -> RunnerTaskState { let mut report_cancel_rx = std::pin::pin!(report_cancel_rx); @@ -275,10 +270,7 @@ where self.info_finished(total.saturating_sub(index + 1)); } HandleEventResponse::Cancel(cancel) => { - // A cancellation notice was received. Note the ordering here: - // cancelled_ref is set *before* notifications are broadcast. This - // prevents race conditions. - cancelled_ref.store(true, Ordering::Release); + // A cancellation notice was received. let _ = cancellation_sender.send(()); match cancel { // Some of the branches here don't do anything, but are specified diff --git a/nextest-runner/src/runner/executor.rs b/nextest-runner/src/runner/executor.rs index f9d051df1d6..05faa927bca 100644 --- a/nextest-runner/src/runner/executor.rs +++ b/nextest-runner/src/runner/executor.rs @@ -39,10 +39,7 @@ use std::{ num::NonZeroUsize, pin::Pin, process::{ExitStatus, Stdio}, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }, + sync::Arc, time::Duration, }; use tokio::{ @@ -92,7 +89,6 @@ impl<'a> ExecutorContext<'a> { pub(super) async fn run_setup_scripts( &self, resp_tx: UnboundedSender>, - cancelled_ref: &AtomicBool, ) -> SetupScriptExecuteData<'a> { let setup_scripts = self.profile.setup_scripts(self.test_list); let total = setup_scripts.len(); @@ -108,11 +104,6 @@ impl<'a> ExecutorContext<'a> { let config = script.config; let script_fut = async move { - if cancelled_ref.load(Ordering::Acquire) { - // Check for test cancellation. - return None; - } - let (req_rx_tx, req_rx_rx) = oneshot::channel(); let _ = this_resp_tx.send(ExecutorEvent::SetupScriptStarted { script_id: script_id.clone(), @@ -172,15 +163,9 @@ impl<'a> ExecutorContext<'a> { test_instance: TestInstance<'a>, settings: TestSettings<'a>, resp_tx: UnboundedSender>, - cancelled_ref: &AtomicBool, mut cancel_receiver: broadcast::Receiver<()>, setup_script_data: Arc>, ) { - if cancelled_ref.load(Ordering::Acquire) { - // Check for test cancellation. - return; - } - debug!(test_name = test_instance.name, "running test"); let settings = Arc::new(settings); @@ -224,14 +209,10 @@ impl<'a> ExecutorContext<'a> { total_attempts, }; - // Note: do not check for cancellation here. - // Only check for cancellation after the first - // run, to avoid a situation where run_statuses - // is empty. - if retry_data.attempt > 1 { - // Ensure that the dispatcher believes the run is still ongoing. If the run is - // cancelled, the dispatcher will let us know. + // Ensure that the dispatcher believes the run is still ongoing. + // If the run is cancelled, the dispatcher will let us know by + // dropping the receiver. let (tx, rx) = oneshot::channel(); _ = resp_tx.send(ExecutorEvent::RetryStarted { test_instance, @@ -242,8 +223,8 @@ impl<'a> ExecutorContext<'a> { match rx.await { Ok(()) => {} Err(_) => { - // The receiver was dropped -- the dispatcher has signaled that this unit - // should exit. + // The receiver was dropped -- the dispatcher has + // signaled that this unit should exit. return; } } @@ -265,9 +246,6 @@ impl<'a> ExecutorContext<'a> { if run_status.result.is_success() { // The test succeeded. break run_status; - } else if cancelled_ref.load(Ordering::Acquire) { - // The test was cancelled. - break run_status; } else if retry_data.attempt < retry_data.total_attempts { // Retry this test: send a retry event, then retry the loop. delay = backoff_iter diff --git a/nextest-runner/src/runner/imp.rs b/nextest-runner/src/runner/imp.rs index 93f263b9329..4f7a794db3e 100644 --- a/nextest-runner/src/runner/imp.rs +++ b/nextest-runner/src/runner/imp.rs @@ -20,11 +20,7 @@ use async_scoped::TokioScope; use future_queue::StreamExt; use futures::prelude::*; use quick_junit::ReportUuid; -use std::{ - convert::Infallible, - fmt, - sync::{atomic::AtomicBool, Arc}, -}; +use std::{convert::Infallible, fmt, sync::Arc}; use tokio::{ runtime::Runtime, sync::{broadcast, mpsc::unbounded_channel, oneshot}, @@ -178,7 +174,6 @@ impl<'a> TestRunner<'a> { F: FnMut(TestEvent<'a>) -> Result<(), E> + Send, E: fmt::Debug + Send, { - let cancelled = AtomicBool::new(false); let (report_cancel_tx, report_cancel_rx) = oneshot::channel(); // If report_cancel_tx is None, at least one error has occurred and the @@ -190,7 +185,6 @@ impl<'a> TestRunner<'a> { let res = self.inner.execute( &mut self.signal_handler, &mut self.input_handler, - &cancelled, report_cancel_rx, |event| { match callback(event) { @@ -247,7 +241,6 @@ impl<'a> TestRunnerInner<'a> { &self, signal_handler: &mut SignalHandler, input_handler: &mut InputHandler, - cancelled_ref: &AtomicBool, report_cancel_rx: oneshot::Receiver<()>, callback: F, ) -> Result> @@ -295,7 +288,6 @@ impl<'a> TestRunnerInner<'a> { signal_handler, input_handler, report_cancel_rx, - cancelled_ref, cancellation_sender.clone(), ); scope.spawn_cancellable(dispatcher_fut, || RunnerTaskState::Cancelled); @@ -305,9 +297,7 @@ impl<'a> TestRunnerInner<'a> { let run_scripts_fut = async move { // Since script tasks are run serially, we just reuse the one // script task. - let script_data = executor_cx_ref - .run_setup_scripts(script_resp_tx, cancelled_ref) - .await; + let script_data = executor_cx_ref.run_setup_scripts(script_resp_tx).await; if script_tx.send(script_data).is_err() { // The dispatcher has shut down, so we should too. debug!("script_tx.send failed, shutting down"); @@ -369,7 +359,6 @@ impl<'a> TestRunnerInner<'a> { test_instance, settings, resp_tx.clone(), - cancelled_ref, cancel_rx, setup_script_data, ))