Skip to content

Commit

Permalink
Formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
mankinskin committed Jan 10, 2021
1 parent 872c217 commit dbef831
Show file tree
Hide file tree
Showing 11 changed files with 130 additions and 126 deletions.
5 changes: 4 additions & 1 deletion src/actor/actor_cell.rs
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,10 @@ where
self.system.run(future)
}
#[cfg(feature = "tokio_executor")]
fn run<Fut>(&self, future: Fut) -> Result<tokio::task::JoinHandle<<Fut as Future>::Output>, std::convert::Infallible>
fn run<Fut>(
&self,
future: Fut,
) -> Result<tokio::task::JoinHandle<<Fut as Future>::Output>, std::convert::Infallible>
where
Fut: Future + Send + 'static,
<Fut as Future>::Output: Send,
Expand Down
6 changes: 3 additions & 3 deletions src/kernel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use crate::{
kernel_ref::KernelRef,
mailbox::{flush_to_deadletters, run_mailbox, Mailbox},
},
system::{ActorRestarted, ActorTerminated, SystemMsg, Run},
system::{ActorRestarted, ActorTerminated, Run, SystemMsg},
Message,
};

Expand Down Expand Up @@ -98,9 +98,9 @@ where
}
};

#[cfg(not(feature="tokio_executor"))]
#[cfg(not(feature = "tokio_executor"))]
sys.run(f).unwrap().forget();
#[cfg(feature="tokio_executor")]
#[cfg(feature = "tokio_executor")]
sys.run(f).unwrap();
Ok(kr)
}
Expand Down
8 changes: 4 additions & 4 deletions src/kernel/kernel_ref.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ impl KernelRef {
fn send(&self, msg: KernelMsg, sys: &ActorSystem) {
let mut tx = self.tx.clone();
let res = sys.run(async move {
drop(tx.send(msg).await);
});
#[cfg(not(feature="tokio_executor"))]
drop(tx.send(msg).await);
});
#[cfg(not(feature = "tokio_executor"))]
res.unwrap().forget();
#[cfg(feature="tokio_executor")]
#[cfg(feature = "tokio_executor")]
res.unwrap();
}
}
Expand Down
16 changes: 9 additions & 7 deletions src/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,7 @@ use std::{

use chrono::prelude::*;
use config::Config;
use futures::{
channel::oneshot,
Future,
};
use futures::{channel::oneshot, Future};
#[cfg(not(feature = "tokio_executor"))]
use futures::{
executor::{ThreadPool, ThreadPoolBuilder},
Expand Down Expand Up @@ -697,7 +694,10 @@ pub trait Run {
Fut: Future + Send + 'static,
<Fut as Future>::Output: Send;
#[cfg(feature = "tokio_executor")]
fn run<Fut>(&self, future: Fut) -> Result<tokio::task::JoinHandle<<Fut as Future>::Output>, Infallible>
fn run<Fut>(
&self,
future: Fut,
) -> Result<tokio::task::JoinHandle<<Fut as Future>::Output>, Infallible>
where
Fut: Future + Send + 'static,
<Fut as Future>::Output: Send;
Expand All @@ -713,7 +713,10 @@ impl Run for ActorSystem {
self.exec.spawn_with_handle(future)
}
#[cfg(feature = "tokio_executor")]
fn run<Fut>(&self, future: Fut) -> Result<tokio::task::JoinHandle<<Fut as Future>::Output>, Infallible>
fn run<Fut>(
&self,
future: Fut,
) -> Result<tokio::task::JoinHandle<<Fut as Future>::Output>, Infallible>
where
Fut: Future + Send + 'static,
<Fut as Future>::Output: Send,
Expand Down Expand Up @@ -908,7 +911,6 @@ impl<'a> From<&'a Config> for ThreadPoolConfig {
}
}


#[derive(Clone)]
pub struct SysActors {
pub root: BasicActorRef,
Expand Down
29 changes: 14 additions & 15 deletions tests/actors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,8 @@ impl Receive<Add> for Counter {
test_fn! {
fn actor_create() {
let sys = ActorSystem::new().unwrap();

assert!(sys.actor_of::<Counter>("valid-name").is_ok());

match sys.actor_of::<Counter>("/") {
Ok(_) => panic!("test should not reach here"),
Err(e) => {
Expand Down Expand Up @@ -80,39 +79,39 @@ test_fn! {
test_fn! {
fn actor_tell() {
let sys = ActorSystem::new().unwrap();

let actor = sys.actor_of::<Counter>("me").unwrap();

let (probe, listen) = probe();
actor.tell(TestProbe(probe), None);

for _ in 0..1_000_000 {
actor.tell(Add, None);
}

//p_assert_eq!(listen, ());
}
}

test_fn! {
fn actor_try_tell() {
let sys = ActorSystem::new().unwrap();

let actor = sys.actor_of::<Counter>("me").unwrap();
let actor: BasicActorRef = actor.into();

let (probe, listen) = probe();
actor
.try_tell(CounterMsg::TestProbe(TestProbe(probe)), None)
.unwrap();

assert!(actor.try_tell(CounterMsg::Add(Add), None).is_ok());
assert!(actor.try_tell("invalid-type".to_string(), None).is_err());

for _ in 0..1_000_000 {
actor.try_tell(CounterMsg::Add(Add), None).unwrap();
}

//p_assert_eq!(listen, ());
}
}
Expand Down Expand Up @@ -160,16 +159,16 @@ test_fn! {
#[allow(dead_code)]
fn actor_stop() {
let system = ActorSystem::new().unwrap();

let parent = system.actor_of::<Parent>("parent").unwrap();

let (probe, listen) = probe();
parent.tell(TestProbe(probe), None);
system.print_tree();

// wait for the probe to arrive at the actor before attempting to stop the actor
//listen.recv();

system.stop(&parent);
//p_assert_eq!(listen, ());
}
Expand Down
58 changes: 29 additions & 29 deletions tests/channels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,29 +67,29 @@ impl Receive<SomeMessage> for Subscriber {
}
}

test_fn!{
test_fn! {
fn channel_publish() {
let sys = ActorSystem::new().unwrap();

// Create the channel we'll be using
let chan: ChannelRef<SomeMessage> = channel("my-chan", &sys).unwrap();

// The topic we'll be publishing to. Endow our subscriber test actor with this.
// On Subscriber's pre_start it will subscribe to this channel+topic
let topic = Topic::from("my-topic");
let sub = sys
.actor_of_args::<Subscriber, _>("sub-actor", (chan.clone(), topic.clone()))
.unwrap();

let (probe, mut listen) = probe();
sub.tell(TestProbe(probe), None);

// wait for the probe to arrive at the actor before publishing message
#[cfg(feature = "tokio_executor")]
listen.recv().await;
#[cfg(not(feature = "tokio_executor"))]
listen.recv();

// Publish a test message
chan.tell(
Publish {
Expand All @@ -98,34 +98,34 @@ test_fn!{
},
None,
);

p_assert_eq!(listen, ());
}
}

test_fn!{
test_fn! {
fn channel_publish_subscribe_all() {
let sys = ActorSystem::new().unwrap();

// Create the channel we'll be using
let chan: ChannelRef<SomeMessage> = channel("my-chan", &sys).unwrap();

// The '*' All topic. Endow our subscriber test actor with this.
// On Subscriber's pre_start it will subscribe to all topics on this channel.
let topic = Topic::from("*");
let sub = sys
.actor_of_args::<Subscriber, _>("sub-actor", (chan.clone(), topic))
.unwrap();

let (probe, mut listen) = probe();
sub.tell(TestProbe(probe), None);

// wait for the probe to arrive at the actor before publishing message
#[cfg(feature = "tokio_executor")]
listen.recv().await;
#[cfg(not(feature = "tokio_executor"))]
listen.recv();

// Publish a test message to topic "topic-1"
chan.tell(
Publish {
Expand All @@ -134,7 +134,7 @@ test_fn!{
},
None,
);

// Publish a test message to topic "topic-2"
chan.tell(
Publish {
Expand All @@ -143,7 +143,7 @@ test_fn!{
},
None,
);

// Publish a test message to topic "topic-3"
chan.tell(
Publish {
Expand All @@ -152,7 +152,7 @@ test_fn!{
},
None,
);

// Expecting three probe events
p_assert_eq!(listen, ());
p_assert_eq!(listen, ());
Expand Down Expand Up @@ -263,32 +263,32 @@ impl Receive<SystemEvent> for EventSubscriber {
}
}

test_fn!{
test_fn! {
fn channel_system_events() {
let sys = ActorSystem::new().unwrap();

let actor = sys.actor_of::<EventSubscriber>("event-sub").unwrap();

let (probe, mut listen) = probe();
actor.tell(TestProbe(probe), None);

// wait for the probe to arrive at the actor before attempting
// create, restart and stop
#[cfg(feature = "tokio_executor")]
listen.recv().await;
#[cfg(not(feature = "tokio_executor"))]
listen.recv();

// Create an actor
let dumb = sys.actor_of::<DumbActor>("dumb-actor").unwrap();
// ActorCreated event was received
p_assert_eq!(listen, ());

// Force restart of actor
dumb.tell(Panic, None);
// ActorRestarted event was received
p_assert_eq!(listen, ());

// Terminate actor
sys.stop(&dumb);
// ActorTerminated event was receive
Expand Down Expand Up @@ -340,27 +340,27 @@ impl Receive<DeadLetter> for DeadLetterSub {
}
}

test_fn!{
test_fn! {
fn channel_dead_letters() {
let sys = ActorSystem::new().unwrap();
let actor = sys.actor_of::<DeadLetterSub>("dl-subscriber").unwrap();

let (probe, mut listen) = probe();
actor.tell(TestProbe(probe), None);

// wait for the probe to arrive at the actor before attempting to stop the actor
#[cfg(feature = "tokio_executor")]
listen.recv().await;
#[cfg(not(feature = "tokio_executor"))]
listen.recv();

let dumb = sys.actor_of::<DumbActor>("dumb-actor").unwrap();

// immediately stop the actor and attempt to send a message
sys.stop(&dumb);
std::thread::sleep(std::time::Duration::from_secs(1));
dumb.tell(SomeMessage, None);

p_assert_eq!(listen, ());
}
}
6 changes: 3 additions & 3 deletions tests/logger.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use futures::executor::block_on;

use riker::actors::*;
use slog::{o, Fuse, Logger};
use riker_testkit::test_fn;
use slog::{o, Fuse, Logger};

mod common {
use std::{fmt, result};
Expand Down Expand Up @@ -43,7 +43,7 @@ mod common {
}
}

test_fn!{
test_fn! {
fn system_create_with_slog() {
let log = Logger::root(
Fuse(common::PrintlnDrain),
Expand All @@ -55,7 +55,7 @@ test_fn!{
}

// a test that logging without slog using "log" crate works
test_fn!{
test_fn! {
fn logging_stdlog() {
log::info!("before the system");
let _sys = ActorSystem::new().unwrap();
Expand Down
Loading

0 comments on commit dbef831

Please sign in to comment.