Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a couple of wrapper enums #1905

Merged
merged 2 commits into from
Nov 27, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 96 additions & 53 deletions nextest-runner/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@
// - Shutdown signals (once)
// - Signals twice
// 32 should be more than enough.
let (req_tx, _req_rx) = broadcast::channel::<SignalRequest>(32);
let (req_tx, _req_rx) = broadcast::channel::<RunUnitRequest>(32);
let req_tx_ref = &req_tx;

let mut report_cancel_receiver = std::pin::pin!(report_cancel_receiver);
Expand Down Expand Up @@ -407,12 +407,12 @@

match ctx_mut.handle_event(internal_event) {
#[cfg(unix)]
Ok(Some(JobControlEvent::Stop)) => {
Ok(Some(HandleEventResponse::JobControl(JobControlEvent::Stop))) => {
// This is in reality bounded by the number of tests
// currently running.
let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel();
let mut running_tests = req_tx_ref
.send(SignalRequest::Stop(sender))
.send(RunUnitRequest::Signal(SignalRequest::Stop(sender)))

Check warning on line 415 in nextest-runner/src/runner.rs

View check run for this annotation

Codecov / codecov/patch

nextest-runner/src/runner.rs#L415

Added line #L415 was not covered by tests
.expect(
"at least one receiver stays open so this should never error out",
);
Expand Down Expand Up @@ -446,14 +446,18 @@
imp::raise_stop();
}
#[cfg(unix)]
Ok(Some(JobControlEvent::Continue)) => {
Ok(Some(HandleEventResponse::JobControl(JobControlEvent::Continue))) => {

Check warning on line 449 in nextest-runner/src/runner.rs

View check run for this annotation

Codecov / codecov/patch

nextest-runner/src/runner.rs#L449

Added line #L449 was not covered by tests
// Nextest has been resumed. Resume all the tests as well.
let _ = req_tx_ref.send(SignalRequest::Continue);
let _ =
req_tx_ref.send(RunUnitRequest::Signal(SignalRequest::Continue));

Check warning on line 452 in nextest-runner/src/runner.rs

View check run for this annotation

Codecov / codecov/patch

nextest-runner/src/runner.rs#L451-L452

Added lines #L451 - L452 were not covered by tests
}
#[cfg(not(unix))]
Ok(Some(e)) => {
Ok(Some(HandleEventResponse::JobControl(e))) => {
// On platforms other than Unix this enum is expected to be empty;
// we can check this assumption at compile time like so:
// we can check this assumption at compile time like so.
//
// Rust 1.82 handles empty enums better, and this
// won't be required after we bump the MSRV to that.
match e {}
}
Ok(None) => {}
Expand Down Expand Up @@ -482,7 +486,8 @@
// Ignore errors here: if there are no receivers to cancel, so
// be it. Also note the ordering here: cancelled_ref is set
// *before* this is sent.
let _ = req_tx_ref.send(SignalRequest::Shutdown(req));
let _ = req_tx_ref
.send(RunUnitRequest::Signal(SignalRequest::Shutdown(req)));

Check warning on line 490 in nextest-runner/src/runner.rs

View check run for this annotation

Codecov / codecov/patch

nextest-runner/src/runner.rs#L489-L490

Added lines #L489 - L490 were not covered by tests
}
}
}
Expand Down Expand Up @@ -737,7 +742,7 @@
&self,
script: SetupScriptPacket<'a>,
resp_tx: &UnboundedSender<InternalTestEvent<'a>>,
req_rx: &mut tokio::sync::broadcast::Receiver<SignalRequest>,
req_rx: &mut tokio::sync::broadcast::Receiver<RunUnitRequest>,
) -> (InternalSetupScriptExecuteStatus, Option<SetupScriptEnvMap>) {
let mut stopwatch = crate::time::stopwatch();

Expand Down Expand Up @@ -774,7 +779,7 @@
script: SetupScriptPacket<'a>,
stopwatch: &mut StopwatchStart,
resp_tx: &UnboundedSender<InternalTestEvent<'a>>,
req_rx: &mut tokio::sync::broadcast::Receiver<SignalRequest>,
req_rx: &mut tokio::sync::broadcast::Receiver<RunUnitRequest>,
) -> Result<(InternalSetupScriptExecuteStatus, Option<SetupScriptEnvMap>), SetupScriptError>
{
let mut cmd = script.make_command(&self.double_spawn, self.test_list)?;
Expand Down Expand Up @@ -871,15 +876,23 @@
}
}
recv = req_rx.recv() => {
handle_signal_request(
&mut child,
recv,
stopwatch,
interval_sleep.as_mut(),
req_rx,
job.as_ref(),
slow_timeout.grace_period
).await;
// The sender stays open longer than the whole loop so a
// RecvError should never happen.
let req = recv.expect("req_rx sender is open");

match req {
RunUnitRequest::Signal(req) => {
handle_signal_request(
&mut child,
req,
stopwatch,
interval_sleep.as_mut(),
req_rx,
job.as_ref(),
slow_timeout.grace_period
).await;

Check warning on line 893 in nextest-runner/src/runner.rs

View check run for this annotation

Codecov / codecov/patch

nextest-runner/src/runner.rs#L881-L893

Added lines #L881 - L893 were not covered by tests
}
}
}
}
};
Expand Down Expand Up @@ -959,7 +972,7 @@
&self,
test: TestPacket<'a, '_>,
resp_tx: &UnboundedSender<InternalTestEvent<'a>>,
req_rx: &mut broadcast::Receiver<SignalRequest>,
req_rx: &mut broadcast::Receiver<RunUnitRequest>,
) -> InternalExecuteStatus {
let mut stopwatch = crate::time::stopwatch();
let delay_before_start = test.delay_before_start;
Expand Down Expand Up @@ -991,7 +1004,7 @@
test: TestPacket<'a, '_>,
stopwatch: &mut StopwatchStart,
resp_tx: &UnboundedSender<InternalTestEvent<'a>>,
req_rx: &mut broadcast::Receiver<SignalRequest>,
req_rx: &mut broadcast::Receiver<RunUnitRequest>,
) -> Result<InternalExecuteStatus, RunTestError> {
let ctx = TestExecuteContext {
double_spawn: &self.double_spawn,
Expand Down Expand Up @@ -1087,15 +1100,23 @@
}
}
recv = req_rx.recv() => {
handle_signal_request(
&mut child,
recv,
stopwatch,
interval_sleep.as_mut(),
req_rx,
job.as_ref(),
slow_timeout.grace_period
).await;
// The sender stays open longer than the whole loop so a
// RecvError should never happen.
let req = recv.expect("req_rx sender is open");

match req {
RunUnitRequest::Signal(req) => {
handle_signal_request(
&mut child,
req,
stopwatch,
interval_sleep.as_mut(),
req_rx,
job.as_ref(),
slow_timeout.grace_period
).await;

Check warning on line 1117 in nextest-runner/src/runner.rs

View check run for this annotation

Codecov / codecov/patch

nextest-runner/src/runner.rs#L1105-L1117

Added lines #L1105 - L1117 were not covered by tests
}
}
}
};
};
Expand Down Expand Up @@ -1152,38 +1173,30 @@
}

