Skip to content

Commit

Permalink
Drastically simplify
Browse files Browse the repository at this point in the history
  • Loading branch information
Jake-Shadle committed Nov 3, 2023
1 parent eb9be61 commit afeda7d
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 197 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion nextest-runner/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ windows = { version = "0.48.0", features = [
"Win32_System_Console",
"Win32_System_JobObjects",
] }
windows-sys = { version = "0.48.0", features = ["Win32_System_Pipes"] }
win32job = "1.0.2"
dunce = "1.0.4"

Expand Down
295 changes: 100 additions & 195 deletions nextest-runner/src/test_output.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
//! Utilities for capture output from tests run in a child process
use bytes::{Bytes, BytesMut};
use std::{io::Write as _, time::Instant};
use tokio::io::{AsyncRead, AsyncReadExt};
use pin_project_lite::pin_project;
use std::{io::Write as _, pin::Pin, time::Instant};
use tokio::io::{AsyncBufReadExt, BufReader};

/// A single chunk of captured output, this may represent 0 or more lines
#[derive(Clone, Debug)]
Expand Down Expand Up @@ -191,227 +192,131 @@ impl<'acc> std::fmt::Write for TestOutputWriter<'acc> {
}

Check warning on line 192 in nextest-runner/src/test_output.rs

View check run for this annotation

Codecov / codecov/patch

nextest-runner/src/test_output.rs#L192

Added line #L192 was not covered by tests
}

