diff --git a/nextest-runner/src/runner.rs b/nextest-runner/src/runner.rs index 22199e857a7..4abb3bad8a1 100644 --- a/nextest-runner/src/runner.rs +++ b/nextest-runner/src/runner.rs @@ -297,9 +297,131 @@ impl<'a> TestRunnerInner<'a> { TokioScope::scope_and_block(move |scope| { let (run_sender, mut run_receiver) = tokio::sync::mpsc::unbounded_channel(); let (cancellation_sender, _cancellation_receiver) = tokio::sync::broadcast::channel(1); - { - let cancellation_sender = cancellation_sender.clone(); + let exec_cancellation_sender = cancellation_sender.clone(); + let exec_fut = async move { + let mut signals_done = false; + + loop { + let internal_event = tokio::select! { + internal_event = run_receiver.recv() => { + match internal_event { + Some(event) => InternalEvent::Test(event), + None => { + // All runs have been completed. + break; + } + } + }, + internal_event = signal_handler.recv(), if !signals_done => { + match internal_event { + Some(event) => InternalEvent::Signal(event), + None => { + signals_done = true; + continue; + } + } + }, + }; + + match ctx_mut.handle_event(internal_event) { + #[cfg(unix)] + Ok(Some(JobControlEvent::Stop)) => { + // There are test_threads or fewer tests running so this buffer is + // big enough. + let (sender, mut receiver) = + tokio::sync::mpsc::channel(self.test_threads); + let mut running_tests = forward_sender_ref + .send(SignalForwardEvent::Stop(sender)) + .expect( + "at least one receiver stays open so this should never error out", + ); + // One event to account for the receiver held open at the top. + running_tests -= 1; + + // There's a possibility of a race condition between a test exiting and + // sending the message to the receiver. For that reason, don't wait more + // than 100ms on children to stop. + let mut sleep = + std::pin::pin!(tokio::time::sleep(Duration::from_millis(100))); + + loop { + tokio::select! { + _ = receiver.recv(), if running_tests > 0 => { + running_tests -= 1; + log::debug!( + "stopping tests: running tests down to {running_tests}" + ); + } + _ = &mut sleep => { + break; + } + else => { + break; + } + }; + } + + // Now stop nextest itself. + imp::raise_stop(); + } + #[cfg(unix)] + Ok(Some(JobControlEvent::Continue)) => { + // Nextest has been resumed. Resume all the tests as well. + let _ = forward_sender_ref.send(SignalForwardEvent::Continue); + } + #[cfg(not(unix))] + Ok(Some(_)) => { + crate::helpers::statically_unreachable(); + } + Ok(None) => {} + Err(err) => { + // If an error happens, it is because either the callback failed or + // a cancellation notice was received. If the callback failed, we need + // to send a further cancellation notice as well. + // + // Also note the ordering here: canceled_ref is set *before* + // notifications are broadcast. This prevents race conditions. + canceled_ref.store(true, Ordering::Release); + let _ = exec_cancellation_sender.send(()); + match err { + InternalError::Error(err) => { + // Ignore errors that happen during error cancellation. + if first_error_mut.is_none() { + *first_error_mut = Some(err); + } + let _ = ctx_mut.begin_cancel(CancelReason::ReportError); + } + InternalError::TestFailureCanceled(err) => { + // A test failure has caused cancellation to begin. + if first_error_mut.is_none() { + *first_error_mut = err; + } + } + InternalError::SignalCanceled(forward_event, err) => { + // A signal has caused cancellation to begin. + if first_error_mut.is_none() { + *first_error_mut = err; + } + // Let all the child processes know about the signal, and + // continue to handle events. + // + // Ignore errors here: if there are no receivers to cancel, so + // be it. Also note the ordering here: canceled_ref is set + // *before* this is sent. + let _ = forward_sender_ref + .send(SignalForwardEvent::Shutdown(forward_event)); + } + } + } + } + } + }; + + // Read events from the receiver to completion. + scope.spawn_cancellable(exec_fut, || ()); + + { // groups is going to be passed to future_queue_grouped. let groups = self .profile @@ -465,127 +587,6 @@ impl<'a> TestRunnerInner<'a> { // Run the stream to completion. scope.spawn_cancellable(run_fut, || ()); } - let exec_fut = async move { - let mut signals_done = false; - - loop { - let internal_event = tokio::select! { - internal_event = run_receiver.recv() => { - match internal_event { - Some(event) => InternalEvent::Test(event), - None => { - // All runs have been completed. - break; - } - } - }, - internal_event = signal_handler.recv(), if !signals_done => { - match internal_event { - Some(event) => InternalEvent::Signal(event), - None => { - signals_done = true; - continue; - } - } - }, - }; - - match ctx_mut.handle_event(internal_event) { - #[cfg(unix)] - Ok(Some(JobControlEvent::Stop)) => { - // There are test_threads or fewer tests running so this buffer is - // big enough. - let (sender, mut receiver) = - tokio::sync::mpsc::channel(self.test_threads); - let mut running_tests = forward_sender_ref - .send(SignalForwardEvent::Stop(sender)) - .expect( - "at least one receiver stays open so this should never error out", - ); - // One event to account for the receiver held open at the top. - running_tests -= 1; - - // There's a possibility of a race condition between a test exiting and - // sending the message to the receiver. For that reason, don't wait more - // than 100ms on children to stop. - let mut sleep = - std::pin::pin!(tokio::time::sleep(Duration::from_millis(100))); - - loop { - tokio::select! { - _ = receiver.recv(), if running_tests > 0 => { - running_tests -= 1; - log::debug!( - "stopping tests: running tests down to {running_tests}" - ); - } - _ = &mut sleep => { - break; - } - else => { - break; - } - }; - } - - // Now stop nextest itself. - imp::raise_stop(); - } - #[cfg(unix)] - Ok(Some(JobControlEvent::Continue)) => { - // Nextest has been resumed. Resume all the tests as well. - let _ = forward_sender_ref.send(SignalForwardEvent::Continue); - } - #[cfg(not(unix))] - Ok(Some(_)) => { - crate::helpers::statically_unreachable(); - } - Ok(None) => {} - Err(err) => { - // If an error happens, it is because either the callback failed or - // a cancellation notice was received. If the callback failed, we need - // to send a further cancellation notice as well. - // - // Also note the ordering here: canceled_ref is set *before* - // notifications are broadcast. This prevents race conditions. - canceled_ref.store(true, Ordering::Release); - let _ = cancellation_sender.send(()); - match err { - InternalError::Error(err) => { - // Ignore errors that happen during error cancellation. - if first_error_mut.is_none() { - *first_error_mut = Some(err); - } - let _ = ctx_mut.begin_cancel(CancelReason::ReportError); - } - InternalError::TestFailureCanceled(err) => { - // A test failure has caused cancellation to begin. - if first_error_mut.is_none() { - *first_error_mut = err; - } - } - InternalError::SignalCanceled(forward_event, err) => { - // A signal has caused cancellation to begin. - if first_error_mut.is_none() { - *first_error_mut = err; - } - // Let all the child processes know about the signal, and - // continue to handle events. - // - // Ignore errors here: if there are no receivers to cancel, so - // be it. Also note the ordering here: canceled_ref is set - // *before* this is sent. - let _ = forward_sender_ref - .send(SignalForwardEvent::Shutdown(forward_event)); - } - } - } - } - } - }; - - // Read events from the receiver to completion. - scope.spawn_cancellable(exec_fut, || ()); }); match ctx.run_finished() {