Skip to content

Commit

Permalink
initial implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
jlizen committed Dec 19, 2024
1 parent fa76791 commit 1af1aed
Show file tree
Hide file tree
Showing 8 changed files with 817 additions and 1 deletion.
14 changes: 13 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,23 @@ categories = ["asynchronous"]
[dependencies]
log = "0.4.22"

# Requires `--cfg compute_heavy_executor_tokio` to enable.
[target.'cfg(compute_heavy_executor_tokio)'.dependencies]
tokio = { version = "1.0", features = ["rt", "rt-multi-thread", "macros", "sync"]}
libc = { version = "0.2.168"}
num_cpus = { version = "1.0"}

[package.metadata.docs.rs]
# enable tokio conditional compilation in the documentation
rustdoc-args = ["--cfg", "compute_heavy_executor_tokio"]
# it's necessary to _also_ pass `--cfg compute_heavy_executor_tokio`
# to rustc, or else dependencies will not be enabled, and the docs build will fail.
rustc-args = ["--cfg", "compute_heavy_executor_tokio"]

[dev-dependencies]
tokio = { version = "1.0", features = ["full"]}

[lints.rust]
# calling libraries can use the convention of `cfg(compute_heavy_executor)` to enable usage of this crate
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(compute_heavy_executor)', 'cfg(compute_heavy_executor_tokio)'] }
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(compute_heavy_executor)', 'cfg(compute_heavy_executor_tokio)'] }

15 changes: 15 additions & 0 deletions src/block_in_place.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
use crate::{error::Error, ComputeHeavyFutureExecutor};

pub(crate) struct BlockInPlaceExecutor {}

impl ComputeHeavyFutureExecutor for BlockInPlaceExecutor {
async fn execute<F, O>(&self, fut: F) -> Result<O, Error>
where
F: std::future::Future<Output = O> + Send + 'static,
O: Send + 'static,
{
Ok(tokio::task::block_in_place(move || {
tokio::runtime::Handle::current().block_on(async { fut.await })
}))
}
}
13 changes: 13 additions & 0 deletions src/current_context.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
use crate::{error::Error, ComputeHeavyFutureExecutor};

pub(crate) struct CurrentContextExecutor {}

impl ComputeHeavyFutureExecutor for CurrentContextExecutor {
async fn execute<F, O>(&self, fut: F) -> Result<O, Error>
where
F: std::future::Future<Output = O> + Send + 'static,
O: Send + 'static,
{
Ok(fut.await)
}
}
71 changes: 71 additions & 0 deletions src/custom_executor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
use std::{any::Any, future::Future, pin::Pin};

use crate::{error::Error, ComputeHeavyFutureExecutor};

/// The input future for a custom executor closure.
/// This is the regular future, with its output type erased to Any.
pub type AnyWrappedFuture = Pin<Box<dyn Future<Output = Box<dyn Any + Send + 'static>> + Send + 'static>>;

/// A closure that accepts a type-erased input future and returns a future that resolves to
/// either Ok(<type erased input future's output>) or Err(<boxed error representing executor errors>)
pub type CustomExecutorClosure = Box<
dyn Fn(
AnyWrappedFuture,
) -> Box<
dyn Future<Output = Result<Box<dyn Any + Send + 'static>, Box<dyn std::error::Error + Send + Sync>>>,
> + Send
+ Sync,
>;

// used for checking that the closure doesn't mutate the output type while working with Any
struct PrivateType {}

pub(crate) struct CustomExecutor {
pub(crate) closure: CustomExecutorClosure,
}

impl CustomExecutor {
pub(crate) async fn new(closure: CustomExecutorClosure) -> Self {
let executor = Self { closure };

let test_future = async { PrivateType {} };

if let Err(_) = executor.execute(test_future).await {
panic!(
"CustomExecutor strategy initialized with a closure that \
changes the future's output type"
);
}

executor
}
}

impl ComputeHeavyFutureExecutor for CustomExecutor {
async fn execute<F, O>(&self, fut: F) -> Result<O, Error>
where
F: Future<Output = O> + Send + 'static,
O: Send + 'static,
{
let wrapped_future = Box::pin(async move {
let res = fut.await;
Box::new(res) as Box<dyn Any + Send>
});

let executor_result = Box::into_pin((self.closure)(wrapped_future)).await;

match executor_result {
Ok(future_result) => {
if let Ok(value) = future_result.downcast::<O>() {
Ok(*value)
} else {
// we would love to include the output that we actually received, but we don't want
// to force display/debug bounds onto the future output
Err(Error::CustomExecutorOutputTypeMismatch)
}
},
Err(err) => Err(Error::BoxError(err))

}
}
}
21 changes: 21 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use core::fmt;

#[non_exhaustive]
#[derive(Debug)]
pub enum Error {
CustomExecutorOutputTypeMismatch,
#[cfg(compute_heavy_executor_tokio)]
JoinError(String),
BoxError(Box<dyn std::error::Error + Send + Sync>)
}

impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Error::CustomExecutorOutputTypeMismatch => write!(f, "custom executor returned a different output type than the input future's output type"),
#[cfg(compute_heavy_executor_tokio)]
Error::JoinError(err) => write!(f, "error joining tokio handle: {err}"),
Error::BoxError(err) => write!(f, "{err}"),
}
}
}
Loading

0 comments on commit 1af1aed

Please sign in to comment.