diff --git a/r2r/Cargo.toml b/r2r/Cargo.toml
index 2f1107cf0..be5ac6108 100644
--- a/r2r/Cargo.toml
+++ b/r2r/Cargo.toml
@@ -32,7 +32,7 @@ indexmap = "2.2.6"
 [dev-dependencies]
 serde_json = "1.0.89"
 futures = "0.3.25"
-tokio = { version = "1.22.0", features = ["rt-multi-thread", "time", "macros"] }
+tokio = { version = "1.22.0", features = ["rt-multi-thread", "macros", "time"] }
 rand = "0.8.5"
 cdr = "0.2.4"
 criterion = "0.5.1"
diff --git a/r2r/src/nodes.rs b/r2r/src/nodes.rs
index f8404f750..1557349f9 100644
--- a/r2r/src/nodes.rs
+++ b/r2r/src/nodes.rs
@@ -65,6 +65,10 @@ pub struct Node {
     // time source that provides simulated time
     #[cfg(r2r__rosgraph_msgs__msg__Clock)]
     time_source: TimeSource,
+    // and a guard condition to notify the waitset that it should reload its elements.
+    // This guard condition must be triggered by any subscriber, service, etc. that changes
+    // its `is_waiting` state to true.
+    waitset_elements_changed_gc: rcl_guard_condition_t,
 }
 
 unsafe impl Send for Node {}
@@ -182,7 +186,7 @@ impl Node {
 
     /// Creates a ROS node.
     pub fn create(ctx: Context, name: &str, namespace: &str) -> Result<Node> {
-        let (res, node_handle) = {
+        let (res, waitset_gc, node_handle) = {
             let mut ctx_handle = ctx.context_handle.lock().unwrap();
 
             let c_node_name = CString::new(name).unwrap();
@@ -199,7 +203,9 @@ impl Node {
                     &node_options as *const _,
                 )
             };
-            (res, node_handle)
+
+            let waitset_gc = new_guard_condition(ctx_handle.as_mut())?;
+            (res, waitset_gc, node_handle)
         };
 
         if res == RCL_RET_OK as i32 {
@@ -225,6 +231,7 @@ impl Node {
                 ros_clock,
                 #[cfg(r2r__rosgraph_msgs__msg__Clock)]
                 time_source,
+                waitset_elements_changed_gc: waitset_gc,
             };
             node.load_params()?;
             Ok(node)
@@ -552,14 +559,19 @@ impl Node {
     {
         let subscription_handle =
             create_subscription_helper(self.node_handle.as_mut(), topic, T::get_ts(), qos_profile)?;
-        let (sender, receiver) = mpsc::channel::<T>(10);
+        let waker = Arc::new(SharedSubscriptionData::new());
 
         let ws = TypedSubscriber {
             rcl_handle: subscription_handle,
-            sender,
+            shared: Arc::clone(&waker),
         };
         self.subscribers.push(Box::new(ws));
-        Ok(receiver)
+
+        Ok(SubscriberStream::<T>::new(
+            subscription_handle,
+            waker,
+            self.waitset_elements_changed_gc,
+        ))
     }
 
     /// Subscribe to a ROS topic.
@@ -573,14 +585,19 @@ impl Node {
     {
         let subscription_handle =
             create_subscription_helper(self.node_handle.as_mut(), topic, T::get_ts(), qos_profile)?;
-        let (sender, receiver) = mpsc::channel::<WrappedNativeMsg<T>>(10);
-        let ws = NativeSubscriber {
+        let waker = Arc::new(SharedSubscriptionData::new());
+        let ws = TypedSubscriber {
             rcl_handle: subscription_handle,
-            sender,
+            shared: Arc::clone(&waker),
         };
         self.subscribers.push(Box::new(ws));
-        Ok(receiver)
+
+        Ok(NativeSubscriberStream::<T>::new(
+            subscription_handle,
+            waker,
+            self.waitset_elements_changed_gc,
+        ))
     }
 
     /// Subscribe to a ROS topic.
@@ -593,15 +610,20 @@ impl Node {
         let msg = WrappedNativeMsgUntyped::new_from(topic_type)?;
         let subscription_handle =
             create_subscription_helper(self.node_handle.as_mut(), topic, msg.ts, qos_profile)?;
-        let (sender, receiver) = mpsc::channel::<Result<serde_json::Value>>(10);
+        let waker = Arc::new(SharedSubscriptionData::new());
         let ws = UntypedSubscriber {
             rcl_handle: subscription_handle,
-            topic_type: topic_type.to_string(),
-            sender,
+            shared: waker.clone(),
         };
+
         self.subscribers.push(Box::new(ws));
-        Ok(receiver)
+        Ok(UntypedSubscriberStream::new(
+            subscription_handle,
+            waker,
+            self.waitset_elements_changed_gc,
+            topic_type.to_string(),
+        ))
     }
 
     /// Subscribe to a ROS topic.
@@ -638,15 +660,20 @@ impl Node {
         let subscription_handle =
             create_subscription_helper(self.node_handle.as_mut(), topic, msg.ts, qos_profile)?;
-        let (sender, receiver) = mpsc::channel::<Vec<u8>>(10);
-        let ws = RawSubscriber {
+        let waker = Arc::new(SharedSubscriptionData::new());
+
+        let ws = UntypedSubscriber {
             rcl_handle: subscription_handle,
-            msg_buf,
-            sender,
+            shared: waker.clone(),
         };
+
         self.subscribers.push(Box::new(ws));
-        Ok(receiver)
+        Ok(RawSubscriberStream::new(
+            subscription_handle,
+            waker,
+            self.waitset_elements_changed_gc,
+        ))
     }
 
     /// Create a ROS service.
@@ -987,7 +1014,7 @@ impl Node {
             rcl_wait_set_init(
                 &mut ws,
                 self.subscribers.len() + total_action_subs,
-                0,
+                1, // for the waitset_elements_changed_gc
                 self.timers.len() + total_action_timers,
                 self.clients.len() + total_action_clients,
                 self.services.len() + total_action_services,
@@ -1001,9 +1028,27 @@ impl Node {
             rcl_wait_set_clear(&mut ws);
         }
 
+        unsafe {
+            // First, add the waitset_elements_changed guard condition.
+            // Rationale: the code below adds only subscribers that are actively waiting.
+            // This avoids an endless loop where a busy subscriber keeps waking up the waitset
+            // even though it doesn't have the capacity to handle the new data. However, it also
+            // means that a subscriber/service/etc. that changes its waiting state needs to update
+            // the waitset, otherwise it will not be woken up when new data arrives. In that situation
+            // it must trigger this guard condition, which forces a wakeup of the waitset and a return
+            // from this function. On the next call to spin_once, the subscriber will be added.
+            rcl_wait_set_add_guard_condition(
+                &mut ws,
+                &self.waitset_elements_changed_gc as *const rcl_guard_condition_t,
+                std::ptr::null_mut(),
+            );
+        }
+
         for s in &self.subscribers {
-            unsafe {
-                rcl_wait_set_add_subscription(&mut ws, s.handle(), std::ptr::null_mut());
+            if s.is_waiting() {
+                unsafe {
+                    rcl_wait_set_add_subscription(&mut ws, s.handle(), std::ptr::null_mut());
+                }
             }
         }
 
@@ -1589,3 +1634,17 @@ fn convert_info_array_to_vec(
 
     topic_info_list
 }
+
+fn new_guard_condition(ctx: &mut rcl_context_s) -> Result<rcl_guard_condition_t> {
+    unsafe {
+        let mut gc = rcl_get_zero_initialized_guard_condition();
+        match Error::from_rcl_error(rcl_guard_condition_init(
+            &mut gc,
+            ctx,
+            rcl_guard_condition_get_default_options(),
+        )) {
+            Error::RCL_RET_OK => Ok(gc),
+            e => Err(e),
+        }
+    }
+}
diff --git a/r2r/src/subscribers.rs b/r2r/src/subscribers.rs
index 0d3f34fc8..c0b35787e 100644
--- a/r2r/src/subscribers.rs
+++ b/r2r/src/subscribers.rs
@@ -1,90 +1,187 @@
-use futures::channel::mpsc;
 use std::ffi::CString;
 
 use crate::{error::*, msg_types::*, qos::QosProfile};
+use futures::stream::Stream;
 use r2r_rcl::*;
 use std::ffi::{c_void, CStr};
+use std::pin::Pin;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::sync::Arc;
+use std::task::Poll;
 
 pub trait Subscriber_ {
     fn handle(&self) -> &rcl_subscription_t;
     /// Returns true if the subscriber stream has been dropped.
     fn handle_incoming(&mut self) -> bool;
+    // Returns true if the subscriber is waiting for incoming messages.
+    fn is_waiting(&self) -> bool;
     fn destroy(&mut self, node: &mut rcl_node_t);
 }
 
-pub struct TypedSubscriber<T>
+pub struct SharedSubscriptionData {
+    // A flag that is set to true when the subscription object is destroyed.
+    // This must be checked by stream objects before accessing the underlying rcl handle.
+    subscription_is_dead: AtomicBool,
+    // The waker to call when new data is available.
+    waker: std::sync::Mutex<Option<std::task::Waker>>,
+}
+
+impl SharedSubscriptionData {
+    pub fn new() -> Self {
+        SharedSubscriptionData {
+            subscription_is_dead: AtomicBool::new(false),
+            waker: std::sync::Mutex::new(None),
+        }
+    }
+}
+
+pub struct TypedSubscriber {
+    pub rcl_handle: rcl_subscription_t,
+    pub shared: Arc<SharedSubscriptionData>,
+}
+
+// The existing code distinguished these two kinds of subscribers, so keep that distinction
+// in place to reduce the delta with upstream.
+pub type UntypedSubscriber = TypedSubscriber;
+
+impl Subscriber_ for TypedSubscriber {
+    fn handle(&self) -> &rcl_subscription_t {
+        &self.rcl_handle
+    }
+
+    fn handle_incoming(&mut self) -> bool {
+        let locked_waker = self.shared.waker.lock().unwrap();
+        if let Some(ref waker) = *locked_waker {
+            waker.wake_by_ref();
+        }
+        false
+    }
+
+    fn is_waiting(&self) -> bool {
+        self.shared.waker.lock().unwrap().is_some()
+    }
+
+    fn destroy(&mut self, node: &mut rcl_node_t) {
+        self.shared
+            .subscription_is_dead
+            .store(true, Ordering::Release);
+        unsafe {
+            rcl_subscription_fini(&mut self.rcl_handle, node);
+        }
+    }
+}
+
+fn set_waker(
+    shared: Arc<SharedSubscriptionData>, new_waker: std::task::Waker,
+    gc: &mut rcl_guard_condition_t,
+) {
+    let was_waiting = {
+        let mut stored_waker = shared.waker.lock().unwrap();
+        let was_waiting = stored_waker.is_some();
+        *stored_waker = Some(new_waker);
+        was_waiting
+    };
+
+    // If the subscription goes from not-waiting to waiting, notify the waitset so it adds this subscription.
+    if !was_waiting {
+        unsafe {
+            match Error::from_rcl_error(rcl_trigger_guard_condition(gc)) {
+                Error::RCL_RET_OK => {}
+                e => {
+                    // This can only fail if the guard condition object was invalid, so panic is the appropriate response.
+                    panic!("Failed to trigger guard condition: {e}");
+                }
+            }
+        }
+    }
+}
+
+pub struct SubscriberStream<T>
 where
     T: WrappedTypesupport,
 {
     pub rcl_handle: rcl_subscription_t,
-    pub sender: mpsc::Sender<T>,
+    shared: Arc<SharedSubscriptionData>,
+    pub waiting_state_changed_gc: rcl_guard_condition_t,
+    // suppress Rust's "unused type parameter" error
+    pub stream_type: std::marker::PhantomData<T>,
 }
 
-pub struct NativeSubscriber<T>
+impl<T: WrappedTypesupport> std::marker::Unpin for SubscriberStream<T> {}
+unsafe impl<T: WrappedTypesupport> std::marker::Send for SubscriberStream<T> {}
+
+pub struct NativeSubscriberStream<T>
 where
     T: WrappedTypesupport,
 {
-    pub rcl_handle: rcl_subscription_t,
-    pub sender: mpsc::Sender<WrappedNativeMsg<T>>,
+    pub stream: SubscriberStream<T>,
 }
 
-pub struct UntypedSubscriber {
+pub struct UntypedSubscriberStream {
     pub rcl_handle: rcl_subscription_t,
+    shared: Arc<SharedSubscriptionData>,
+    pub waiting_state_changed_gc: rcl_guard_condition_t,
     pub topic_type: String,
-    pub sender: mpsc::Sender<Result<serde_json::Value>>,
 }
 
-pub struct RawSubscriber {
+impl std::marker::Unpin for UntypedSubscriberStream {}
+unsafe impl std::marker::Send for UntypedSubscriberStream {}
+
+pub struct RawSubscriberStream {
     pub rcl_handle: rcl_subscription_t,
-    pub msg_buf: rcl_serialized_message_t,
-    pub sender: mpsc::Sender<Vec<u8>>,
+    shared: Arc<SharedSubscriptionData>,
+    pub waiting_state_changed_gc: rcl_guard_condition_t,
    msg_buf: rcl_serialized_message_t,
 }
 
-impl<T: 'static> Subscriber_ for TypedSubscriber<T>
-where
-    T: WrappedTypesupport,
-{
-    fn handle(&self) -> &rcl_subscription_t {
-        &self.rcl_handle
+impl Drop for RawSubscriberStream {
+    fn drop(&mut self) {
+        unsafe {
+            rcutils_uint8_array_fini(&mut self.msg_buf as *mut rcl_serialized_message_t);
+        }
     }
+}
 
-    fn handle_incoming(&mut self) -> bool {
+impl std::marker::Unpin for RawSubscriberStream {}
+unsafe impl std::marker::Send for RawSubscriberStream {}
+
+impl<T: WrappedTypesupport> SubscriberStream<T> {
+    pub fn new(
+        sub: rcl_subscription_t, shared_sub_data: Arc<SharedSubscriptionData>,
+        gc: rcl_guard_condition_t,
+    ) -> Self {
+        SubscriberStream::<T> {
+            rcl_handle: sub,
+            shared: shared_sub_data,
+            waiting_state_changed_gc: gc,
+            stream_type: std::marker::PhantomData,
+        }
+    }
+
+    fn receive_native_no_loaning(&mut self) -> Option<WrappedNativeMsg<T>> {
         let mut msg_info = rmw_message_info_t::default(); // we dont care for now
         let mut msg = WrappedNativeMsg::<T>::new();
         let ret = unsafe {
             rcl_take(&self.rcl_handle, msg.void_ptr_mut(), &mut msg_info, std::ptr::null_mut())
         };
         if ret == RCL_RET_OK as i32 {
-            let msg = T::from_native(&msg);
-            if let Err(e) = self.sender.try_send(msg) {
-                if e.is_disconnected() {
-                    // user dropped the handle to the stream, signal removal.
-                    return true;
-                }
-                log::debug!("error {:?}", e)
-            }
-        }
-        false
-    }
-
-    fn destroy(&mut self, node: &mut rcl_node_t) {
-        unsafe {
-            rcl_subscription_fini(&mut self.rcl_handle, node);
+            Some(msg)
+        } else if ret == RCL_RET_SUBSCRIPTION_TAKE_FAILED as i32 {
+            // No message available.
+            None
+        } else {
+            // An unexpected error while reading. The old code just ignored it.
+            // For now just panic, but we should think about this again.
+            panic!("Error while reading message from subscription: {ret}");
         }
     }
-}
 
-impl<T: 'static> Subscriber_ for NativeSubscriber<T>
-where
-    T: WrappedTypesupport,
-{
-    fn handle(&self) -> &rcl_subscription_t {
-        &self.rcl_handle
+    fn receive(&mut self) -> Option<T> {
+        self.receive_native_no_loaning()
+            .map(|msg| T::from_native(&msg))
     }
 
-    fn handle_incoming(&mut self) -> bool {
+    fn receive_native(&mut self) -> Option<WrappedNativeMsg<T>> {
         let mut msg_info = rmw_message_info_t::default(); // we dont care for now
-        let msg = unsafe {
+        unsafe {
             if rcl_subscription_can_loan_messages(&self.rcl_handle) {
                 let mut loaned_msg: *mut c_void = std::ptr::null_mut();
                 let ret = rcl_take_loaned_message(
@@ -93,8 +190,12 @@ where
                     &mut msg_info,
                     std::ptr::null_mut(),
                 );
+                if ret == RCL_RET_SUBSCRIPTION_TAKE_FAILED as i32 {
+                    // no message available
+                    return None;
+                }
                 if ret != RCL_RET_OK as i32 {
-                    return false;
+                    panic!("Error while reading message from subscription: {ret}");
                 }
                 let handle_box = Box::new(self.rcl_handle);
                 let deallocator = Box::new(|msg: *mut T::CStruct| {
@@ -108,6 +209,7 @@ where
                         let topic_str = rcl_subscription_get_topic_name(handle_ptr);
                         let topic = CStr::from_ptr(topic_str);
+                        drop(Box::from_raw(handle_ptr));
 
                         crate::log_error!(
                             "r2r",
@@ -117,46 +219,41 @@ where
                             error_msg.to_str().expect("to_str() call failed")
                         );
                     }
-                    // drop(Box::from_raw(handle_ptr));
+                    drop(Box::from_raw(handle_ptr));
                 });
-                WrappedNativeMsg::<T>::from_loaned(loaned_msg as *mut T::CStruct, deallocator)
+                Some(WrappedNativeMsg::<T>::from_loaned(loaned_msg as *mut T::CStruct, deallocator))
             } else {
-                let mut new_msg = WrappedNativeMsg::<T>::new();
-                let ret = rcl_take(
-                    &self.rcl_handle,
-                    new_msg.void_ptr_mut(),
-                    &mut msg_info,
-                    std::ptr::null_mut(),
-                );
-                if ret != RCL_RET_OK as i32 {
-                    return false;
-                }
-                new_msg
-            }
-        };
-        if let Err(e) = self.sender.try_send(msg) {
-            if e.is_disconnected() {
-                // user dropped the handle to the stream, signal removal.
-                return true;
+                self.receive_native_no_loaning()
             }
-            log::error!("error {:?}", e)
         }
-        false
     }
+}
 
-    fn destroy(&mut self, node: &mut rcl_node_t) {
-        unsafe {
-            rcl_subscription_fini(&mut self.rcl_handle, node);
+impl<T: WrappedTypesupport> NativeSubscriberStream<T> {
+    pub fn new(
+        sub: rcl_subscription_t, shared_sub_data: Arc<SharedSubscriptionData>,
+        gc: rcl_guard_condition_t,
+    ) -> Self {
+        Self {
+            stream: SubscriberStream::<T>::new(sub, shared_sub_data, gc),
         }
     }
 }
 
-impl Subscriber_ for UntypedSubscriber {
-    fn handle(&self) -> &rcl_subscription_t {
-        &self.rcl_handle
+impl UntypedSubscriberStream {
+    pub fn new(
+        sub: rcl_subscription_t, shared_sub_data: Arc<SharedSubscriptionData>,
+        gc: rcl_guard_condition_t, topic_type: String,
+    ) -> Self {
+        Self {
+            rcl_handle: sub,
+            shared: shared_sub_data,
+            waiting_state_changed_gc: gc,
+            topic_type,
+        }
     }
 
-    fn handle_incoming(&mut self) -> bool {
+    fn receive_json(&mut self) -> Option<Result<serde_json::Value>> {
         let mut msg_info = rmw_message_info_t::default(); // we dont care for now
         let mut msg = WrappedNativeMsgUntyped::new_from(&self.topic_type)
             .unwrap_or_else(|_| panic!("no typesupport for {}", self.topic_type));
@@ -164,31 +261,31 @@ impl Subscriber_ for UntypedSubscriber {
             rcl_take(&self.rcl_handle, msg.void_ptr_mut(), &mut msg_info, std::ptr::null_mut())
         };
         if ret == RCL_RET_OK as i32 {
-            let json = msg.to_json();
-            if let Err(e) = self.sender.try_send(json) {
-                if e.is_disconnected() {
-                    // user dropped the handle to the stream, signal removal.
-                    return true;
-                }
-                log::debug!("error {:?}", e)
-            }
-        }
-        false
-    }
-
-    fn destroy(&mut self, node: &mut rcl_node_t) {
-        unsafe {
-            rcl_subscription_fini(&mut self.rcl_handle, node);
+            Some(
+                msg.to_json()
+                    .map_err(|e| crate::Error::SerdeError { err: e.to_string() }),
+            )
+        } else if ret == RCL_RET_SUBSCRIPTION_TAKE_FAILED as i32 {
+            None
+        } else {
+            panic!("Failed to read from subscription: {ret}");
         }
     }
 }
 
-impl Subscriber_ for RawSubscriber {
-    fn handle(&self) -> &rcl_subscription_t {
-        &self.rcl_handle
+impl RawSubscriberStream {
+    pub fn new(
+        sub: rcl_subscription_t, shared_sub_data: Arc<SharedSubscriptionData>,
+        gc: rcl_guard_condition_t,
+    ) -> Self {
+        Self {
+            rcl_handle: sub,
+            shared: shared_sub_data,
+            waiting_state_changed_gc: gc,
+            msg_buf: unsafe { rcutils_get_zero_initialized_uint8_array() },
+        }
     }
-
-    fn handle_incoming(&mut self) -> bool {
+    fn receive_raw(&mut self) -> Option<Vec<u8>> {
         let mut msg_info = rmw_message_info_t::default(); // we dont care for now
         let ret = unsafe {
             rcl_take_serialized_message(
@@ -198,34 +295,141 @@ impl Subscriber_ for RawSubscriber {
                 std::ptr::null_mut(),
             )
         };
-        if ret != RCL_RET_OK as i32 {
-            log::error!("failed to take serialized message");
-            return false;
+        if ret == RCL_RET_OK as i32 {
+            Some(if self.msg_buf.buffer == std::ptr::null_mut() {
+                Vec::new()
+            } else {
+                unsafe {
+                    std::slice::from_raw_parts(self.msg_buf.buffer, self.msg_buf.buffer_length)
+                        .to_vec()
+                }
+            })
+        } else if ret == RCL_RET_SUBSCRIPTION_TAKE_FAILED as i32 {
+            None
+        } else {
+            // An unexpected error while reading. The old code just ignored it.
+            // For now just panic, but we should think about this again.
+            panic!("Error while reading message from subscription: {ret}");
         }
+    }
+}
 
-        let data_bytes = if self.msg_buf.buffer == std::ptr::null_mut() {
-            Vec::new()
-        } else {
-            unsafe {
-                std::slice::from_raw_parts(self.msg_buf.buffer, self.msg_buf.buffer_length).to_vec()
+impl<T> Stream for SubscriberStream<T>
+where
+    T: WrappedTypesupport,
+{
+    type Item = T;
+
+    // Required method
+    fn poll_next(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Option<Self::Item>> {
+        if self.shared.subscription_is_dead.load(Ordering::Acquire) {
+            return Poll::Ready(None);
+        }
+        match self.receive() {
+            Some(msg) => {
+                *self.shared.waker.lock().unwrap() = None;
+                Poll::Ready(Some(msg))
             }
-        };
+            None => {
+                set_waker(
+                    Arc::clone(&self.shared),
+                    cx.waker().clone(),
+                    &mut self.waiting_state_changed_gc,
+                );
+                Poll::Pending
+            }
+        }
+    }
+}
 
-        if let Err(e) = self.sender.try_send(data_bytes) {
-            if e.is_disconnected() {
-                // user dropped the handle to the stream, signal removal.
-                return true;
+impl<T> Stream for NativeSubscriberStream<T>
+where
+    T: WrappedTypesupport,
+{
+    type Item = WrappedNativeMsg<T>;
+
+    // Required method
+    fn poll_next(
+        mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>,
+    ) -> Poll<Option<WrappedNativeMsg<T>>> {
+        if self
+            .stream
+            .shared
+            .subscription_is_dead
+            .load(Ordering::Acquire)
+        {
+            return Poll::Ready(None);
+        }
+
+        match self.stream.receive_native() {
+            Some(msg) => {
+                *self.stream.shared.waker.lock().unwrap() = None;
+                Poll::Ready(Some(msg))
+            }
+            None => {
+                set_waker(
+                    Arc::clone(&self.stream.shared),
+                    cx.waker().clone(),
+                    &mut self.stream.waiting_state_changed_gc,
+                );
+                Poll::Pending
             }
-            log::debug!("error {:?}", e)
         }
+    }
+}
 
-        false
+impl Stream for UntypedSubscriberStream {
+    type Item = Result<serde_json::Value>;
+
+    // Required method
+    fn poll_next(
+        mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        if self.shared.subscription_is_dead.load(Ordering::Acquire) {
+            return Poll::Ready(None);
+        }
+
+        match self.receive_json() {
+            Some(msg) => {
+                *self.shared.waker.lock().unwrap() = None;
+                Poll::Ready(Some(msg))
+            }
+            None => {
+                set_waker(
+                    Arc::clone(&self.shared),
+                    cx.waker().clone(),
+                    &mut self.waiting_state_changed_gc,
+                );
+                Poll::Pending
+            }
+        }
     }
+}
 
-    fn destroy(&mut self, node: &mut rcl_node_t) {
-        unsafe {
-            rcl_subscription_fini(&mut self.rcl_handle, node);
-            rcutils_uint8_array_fini(&mut self.msg_buf as *mut rcl_serialized_message_t);
+impl Stream for RawSubscriberStream {
+    type Item = Vec<u8>;
+
+    // Required method
+    fn poll_next(
+        mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>,
+    ) -> Poll<Option<Self::Item>> {
+        if self.shared.subscription_is_dead.load(Ordering::Acquire) {
+            return Poll::Ready(None);
+        }
+
+        match self.receive_raw() {
+            Some(msg) => {
+                *self.shared.waker.lock().unwrap() = None;
+                Poll::Ready(Some(msg))
+            }
+            None => {
+                set_waker(
+                    Arc::clone(&self.shared),
+                    cx.waker().clone(),
+                    &mut self.waiting_state_changed_gc,
+                );
+                Poll::Pending
+            }
         }
     }
 }
diff --git a/r2r/src/time_source.rs b/r2r/src/time_source.rs
index 91b1a09c6..b25a95d8f 100644
--- a/r2r/src/time_source.rs
+++ b/r2r/src/time_source.rs
@@ -247,6 +247,11 @@ impl Subscriber_ for TimeSourceSubscriber {
         }
     }
 
+    fn is_waiting(&self) -> bool {
+        // TODO(tobiasstark): Implement
+        true
+    }
+
     fn destroy(&mut self, node: &mut rcl_node_t) {
         unsafe {
             rcl_subscription_fini(&mut self.subscriber_handle, node);
diff --git a/r2r/tests/tokio_test_raw.rs b/r2r/tests/tokio_test_raw.rs
index 021df62bb..488f89b63 100644
--- a/r2r/tests/tokio_test_raw.rs
+++ b/r2r/tests/tokio_test_raw.rs
@@ -75,15 +75,16 @@ async fn tokio_subscribe_raw_testing() -> Result<(), Box<dyn std::error::Error>
             }
         });
 
-        let handle = std::thread::spawn(move || {
-            for _ in 1..=30 {
-                node.spin_once(std::time::Duration::from_millis(100));
+        let spin_task = tokio::task::spawn_blocking(move || {
+            loop {
+                node.spin_once(tokio::time::Duration::from_millis(10));
             }
         });
 
         sub_int_handle.await.unwrap();
         sub_array_handle.await.unwrap();
-        handle.join().unwrap();
+        spin_task.abort();
+        spin_task.await;
 
         println!("Going to drop tokio_subscribe_raw_testing iteration {i_cycle}");
     }
diff --git a/r2r/tests/tokio_testing.rs b/r2r/tests/tokio_testing.rs
index cec59d3d8..2c8a94c65 100644
--- a/r2r/tests/tokio_testing.rs
+++ b/r2r/tests/tokio_testing.rs
@@ -157,3 +157,68 @@ async fn tokio_testing() -> Result<(), Box<dyn std::error::Error>> {
 
     Ok(())
 }
+
+#[tokio::test]
+async fn pub_sub_with_subscriber_after_publisher() -> Result<(), Box<dyn std::error::Error>> {
+    // Test that pub-sub works if the publication happens before the subscriber waits
+    let ctx = r2r::Context::create()?;
+    let mut node = r2r::Node::create(ctx, "testnode", "")?;
+    let mut s_the_no =
+        node.subscribe::<r2r::std_msgs::msg::Int32>("/the_to", QosProfile::default())?;
+    let p_the_no =
+        node.create_publisher::<r2r::std_msgs::msg::Int32>("/the_to", QosProfile::default())?;
+
+    // First publish a message
+    p_the_no.publish(&r2r::std_msgs::msg::Int32 { data: 0xc0ffee })?;
+
+    // Then check that the message arrived.
+    // Note that there is no need to spin the node, since no waiting is involved.
+    let msg = s_the_no
+        .next()
+        .await
+        .expect("Awaiting the subscription yielded none");
+    assert_eq!(msg.data, 0xc0ffee);
+    Ok(())
+}
+
+#[tokio::test]
+async fn pub_sub_with_waiting_subscriber() -> Result<(), Box<dyn std::error::Error>> {
+    // Test that pub-sub works if the subscriber is blocking before the publication happens
+    let ctx = r2r::Context::create()?;
+    let mut node = r2r::Node::create(ctx, "testnode", "")?;
+    let mut s_the_no =
+        node.subscribe::<r2r::std_msgs::msg::Int32>("/the_no", QosProfile::default())?;
+    let p_the_no =
+        node.create_publisher::<r2r::std_msgs::msg::Int32>("/the_no", QosProfile::default())?;
+    let state = Arc::new(Mutex::new(false));
+
+    // First spawn a subscriber task
+    let handle = task::spawn({
+        let state = state.clone();
+        async move {
+            if let Some(msg) = s_the_no.next().await {
+                assert_eq!(msg.data, 0xc0ffee);
+                *state.lock().unwrap() = true;
+            }
+        }
+    });
+
+    // Then wait some time to ensure the future is being polled
+    tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
+
+    // Then publish a message
+    p_the_no.publish(&r2r::std_msgs::msg::Int32 { data: 0xc0ffee })?;
+
+    // Spin the node so the published message is delivered to the waiting subscriber
+    let spin_task = tokio::task::spawn_blocking(move || {
+        loop {
+            node.spin_once(tokio::time::Duration::from_millis(10));
+        }
+    });
+
+    // And check that the subscriber receives it
+    handle.await.expect("subscriber task panicked");
+    spin_task.abort();
+    assert!(*state.lock().unwrap());
+    Ok(())
+}
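
Usage note: with this patch, `Node::subscribe*` returns concrete `Stream` types that park a waker in `SharedSubscriptionData` and trigger `waitset_elements_changed_gc`, so a subscription is only added to the waitset while some task is actually awaiting it. The sketch below shows the intended consumption pattern and mirrors the new tests above; the topic name, node name, and the 10 ms spin interval are illustrative and not part of the patch.

```rust
use futures::StreamExt; // for `.next()` on the subscriber stream
use r2r::QosProfile;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let ctx = r2r::Context::create()?;
    let mut node = r2r::Node::create(ctx, "listener", "")?;
    let mut sub =
        node.subscribe::<r2r::std_msgs::msg::Int32>("/the_topic", QosProfile::default())?;

    // Drive the rcl waitset on a blocking thread. The guard condition added in this
    // patch wakes spin_once whenever a stream goes from not-waiting to waiting, so the
    // subscription is picked up on the next spin iteration.
    let _spin = tokio::task::spawn_blocking(move || loop {
        node.spin_once(std::time::Duration::from_millis(10));
    });

    // Polling the stream either takes a message directly via rcl_take or registers a
    // waker and returns Pending until spin_once reports new data.
    while let Some(msg) = sub.next().await {
        println!("got: {}", msg.data);
    }
    Ok(())
}
```

This is the same pattern the new `pub_sub_with_waiting_subscriber` test exercises: publish, spin in a `spawn_blocking` loop, and await the stream from an async task.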