/// Drains the request receiver of any messages, including those that are related to SIGTSTP.
fn drain_req_rx(mut receiver: broadcast::Receiver<SignalRequest>) {
fn drain_req_rx(mut receiver: broadcast::Receiver<RunUnitRequest>) {
loop {
let message = receiver.try_recv();
match message {
#[cfg(unix)]
Ok(SignalRequest::Stop(sender)) => {
// The receiver being dead isn't really important.
let _ = sender.send(());
Ok(message) => {
message.drain();

Check warning on line 1181 in nextest-runner/src/runner.rs

View check run for this annotation

Codecov / codecov/patch

nextest-runner/src/runner.rs#L1180-L1181

Added lines #L1180 - L1181 were not covered by tests
}
Err(broadcast::error::TryRecvError::Empty | broadcast::error::TryRecvError::Closed) => {
Err(_) => {
break;
}
_ => {}
}
}
}

async fn handle_signal_request(
child: &mut tokio::process::Child,
recv: Result<SignalRequest, broadcast::error::RecvError>,
req: SignalRequest,

Check warning on line 1192 in nextest-runner/src/runner.rs

View check run for this annotation

Codecov / codecov/patch

nextest-runner/src/runner.rs#L1192

Added line #L1192 was not covered by tests
// These annotations are needed to silence lints on non-Unix platforms.
#[allow(unused_variables)] stopwatch: &mut StopwatchStart,
#[allow(unused_mut, unused_variables)] mut interval_sleep: Pin<&mut PausableSleep>,
req_rx: &mut broadcast::Receiver<SignalRequest>,
req_rx: &mut broadcast::Receiver<RunUnitRequest>,

Check warning on line 1196 in nextest-runner/src/runner.rs

View check run for this annotation

Codecov / codecov/patch

nextest-runner/src/runner.rs#L1196

Added line #L1196 was not covered by tests
job: Option<&imp::Job>,
grace_period: Duration,
) {
// The sender stays open longer than the whole loop, and the buffer is big
// enough for all messages ever sent through this channel, so a RecvError
// should never happen.
let req = recv.expect("a RecvError should never happen here");

match req {
#[cfg(unix)]
SignalRequest::Stop(sender) => {
Expand Down Expand Up @@ -1695,6 +1708,27 @@
}
}

/// Events sent from the test runner to individual test (or setup script) execution tasks.
#[derive(Clone, Debug)]
enum RunUnitRequest {
Signal(SignalRequest),
}

impl RunUnitRequest {
fn drain(self) {
match self {

Check warning on line 1719 in nextest-runner/src/runner.rs

View check run for this annotation

Codecov / codecov/patch

nextest-runner/src/runner.rs#L1718-L1719

Added lines #L1718 - L1719 were not covered by tests
#[cfg(unix)]
Self::Signal(SignalRequest::Stop(sender)) => {
// The receiver being dead isn't really important.
let _ = sender.send(());
}

Check warning on line 1724 in nextest-runner/src/runner.rs

View check run for this annotation

Codecov / codecov/patch

nextest-runner/src/runner.rs#L1721-L1724

Added lines #L1721 - L1724 were not covered by tests
#[cfg(unix)]
Self::Signal(SignalRequest::Continue) => {}
Self::Signal(SignalRequest::Shutdown(_)) => {}

Check warning on line 1727 in nextest-runner/src/runner.rs

View check run for this annotation

Codecov / codecov/patch

nextest-runner/src/runner.rs#L1726-L1727

Added lines #L1726 - L1727 were not covered by tests
}
}

Check warning on line 1729 in nextest-runner/src/runner.rs

View check run for this annotation

Codecov / codecov/patch

nextest-runner/src/runner.rs#L1729

Added line #L1729 was not covered by tests
}

#[derive(Clone, Debug)]
enum SignalRequest {
// The mpsc sender is used by each test to indicate that the stop signal has been sent.
Expand Down Expand Up @@ -1765,6 +1799,13 @@
}
}

/// The return result of `handle_event`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum HandleEventResponse {
#[cfg_attr(not(unix), allow(dead_code))]
JobControl(JobControlEvent),
}

struct CallbackContext<F> {
callback: F,
run_id: ReportUuid,
Expand Down Expand Up @@ -1833,15 +1874,15 @@
fn callback(
&mut self,
kind: TestEventKind<'a>,
) -> Result<Option<JobControlEvent>, InternalCancel> {
) -> Result<Option<HandleEventResponse>, InternalCancel> {
self.basic_callback(kind);
Ok(None)
}

fn handle_event(
&mut self,
event: InternalEvent<'a>,
) -> Result<Option<JobControlEvent>, InternalCancel> {
) -> Result<Option<HandleEventResponse>, InternalCancel> {
match event {
InternalEvent::Test(InternalTestEvent::SetupScriptStarted {
script_id,
Expand Down Expand Up @@ -2007,7 +2048,7 @@
running: self.running,
})?;
self.stopwatch.pause();
Ok(Some(JobControlEvent::Stop))
Ok(Some(HandleEventResponse::JobControl(JobControlEvent::Stop)))

Check warning on line 2051 in nextest-runner/src/runner.rs

View check run for this annotation

Codecov / codecov/patch

nextest-runner/src/runner.rs#L2051

Added line #L2051 was not covered by tests
} else {
Ok(None)
}
Expand All @@ -2021,7 +2062,9 @@
setup_scripts_running: self.setup_scripts_running,
running: self.running,
})?;
Ok(Some(JobControlEvent::Continue))
Ok(Some(HandleEventResponse::JobControl(
JobControlEvent::Continue,
)))

Check warning on line 2067 in nextest-runner/src/runner.rs

View check run for this annotation

Codecov / codecov/patch

nextest-runner/src/runner.rs#L2065-L2067

Added lines #L2065 - L2067 were not covered by tests
} else {
Ok(None)
}
Expand Down Expand Up @@ -2279,7 +2322,7 @@
pub(super) async fn terminate_child(
child: &mut Child,
mode: TerminateMode,
_req_rx: &mut broadcast::Receiver<SignalRequest>,
_req_rx: &mut broadcast::Receiver<RunUnitRequest>,
job: Option<&Job>,
_grace_period: Duration,
) {
Expand Down Expand Up @@ -2364,7 +2407,7 @@
pub(super) async fn terminate_child(
child: &mut Child,
mode: TerminateMode,
req_rx: &mut broadcast::Receiver<SignalRequest>,
req_rx: &mut broadcast::Receiver<RunUnitRequest>,
_job: Option<&Job>,
grace_period: Duration,
) {
Expand Down Expand Up @@ -2405,19 +2448,19 @@
let req = recv.expect("a RecvError should never happen here");

match req {
SignalRequest::Stop(sender) => {
RunUnitRequest::Signal(SignalRequest::Stop(sender)) => {

Check warning on line 2451 in nextest-runner/src/runner.rs

View check run for this annotation

Codecov / codecov/patch

nextest-runner/src/runner.rs#L2451

Added line #L2451 was not covered by tests
sleep.as_mut().pause();
imp::job_control_child(child, JobControlEvent::Stop);
let _ = sender.send(());
}
SignalRequest::Continue => {
RunUnitRequest::Signal(SignalRequest::Continue) => {
// Possible to receive a Continue at the beginning of execution.
if !sleep.is_paused() {
sleep.as_mut().resume();
}
imp::job_control_child(child, JobControlEvent::Continue);
}
SignalRequest::Shutdown(_) => {
RunUnitRequest::Signal(SignalRequest::Shutdown(_)) => {
// Receiving a shutdown signal while in this state always means kill
// immediately.
unsafe {
Expand Down
Loading