diff --git a/src/actor/actor_cell.rs b/src/actor/actor_cell.rs index 4d066416..54c83391 100644 --- a/src/actor/actor_cell.rs +++ b/src/actor/actor_cell.rs @@ -536,7 +536,10 @@ where self.system.run(future) } #[cfg(feature = "tokio_executor")] - fn run(&self, future: Fut) -> Result::Output>, std::convert::Infallible> + fn run( + &self, + future: Fut, + ) -> Result::Output>, std::convert::Infallible> where Fut: Future + Send + 'static, ::Output: Send, diff --git a/src/kernel.rs b/src/kernel.rs index b47b305c..e7bcb817 100644 --- a/src/kernel.rs +++ b/src/kernel.rs @@ -28,7 +28,7 @@ use crate::{ kernel_ref::KernelRef, mailbox::{flush_to_deadletters, run_mailbox, Mailbox}, }, - system::{ActorRestarted, ActorTerminated, SystemMsg, Run}, + system::{ActorRestarted, ActorTerminated, Run, SystemMsg}, Message, }; @@ -98,9 +98,9 @@ where } }; - #[cfg(not(feature="tokio_executor"))] + #[cfg(not(feature = "tokio_executor"))] sys.run(f).unwrap().forget(); - #[cfg(feature="tokio_executor")] + #[cfg(feature = "tokio_executor")] sys.run(f).unwrap(); Ok(kr) } diff --git a/src/kernel/kernel_ref.rs b/src/kernel/kernel_ref.rs index 91fa9584..55b879e9 100644 --- a/src/kernel/kernel_ref.rs +++ b/src/kernel/kernel_ref.rs @@ -37,11 +37,11 @@ impl KernelRef { fn send(&self, msg: KernelMsg, sys: &ActorSystem) { let mut tx = self.tx.clone(); let res = sys.run(async move { - drop(tx.send(msg).await); - }); - #[cfg(not(feature="tokio_executor"))] + drop(tx.send(msg).await); + }); + #[cfg(not(feature = "tokio_executor"))] res.unwrap().forget(); - #[cfg(feature="tokio_executor")] + #[cfg(feature = "tokio_executor")] res.unwrap(); } } diff --git a/src/system.rs b/src/system.rs index b8bdcf73..644a2caf 100644 --- a/src/system.rs +++ b/src/system.rs @@ -150,10 +150,7 @@ use std::{ use chrono::prelude::*; use config::Config; -use futures::{ - channel::oneshot, - Future, -}; +use futures::{channel::oneshot, Future}; #[cfg(not(feature = "tokio_executor"))] use futures::{ executor::{ThreadPool, ThreadPoolBuilder}, @@ -708,7 +705,10 @@ pub trait Run { Fut: Future + Send + 'static, ::Output: Send; #[cfg(feature = "tokio_executor")] - fn run(&self, future: Fut) -> Result::Output>, Infallible> + fn run( + &self, + future: Fut, + ) -> Result::Output>, Infallible> where Fut: Future + Send + 'static, ::Output: Send; @@ -724,7 +724,10 @@ impl Run for ActorSystem { self.exec.spawn_with_handle(future) } #[cfg(feature = "tokio_executor")] - fn run(&self, future: Fut) -> Result::Output>, Infallible> + fn run( + &self, + future: Fut, + ) -> Result::Output>, Infallible> where Fut: Future + Send + 'static, ::Output: Send, @@ -919,7 +922,6 @@ impl<'a> From<&'a Config> for ThreadPoolConfig { } } - #[derive(Clone)] pub struct SysActors { pub root: BasicActorRef, diff --git a/tests/actors.rs b/tests/actors.rs index b8950b4e..dcb4d8ed 100644 --- a/tests/actors.rs +++ b/tests/actors.rs @@ -48,9 +48,9 @@ impl Receive for Counter { test_fn! { fn actor_create() { let sys = ActorSystem::new().unwrap(); - + assert!(sys.actor_of::("valid-name").is_ok()); - + assert!(sys.actor_of::("/").is_err()); assert!(sys.actor_of::("*").is_err()); assert!(sys.actor_of::("/a/b/c").is_err()); @@ -64,16 +64,16 @@ test_fn! { test_fn! { fn actor_tell() { let sys = ActorSystem::new().unwrap(); - + let actor = sys.actor_of::("me").unwrap(); - + let (probe, listen) = probe(); actor.tell(TestProbe(probe), None); - + for _ in 0..1_000_000 { actor.tell(Add, None); } - + //p_assert_eq!(listen, ()); } } @@ -81,22 +81,22 @@ test_fn! { test_fn! { fn actor_try_tell() { let sys = ActorSystem::new().unwrap(); - + let actor = sys.actor_of::("me").unwrap(); let actor: BasicActorRef = actor.into(); - + let (probe, listen) = probe(); actor .try_tell(CounterMsg::TestProbe(TestProbe(probe)), None) .unwrap(); - + assert!(actor.try_tell(CounterMsg::Add(Add), None).is_ok()); assert!(actor.try_tell("invalid-type".to_string(), None).is_err()); - + for _ in 0..1_000_000 { actor.try_tell(CounterMsg::Add(Add), None).unwrap(); } - + //p_assert_eq!(listen, ()); } } @@ -144,16 +144,16 @@ test_fn! { #[allow(dead_code)] fn actor_stop() { let system = ActorSystem::new().unwrap(); - + let parent = system.actor_of::("parent").unwrap(); - + let (probe, listen) = probe(); parent.tell(TestProbe(probe), None); system.print_tree(); - + // wait for the probe to arrive at the actor before attempting to stop the actor //listen.recv(); - + system.stop(&parent); //p_assert_eq!(listen, ()); } diff --git a/tests/channels.rs b/tests/channels.rs index 0e432aa7..17976ae6 100644 --- a/tests/channels.rs +++ b/tests/channels.rs @@ -67,29 +67,29 @@ impl Receive for Subscriber { } } -test_fn!{ +test_fn! { fn channel_publish() { let sys = ActorSystem::new().unwrap(); - + // Create the channel we'll be using let chan: ChannelRef = channel("my-chan", &sys).unwrap(); - + // The topic we'll be publishing to. Endow our subscriber test actor with this. // On Subscriber's pre_start it will subscribe to this channel+topic let topic = Topic::from("my-topic"); let sub = sys .actor_of_args::("sub-actor", (chan.clone(), topic.clone())) .unwrap(); - + let (probe, mut listen) = probe(); sub.tell(TestProbe(probe), None); - + // wait for the probe to arrive at the actor before publishing message #[cfg(feature = "tokio_executor")] listen.recv().await; #[cfg(not(feature = "tokio_executor"))] listen.recv(); - + // Publish a test message chan.tell( Publish { @@ -98,34 +98,34 @@ test_fn!{ }, None, ); - + p_assert_eq!(listen, ()); } } -test_fn!{ +test_fn! { fn channel_publish_subscribe_all() { let sys = ActorSystem::new().unwrap(); - + // Create the channel we'll be using let chan: ChannelRef = channel("my-chan", &sys).unwrap(); - + // The '*' All topic. Endow our subscriber test actor with this. // On Subscriber's pre_start it will subscribe to all topics on this channel. let topic = Topic::from("*"); let sub = sys .actor_of_args::("sub-actor", (chan.clone(), topic)) .unwrap(); - + let (probe, mut listen) = probe(); sub.tell(TestProbe(probe), None); - + // wait for the probe to arrive at the actor before publishing message #[cfg(feature = "tokio_executor")] listen.recv().await; #[cfg(not(feature = "tokio_executor"))] listen.recv(); - + // Publish a test message to topic "topic-1" chan.tell( Publish { @@ -134,7 +134,7 @@ test_fn!{ }, None, ); - + // Publish a test message to topic "topic-2" chan.tell( Publish { @@ -143,7 +143,7 @@ test_fn!{ }, None, ); - + // Publish a test message to topic "topic-3" chan.tell( Publish { @@ -152,7 +152,7 @@ test_fn!{ }, None, ); - + // Expecting three probe events p_assert_eq!(listen, ()); p_assert_eq!(listen, ()); @@ -263,32 +263,32 @@ impl Receive for EventSubscriber { } } -test_fn!{ +test_fn! { fn channel_system_events() { let sys = ActorSystem::new().unwrap(); - + let actor = sys.actor_of::("event-sub").unwrap(); - + let (probe, mut listen) = probe(); actor.tell(TestProbe(probe), None); - + // wait for the probe to arrive at the actor before attempting // create, restart and stop #[cfg(feature = "tokio_executor")] listen.recv().await; #[cfg(not(feature = "tokio_executor"))] listen.recv(); - + // Create an actor let dumb = sys.actor_of::("dumb-actor").unwrap(); // ActorCreated event was received p_assert_eq!(listen, ()); - + // Force restart of actor dumb.tell(Panic, None); // ActorRestarted event was received p_assert_eq!(listen, ()); - + // Terminate actor sys.stop(&dumb); // ActorTerminated event was receive @@ -340,27 +340,27 @@ impl Receive for DeadLetterSub { } } -test_fn!{ +test_fn! { fn channel_dead_letters() { let sys = ActorSystem::new().unwrap(); let actor = sys.actor_of::("dl-subscriber").unwrap(); - + let (probe, mut listen) = probe(); actor.tell(TestProbe(probe), None); - + // wait for the probe to arrive at the actor before attempting to stop the actor #[cfg(feature = "tokio_executor")] listen.recv().await; #[cfg(not(feature = "tokio_executor"))] listen.recv(); - + let dumb = sys.actor_of::("dumb-actor").unwrap(); - + // immediately stop the actor and attempt to send a message sys.stop(&dumb); std::thread::sleep(std::time::Duration::from_secs(1)); dumb.tell(SomeMessage, None); - + p_assert_eq!(listen, ()); } } diff --git a/tests/logger.rs b/tests/logger.rs index 2d06ac75..c8862a7f 100644 --- a/tests/logger.rs +++ b/tests/logger.rs @@ -1,8 +1,8 @@ use futures::executor::block_on; use riker::actors::*; -use slog::{o, Fuse, Logger}; use riker_testkit::test_fn; +use slog::{o, Fuse, Logger}; mod common { use std::{fmt, result}; @@ -43,7 +43,7 @@ mod common { } } -test_fn!{ +test_fn! { fn system_create_with_slog() { let log = Logger::root( Fuse(common::PrintlnDrain), @@ -55,7 +55,7 @@ test_fn!{ } // a test that logging without slog using "log" crate works -test_fn!{ +test_fn! { fn logging_stdlog() { log::info!("before the system"); let _sys = ActorSystem::new().unwrap(); diff --git a/tests/scheduling.rs b/tests/scheduling.rs index 0f0208d7..b2cd55e5 100644 --- a/tests/scheduling.rs +++ b/tests/scheduling.rs @@ -48,28 +48,28 @@ impl Receive for ScheduleOnce { } } -test_fn!{ +test_fn! { fn schedule_once() { let sys = ActorSystem::new().unwrap(); - + let actor = sys.actor_of::("schedule-once").unwrap(); - + let (probe, mut listen) = probe(); - + // use scheduler to set up probe sys.schedule_once(Duration::from_millis(200), actor, None, TestProbe(probe)); p_assert_eq!(listen, ()); } } -test_fn!{ +test_fn! { fn schedule_at_time() { let sys = ActorSystem::new().unwrap(); - + let actor = sys.actor_of::("schedule-once").unwrap(); - + let (probe, mut listen) = probe(); - + // use scheduler to set up probe at a specific time let schedule_at = Utc::now() + CDuration::milliseconds(200); sys.schedule_at_time(schedule_at, actor, None, TestProbe(probe)); @@ -125,16 +125,16 @@ impl Receive for ScheduleRepeat { } } -test_fn!{ +test_fn! { fn schedule_repeat() { let sys = ActorSystem::new().unwrap(); - + let actor = sys.actor_of::("schedule-repeat").unwrap(); - + let (probe, mut listen) = probe(); - + actor.tell(TestProbe(probe), None); - + p_assert_eq!(listen, ()); } } diff --git a/tests/selection.rs b/tests/selection.rs index 32ae6cbf..f5dba61e 100644 --- a/tests/selection.rs +++ b/tests/selection.rs @@ -42,68 +42,68 @@ impl Actor for SelectTest { } } -test_fn!{ +test_fn! { fn select_child() { let sys = ActorSystem::new().unwrap(); - + sys.actor_of::("select-actor").unwrap(); - + let (probe, mut listen) = probe(); - + // select test actors through actor selection: /root/user/select-actor/* let sel = sys.select("select-actor").unwrap(); - + sel.try_tell(TestProbe(probe), None); - + p_assert_eq!(listen, ()); } } -test_fn!{ +test_fn! { fn select_child_of_child() { let sys = ActorSystem::new().unwrap(); - + sys.actor_of::("select-actor").unwrap(); - + // delay to allow 'select-actor' pre_start to create 'child_a' and 'child_b' // Direct messaging on the actor_ref doesn't have this same issue std::thread::sleep(std::time::Duration::from_millis(500)); - + let (probe, mut listen) = probe(); - + // select test actors through actor selection: /root/user/select-actor/* let sel = sys.select("select-actor/child_a").unwrap(); sel.try_tell(TestProbe(probe), None); - + // actors 'child_a' should fire a probe event p_assert_eq!(listen, ()); } } -test_fn!{ +test_fn! { fn select_all_children_of_child() { let sys = ActorSystem::new().unwrap(); - + sys.actor_of::("select-actor").unwrap(); - + // delay to allow 'select-actor' pre_start to create 'child_a' and 'child_b' // Direct messaging on the actor_ref doesn't have this same issue std::thread::sleep(std::time::Duration::from_millis(500)); - + let (probe, mut listen) = probe(); - + // select relative test actors through actor selection: /root/user/select-actor/* let sel = sys.select("select-actor/*").unwrap(); sel.try_tell(TestProbe(probe.clone()), None); - + // actors 'child_a' and 'child_b' should both fire a probe event p_assert_eq!(listen, ()); p_assert_eq!(listen, ()); - + // select absolute test actors through actor selection: /root/user/select-actor/* let sel = sys.select("/user/select-actor/*").unwrap(); sel.try_tell(TestProbe(probe), None); - + // actors 'child_a' and 'child_b' should both fire a probe event p_assert_eq!(listen, ()); p_assert_eq!(listen, ()); @@ -147,15 +147,15 @@ impl Actor for SelectTest2 { } } -test_fn!{ +test_fn! { fn select_from_context() { let sys = ActorSystem::new().unwrap(); - + let actor = sys.actor_of::("select-actor").unwrap(); - + let (probe, mut listen) = probe(); actor.tell(TestProbe(probe), None); - + // seven events back expected: p_assert_eq!(listen, ()); p_assert_eq!(listen, ()); @@ -167,10 +167,10 @@ test_fn!{ } } -test_fn!{ +test_fn! { fn select_paths() { let sys = ActorSystem::new().unwrap(); - + assert!(sys.select("foo/").is_ok()); assert!(sys.select("/foo/").is_ok()); assert!(sys.select("/foo").is_ok()); @@ -178,7 +178,7 @@ test_fn!{ assert!(sys.select("../foo/").is_ok()); assert!(sys.select("/foo/*").is_ok()); assert!(sys.select("*").is_ok()); - + assert!(sys.select("foo/`").is_err()); assert!(sys.select("foo/@").is_err()); assert!(sys.select("!").is_err()); diff --git a/tests/supervision.rs b/tests/supervision.rs index b024aa2a..a9aba75d 100644 --- a/tests/supervision.rs +++ b/tests/supervision.rs @@ -99,18 +99,18 @@ impl Receive for RestartSup { } } -test_fn!{ +test_fn! { fn supervision_restart_failed_actor() { let sys = ActorSystem::new().unwrap(); - + for i in 0..100 { let sup = sys .actor_of::(&format!("supervisor_{}", i)) .unwrap(); - + // Make the test actor panic sup.tell(Panic, None); - + let (probe, mut listen) = probe::<()>(); sup.tell(TestProbe(probe), None); p_assert_eq!(listen, ()); @@ -205,15 +205,15 @@ impl Receive for EscRestartSup { } } -test_fn!{ +test_fn! { fn supervision_escalate_failed_actor() { let sys = ActorSystem::new().unwrap(); - + let sup = sys.actor_of::("supervisor").unwrap(); - + // Make the test actor panic sup.tell(Panic, None); - + let (probe, mut listen) = probe::<()>(); std::thread::sleep(std::time::Duration::from_millis(2000)); sup.tell(TestProbe(probe), None); diff --git a/tests/system.rs b/tests/system.rs index 5990a13b..60e7e003 100644 --- a/tests/system.rs +++ b/tests/system.rs @@ -3,11 +3,11 @@ use riker::actors::*; use riker_testkit::test_fn; -test_fn!{ +test_fn! { fn system_create() { assert!(ActorSystem::new().is_ok()); assert!(ActorSystem::with_name("valid-name").is_ok()); - + assert!(ActorSystem::with_name("/").is_err()); assert!(ActorSystem::with_name("*").is_err()); assert!(ActorSystem::with_name("/a/b/c").is_err()); @@ -43,26 +43,26 @@ impl Actor for ShutdownTest { fn recv(&mut self, _: &Context, _: Self::Msg, _: Sender) {} } -test_fn!{ +test_fn! { #[allow(dead_code)] fn system_shutdown() { let sys = ActorSystem::new().unwrap(); - + let _ = sys .actor_of_args::("test-actor-1", 1) .unwrap(); - + block_on(sys.shutdown()).unwrap(); } } -test_fn!{ +test_fn! { fn system_futures_exec() { let sys = ActorSystem::new().unwrap(); - + for i in 0..100 { let f = sys.run(async move { format!("some_val_{}", i) }).unwrap(); - + #[cfg(not(feature = "tokio_executor"))] assert_eq!(block_on(f), format!("some_val_{}", i)); #[cfg(feature = "tokio_executor")] @@ -71,10 +71,10 @@ test_fn!{ } } -test_fn!{ +test_fn! { fn system_futures_panic() { let sys = ActorSystem::new().unwrap(); - + for _ in 0..100 { let _ = sys .run(async move { @@ -82,10 +82,10 @@ test_fn!{ }) .unwrap(); } - + for i in 0..100 { let f = sys.run(async move { format!("some_val_{}", i) }).unwrap(); - + #[cfg(not(feature = "tokio_executor"))] assert_eq!(block_on(f), format!("some_val_{}", i)); #[cfg(feature = "tokio_executor")] @@ -94,19 +94,19 @@ test_fn!{ } } -test_fn!{ +test_fn! { fn system_load_app_config() { let sys = ActorSystem::new().unwrap(); - + assert_eq!(sys.config().get_int("app.some_setting").unwrap() as i64, 1); } } -test_fn!{ +test_fn! { fn system_builder() { let sys = SystemBuilder::new().create().unwrap(); block_on(sys.shutdown()).unwrap(); - + let sys = SystemBuilder::new().name("my-sys").create().unwrap(); block_on(sys.shutdown()).unwrap(); }