Skip to content

Commit

Permalink
Forget TaskHandles to keep them running
Browse files Browse the repository at this point in the history
  • Loading branch information
mankinskin committed Feb 6, 2021
1 parent 267ab72 commit 9c52932
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 21 deletions.
58 changes: 41 additions & 17 deletions src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,20 @@ pub trait Task: Future<Output = ()> + Send {}
impl<T: Future<Output = ()> + Send> Task for T {}

pub trait TaskExecutor {
fn spawn(&self, future: Pin<Box<dyn Task>>) -> Result<Box<dyn TaskExec>, Box<dyn Error>>;
fn spawn(&self, future: Pin<Box<dyn Task>>) -> Result<Box<dyn TaskExec<()>>, Box<dyn Error>>;
}
pub trait TaskExec: Future<Output = Result<(), Box<dyn Error>>> + Unpin + Send + Sync {
fn abort(self);
fn forget(self);
pub trait TaskExec<T: Send>:
Future<Output = Result<T, Box<dyn Error>>> + Unpin + Send + Sync
{
fn abort(self: Box<Self>);
fn forget(self: Box<Self>);
}
pub struct TaskHandle<T: Send> {
handle: Box<dyn TaskExec>,
handle: Box<dyn TaskExec<()>>,
recv: Receiver<T>,
}
impl<T: Send> TaskHandle<T> {
pub fn new(handle: Box<dyn TaskExec>, recv: Receiver<T>) -> Self {
pub fn new(handle: Box<dyn TaskExec<()>>, recv: Receiver<T>) -> Self {
Self { handle, recv }
}
}
Expand All @@ -39,6 +41,22 @@ impl<T: Send> Future for TaskHandle<T> {
Poll::Pending
}
}
impl<T: Send> TaskHandle<T> {
pub fn abort(self) {
self.handle.abort()
}
pub fn forget(self) {
self.handle.forget()
}
}
impl<T: Send> TaskExec<T> for TaskHandle<T> {
fn abort(self: Box<Self>) {
self.handle.abort()
}
fn forget(self: Box<Self>) {
self.handle.forget()
}
}

pub use executor_impl::*;
#[cfg(feature = "tokio_executor")]
Expand All @@ -49,7 +67,10 @@ mod executor_impl {
use super::*;
pub struct TokioExecutor(pub tokio::runtime::Handle);
impl TaskExecutor for TokioExecutor {
fn spawn(&self, future: Pin<Box<dyn Task>>) -> Result<Box<dyn TaskExec>, Box<dyn Error>> {
fn spawn(
&self,
future: Pin<Box<dyn Task>>,
) -> Result<Box<dyn TaskExec<()>>, Box<dyn Error>> {
Ok(Box::new(TokioJoinHandle(self.0.spawn(future))))
}
}
Expand All @@ -61,11 +82,11 @@ mod executor_impl {
.map_err(|e| Box::new(e) as Box<dyn Error + 'static>)
}
}
impl TaskExec for TokioJoinHandle {
fn abort(self) {
impl TaskExec<()> for TokioJoinHandle {
fn abort(self: Box<Self>) {
self.0.abort();
}
fn forget(self) {
fn forget(self: Box<Self>) {
drop(self);
}
}
Expand All @@ -88,10 +109,13 @@ mod executor_impl {
}
pub struct FuturesExecutor(pub futures::executor::ThreadPool);
impl TaskExecutor for FuturesExecutor {
fn spawn(&self, future: Pin<Box<dyn Task>>) -> Result<Box<dyn TaskExec>, Box<dyn Error>> {
fn spawn(
&self,
future: Pin<Box<dyn Task>>,
) -> Result<Box<dyn TaskExec<()>>, Box<dyn Error>> {
self.0
.spawn_with_handle(future)
.map(|h| Box::new(FuturesJoinHandle(h)) as Box<dyn TaskExec>)
.map(|h| Box::new(FuturesJoinHandle(h)) as Box<dyn TaskExec<()>>)
.map_err(|e| Box::new(e) as Box<dyn Error>)
}
}
Expand All @@ -102,12 +126,12 @@ mod executor_impl {
Future::poll(Pin::new(&mut self.0), cx).map(|_| Ok(()) as Result<(), Box<dyn Error>>)
}
}
impl TaskExec for FuturesJoinHandle {
fn abort(self) {
drop(self.0)
impl TaskExec<()> for FuturesJoinHandle {
fn abort(self: Box<Self>) {
drop(self)
}
fn forget(self) {
self.0.forget();
fn forget(self: Box<Self>) {
self.0.forget()
}
}
}
2 changes: 1 addition & 1 deletion src/kernel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ where
}
};

sys.run(f).unwrap();
sys.run(f).unwrap().forget();
Ok(kr)
}

Expand Down
7 changes: 4 additions & 3 deletions src/kernel/kernel_ref.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,11 @@ impl KernelRef {

fn send(&self, msg: KernelMsg, sys: &ActorSystem) {
let mut tx = self.tx.clone();
let res = sys.run(async move {
sys.run(async move {
drop(tx.send(msg).await);
});
res.unwrap();
})
.unwrap()
.forget();
}
}

Expand Down

0 comments on commit 9c52932

Please sign in to comment.