Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tokio executor #152

Open
wants to merge 25 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
9814274
Add tokio_executor feature
mankinskin Dec 17, 2020
fc2f708
Test with tokio_executor feature
mankinskin Dec 17, 2020
b88c902
Fix failing tests because of dropped RemoteHandles
mankinskin Dec 17, 2020
3b0997f
Always use tokio_executor feature of riker-testkit
mankinskin Dec 20, 2020
b293c76
Downgrade Tokio to v0.2
mankinskin Dec 20, 2020
df88252
Allow unused ThreadPoolConfig fields
mankinskin Jan 10, 2021
5bbe423
Conditionally enable tokio_executor feature in testkit
mankinskin Jan 10, 2021
872c217
Import riker-testkit as regular dependencies
mankinskin Jan 10, 2021
dbef831
Formatting
mankinskin Jan 10, 2021
2ac563c
Fix warnings
mankinskin Jan 10, 2021
eca92b7
Upgrade to tokio 1.0
mankinskin Jan 10, 2021
2138bab
Remove tokio_executor feature, use tokio by default
mankinskin Jan 11, 2021
298ecba
Fix tests on tokio
mankinskin Jan 11, 2021
d2a95c2
Fix fmt errors
mankinskin Jan 11, 2021
f2ead4d
Add ActorSystem::with_executor
mankinskin Feb 4, 2021
ab25948
Run cargo fmt
mankinskin Feb 4, 2021
8d9fafc
Use futures ThreadPool
mankinskin Feb 6, 2021
b3bc93a
Use features to select executor
mankinskin Feb 6, 2021
feac5dc
Refactor
mankinskin Feb 6, 2021
267ab72
Add features for riker-macros
mankinskin Feb 6, 2021
9c52932
Forget TaskHandles to keep them running
mankinskin Feb 6, 2021
84faaaa
Only use doc tests for not tokio_executor feature
mankinskin Feb 6, 2021
afdad99
Remove riker-macros/tokio_executor feature for now
mankinskin Feb 8, 2021
d691195
Fix warnings
mankinskin Jul 18, 2021
3678431
Remove dev-dependency to riker-testkit
mankinskin Jul 29, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ keywords = ["actors", "actor-model", "async", "cqrs", "event_sourcing"]
[badges]
travis-ci = { repository = "riker-rs/riker" }

[features]
default = []
tokio_executor = ["tokio"]

[dependencies]
riker-macros = { path = "riker-macros", version = "0.2.0" }
chrono = "0.4"
Expand All @@ -27,8 +31,12 @@ slog-stdlog = "4.0"
slog-scope = "4.3.0"
num_cpus = "1.13.0"
dashmap = "3"
tokio = { version = "^1", features = ["rt-multi-thread", "macros", "time"], optional = true }


[dev-dependencies]
riker-testkit = "0.1.0"
log = "0.4"

#[dev-dependencies.riker-testkit]
#git = "https://github.com/mankinskin/riker-testkit"
#branch = "tokio_executor"
8 changes: 8 additions & 0 deletions riker-macros/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,18 @@ keywords = ["actors", "actor-model", "async", "cqrs", "event_sourcing"]
[lib]
proc-macro = true

[features]
default = []

[dependencies]
syn = { version ="1.0", features = ["parsing", "full", "extra-traits", "proc-macro"] }
quote = "1.0"
proc-macro2 = "1.0"
tokio = { version = "^1", features = ["rt-multi-thread", "macros", "time"], optional = true }

[dev-dependencies]
riker = { path = ".." }

[dev-dependencies.riker-testkit]
git = "https://github.com/mankinskin/riker-testkit"
branch = "tokio_executor"
20 changes: 16 additions & 4 deletions riker-macros/tests/macro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ impl Receive<String> for NewActor {
}
}

#[test]
#[riker_testkit::test]
fn run_derived_actor() {
let sys = ActorSystem::new().unwrap();

Expand All @@ -45,6 +45,9 @@ fn run_derived_actor() {
// wait until all direct children of the user root are terminated
while sys.user_root().has_children() {
// in order to lower cpu usage, sleep here
#[cfg(feature = "tokio_executor")]
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
#[cfg(not(feature = "tokio_executor"))]
std::thread::sleep(std::time::Duration::from_millis(50));
}
}
Expand Down Expand Up @@ -80,7 +83,7 @@ impl<A: Send + 'static, B: Send + 'static> Receive<String> for GenericActor<A, B
}
}

#[test]
#[riker_testkit::test]
fn run_derived_generic_actor() {
let sys = ActorSystem::new().unwrap();

Expand All @@ -92,6 +95,9 @@ fn run_derived_generic_actor() {
// wait until all direct children of the user root are terminated
while sys.user_root().has_children() {
// in order to lower cpu usage, sleep here
#[cfg(feature = "tokio_executor")]
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
#[cfg(not(feature = "tokio_executor"))]
std::thread::sleep(std::time::Duration::from_millis(50));
}
}
Expand Down Expand Up @@ -131,7 +137,7 @@ impl Receive<Message<String>> for GenericMsgActor {
}
}

