Skip to content

Commit

Permalink
Fix subtle bug in thread poller
Browse files Browse the repository at this point in the history
Thread::park may block indefinitely if the thread token is taken by someone else in-between park and unpark. We rework the system to use condvar and mutex
  • Loading branch information
h33p committed Nov 17, 2023
1 parent 2beba34 commit 1881ba2
Showing 1 changed file with 71 additions and 37 deletions.
108 changes: 71 additions & 37 deletions mfio/src/poller.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
use core::future::Future;
use core::mem;
use core::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};
#[cfg(feature = "std")]
use std::thread::{self, Thread};

pub trait ThreadLocal: Sized {
/// Get handle to current thread.
Expand Down Expand Up @@ -88,40 +86,6 @@ pub trait Wakeable: ParkHandle + Clone {
}
}

#[cfg(feature = "std")]
impl ThreadLocal for Thread {
fn current() -> Self {
thread::current()
}
}

#[cfg(feature = "std")]
impl ParkHandle for Thread {
fn park(&self) {
thread::park();
}

fn unpark(&self) {
Thread::unpark(self);
}
}

#[cfg(feature = "std")]
impl Wakeable for Thread {
fn into_opaque(self) -> *const () {
// SAFETY: `Thread` internal layout is an Arc to inner type, which is represented as a
// single pointer. The only thing we do with the pointer is transmute it back to
// ThreadWaker in the waker functions. If for whatever reason Thread layout will change to
// contain multiple fields, this will still be safe, because the compiler will simply
// refuse to compile the program.
unsafe { mem::transmute::<_, *const ()>(self) }
}

unsafe fn from_opaque(data: *const ()) -> Self {
mem::transmute(data)
}
}

impl ThreadLocal for *const () {
fn current() -> Self {
core::ptr::null()
Expand Down Expand Up @@ -208,7 +172,77 @@ pub fn block_on_handle<T: ParkHandle, F: Future>(
/// ```
pub fn block_on<F: Future>(fut: F) -> F::Output {
#[cfg(feature = "std")]
return block_on_t::<Thread, _>(fut);
return block_on_t::<LocalThread, _>(fut);
#[cfg(not(feature = "std"))]
return block_on_t::<*const (), _>(fut);
}

#[cfg(feature = "std")]
pub use std_impl::LocalThread;

#[cfg(feature = "std")]
mod std_impl {
use super::*;
use std::sync::{Arc, Condvar, Mutex};

#[derive(Default)]
struct Signal {
signaled: Mutex<bool>,
cond: Condvar,
}

impl Signal {
fn wait(&self) {
let mut signaled = self
.cond
.wait_while(self.signaled.lock().unwrap(), |signaled| !*signaled)
.unwrap();
*signaled = false;
}

fn wake(&self) {
let mut signaled = self.signaled.lock().unwrap();
// Only one thread will be waiting
self.cond.notify_one();
*signaled = true;
}
}

thread_local! {
static ACCESS: Arc<Signal> = Arc::new(Signal::default());
}

#[derive(Clone)]
pub struct LocalThread(Arc<Signal>);

impl ThreadLocal for LocalThread {
fn current() -> Self {
LocalThread(ACCESS.with(Clone::clone))
}
}

impl ParkHandle for LocalThread {
fn park(&self) {
self.0.wait();
}

fn unpark(&self) {
self.0.wake();
}
}

impl Wakeable for LocalThread {
fn into_opaque(self) -> *const () {
// SAFETY: `Thread` internal layout is an Arc to inner type, which is represented as a
// single pointer. The only thing we do with the pointer is transmute it back to
// ThreadWaker in the waker functions. If for whatever reason Thread layout will change to
// contain multiple fields, this will still be safe, because the compiler will simply
// refuse to compile the program.
unsafe { mem::transmute::<_, *const ()>(self) }
}

unsafe fn from_opaque(data: *const ()) -> Self {
mem::transmute(data)
}
}
}

0 comments on commit 1881ba2

Please sign in to comment.