diff --git a/Cargo.lock b/Cargo.lock index ac0bd1f578d..61b1af824e3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1775,6 +1775,7 @@ dependencies = [ "test-strategy", "thiserror 2.0.3", "tokio", + "tokio-stream", "toml", "toml_edit", "tracing", @@ -1799,6 +1800,7 @@ dependencies = [ "console", "either", "futures-channel", + "futures-core", "futures-sink", "getrandom", "indexmap 2.6.0", diff --git a/Cargo.toml b/Cargo.toml index e8b8fa61894..4f518258eb9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -116,6 +116,7 @@ test-case = "3.3.1" test-strategy = "0.4.0" thiserror = "2.0.3" tokio = "1.41.1" +tokio-stream = "0.1.16" toml = "0.8.19" toml_edit = "0.22.22" tracing = "0.1.40" diff --git a/nextest-runner/Cargo.toml b/nextest-runner/Cargo.toml index 72e3390689c..65c93c4fd4d 100644 --- a/nextest-runner/Cargo.toml +++ b/nextest-runner/Cargo.toml @@ -78,6 +78,7 @@ tokio = { workspace = true, features = [ "sync", "time", ] } +tokio-stream = { workspace = true, features = ["signal"] } toml.workspace = true toml_edit = { workspace = true, features = ["serde"] } tracing.workspace = true diff --git a/nextest-runner/src/runner.rs b/nextest-runner/src/runner.rs index 34bc59069e8..6a34cef770d 100644 --- a/nextest-runner/src/runner.rs +++ b/nextest-runner/src/runner.rs @@ -27,7 +27,7 @@ use crate::{ time::{PausableSleep, StopwatchSnapshot, StopwatchStart}, }; use async_scoped::TokioScope; -use chrono::{DateTime, FixedOffset}; +use chrono::{DateTime, FixedOffset, Local}; use display_error_chain::DisplayErrorChain; use future_queue::StreamExt; use futures::prelude::*; @@ -893,7 +893,7 @@ impl<'a> TestRunnerInner<'a> { } // Don't break here to give the wait task a chance to finish. } else { - interval_sleep.as_mut().reset_original_duration(); + interval_sleep.as_mut().reset_last_duration(); } } recv = req_rx.recv() => { @@ -1117,7 +1117,7 @@ impl<'a> TestRunnerInner<'a> { } // Don't break here to give the wait task a chance to finish. } else { - interval_sleep.as_mut().reset_original_duration(); + interval_sleep.as_mut().reset_last_duration(); } } recv = req_rx.recv() => { @@ -1491,7 +1491,7 @@ impl InternalExecuteStatus { output: self.output, result: self.result, start_time: self.stopwatch_end.start_time.fixed_offset(), - time_taken: self.stopwatch_end.duration, + time_taken: self.stopwatch_end.active, is_slow: self.is_slow, delay_before_start: self.delay_before_start, } @@ -1529,7 +1529,7 @@ impl InternalSetupScriptExecuteStatus { output: self.output, result: self.result, start_time: self.stopwatch_end.start_time.fixed_offset(), - time_taken: self.stopwatch_end.duration, + time_taken: self.stopwatch_end.active, is_slow: self.is_slow, env_count: self.env_count, } @@ -1899,8 +1899,12 @@ where fn basic_callback(&mut self, kind: TestEventKind<'a>) { let snapshot = self.stopwatch.snapshot(); let event = TestEvent { - timestamp: snapshot.end_time().fixed_offset(), - elapsed: snapshot.duration, + // We'd previously add up snapshot.start_time + snapshot.active + + // paused, but that isn't resilient to clock changes. Instead, use + // `Local::now()` time (which isn't necessarily monotonic) along + // with snapshot.active (which is almost always monotonic). + timestamp: Local::now().fixed_offset(), + elapsed: snapshot.active, kind, }; (self.callback)(event) @@ -2274,7 +2278,7 @@ where self.basic_callback(TestEventKind::RunFinished { start_time: stopwatch_end.start_time.fixed_offset(), run_id: self.run_id, - elapsed: stopwatch_end.duration, + elapsed: stopwatch_end.active, run_stats: self.run_stats, }) } diff --git a/nextest-runner/src/signal.rs b/nextest-runner/src/signal.rs index 4291eab2915..6fa52e771ed 100644 --- a/nextest-runner/src/signal.rs +++ b/nextest-runner/src/signal.rs @@ -60,99 +60,67 @@ impl SignalHandler { #[cfg(unix)] mod imp { use super::*; - use tokio::signal::unix::{signal, Signal, SignalKind}; + use std::io; + use tokio::signal::unix::{signal, SignalKind}; + use tokio_stream::{wrappers::SignalStream, StreamExt, StreamMap}; + + #[derive(Clone, Copy, Debug, Hash, Eq, PartialEq, Ord, PartialOrd)] + enum SignalId { + Int, + Hup, + Term, + Quit, + Tstp, + Cont, + } /// Signals for SIGINT, SIGTERM and SIGHUP on Unix. #[derive(Debug)] pub(super) struct Signals { - sigint: SignalWithDone, - sighup: SignalWithDone, - sigterm: SignalWithDone, - sigquit: SignalWithDone, - sigtstp: SignalWithDone, - sigcont: SignalWithDone, + // The number of streams is quite small, so a StreamMap (backed by a + // Vec) is a good option to store the list of streams to poll. + map: StreamMap, } impl Signals { - pub(super) fn new() -> std::io::Result { - let sigint = SignalWithDone::new(SignalKind::interrupt())?; - let sighup = SignalWithDone::new(SignalKind::hangup())?; - let sigterm = SignalWithDone::new(SignalKind::terminate())?; - let sigquit = SignalWithDone::new(SignalKind::quit())?; - let sigtstp = SignalWithDone::new(SignalKind::from_raw(libc::SIGTSTP))?; - let sigcont = SignalWithDone::new(SignalKind::from_raw(libc::SIGCONT))?; - - Ok(Self { - sigint, - sighup, - sigterm, - sigquit, - sigtstp, - sigcont, - }) + pub(super) fn new() -> io::Result { + let mut map = StreamMap::new(); + + // Set up basic signals. + map.extend([ + (SignalId::Int, signal_stream(SignalKind::interrupt())?), + (SignalId::Hup, signal_stream(SignalKind::hangup())?), + (SignalId::Term, signal_stream(SignalKind::terminate())?), + (SignalId::Quit, signal_stream(SignalKind::quit())?), + (SignalId::Tstp, signal_stream(tstp_kind())?), + (SignalId::Cont, signal_stream(cont_kind())?), + ]); + + Ok(Self { map }) } pub(super) async fn recv(&mut self) -> Option { - loop { - tokio::select! { - recv = self.sigint.signal.recv(), if !self.sigint.done => { - match recv { - Some(()) => break Some(SignalEvent::Shutdown(ShutdownEvent::Interrupt)), - None => self.sigint.done = true, - } - } - recv = self.sighup.signal.recv(), if !self.sighup.done => { - match recv { - Some(()) => break Some(SignalEvent::Shutdown(ShutdownEvent::Hangup)), - None => self.sighup.done = true, - } - } - recv = self.sigterm.signal.recv(), if !self.sigterm.done => { - match recv { - Some(()) => break Some(SignalEvent::Shutdown(ShutdownEvent::Term)), - None => self.sigterm.done = true, - } - } - recv = self.sigquit.signal.recv(), if !self.sigquit.done => { - match recv { - Some(()) => break Some(SignalEvent::Shutdown(ShutdownEvent::Quit)), - None => self.sigquit.done = true, - } - } - recv = self.sigtstp.signal.recv(), if !self.sigtstp.done => { - match recv { - Some(()) => break Some(SignalEvent::JobControl(JobControlEvent::Stop)), - None => self.sigtstp.done = true, - } - } - recv = self.sigcont.signal.recv(), if !self.sigcont.done => { - match recv { - Some(()) => break Some(SignalEvent::JobControl(JobControlEvent::Continue)), - None => self.sigcont.done = true, - } - } - else => { - break None - } - } - } + self.map.next().await.map(|(id, _)| match id { + SignalId::Int => SignalEvent::Shutdown(ShutdownEvent::Interrupt), + SignalId::Hup => SignalEvent::Shutdown(ShutdownEvent::Hangup), + SignalId::Term => SignalEvent::Shutdown(ShutdownEvent::Term), + SignalId::Quit => SignalEvent::Shutdown(ShutdownEvent::Quit), + SignalId::Tstp => SignalEvent::JobControl(JobControlEvent::Stop), + SignalId::Cont => SignalEvent::JobControl(JobControlEvent::Continue), + }) } } - #[derive(Debug)] - struct SignalWithDone { - signal: Signal, - done: bool, + fn signal_stream(kind: SignalKind) -> io::Result { + Ok(SignalStream::new(signal(kind)?)) } - impl SignalWithDone { - fn new(kind: SignalKind) -> std::io::Result { - let signal = signal(kind)?; - Ok(Self { - signal, - done: false, - }) - } + fn tstp_kind() -> SignalKind { + SignalKind::from_raw(libc::SIGTSTP) + } + + fn cont_kind() -> SignalKind { + SignalKind::from_raw(libc::SIGCONT) } } diff --git a/nextest-runner/src/time/pausable_sleep.rs b/nextest-runner/src/time/pausable_sleep.rs index ad0dbc3011d..1b4d1391b14 100644 --- a/nextest-runner/src/time/pausable_sleep.rs +++ b/nextest-runner/src/time/pausable_sleep.rs @@ -64,10 +64,38 @@ impl PausableSleep { } } - /// Resets the inner sleep to now + the original duration. - pub(crate) fn reset_original_duration(self: Pin<&mut Self>) { + /// Resets the sleep to the given duration. + /// + /// * If the timer is currently running, it will be reset to + /// `Instant::now()` plus the last duration provided via + /// [`pausable_sleep`] or [`Self::reset`]. + /// + /// * If it is currently paused, it will be reset to the new duration + /// whenever it is resumed. + pub(crate) fn reset(self: Pin<&mut Self>, duration: Duration) { let this = self.project(); - this.sleep.reset(Instant::now() + *this.duration); + *this.duration = duration; + match this.pause_state { + SleepPauseState::Running => { + this.sleep.reset(Instant::now() + duration); + } + SleepPauseState::Paused { remaining } => { + *remaining = duration; + } + } + } + + /// Resets the sleep to the last duration provided. + /// + /// * If the timer is currently running, it will be reset to + /// `Instant::now()` plus the last duration provided via + /// [`pausable_sleep`] or [`Self::reset`]. + /// + /// * If it is currently paused, it will be reset to the new duration + /// whenever it is resumed. + pub(crate) fn reset_last_duration(self: Pin<&mut Self>) { + let duration = self.duration; + self.reset(duration); } } @@ -76,14 +104,18 @@ impl Future for PausableSleep { fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { let this = self.project(); - match &this.pause_state { - SleepPauseState::Running => this.sleep.poll(cx), - SleepPauseState::Paused { .. } => Poll::Pending, - } + // Always call into this.sleep. + // + // We don't do anything special for paused sleeps here. That's because + // on pause, the sleep is reset to a far future deadline. Calling poll + // will mean that the future gets registered with the time driver (so is + // not going to be stuck without a waker, even though the waker will + // never end up waking the task in practice). + this.sleep.poll(cx) } } -#[derive(Debug)] +#[derive(Debug, PartialEq, Eq)] enum SleepPauseState { Running, Paused { remaining: Duration }, @@ -97,3 +129,58 @@ fn far_future() -> Instant { // 1000 years overflows on macOS, 100 years overflows on FreeBSD. Instant::now() + Duration::from_secs(86400 * 365 * 30) } + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn reset_on_sleep() { + const TICK: Duration = Duration::from_millis(500); + + // Create a very short timer. + let mut sleep = std::pin::pin!(pausable_sleep(Duration::from_millis(1))); + + // Pause the timer. + sleep.as_mut().pause(); + assert!( + !sleep.as_mut().sleep.is_elapsed(), + "underlying sleep has been suspended" + ); + + // Now set the timer to one tick. This should *not* cause the timer to + // be reset -- instead, the new timer should be buffered until the timer + // is resumed. + sleep.as_mut().reset(TICK); + assert_eq!( + sleep.as_ref().pause_state, + SleepPauseState::Paused { remaining: TICK } + ); + assert!( + !sleep.as_mut().sleep.is_elapsed(), + "underlying sleep is still suspended" + ); + + // Now sleep for 2 ticks. The timer should still be paused and not + // completed. + tokio::time::sleep(2 * TICK).await; + assert!( + !sleep.as_mut().sleep.is_elapsed(), + "underlying sleep is still suspended after waiting 2 ticks" + ); + + // Now resume the timer and wait for it to complete. It should take + // around 1 tick starting from this point. + + let now = Instant::now(); + sleep.as_mut().resume(); + sleep.as_mut().await; + + assert!( + sleep.as_mut().sleep.is_elapsed(), + "underlying sleep has finally elapsed" + ); + + assert!(now.elapsed() >= TICK); + } +} diff --git a/nextest-runner/src/time/stopwatch.rs b/nextest-runner/src/time/stopwatch.rs index 2c8a7d8b2a7..d446582036a 100644 --- a/nextest-runner/src/time/stopwatch.rs +++ b/nextest-runner/src/time/stopwatch.rs @@ -67,21 +67,26 @@ impl StopwatchStart { pub(crate) fn snapshot(&self) -> StopwatchSnapshot { StopwatchSnapshot { start_time: self.start_time, - duration: self.instant.elapsed() - self.paused_time, + // self.instant is supposed to be monotonic but might not be so on + // some weird systems. If the duration underflows, just return 0. + active: self.instant.elapsed().saturating_sub(self.paused_time), + paused: self.paused_time, } } } +/// A snapshot of the state of the stopwatch. #[derive(Clone, Debug)] pub(crate) struct StopwatchSnapshot { + /// The time at which the stopwatch was started. pub(crate) start_time: DateTime, - pub(crate) duration: Duration, -} -impl StopwatchSnapshot { - pub(crate) fn end_time(&self) -> DateTime { - self.start_time + self.duration - } + /// The amount of time spent while the stopwatch was active. + pub(crate) active: Duration, + + /// The amount of time spent while the stopwatch was paused. + #[allow(dead_code)] + pub(crate) paused: Duration, } #[derive(Clone, Debug)] @@ -115,10 +120,10 @@ mod tests { // // (Previously, this used to cap the difference at 650ms, but empirically, the test would // sometimes fail on GitHub CI. Just setting a minimum bound is enough.) - let difference = unpaused_end.duration - end.duration; + let difference = unpaused_end.active - end.active; assert!( difference > Duration::from_millis(450), "difference between unpaused_end and end ({difference:?}) is at least 450ms" - ) + ); } } diff --git a/workspace-hack/Cargo.toml b/workspace-hack/Cargo.toml index 89201c42a50..4f78cb42768 100644 --- a/workspace-hack/Cargo.toml +++ b/workspace-hack/Cargo.toml @@ -46,6 +46,7 @@ syn = { version = "2.0.89", features = ["extra-traits", "full", "visit", "visit- [target.x86_64-unknown-linux-gnu.dependencies] futures-channel = { version = "0.3.31", features = ["sink"] } +futures-core = { version = "0.3.31" } futures-sink = { version = "0.3.31", default-features = false, features = ["std"] } libc = { version = "0.2.166", features = ["extra_traits"] } linux-raw-sys = { version = "0.4.14", default-features = false, features = ["elf", "errno", "general", "ioctl", "no_std", "std"] } @@ -58,6 +59,7 @@ libc = { version = "0.2.166", features = ["extra_traits"] } [target.x86_64-apple-darwin.dependencies] futures-channel = { version = "0.3.31", features = ["sink"] } +futures-core = { version = "0.3.31" } futures-sink = { version = "0.3.31", default-features = false, features = ["std"] } libc = { version = "0.2.166", features = ["extra_traits"] } rustix = { version = "0.38.37", features = ["fs", "termios"] } @@ -69,6 +71,7 @@ libc = { version = "0.2.166", features = ["extra_traits"] } [target.x86_64-pc-windows-msvc.dependencies] futures-channel = { version = "0.3.31", features = ["sink"] } +futures-core = { version = "0.3.31" } futures-sink = { version = "0.3.31", default-features = false, features = ["std"] } smallvec = { version = "1.13.2", default-features = false, features = ["const_new"] } tokio = { version = "1.41.1", default-features = false, features = ["net"] }