#[test]
#[riker_testkit::test]
fn run_generic_message_actor() {
let sys = ActorSystem::new().unwrap();

Expand All @@ -145,6 +151,9 @@ fn run_generic_message_actor() {
// wait until all direct children of the user root are terminated
while sys.user_root().has_children() {
// in order to lower cpu usage, sleep here
#[cfg(feature = "tokio_executor")]
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
#[cfg(not(feature = "tokio_executor"))]
std::thread::sleep(std::time::Duration::from_millis(50));
}
}
Expand Down Expand Up @@ -202,7 +211,7 @@ impl Receive<test_mod::Message> for PathMsgActor {
}
}

#[test]
#[riker_testkit::test]
fn run_path_message_actor() {
let sys = ActorSystem::new().unwrap();

Expand All @@ -219,6 +228,9 @@ fn run_path_message_actor() {
// wait until all direct children of the user root are terminated
while sys.user_root().has_children() {
// in order to lower cpu usage, sleep here
#[cfg(feature = "tokio_executor")]
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
#[cfg(not(feature = "tokio_executor"))]
std::thread::sleep(std::time::Duration::from_millis(50));
}
}
109 changes: 57 additions & 52 deletions src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,58 +210,63 @@ impl<A: Actor + ?Sized> Actor for Box<A> {
/// attribute macro and implemented for each message type to receive.
///
/// # Examples
///
/// ```
/// # use riker::actors::*;
///
/// #[derive(Clone, Debug)]
/// pub struct Foo;
/// #[derive(Clone, Debug)]
/// pub struct Bar;
/// #[actor(Foo, Bar)] // <-- set our actor to receive Foo and Bar types
/// #[derive(Default)]
/// struct MyActor;
///
/// impl Actor for MyActor {
/// type Msg = MyActorMsg; // <-- MyActorMsg is provided for us
///
/// fn recv(&mut self,
/// ctx: &Context<Self::Msg>,
/// msg: Self::Msg,
/// sender: Sender) {
/// self.receive(ctx, msg, sender); // <-- call the respective implementation
/// }
/// }
///
/// impl Receive<Foo> for MyActor {
/// type Msg = MyActorMsg;
///
/// fn receive(&mut self,
/// ctx: &Context<Self::Msg>,
/// msg: Foo, // <-- receive Foo
/// sender: Sender) {
/// println!("Received a Foo");
/// }
/// }
///
/// impl Receive<Bar> for MyActor {
/// type Msg = MyActorMsg;
///
/// fn receive(&mut self,
/// ctx: &Context<Self::Msg>,
/// msg: Bar, // <-- receive Bar
/// sender: Sender) {
/// println!("Received a Bar");
/// }
/// }
///
/// // main
/// let sys = ActorSystem::new().unwrap();
/// let actor = sys.actor_of::<MyActor>("my-actor").unwrap();
///
/// actor.tell(Foo, None);
/// actor.tell(Bar, None);
/// ```
#[cfg_attr(
not(feature = "tokio_executor"),
doc = r##"
```
# use riker::actors::*;

#[derive(Clone, Debug)]
pub struct Foo;
#[derive(Clone, Debug)]
pub struct Bar;
#[actor(Foo, Bar)] // <-- set our actor to receive Foo and Bar types
#[derive(Default)]
struct MyActor;

impl Actor for MyActor {
type Msg = MyActorMsg; // <-- MyActorMsg is provided for us

fn recv(&mut self,
ctx: &Context<Self::Msg>,
msg: Self::Msg,
sender: Sender) {
self.receive(ctx, msg, sender); // <-- call the respective implementation
}
}

impl Receive<Foo> for MyActor {
type Msg = MyActorMsg;

fn receive(&mut self,
ctx: &Context<Self::Msg>,
msg: Foo, // <-- receive Foo
sender: Sender) {
println!("Received a Foo");
}
}

impl Receive<Bar> for MyActor {
type Msg = MyActorMsg;

fn receive(&mut self,
ctx: &Context<Self::Msg>,
msg: Bar, // <-- receive Bar
sender: Sender) {
println!("Received a Bar");
}
}

fn main() {
let sys = ActorSystem::new().unwrap();
let actor = sys.actor_of::<MyActor>("my-actor").unwrap();

actor.tell(Foo, None);
actor.tell(Bar, None);
}
```
"##
)]
pub trait Receive<Msg: Message> {
type Msg: Message;

Expand Down
8 changes: 6 additions & 2 deletions src/actor/actor_cell.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@ use std::{

use chrono::prelude::*;
use dashmap::DashMap;
use futures::{future::RemoteHandle, task::SpawnError, Future};
use futures::Future;
use uuid::Uuid;

use crate::{
actor::{props::ActorFactory, *},
executor::TaskHandle,
kernel::{
kernel_ref::{dispatch, dispatch_any, KernelRef},
mailbox::{AnyEnqueueError, AnySender, MailboxSender},
Expand Down Expand Up @@ -525,7 +526,10 @@ impl<Msg> Run for Context<Msg>
where
Msg: Message,
{
fn run<Fut>(&self, future: Fut) -> Result<RemoteHandle<<Fut as Future>::Output>, SpawnError>
fn run<Fut>(
&self,
future: Fut,
) -> Result<TaskHandle<<Fut as Future>::Output>, Box<dyn std::error::Error>>
where
Fut: Future + Send + 'static,
<Fut as Future>::Output: Send,
Expand Down
Loading