Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add and remove subscription from waitset depending on wait state #98

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion r2r/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ indexmap = "2.2.6"
[dev-dependencies]
serde_json = "1.0.89"
futures = "0.3.25"
tokio = { version = "1.22.0", features = ["rt-multi-thread", "time", "macros"] }
tokio = { version = "1.22.0", features = ["rt-multi-thread", "macros", "time"] }
rand = "0.8.5"
cdr = "0.2.4"
criterion = "0.5.1"
Expand Down
101 changes: 80 additions & 21 deletions r2r/src/nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ pub struct Node {
// time source that provides simulated time
#[cfg(r2r__rosgraph_msgs__msg__Clock)]
time_source: TimeSource,
// and a guard condition to notify the waitset that it should reload its elements.
// This guard condition must be triggered by any subscriber, service, etc. that changes
// its `is_waiting` state to true
waitset_elements_changed_gc: rcl_guard_condition_t,
}

unsafe impl Send for Node {}
Expand Down Expand Up @@ -182,7 +186,7 @@ impl Node {

/// Creates a ROS node.
pub fn create(ctx: Context, name: &str, namespace: &str) -> Result<Node> {
let (res, node_handle) = {
let (res, waitset_gc, node_handle) = {
let mut ctx_handle = ctx.context_handle.lock().unwrap();

let c_node_name = CString::new(name).unwrap();
Expand All @@ -199,7 +203,9 @@ impl Node {
&node_options as *const _,
)
};
(res, node_handle)

let waitset_gc = new_guard_condition(ctx_handle.as_mut())?;
(res, waitset_gc, node_handle)
};

if res == RCL_RET_OK as i32 {
Expand All @@ -225,6 +231,7 @@ impl Node {
ros_clock,
#[cfg(r2r__rosgraph_msgs__msg__Clock)]
time_source,
waitset_elements_changed_gc: waitset_gc,
};
node.load_params()?;
Ok(node)
Expand Down Expand Up @@ -552,14 +559,19 @@ impl Node {
{
let subscription_handle =
create_subscription_helper(self.node_handle.as_mut(), topic, T::get_ts(), qos_profile)?;
let (sender, receiver) = mpsc::channel::<T>(10);

let waker = Arc::new(SharedSubscriptionData::new());
let ws = TypedSubscriber {
rcl_handle: subscription_handle,
sender,
shared: Arc::clone(&waker),
};
self.subscribers.push(Box::new(ws));
Ok(receiver)

Ok(SubscriberStream::<T>::new(
subscription_handle,
waker,
self.waitset_elements_changed_gc,
))
}

/// Subscribe to a ROS topic.
Expand All @@ -573,14 +585,19 @@ impl Node {
{
let subscription_handle =
create_subscription_helper(self.node_handle.as_mut(), topic, T::get_ts(), qos_profile)?;
let (sender, receiver) = mpsc::channel::<WrappedNativeMsg<T>>(10);

let ws = NativeSubscriber {
let waker = Arc::new(SharedSubscriptionData::new());
let ws = TypedSubscriber {
rcl_handle: subscription_handle,
sender,
shared: Arc::clone(&waker),
};
self.subscribers.push(Box::new(ws));
Ok(receiver)

Ok(NativeSubscriberStream::<T>::new(
subscription_handle,
waker,
self.waitset_elements_changed_gc,
))
}

/// Subscribe to a ROS topic.
Expand All @@ -593,15 +610,20 @@ impl Node {
let msg = WrappedNativeMsgUntyped::new_from(topic_type)?;
let subscription_handle =
create_subscription_helper(self.node_handle.as_mut(), topic, msg.ts, qos_profile)?;
let (sender, receiver) = mpsc::channel::<Result<serde_json::Value>>(10);
let waker = Arc::new(SharedSubscriptionData::new());

let ws = UntypedSubscriber {
rcl_handle: subscription_handle,
topic_type: topic_type.to_string(),
sender,
shared: waker.clone(),
};

self.subscribers.push(Box::new(ws));
Ok(receiver)
Ok(UntypedSubscriberStream::new(
subscription_handle,
waker,
self.waitset_elements_changed_gc,
topic_type.to_string(),
))
}

/// Subscribe to a ROS topic.
Expand Down Expand Up @@ -638,15 +660,20 @@ impl Node {

let subscription_handle =
create_subscription_helper(self.node_handle.as_mut(), topic, msg.ts, qos_profile)?;
let (sender, receiver) = mpsc::channel::<Vec<u8>>(10);

let ws = RawSubscriber {
let waker = Arc::new(SharedSubscriptionData::new());

let ws = UntypedSubscriber {
rcl_handle: subscription_handle,
msg_buf,
sender,
shared: waker.clone(),
};

self.subscribers.push(Box::new(ws));
Ok(receiver)
Ok(RawSubscriberStream::new(
subscription_handle,
waker,
self.waitset_elements_changed_gc,
))
}

/// Create a ROS service.
Expand Down Expand Up @@ -987,7 +1014,7 @@ impl Node {
rcl_wait_set_init(
&mut ws,
self.subscribers.len() + total_action_subs,
0,
1, // for the waitset_elements_changed_gc
self.timers.len() + total_action_timers,
self.clients.len() + total_action_clients,
self.services.len() + total_action_services,
Expand All @@ -1001,9 +1028,27 @@ impl Node {
rcl_wait_set_clear(&mut ws);
}

unsafe {
// First off, add the waitset_elements_changed guard condition.
// Rationale: The code below will add only subscribers that are actively waiting.
// This avoids an endless loop where a busy subscriber keeps waking up the waitset
// even though it doesn't have the capacity to handle the new data. However, it also
// means that a subscriber/service/etc that changes its waiting state needs to update
// the waitset, otherwise it will not be woken up when new data arrives. In that situation
// it shall trigger this guard condition, which will force a wakeup of the waitset and a return
// from this function. On the next call to spin_once, the subscriber will be added.
rcl_wait_set_add_guard_condition(
&mut ws,
&self.waitset_elements_changed_gc as *const rcl_guard_condition_t,
std::ptr::null_mut(),
);
}

for s in &self.subscribers {
unsafe {
rcl_wait_set_add_subscription(&mut ws, s.handle(), std::ptr::null_mut());
if s.is_waiting() {
unsafe {
rcl_wait_set_add_subscription(&mut ws, s.handle(), std::ptr::null_mut());
}
}
}

Expand Down Expand Up @@ -1589,3 +1634,17 @@ fn convert_info_array_to_vec(

topic_info_list
}

fn new_guard_condition(ctx: &mut rcl_context_s) -> Result<rcl_guard_condition_t> {
unsafe {
let mut gc = rcl_get_zero_initialized_guard_condition();
match Error::from_rcl_error(rcl_guard_condition_init(
&mut gc,
ctx,
rcl_guard_condition_get_default_options(),
)) {
Error::RCL_RET_OK => Ok(gc),
e => Err(e),
}
}
}
Loading