Skip to content

Commit

Permalink
[nextest-runner] move a few pieces of code into separate functions
Browse files Browse the repository at this point in the history
We're going to reuse these functions to handle setup scripts.
  • Loading branch information
sunshowers committed Sep 25, 2023
1 parent a5b7f1d commit f3964bd
Showing 1 changed file with 110 additions and 90 deletions.
200 changes: 110 additions & 90 deletions nextest-runner/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::{
},
signal::{JobControlEvent, ShutdownEvent, SignalEvent, SignalHandler, SignalHandlerKind},
target_runner::TargetRunner,
time::{StopwatchEnd, StopwatchStart},
time::{PausableSleep, StopwatchEnd, StopwatchStart},
};
use async_scoped::TokioScope;
use bytes::{Bytes, BytesMut};
Expand All @@ -31,15 +31,16 @@ use std::{
fmt::Write,
marker::PhantomData,
num::NonZeroUsize,
process::Stdio,
pin::Pin,
process::{ExitStatus, Stdio},
sync::atomic::{AtomicBool, Ordering},
time::{Duration, SystemTime},
};
use tokio::{
io::{AsyncRead, AsyncReadExt},
process::Child,
runtime::Runtime,
sync::mpsc::UnboundedSender,
sync::{broadcast, mpsc::UnboundedSender},
};
use uuid::Uuid;

Expand Down Expand Up @@ -290,13 +291,12 @@ impl<'a> TestRunnerInner<'a> {
// - Shutdown signals (once)
// - Signals twice
// 32 should be more than enough.
let (forward_sender, _forward_receiver) =
tokio::sync::broadcast::channel::<SignalForwardEvent>(32);
let (forward_sender, _forward_receiver) = broadcast::channel::<SignalForwardEvent>(32);
let forward_sender_ref = &forward_sender;

TokioScope::scope_and_block(move |scope| {
let (run_sender, mut run_receiver) = tokio::sync::mpsc::unbounded_channel();
let (cancellation_sender, _cancellation_receiver) = tokio::sync::broadcast::channel(1);
let (cancellation_sender, _cancellation_receiver) = broadcast::channel(1);

let exec_cancellation_sender = cancellation_sender.clone();
let exec_fut = async move {
Expand Down Expand Up @@ -557,25 +557,7 @@ impl<'a> TestRunnerInner<'a> {
run_statuses: ExecutionStatuses::new(run_statuses),
});

// Drain the forward receiver of any messages, including those that are
// related to SIGTSTP.
loop {
let message = this_forward_receiver.try_recv();
match message {
#[cfg(unix)]
Ok(SignalForwardEvent::Stop(sender)) => {
// The receiver being dead isn't really important.
let _ = sender.send(()).await;
}
Err(
tokio::sync::broadcast::error::TryRecvError::Empty
| tokio::sync::broadcast::error::TryRecvError::Closed,
) => {
break;
}
_ => {}
}
}
drain_forward_receiver(this_forward_receiver).await;
};
(threads_required, test_group, fut)
})
Expand Down Expand Up @@ -615,7 +597,7 @@ impl<'a> TestRunnerInner<'a> {
retry_data: RetryData,
settings: &TestSettings,
run_sender: &UnboundedSender<InternalTestEvent<'a>>,
forward_receiver: &mut tokio::sync::broadcast::Receiver<SignalForwardEvent>,
forward_receiver: &mut broadcast::Receiver<SignalForwardEvent>,
delay_before_start: Duration,
) -> InternalExecuteStatus {
let mut stopwatch = crate::time::stopwatch();
Expand Down Expand Up @@ -658,7 +640,7 @@ impl<'a> TestRunnerInner<'a> {
stopwatch: &mut StopwatchStart,
settings: &TestSettings,
run_sender: &UnboundedSender<InternalTestEvent<'a>>,
forward_receiver: &mut tokio::sync::broadcast::Receiver<SignalForwardEvent>,
forward_receiver: &mut broadcast::Receiver<SignalForwardEvent>,
delay_before_start: Duration,
) -> Result<InternalExecuteStatus, RunTestError> {
let ctx = TestExecuteContext {
Expand Down Expand Up @@ -763,38 +745,14 @@ impl<'a> TestRunnerInner<'a> {
}
}
recv = forward_receiver.recv() => {
// The sender stays open longer than the whole loop, and the buffer is big
// enough for all messages ever sent through this channel, so a RecvError
// should never happen.
let forward_event = recv.expect("a RecvError should never happen here");

match forward_event {
#[cfg(unix)]
SignalForwardEvent::Stop(sender) => {
// It isn't possible to receive a stop event twice since it gets
// debounced in the main signal handler.
stopwatch.pause();
interval_sleep.as_mut().pause();
imp::job_control_child(&child, JobControlEvent::Stop);
// The receiver being dead probably means the main thread panicked
// or similar.
let _ = sender.send(()).await;
}
#[cfg(unix)]
SignalForwardEvent::Continue => {
// It's possible to receive a resume event right at the beginning of
// test execution, so debounce it.
if stopwatch.is_paused() {
stopwatch.resume();
interval_sleep.as_mut().resume();
imp::job_control_child(&child, JobControlEvent::Continue);
}
}
SignalForwardEvent::Shutdown(event) => {
imp::terminate_child(&mut child, TerminateMode::Signal(event), forward_receiver, job.as_ref()).await;
}
}

handle_forward_event(
&mut child,
recv,
stopwatch,
interval_sleep.as_mut(),
forward_receiver,
job.as_ref(),
).await;
}
};
};
Expand Down Expand Up @@ -827,34 +785,7 @@ impl<'a> TestRunnerInner<'a> {
let output = res.map_err(RunTestError::Wait)?;
let exit_status = output;

let status = status.unwrap_or_else(|| {
if exit_status.success() {
if leaked {
ExecutionResult::Leak
} else {
ExecutionResult::Pass
}
} else {
cfg_if::cfg_if! {
if #[cfg(unix)] {
// On Unix, extract the signal if it's found.
use std::os::unix::process::ExitStatusExt;
let abort_status = exit_status.signal().map(AbortStatus::UnixSignal);
} else if #[cfg(windows)] {
let abort_status = exit_status.code().and_then(|code| {
let exception = windows::Win32::Foundation::NTSTATUS(code);
exception.is_err().then(|| AbortStatus::WindowsNtStatus(exception))
});
} else {
let abort_status = None;
}
}
ExecutionResult::Fail {
abort_status,
leaked,
}
}
});
let status = status.unwrap_or_else(|| create_execution_result(exit_status, leaked));

