From a5b7f1d09d06b97a1190d60e23d31d605fb82dbb Mon Sep 17 00:00:00 2001 From: Rain Date: Sun, 24 Sep 2023 17:31:33 -0700 Subject: [PATCH] [nextest-runner] move exec_fut to above run_fut We need to ensure that events generated by setup scripts are displayed. Delaying setting up the execution future to the end means that events generated by setup scripts will be buffered up, but not displayed. --- nextest-runner/src/runner.rs | 247 ++++++++++++++++++----------------- 1 file changed, 124 insertions(+), 123 deletions(-) diff --git a/nextest-runner/src/runner.rs b/nextest-runner/src/runner.rs index 22199e857a7..4abb3bad8a1 100644 --- a/nextest-runner/src/runner.rs +++ b/nextest-runner/src/runner.rs @@ -297,9 +297,131 @@ impl<'a> TestRunnerInner<'a> { TokioScope::scope_and_block(move |scope| { let (run_sender, mut run_receiver) = tokio::sync::mpsc::unbounded_channel(); let (cancellation_sender, _cancellation_receiver) = tokio::sync::broadcast::channel(1); - { - let cancellation_sender = cancellation_sender.clone(); + let exec_cancellation_sender = cancellation_sender.clone(); + let exec_fut = async move { + let mut signals_done = false; + + loop { + let internal_event = tokio::select! { + internal_event = run_receiver.recv() => { + match internal_event { + Some(event) => InternalEvent::Test(event), + None => { + // All runs have been completed. + break; + } + } + }, + internal_event = signal_handler.recv(), if !signals_done => { + match internal_event { + Some(event) => InternalEvent::Signal(event), + None => { + signals_done = true; + continue; + } + } + }, + }; + + match ctx_mut.handle_event(internal_event) { + #[cfg(unix)] + Ok(Some(JobControlEvent::Stop)) => { + // There are test_threads or fewer tests running so this buffer is + // big enough. + let (sender, mut receiver) = + tokio::sync::mpsc::channel(self.test_threads); + let mut running_tests = forward_sender_ref + .send(SignalForwardEvent::Stop(sender)) + .expect( + "at least one receiver stays open so this should never error out", + ); + // One event to account for the receiver held open at the top. + running_tests -= 1; + + // There's a possibility of a race condition between a test exiting and + // sending the message to the receiver. For that reason, don't wait more + // than 100ms on children to stop. + let mut sleep = + std::pin::pin!(tokio::time::sleep(Duration::from_millis(100))); + + loop { + tokio::select! { + _ = receiver.recv(), if running_tests > 0 => { + running_tests -= 1; + log::debug!( + "stopping tests: running tests down to {running_tests}" + ); + } + _ = &mut sleep => { + break; + } + else => { + break; + } + }; + } + + // Now stop nextest itself. + imp::raise_stop(); + } + #[cfg(unix)] + Ok(Some(JobControlEvent::Continue)) => { + // Nextest has been resumed. Resume all the tests as well. + let _ = forward_sender_ref.send(SignalForwardEvent::Continue); + } + #[cfg(not(unix))] + Ok(Some(_)) => { + crate::helpers::statically_unreachable(); + } + Ok(None) => {} + Err(err) => { + // If an error happens, it is because either the callback failed or + // a cancellation notice was received. If the callback failed, we need + // to send a further cancellation notice as well. + // + // Also note the ordering here: canceled_ref is set *before* + // notifications are broadcast. This prevents race conditions. + canceled_ref.store(true, Ordering::Release); + let _ = exec_cancellation_sender.send(()); + match err { + InternalError::Error(err) => { + // Ignore errors that happen during error cancellation. + if first_error_mut.is_none() { + *first_error_mut = Some(err); + } + let _ = ctx_mut.begin_cancel(CancelReason::ReportError); + } + InternalError::TestFailureCanceled(err) => { + // A test failure has caused cancellation to begin. + if first_error_mut.is_none() { + *first_error_mut = err; + } + } + InternalError::SignalCanceled(forward_event, err) => { + // A signal has caused cancellation to begin. + if first_error_mut.is_none() { + *first_error_mut = err; + } + // Let all the child processes know about the signal, and + // continue to handle events. + // + // Ignore errors here: if there are no receivers to cancel, so + // be it. Also note the ordering here: canceled_ref is set + // *before* this is sent. + let _ = forward_sender_ref + .send(SignalForwardEvent::Shutdown(forward_event)); + } + } + } + } + } + }; + + // Read events from the receiver to completion. + scope.spawn_cancellable(exec_fut, || ()); + + { // groups is going to be passed to future_queue_grouped. let groups = self .profile @@ -465,127 +587,6 @@ impl<'a> TestRunnerInner<'a> { // Run the stream to completion. scope.spawn_cancellable(run_fut, || ()); } - let exec_fut = async move { - let mut signals_done = false; - - loop { - let internal_event = tokio::select! { - internal_event = run_receiver.recv() => { - match internal_event { - Some(event) => InternalEvent::Test(event), - None => { - // All runs have been completed. - break; - } - } - }, - internal_event = signal_handler.recv(), if !signals_done => { - match internal_event { - Some(event) => InternalEvent::Signal(event), - None => { - signals_done = true; - continue; - } - } - }, - }; - - match ctx_mut.handle_event(internal_event) { - #[cfg(unix)] - Ok(Some(JobControlEvent::Stop)) => { - // There are test_threads or fewer tests running so this buffer is - // big enough. - let (sender, mut receiver) = - tokio::sync::mpsc::channel(self.test_threads); - let mut running_tests = forward_sender_ref - .send(SignalForwardEvent::Stop(sender)) - .expect( - "at least one receiver stays open so this should never error out", - ); - // One event to account for the receiver held open at the top. - running_tests -= 1; - - // There's a possibility of a race condition between a test exiting and - // sending the message to the receiver. For that reason, don't wait more - // than 100ms on children to stop. - let mut sleep = - std::pin::pin!(tokio::time::sleep(Duration::from_millis(100))); - - loop { - tokio::select! { - _ = receiver.recv(), if running_tests > 0 => { - running_tests -= 1; - log::debug!( - "stopping tests: running tests down to {running_tests}" - ); - } - _ = &mut sleep => { - break; - } - else => { - break; - } - }; - } - - // Now stop nextest itself. - imp::raise_stop(); - } - #[cfg(unix)] - Ok(Some(JobControlEvent::Continue)) => { - // Nextest has been resumed. Resume all the tests as well. - let _ = forward_sender_ref.send(SignalForwardEvent::Continue); - } - #[cfg(not(unix))] - Ok(Some(_)) => { - crate::helpers::statically_unreachable(); - } - Ok(None) => {} - Err(err) => { - // If an error happens, it is because either the callback failed or - // a cancellation notice was received. If the callback failed, we need - // to send a further cancellation notice as well. - // - // Also note the ordering here: canceled_ref is set *before* - // notifications are broadcast. This prevents race conditions. - canceled_ref.store(true, Ordering::Release); - let _ = cancellation_sender.send(()); - match err { - InternalError::Error(err) => { - // Ignore errors that happen during error cancellation. - if first_error_mut.is_none() { - *first_error_mut = Some(err); - } - let _ = ctx_mut.begin_cancel(CancelReason::ReportError); - } - InternalError::TestFailureCanceled(err) => { - // A test failure has caused cancellation to begin. - if first_error_mut.is_none() { - *first_error_mut = err; - } - } - InternalError::SignalCanceled(forward_event, err) => { - // A signal has caused cancellation to begin. - if first_error_mut.is_none() { - *first_error_mut = err; - } - // Let all the child processes know about the signal, and - // continue to handle events. - // - // Ignore errors here: if there are no receivers to cancel, so - // be it. Also note the ordering here: canceled_ref is set - // *before* this is sent. - let _ = forward_sender_ref - .send(SignalForwardEvent::Shutdown(forward_event)); - } - } - } - } - } - }; - - // Read events from the receiver to completion. - scope.spawn_cancellable(exec_fut, || ()); }); match ctx.run_finished() {