From 95518db70c5ca65531ff265f12b03dd22e4a8eeb Mon Sep 17 00:00:00 2001 From: Rain Date: Wed, 27 Nov 2024 23:22:14 +0000 Subject: [PATCH] [nextest-runner] add `RunUnitRequest` Add a wrapper enum around `SignalRequest`. We're going to add an "Info" variant in the future. --- nextest-runner/src/runner.rs | 121 ++++++++++++++++++++++------------- 1 file changed, 76 insertions(+), 45 deletions(-) diff --git a/nextest-runner/src/runner.rs b/nextest-runner/src/runner.rs index 7134e1f38d3..a62b2636c3f 100644 --- a/nextest-runner/src/runner.rs +++ b/nextest-runner/src/runner.rs @@ -349,7 +349,7 @@ impl<'a> TestRunnerInner<'a> { // - Shutdown signals (once) // - Signals twice // 32 should be more than enough. - let (req_tx, _req_rx) = broadcast::channel::(32); + let (req_tx, _req_rx) = broadcast::channel::(32); let req_tx_ref = &req_tx; let mut report_cancel_receiver = std::pin::pin!(report_cancel_receiver); @@ -412,7 +412,7 @@ impl<'a> TestRunnerInner<'a> { // currently running. let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel(); let mut running_tests = req_tx_ref - .send(SignalRequest::Stop(sender)) + .send(RunUnitRequest::Signal(SignalRequest::Stop(sender))) .expect( "at least one receiver stays open so this should never error out", ); @@ -448,7 +448,8 @@ impl<'a> TestRunnerInner<'a> { #[cfg(unix)] Ok(Some(HandleEventResponse::JobControl(JobControlEvent::Continue))) => { // Nextest has been resumed. Resume all the tests as well. - let _ = req_tx_ref.send(SignalRequest::Continue); + let _ = + req_tx_ref.send(RunUnitRequest::Signal(SignalRequest::Continue)); } #[cfg(not(unix))] Ok(Some(HandleEventResponse::JobControl(e))) => { @@ -485,7 +486,8 @@ impl<'a> TestRunnerInner<'a> { // Ignore errors here: if there are no receivers to cancel, so // be it. Also note the ordering here: cancelled_ref is set // *before* this is sent. - let _ = req_tx_ref.send(SignalRequest::Shutdown(req)); + let _ = req_tx_ref + .send(RunUnitRequest::Signal(SignalRequest::Shutdown(req))); } } } @@ -740,7 +742,7 @@ impl<'a> TestRunnerInner<'a> { &self, script: SetupScriptPacket<'a>, resp_tx: &UnboundedSender>, - req_rx: &mut tokio::sync::broadcast::Receiver, + req_rx: &mut tokio::sync::broadcast::Receiver, ) -> (InternalSetupScriptExecuteStatus, Option) { let mut stopwatch = crate::time::stopwatch(); @@ -777,7 +779,7 @@ impl<'a> TestRunnerInner<'a> { script: SetupScriptPacket<'a>, stopwatch: &mut StopwatchStart, resp_tx: &UnboundedSender>, - req_rx: &mut tokio::sync::broadcast::Receiver, + req_rx: &mut tokio::sync::broadcast::Receiver, ) -> Result<(InternalSetupScriptExecuteStatus, Option), SetupScriptError> { let mut cmd = script.make_command(&self.double_spawn, self.test_list)?; @@ -874,15 +876,23 @@ impl<'a> TestRunnerInner<'a> { } } recv = req_rx.recv() => { - handle_signal_request( - &mut child, - recv, - stopwatch, - interval_sleep.as_mut(), - req_rx, - job.as_ref(), - slow_timeout.grace_period - ).await; + // The sender stays open longer than the whole loop so a + // RecvError should never happen. + let req = recv.expect("req_rx sender is open"); + + match req { + RunUnitRequest::Signal(req) => { + handle_signal_request( + &mut child, + req, + stopwatch, + interval_sleep.as_mut(), + req_rx, + job.as_ref(), + slow_timeout.grace_period + ).await; + } + } } } }; @@ -962,7 +972,7 @@ impl<'a> TestRunnerInner<'a> { &self, test: TestPacket<'a, '_>, resp_tx: &UnboundedSender>, - req_rx: &mut broadcast::Receiver, + req_rx: &mut broadcast::Receiver, ) -> InternalExecuteStatus { let mut stopwatch = crate::time::stopwatch(); let delay_before_start = test.delay_before_start; @@ -994,7 +1004,7 @@ impl<'a> TestRunnerInner<'a> { test: TestPacket<'a, '_>, stopwatch: &mut StopwatchStart, resp_tx: &UnboundedSender>, - req_rx: &mut broadcast::Receiver, + req_rx: &mut broadcast::Receiver, ) -> Result { let ctx = TestExecuteContext { double_spawn: &self.double_spawn, @@ -1090,15 +1100,23 @@ impl<'a> TestRunnerInner<'a> { } } recv = req_rx.recv() => { - handle_signal_request( - &mut child, - recv, - stopwatch, - interval_sleep.as_mut(), - req_rx, - job.as_ref(), - slow_timeout.grace_period - ).await; + // The sender stays open longer than the whole loop so a + // RecvError should never happen. + let req = recv.expect("req_rx sender is open"); + + match req { + RunUnitRequest::Signal(req) => { + handle_signal_request( + &mut child, + req, + stopwatch, + interval_sleep.as_mut(), + req_rx, + job.as_ref(), + slow_timeout.grace_period + ).await; + } + } } }; }; @@ -1155,38 +1173,30 @@ impl<'a> TestRunnerInner<'a> { } /// Drains the request receiver of any messages, including those that are related to SIGTSTP. -fn drain_req_rx(mut receiver: broadcast::Receiver) { +fn drain_req_rx(mut receiver: broadcast::Receiver) { loop { let message = receiver.try_recv(); match message { - #[cfg(unix)] - Ok(SignalRequest::Stop(sender)) => { - // The receiver being dead isn't really important. - let _ = sender.send(()); + Ok(message) => { + message.drain(); } - Err(broadcast::error::TryRecvError::Empty | broadcast::error::TryRecvError::Closed) => { + Err(_) => { break; } - _ => {} } } } async fn handle_signal_request( child: &mut tokio::process::Child, - recv: Result, + req: SignalRequest, // These annotations are needed to silence lints on non-Unix platforms. #[allow(unused_variables)] stopwatch: &mut StopwatchStart, #[allow(unused_mut, unused_variables)] mut interval_sleep: Pin<&mut PausableSleep>, - req_rx: &mut broadcast::Receiver, + req_rx: &mut broadcast::Receiver, job: Option<&imp::Job>, grace_period: Duration, ) { - // The sender stays open longer than the whole loop, and the buffer is big - // enough for all messages ever sent through this channel, so a RecvError - // should never happen. - let req = recv.expect("a RecvError should never happen here"); - match req { #[cfg(unix)] SignalRequest::Stop(sender) => { @@ -1698,6 +1708,27 @@ impl SignalCount { } } +/// Events sent from the test runner to individual test (or setup script) execution tasks. +#[derive(Clone, Debug)] +enum RunUnitRequest { + Signal(SignalRequest), +} + +impl RunUnitRequest { + fn drain(self) { + match self { + #[cfg(unix)] + Self::Signal(SignalRequest::Stop(sender)) => { + // The receiver being dead isn't really important. + let _ = sender.send(()); + } + #[cfg(unix)] + Self::Signal(SignalRequest::Continue) => {} + Self::Signal(SignalRequest::Shutdown(_)) => {} + } + } +} + #[derive(Clone, Debug)] enum SignalRequest { // The mpsc sender is used by each test to indicate that the stop signal has been sent. @@ -2291,7 +2322,7 @@ mod imp { pub(super) async fn terminate_child( child: &mut Child, mode: TerminateMode, - _req_rx: &mut broadcast::Receiver, + _req_rx: &mut broadcast::Receiver, job: Option<&Job>, _grace_period: Duration, ) { @@ -2376,7 +2407,7 @@ mod imp { pub(super) async fn terminate_child( child: &mut Child, mode: TerminateMode, - req_rx: &mut broadcast::Receiver, + req_rx: &mut broadcast::Receiver, _job: Option<&Job>, grace_period: Duration, ) { @@ -2417,19 +2448,19 @@ mod imp { let req = recv.expect("a RecvError should never happen here"); match req { - SignalRequest::Stop(sender) => { + RunUnitRequest::Signal(SignalRequest::Stop(sender)) => { sleep.as_mut().pause(); imp::job_control_child(child, JobControlEvent::Stop); let _ = sender.send(()); } - SignalRequest::Continue => { + RunUnitRequest::Signal(SignalRequest::Continue) => { // Possible to receive a Continue at the beginning of execution. if !sleep.is_paused() { sleep.as_mut().resume(); } imp::job_control_child(child, JobControlEvent::Continue); } - SignalRequest::Shutdown(_) => { + RunUnitRequest::Signal(SignalRequest::Shutdown(_)) => { // Receiving a shutdown signal while in this state always means kill // immediately. unsafe {