Skip to content

Commit

Permalink
[nextest-runner] spawn a task for each test
Browse files Browse the repository at this point in the history
Previously we'd use the same task for all tests -- that turns out to be fine in
most cases, but doesn't quite provide per-test isolation, and is slower in
repos like clap that have lots of small tests. Switch to using the same task
instead.

It's cool that async-scoped kind of works here, but non-static tasks are such
a pain to work with in general.

This makes the clap tests significantly faster. On `fc55ad08`, runtime on my
desktop (Ryzen 7950X/Linux) goes from 0.36 seconds to 0.24.
  • Loading branch information
sunshowers committed Dec 11, 2024
1 parent f11b78d commit bd415d4
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 27 deletions.
8 changes: 7 additions & 1 deletion nextest-runner/src/list/test_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use owo_colors::OwoColorize;
use std::{
collections::{BTreeMap, BTreeSet},
ffi::{OsStr, OsString},
io,
fmt, io,
path::PathBuf,
sync::{Arc, OnceLock},
};
Expand Down Expand Up @@ -1070,6 +1070,12 @@ pub struct TestInstanceId<'a> {
pub test_name: &'a str,
}

impl fmt::Display for TestInstanceId<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{} {}", self.binary_id, self.test_name)
}
}

/// Context required for test execution.
#[derive(Clone, Debug)]
pub struct TestExecuteContext<'a> {
Expand Down
8 changes: 5 additions & 3 deletions nextest-runner/src/runner/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
//! receives events from the executor and from other inputs (e.g. signal and
//! input handling), and sends events to the reporter.
use super::{RunUnitRequest, ShutdownRequest};
use super::{RunUnitRequest, RunnerTaskState, ShutdownRequest};
use crate::{
config::{MaxFail, ScriptConfig, ScriptId},
input::{InputEvent, InputHandler},
Expand Down Expand Up @@ -104,7 +104,7 @@ where
report_cancel_rx: oneshot::Receiver<()>,
cancelled_ref: &AtomicBool,
cancellation_sender: broadcast::Sender<()>,
) {
) -> RunnerTaskState {
let mut report_cancel_rx = std::pin::pin!(report_cancel_rx);

let mut signals_done = false;
Expand All @@ -118,7 +118,7 @@ where
Some(event) => InternalEvent::Executor(event),
None => {
// All runs have been completed.
break;
break RunnerTaskState::finished_no_children();
}
}
},
Expand Down Expand Up @@ -705,6 +705,8 @@ where
if self.disable_signal_3_times_panic {
SignalCount::Twice
} else {
// TODO: a panic here won't currently lead to other
// tasks being cancelled. This should be fixed.
panic!("Signaled 3 times, exiting immediately");
}
}
Expand Down
117 changes: 97 additions & 20 deletions nextest-runner/src/runner/imp.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright (c) The nextest Contributors
// SPDX-License-Identifier: MIT OR Apache-2.0

