diff --git a/Cargo.toml b/Cargo.toml index bb6dca7..bda625c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,7 @@ license = "Apache-2.0" keywords = ["actor", "compute", "graph", "pipeline"] readme = "README.md" repository = "https://github.com/farm-ng/hollywood/" -version = "0.2.1" +version = "0.2.2" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/examples/moving_average.rs b/examples/moving_average.rs index c730a88..903e54f 100644 --- a/examples/moving_average.rs +++ b/examples/moving_average.rs @@ -1,32 +1,37 @@ use hollywood::actors::printer::PrinterProp; use hollywood::actors::{Periodic, Printer}; use hollywood::compute::Context; -use hollywood::core::ActorFacade; +use hollywood::core::{FromPropState, NullState}; -use hollywood::examples::moving_average::{MovingAverage, MovingAverageProp}; +use hollywood::examples::moving_average::{MovingAverage, MovingAverageProp, MovingAverageState}; /// pub async fn run_moving_average_example() { let pipeline = Context::configure(&mut |context| { let mut timer = Periodic::new_with_period(context, 1.0); - let mut moving_average = MovingAverage::new_default_init_state( + let mut moving_average = MovingAverage::from_prop_and_state( context, MovingAverageProp { alpha: 0.3, ..Default::default() }, + MovingAverageState { + moving_average: 0.0, + }, ); - let mut time_printer = Printer::::new_default_init_state( + let mut time_printer = Printer::::from_prop_and_state( context, PrinterProp { topic: "time".to_string(), }, + NullState {}, ); - let mut average_printer = Printer::::new_default_init_state( + let mut average_printer = Printer::::from_prop_and_state( context, PrinterProp { topic: "average".to_string(), }, + NullState {}, ); timer .outbound diff --git a/examples/one_dim_robot.rs b/examples/one_dim_robot.rs index 6e899d6..cc86f77 100644 --- a/examples/one_dim_robot.rs +++ b/examples/one_dim_robot.rs @@ -3,6 +3,8 @@ use hollywood::actors::Periodic; use hollywood::actors::Printer; use hollywood::compute::Context; use hollywood::core::*; +use hollywood::examples::one_dim_robot::draw::DrawState; +use hollywood::examples::one_dim_robot::filter::FilterState; use hollywood::examples::one_dim_robot::{ DrawActor, Filter, NamedFilterState, Robot, Sim, SimState, Stamped, }; @@ -10,7 +12,7 @@ use hollywood::examples::one_dim_robot::{ async fn run_robot_example() { let pipeline = Context::configure(&mut |context| { let mut timer = Periodic::new_with_period(context, 0.25); - let mut sim = Sim::new_with_state( + let mut sim = Sim::from_prop_and_state( context, NullProp {}, SimState { @@ -22,21 +24,24 @@ async fn run_robot_example() { }, }, ); - let mut filter = Filter::new_default_init_state(context, NullProp {}); - let mut filter_state_printer = Printer::::new_default_init_state( + let mut filter = Filter::from_prop_and_state(context, NullProp {}, FilterState::default()); + let mut filter_state_printer = Printer::::from_prop_and_state( context, PrinterProp { topic: "filter state".to_owned(), }, + NullState::default(), ); - let mut truth_printer = Printer::>::new_default_init_state( + let mut truth_printer = Printer::>::from_prop_and_state( context, PrinterProp { topic: "truth".to_owned(), }, + NullState::default(), ); - let mut draw_actor = DrawActor::new_default_init_state(context, NullProp {}); + let mut draw_actor = + DrawActor::from_prop_and_state(context, NullProp {}, DrawState::default()); timer .outbound diff --git a/examples/print_ticks.rs b/examples/print_ticks.rs new file mode 100644 index 0000000..02f2164 --- /dev/null +++ b/examples/print_ticks.rs @@ -0,0 +1,36 @@ +use hollywood::actors::printer::PrinterProp; +use hollywood::actors::{Periodic, Printer}; +use hollywood::compute::Context; +use hollywood::core::*; + +/// +pub async fn run_tick_print_example() { + let pipeline = Context::configure(&mut |context| { + let mut timer = Periodic::new_with_period(context, 1.0); + let mut time_printer = Printer::::from_prop_and_state( + context, + PrinterProp { + topic: "time".to_string(), + }, + NullState::default(), + ); + timer + .outbound + .time_stamp + .connect(context, &mut time_printer.inbound.printable); + + }); + + pipeline.print_flow_graph(); + pipeline.run().await; +} + +fn main() { + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap() + .block_on(async { + run_tick_print_example().await; + }) +} diff --git a/hollywood_macros/Cargo.toml b/hollywood_macros/Cargo.toml index 430a76b..2ad8b2c 100644 --- a/hollywood_macros/Cargo.toml +++ b/hollywood_macros/Cargo.toml @@ -11,7 +11,7 @@ license = "Apache-2.0" keywords = ["actor", "compute", "graph", "pipeline"] readme = "../README.md" repository = "https://github.com/farm-ng/hollywood/tree/main/hollywood_macros" -version = "0.2.1" +version = "0.2.2" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [lib] diff --git a/hollywood_macros/src/lib.rs b/hollywood_macros/src/lib.rs index 7a4e1cb..059f091 100644 --- a/hollywood_macros/src/lib.rs +++ b/hollywood_macros/src/lib.rs @@ -303,7 +303,7 @@ pub fn actor(attr: TokenStream, item: TokenStream) -> TokenStream { #( #attrs )* pub type #actor_name = Actor<#prop, #inbound, #state_type, #out>; - impl ActorFacade<#prop, #inbound, #state_type, #out, #message_type, #runner_type> + impl FromPropState<#prop, #inbound, #state_type, #out, #message_type, #runner_type> for #actor_name { fn name_hint(prop: &#prop) -> String { diff --git a/src/actors/periodic.rs b/src/actors/periodic.rs index 8a19d03..aa85cdd 100644 --- a/src/actors/periodic.rs +++ b/src/actors/periodic.rs @@ -4,12 +4,23 @@ use async_trait::async_trait; use crate::compute::context::Context; use crate::core::{ - actor::{ActorFacade, ActorNode, DormantActorNode, GenericActor}, + actor::{FromPropState, ActorNode, DormantActorNode, GenericActor}, inbound::{ForwardMessage, NullInbound, NullMessage}, outbound::{ConnectionEnum, Morph, OutboundChannel, OutboundHub}, runner::Runner, value::Value, }; +use crate::macros::*; + + +/// Outbound hub of periodic actor, which consists of a single outbound channel. +#[actor_outputs] +pub struct PeriodicOutbound { + /// Time stamp outbound channel, which sends a messages every `period` + /// seconds with the current time stamp. + pub time_stamp: OutboundChannel, +} + /// A periodic actor. /// @@ -20,7 +31,7 @@ pub type Periodic = impl Periodic { /// Create a new periodic actor, with a period of `period` seconds. pub fn new_with_period(context: &mut Context, period: f64) -> Periodic { - Periodic::new_with_state( + Periodic::from_prop_and_state( context, PeriodicProp { period, @@ -35,7 +46,7 @@ impl Periodic { } impl - ActorFacade< + FromPropState< PeriodicProp, NullInbound, PeriodicState, @@ -85,32 +96,7 @@ impl Default for PeriodicState { impl Value for PeriodicState {} -/// Outbound hub of periodic actor, which consists of a single outbound channel. -pub struct PeriodicOutbound { - /// Time stamp outbound channel, which sends a messages every `period` - /// seconds with the current time stamp. - pub time_stamp: OutboundChannel, -} - -impl Morph for PeriodicOutbound { - fn extract(&mut self) -> Self { - Self { - time_stamp: self.time_stamp.extract(), - } - } - - fn activate(&mut self) { - self.time_stamp.activate(); - } -} -impl OutboundHub for PeriodicOutbound { - fn from_context_and_parent(context: &mut Context, actor_name: &str) -> Self { - Self { - time_stamp: OutboundChannel::::new(context, "time_stamp".to_owned(), actor_name), - } - } -} /// The custom runner for the periodic actor. pub struct PeriodicRunner {} diff --git a/src/actors/printer.rs b/src/actors/printer.rs index 392fe79..dfeb220 100644 --- a/src/actors/printer.rs +++ b/src/actors/printer.rs @@ -1,7 +1,7 @@ use std::fmt::{Debug, Display}; use crate::core::{ - Actor, ActorBuilder, DefaultRunner, ActorFacade, InboundChannel, InboundHub, InboundMessage, + Actor, ActorBuilder, DefaultRunner, FromPropState, InboundChannel, InboundHub, InboundMessage, InboundMessageNew, NullOutbound, NullState, OnMessage, Value, }; @@ -51,7 +51,7 @@ impl InboundMessageNew pub type Printer = Actor, NullState, NullOutbound>; impl - ActorFacade< + FromPropState< PrinterProp, PrinterInbound, NullState, @@ -95,7 +95,7 @@ impl ) -> Self { let m = InboundChannel::new( builder.context, - actor_name.clone(), + actor_name, &builder.sender, PrinterInboundMessage::Printable(T::default()).inbound_channel(), ); diff --git a/src/compute/context.rs b/src/compute/context.rs index ef9d5d5..7b28acf 100644 --- a/src/compute/context.rs +++ b/src/compute/context.rs @@ -12,13 +12,11 @@ use crate::core::{ pub struct Context { pub(crate) actors: Vec>, pub(crate) topology: Topology, - pub(crate) cancel_request_request_inbound: InboundChannel, + pub(crate) cancel_request_sender_template: tokio::sync::mpsc::Sender, pub(crate) cancel_request_receiver: tokio::sync::mpsc::Receiver, } impl Context { - const CONTEXT_NAME: &str = "CONTEXT"; - /// Create a new context. /// /// This is the main entry point to configure the compute graph. The network topology is defined @@ -33,27 +31,32 @@ impl Context { /// /// Upon receiving a cancel request the registered outbound channel, the execution of the /// pipeline will be stopped. + pub fn get_cancel_request_sender(&mut self) -> tokio::sync::mpsc::Sender { + self.cancel_request_sender_template.clone() + } + + /// Registers an outbound channel for cancel request. + /// + /// Upon receiving a cancel request the registered outbound channel, the execution of the + /// pipeline will be stopped. pub fn register_cancel_requester(&mut self, outbound: &mut OutboundChannel<()>) { outbound .connection_register .push(Arc::new(OutboundConnection { - sender: self.cancel_request_request_inbound.sender.clone(), - inbound_channel: self.cancel_request_request_inbound.name.clone(), + sender: self.cancel_request_sender_template.clone(), + inbound_channel: "CANCEL".to_string(), phantom: PhantomData {}, })); } + fn new() -> Self { - let (exit_request_sender, cancel_request_receiver) = tokio::sync::mpsc::channel(1); + let (cancel_request_sender_template, cancel_request_receiver) = + tokio::sync::mpsc::channel(1); Self { actors: vec![], topology: Topology::new(), - cancel_request_request_inbound: InboundChannel:: { - name: CancelRequest::CANCEL_REQUEST_INBOUND_CHANNEL .to_owned(), - actor_name: Self::CONTEXT_NAME.to_owned(), - sender: exit_request_sender, - phantom: std::marker::PhantomData {}, - }, + cancel_request_sender_template, cancel_request_receiver, } } diff --git a/src/compute/pipeline.rs b/src/compute/pipeline.rs index 084c8f7..a0c27b7 100644 --- a/src/compute/pipeline.rs +++ b/src/compute/pipeline.rs @@ -15,7 +15,7 @@ pub enum CancelRequest { impl CancelRequest { /// Unique name for cancel request inbound channel. This special inbound channel is not /// associated with any actor but with the pipeline itself. - pub const CANCEL_REQUEST_INBOUND_CHANNEL: &str = "CANCEL"; + pub const CANCEL_REQUEST_INBOUND_CHANNEL: &'static str = "CANCEL"; } impl InboundMessage for CancelRequest { @@ -41,6 +41,8 @@ impl InboundMessageNew<()> for CancelRequest { pub struct Pipeline { actors: Vec>, topology: Topology, + /// We have this here to keep receiver alive + pub cancel_request_sender_template: Option>, cancel_request_receiver: Option>, } @@ -53,6 +55,7 @@ impl Pipeline { let compute_graph = Pipeline { actors: active, topology: context.topology, + cancel_request_sender_template: Some(context.cancel_request_sender_template), cancel_request_receiver: Some(context.cancel_request_receiver), }; compute_graph.topology.analyze_graph_topology(); @@ -88,12 +91,17 @@ impl Pipeline { let (exit_tx, exit_rx) = tokio::sync::oneshot::channel(); let h_exit = tokio::spawn(async move { - let msg = cancel_request_receiver.recv().await.unwrap(); - match msg { - CancelRequest::Cancel(_) => { + match cancel_request_receiver.recv().await { + Some(msg) => { println!("Cancel requested"); - - let _ = exit_tx.send(cancel_request_receiver); + match msg { + CancelRequest::Cancel(_) => { + let _ = exit_tx.send(cancel_request_receiver); + } + } + } + None => { + println!("Cancel request channel closed"); } } }); @@ -110,7 +118,12 @@ impl Pipeline { handles.push(h); } - h_exit.await.unwrap(); + match h_exit.await { + Ok(_) => {} + Err(err) => { + println!("Error in cancel request handler: {}", err); + } + } kill_sender.send(()).unwrap(); for h in handles { h.await.unwrap(); diff --git a/src/core/actor.rs b/src/core/actor.rs index 957260c..98660e6 100644 --- a/src/core/actor.rs +++ b/src/core/actor.rs @@ -39,8 +39,8 @@ pub type Actor = GenericActor< DefaultRunner, >; -/// The actor facade trait is used to configure the actor's channel connections. -pub trait ActorFacade< +/// New actor from properties and state. +pub trait FromPropState< Prop, Inbound: InboundHub, State: Value, @@ -53,20 +53,10 @@ pub trait ActorFacade< /// generate a unique name. fn name_hint(prop: &Prop) -> String; - /// Produces a new actor with default state. - /// - /// Also, a dormant actor node is created added to the context. - fn new_default_init_state( - context: &mut Context, - prop: Prop, - ) -> GenericActor { - Self::new_with_state(context, prop, State::default()) - } - /// Produces a new actor with the given state. /// /// Also, a dormant actor node is created added to the context. - fn new_with_state( + fn from_prop_and_state( context: &mut Context, prop: Prop, initial_state: State, diff --git a/src/core/mod.rs b/src/core/mod.rs index a6c13ff..dd4c299 100644 --- a/src/core/mod.rs +++ b/src/core/mod.rs @@ -3,7 +3,7 @@ /// Actor pub mod actor; -pub use actor::{Actor, ActorFacade}; +pub use actor::{Actor, FromPropState}; pub(crate) use actor::{ActorNode, DormantActorNode}; /// Actor builder diff --git a/src/core/outbound.rs b/src/core/outbound.rs index 4d71b7b..237da39 100644 --- a/src/core/outbound.rs +++ b/src/core/outbound.rs @@ -12,7 +12,8 @@ pub trait OutboundHub: Send + Sync + 'static + Morph { /// An empty outbound hub - used for actors that do not have any outbound channels. #[derive(Debug, Clone)] -pub struct NullOutbound {} +pub struct NullOutbound { +} impl Morph for NullOutbound { fn extract(&mut self) -> Self { @@ -191,6 +192,8 @@ impl ConnectionEnu impl Morph for ConnectionEnum { fn extract(&mut self) -> Self { + + println!("ConnectionEnum::extract"); match self { Self::Config(config) => Self::Active(ActiveConnection { maybe_registers: None, @@ -203,6 +206,7 @@ impl Morph for ConnectionEnum { } fn activate(&mut self) { + println!("ConnectionEnum::activate"); match self { Self::Config(_) => { panic!("Cannot activate config connection"); diff --git a/src/core/value.rs b/src/core/value.rs index 5963a11..b31988c 100644 --- a/src/core/value.rs +++ b/src/core/value.rs @@ -1,5 +1,5 @@ /// Trait for actor state and props. -pub trait Value: Default + std::fmt::Debug + Send + Sync + Clone + 'static {} +pub trait Value: std::fmt::Debug + Send + Sync + Clone + 'static {} /// Empty state - for stateless actors. diff --git a/src/examples/moving_average/mod.rs b/src/examples/moving_average/mod.rs index 2c4de43..a2ee7b8 100644 --- a/src/examples/moving_average/mod.rs +++ b/src/examples/moving_average/mod.rs @@ -11,7 +11,7 @@ pub use crate::core::{ }; // needed for actor macro -pub use crate::core::{Actor, DefaultRunner, ActorFacade}; +pub use crate::core::{Actor, DefaultRunner, FromPropState}; /// Outbound hub for the MovingAverage. #[actor_outputs] diff --git a/src/examples/one_dim_robot/filter.rs b/src/examples/one_dim_robot/filter.rs index 6daef7d..f92f148 100644 --- a/src/examples/one_dim_robot/filter.rs +++ b/src/examples/one_dim_robot/filter.rs @@ -2,7 +2,7 @@ use std::fmt::{Debug, Display}; use crate::compute::Context; use crate::core::{ - Actor, ActorBuilder, DefaultRunner, ActorFacade, InboundChannel, InboundHub, InboundMessage, + Actor, ActorBuilder, DefaultRunner, FromPropState, InboundChannel, InboundHub, InboundMessage, InboundMessageNew, Morph, NullProp, OnMessage, OutboundChannel, OutboundHub, Value, }; use crate::examples::one_dim_robot::{RangeMeasurementModel, Stamped}; diff --git a/src/examples/one_dim_robot/sim.rs b/src/examples/one_dim_robot/sim.rs index 5963155..9c35586 100644 --- a/src/examples/one_dim_robot/sim.rs +++ b/src/examples/one_dim_robot/sim.rs @@ -4,7 +4,7 @@ use rand_distr::{Distribution, Normal}; use crate::compute::Context; use crate::core::{ - Actor, ActorBuilder, DefaultRunner, ActorFacade, InboundChannel, InboundHub, InboundMessage, + Actor, ActorBuilder, DefaultRunner, FromPropState, InboundChannel, InboundHub, InboundMessage, InboundMessageNew, Morph, NullProp, OnMessage, OutboundChannel, OutboundHub, Value, }; use crate::examples::one_dim_robot::{RangeMeasurementModel, Robot, Stamped}; diff --git a/src/lib.rs b/src/lib.rs index af435c8..2a2a64b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,14 +1,14 @@ #![deny(missing_docs)] //! # Hollywood -//! +//! //! Hollywood is an actor framework for Rust. -//! +//! //! Actors are stateful entities that communicate with each other by sending messages. An actor //! actor receives streams of messages through its inbound channels, processes them and sends //! messages to other actors through its outbound channels. Actors are either stateless so that //! its outbound streams are a pure function of its inbound streams, or have an internal state which //! is updated by incoming messages and may influences the content of its outbound messages. -//! +//! //! Actors are arranged in a compute pipeline (or a directed acyclic graph to be more precise). The //! first layer of the graph consists of a set of one or more source actors whose outbound streams //! are fed by an external resource. A typical example of a source actors, include sensor drivers, @@ -16,38 +16,38 @@ //! channels or which outbound channels are not connected. Terminal actors are often sink notes //! which feed data to an external resource. Example of a sink actor are robot manipulators, //! log file writer or visualization components. -//! +//! //! ## Module Overview -//! +//! //! The library is organized in the following modules: -//! +//! //! - The [macros] module contains the three macros that are used to define an actors with //! minimal boilerplate. -//! - The [core] module contains the core structs and traits of the library. [Actor](core::Actor) +//! - The [core] module contains the core structs and traits of the library. [Actor](core::Actor) //! is a generic struct that represents an actor. [InboundHub](core::InboundHub) is the trait //! which represents the collection of inbound channels of an actor. Similarly, //! [OutboundHub](core::OutboundHub) is the trait which represents the collection of outbound //! channels of an actor. -//! +//! //! Most importantly, [OnMessage](core::OnMessage) is the main entry point for user code and sets //! the behavior of a user-defines actor. [OnMessage::on_message()](core::OnMessage::on_message()) //! processes incoming messages, updates the actor's state and sends messages to downstream actors //! in the pipeline. -//! -//! - The [compute] module contains the [Context](compute::Context) and -//! [Pipeline](compute::Pipeline) which are used to configure a set of actors, connect +//! +//! - The [compute] module contains the [Context](compute::Context) and +//! [Pipeline](compute::Pipeline) which are used to configure a set of actors, connect //! them into a graph and to execute flow. -//! +//! //! ## Example: moving average -//! +//! //! We will show how to implement a moving average actor that computes the moving //! average of a stream of numbers. The complete code for this example can be found in the //! [examples/moving_average.rs]. -//! +//! //! ### Outbound hub -//! +//! //! First we define the outbound hub of the actor: -//! +//! //! ```ignore //! #[actor_outputs] //! pub struct MovingAverageOutbound { @@ -55,13 +55,13 @@ //! pub average: OutboundChannel, //! } //! ``` -//! +//! //! It contains of a single outbound channels, to send the calculated average to the outside world. -//! +//! //! ### Properties and state -//! +//! //! Next we define the immutable properties and the internal state of the actor: -//! +//! //! ```ignore //! /// Properties the MovingAverage actor. //! #[derive(Clone, Debug)] @@ -69,7 +69,7 @@ //! /// alpha value for the moving average //! pub alpha: f64, //! } -//! +//! //! impl Default for MovingAverageProp { //! fn default() -> Self { //! MovingAverageProp { @@ -77,46 +77,46 @@ //! } //! } //! } -//! +//! //! impl Value for MovingAverageProp {} -//! +//! //! /// State of the MovingAverage actor. //! #[derive(Clone, Debug, Default)] //! pub struct MovingAverageState { //! /// current moving average //! pub moving_average: f64, //! } -//! +//! //! impl Value for MovingAverageState {} //! ``` -//! -//! Properties can be understood as the configuration of the actor which are specified when the -//! actor is created. The state is the internal state of the actor that is updated when the actor +//! +//! Properties can be understood as the configuration of the actor which are specified when the +//! actor is created. The state is the internal state of the actor that is updated when the actor //! receives messages. Default implementations for the state define the initial state of the actor. //! Note that MovingAverageState implements [Default] trait through the derive macro, and hence //! moving_average is initialized to 0.0 which is the default value for f64. An explicit //! implementation of the [Default] trait can be used to set the values of member fields as done for //! the [examples::moving_average::MovingAverageProp] struct here. -//! +//! //! ### Inbound hub -//! +//! //! Next we define the inbound hub of the actor. The inbound MovingAverageMessage enum contains a -//! single variant that carries a float [actor_inputs](macros::actor_inputs) macro generates the +//! single variant that carries a float [actor_inputs](macros::actor_inputs) macro generates the //! inbound hub struct which contains a single inbound channel. -//! +//! //! ```ignore //! /// Inbound message for the MovingAverage actor. //! #[derive(Clone, Debug)] -//! #[actor_inputs(MovingAverageInbound, {MovingAverageProp, MovingAverageState, +//! #[actor_inputs(MovingAverageInbound, {MovingAverageProp, MovingAverageState, //! MovingAverageOutbound})] //! pub enum MovingAverageMessage { //! /// a float value //! Value(f64), //! } -//! +//! //! impl OnMessage for MovingAverageMessage { //! /// Process the inbound time-stamp message. -//! fn on_message(&self, prop: &Self::Prop, state: &mut Self::State, outbound: &Self::Outbound) +//! fn on_message(&self, prop: &Self::Prop, state: &mut Self::State, outbound: &Self::Outbound) //! { //! match &self { //! MovingAverageMessage::Value(new_value) => { @@ -130,21 +130,21 @@ //! } //! } //! } -//! +//! //! impl InboundMessageNew for MovingAverageMessage { //! fn new(_inbound_name: String, msg: f64) -> Self { //! MovingAverageMessage::Value(msg) //! } //! } //! ``` -//! +//! //! The moving average is calculated from the stream of values received on this channel. //! OnMessage trait implementation the actual business logic of the actor is implemented. -//! +//! //! ### The actor -//! +//! //! Finally we define the actor itself: -//! +//! //! ```ignore //! /// The MovingAverage actor. //! /// @@ -152,46 +152,50 @@ //! type MovingAverage = //! Actor; //! ``` -//! +//! //! ### Configure and execute the pipeline -//! +//! //! In order to make use of the actor we need to send messages to its inbound hub and process the //! messages received on its outbound hub. -//! -//! To do so, we will make use of two predefined actors. The [Periodic](actors::Periodic) actor is -//! and vector without any input channels. It produces a stream of timestamps at a given interval and +//! +//! To do so, we will make use of two predefined actors. The [Periodic](actors::Periodic) actor is +//! and vector without any input channels. It produces a stream of timestamps at a given interval and //! publishes them through its `time_stamp` outbound channel. -//! -//! The [Printer](actors::Printer) actor is a sink actor. It prints the messages it receives on its +//! +//! The [Printer](actors::Printer) actor is a sink actor. It prints the messages it receives on its //! only inbound channel to the console. -//! +//! //! Now we can provide the code snippets to configure and execute the compute pipeline: -//! +//! //! ```rust //! # use hollywood::actors::{Periodic, Printer, PrinterProp}; -//! # use hollywood::core::ActorFacade; +//! # use hollywood::core::FromPropState; +//! # use hollywood::core::value::NullState; //! # use hollywood::compute::Context; -//! # use hollywood::examples::moving_average::{MovingAverage, MovingAverageProp}; +//! # use hollywood::examples::moving_average::{MovingAverage, MovingAverageProp, MovingAverageState}; //! let pipeline = Context::configure(&mut |context| { //! let mut timer = Periodic::new_with_period(context, 1.0); -//! let mut moving_average = MovingAverage::new_default_init_state( +//! let mut moving_average = MovingAverage::from_prop_and_state( //! context, //! MovingAverageProp { //! alpha: 0.3, //! ..Default::default() //! }, +//! MovingAverageState::default(), //! ); -//! let mut time_printer = Printer::::new_default_init_state( +//! let mut time_printer = Printer::::from_prop_and_state( //! context, //! PrinterProp { //! topic: "time".to_string(), //! }, +//! NullState::default(), //! ); -//! let mut average_printer = Printer::::new_default_init_state( +//! let mut average_printer = Printer::::from_prop_and_state( //! context, //! PrinterProp { //! topic: "average".to_string(), //! }, +//! NullState::default(), //! ); //! timer //! .outbound @@ -200,7 +204,7 @@ //! timer //! .outbound //! .time_stamp -//! .connect(context, &mut time_printer.inbound.printable); +//! .connect(context, &mut time_printer.inbound.printable); //! moving_average //! .outbound //! .average @@ -209,18 +213,18 @@ //! pipeline.print_flow_graph(); //! pipeline.run(); //! ``` -//! -//! The [Pipeline::print_flow_graph()](compute::Pipeline::print_flow_graph()) method prints the +//! +//! The [Pipeline::print_flow_graph()](compute::Pipeline::print_flow_graph()) method prints the //! topology of the compute pipeline to the console. -//! +//! //! ``` text //! * Periodic_0 * //! | time_stamp | //! ⡏⠉⠑⠒⠢⠤⠤⣀⣀ //! ⡇ ⠉⠉⠑⠒⠢⠤⠤⣀⣀ //! ⠁ ⠁ -//! | Value | | Printable | -//! *MovingAverage_0 * *Printer(time)_0 * +//! | Value | | Printable | +//! *MovingAverage_0 * *Printer(time)_0 * //! | average | //! ⡇ //! ⡇ @@ -228,43 +232,43 @@ //! | Printable | //! *Printer(average)* //! ``` -//! -//! Let's interpret the print. The outbound channel of the Periodic actor is connected to the -//! inbound channel of the MovingAverage actor and the inbound channel of the `Printer(time)` actor. +//! +//! Let's interpret the print. The outbound channel of the Periodic actor is connected to the +//! inbound channel of the MovingAverage actor and the inbound channel of the `Printer(time)` actor. //! Furthermore, the outbound channel of the MovingAverage actor is connected to the inbound channel //! of the `Printer(average)` actor. -//! +//! //! ## Pipeline topology and channel connections -//! -//! The compute pipeline is a acyclic directed graph (DAG). Coarsely, we introduced the topology of -//! the graph by a set of actors and the connections between them. In a more detailed view, the +//! +//! The compute pipeline is a acyclic directed graph (DAG). Coarsely, we introduced the topology of +//! the graph by a set of actors and the connections between them. In a more detailed view, the //! graph consists of three types of nodes: a set of actors, a set of inbound channels and a set of //! outbound channels. Each inbound channel and outbound channel is associated with exactly one //! actor. Sometimes we refer to an actor and its associated inbound/outbound channels as //! a super node. Futhermore, there are three types of edges in the graph: a set of static edges //! which link the concrete inbound channels to its actor, a set of static edges which link an actor -//! to its outbound channels, as well as a set of dynamic edges which connect the outbound channel +//! to its outbound channels, as well as a set of dynamic edges which connect the outbound channel //! of one actor to a compatible inbound channel of another actor downstream. -//! -//! These channel connections are configured at runtime using +//! +//! These channel connections are configured at runtime using //! [OutboundChannel::connect()](core::OutboundChannel::connect) during the pipeline configuration -//! step. In the simple example above, we had a 1:2 and a 1:1 connection: The process_time_stamp -//! outbound channel of the Periodic actor was connected to two inbound channels. The average -//! outbound channel of the MovingAverage actor was connected to one inbound channel. In general, -//! channel connections are n:m. Each outbound channel can be connected to zero, one or more inbound -//! channels. Similarly, each inbound channel can be connected to zero, one or more outbound -//! channels. If an outbound channel is connected to multiple inbound channels, the messages are -//! broadcasted to all connected inbound channels. This is the main reason why +//! step. In the simple example above, we had a 1:2 and a 1:1 connection: The process_time_stamp +//! outbound channel of the Periodic actor was connected to two inbound channels. The average +//! outbound channel of the MovingAverage actor was connected to one inbound channel. In general, +//! channel connections are n:m. Each outbound channel can be connected to zero, one or more inbound +//! channels. Similarly, each inbound channel can be connected to zero, one or more outbound +//! channels. If an outbound channel is connected to multiple inbound channels, the messages are +//! broadcasted to all connected inbound channels. This is the main reason why //! [InboundMessage](core::InboundMessage) must be [Clone]. -//! +//! //! The types of connected outbound channels must match the type of the connected inbound channel. -//! An inbound channel is uniquely identified by a **variant** of the +//! An inbound channel is uniquely identified by a **variant** of the //! [InboundMessage](core::InboundMessage) enum. Messages received from connected outbound channels //! are merged into a single stream and processed in the corresponding match arm (for that //! **variant**) within the [OnMessage::on_message()](core::OnMessage::on_message()) method in a -//! uniform manner, regardless of the outbound channel (and actor) the message originated +//! uniform manner, regardless of the outbound channel (and actor) the message originated //! from. -//! +//! /// The core framework concepts such as actors, state, inbound, outbound and runners. pub mod core; @@ -281,20 +285,20 @@ pub mod actors; pub mod examples; /// Convenience macros for hollywood to define new actor types. -/// +/// /// In order to minimize potential of compile time errors, the macros are best implemented in the /// following order: -/// +/// /// 1. [actor_outputs](macros::actor_outputs) /// 2. [actor_inputs](macros::actor_inputs) which depends on 1. /// 3. [actor](macros::actor) which depends on 1. and 2. -/// +/// /// The documentation in this module is rather technical. For a more practical introduction, please /// refer to the examples in the root of the [crate](crate#example-moving-average). pub mod macros { - /// This macro generates the boilerplate for the outbound hub struct it is applied to. - /// + /// This macro generates the boilerplate for the outbound hub struct it is applied to. + /// /// Macro template: /// /// ``` text @@ -305,17 +309,17 @@ pub mod macros { /// ... /// } /// ``` - /// + /// /// Here, OUTBOUND is the user-specified name of the struct. The struct shall be defined right /// after the macro invocation. (Indeed, these types of macros are called "attribute macros". - /// They are applied to the item directly following them, in this case a struct.) The outbound - /// struct consists of a zero, one or more outbound channels. Each outbound channel has a + /// They are applied to the item directly following them, in this case a struct.) The outbound + /// struct consists of a zero, one or more outbound channels. Each outbound channel has a /// user-specified name CHANNEL* and a user specified type TYPE*. - /// - /// Effect: The macro generates the [OutboundHub](crate::core::OutboundHub) and + /// + /// Effect: The macro generates the [OutboundHub](crate::core::OutboundHub) and /// [Morph](crate::core::Morph) implementations for the provided struct OUTBOUND. /// - /// This is the first of three macros to define an actor. The other two are [macro@actor_inputs] + /// This is the first of three macros to define an actor. The other two are [macro@actor_inputs] /// and [macro@actor]. /// pub use hollywood_macros::actor_outputs; @@ -333,24 +337,24 @@ pub mod macros { /// ... /// } /// ``` - /// - /// INBOUND_MESSAGE is the user-specified name of an enum which shall be defined right below the - /// macro invocation. The enum shall consist of a zero, one or more message variants. Each + /// + /// INBOUND_MESSAGE is the user-specified name of an enum which shall be defined right below the + /// macro invocation. The enum shall consist of a zero, one or more message variants. Each /// variant has a user-specified name VARIENT* and type TYPE*. /// /// Prerequisite: - /// - The OUTBOUND struct is defined and implements [OutboundHub](crate::core::OutboundHub) + /// - The OUTBOUND struct is defined and implements [OutboundHub](crate::core::OutboundHub) /// and [Morph](crate::core::Morph), typically using the [macro@actor_outputs] macro. - /// - The PROP and STATE structs are defined and implement the [Value](crate::core::Value) + /// - The PROP and STATE structs are defined and implement the [Value](crate::core::Value) /// trait. /// /// Effects: /// - The macro defines the struct INBOUND that contains an inbound channel field for each - /// variant of the INBOUND_MESSAGE enum, and implements the + /// variant of the INBOUND_MESSAGE enum, and implements the /// [InboundHub](crate::core::InboundHub) trait for it. /// - Implements the [InboundMessage](crate::core::InboundMessage) trait for INBOUND_MESSAGE. /// - /// This is the second of three macros to define an actor. The other two are + /// This is the second of three macros to define an actor. The other two are /// [macro@actor_outputs] and [macro@actor]. pub use hollywood_macros::actor_inputs; @@ -362,25 +366,25 @@ pub mod macros { /// #[actor(INBOUND_MESSAGE)] /// type ACTOR = Actor; /// ``` - /// + /// /// Here, ACTOR is the user-specified name of the actor type. The actor type shall be defined /// right after the macro invocation as an alias of [Actor](crate::core::Actor). /// /// Prerequisites: - /// - The OUTBOUND struct is defined and implements (OutboundHub)[crate::core::OutboundHub] + /// - The OUTBOUND struct is defined and implements (OutboundHub)[crate::core::OutboundHub] /// and [Morph](crate::core::Morph), e.g. using the [actor_outputs] macro. - /// - The INBOUND_MESSAGE enum is defined and implements + /// - The INBOUND_MESSAGE enum is defined and implements /// [InboundMessage](crate::core::InboundMessage), as well as the INBOUND - /// struct is defined and implements the [InboundHub](crate::core::InboundHub) trait, e.g. + /// struct is defined and implements the [InboundHub](crate::core::InboundHub) trait, e.g. /// through the [macro@actor_inputs] macro. /// - The PROP and STATE structs are defined and implement the [Value](crate::core::Value) /// trait. /// /// Effect: - /// - This macro implements the [ActorFacade](crate::core::ActorFacade) trait for the ACTOR + /// - This macro implements the [FromPropState](crate::core::FromPropState) trait for the ACTOR /// type. /// - /// This is the last of the three macros that need to be used to define a new actor type. The + /// This is the last of the three macros that need to be used to define a new actor type. The /// first one is [macro@actor_outputs], the second one is [macro@actor_inputs]. pub use hollywood_macros::actor; }