Skip to content

Commit

Permalink
in-request / out-request / egui-actor refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
strasdat committed Apr 24, 2024
1 parent 089f4b1 commit 4459cba
Show file tree
Hide file tree
Showing 30 changed files with 1,571 additions and 603 deletions.
7 changes: 5 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ keywords = ["actor", "compute", "graph", "pipeline"]
license = "Apache-2.0"
readme = "README.md"
repository = "https://github.com/farm-ng/hollywood/"
version = "0.5.1"
version = "0.6.0"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

Expand All @@ -23,7 +23,7 @@ drawille = "0.3"
eframe = {version = ">= 0.27, <1.0", features = ["wgpu"], optional = true}
env_logger = {version = "0.11", optional = true}
grid = "0.13"
hollywood_macros = {version = "0.5", path = "hollywood_macros"}
hollywood_macros = {version = "0.6.0", path = "hollywood_macros"}
# hollywood intends to use only very basic features of nalgebra, hence
# future versions of nalgebra before the major < 1.0 release are likely to work
nalgebra = ">= 0.32, <1.0"
Expand All @@ -37,3 +37,6 @@ tokio-stream = "0.1"
[features]
default = ["egui"]
egui = ["dep:eframe", "dep:env_logger"]

[profile.release]
panic = 'abort'
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# hollywood

Hollywood is an actor framework, with focus on representing actors with heterogeneous
Hollywood is an actor framework, with focus on representing actors with heterogeneous
inputs and outputs which are arranged in a non-cyclic compute graph/pipeline. The design
intend is simplicity and minimal boilerplate code.
200 changes: 165 additions & 35 deletions examples/egui.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,50 @@ use hollywood::prelude::*;
#[derive(Clone, Debug)]
#[actor_inputs(
ContentGeneratorInbound,
{NullProp, ContentGeneratorState, ContentGeneratorOutbound, NullRequest})]
{NullProp, ContentGeneratorState, ContentGeneratorOutbound, ContentGeneratorOutRequest,
ContentGeneratorInRequestMessage
})]
pub enum ContentGeneratorInboundMessage {
Tick(f64),
Reply(ReplyMessage<String>),
}

/// Request to reset the content generator.
#[derive(Debug)]
pub enum ContentGeneratorInRequestMessage {
/// Request
Reset(RequestWithReplyChannel<(), f64>),
}

impl IsInRequestMessage for ContentGeneratorInRequestMessage {
type Prop = NullProp;

type State = ContentGeneratorState;

type OutboundHub = ContentGeneratorOutbound;

type OutRequestHub = ContentGeneratorOutRequest;

fn in_request_channel(&self) -> String {
"reset".to_owned()
}
}

impl HasOnRequestMessage for ContentGeneratorInRequestMessage {
fn on_message(
self,
_prop: &Self::Prop,
state: &mut Self::State,
_outbound: &Self::OutboundHub,
_request: &Self::OutRequestHub,
) {
match self {
ContentGeneratorInRequestMessage::Reset(msg) => {
state.offset = -state.last_x;
msg.reply(|_| state.last_x);
}
}
}
}

impl HasOnMessage for ContentGeneratorInboundMessage {
Expand All @@ -21,13 +62,11 @@ impl HasOnMessage for ContentGeneratorInboundMessage {
_prop: &Self::Prop,
state: &mut Self::State,
outbound: &Self::OutboundHub,
_request: &Self::RequestHub,
request: &Self::OutRequestHub,
) {
match &self {
ContentGeneratorInboundMessage::Tick(new_value) => {
if state.reset_request.try_recv().is_ok() {
state.offset = -*new_value;
}
state.last_x = *new_value;

let x = *new_value + state.offset;

Expand All @@ -39,6 +78,13 @@ impl HasOnMessage for ContentGeneratorInboundMessage {
msg: PlotMessage::SinPlot((x, x.cos())),
};
outbound.plot_message.send(c);

if x > 2.0 && x < 2.1 {
request.example_request.send_request("foo:".to_owned());
}
}
ContentGeneratorInboundMessage::Reply(r) => {
println!("Reply received {}", r.reply);
}
}
}
Expand All @@ -50,27 +96,86 @@ impl IsInboundMessageNew<f64> for ContentGeneratorInboundMessage {
}
}

impl IsInboundMessageNew<ReplyMessage<String>> for ContentGeneratorInboundMessage {
fn new(_inbound_name: String, msg: ReplyMessage<String>) -> Self {
ContentGeneratorInboundMessage::Reply(msg)
}
}

