Skip to content

Commit

Permalink
Further simplify
Browse files Browse the repository at this point in the history
  • Loading branch information
Jake-Shadle committed Dec 11, 2023
1 parent 57a1466 commit 9784359
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 104 deletions.
10 changes: 3 additions & 7 deletions nextest-runner/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -978,17 +978,13 @@ impl<'a> TestRunnerInner<'a> {

let mut timeout_hit = 0;

let child_stdout = child.stdout.take();
let child_stderr = child.stderr.take();
let streams = child.stdout.take().zip(child.stderr.take());

let mut acc = crate::test_output::TestOutputAccumulator::new();

let (res, leaked) = {
let mut collect_output_fut = std::pin::pin!(crate::test_output::collect_test_output(
child_stdout,
child_stderr,
&mut acc,
));
let mut collect_output_fut =
std::pin::pin!(crate::test_output::collect_test_output(streams, &mut acc));
let mut collect_output_done = false;

let res = loop {
Expand Down
130 changes: 33 additions & 97 deletions nextest-runner/src/test_output.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
//! Utilities for capture output from tests run in a child process
use bytes::{Bytes, BytesMut};
use pin_project_lite::pin_project;
use std::{io::Write as _, pin::Pin, time::Instant};
use tokio::io::{AsyncBufReadExt, BufReader};
use std::{io::Write as _, time::Instant};
use tokio::io::AsyncBufReadExt;

/// A single chunk of captured output, this may represent 0 or more lines
#[derive(Clone, Debug)]
Expand Down Expand Up @@ -192,49 +191,6 @@ impl<'acc> std::fmt::Write for TestOutputWriter<'acc> {
}

Check warning on line 191 in nextest-runner/src/test_output.rs

View check run for this annotation

Codecov / codecov/patch

nextest-runner/src/test_output.rs#L191

Added line #L191 was not covered by tests
}

pin_project! {
#[project = StreamProj]
enum Stream {
Stdout { #[pin] o: tokio::io::BufReader<tokio::process::ChildStdout> },
Stderr { #[pin] o: tokio::io::BufReader<tokio::process::ChildStderr> },
}
}

impl tokio::io::AsyncRead for Stream {
#[inline]
fn poll_read(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> std::task::Poll<std::io::Result<()>> {
match self.project() {
StreamProj::Stdout { o } => o.poll_read(cx, buf),
StreamProj::Stderr { o } => o.poll_read(cx, buf),
}
}
}

impl tokio::io::AsyncBufRead for Stream {
#[inline]
fn poll_fill_buf(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<std::io::Result<&[u8]>> {
match self.project() {
StreamProj::Stdout { o } => o.poll_fill_buf(cx),
StreamProj::Stderr { o } => o.poll_fill_buf(cx),
}
}

#[inline]
fn consume(self: Pin<&mut Self>, amt: usize) {
match self.project() {
StreamProj::Stdout { o } => o.consume(amt),
StreamProj::Stderr { o } => o.consume(amt),
}
}
}

/// The size of each buffered reader's buffer, and the size at which we grow
/// the interleaved buffer.
///
Expand All @@ -243,63 +199,46 @@ impl tokio::io::AsyncBufRead for Stream {
const CHUNK_SIZE: usize = 4 * 1024;

/// Collects the stdout and/or stderr streams into a single buffer
#[allow(clippy::needless_lifetimes, clippy::let_and_return)]
pub fn collect_test_output<'a>(
stdout: Option<tokio::process::ChildStdout>,
stderr: Option<tokio::process::ChildStderr>,
acc: &'a mut TestOutputAccumulator,
) -> impl futures::Future<Output = Result<(), crate::errors::CollectTestOutputError>> + 'a {
let read_loop = async move {
// Currently, the runner either captures both, or neither, never just one
let (mut stdout, mut stderr) = match (stdout, stderr) {
(Some(out), Some(err)) => (
Stream::Stdout {
o: BufReader::with_capacity(CHUNK_SIZE, out),
},
Stream::Stderr {
o: BufReader::with_capacity(CHUNK_SIZE, err),
},
),
(None, None) => {
return Ok(());
}
(out, err) => {
unreachable!(
"one of stdout: {out:?} stderr: {err:?}, was captured, but not the other"
);
}
};
pub async fn collect_test_output(
streams: Option<(tokio::process::ChildStdout, tokio::process::ChildStderr)>,
acc: &mut TestOutputAccumulator,
) -> Result<(), crate::errors::CollectTestOutputError> {
let Some((stdout, stderr)) = streams else {
return Ok(());

Check warning on line 207 in nextest-runner/src/test_output.rs

View check run for this annotation

Codecov / codecov/patch

nextest-runner/src/test_output.rs#L207

Added line #L207 was not covered by tests
};

let mut out_done = false;
let mut err_done = false;
let mut stdout = tokio::io::BufReader::with_capacity(CHUNK_SIZE, stdout);
let mut stderr = tokio::io::BufReader::with_capacity(CHUNK_SIZE, stderr);

while !out_done || !err_done {
let (read, which) = tokio::select! {
buf = stdout.fill_buf() => {
let buf = buf.map_err(crate::errors::CollectTestOutputError::ReadStdout)?;
let mut out_done = false;
let mut err_done = false;

while !out_done || !err_done {
tokio::select! {
res = stdout.fill_buf() => {
let read = {
let buf = res.map_err(crate::errors::CollectTestOutputError::ReadStdout)?;
push_chunk(acc, buf, true);
(buf.len(), true)
}
buf = stderr.fill_buf() => {
let buf = buf.map_err(crate::errors::CollectTestOutputError::ReadStderr)?;
push_chunk(acc, buf, false);
(buf.len(), false)
}
};
buf.len()
};

if which {
stdout.consume(read);
out_done = read == 0;
} else {
}
res = stderr.fill_buf() => {
let read = {
let buf = res.map_err(crate::errors::CollectTestOutputError::ReadStderr)?;
push_chunk(acc, buf, false);
buf.len()
};

stderr.consume(read);
err_done = read == 0;
}
}

Ok(())
};
};
}

read_loop
Ok(())
}

#[inline]
Expand All @@ -311,12 +250,9 @@ fn push_chunk(acc: &mut TestOutputAccumulator, chunk: &[u8], stdout: bool) {
}

acc.buf.extend_from_slice(chunk);

let timestamp = Instant::now();

acc.chunks.push(OutputChunk {
range: start..start + chunk.len(),
timestamp,
timestamp: Instant::now(),
stdout,
});
}

0 comments on commit 9784359

Please sign in to comment.