pin_project! {
#[project = StreamProj]
enum Stream {
Stdout { #[pin] o: tokio::io::BufReader<tokio::process::ChildStdout> },
Stderr { #[pin] o: tokio::io::BufReader<tokio::process::ChildStderr> },
}
}

impl tokio::io::AsyncRead for Stream {
#[inline]
fn poll_read(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> std::task::Poll<std::io::Result<()>> {
match self.project() {
StreamProj::Stdout { o } => o.poll_read(cx, buf),
StreamProj::Stderr { o } => o.poll_read(cx, buf),

Check warning on line 212 in nextest-runner/src/test_output.rs

View check run for this annotation

Codecov / codecov/patch

nextest-runner/src/test_output.rs#L205-L212

Added lines #L205 - L212 were not covered by tests
}
}

Check warning on line 214 in nextest-runner/src/test_output.rs

View check run for this annotation

Codecov / codecov/patch

nextest-runner/src/test_output.rs#L214

Added line #L214 was not covered by tests
}

impl tokio::io::AsyncBufRead for Stream {
#[inline]
fn poll_fill_buf(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<std::io::Result<&[u8]>> {
match self.project() {
StreamProj::Stdout { o } => o.poll_fill_buf(cx),
StreamProj::Stderr { o } => o.poll_fill_buf(cx),
}
}

#[inline]
fn consume(self: Pin<&mut Self>, amt: usize) {
match self.project() {
StreamProj::Stdout { o } => o.consume(amt),
StreamProj::Stderr { o } => o.consume(amt),
}
}
}

/// The size of each buffered reader's buffer, and the size at which we grow
/// the interleaved buffer.
///
/// This size is not totally arbitrary, but rather the (normal) page size on
/// most linux, windows, and macos systems.
const CHUNK_SIZE: usize = 4 * 1024;

/// Collects the stdout and/or stderr streams into a single buffer
#[allow(clippy::needless_lifetimes, clippy::let_and_return)]
pub fn collect_test_output<'a>(
mut stdout: Option<tokio::process::ChildStdout>,
mut stderr: Option<tokio::process::ChildStderr>,
stdout: Option<tokio::process::ChildStdout>,
stderr: Option<tokio::process::ChildStderr>,
acc: &'a mut TestOutputAccumulator,
) -> impl futures::Future<Output = Result<(), crate::errors::CollectTestOutputError>> + 'a {
let read_loop = async move {
loop {
if let Some(so) = &mut stdout {
if read_chunk(acc, so, true)
.await
.map_err(crate::errors::CollectTestOutputError::ReadStdout)?
{
read_to_end(acc, so, true)
.await
.map_err(crate::errors::CollectTestOutputError::ReadStdout)?;
stdout.take();
}
}

if let Some(se) = &mut stderr {
if read_chunk(acc, se, false)
.await
.map_err(crate::errors::CollectTestOutputError::ReadStderr)?
{
read_to_end(acc, se, false)
.await
.map_err(crate::errors::CollectTestOutputError::ReadStderr)?;
stderr.take();
}
// Currently, the runner either captures both, or neither, never just one
let (mut stdout, mut stderr) = match (stdout, stderr) {
(Some(out), Some(err)) => (
Stream::Stdout {
o: BufReader::with_capacity(CHUNK_SIZE, out),
},
Stream::Stderr {
o: BufReader::with_capacity(CHUNK_SIZE, err),
},
),
(None, None) => {
return Ok(());

Check warning on line 264 in nextest-runner/src/test_output.rs

View check run for this annotation

Codecov / codecov/patch

nextest-runner/src/test_output.rs#L264

Added line #L264 was not covered by tests
}

if stdout.is_none() && stderr.is_none() {
break;
} else {
// We're polling to see if there is data and thus sidestepping
// the async reads if there is none, which can cause tokio to
// deadlock by not giving time to other async tasks, so do a
// yield to keep it happy
tokio::task::yield_now().await;
(out, err) => {
unreachable!(
"one of stdout: {out:?} stderr: {err:?}, was captured, but not the other"
);

Check warning on line 269 in nextest-runner/src/test_output.rs

View check run for this annotation

Codecov / codecov/patch

nextest-runner/src/test_output.rs#L266-L269

Added lines #L266 - L269 were not covered by tests
}
}

Ok(())
};

read_loop
}
};

#[cfg(unix)]
async fn read_chunk<S>(
acc: &mut TestOutputAccumulator,
output: &mut S,
stdout: bool,
) -> std::io::Result<bool>
where
S: AsyncRead + std::os::fd::AsRawFd + Unpin + Send,
{
let fd = output.as_raw_fd();

let mut pfd = libc::pollfd {
fd,
events: libc::POLLIN,
revents: 0,
};
let mut out_done = false;
let mut err_done = false;

// Poll the pipe to see if there is data to read, or the other end has closed
// SAFETY: syscall
let res = unsafe { libc::poll(&mut pfd, 1, 0) };

match res {
// Timed out before the pipe could be ready
0 => Ok(false),
1 => {
if (pfd.revents & libc::POLLIN) != 0 {
// Get the amount of data we can read from the pipe
let mut num_bytes: u32 = 0; // note technically this is an i32, but...

// NOTE: this might not work on some unixes as it's not, strictly speaking,
// a standardized way of checking for pending data
// SAFETY: syscall
if unsafe { libc::ioctl(fd, libc::FIONREAD, &mut num_bytes as *mut u32) } == 0 {
num_bytes
} else {
let last = std::io::Error::last_os_error();
return if matches!(
last.kind(),
std::io::ErrorKind::Interrupted | std::io::ErrorKind::BrokenPipe
) {
Ok(true)
} else {
Err(last)
};
};

// Ignore spurious readiness
if num_bytes == 0 {
return Ok(false);
while !out_done || !err_done {
let (read, which) = tokio::select! {
buf = stdout.fill_buf() => {
let buf = buf.map_err(crate::errors::CollectTestOutputError::ReadStdout)?;
push_chunk(acc, buf, true);
(buf.len(), true)
}
buf = stderr.fill_buf() => {
let buf = buf.map_err(crate::errors::CollectTestOutputError::ReadStderr)?;
push_chunk(acc, buf, false);
(buf.len(), false)
}
};

do_read(acc, output, stdout, num_bytes as usize).await?;
}

// If write end of the pipe has closed, we can stop polling it
Ok((pfd.revents & libc::POLLHUP) != 0)
}
_ => {
let last = std::io::Error::last_os_error();
if matches!(
last.kind(),
std::io::ErrorKind::Interrupted | std::io::ErrorKind::BrokenPipe
) {
Ok(true)
if which {
stdout.consume(read);
out_done = read == 0;
} else {
Err(last)
stderr.consume(read);
err_done = read == 0;
}
}
}
}

#[cfg(windows)]
async fn read_chunk<S>(
acc: &mut TestOutputAccumulator,
output: &mut S,
stdout: bool,
) -> std::io::Result<bool>
where
S: AsyncRead + std::os::windows::io::AsRawHandle + Unpin + Send,
{
// NOTE: we need to put this in a separate block since as_raw_handle()
// returns a pointer and makes the .await at the bottom unable to be compiled
// since they are non-Send
let num_bytes = {
let handle = output.as_raw_handle();

// Check if the pipe has pending data
let mut num_bytes = 0;

// SAFETY: syscall
if unsafe {
windows_sys::Win32::System::Pipes::PeekNamedPipe(
handle as _,
std::ptr::null_mut(),
0,
std::ptr::null_mut(),
&mut num_bytes,
std::ptr::null_mut(),
)
} == 0
{
let err = std::io::Error::last_os_error();

return if err.kind() == std::io::ErrorKind::BrokenPipe {
Ok(true)
} else {
Err(err)
};
}

num_bytes
Ok(())
};

if num_bytes == 0 {
return Ok(false);
}

do_read(acc, output, stdout, num_bytes as usize).await
read_loop
}

/// Perform the actual read now that we know how much data is pending
async fn do_read<S>(
acc: &mut TestOutputAccumulator,
output: &mut S,
stdout: bool,
num_bytes: usize,
) -> std::io::Result<bool>
where
S: AsyncRead + Unpin + Send,
{
#[inline]
fn push_chunk(acc: &mut TestOutputAccumulator, chunk: &[u8], stdout: bool) {
let start = acc.buf.len();

// We allocate in 4k chunks, so avoid if we already have space
if acc.buf.capacity() - start < num_bytes {
const CHUNK_SIZE: usize = 4 * 1024;
let chunk_size = if num_bytes > CHUNK_SIZE {
CHUNK_SIZE * (num_bytes / CHUNK_SIZE + 1)
} else {
CHUNK_SIZE
};

acc.buf.reserve(chunk_size);
if acc.buf.capacity() - start < chunk.len() {
acc.buf.reserve(CHUNK_SIZE);
}

let timestamp = Instant::now();
let read = output.read_buf(&mut acc.buf).await?;
acc.buf.extend_from_slice(chunk);

if read == 0 {
return Ok(true);
}

let end = acc.buf.len();
let timestamp = Instant::now();

acc.chunks.push(OutputChunk {
range: start..end,
range: start..start + chunk.len(),
timestamp,
stdout,
});

Ok(false)
}

async fn read_to_end<S>(
acc: &mut TestOutputAccumulator,
output: &mut S,
stdout: bool,
) -> std::io::Result<()>
where
S: AsyncRead + Unpin + Send,
{
while !do_read(acc, output, stdout, 1024).await? {}

Ok(())
}

0 comments on commit afeda7d

Please sign in to comment.