Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

initial implementation #1

Merged
merged 21 commits into from
Dec 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 17 additions & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,27 @@ homepage = "https://github.com/jlizen/compute-heavy-future-executor"
rust-version = "1.70"
exclude = ["/.github", "/examples", "/scripts"]
readme = "README.md"
description = "Additional executor patterns for use handling compute-bounded, blocking futures."
description = "Additional executor patterns for handling compute-bounded, blocking futures."
categories = ["asynchronous"]

[features]
tokio = ["tokio/rt"]
tokio_block_in_place = ["tokio", "tokio/rt-multi-thread"]
secondary_tokio_runtime = ["tokio", "tokio/rt-multi-thread", "dep:libc", "dep:num_cpus"]

[dependencies]
libc = { version = "0.2.168", optional = true }
log = "0.4.22"
num_cpus = { version = "1.0", optional = true }
tokio = { version = "1.0", features = ["macros", "sync"] }

[dev-dependencies]
tokio = { version = "1.0", features = ["full"]}
futures-util = "0.3.31"

[package.metadata.docs.rs]
all-features = true

[target.'cfg(compute_heavy_executor_tokio)'.dependencies]
tokio = { version = "1.0", features = ["rt", "rt-multi-thread", "macros", "sync"]}
libc = { version = "0.2.168"}
num_cpus = { version = "1.0"}

[lints.rust]
# calling libraries can use the convention of `cfg(compute_heavy_executor)` to enable usage of this crate
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(compute_heavy_executor)', 'cfg(compute_heavy_executor_tokio)'] }
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(tokio_unstable)'] }
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
# compute-heavy-future-executor
Experimental crate for adding special handling for frequently blocking futures
Experimental crate that adds additional executor patterns to use with frequently blocking futures.
45 changes: 45 additions & 0 deletions src/block_in_place.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
use crate::{
concurrency_limit::ConcurrencyLimit,
error::{Error, InvalidConfig},
ComputeHeavyFutureExecutor,
};

use tokio::runtime::{Handle, RuntimeFlavor};

pub(crate) struct BlockInPlaceExecutor {
concurrency_limit: ConcurrencyLimit,
}

impl BlockInPlaceExecutor {
pub(crate) fn new(max_concurrency: Option<usize>) -> Result<Self, Error> {
match Handle::current().runtime_flavor() {
RuntimeFlavor::MultiThread => Ok(()),
#[cfg(tokio_unstable)]
RuntimeFlavor::MultiThreadAlt => Ok(()),
flavor => Err(Error::InvalidConfig(InvalidConfig {
field: "current tokio runtime flavor",
received: format!("{flavor:#?}"),
expected: "MultiThread",
}))?,
}?;

Ok(Self {
concurrency_limit: ConcurrencyLimit::new(max_concurrency),
})
}
}

impl ComputeHeavyFutureExecutor for BlockInPlaceExecutor {
async fn execute<F, O>(&self, fut: F) -> Result<O, Error>
where
F: std::future::Future<Output = O> + Send + 'static,
O: Send + 'static,
{
let _permit = self.concurrency_limit.acquire_permit().await;

Ok(tokio::task::block_in_place(move || {
tokio::runtime::Handle::current().block_on(async { fut.await })
}))
// permit implicitly drops
}
}
42 changes: 42 additions & 0 deletions src/concurrency_limit.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
use std::sync::Arc;

use tokio::sync::{OwnedSemaphorePermit, Semaphore};

use crate::error::Error;

/// Wrapper around semaphore that turns it into a non-op if no limit is provided
/// or semaphore channel is closed
pub(crate) struct ConcurrencyLimit {
semaphore: Option<Arc<Semaphore>>,
}

