From bd415d49876d90d4b151a214c0d5f43669bd0150 Mon Sep 17 00:00:00 2001
From: Rain
Date: Wed, 11 Dec 2024 02:51:14 +0000
Subject: [PATCH] [nextest-runner] spawn a task for each test

Previously we'd use the same task for all tests -- that turns out to be
fine in most cases, but doesn't quite provide per-test isolation, and is
slower in repos like clap that have lots of small tests. Switch to using
a separate task for each test instead.

It's cool that async-scoped kind of works here, but non-static tasks are
such a pain to work with in general.

This makes the clap tests significantly faster. On `fc55ad08`, runtime on
my desktop (Ryzen 7950X/Linux) goes from 0.36 seconds to 0.24 seconds.
---
 nextest-runner/src/list/test_list.rs         |   8 +-
 nextest-runner/src/runner/dispatcher.rs      |   8 +-
 nextest-runner/src/runner/imp.rs             | 117 +++++++++++++++----
 nextest-runner/src/runner/internal_events.rs |  23 +++-
 4 files changed, 129 insertions(+), 27 deletions(-)

diff --git a/nextest-runner/src/list/test_list.rs b/nextest-runner/src/list/test_list.rs
index 43636de11c5..edf3dd9918e 100644
--- a/nextest-runner/src/list/test_list.rs
+++ b/nextest-runner/src/list/test_list.rs
@@ -31,7 +31,7 @@ use owo_colors::OwoColorize;
 use std::{
     collections::{BTreeMap, BTreeSet},
     ffi::{OsStr, OsString},
-    io,
+    fmt, io,
     path::PathBuf,
     sync::{Arc, OnceLock},
 };
@@ -1070,6 +1070,12 @@ pub struct TestInstanceId<'a> {
     pub test_name: &'a str,
 }
 
+impl fmt::Display for TestInstanceId<'_> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "{} {}", self.binary_id, self.test_name)
+    }
+}
+
 /// Context required for test execution.
 #[derive(Clone, Debug)]
 pub struct TestExecuteContext<'a> {
diff --git a/nextest-runner/src/runner/dispatcher.rs b/nextest-runner/src/runner/dispatcher.rs
index 98dba26e6d5..23eb90a181d 100644
--- a/nextest-runner/src/runner/dispatcher.rs
+++ b/nextest-runner/src/runner/dispatcher.rs
@@ -7,7 +7,7 @@
 //! receives events from the executor and from other inputs (e.g. signal and
 //! input handling), and sends events to the reporter.
 
-use super::{RunUnitRequest, ShutdownRequest};
+use super::{RunUnitRequest, RunnerTaskState, ShutdownRequest};
 use crate::{
     config::{MaxFail, ScriptConfig, ScriptId},
     input::{InputEvent, InputHandler},
@@ -104,7 +104,7 @@ where
         report_cancel_rx: oneshot::Receiver<()>,
         cancelled_ref: &AtomicBool,
         cancellation_sender: broadcast::Sender<()>,
-    ) {
+    ) -> RunnerTaskState {
         let mut report_cancel_rx = std::pin::pin!(report_cancel_rx);
 
         let mut signals_done = false;
@@ -118,7 +118,7 @@ where
                         Some(event) => InternalEvent::Executor(event),
                         None => {
                             // All runs have been completed.
-                            break;
+                            break RunnerTaskState::finished_no_children();
                         }
                     }
                 },
@@ -705,6 +705,8 @@ where
             if self.disable_signal_3_times_panic {
                 SignalCount::Twice
             } else {
+                // TODO: a panic here won't currently lead to other
+                // tasks being cancelled. This should be fixed.
                 panic!("Signaled 3 times, exiting immediately");
             }
         }
diff --git a/nextest-runner/src/runner/imp.rs b/nextest-runner/src/runner/imp.rs
index 4dd59558dd6..ae66f47e6a5 100644
--- a/nextest-runner/src/runner/imp.rs
+++ b/nextest-runner/src/runner/imp.rs
@@ -1,7 +1,7 @@
 // Copyright (c) The nextest Contributors
 // SPDX-License-Identifier: MIT OR Apache-2.0
 
-use super::{DispatcherContext, ExecutorContext};
+use super::{DispatcherContext, ExecutorContext, RunnerTaskState};
 use crate::{
     config::{
         EvaluatableProfile, MaxFail, RetryPolicy, SetupScriptExecuteData, TestGroup, TestThreads,
@@ -30,7 +30,7 @@ use tokio::{
     sync::{broadcast, mpsc::unbounded_channel, oneshot},
     task::JoinError,
 };
-use tracing::debug;
+use tracing::{debug, warn};
 
 /// Test runner options.
 #[derive(Debug, Default)]
@@ -298,11 +298,13 @@ impl<'a> TestRunnerInner<'a> {
                 cancelled_ref,
                 cancellation_sender.clone(),
             );
-            scope.spawn_cancellable(dispatcher_fut, || ());
+            scope.spawn_cancellable(dispatcher_fut, || RunnerTaskState::Cancelled);
 
             let (script_tx, mut script_rx) = unbounded_channel::<SetupScriptExecuteData<'a>>();
             let script_resp_tx = resp_tx.clone();
             let run_scripts_fut = async move {
+                // Since script tasks are run serially, we just reuse the one
+                // script task.
                 let script_data = executor_cx_ref
                     .run_setup_scripts(script_resp_tx, cancelled_ref)
                     .await;
@@ -310,8 +312,9 @@
                     // The dispatcher has shut down, so we should too.
                     debug!("script_tx.send failed, shutting down");
                 }
+                RunnerTaskState::finished_no_children()
             };
-            scope.spawn_cancellable(run_scripts_fut, || ());
+            scope.spawn_cancellable(run_scripts_fut, || RunnerTaskState::Cancelled);
 
             let Some(script_data) = script_rx.blocking_recv() else {
                 // Most likely the harness is shutting down, so we should too.
@@ -337,33 +340,107 @@ impl<'a> TestRunnerInner<'a> {
                         TestGroup::Global => None,
                         TestGroup::Custom(name) => Some(name.clone()),
                     };
-
-                    let fut = executor_cx_ref.run_test_instance(
-                        test_instance,
-                        settings,
-                        resp_tx.clone(),
-                        cancelled_ref,
-                        cancellation_sender.subscribe(),
-                        setup_script_data.clone(),
-                    );
+                    let cancel_rx = cancellation_sender.subscribe();
+                    let resp_tx = resp_tx.clone();
+                    let setup_script_data = setup_script_data.clone();
+
+                    // Use a separate Tokio task for each test. For repos with
+                    // lots of small tests, this has been observed to be much
+                    // faster than using a single task for all tests (what we
+                    // used to do). It also provides some degree of per-test
+                    // isolation.
+                    let fut = async move {
+                        // SAFETY: Within an outer scope_and_block (which we
+                        // have here), scope_and_collect is safe as long as the
+                        // returned future isn't forgotten. We're not forgetting
+                        // it below -- we're running it to completion
+                        // immediately.
+                        //
+                        // But recursive scoped calls really feel like pushing
+                        // against the limits of async-scoped. For example,
+                        // there's no way built into async-scoped to propagate a
+                        // cancellation signal from the outer scope to the inner
+                        // scope. (But there could be, right? That seems
+                        // solvable via channels. And we could likely do our own
+                        // channels here.)
+                        let ((), mut ret) = unsafe {
+                            TokioScope::scope_and_collect(move |scope| {
+                                scope.spawn(executor_cx_ref.run_test_instance(
+                                    test_instance,
+                                    settings,
+                                    resp_tx.clone(),
+                                    cancelled_ref,
+                                    cancel_rx,
+                                    setup_script_data,
+                                ))
+                            })
+                        }
+                        .await;
+
+                        // If no future was started, that's really strange.
+                        // Worth at least logging.
+                        let Some(result) = ret.pop() else {
+                            warn!(
+                                "no task was started for test instance: {}",
+                                test_instance.id()
+                            );
+                            return None;
+                        };
+                        match result {
+                            Ok(()) => None,
+                            Err(join_error) => Some(join_error),
+                        }
+                    };
                     (threads_required, test_group, fut)
                 })
-                // future_queue_grouped means tests are spawned in order but returned in
-                // any order.
+                // future_queue_grouped means tests are spawned in the order
+                // defined, but returned in any order.
                 .future_queue_grouped(self.test_threads, groups)
-                .collect::<()>();
-
-            scope.spawn_cancellable(run_tests_fut, || ());
+                // Drop the None values.
+                .filter_map(|r| std::future::ready(r))
+                .collect::<Vec<_>>()
+                // Interestingly, using a more idiomatic `async move {
+                // run_tests_fut.await ... }` block causes Rust 1.83 to complain
+                // about a weird lifetime mismatch. FutureExt::map as used below
+                // does not.
+                .map(|child_join_errors| RunnerTaskState::Finished { child_join_errors });
+
+            scope.spawn_cancellable(run_tests_fut, || RunnerTaskState::Cancelled);
         });
 
         dispatcher_cx.run_finished();
 
-        // Were there any join errors?
+        // Were there any join errors in tasks?
+        //
+        // If one of the tasks panics, we likely end up stuck because the
+        // dispatcher, which is spawned in the same async-scoped block, doesn't
+        // get relayed the panic immediately. That should probably be fixed at
+        // some point.
+        let mut cancelled_count = 0;
         let join_errors = results
             .into_iter()
-            .filter_map(|r| r.err())
+            .flat_map(|r| {
+                match r {
+                    Ok(RunnerTaskState::Finished { child_join_errors }) => child_join_errors,
+                    // Largely ignore cancelled tasks since it most likely means
+                    // shutdown -- we don't cancel tasks manually.
+                    Ok(RunnerTaskState::Cancelled) => {
+                        cancelled_count += 1;
+                        Vec::new()
+                    }
+                    Err(join_error) => vec![join_error],
+                }
+            })
             .collect::<Vec<_>>();
+
+        if cancelled_count > 0 {
+            debug!(
+                "{} tasks were cancelled -- this \
+                 generally should only happen due to panics",
+                cancelled_count
+            );
+        }
 
         if !join_errors.is_empty() {
             return Err(join_errors);
         }
diff --git a/nextest-runner/src/runner/internal_events.rs b/nextest-runner/src/runner/internal_events.rs
index 35d211dcb12..9a994f224e7 100644
--- a/nextest-runner/src/runner/internal_events.rs
+++ b/nextest-runner/src/runner/internal_events.rs
@@ -24,9 +24,12 @@ use crate::{
 };
 use nextest_metadata::MismatchReason;
 use std::time::Duration;
-use tokio::sync::{
-    mpsc::{UnboundedReceiver, UnboundedSender},
-    oneshot,
+use tokio::{
+    sync::{
+        mpsc::{UnboundedReceiver, UnboundedSender},
+        oneshot,
+    },
+    task::JoinError,
 };
 
 /// An internal event.
@@ -230,3 +233,17 @@ pub(super) enum InternalTerminateReason {
     Timeout,
     Signal(ShutdownRequest),
 }
+
+pub(super) enum RunnerTaskState {
+    Finished { child_join_errors: Vec<JoinError> },
+    Cancelled,
+}
+
+impl RunnerTaskState {
+    /// Mark a runner task as finished and having not run any children.
+    pub(super) fn finished_no_children() -> Self {
+        Self::Finished {
+            child_join_errors: Vec::new(),
+        }
+    }
+}
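
Illustrative sketch (not part of the patch): the core idea above is "one
Tokio task per test, then harvest JoinErrors so a panicking test doesn't
take its siblings down". The hypothetical program below shows that shape
with plain tokio::task::JoinSet and owned data; the actual change uses
async-scoped instead, because the runner's futures borrow non-'static data.

// Hypothetical, simplified example -- not nextest code.
use tokio::task::{JoinError, JoinSet};

async fn run_test(name: String) {
    // Stand-in for the real per-test execution.
    println!("running {name}");
}

#[tokio::main]
async fn main() {
    let tests = ["pass::one", "pass::two", "pass::three"];

    // Spawn a separate task for each test.
    let mut set = JoinSet::new();
    for name in tests {
        set.spawn(run_test(name.to_owned()));
    }

    // Wait for all tasks; a panicking test surfaces here as a JoinError
    // instead of aborting the other tests.
    let mut join_errors: Vec<JoinError> = Vec::new();
    while let Some(result) = set.join_next().await {
        if let Err(join_error) = result {
            join_errors.push(join_error);
        }
    }

    if !join_errors.is_empty() {
        eprintln!("{} test task(s) failed to join", join_errors.len());
    }
}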