impl IsInRequestMessageNew<RequestWithReplyChannel<(), f64>> for ContentGeneratorInRequestMessage {
fn new(_inbound_name: String, msg: RequestWithReplyChannel<(), f64>) -> Self {
ContentGeneratorInRequestMessage::Reset(msg)
}
}

#[derive(Debug)]
pub struct ContentGeneratorState {
pub reset_request: tokio::sync::broadcast::Receiver<()>,
pub last_x: f64,
pub offset: f64,
}

#[actor(ContentGeneratorInboundMessage)]
/// Out request channels for the content generator actor.
#[actor_out_requests]
pub struct ContentGeneratorOutRequest {
pub example_request: OutRequestChannel<String, String, ContentGeneratorInboundMessage>,
}

/// The content generator actor.
#[actor(ContentGeneratorInboundMessage, ContentGeneratorInRequestMessage)]
type ContentGenerator = Actor<
NullProp,
ContentGeneratorInbound,
ContentGeneratorInRequest,
ContentGeneratorState,
ContentGeneratorOutbound,
NullRequest,
ContentGeneratorOutRequest,
>;

/// OutboundChannel channels for the filter actor.
/// Outbound channels for the content generator actor.
#[actor_outputs]
pub struct ContentGeneratorOutbound {
pub plot_message: OutboundChannel<Stream<PlotMessage>>,
}

pub struct ContentGeneratorInRequest {
pub reset: InRequestChannel<RequestWithReplyChannel<(), f64>, ContentGeneratorInRequestMessage>,
}

impl
IsInRequestHub<
NullProp,
ContentGeneratorState,
ContentGeneratorOutbound,
ContentGeneratorOutRequest,
ContentGeneratorInboundMessage,
ContentGeneratorInRequestMessage,
> for ContentGeneratorInRequest
{
fn from_builder(
builder: &mut ActorBuilder<
NullProp,
ContentGeneratorState,
ContentGeneratorOutbound,
ContentGeneratorOutRequest,
ContentGeneratorInboundMessage,
ContentGeneratorInRequestMessage,
>,
actor_name: &str,
) -> Self {
let reset = InRequestChannel::new(
builder.context,
actor_name,
&builder.request_sender.clone(),
"reset".to_owned(),
);
builder
.forward_request
.insert(reset.name.clone(), Box::new(reset.clone()));

Self { reset }
}
}

#[derive(Clone, Debug)]
pub enum PlotMessage {
SinPlot((f64, f64)),
Expand All @@ -83,17 +188,23 @@ impl Default for PlotMessage {
}
}

struct EguiAppExampleAppConfig {
reset_side_channel_tx: tokio::sync::broadcast::Sender<()>,
}
struct EguiAppExampleAppConfig {}

type EguiAppExampleBuilder =
GenericEguiBuilder<PlotMessage, RequestMessage<String, String>, EguiAppExampleAppConfig>;
type EguiAppExampleBuilder = GenericEguiBuilder<
PlotMessage,
RequestWithReplyChannel<String, String>,
(),
f64,
EguiAppExampleAppConfig,
>;