impl ConcurrencyLimit {
/// Accepts none in case no concurrency
pub(crate) fn new(limit: Option<usize>) -> Self {
let semaphore = limit.map(|limit| Arc::new(Semaphore::new(limit)));

Self { semaphore }
}

/// Waits on a permit to the semaphore if configured, otherwise immediately returns.
///
/// Internally turns errors into a no-op (`None`) and outputs log lines.
pub(crate) async fn acquire_permit(&self) -> Option<OwnedSemaphorePermit> {
match self.semaphore.clone() {
Some(semaphore) => {
match semaphore
.acquire_owned()
.await
.map_err(|err| Error::Semaphore(err))
{
Ok(permit) => Some(permit),
Err(err) => {
log::error!("failed to acquire permit: {err}");
None
}
}
}
None => None,
}
}
}
27 changes: 27 additions & 0 deletions src/current_context.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use crate::{concurrency_limit::ConcurrencyLimit, error::Error, ComputeHeavyFutureExecutor};

pub(crate) struct CurrentContextExecutor {
concurrency_limit: ConcurrencyLimit,
}

impl CurrentContextExecutor {
pub(crate) fn new(max_concurrency: Option<usize>) -> Self {
Self {
concurrency_limit: ConcurrencyLimit::new(max_concurrency),
}
}
}

impl ComputeHeavyFutureExecutor for CurrentContextExecutor {
async fn execute<F, O>(&self, fut: F) -> Result<O, Error>
where
F: std::future::Future<Output = O> + Send + 'static,
O: Send + 'static,
{
let _permit = self.concurrency_limit.acquire_permit().await;

Ok(fut.await)

// implicit permit drop
}
}
55 changes: 55 additions & 0 deletions src/custom_executor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
use std::{future::Future, pin::Pin};

use crate::{
concurrency_limit::ConcurrencyLimit, error::Error, make_future_cancellable,
ComputeHeavyFutureExecutor,
};

/// A closure that accepts an arbitrary future and polls it to completion
/// via its preferred strategy.
pub type CustomExecutorClosure = Box<
dyn Fn(
Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
) -> Box<
dyn Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>>
+ Send
+ 'static,
> + Send
+ Sync,
>;

pub(crate) struct CustomExecutor {
closure: CustomExecutorClosure,
concurrency_limit: ConcurrencyLimit,
}

impl CustomExecutor {
pub(crate) fn new(closure: CustomExecutorClosure, max_concurrency: Option<usize>) -> Self {
Self {
closure,
concurrency_limit: ConcurrencyLimit::new(max_concurrency),
}
}
}

impl ComputeHeavyFutureExecutor for CustomExecutor {
async fn execute<F, O>(&self, fut: F) -> Result<O, Error>
where
F: Future<Output = O> + Send + 'static,
O: Send + 'static,
{
let _permit = self.concurrency_limit.acquire_permit().await;

let (wrapped_future, rx) = make_future_cancellable(fut);

// if our custom executor future resolves to an error, we know it will never send
// the response so we immediately return
if let Err(err) = Box::into_pin((self.closure)(Box::pin(wrapped_future))).await {
return Err(Error::BoxError(err));
}

rx.await.map_err(|err| Error::RecvError(err))

// permit implicitly drops
}
}
45 changes: 45 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
use core::fmt;

use crate::ExecutorStrategy;

#[non_exhaustive]
#[derive(Debug)]
pub enum Error {
AlreadyInitialized(ExecutorStrategy),
InvalidConfig(InvalidConfig),
RecvError(tokio::sync::oneshot::error::RecvError),
Semaphore(tokio::sync::AcquireError),
BoxError(Box<dyn std::error::Error + Send + Sync>),
#[cfg(feature = "tokio")]
JoinError(tokio::task::JoinError),
}

#[derive(Debug)]
pub struct InvalidConfig {
pub field: &'static str,
pub received: String,
pub expected: &'static str,
}

impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Error::AlreadyInitialized(strategy) => write!(
f,
"global strategy is already initialzed with strategy: {strategy:#?}"
),
Error::InvalidConfig(err) => write!(f, "invalid config: {err:#?}"),
Error::BoxError(err) => write!(f, "custom executor error: {err}"),
Error::RecvError(err) => write!(f, "error in custom executor response channel: {err}"),
Error::Semaphore(err) => write!(
f,
"concurrency limiter semaphore channel is closed, continuing: {err}"
),
#[cfg(feature = "tokio")]
Error::JoinError(err) => write!(
f,
"error joining tokio handle in spawn_blocking executor: {err}"
),
}
}
}
Loading