Skip to content

Commit

Permalink
Add CaptureStrategy
Browse files Browse the repository at this point in the history
While adding tests for the libtestjson output, I noticed that my assumption that stdout/stderr ordering would be _mostly_ correct except in "extreme" scenarios was...completely wrong.

With even the most simple mixed stdout/stderr output (no multithreading, no unbuffered output, no huge single writes) that the ordering of the output was wrong on my machine more often than not. After playing around with the idea of using a pseudo terminal, similar to how alacritty or other terminal emulators work, I checked the even easier case of...just passing the same file descriptor for both stdout and stderr...which works.

Committing this in a separate branch so that I can push and add the Windows implementation before merging this back to the combined output branch.
  • Loading branch information
Jake-Shadle committed Dec 20, 2023
1 parent 8a22794 commit 3a61f35
Show file tree
Hide file tree
Showing 7 changed files with 346 additions and 80 deletions.
18 changes: 15 additions & 3 deletions cargo-nextest/src/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -742,13 +742,16 @@ pub struct TestRunnerOpts {
}

impl TestRunnerOpts {
fn to_builder(&self, no_capture: bool) -> Option<TestRunnerBuilder> {
fn to_builder(
&self,
cap_strat: nextest_runner::test_output::CaptureStrategy,
) -> Option<TestRunnerBuilder> {
if self.no_run {
return None;
}

let mut builder = TestRunnerBuilder::default();
builder.set_no_capture(no_capture);
builder.set_capture_strategy(cap_strat);
if let Some(retries) = self.retries {
builder.set_retries(RetryPolicy::new_without_delay(retries));
}
Expand Down Expand Up @@ -1547,6 +1550,15 @@ impl App {
)?)
}
};
use nextest_runner::test_output::CaptureStrategy;

let cap_strat = if no_capture {
CaptureStrategy::None
} else if matches!(reporter_opts.message_format, MessageFormat::Human) {
CaptureStrategy::Split
} else {
CaptureStrategy::Combined
};

let filter_exprs = self.build_filtering_expressions()?;
let test_filter_builder = self.build_filter.make_test_filter_builder(filter_exprs)?;
Expand Down Expand Up @@ -1579,7 +1591,7 @@ impl App {
}

