Skip to content

Commit

Permalink
feat: request reply & refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
strasdat committed Jan 14, 2024
1 parent 2b4e1e9 commit bf28b7e
Show file tree
Hide file tree
Showing 29 changed files with 925 additions and 435 deletions.
21 changes: 8 additions & 13 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,18 @@ license = "Apache-2.0"
keywords = ["actor", "compute", "graph", "pipeline"]
readme = "README.md"
repository = "https://github.com/farm-ng/hollywood/"
version = "0.2.2"
version = "0.3.0"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
# executor feature needed
tokio = { version = "1.28.0", features = ["full"] }
tokio-stream = "0.1.14"
enum-map = "3.0.0-0.gat.0"
async-trait = "0.1.51"
rand = "0.8.4"
drawille = "0.3.0"
grid = "0.13.0"
hollywood_macros = { version = "0.3.0", path = "hollywood_macros" }
petgraph = "0.6.3"
uuid = { version = "1.3.3", features = ["v4"] }
strum_macros = "0.25"
strum = { version = "0.25", features = ["derive"] }
hollywood_macros = { version = "0.2.1", path = "hollywood_macros" }
rand = "0.8.4"
rand_distr = "0.4.3"
nalgebra = "0.32.2"
grid = "0.11.0"
drawille = "0.3.0"
# executor feature needed
tokio = { version = "1.28.0", features = ["full"] }
tokio-stream = "0.1.14"
3 changes: 3 additions & 0 deletions examples/one_dim_robot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ async fn run_robot_example() {
sim.outbound
.true_robot
.connect(context, &mut truth_printer.inbound.printable);


sim.request.ask_for_time.connect(context, &mut filter.inbound.request);
context.register_cancel_requester(&mut sim.outbound.cancel_request);

filter
Expand Down
7 changes: 4 additions & 3 deletions hollywood_macros/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@ license = "Apache-2.0"
keywords = ["actor", "compute", "graph", "pipeline"]
readme = "../README.md"
repository = "https://github.com/farm-ng/hollywood/tree/main/hollywood_macros"
version = "0.2.2"
version = "0.3.0"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[lib]
proc-macro = true

[dependencies]
syn = { version = "2.0.18", features = ["full"] }
quote = "1.0.9"
convert_case = "0.6.0"
quote = "1.0.9"
syn = { version = "2.0.18", features = ["full"] }

24 changes: 17 additions & 7 deletions hollywood_macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ pub fn actor_inputs(args: TokenStream, inbound: TokenStream) -> TokenStream {
let prop_type = &args.prop_type;
let state_type = &args.state_type;
let output_type = &args.output_type;
let request_type = &args.request_type;

let inbound = fields.iter().map(|variant| {
let variant_name = variant.ident.clone();
Expand Down Expand Up @@ -197,6 +198,7 @@ pub fn actor_inputs(args: TokenStream, inbound: TokenStream) -> TokenStream {
type Prop = #prop_type;
type State = #state_type;
type OutboundHub = #output_type;
type RequestHub = #request_type;

fn inbound_channel(&self) -> String {
match self {
Expand All @@ -205,9 +207,9 @@ pub fn actor_inputs(args: TokenStream, inbound: TokenStream) -> TokenStream {
}
}

impl InboundHub<#prop_type, #state_type, #output_type, #name> for #struct_name {
impl InboundHub<#prop_type, #state_type, #output_type, #request_type,#name> for #struct_name {

fn from_builder(builder: &mut ActorBuilder<#prop_type, #state_type, #output_type, #name>,
fn from_builder(builder: &mut ActorBuilder<#prop_type, #state_type, #output_type,#request_type, #name>,
actor_name: &str) -> Self {
#(#from_builder_inbounds)*

Expand Down Expand Up @@ -258,6 +260,8 @@ pub fn actor(attr: TokenStream, item: TokenStream) -> TokenStream {
let mut maybe_inbounds = None;
let mut maybe_state = None;
let mut maybe_outputs = None;
let mut maybe_requests = None;

if let Item::Type(item_type) = inbound_clone {
if let Type::Path(type_path) = *item_type.ty {
if type_path.path.segments.last().unwrap().ident != "Actor" {
Expand All @@ -267,10 +271,10 @@ pub fn actor(attr: TokenStream, item: TokenStream) -> TokenStream {
}
for segment in type_path.path.segments {
if let PathArguments::AngleBracketed(angle_bracketed_args) = segment.arguments {
if angle_bracketed_args.args.len() != 4 {
if angle_bracketed_args.args.len() != 5 {
return Error::new_spanned(
&angle_bracketed_args,
"Expected three type arguments: Actor<PROP, INBOUNDS, STATE, OUTBOUNDS>",
"Expected 5 type arguments: Actor<PROP, INBOUNDS, STATE, OUTBOUNDS, REQUESTS>",
)
.to_compile_error()
.into();
Expand All @@ -279,6 +283,7 @@ pub fn actor(attr: TokenStream, item: TokenStream) -> TokenStream {
maybe_inbounds = Some(angle_bracketed_args.args[1].clone());
maybe_state = Some(angle_bracketed_args.args[2].clone());
maybe_outputs = Some(angle_bracketed_args.args[3].clone());
maybe_requests = Some(angle_bracketed_args.args[4].clone());
}
}
} else {
Expand All @@ -294,16 +299,17 @@ pub fn actor(attr: TokenStream, item: TokenStream) -> TokenStream {
let inbound = maybe_inbounds.unwrap();
let state_type = maybe_state.unwrap();
let out = maybe_outputs.unwrap();
let requests = maybe_requests.unwrap();

let runner_type = quote! { DefaultRunner<#prop, #inbound, #state_type, #out> };
let runner_type = quote! { DefaultRunner<#prop, #inbound, #state_type, #out, #requests> };

let gen = quote! {

///
#( #attrs )*
pub type #actor_name = Actor<#prop, #inbound, #state_type, #out>;
pub type #actor_name = Actor<#prop, #inbound, #state_type, #out, #requests>;

impl FromPropState<#prop, #inbound, #state_type, #out, #message_type, #runner_type>
impl FromPropState<#prop, #inbound, #state_type, #out, #message_type, #requests, #runner_type>
for #actor_name
{
fn name_hint(prop: &#prop) -> String {
Expand All @@ -320,6 +326,7 @@ struct ActorInbound {
prop_type: Ident,
state_type: Ident,
output_type: Ident,
request_type: Ident,
}

impl Parse for ActorInbound {
Expand All @@ -333,11 +340,14 @@ impl Parse for ActorInbound {
let state_type: Ident = content.parse()?;
let _: Token![,] = content.parse()?;
let output_type: Ident = content.parse()?;
let _: Token![,] = content.parse()?;
let request_type: Ident = content.parse()?;
Ok(ActorInbound {
struct_name,
prop_type,
state_type,
output_type,
request_type
})
}
}
File renamed without changes.
108 changes: 56 additions & 52 deletions src/actors/periodic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,27 @@ use std::sync::Arc;
use async_trait::async_trait;

use crate::compute::context::Context;
use crate::core::connection::ConnectionEnum;

use crate::core::request::NullRequest;
use crate::core::{
actor::{FromPropState, ActorNode, DormantActorNode, GenericActor},
actor::{ActorNode, FromPropState, GenericActor},
inbound::{ForwardMessage, NullInbound, NullMessage},
outbound::{ConnectionEnum, Morph, OutboundChannel, OutboundHub},
outbound::{Morph, OutboundChannel, OutboundHub},
runner::Runner,
value::Value,
};
use crate::macros::*;


/// Outbound hub of periodic actor, which consists of a single outbound channel.
#[actor_outputs]
pub struct PeriodicOutbound {
/// Time stamp outbound channel, which sends a messages every `period`
/// seconds with the current time stamp.
pub time_stamp: OutboundChannel<f64>,
}


/// A periodic actor.
///
/// This is an actor that periodically sends a message to its outbound.
pub type Periodic =
GenericActor<PeriodicProp, NullInbound, PeriodicState, PeriodicOutbound, PeriodicRunner>;
pub type Periodic = GenericActor<
PeriodicProp,
NullInbound,
PeriodicState,
PeriodicOutbound,
NullRequest,
PeriodicRunner,
>;

impl Periodic {
/// Create a new periodic actor, with a period of `period` seconds.
Expand All @@ -51,7 +48,8 @@ impl
NullInbound,
PeriodicState,
PeriodicOutbound,
NullMessage<PeriodicProp, PeriodicState, PeriodicOutbound>,
NullMessage<PeriodicProp, PeriodicState, PeriodicOutbound, NullRequest>,
NullRequest,
PeriodicRunner,
> for Periodic
{
Expand All @@ -76,8 +74,6 @@ impl Default for PeriodicProp {
}
}

impl Value for PeriodicProp {}

/// State of the periodic actor.
#[derive(Clone, Debug)]
pub struct PeriodicState {
Expand All @@ -94,9 +90,32 @@ impl Default for PeriodicState {
}
}

impl Value for PeriodicState {}
/// Outbound hub of periodic actor, which consists of a single outbound channel.
pub struct PeriodicOutbound {
/// Time stamp outbound channel, which sends a messages every `period`
/// seconds with the current time stamp.
pub time_stamp: OutboundChannel<f64>,
}

impl Morph for PeriodicOutbound {
fn extract(&mut self) -> Self {
Self {
time_stamp: self.time_stamp.extract(),
}
}

fn activate(&mut self) {
self.time_stamp.activate();
}
}

impl OutboundHub for PeriodicOutbound {
fn from_context_and_parent(context: &mut Context, actor_name: &str) -> Self {
Self {
time_stamp: OutboundChannel::<f64>::new(context, "time_stamp".to_owned(), actor_name),
}
}
}

/// The custom runner for the periodic actor.
pub struct PeriodicRunner {}
Expand All @@ -107,16 +126,17 @@ impl
NullInbound,
PeriodicState,
PeriodicOutbound,
NullMessage<PeriodicProp, PeriodicState, PeriodicOutbound>,
NullRequest,
NullMessage<PeriodicProp, PeriodicState, PeriodicOutbound, NullRequest>,
> for PeriodicRunner
{
/// Create a new dormant actor.
fn new_dormant_actor(
/// Create a new actor node.
fn new_actor_node(
name: String,
prop: PeriodicProp,
state: PeriodicState,
_receiver: tokio::sync::mpsc::Receiver<
NullMessage<PeriodicProp, PeriodicState, PeriodicOutbound>,
NullMessage<PeriodicProp, PeriodicState, PeriodicOutbound, NullRequest>,
>,
_forward: std::collections::HashMap<
String,
Expand All @@ -125,54 +145,36 @@ impl
PeriodicProp,
PeriodicState,
PeriodicOutbound,
NullMessage<PeriodicProp, PeriodicState, PeriodicOutbound>,
NullRequest,
NullMessage<PeriodicProp, PeriodicState, PeriodicOutbound, NullRequest>,
> + Send
+ Sync,
>,
>,
outbound: PeriodicOutbound,
) -> Box<dyn DormantActorNode + Send + Sync> {
Box::new(DormantPeriodic {
_request: NullRequest,
) -> Box<dyn ActorNode + Send + Sync> {
Box::new(PeriodicActor {
name: name.clone(),
prop,
init_state: state.clone(),
outbound,
})
}
}

/// The dormant periodic actor.
pub struct DormantPeriodic {
name: String,
prop: PeriodicProp,
init_state: PeriodicState,
outbound: PeriodicOutbound,
}

impl DormantActorNode for DormantPeriodic {
fn activate(mut self: Box<Self>) -> Box<dyn ActorNode + Send> {
self.outbound.activate();
Box::new(ActivePeriodic {
name: self.name.clone(),
prop: self.prop.clone(),
init_state: self.init_state.clone(),
state: None,
outbound: Arc::new(self.outbound),
outbound: Some(outbound),
})
}
}

/// The active periodic actor.
pub struct ActivePeriodic {
pub struct PeriodicActor {
name: String,
prop: PeriodicProp,
init_state: PeriodicState,
state: Option<PeriodicState>,
outbound: Arc<PeriodicOutbound>,
outbound: Option<PeriodicOutbound>,
}

#[async_trait]
impl ActorNode for ActivePeriodic {
impl ActorNode for PeriodicActor {
fn name(&self) -> &String {
&self.name
}
Expand All @@ -182,6 +184,8 @@ impl ActorNode for ActivePeriodic {
}

async fn run(&mut self, mut kill: tokio::sync::broadcast::Receiver<()>) {
let mut outbound = self.outbound.take().unwrap();
outbound.activate();
self.reset();

let state = self.state.as_mut().unwrap();
Expand All @@ -190,7 +194,7 @@ impl ActorNode for ActivePeriodic {
(1000.0 * self.prop.period) as u64,
));

let conns = Arc::new(self.outbound.clone());
let conns = Arc::new(outbound);

loop {
interval.tick().await;
Expand Down
Loading

0 comments on commit bf28b7e

Please sign in to comment.