diff --git a/crates/turborepo-lib/src/process/child.rs b/crates/turborepo-lib/src/process/child.rs index ad244ed46e311..3c5f0fd37e673 100644 --- a/crates/turborepo-lib/src/process/child.rs +++ b/crates/turborepo-lib/src/process/child.rs @@ -31,7 +31,7 @@ use tokio::{ process::Command as TokioCommand, sync::{mpsc, watch, RwLock}, }; -use tracing::debug; +use tracing::{debug, trace}; use super::Command; @@ -480,8 +480,12 @@ impl Child { /// Wait for the `Child` to exit, returning the exit code. pub async fn wait(&mut self) -> Option { + trace!("watching exit channel of {}", self.label); // If sending end of exit channel closed, then return last value in the channel - self.exit_channel.changed().await.ok(); + match self.exit_channel.changed().await { + Ok(()) => trace!("exit channel was updated"), + Err(_) => trace!("exit channel sender was dropped"), + } *self.exit_channel.borrow() } @@ -654,7 +658,10 @@ impl Child { ) -> Option> { match stream { Some(stream) => match stream.read_until(b'\n', buffer).await { - Ok(0) => None, + Ok(0) => { + trace!("reached EOF"); + None + } Ok(_) => Some(Ok(())), Err(e) => Some(Err(e)), }, @@ -669,26 +676,31 @@ impl Child { loop { tokio::select! { Some(result) = next_line(&mut stdout_lines, &mut stdout_buffer) => { + trace!("processing stdout line"); result?; add_trailing_newline(&mut stdout_buffer); stdout_pipe.write_all(&stdout_buffer)?; stdout_buffer.clear(); } Some(result) = next_line(&mut stderr_lines, &mut stderr_buffer) => { + trace!("processing stderr line"); result?; add_trailing_newline(&mut stderr_buffer); stdout_pipe.write_all(&stderr_buffer)?; stderr_buffer.clear(); } status = self.wait(), if !is_exited => { + trace!("child process exited: {}", self.label()); is_exited = true; // We don't abort in the cases of a zero exit code as we could be // caching this task and should read all the logs it produces. if status != Some(ChildExit::Finished(Some(0))) { + debug!("child process failed, skipping reading stdout/stderr"); return Ok(status); } } else => { + trace!("flushing child stdout/stderr buffers"); // In the case that both futures read a complete line // the future not chosen in the select will return None if it's at EOF // as the number of bytes read will be 0. @@ -753,6 +765,7 @@ impl ChildStateManager { match state { ChildState::Exited(exit) => { // ignore the send error, failure means the channel is dropped + trace!("sending child exit"); self.exit_tx.send(Some(exit)).ok(); } ChildState::Running(_) => { @@ -783,6 +796,7 @@ impl ChildStateManager { } // ignore the send error, the channel is dropped anyways + trace!("sending child exit"); self.exit_tx.send(Some(child_exit)).ok(); } } @@ -832,6 +846,7 @@ mod test { #[test_case(false)] #[test_case(TEST_PTY)] + #[tracing_test::traced_test] #[tokio::test] async fn test_wait(use_pty: bool) { let script = find_script_dir().join_component("hello_world.js"); diff --git a/crates/turborepo-lib/src/process/mod.rs b/crates/turborepo-lib/src/process/mod.rs index 0fdbc3efff594..da7bd0a5bebdc 100644 --- a/crates/turborepo-lib/src/process/mod.rs +++ b/crates/turborepo-lib/src/process/mod.rs @@ -43,6 +43,7 @@ struct ProcessManagerInner { impl ProcessManager { pub fn new(use_pty: bool) -> Self { + debug!("spawning children with pty: {use_pty}"); Self { state: Arc::new(Mutex::new(ProcessManagerInner { is_closing: false, @@ -75,8 +76,14 @@ impl ProcessManager { command: Command, stop_timeout: Duration, ) -> Option> { + let label = tracing::enabled!(tracing::Level::TRACE) + .then(|| command.label()) + .unwrap_or_default(); + trace!("acquiring lock for spawning {label}"); let mut lock = self.state.lock().unwrap(); + trace!("acquired lock for spawning {label}"); if lock.is_closing { + debug!("process manager closing"); return None; } let child = child::Child::spawn( @@ -87,6 +94,7 @@ impl ProcessManager { if let Ok(child) = &child { lock.children.push(child.clone()); } + trace!("releasing lock for spawning {label}"); Some(child) }