Skip to content

Commit

Permalink
Merge pull request #3 from farm-ng/devel
Browse files Browse the repository at this point in the history
0.2.2
  • Loading branch information
strasdat authored Jan 14, 2024
2 parents a432df3 + e3fdd4b commit 2b4e1e9
Show file tree
Hide file tree
Showing 18 changed files with 232 additions and 186 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ license = "Apache-2.0"
keywords = ["actor", "compute", "graph", "pipeline"]
readme = "README.md"
repository = "https://github.com/farm-ng/hollywood/"
version = "0.2.1"
version = "0.2.2"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

Expand Down
15 changes: 10 additions & 5 deletions examples/moving_average.rs
Original file line number Diff line number Diff line change
@@ -1,32 +1,37 @@
use hollywood::actors::printer::PrinterProp;
use hollywood::actors::{Periodic, Printer};
use hollywood::compute::Context;
use hollywood::core::ActorFacade;
use hollywood::core::{FromPropState, NullState};

use hollywood::examples::moving_average::{MovingAverage, MovingAverageProp};
use hollywood::examples::moving_average::{MovingAverage, MovingAverageProp, MovingAverageState};

///
pub async fn run_moving_average_example() {
let pipeline = Context::configure(&mut |context| {
let mut timer = Periodic::new_with_period(context, 1.0);
let mut moving_average = MovingAverage::new_default_init_state(
let mut moving_average = MovingAverage::from_prop_and_state(
context,
MovingAverageProp {
alpha: 0.3,
..Default::default()
},
MovingAverageState {
moving_average: 0.0,
},
);
let mut time_printer = Printer::<f64>::new_default_init_state(
let mut time_printer = Printer::<f64>::from_prop_and_state(
context,
PrinterProp {
topic: "time".to_string(),
},
NullState {},
);
let mut average_printer = Printer::<f64>::new_default_init_state(
let mut average_printer = Printer::<f64>::from_prop_and_state(
context,
PrinterProp {
topic: "average".to_string(),
},
NullState {},
);
timer
.outbound
Expand Down
15 changes: 10 additions & 5 deletions examples/one_dim_robot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@ use hollywood::actors::Periodic;
use hollywood::actors::Printer;
use hollywood::compute::Context;
use hollywood::core::*;
use hollywood::examples::one_dim_robot::draw::DrawState;
use hollywood::examples::one_dim_robot::filter::FilterState;
use hollywood::examples::one_dim_robot::{
DrawActor, Filter, NamedFilterState, Robot, Sim, SimState, Stamped,
};

async fn run_robot_example() {
let pipeline = Context::configure(&mut |context| {
let mut timer = Periodic::new_with_period(context, 0.25);
let mut sim = Sim::new_with_state(
let mut sim = Sim::from_prop_and_state(
context,
NullProp {},
SimState {
Expand All @@ -22,21 +24,24 @@ async fn run_robot_example() {
},
},
);
let mut filter = Filter::new_default_init_state(context, NullProp {});
let mut filter_state_printer = Printer::<NamedFilterState>::new_default_init_state(
let mut filter = Filter::from_prop_and_state(context, NullProp {}, FilterState::default());
let mut filter_state_printer = Printer::<NamedFilterState>::from_prop_and_state(
context,
PrinterProp {
topic: "filter state".to_owned(),
},
NullState::default(),
);
let mut truth_printer = Printer::<Stamped<Robot>>::new_default_init_state(
let mut truth_printer = Printer::<Stamped<Robot>>::from_prop_and_state(
context,
PrinterProp {
topic: "truth".to_owned(),
},
NullState::default(),
);

let mut draw_actor = DrawActor::new_default_init_state(context, NullProp {});
let mut draw_actor =
DrawActor::from_prop_and_state(context, NullProp {}, DrawState::default());

timer
.outbound
Expand Down
36 changes: 36 additions & 0 deletions examples/print_ticks.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use hollywood::actors::printer::PrinterProp;
use hollywood::actors::{Periodic, Printer};
use hollywood::compute::Context;
use hollywood::core::*;

///
pub async fn run_tick_print_example() {
let pipeline = Context::configure(&mut |context| {
let mut timer = Periodic::new_with_period(context, 1.0);
let mut time_printer = Printer::<f64>::from_prop_and_state(
context,
PrinterProp {
topic: "time".to_string(),
},
NullState::default(),
);
timer
.outbound
.time_stamp
.connect(context, &mut time_printer.inbound.printable);

});

pipeline.print_flow_graph();
pipeline.run().await;
}

fn main() {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap()
.block_on(async {
run_tick_print_example().await;
})
}
2 changes: 1 addition & 1 deletion hollywood_macros/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ license = "Apache-2.0"
keywords = ["actor", "compute", "graph", "pipeline"]
readme = "../README.md"
repository = "https://github.com/farm-ng/hollywood/tree/main/hollywood_macros"
version = "0.2.1"
version = "0.2.2"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[lib]
Expand Down
2 changes: 1 addition & 1 deletion hollywood_macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ pub fn actor(attr: TokenStream, item: TokenStream) -> TokenStream {
#( #attrs )*
pub type #actor_name = Actor<#prop, #inbound, #state_type, #out>;

impl ActorFacade<#prop, #inbound, #state_type, #out, #message_type, #runner_type>
impl FromPropState<#prop, #inbound, #state_type, #out, #message_type, #runner_type>
for #actor_name
{
fn name_hint(prop: &#prop) -> String {
Expand Down
42 changes: 14 additions & 28 deletions src/actors/periodic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,23 @@ use async_trait::async_trait;

use crate::compute::context::Context;
use crate::core::{
actor::{ActorFacade, ActorNode, DormantActorNode, GenericActor},
actor::{FromPropState, ActorNode, DormantActorNode, GenericActor},
inbound::{ForwardMessage, NullInbound, NullMessage},
outbound::{ConnectionEnum, Morph, OutboundChannel, OutboundHub},
runner::Runner,
value::Value,
};
use crate::macros::*;


/// Outbound hub of periodic actor, which consists of a single outbound channel.
#[actor_outputs]
pub struct PeriodicOutbound {
/// Time stamp outbound channel, which sends a messages every `period`
/// seconds with the current time stamp.
pub time_stamp: OutboundChannel<f64>,
}


/// A periodic actor.
///
Expand All @@ -20,7 +31,7 @@ pub type Periodic =
impl Periodic {
/// Create a new periodic actor, with a period of `period` seconds.
pub fn new_with_period(context: &mut Context, period: f64) -> Periodic {
Periodic::new_with_state(
Periodic::from_prop_and_state(
context,
PeriodicProp {
period,
Expand All @@ -35,7 +46,7 @@ impl Periodic {
}

impl
ActorFacade<
FromPropState<
PeriodicProp,
NullInbound,
PeriodicState,
Expand Down Expand Up @@ -85,32 +96,7 @@ impl Default for PeriodicState {

impl Value for PeriodicState {}

/// Outbound hub of periodic actor, which consists of a single outbound channel.
pub struct PeriodicOutbound {
/// Time stamp outbound channel, which sends a messages every `period`
/// seconds with the current time stamp.
pub time_stamp: OutboundChannel<f64>,
}

impl Morph for PeriodicOutbound {
fn extract(&mut self) -> Self {
Self {
time_stamp: self.time_stamp.extract(),
}
}

fn activate(&mut self) {
self.time_stamp.activate();
}
}

impl OutboundHub for PeriodicOutbound {
fn from_context_and_parent(context: &mut Context, actor_name: &str) -> Self {
Self {
time_stamp: OutboundChannel::<f64>::new(context, "time_stamp".to_owned(), actor_name),
}
}
}

/// The custom runner for the periodic actor.
pub struct PeriodicRunner {}
Expand Down
6 changes: 3 additions & 3 deletions src/actors/printer.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::fmt::{Debug, Display};

use crate::core::{
Actor, ActorBuilder, DefaultRunner, ActorFacade, InboundChannel, InboundHub, InboundMessage,
Actor, ActorBuilder, DefaultRunner, FromPropState, InboundChannel, InboundHub, InboundMessage,
InboundMessageNew, NullOutbound, NullState, OnMessage, Value,
};

Expand Down Expand Up @@ -51,7 +51,7 @@ impl<T: Debug + Display + Clone + Sync + Send + 'static> InboundMessageNew<T>
pub type Printer<T> = Actor<PrinterProp, PrinterInbound<T>, NullState, NullOutbound>;

impl<T: Clone + Sync + Send + 'static + Debug + Display + Default>
ActorFacade<
FromPropState<
PrinterProp,
PrinterInbound<T>,
NullState,
Expand Down Expand Up @@ -95,7 +95,7 @@ impl<T: Clone + Debug + Display + Default + Sync + Send + 'static>
) -> Self {
let m = InboundChannel::new(
builder.context,
actor_name.clone(),
actor_name,
&builder.sender,
PrinterInboundMessage::Printable(T::default()).inbound_channel(),
);
Expand Down
27 changes: 15 additions & 12 deletions src/compute/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,11 @@ use crate::core::{
pub struct Context {
pub(crate) actors: Vec<Box<dyn DormantActorNode + Send>>,
pub(crate) topology: Topology,
pub(crate) cancel_request_request_inbound: InboundChannel<bool, CancelRequest>,
pub(crate) cancel_request_sender_template: tokio::sync::mpsc::Sender<CancelRequest>,
pub(crate) cancel_request_receiver: tokio::sync::mpsc::Receiver<CancelRequest>,
}

impl Context {
const CONTEXT_NAME: &str = "CONTEXT";

/// Create a new context.
///
/// This is the main entry point to configure the compute graph. The network topology is defined
Expand All @@ -33,27 +31,32 @@ impl Context {
///
/// Upon receiving a cancel request the registered outbound channel, the execution of the
/// pipeline will be stopped.
pub fn get_cancel_request_sender(&mut self) -> tokio::sync::mpsc::Sender<CancelRequest> {
self.cancel_request_sender_template.clone()
}

/// Registers an outbound channel for cancel request.
///
/// Upon receiving a cancel request the registered outbound channel, the execution of the
/// pipeline will be stopped.
pub fn register_cancel_requester(&mut self, outbound: &mut OutboundChannel<()>) {
outbound
.connection_register
.push(Arc::new(OutboundConnection {
sender: self.cancel_request_request_inbound.sender.clone(),
inbound_channel: self.cancel_request_request_inbound.name.clone(),
sender: self.cancel_request_sender_template.clone(),
inbound_channel: "CANCEL".to_string(),
phantom: PhantomData {},
}));
}


fn new() -> Self {
let (exit_request_sender, cancel_request_receiver) = tokio::sync::mpsc::channel(1);
let (cancel_request_sender_template, cancel_request_receiver) =
tokio::sync::mpsc::channel(1);
Self {
actors: vec![],
topology: Topology::new(),
cancel_request_request_inbound: InboundChannel::<bool, CancelRequest> {
name: CancelRequest::CANCEL_REQUEST_INBOUND_CHANNEL .to_owned(),
actor_name: Self::CONTEXT_NAME.to_owned(),
sender: exit_request_sender,
phantom: std::marker::PhantomData {},
},
cancel_request_sender_template,
cancel_request_receiver,
}
}
Expand Down
27 changes: 20 additions & 7 deletions src/compute/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ pub enum CancelRequest {
impl CancelRequest {
/// Unique name for cancel request inbound channel. This special inbound channel is not
/// associated with any actor but with the pipeline itself.
pub const CANCEL_REQUEST_INBOUND_CHANNEL: &str = "CANCEL";
pub const CANCEL_REQUEST_INBOUND_CHANNEL: &'static str = "CANCEL";
}

impl InboundMessage for CancelRequest {
Expand All @@ -41,6 +41,8 @@ impl InboundMessageNew<()> for CancelRequest {
pub struct Pipeline {
actors: Vec<Box<dyn ActorNode + Send>>,
topology: Topology,
/// We have this here to keep receiver alive
pub cancel_request_sender_template: Option<tokio::sync::mpsc::Sender<CancelRequest>>,
cancel_request_receiver: Option<tokio::sync::mpsc::Receiver<CancelRequest>>,
}

Expand All @@ -53,6 +55,7 @@ impl Pipeline {
let compute_graph = Pipeline {
actors: active,
topology: context.topology,
cancel_request_sender_template: Some(context.cancel_request_sender_template),
cancel_request_receiver: Some(context.cancel_request_receiver),
};
compute_graph.topology.analyze_graph_topology();
Expand Down Expand Up @@ -88,12 +91,17 @@ impl Pipeline {
let (exit_tx, exit_rx) = tokio::sync::oneshot::channel();

let h_exit = tokio::spawn(async move {
let msg = cancel_request_receiver.recv().await.unwrap();
match msg {
CancelRequest::Cancel(_) => {
match cancel_request_receiver.recv().await {
Some(msg) => {
println!("Cancel requested");

let _ = exit_tx.send(cancel_request_receiver);
match msg {
CancelRequest::Cancel(_) => {
let _ = exit_tx.send(cancel_request_receiver);
}
}
}
None => {
println!("Cancel request channel closed");
}
}
});
Expand All @@ -110,7 +118,12 @@ impl Pipeline {

handles.push(h);
}
h_exit.await.unwrap();
match h_exit.await {
Ok(_) => {}
Err(err) => {
println!("Error in cancel request handler: {}", err);
}
}
kill_sender.send(()).unwrap();
for h in handles {
h.await.unwrap();
Expand Down
Loading

0 comments on commit 2b4e1e9

Please sign in to comment.