From d5a27e44ba5965679928750201aa638082ee5d78 Mon Sep 17 00:00:00 2001 From: Jiahao XU Date: Wed, 25 Oct 2023 22:40:16 +1100 Subject: [PATCH] Refactor: Extract new mod `async_executor` Signed-off-by: Jiahao XU --- src/async_executor.rs | 118 ++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 105 +++---------------------------------- 2 files changed, 126 insertions(+), 97 deletions(-) create mode 100644 src/async_executor.rs diff --git a/src/async_executor.rs b/src/async_executor.rs new file mode 100644 index 000000000..ad9e62a65 --- /dev/null +++ b/src/async_executor.rs @@ -0,0 +1,118 @@ +use std::{ + cell::Cell, + future::Future, + pin::Pin, + ptr, + task::{Context, Poll, RawWaker, RawWakerVTable, Waker}, + thread, + time::Duration, +}; + +use crate::Error; + +const NOOP_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new( + // Cloning just returns a new no-op raw waker + |_| NOOP_RAW_WAKER, + // `wake` does nothing + |_| {}, + // `wake_by_ref` does nothing + |_| {}, + // Dropping does nothing as we don't allocate anything + |_| {}, +); +const NOOP_RAW_WAKER: RawWaker = RawWaker::new(ptr::null(), &NOOP_WAKER_VTABLE); + +#[derive(Default)] +pub(super) struct YieldOnce(bool); + +impl Future for YieldOnce { + type Output = (); + + fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> { + let flag = &mut std::pin::Pin::into_inner(self).0; + if !*flag { + *flag = true; + Poll::Pending + } else { + Poll::Ready(()) + } + } +} + +/// Execute the futures and return when they are all done. +/// +/// Here we use our own homebrew async executor since cc is used in the build +/// script of many popular projects, pulling in additional dependencies would +/// significantly slow down its compilation. +pub(super) fn block_on( + mut fut1: Fut1, + mut fut2: Fut2, + has_made_progress: &Cell, +) -> Result<(), Error> +where + Fut1: Future>, + Fut2: Future>, +{ + // Shadows the future so that it can never be moved and is guaranteed + // to be pinned. + // + // The same trick used in `pin!` macro. + // + // TODO: Once MSRV is bumped to 1.68, replace this with `std::pin::pin!` + let mut fut1 = Some(unsafe { Pin::new_unchecked(&mut fut1) }); + let mut fut2 = Some(unsafe { Pin::new_unchecked(&mut fut2) }); + + // TODO: Once `Waker::noop` stablised and our MSRV is bumped to the version + // which it is stablised, replace this wth `Waker::noop`. + let waker = unsafe { Waker::from_raw(NOOP_RAW_WAKER) }; + let mut context = Context::from_waker(&waker); + + let mut backoff_cnt = 0; + + loop { + has_made_progress.set(false); + + if let Some(fut) = fut2.as_mut() { + if let Poll::Ready(res) = fut.as_mut().poll(&mut context) { + fut2 = None; + res?; + } + } + + if let Some(fut) = fut1.as_mut() { + if let Poll::Ready(res) = fut.as_mut().poll(&mut context) { + fut1 = None; + res?; + } + } + + if fut1.is_none() && fut2.is_none() { + return Ok(()); + } + + if !has_made_progress.get() { + if backoff_cnt > 3 { + // We have yielded at least three times without making' + // any progress, so we will sleep for a while. + let duration = Duration::from_millis(100 * (backoff_cnt - 3).min(10)); + thread::sleep(duration); + } else { + // Given that we spawned a lot of compilation tasks, it is unlikely + // that OS cannot find other ready task to execute. + // + // If all of them are done, then we will yield them and spawn more, + // or simply return. + // + // Thus this will not be turned into a busy-wait loop and it will not + // waste CPU resource. + thread::yield_now(); + } + } + + backoff_cnt = if has_made_progress.get() { + 0 + } else { + backoff_cnt + 1 + }; + } +} diff --git a/src/lib.rs b/src/lib.rs index 5d07d8117..12890af50 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -66,6 +66,8 @@ use std::process::{Child, Command, Stdio}; use std::sync::{Arc, Mutex}; use std::thread::{self, JoinHandle}; +#[cfg(feature = "parallel")] +mod async_executor; #[cfg(feature = "parallel")] mod job_token; mod os_pipe; @@ -1295,13 +1297,9 @@ impl Build { #[cfg(feature = "parallel")] fn compile_objects(&self, objs: &[Object], print: &PrintThread) -> Result<(), Error> { - use std::{ - cell::Cell, - future::Future, - pin::Pin, - ptr, - task::{Context, Poll, RawWaker, RawWakerVTable, Waker}, - }; + use std::cell::Cell; + + use async_executor::{block_on, YieldOnce}; if objs.len() <= 1 { for obj in objs { @@ -1343,7 +1341,7 @@ impl Build { let is_disconnected = Cell::new(false); let has_made_progress = Cell::new(false); - let mut wait_future = async { + let wait_future = async { let mut error = None; // Buffer the stdout let mut stdout = io::BufWriter::with_capacity(128, io::stdout()); @@ -1396,7 +1394,7 @@ impl Build { YieldOnce::default().await; } }; - let mut spawn_future = async { + let spawn_future = async { for obj in objs { let (mut cmd, program) = self.create_compile_object_cmd(obj)?; let token = loop { @@ -1420,65 +1418,7 @@ impl Build { Ok::<_, Error>(()) }; - // Shadows the future so that it can never be moved and is guaranteed - // to be pinned. - // - // The same trick used in `pin!` macro. - let mut wait_future = Some(unsafe { Pin::new_unchecked(&mut wait_future) }); - let mut spawn_future = Some(unsafe { Pin::new_unchecked(&mut spawn_future) }); - - let waker = unsafe { Waker::from_raw(NOOP_RAW_WAKER) }; - let mut context = Context::from_waker(&waker); - - let mut backoff_cnt = 0; - - loop { - has_made_progress.set(false); - - if let Some(fut) = spawn_future.as_mut() { - if let Poll::Ready(res) = fut.as_mut().poll(&mut context) { - spawn_future = None; - res?; - } - } - - if let Some(fut) = wait_future.as_mut() { - if let Poll::Ready(res) = fut.as_mut().poll(&mut context) { - wait_future = None; - res?; - } - } - - if wait_future.is_none() && spawn_future.is_none() { - return Ok(()); - } - - if !has_made_progress.get() { - if backoff_cnt > 3 { - // We have yielded at least three times without making' - // any progress, so we will sleep for a while. - let duration = - std::time::Duration::from_millis(100 * (backoff_cnt - 3).min(10)); - thread::sleep(duration); - } else { - // Given that we spawned a lot of compilation tasks, it is unlikely - // that OS cannot find other ready task to execute. - // - // If all of them are done, then we will yield them and spawn more, - // or simply return. - // - // Thus this will not be turned into a busy-wait loop and it will not - // waste CPU resource. - thread::yield_now(); - } - } - - backoff_cnt = if has_made_progress.get() { - 0 - } else { - backoff_cnt + 1 - }; - } + return block_on(wait_future, spawn_future, &has_made_progress); struct KillOnDrop(Child); @@ -1490,35 +1430,6 @@ impl Build { } } - #[derive(Default)] - struct YieldOnce(bool); - - impl Future for YieldOnce { - type Output = (); - - fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> { - let flag = &mut std::pin::Pin::into_inner(self).0; - if !*flag { - *flag = true; - Poll::Pending - } else { - Poll::Ready(()) - } - } - } - - const NOOP_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new( - // Cloning just returns a new no-op raw waker - |_| NOOP_RAW_WAKER, - // `wake` does nothing - |_| {}, - // `wake_by_ref` does nothing - |_| {}, - // Dropping does nothing as we don't allocate anything - |_| {}, - ); - const NOOP_RAW_WAKER: RawWaker = RawWaker::new(ptr::null(), &NOOP_WAKER_VTABLE); - fn cell_update(cell: &Cell, f: F) where T: Default,