Ok(InternalExecuteStatus {
stdout: stdout.freeze(),
Expand All @@ -867,6 +798,95 @@ impl<'a> TestRunnerInner<'a> {
}
}

/// Drains the forward receiver of any messages, including those that are related to SIGTSTP.
async fn drain_forward_receiver(mut receiver: broadcast::Receiver<SignalForwardEvent>) {
loop {
let message = receiver.try_recv();
match message {
#[cfg(unix)]
Ok(SignalForwardEvent::Stop(sender)) => {
// The receiver being dead isn't really important.
let _ = sender.send(()).await;
}
Err(broadcast::error::TryRecvError::Empty | broadcast::error::TryRecvError::Closed) => {
break;
}
_ => {}
}
}
}

async fn handle_forward_event(
child: &mut tokio::process::Child,
recv: Result<SignalForwardEvent, broadcast::error::RecvError>,
// These annotations are needed to silence lints on non-Unix platforms.
#[allow(unused_variables)] stopwatch: &mut StopwatchStart,
#[allow(unused_mut, unused_variables)] mut interval_sleep: Pin<&mut PausableSleep>,
forward_receiver: &mut broadcast::Receiver<SignalForwardEvent>,
job: Option<&imp::Job>,
) {
// The sender stays open longer than the whole loop, and the buffer is big
// enough for all messages ever sent through this channel, so a RecvError
// should never happen.
let forward_event = recv.expect("a RecvError should never happen here");

match forward_event {
#[cfg(unix)]
SignalForwardEvent::Stop(sender) => {
// It isn't possible to receive a stop event twice since it gets
// debounced in the main signal handler.
stopwatch.pause();
interval_sleep.as_mut().pause();
imp::job_control_child(child, JobControlEvent::Stop);
// The receiver being dead probably means the main thread panicked
// or similar.
let _ = sender.send(()).await;
}
#[cfg(unix)]
SignalForwardEvent::Continue => {
// It's possible to receive a resume event right at the beginning of
// test execution, so debounce it.
if stopwatch.is_paused() {
stopwatch.resume();
interval_sleep.as_mut().resume();
imp::job_control_child(child, JobControlEvent::Continue);
}
}
SignalForwardEvent::Shutdown(event) => {
imp::terminate_child(child, TerminateMode::Signal(event), forward_receiver, job).await;
}
}
}

fn create_execution_result(exit_status: ExitStatus, leaked: bool) -> ExecutionResult {
if exit_status.success() {
if leaked {
ExecutionResult::Leak
} else {
ExecutionResult::Pass
}
} else {
cfg_if::cfg_if! {
if #[cfg(unix)] {
// On Unix, extract the signal if it's found.
use std::os::unix::process::ExitStatusExt;
let abort_status = exit_status.signal().map(AbortStatus::UnixSignal);
} else if #[cfg(windows)] {
let abort_status = exit_status.code().and_then(|code| {
let exception = windows::Win32::Foundation::NTSTATUS(code);
exception.is_err().then(|| AbortStatus::WindowsNtStatus(exception))
});
} else {
let abort_status = None;
}
}
ExecutionResult::Fail {
abort_status,
leaked,
}
}
}

fn collect_output<'a>(
child_stdout: Option<tokio::process::ChildStdout>,
stdout: &'a mut BytesMut,
Expand Down Expand Up @@ -1084,7 +1104,7 @@ pub struct ExecuteStatus {
pub stdout: Bytes,
/// Standard error for this test.
pub stderr: Bytes,
/// The result of execution this test: pass, fail or execution error.
/// The execution result for this test: pass, fail or execution error.
pub result: ExecutionResult,
/// The time at which the test started.
pub start_time: SystemTime,
Expand Down Expand Up @@ -1676,7 +1696,7 @@ mod imp {
pub(super) async fn terminate_child(
child: &mut Child,
mode: TerminateMode,
_forward_receiver: &mut tokio::sync::broadcast::Receiver<SignalForwardEvent>,
_forward_receiver: &mut broadcast::Receiver<SignalForwardEvent>,
job: Option<&Job>,
) {
// Ignore signal events since Windows propagates them to child processes (this may change if
Expand Down Expand Up @@ -1761,7 +1781,7 @@ mod imp {
pub(super) async fn terminate_child(
child: &mut Child,
mode: TerminateMode,
forward_receiver: &mut tokio::sync::broadcast::Receiver<SignalForwardEvent>,
forward_receiver: &mut broadcast::Receiver<SignalForwardEvent>,
_job: Option<&Job>,
) {
if let Some(pid) = child.id() {
Expand Down

0 comments on commit f3964bd

Please sign in to comment.