Skip to content

Commit

Permalink
Refactor: Extract new mod async_executor
Browse files Browse the repository at this point in the history
Signed-off-by: Jiahao XU <[email protected]>
  • Loading branch information
NobodyXu committed Oct 25, 2023
1 parent f8b1981 commit d5a27e4
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 97 deletions.
118 changes: 118 additions & 0 deletions src/async_executor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
use std::{
cell::Cell,
future::Future,
pin::Pin,
ptr,
task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
thread,
time::Duration,
};

use crate::Error;

const NOOP_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
// Cloning just returns a new no-op raw waker
|_| NOOP_RAW_WAKER,
// `wake` does nothing
|_| {},
// `wake_by_ref` does nothing
|_| {},
// Dropping does nothing as we don't allocate anything
|_| {},
);
const NOOP_RAW_WAKER: RawWaker = RawWaker::new(ptr::null(), &NOOP_WAKER_VTABLE);

#[derive(Default)]
pub(super) struct YieldOnce(bool);

impl Future for YieldOnce {
type Output = ();

fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
let flag = &mut std::pin::Pin::into_inner(self).0;
if !*flag {
*flag = true;
Poll::Pending
} else {
Poll::Ready(())
}
}
}

/// Execute the futures and return when they are all done.
///
/// Here we use our own homebrew async executor since cc is used in the build
/// script of many popular projects, pulling in additional dependencies would
/// significantly slow down its compilation.
pub(super) fn block_on<Fut1, Fut2>(
mut fut1: Fut1,
mut fut2: Fut2,
has_made_progress: &Cell<bool>,
) -> Result<(), Error>
where
Fut1: Future<Output = Result<(), Error>>,
Fut2: Future<Output = Result<(), Error>>,
{
// Shadows the future so that it can never be moved and is guaranteed
// to be pinned.
//
// The same trick used in `pin!` macro.
//
// TODO: Once MSRV is bumped to 1.68, replace this with `std::pin::pin!`
let mut fut1 = Some(unsafe { Pin::new_unchecked(&mut fut1) });
let mut fut2 = Some(unsafe { Pin::new_unchecked(&mut fut2) });

// TODO: Once `Waker::noop` stablised and our MSRV is bumped to the version
// which it is stablised, replace this wth `Waker::noop`.
let waker = unsafe { Waker::from_raw(NOOP_RAW_WAKER) };
let mut context = Context::from_waker(&waker);

let mut backoff_cnt = 0;

loop {
has_made_progress.set(false);

if let Some(fut) = fut2.as_mut() {
if let Poll::Ready(res) = fut.as_mut().poll(&mut context) {
fut2 = None;
res?;
}
}

if let Some(fut) = fut1.as_mut() {
if let Poll::Ready(res) = fut.as_mut().poll(&mut context) {
fut1 = None;
res?;
}
}

if fut1.is_none() && fut2.is_none() {
return Ok(());
}

if !has_made_progress.get() {
if backoff_cnt > 3 {
// We have yielded at least three times without making'
// any progress, so we will sleep for a while.
let duration = Duration::from_millis(100 * (backoff_cnt - 3).min(10));
thread::sleep(duration);
} else {
// Given that we spawned a lot of compilation tasks, it is unlikely
// that OS cannot find other ready task to execute.
//
// If all of them are done, then we will yield them and spawn more,
// or simply return.
//
// Thus this will not be turned into a busy-wait loop and it will not
// waste CPU resource.
thread::yield_now();
}
}

backoff_cnt = if has_made_progress.get() {
0
} else {
backoff_cnt + 1
};
}
}
105 changes: 8 additions & 97 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ use std::process::{Child, Command, Stdio};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

