From 037ef4d846ca829180ad6b49198369f5aa4c9eea Mon Sep 17 00:00:00 2001 From: Rain Date: Wed, 20 Sep 2023 16:01:49 -0700 Subject: [PATCH] [nextest-runner] move a few pieces of code into separate functions We're going to reuse these functions to handle setup scripts. --- nextest-runner/src/runner.rs | 199 +++++++++++++++++++---------------- 1 file changed, 109 insertions(+), 90 deletions(-) diff --git a/nextest-runner/src/runner.rs b/nextest-runner/src/runner.rs index 4abb3bad8a1..b79345d1bb8 100644 --- a/nextest-runner/src/runner.rs +++ b/nextest-runner/src/runner.rs @@ -17,7 +17,7 @@ use crate::{ }, signal::{JobControlEvent, ShutdownEvent, SignalEvent, SignalHandler, SignalHandlerKind}, target_runner::TargetRunner, - time::{StopwatchEnd, StopwatchStart}, + time::{PausableSleep, StopwatchEnd, StopwatchStart}, }; use async_scoped::TokioScope; use bytes::{Bytes, BytesMut}; @@ -31,7 +31,8 @@ use std::{ fmt::Write, marker::PhantomData, num::NonZeroUsize, - process::Stdio, + pin::Pin, + process::{ExitStatus, Stdio}, sync::atomic::{AtomicBool, Ordering}, time::{Duration, SystemTime}, }; @@ -39,7 +40,7 @@ use tokio::{ io::{AsyncRead, AsyncReadExt}, process::Child, runtime::Runtime, - sync::mpsc::UnboundedSender, + sync::{broadcast, mpsc::UnboundedSender}, }; use uuid::Uuid; @@ -290,13 +291,12 @@ impl<'a> TestRunnerInner<'a> { // - Shutdown signals (once) // - Signals twice // 32 should be more than enough. - let (forward_sender, _forward_receiver) = - tokio::sync::broadcast::channel::(32); + let (forward_sender, _forward_receiver) = broadcast::channel::(32); let forward_sender_ref = &forward_sender; TokioScope::scope_and_block(move |scope| { let (run_sender, mut run_receiver) = tokio::sync::mpsc::unbounded_channel(); - let (cancellation_sender, _cancellation_receiver) = tokio::sync::broadcast::channel(1); + let (cancellation_sender, _cancellation_receiver) = broadcast::channel(1); let exec_cancellation_sender = cancellation_sender.clone(); let exec_fut = async move { @@ -557,25 +557,7 @@ impl<'a> TestRunnerInner<'a> { run_statuses: ExecutionStatuses::new(run_statuses), }); - // Drain the forward receiver of any messages, including those that are - // related to SIGTSTP. - loop { - let message = this_forward_receiver.try_recv(); - match message { - #[cfg(unix)] - Ok(SignalForwardEvent::Stop(sender)) => { - // The receiver being dead isn't really important. - let _ = sender.send(()).await; - } - Err( - tokio::sync::broadcast::error::TryRecvError::Empty - | tokio::sync::broadcast::error::TryRecvError::Closed, - ) => { - break; - } - _ => {} - } - } + drain_forward_receiver(this_forward_receiver).await; }; (threads_required, test_group, fut) }) @@ -615,7 +597,7 @@ impl<'a> TestRunnerInner<'a> { retry_data: RetryData, settings: &TestSettings, run_sender: &UnboundedSender>, - forward_receiver: &mut tokio::sync::broadcast::Receiver, + forward_receiver: &mut broadcast::Receiver, delay_before_start: Duration, ) -> InternalExecuteStatus { let mut stopwatch = crate::time::stopwatch(); @@ -658,7 +640,7 @@ impl<'a> TestRunnerInner<'a> { stopwatch: &mut StopwatchStart, settings: &TestSettings, run_sender: &UnboundedSender>, - forward_receiver: &mut tokio::sync::broadcast::Receiver, + forward_receiver: &mut broadcast::Receiver, delay_before_start: Duration, ) -> Result { let ctx = TestExecuteContext { @@ -763,38 +745,14 @@ impl<'a> TestRunnerInner<'a> { } } recv = forward_receiver.recv() => { - // The sender stays open longer than the whole loop, and the buffer is big - // enough for all messages ever sent through this channel, so a RecvError - // should never happen. - let forward_event = recv.expect("a RecvError should never happen here"); - - match forward_event { - #[cfg(unix)] - SignalForwardEvent::Stop(sender) => { - // It isn't possible to receive a stop event twice since it gets - // debounced in the main signal handler. - stopwatch.pause(); - interval_sleep.as_mut().pause(); - imp::job_control_child(&child, JobControlEvent::Stop); - // The receiver being dead probably means the main thread panicked - // or similar. - let _ = sender.send(()).await; - } - #[cfg(unix)] - SignalForwardEvent::Continue => { - // It's possible to receive a resume event right at the beginning of - // test execution, so debounce it. - if stopwatch.is_paused() { - stopwatch.resume(); - interval_sleep.as_mut().resume(); - imp::job_control_child(&child, JobControlEvent::Continue); - } - } - SignalForwardEvent::Shutdown(event) => { - imp::terminate_child(&mut child, TerminateMode::Signal(event), forward_receiver, job.as_ref()).await; - } - } - + handle_forward_event( + &mut child, + recv, + stopwatch, + interval_sleep.as_mut(), + forward_receiver, + job.as_ref(), + ).await; } }; }; @@ -827,34 +785,7 @@ impl<'a> TestRunnerInner<'a> { let output = res.map_err(RunTestError::Wait)?; let exit_status = output; - let status = status.unwrap_or_else(|| { - if exit_status.success() { - if leaked { - ExecutionResult::Leak - } else { - ExecutionResult::Pass - } - } else { - cfg_if::cfg_if! { - if #[cfg(unix)] { - // On Unix, extract the signal if it's found. - use std::os::unix::process::ExitStatusExt; - let abort_status = exit_status.signal().map(AbortStatus::UnixSignal); - } else if #[cfg(windows)] { - let abort_status = exit_status.code().and_then(|code| { - let exception = windows::Win32::Foundation::NTSTATUS(code); - exception.is_err().then(|| AbortStatus::WindowsNtStatus(exception)) - }); - } else { - let abort_status = None; - } - } - ExecutionResult::Fail { - abort_status, - leaked, - } - } - }); + let status = status.unwrap_or_else(|| create_execution_result(exit_status, leaked)); Ok(InternalExecuteStatus { stdout: stdout.freeze(), @@ -867,6 +798,94 @@ impl<'a> TestRunnerInner<'a> { } } +/// Drains the forward receiver of any messages, including those that are related to SIGTSTP. +async fn drain_forward_receiver(mut receiver: broadcast::Receiver) { + loop { + let message = receiver.try_recv(); + match message { + #[cfg(unix)] + Ok(SignalForwardEvent::Stop(sender)) => { + // The receiver being dead isn't really important. + let _ = sender.send(()).await; + } + Err(broadcast::error::TryRecvError::Empty | broadcast::error::TryRecvError::Closed) => { + break; + } + _ => {} + } + } +} + +async fn handle_forward_event( + child: &mut tokio::process::Child, + recv: Result, + stopwatch: &mut StopwatchStart, + mut interval_sleep: Pin<&mut PausableSleep>, + forward_receiver: &mut broadcast::Receiver, + job: Option<&imp::Job>, +) { + // The sender stays open longer than the whole loop, and the buffer is big + // enough for all messages ever sent through this channel, so a RecvError + // should never happen. + let forward_event = recv.expect("a RecvError should never happen here"); + + match forward_event { + #[cfg(unix)] + SignalForwardEvent::Stop(sender) => { + // It isn't possible to receive a stop event twice since it gets + // debounced in the main signal handler. + stopwatch.pause(); + interval_sleep.as_mut().pause(); + imp::job_control_child(child, JobControlEvent::Stop); + // The receiver being dead probably means the main thread panicked + // or similar. + let _ = sender.send(()).await; + } + #[cfg(unix)] + SignalForwardEvent::Continue => { + // It's possible to receive a resume event right at the beginning of + // test execution, so debounce it. + if stopwatch.is_paused() { + stopwatch.resume(); + interval_sleep.as_mut().resume(); + imp::job_control_child(child, JobControlEvent::Continue); + } + } + SignalForwardEvent::Shutdown(event) => { + imp::terminate_child(child, TerminateMode::Signal(event), forward_receiver, job).await; + } + } +} + +fn create_execution_result(exit_status: ExitStatus, leaked: bool) -> ExecutionResult { + if exit_status.success() { + if leaked { + ExecutionResult::Leak + } else { + ExecutionResult::Pass + } + } else { + cfg_if::cfg_if! { + if #[cfg(unix)] { + // On Unix, extract the signal if it's found. + use std::os::unix::process::ExitStatusExt; + let abort_status = exit_status.signal().map(AbortStatus::UnixSignal); + } else if #[cfg(windows)] { + let abort_status = exit_status.code().and_then(|code| { + let exception = windows::Win32::Foundation::NTSTATUS(code); + exception.is_err().then(|| AbortStatus::WindowsNtStatus(exception)) + }); + } else { + let abort_status = None; + } + } + ExecutionResult::Fail { + abort_status, + leaked, + } + } +} + fn collect_output<'a>( child_stdout: Option, stdout: &'a mut BytesMut, @@ -1084,7 +1103,7 @@ pub struct ExecuteStatus { pub stdout: Bytes, /// Standard error for this test. pub stderr: Bytes, - /// The result of execution this test: pass, fail or execution error. + /// The execution result for this test: pass, fail or execution error. pub result: ExecutionResult, /// The time at which the test started. pub start_time: SystemTime, @@ -1676,7 +1695,7 @@ mod imp { pub(super) async fn terminate_child( child: &mut Child, mode: TerminateMode, - _forward_receiver: &mut tokio::sync::broadcast::Receiver, + _forward_receiver: &mut broadcast::Receiver, job: Option<&Job>, ) { // Ignore signal events since Windows propagates them to child processes (this may change if @@ -1761,7 +1780,7 @@ mod imp { pub(super) async fn terminate_child( child: &mut Child, mode: TerminateMode, - forward_receiver: &mut tokio::sync::broadcast::Receiver, + forward_receiver: &mut broadcast::Receiver, _job: Option<&Job>, ) { if let Some(pid) = child.id() {