Skip to content

Commit

Permalink
[nextest-runner] add RunUnitRequest
Browse files Browse the repository at this point in the history
Add a wrapper enum around `SignalRequest`. We're going to add an "Info" variant
in the future.
  • Loading branch information
sunshowers committed Nov 27, 2024
1 parent b30377f commit 95518db
Showing 1 changed file with 76 additions and 45 deletions.
121 changes: 76 additions & 45 deletions nextest-runner/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ impl<'a> TestRunnerInner<'a> {
// - Shutdown signals (once)
// - Signals twice
// 32 should be more than enough.
let (req_tx, _req_rx) = broadcast::channel::<SignalRequest>(32);
let (req_tx, _req_rx) = broadcast::channel::<RunUnitRequest>(32);
let req_tx_ref = &req_tx;

let mut report_cancel_receiver = std::pin::pin!(report_cancel_receiver);
Expand Down Expand Up @@ -412,7 +412,7 @@ impl<'a> TestRunnerInner<'a> {
// currently running.
let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel();
let mut running_tests = req_tx_ref
.send(SignalRequest::Stop(sender))
.send(RunUnitRequest::Signal(SignalRequest::Stop(sender)))

Check warning on line 415 in nextest-runner/src/runner.rs

View check run for this annotation

Codecov / codecov/patch

nextest-runner/src/runner.rs#L415

Added line #L415 was not covered by tests
.expect(
"at least one receiver stays open so this should never error out",
);
Expand Down Expand Up @@ -448,7 +448,8 @@ impl<'a> TestRunnerInner<'a> {
#[cfg(unix)]
Ok(Some(HandleEventResponse::JobControl(JobControlEvent::Continue))) => {

Check warning on line 449 in nextest-runner/src/runner.rs

View check run for this annotation

Codecov / codecov/patch

nextest-runner/src/runner.rs#L449

Added line #L449 was not covered by tests
// Nextest has been resumed. Resume all the tests as well.
let _ = req_tx_ref.send(SignalRequest::Continue);
let _ =
req_tx_ref.send(RunUnitRequest::Signal(SignalRequest::Continue));

Check warning on line 452 in nextest-runner/src/runner.rs

View check run for this annotation

Codecov / codecov/patch

nextest-runner/src/runner.rs#L451-L452

Added lines #L451 - L452 were not covered by tests
}
#[cfg(not(unix))]
Ok(Some(HandleEventResponse::JobControl(e))) => {
Expand Down Expand Up @@ -485,7 +486,8 @@ impl<'a> TestRunnerInner<'a> {
// Ignore errors here: if there are no receivers to cancel, so
// be it. Also note the ordering here: cancelled_ref is set
// *before* this is sent.
let _ = req_tx_ref.send(SignalRequest::Shutdown(req));
let _ = req_tx_ref
.send(RunUnitRequest::Signal(SignalRequest::Shutdown(req)));

Check warning on line 490 in nextest-runner/src/runner.rs

View check run for this annotation

Codecov / codecov/patch

nextest-runner/src/runner.rs#L489-L490

Added lines #L489 - L490 were not covered by tests
}
}
}
Expand Down Expand Up @@ -740,7 +742,7 @@ impl<'a> TestRunnerInner<'a> {
&self,
script: SetupScriptPacket<'a>,
resp_tx: &UnboundedSender<InternalTestEvent<'a>>,
req_rx: &mut tokio::sync::broadcast::Receiver<SignalRequest>,
req_rx: &mut tokio::sync::broadcast::Receiver<RunUnitRequest>,
) -> (InternalSetupScriptExecuteStatus, Option<SetupScriptEnvMap>) {
let mut stopwatch = crate::time::stopwatch();

Expand Down Expand Up @@ -777,7 +779,7 @@ impl<'a> TestRunnerInner<'a> {
script: SetupScriptPacket<'a>,
stopwatch: &mut StopwatchStart,
resp_tx: &UnboundedSender<InternalTestEvent<'a>>,
req_rx: &mut tokio::sync::broadcast::Receiver<SignalRequest>,
req_rx: &mut tokio::sync::broadcast::Receiver<RunUnitRequest>,
) -> Result<(InternalSetupScriptExecuteStatus, Option<SetupScriptEnvMap>), SetupScriptError>
{
let mut cmd = script.make_command(&self.double_spawn, self.test_list)?;
Expand Down Expand Up @@ -874,15 +876,23 @@ impl<'a> TestRunnerInner<'a> {
}
}
recv = req_rx.recv() => {
handle_signal_request(
&mut child,
recv,
stopwatch,
interval_sleep.as_mut(),
req_rx,
job.as_ref(),
slow_timeout.grace_period
).await;
// The sender stays open longer than the whole loop so a
// RecvError should never happen.
let req = recv.expect("req_rx sender is open");

match req {
RunUnitRequest::Signal(req) => {
handle_signal_request(
&mut child,
req,
stopwatch,
interval_sleep.as_mut(),
req_rx,
job.as_ref(),
slow_timeout.grace_period
).await;

Check warning on line 893 in nextest-runner/src/runner.rs

View check run for this annotation

Codecov / codecov/patch

nextest-runner/src/runner.rs#L881-L893

Added lines #L881 - L893 were not covered by tests
}
}
}
}
};
Expand Down Expand Up @@ -962,7 +972,7 @@ impl<'a> TestRunnerInner<'a> {
&self,
test: TestPacket<'a, '_>,
resp_tx: &UnboundedSender<InternalTestEvent<'a>>,
req_rx: &mut broadcast::Receiver<SignalRequest>,
req_rx: &mut broadcast::Receiver<RunUnitRequest>,
) -> InternalExecuteStatus {
let mut stopwatch = crate::time::stopwatch();
let delay_before_start = test.delay_before_start;
Expand Down Expand Up @@ -994,7 +1004,7 @@ impl<'a> TestRunnerInner<'a> {
test: TestPacket<'a, '_>,
stopwatch: &mut StopwatchStart,
resp_tx: &UnboundedSender<InternalTestEvent<'a>>,
req_rx: &mut broadcast::Receiver<SignalRequest>,
req_rx: &mut broadcast::Receiver<RunUnitRequest>,
) -> Result<InternalExecuteStatus, RunTestError> {
let ctx = TestExecuteContext {
double_spawn: &self.double_spawn,
Expand Down Expand Up @@ -1090,15 +1100,23 @@ impl<'a> TestRunnerInner<'a> {
}
}
recv = req_rx.recv() => {
handle_signal_request(
&mut child,
recv,
stopwatch,
interval_sleep.as_mut(),
req_rx,
job.as_ref(),
slow_timeout.grace_period
).await;
// The sender stays open longer than the whole loop so a
// RecvError should never happen.
let req = recv.expect("req_rx sender is open");

match req {
RunUnitRequest::Signal(req) => {
handle_signal_request(
&mut child,
req,
stopwatch,
interval_sleep.as_mut(),
req_rx,
job.as_ref(),
slow_timeout.grace_period
).await;

Check warning on line 1117 in nextest-runner/src/runner.rs

View check run for this annotation

Codecov / codecov/patch

nextest-runner/src/runner.rs#L1105-L1117

Added lines #L1105 - L1117 were not covered by tests
}
}
}
};
};
Expand Down Expand Up @@ -1155,38 +1173,30 @@ impl<'a> TestRunnerInner<'a> {
}

/// Drains the request receiver of any messages, including those that are related to SIGTSTP.
fn drain_req_rx(mut receiver: broadcast::Receiver<SignalRequest>) {
fn drain_req_rx(mut receiver: broadcast::Receiver<RunUnitRequest>) {
loop {
let message = receiver.try_recv();
match message {
#[cfg(unix)]
Ok(SignalRequest::Stop(sender)) => {
// The receiver being dead isn't really important.
let _ = sender.send(());
Ok(message) => {
message.drain();

Check warning on line 1181 in nextest-runner/src/runner.rs

View check run for this annotation

Codecov / codecov/patch

nextest-runner/src/runner.rs#L1180-L1181

Added lines #L1180 - L1181 were not covered by tests
}
Err(broadcast::error::TryRecvError::Empty | broadcast::error::TryRecvError::Closed) => {
Err(_) => {
break;
}
_ => {}
}
}
}

async fn handle_signal_request(
child: &mut tokio::process::Child,
recv: Result<SignalRequest, broadcast::error::RecvError>,
req: SignalRequest,

Check warning on line 1192 in nextest-runner/src/runner.rs

View check run for this annotation

Codecov / codecov/patch

nextest-runner/src/runner.rs#L1192

Added line #L1192 was not covered by tests
// These annotations are needed to silence lints on non-Unix platforms.
#[allow(unused_variables)] stopwatch: &mut StopwatchStart,
#[allow(unused_mut, unused_variables)] mut interval_sleep: Pin<&mut PausableSleep>,
req_rx: &mut broadcast::Receiver<SignalRequest>,
req_rx: &mut broadcast::Receiver<RunUnitRequest>,

Check warning on line 1196 in nextest-runner/src/runner.rs

View check run for this annotation

Codecov / codecov/patch

nextest-runner/src/runner.rs#L1196

Added line #L1196 was not covered by tests
job: Option<&imp::Job>,
grace_period: Duration,
) {
// The sender stays open longer than the whole loop, and the buffer is big
// enough for all messages ever sent through this channel, so a RecvError
// should never happen.
let req = recv.expect("a RecvError should never happen here");

match req {
#[cfg(unix)]
SignalRequest::Stop(sender) => {
Expand Down Expand Up @@ -1698,6 +1708,27 @@ impl SignalCount {
}
}

/// Events sent from the test runner to individual test (or setup script) execution tasks.
#[derive(Clone, Debug)]
enum RunUnitRequest {
Signal(SignalRequest),
}

impl RunUnitRequest {
fn drain(self) {
match self {

Check warning on line 1719 in nextest-runner/src/runner.rs

View check run for this annotation

Codecov / codecov/patch

nextest-runner/src/runner.rs#L1718-L1719

Added lines #L1718 - L1719 were not covered by tests
#[cfg(unix)]
Self::Signal(SignalRequest::Stop(sender)) => {
// The receiver being dead isn't really important.
let _ = sender.send(());
}

Check warning on line 1724 in nextest-runner/src/runner.rs

View check run for this annotation

Codecov / codecov/patch

nextest-runner/src/runner.rs#L1721-L1724

Added lines #L1721 - L1724 were not covered by tests
#[cfg(unix)]
Self::Signal(SignalRequest::Continue) => {}
Self::Signal(SignalRequest::Shutdown(_)) => {}

Check warning on line 1727 in nextest-runner/src/runner.rs

View check run for this annotation

Codecov / codecov/patch

nextest-runner/src/runner.rs#L1726-L1727

Added lines #L1726 - L1727 were not covered by tests
}
}

Check warning on line 1729 in nextest-runner/src/runner.rs

View check run for this annotation

Codecov / codecov/patch

nextest-runner/src/runner.rs#L1729

Added line #L1729 was not covered by tests
}

#[derive(Clone, Debug)]
enum SignalRequest {
// The mpsc sender is used by each test to indicate that the stop signal has been sent.
Expand Down Expand Up @@ -2291,7 +2322,7 @@ mod imp {
pub(super) async fn terminate_child(
child: &mut Child,
mode: TerminateMode,
_req_rx: &mut broadcast::Receiver<SignalRequest>,
_req_rx: &mut broadcast::Receiver<RunUnitRequest>,
job: Option<&Job>,
_grace_period: Duration,
) {
Expand Down Expand Up @@ -2376,7 +2407,7 @@ mod imp {
pub(super) async fn terminate_child(
child: &mut Child,
mode: TerminateMode,
req_rx: &mut broadcast::Receiver<SignalRequest>,
req_rx: &mut broadcast::Receiver<RunUnitRequest>,
_job: Option<&Job>,
grace_period: Duration,
) {
Expand Down Expand Up @@ -2417,19 +2448,19 @@ mod imp {
let req = recv.expect("a RecvError should never happen here");

match req {
SignalRequest::Stop(sender) => {
RunUnitRequest::Signal(SignalRequest::Stop(sender)) => {

Check warning on line 2451 in nextest-runner/src/runner.rs

View check run for this annotation

Codecov / codecov/patch

nextest-runner/src/runner.rs#L2451

Added line #L2451 was not covered by tests
sleep.as_mut().pause();
imp::job_control_child(child, JobControlEvent::Stop);
let _ = sender.send(());
}
SignalRequest::Continue => {
RunUnitRequest::Signal(SignalRequest::Continue) => {
// Possible to receive a Continue at the beginning of execution.
if !sleep.is_paused() {
sleep.as_mut().resume();
}
imp::job_control_child(child, JobControlEvent::Continue);
}
SignalRequest::Shutdown(_) => {
RunUnitRequest::Signal(SignalRequest::Shutdown(_)) => {
// Receiving a shutdown signal while in this state always means kill
// immediately.
unsafe {
Expand Down

0 comments on commit 95518db

Please sign in to comment.