#[cfg(feature = "parallel")]
mod async_executor;
#[cfg(feature = "parallel")]
mod job_token;
mod os_pipe;
Expand Down Expand Up @@ -1295,13 +1297,9 @@ impl Build {

#[cfg(feature = "parallel")]
fn compile_objects(&self, objs: &[Object], print: &PrintThread) -> Result<(), Error> {
use std::{
cell::Cell,
future::Future,
pin::Pin,
ptr,
task::{Context, Poll, RawWaker, RawWakerVTable, Waker},
};
use std::cell::Cell;

use async_executor::{block_on, YieldOnce};

if objs.len() <= 1 {
for obj in objs {
Expand Down Expand Up @@ -1343,7 +1341,7 @@ impl Build {
let is_disconnected = Cell::new(false);
let has_made_progress = Cell::new(false);

let mut wait_future = async {
let wait_future = async {
let mut error = None;
// Buffer the stdout
let mut stdout = io::BufWriter::with_capacity(128, io::stdout());
Expand Down Expand Up @@ -1396,7 +1394,7 @@ impl Build {
YieldOnce::default().await;
}
};
let mut spawn_future = async {
let spawn_future = async {
for obj in objs {
let (mut cmd, program) = self.create_compile_object_cmd(obj)?;
let token = loop {
Expand All @@ -1420,65 +1418,7 @@ impl Build {
Ok::<_, Error>(())
};

// Shadows the future so that it can never be moved and is guaranteed
// to be pinned.
//
// The same trick used in `pin!` macro.
let mut wait_future = Some(unsafe { Pin::new_unchecked(&mut wait_future) });
let mut spawn_future = Some(unsafe { Pin::new_unchecked(&mut spawn_future) });

let waker = unsafe { Waker::from_raw(NOOP_RAW_WAKER) };
let mut context = Context::from_waker(&waker);

let mut backoff_cnt = 0;

loop {
has_made_progress.set(false);

if let Some(fut) = spawn_future.as_mut() {
if let Poll::Ready(res) = fut.as_mut().poll(&mut context) {
spawn_future = None;
res?;
}
}

if let Some(fut) = wait_future.as_mut() {
if let Poll::Ready(res) = fut.as_mut().poll(&mut context) {
wait_future = None;
res?;
}
}

if wait_future.is_none() && spawn_future.is_none() {
return Ok(());
}

if !has_made_progress.get() {
if backoff_cnt > 3 {
// We have yielded at least three times without making'
// any progress, so we will sleep for a while.
let duration =
std::time::Duration::from_millis(100 * (backoff_cnt - 3).min(10));
thread::sleep(duration);
} else {
// Given that we spawned a lot of compilation tasks, it is unlikely
// that OS cannot find other ready task to execute.
//
// If all of them are done, then we will yield them and spawn more,
// or simply return.
//
// Thus this will not be turned into a busy-wait loop and it will not
// waste CPU resource.
thread::yield_now();
}
}

backoff_cnt = if has_made_progress.get() {
0
} else {
backoff_cnt + 1
};
}
return block_on(wait_future, spawn_future, &has_made_progress);

struct KillOnDrop(Child);

Expand All @@ -1490,35 +1430,6 @@ impl Build {
}
}

#[derive(Default)]
struct YieldOnce(bool);

impl Future for YieldOnce {
type Output = ();

fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
let flag = &mut std::pin::Pin::into_inner(self).0;
if !*flag {
*flag = true;
Poll::Pending
} else {
Poll::Ready(())
}
}
}

const NOOP_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
// Cloning just returns a new no-op raw waker
|_| NOOP_RAW_WAKER,
// `wake` does nothing
|_| {},
// `wake_by_ref` does nothing
|_| {},
// Dropping does nothing as we don't allocate anything
|_| {},
);
const NOOP_RAW_WAKER: RawWaker = RawWaker::new(ptr::null(), &NOOP_WAKER_VTABLE);

fn cell_update<T, F>(cell: &Cell<T>, f: F)
where
T: Default,
Expand Down

0 comments on commit d5a27e4

Please sign in to comment.