pub struct EguiAppExample {
pub message_recv: std::sync::mpsc::Receiver<Stream<PlotMessage>>,
pub request_recv: std::sync::mpsc::Receiver<RequestMessage<String, String>>,
pub reset_side_channel_tx: tokio::sync::broadcast::Sender<()>,
pub message_recv: tokio::sync::mpsc::UnboundedReceiver<Stream<PlotMessage>>,
pub in_request_recv:
tokio::sync::mpsc::UnboundedReceiver<RequestWithReplyChannel<String, String>>,
pub out_reply_recv: tokio::sync::mpsc::UnboundedReceiver<ReplyMessage<f64>>,
pub out_request_sender: tokio::sync::mpsc::UnboundedSender<()>,
pub cancel_request_sender: tokio::sync::mpsc::UnboundedSender<hollywood::CancelRequest>,

pub x: f64,
pub sin_value: f64,
Expand All @@ -103,9 +214,11 @@ pub struct EguiAppExample {
impl EguiAppFromBuilder<EguiAppExampleBuilder> for EguiAppExample {
fn new(builder: EguiAppExampleBuilder, _dummy_example_state: String) -> Box<EguiAppExample> {
Box::new(EguiAppExample {
message_recv: builder.message_recv,
request_recv: builder.request_recv,
reset_side_channel_tx: builder.config.reset_side_channel_tx,
message_recv: builder.message_from_actor_recv,
out_reply_recv: builder.out_reply_from_actor_recv,
in_request_recv: builder.in_request_from_actor_recv,
out_request_sender: builder.out_request_to_actor_sender,
cancel_request_sender: builder.cancel_request_sender.unwrap(),
x: 0.0,
sin_value: 0.0,
cos_value: 0.0,
Expand All @@ -117,7 +230,9 @@ impl EguiAppFromBuilder<EguiAppExampleBuilder> for EguiAppExample {
type State = String;
}
use eframe::egui;
use hollywood::RequestMessage;
use hollywood::OutRequestChannel;
use hollywood::ReplyMessage;
use hollywood::RequestWithReplyChannel;
impl eframe::App for EguiAppExample {
fn update(&mut self, ctx: &egui::Context, _frame: &mut eframe::Frame) {
while let Ok(value) = self.message_recv.try_recv() {
Expand All @@ -132,6 +247,14 @@ impl eframe::App for EguiAppExample {
}
}
}
while let Ok(value) = self.out_reply_recv.try_recv() {
println!("Reply: {:?}", value);
}
while let Ok(value) = self.in_request_recv.try_recv() {
//println!("Request: {:?}", value);

value.reply(|_| "reply".to_owned());
}

egui::CentralPanel::default().show(ctx, |ui| {
ui.heading("Hello, egui!");
Expand All @@ -140,25 +263,22 @@ impl eframe::App for EguiAppExample {
ui.label(format!("cos(y): {}", self.cos_value));

if ui.button("Reset").clicked() {
self.reset_side_channel_tx.send(()).unwrap();
// let (reply_channel_sender, reply_receiver) = tokio::sync::oneshot::channel();

// tokio::spawn(async move {
// let reply = reply_receiver.await.unwrap();
// println!("Reply: {}", reply.reply);
// });
self.out_request_sender.send(()).unwrap();
}
});

ctx.request_repaint_after(Duration::from_secs_f64(0.1));
}

fn on_exit(&mut self, _gl: Option<&eframe::glow::Context>) {
self.cancel_request_sender
.send(hollywood::CancelRequest::Cancel(()))
.unwrap();
}
}

pub async fn run_viewer_example() {
let (reset_side_channel_tx, _) = tokio::sync::broadcast::channel(1);
let mut builder = EguiAppExampleBuilder::from_config(EguiAppExampleAppConfig {
reset_side_channel_tx: reset_side_channel_tx.clone(),
});
let mut builder = EguiAppExampleBuilder::from_config(EguiAppExampleAppConfig {});

// Pipeline configuration
let pipeline = Hollywood::configure(&mut |context| {
Expand All @@ -170,15 +290,15 @@ pub async fn run_viewer_example() {
context,
NullProp::default(),
ContentGeneratorState {
reset_request: reset_side_channel_tx.subscribe(),
last_x: 0.0,
offset: 0.0,
},
);
// 3. The egui actor
let mut egui_actor =
EguiActor::<PlotMessage, String, String>::from_builder(context, &builder);
EguiActor::<PlotMessage, String, String, (), f64>::from_builder(context, &builder);

// // Pipeline connections:
// Pipeline connections:
timer
.outbound
.time_stamp
Expand All @@ -187,6 +307,16 @@ pub async fn run_viewer_example() {
.outbound
.plot_message
.connect(context, &mut egui_actor.inbound.stream);

egui_actor
.out_requests
.request
.connect(context, &mut content_generator.in_requests.reset);

content_generator
.out_requests
.example_request
.connect(context, &mut egui_actor.in_requests.request);
});

// The cancel_requester is used to cancel the pipeline.
Expand Down
2 changes: 1 addition & 1 deletion examples/moving_average.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ pub async fn run_moving_average_example() {
context,
MovingAverageProp {
alpha: 0.3,
..Default::default()
timeout: 5.0,
},
MovingAverageState {
moving_average: 0.0,
Expand Down
4 changes: 2 additions & 2 deletions examples/one_dim_robot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,9 @@ async fn run_robot_example() {
.true_robot
.connect(context, &mut truth_printer.inbound.printable);

sim.request
sim.out_requests
.ping_pong
.connect(context, &mut filter.inbound.ping_pong_request);
.connect(context, &mut filter.in_requests.ping_pong_request);
context.register_cancel_requester(&mut sim.outbound.cancel_request);

filter
Expand Down
2 changes: 1 addition & 1 deletion hollywood_macros/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ keywords = ["actor", "compute", "graph", "pipeline"]
license = "Apache-2.0"
readme = "../README.md"
repository = "https://github.com/farm-ng/hollywood/tree/main/hollywood_macros"
version = "0.5.1"
version = "0.6.0"

[lib]
proc-macro = true
Expand Down
Loading

0 comments on commit 4459cba

Please sign in to comment.