let handler = SignalHandlerKind::Standard;
let runner_builder = match runner_opts.to_builder(no_capture) {
let runner_builder = match runner_opts.to_builder(cap_strat) {
Some(runner_builder) => runner_builder,
None => {
// This means --no-run was passed in. Exit.
Expand Down
36 changes: 11 additions & 25 deletions nextest-runner/src/list/test_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -751,39 +751,25 @@ impl<'g> RustTestArtifact<'g> {
argv.push("--ignored");
}

let mut cmd = TestCommand::new(
let cmd = TestCommand::new(
lctx,
program.clone(),
&argv,
&self.cwd,
&self.package,
&self.non_test_binaries,
);
// Capture stdout and stderr, and close stdin.
cmd.command_mut()
.stdin(std::process::Stdio::null())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped());

let child = cmd
.spawn()
.map_err(|error| CreateTestListError::CommandExecFail {
binary_id: self.binary_id.clone(),
command: std::iter::once(program.clone())
.chain(argv.iter().map(|&s| s.to_owned()))
.collect(),
error,
})?;

let output = child.wait_with_output().await.map_err(|error| {
CreateTestListError::CommandExecFail {
binary_id: self.binary_id.clone(),
command: std::iter::once(program.clone())
.chain(argv.iter().map(|&s| s.to_owned()))
.collect(),
error,
}
})?;
let output =
cmd.wait_with_output()
.await
.map_err(|error| CreateTestListError::CommandExecFail {
binary_id: self.binary_id.clone(),
command: std::iter::once(program.clone())
.chain(argv.iter().map(|&s| s.to_owned()))
.collect(),
error,
})?;

if output.status.success() {
String::from_utf8(output.stdout).map_err(|err| CreateTestListError::CommandNonUtf8 {
Expand Down
49 changes: 25 additions & 24 deletions nextest-runner/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use crate::{
},
signal::{JobControlEvent, ShutdownEvent, SignalEvent, SignalHandler, SignalHandlerKind},
target_runner::TargetRunner,
test_output::TestOutput,
test_output::{CaptureStrategy, TestOutput},
time::{PausableSleep, StopwatchEnd, StopwatchStart},
};
use async_scoped::TokioScope;
Expand Down Expand Up @@ -124,18 +124,25 @@ impl Iterator for BackoffIter {
/// Test runner options.
#[derive(Debug, Default)]
pub struct TestRunnerBuilder {
no_capture: bool,
capture_strategy: CaptureStrategy,
retries: Option<RetryPolicy>,
fail_fast: Option<bool>,
test_threads: Option<TestThreads>,
}

impl TestRunnerBuilder {
/// Sets no-capture mode.
/// Sets the capture strategy for the test runner
///
/// In this mode, tests will always be run serially: `test_threads` will always be 1.
pub fn set_no_capture(&mut self, no_capture: bool) -> &mut Self {
self.no_capture = no_capture;
/// * [`CaptureStrategy::Split`]
/// * pro: output from `stdout` and `stderr` can be identified and easily split
/// * con: ordering between the streams cannot be guaranteed
/// * [`CaptureStrategy::Combined`]
/// * pro: output is guaranteed to be ordered as it would in a terminal emulator
/// * con: distinction between `stdout` and `stderr` is lost
/// * [`CaptureStrategy::None`] -
/// * In this mode, tests will always be run serially: `test_threads` will always be 1.
pub fn set_capture_strategy(&mut self, strategy: CaptureStrategy) -> &mut Self {
self.capture_strategy = strategy;
self
}

Expand Down Expand Up @@ -166,9 +173,9 @@ impl TestRunnerBuilder {
double_spawn: DoubleSpawnInfo,
target_runner: TargetRunner,
) -> Result<TestRunner<'a>, TestRunnerBuildError> {
let test_threads = match self.no_capture {
true => 1,
false => self
let test_threads = match self.capture_strategy {
CaptureStrategy::None => 1,
CaptureStrategy::Combined | CaptureStrategy::Split => self
.test_threads
.unwrap_or_else(|| profile.test_threads())
.compute(),
Expand All @@ -183,7 +190,7 @@ impl TestRunnerBuilder {

Ok(TestRunner {
inner: TestRunnerInner {
no_capture: self.no_capture,
capture_strategy: self.capture_strategy,
profile,
test_threads,
force_retries: self.retries,
Expand Down Expand Up @@ -243,7 +250,7 @@ impl<'a> TestRunner<'a> {

#[derive(Debug)]
struct TestRunnerInner<'a> {
no_capture: bool,
capture_strategy: CaptureStrategy,
profile: &'a NextestProfile<'a>,
test_threads: usize,
// This is Some if the user specifies a retry policy over the command-line.
Expand Down Expand Up @@ -718,7 +725,7 @@ impl<'a> TestRunnerInner<'a> {
let job = imp::Job::create().ok();

// The --no-capture CLI argument overrides the config.
if !self.no_capture {
if self.capture_strategy != CaptureStrategy::None {
if script.config.capture_stdout {
command_mut.stdout(std::process::Stdio::piped());
}
Expand Down Expand Up @@ -954,14 +961,9 @@ impl<'a> TestRunnerInner<'a> {
// best-effort thing.
let job = imp::Job::create().ok();

if !self.no_capture {
// Capture stdout and stderr.
command_mut
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped());
};

let mut child = cmd.spawn().map_err(RunTestError::Spawn)?;
let crate::test_command::Child { mut child, output } = cmd
.spawn(self.capture_strategy)
.map_err(RunTestError::Spawn)?;

// If assigning the child to the job fails, ignore this. This can happen if the process has
// exited.
Expand All @@ -978,12 +980,11 @@ impl<'a> TestRunnerInner<'a> {

let mut timeout_hit = 0;

let streams = child.stdout.take().zip(child.stderr.take());
let mut test_output = TestOutput::default();

let (res, leaked) = {
let mut collect_output_fut =
std::pin::pin!(crate::test_output::collect_test_output(streams));
std::pin::pin!(crate::test_output::collect_test_output(output));
let mut collect_output_done = false;

let res = loop {
Expand Down Expand Up @@ -2350,7 +2351,7 @@ mod tests {
// Ensure that output settings are ignored with no-capture.
let mut builder = TestRunnerBuilder::default();
builder
.set_no_capture(true)
.set_capture_strategy(CaptureStrategy::None)
.set_test_threads(TestThreads::Count(20));
let test_list = TestList::empty();
let config = NextestConfig::default_config("/fake/dir");
Expand All @@ -2367,7 +2368,7 @@ mod tests {
TargetRunner::empty(),
)
.unwrap();
assert!(runner.inner.no_capture, "no_capture is true");
assert_eq!(runner.inner.capture_strategy, CaptureStrategy::None);
assert_eq!(runner.inner.test_threads, 1, "tests run serially");
}

Expand Down
22 changes: 19 additions & 3 deletions nextest-runner/src/test_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::{
double_spawn::{DoubleSpawnContext, DoubleSpawnInfo},
helpers::dylib_path_envvar,
list::{RustBuildMeta, TestListState},
test_output::CaptureStrategy,
};
use camino::{Utf8Path, Utf8PathBuf};
use guppy::graph::PackageMetadata;
Expand All @@ -15,6 +16,9 @@ use std::{
ffi::{OsStr, OsString},
};

mod imp;
pub use imp::{Child, Output};

#[derive(Clone, Debug)]
pub(crate) struct LocalExecuteContext<'a> {
// Note: Must use TestListState here to get remapped paths.
Expand Down Expand Up @@ -92,14 +96,26 @@ impl TestCommand {
&mut self.command
}

pub(crate) fn spawn(self) -> std::io::Result<tokio::process::Child> {
let mut command = tokio::process::Command::from(self.command);
let res = command.spawn();
pub(crate) fn spawn(self, capture_strategy: CaptureStrategy) -> std::io::Result<imp::Child> {
let res = imp::spawn(self.command, capture_strategy);
if let Some(ctx) = self.double_spawn {
ctx.finish();
}
res
}

pub(crate) async fn wait_with_output(self) -> std::io::Result<std::process::Output> {
let mut cmd = self.command;
cmd.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped());
let res = tokio::process::Command::from(cmd).spawn();

if let Some(ctx) = self.double_spawn {
ctx.finish();
}

res?.wait_with_output().await
}
}

pub(crate) fn create_command<I, S>(
Expand Down
80 changes: 80 additions & 0 deletions nextest-runner/src/test_command/imp.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
use crate::test_output::CaptureStrategy;
use std::process::Stdio;
use tokio::process::{Child as TokioChild, ChildStdout as Pipe};

cfg_if::cfg_if! {
if #[cfg(unix)] {
#[path = "unix.rs"]
mod unix;
use unix as os;
} else if #[cfg(windows)] {
#[path = "windows.rs"]
mod windows;
use windows as os;
} else {
compile_error!("unsupported target platform");
}
}

pub enum Output {
Split { stdout: Pipe, stderr: Pipe },
Combined(Pipe),
}

pub struct Child {
pub child: TokioChild,
pub output: Option<Output>,
}

impl std::ops::Deref for Child {
type Target = TokioChild;

fn deref(&self) -> &Self::Target {
&self.child
}
}

impl std::ops::DerefMut for Child {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.child
}
}

pub(super) fn spawn(
mut cmd: std::process::Command,
strategy: CaptureStrategy,
) -> std::io::Result<Child> {
cmd.stdin(Stdio::null());

let state = match strategy {
CaptureStrategy::None => None,
CaptureStrategy::Split => {
cmd.stdout(Stdio::piped()).stderr(Stdio::piped());
None
}
CaptureStrategy::Combined => Some(os::setup_io(&mut cmd)?),
};

let mut cmd: tokio::process::Command = cmd.into();
let mut child = cmd.spawn()?;

let output = match strategy {
CaptureStrategy::None => None,
CaptureStrategy::Split => {
let stdout = child.stdout.take().expect("stdout was set");
let stderr = child.stderr.take().expect("stderr was set");
let stderr = os::stderr_to_stdout(stderr)?;

Some(Output::Split { stdout, stderr })
}
CaptureStrategy::Combined => {
// It doesn't matter which stream we take since they are both the same
// handle, so we take the easy one
Some(Output::Combined(os::state_to_stdout(
state.expect("state was set"),
)?))
}
};

Ok(Child { child, output })
}
Loading

0 comments on commit 3a61f35

Please sign in to comment.