use super::{DispatcherContext, ExecutorContext};
use super::{DispatcherContext, ExecutorContext, RunnerTaskState};
use crate::{
config::{
EvaluatableProfile, MaxFail, RetryPolicy, SetupScriptExecuteData, TestGroup, TestThreads,
Expand Down Expand Up @@ -30,7 +30,7 @@ use tokio::{
sync::{broadcast, mpsc::unbounded_channel, oneshot},
task::JoinError,
};
use tracing::debug;
use tracing::{debug, warn};

/// Test runner options.
#[derive(Debug, Default)]
Expand Down Expand Up @@ -298,20 +298,23 @@ impl<'a> TestRunnerInner<'a> {
cancelled_ref,
cancellation_sender.clone(),
);
scope.spawn_cancellable(dispatcher_fut, || ());
scope.spawn_cancellable(dispatcher_fut, || RunnerTaskState::Cancelled);

let (script_tx, mut script_rx) = unbounded_channel::<SetupScriptExecuteData<'a>>();
let script_resp_tx = resp_tx.clone();
let run_scripts_fut = async move {
// Since script tasks are run serially, we just reuse the one
// script task.
let script_data = executor_cx_ref
.run_setup_scripts(script_resp_tx, cancelled_ref)
.await;
if script_tx.send(script_data).is_err() {
// The dispatcher has shut down, so we should too.
debug!("script_tx.send failed, shutting down");
}
RunnerTaskState::finished_no_children()
};
scope.spawn_cancellable(run_scripts_fut, || ());
scope.spawn_cancellable(run_scripts_fut, || RunnerTaskState::Cancelled);

let Some(script_data) = script_rx.blocking_recv() else {
// Most likely the harness is shutting down, so we should too.
Expand All @@ -337,33 +340,107 @@ impl<'a> TestRunnerInner<'a> {
TestGroup::Global => None,
TestGroup::Custom(name) => Some(name.clone()),
};

let fut = executor_cx_ref.run_test_instance(
test_instance,
settings,
resp_tx.clone(),
cancelled_ref,
cancellation_sender.subscribe(),
setup_script_data.clone(),
);
let cancel_rx = cancellation_sender.subscribe();
let resp_tx = resp_tx.clone();
let setup_script_data = setup_script_data.clone();

// Use a separate Tokio task for each test. For repos with
// lots of small tests, this has been observed to be much
// faster than using a single task for all tests (what we
// used to do). It also provides some degree of per-test
// isolation.
let fut = async move {
// SAFETY: Within an outer scope_and_block (which we
// have here), scope_and_collect is safe as long as the
// returned future isn't forgotten. We're not forgetting
// it below -- we're running it to completion
// immediately.
//
// But recursive scoped calls really feel like pushing
// against the limits of async-scoped. For example,
// there's no way built into async-scoped to propagate a
// cancellation signal from the outer scope to the inner
// scope. (But there could be, right? That seems
// solvable via channels. And we could likely do our own
// channels here.)
let ((), mut ret) = unsafe {
TokioScope::scope_and_collect(move |scope| {
scope.spawn(executor_cx_ref.run_test_instance(
test_instance,
settings,
resp_tx.clone(),
cancelled_ref,
cancel_rx,
setup_script_data,
))
})
}
.await;

// If no future was started, that's really strange.
// Worth at least logging.
let Some(result) = ret.pop() else {
warn!(
"no task was started for test instance: {}",
test_instance.id()
);
return None;
};
match result {
Ok(()) => None,
Err(join_error) => Some(join_error),
}
};

(threads_required, test_group, fut)
})
// future_queue_grouped means tests are spawned in order but returned in
// any order.
// future_queue_grouped means tests are spawned in the order
// defined, but returned in any order.
.future_queue_grouped(self.test_threads, groups)
.collect::<()>();

scope.spawn_cancellable(run_tests_fut, || ());
// Drop the None values.
.filter_map(|r| std::future::ready(r))
.collect::<Vec<_>>()
// Interestingly, using a more idiomatic `async move {
// run_tests_fut.await ... }` block causes Rust 1.83 to complain
// about a weird lifetime mismatch. FutureExt::map as used below
// does not.
.map(|child_join_errors| RunnerTaskState::Finished { child_join_errors });

scope.spawn_cancellable(run_tests_fut, || RunnerTaskState::Cancelled);
});

dispatcher_cx.run_finished();

// Were there any join errors?
// Were there any join errors in tasks?
//
// If one of the tasks panics, we likely end up stuck because the
// dispatcher, which is spawned in the same async-scoped block, doesn't
// get relayed the panic immediately. That should probably be fixed at
// some point.
let mut cancelled_count = 0;
let join_errors = results
.into_iter()
.filter_map(|r| r.err())
.flat_map(|r| {
match r {
Ok(RunnerTaskState::Finished { child_join_errors }) => child_join_errors,
// Largely ignore cancelled tasks since it most likely means
// shutdown -- we don't cancel tasks manually.
Ok(RunnerTaskState::Cancelled) => {
cancelled_count += 1;
Vec::new()
}
Err(join_error) => vec![join_error],
}
})
.collect::<Vec<_>>();

if cancelled_count > 0 {
debug!(
"{} tasks were cancelled -- this \
generally should only happen due to panics",
cancelled_count
);
}
if !join_errors.is_empty() {
return Err(join_errors);
}
Expand Down
23 changes: 20 additions & 3 deletions nextest-runner/src/runner/internal_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,12 @@ use crate::{
};
use nextest_metadata::MismatchReason;
use std::time::Duration;
use tokio::sync::{
mpsc::{UnboundedReceiver, UnboundedSender},
oneshot,
use tokio::{
sync::{
mpsc::{UnboundedReceiver, UnboundedSender},
oneshot,
},
task::JoinError,
};

/// An internal event.
Expand Down Expand Up @@ -230,3 +233,17 @@ pub(super) enum InternalTerminateReason {
Timeout,
Signal(ShutdownRequest),
}

pub(super) enum RunnerTaskState {
Finished { child_join_errors: Vec<JoinError> },
Cancelled,
}

impl RunnerTaskState {
/// Mark a runner task as finished and having not run any children.
pub(super) fn finished_no_children() -> Self {
Self::Finished {
child_join_errors: Vec::new(),
}
}
}

0 comments on commit bd415d4

Please sign in to comment.