From 09f981980ad377f2291a8f8e3551bd4dc5ed7d7f Mon Sep 17 00:00:00 2001 From: Tobias Stark Date: Thu, 30 May 2024 16:04:33 +0000 Subject: [PATCH 1/6] Add and remove subscription from waitset depending on wait state --- r2r/src/nodes.rs | 61 ++++++++++++++++--- r2r/src/subscribers.rs | 131 ++++++++++++++++++++++++++++++++++------- r2r/src/time_source.rs | 5 ++ 3 files changed, 169 insertions(+), 28 deletions(-) diff --git a/r2r/src/nodes.rs b/r2r/src/nodes.rs index f8404f750..39e483d7d 100644 --- a/r2r/src/nodes.rs +++ b/r2r/src/nodes.rs @@ -65,6 +65,10 @@ pub struct Node { // time source that provides simulated time #[cfg(r2r__rosgraph_msgs__msg__Clock)] time_source: TimeSource, + // and a guard condition to notify the waitset that it should reload its elements. + // This guard condition must be triggered by any subscriber, service, etc. that changes + // its `is_waiting` state to true + waitset_elements_changed_gc: rcl_guard_condition_t, } unsafe impl Send for Node {} @@ -182,7 +186,7 @@ impl Node { /// Creates a ROS node. pub fn create(ctx: Context, name: &str, namespace: &str) -> Result { - let (res, node_handle) = { + let (res, waitset_gc, node_handle) = { let mut ctx_handle = ctx.context_handle.lock().unwrap(); let c_node_name = CString::new(name).unwrap(); @@ -199,7 +203,9 @@ impl Node { &node_options as *const _, ) }; - (res, node_handle) + + let waitset_gc = new_guard_condition(ctx_handle.as_mut())?; + (res, waitset_gc, node_handle) }; if res == RCL_RET_OK as i32 { @@ -225,6 +231,7 @@ impl Node { ros_clock, #[cfg(r2r__rosgraph_msgs__msg__Clock)] time_source, + waitset_elements_changed_gc: waitset_gc, }; node.load_params()?; Ok(node) @@ -552,14 +559,20 @@ impl Node { { let subscription_handle = create_subscription_helper(self.node_handle.as_mut(), topic, T::get_ts(), qos_profile)?; - let (sender, receiver) = mpsc::channel::(10); + let waker = Arc::new(std::sync::Mutex::new(None)); let ws = TypedSubscriber { rcl_handle: subscription_handle, - sender, + waker: Arc::clone(&waker), }; self.subscribers.push(Box::new(ws)); - Ok(receiver) + + Ok(SubscriberStream:: { + rcl_handle: subscription_handle, + waker, + waiting_state_changed_gc: self.waitset_elements_changed_gc, + stream_type: std::marker::PhantomData, + }) } /// Subscribe to a ROS topic. @@ -987,7 +1000,7 @@ impl Node { rcl_wait_set_init( &mut ws, self.subscribers.len() + total_action_subs, - 0, + 1, // for the waitset_elements_changed_gc self.timers.len() + total_action_timers, self.clients.len() + total_action_clients, self.services.len() + total_action_services, @@ -1001,9 +1014,27 @@ impl Node { rcl_wait_set_clear(&mut ws); } + unsafe { + // First off, add the waitset_elements_changed guard condition. + // Rationale: The code below will add only subscribers that are actively waiting. + // This avoids an endless loop where a busy subscriber keeps waking up the waitset + // even though it doesn't have the capacity to handle the new data. However, it also + // means that a subscriber/service/etc that changes its waiting state needs to update + // the waitset, otherwise it will not be woken up when new data arrives. In that situation + // it shall trigger this guard condition, which will force a wakeup of the waitset and a return + // from this function. On the next call to spin_once, the subscriber will be added. + rcl_wait_set_add_guard_condition( + &mut ws, + &self.waitset_elements_changed_gc as *const rcl_guard_condition_t, + std::ptr::null_mut(), + ); + } + for s in &self.subscribers { - unsafe { - rcl_wait_set_add_subscription(&mut ws, s.handle(), std::ptr::null_mut()); + if s.is_waiting() { + unsafe { + rcl_wait_set_add_subscription(&mut ws, s.handle(), std::ptr::null_mut()); + } } } @@ -1589,3 +1620,17 @@ fn convert_info_array_to_vec( topic_info_list } + +fn new_guard_condition(ctx: &mut rcl_context_s) -> Result { + unsafe { + let mut gc = rcl_get_zero_initialized_guard_condition(); + match Error::from_rcl_error(rcl_guard_condition_init( + &mut gc, + ctx, + rcl_guard_condition_get_default_options(), + )) { + Error::RCL_RET_OK => Ok(gc), + e => Err(e), + } + } +} diff --git a/r2r/src/subscribers.rs b/r2r/src/subscribers.rs index 0d3f34fc8..c89b52052 100644 --- a/r2r/src/subscribers.rs +++ b/r2r/src/subscribers.rs @@ -4,20 +4,64 @@ use std::ffi::CString; use crate::{error::*, msg_types::*, qos::QosProfile}; use r2r_rcl::*; use std::ffi::{c_void, CStr}; +use std::future::Future; +use std::pin::Pin; +use std::sync::Arc; +use std::task::Poll; +use futures::stream::Stream; pub trait Subscriber_ { fn handle(&self) -> &rcl_subscription_t; /// Returns true if the subscriber stream has been dropped. fn handle_incoming(&mut self) -> bool; + // Returns true if the subscriber is waiting for incoming messages + fn is_waiting(&self) -> bool; fn destroy(&mut self, node: &mut rcl_node_t); } -pub struct TypedSubscriber +// TODO(tobias.stark): Implement the new wakeup logic for the other subscriber types as well. +// Let's just take TypedSubscriber as our proof of concept. +pub struct TypedSubscriber { + pub rcl_handle: rcl_subscription_t, + // The waker to call when new data is available + pub waker: Arc>>, +} + +pub struct SubscriberStream where T: WrappedTypesupport, { pub rcl_handle: rcl_subscription_t, - pub sender: mpsc::Sender, + pub waker: Arc>>, + pub waiting_state_changed_gc: rcl_guard_condition_t, + // suppress Rust's "unused type" error + pub stream_type: std::marker::PhantomData, +} + +impl std::marker::Unpin for SubscriberStream {} +unsafe impl std::marker::Send for SubscriberStream {} + +impl SubscriberStream +where + T: WrappedTypesupport, +{ + fn receive(&mut self) -> Option { + let mut msg_info = rmw_message_info_t::default(); // we dont care for now + let mut msg = WrappedNativeMsg::::new(); + let ret = unsafe { + rcl_take(&self.rcl_handle, msg.void_ptr_mut(), &mut msg_info, std::ptr::null_mut()) + }; + if ret == RCL_RET_OK as i32 { + Some(T::from_native(&msg)) + } else if ret == RCL_RET_SUBSCRIPTION_TAKE_FAILED as i32 { + // No message available + None + } else { + // An unexpected error while reading. The old code just ignored it. + // For now just panic, but we should think about this again + panic!("Error while reading message from subscription: {ret}"); + } + } } pub struct NativeSubscriber @@ -40,33 +84,23 @@ pub struct RawSubscriber { pub sender: mpsc::Sender>, } -impl Subscriber_ for TypedSubscriber -where - T: WrappedTypesupport, -{ +impl Subscriber_ for TypedSubscriber { fn handle(&self) -> &rcl_subscription_t { &self.rcl_handle } fn handle_incoming(&mut self) -> bool { - let mut msg_info = rmw_message_info_t::default(); // we dont care for now - let mut msg = WrappedNativeMsg::::new(); - let ret = unsafe { - rcl_take(&self.rcl_handle, msg.void_ptr_mut(), &mut msg_info, std::ptr::null_mut()) - }; - if ret == RCL_RET_OK as i32 { - let msg = T::from_native(&msg); - if let Err(e) = self.sender.try_send(msg) { - if e.is_disconnected() { - // user dropped the handle to the stream, signal removal. - return true; - } - log::debug!("error {:?}", e) - } + let locked_waker = self.waker.lock().unwrap(); + if let Some(ref waker) = *locked_waker { + waker.wake_by_ref(); } false } + fn is_waiting(&self) -> bool { + self.waker.lock().unwrap().is_some() + } + fn destroy(&mut self, node: &mut rcl_node_t) { unsafe { rcl_subscription_fini(&mut self.rcl_handle, node); @@ -144,6 +178,11 @@ where false } + fn is_waiting(&self) -> bool { + // TODO(tobiasstark): Implement + true + } + fn destroy(&mut self, node: &mut rcl_node_t) { unsafe { rcl_subscription_fini(&mut self.rcl_handle, node); @@ -176,6 +215,11 @@ impl Subscriber_ for UntypedSubscriber { false } + fn is_waiting(&self) -> bool { + // TODO(tobiasstark): Implement + true + } + fn destroy(&mut self, node: &mut rcl_node_t) { unsafe { rcl_subscription_fini(&mut self.rcl_handle, node); @@ -222,6 +266,11 @@ impl Subscriber_ for RawSubscriber { false } + fn is_waiting(&self) -> bool { + // TODO(tobiasstark): Implement + true + } + fn destroy(&mut self, node: &mut rcl_node_t) { unsafe { rcl_subscription_fini(&mut self.rcl_handle, node); @@ -254,3 +303,45 @@ pub fn create_subscription_helper( Err(Error::from_rcl_error(result)) } } + +impl Stream for SubscriberStream +where + T: WrappedTypesupport, +{ + type Item = T; + + // Required method + fn poll_next( + mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, + ) -> Poll> { + if let Some(msg) = self.receive() { + *self.waker.lock().unwrap() = None; + return Poll::Ready(Some(msg)); + } + + // Update the stored waker, depending on whether the subscriber is now pending or not + let was_waiting = { + let mut stored_waker = self.waker.lock().unwrap(); + let was_waiting = stored_waker.is_some(); + *stored_waker = Some(cx.waker().clone()); + was_waiting + }; + + // If the subscription goes from not-waiting to waiting, notify the waitset so it adds this subscription + if !was_waiting { + unsafe { + match Error::from_rcl_error(rcl_trigger_guard_condition( + &mut self.waiting_state_changed_gc, + )) { + Error::RCL_RET_OK => {} + e => { + // This can only fail if the guard condition object was invalid, so panic is the appropriate response + panic!("Failed to trigger guard condition: {e}"); + } + } + } + } + + Poll::Pending + } +} diff --git a/r2r/src/time_source.rs b/r2r/src/time_source.rs index 91b1a09c6..b25a95d8f 100644 --- a/r2r/src/time_source.rs +++ b/r2r/src/time_source.rs @@ -247,6 +247,11 @@ impl Subscriber_ for TimeSourceSubscriber { } } + fn is_waiting(&self) -> bool { + // TODO(tobiasstark): Implement + true + } + fn destroy(&mut self, node: &mut rcl_node_t) { unsafe { rcl_subscription_fini(&mut self.subscriber_handle, node); From 87eb65eceb48f54adc6b149b54926705534d1fdb Mon Sep 17 00:00:00 2001 From: Tobias Stark Date: Mon, 17 Jun 2024 10:03:59 +0000 Subject: [PATCH 2/6] Implement new waitset handling for all subscription types --- r2r/src/nodes.rs | 36 ++-- r2r/src/subscribers.rs | 417 ++++++++++++++++++++++++++--------------- 2 files changed, 293 insertions(+), 160 deletions(-) diff --git a/r2r/src/nodes.rs b/r2r/src/nodes.rs index 39e483d7d..cc9a60836 100644 --- a/r2r/src/nodes.rs +++ b/r2r/src/nodes.rs @@ -567,12 +567,11 @@ impl Node { }; self.subscribers.push(Box::new(ws)); - Ok(SubscriberStream:: { - rcl_handle: subscription_handle, + Ok(SubscriberStream::::new( + subscription_handle, waker, - waiting_state_changed_gc: self.waitset_elements_changed_gc, - stream_type: std::marker::PhantomData, - }) + self.waitset_elements_changed_gc, + )) } /// Subscribe to a ROS topic. @@ -586,14 +585,19 @@ impl Node { { let subscription_handle = create_subscription_helper(self.node_handle.as_mut(), topic, T::get_ts(), qos_profile)?; - let (sender, receiver) = mpsc::channel::>(10); - let ws = NativeSubscriber { + let waker = Arc::new(std::sync::Mutex::new(None)); + let ws = TypedSubscriber { rcl_handle: subscription_handle, - sender, + waker: Arc::clone(&waker), }; self.subscribers.push(Box::new(ws)); - Ok(receiver) + + Ok(NativeSubscriberStream::::new( + subscription_handle, + waker, + self.waitset_elements_changed_gc, + )) } /// Subscribe to a ROS topic. @@ -606,15 +610,20 @@ impl Node { let msg = WrappedNativeMsgUntyped::new_from(topic_type)?; let subscription_handle = create_subscription_helper(self.node_handle.as_mut(), topic, msg.ts, qos_profile)?; - let (sender, receiver) = mpsc::channel::>(10); + let waker = Arc::new(std::sync::Mutex::new(None)); let ws = UntypedSubscriber { rcl_handle: subscription_handle, - topic_type: topic_type.to_string(), - sender, + waker: waker.clone(), }; + self.subscribers.push(Box::new(ws)); - Ok(receiver) + Ok(UntypedSubscriberStream::new( + subscription_handle, + waker, + self.waitset_elements_changed_gc, + topic_type.to_string(), + )) } /// Subscribe to a ROS topic. @@ -624,6 +633,7 @@ impl Node { pub fn subscribe_raw( &mut self, topic: &str, topic_type: &str, qos_profile: QosProfile, ) -> Result> + Unpin> { + // TODO(tobias.stark): Port over to new approach // TODO is it possible to handle the raw message without type support? // // Passing null ts to rcl_subscription_init throws an error .. diff --git a/r2r/src/subscribers.rs b/r2r/src/subscribers.rs index c89b52052..1ccbdec33 100644 --- a/r2r/src/subscribers.rs +++ b/r2r/src/subscribers.rs @@ -2,9 +2,9 @@ use futures::channel::mpsc; use std::ffi::CString; use crate::{error::*, msg_types::*, qos::QosProfile}; +use futures::stream::Stream; use r2r_rcl::*; use std::ffi::{c_void, CStr}; -use std::future::Future; use std::pin::Pin; use std::sync::Arc; use std::task::Poll; @@ -19,64 +19,15 @@ pub trait Subscriber_ { fn destroy(&mut self, node: &mut rcl_node_t); } -// TODO(tobias.stark): Implement the new wakeup logic for the other subscriber types as well. -// Let's just take TypedSubscriber as our proof of concept. pub struct TypedSubscriber { pub rcl_handle: rcl_subscription_t, // The waker to call when new data is available pub waker: Arc>>, } -pub struct SubscriberStream -where - T: WrappedTypesupport, -{ - pub rcl_handle: rcl_subscription_t, - pub waker: Arc>>, - pub waiting_state_changed_gc: rcl_guard_condition_t, - // suppress Rust's "unused type" error - pub stream_type: std::marker::PhantomData, -} - -impl std::marker::Unpin for SubscriberStream {} -unsafe impl std::marker::Send for SubscriberStream {} - -impl SubscriberStream -where - T: WrappedTypesupport, -{ - fn receive(&mut self) -> Option { - let mut msg_info = rmw_message_info_t::default(); // we dont care for now - let mut msg = WrappedNativeMsg::::new(); - let ret = unsafe { - rcl_take(&self.rcl_handle, msg.void_ptr_mut(), &mut msg_info, std::ptr::null_mut()) - }; - if ret == RCL_RET_OK as i32 { - Some(T::from_native(&msg)) - } else if ret == RCL_RET_SUBSCRIPTION_TAKE_FAILED as i32 { - // No message available - None - } else { - // An unexpected error while reading. The old code just ignored it. - // For now just panic, but we should think about this again - panic!("Error while reading message from subscription: {ret}"); - } - } -} - -pub struct NativeSubscriber -where - T: WrappedTypesupport, -{ - pub rcl_handle: rcl_subscription_t, - pub sender: mpsc::Sender>, -} - -pub struct UntypedSubscriber { - pub rcl_handle: rcl_subscription_t, - pub topic_type: String, - pub sender: mpsc::Sender>, -} +// Existing code distinguished these two kinds of subscribers, so keep that distinction in place +// to reduce delta with upstream. +pub type UntypedSubscriber = TypedSubscriber; pub struct RawSubscriber { pub rcl_handle: rcl_subscription_t, @@ -108,17 +59,125 @@ impl Subscriber_ for TypedSubscriber { } } -impl Subscriber_ for NativeSubscriber +struct SubscriberStreamWaker { + waker: Arc>>, + pub waiting_state_changed_gc: rcl_guard_condition_t, +} + +impl SubscriberStreamWaker { + fn new( + waker: Arc>>, gc: rcl_guard_condition_t, + ) -> Self { + SubscriberStreamWaker { + waker, + waiting_state_changed_gc: gc, + } + } + + fn set_waker(&mut self, waker: std::task::Waker) { + let was_waiting = { + let mut stored_waker = self.waker.lock().unwrap(); + let was_waiting = stored_waker.is_some(); + *stored_waker = Some(waker); + was_waiting + }; + + // If the subscription goes from not-waiting to waiting, notify the waitset so it adds this subscription + if !was_waiting { + unsafe { + match Error::from_rcl_error(rcl_trigger_guard_condition( + &mut self.waiting_state_changed_gc, + )) { + Error::RCL_RET_OK => {} + e => { + // This can only fail if the guard condition object was invalid, so panic is the appropriate response + panic!("Failed to trigger guard condition: {e}"); + } + } + } + } + } + fn clear(&mut self) { + *self.waker.lock().unwrap() = None; + } +} + +pub struct SubscriberStream where T: WrappedTypesupport, { - fn handle(&self) -> &rcl_subscription_t { - &self.rcl_handle + pub rcl_handle: rcl_subscription_t, + waker: SubscriberStreamWaker, + // suppress Rust's "unused type" error + pub stream_type: std::marker::PhantomData, +} + +impl std::marker::Unpin for SubscriberStream {} +unsafe impl std::marker::Send for SubscriberStream {} + +pub struct NativeSubscriberStream +where + T: WrappedTypesupport, +{ + pub stream: SubscriberStream, +} + +pub struct UntypedSubscriberStream { + pub rcl_handle: rcl_subscription_t, + waker: SubscriberStreamWaker, + pub topic_type: String, +} + +impl std::marker::Unpin for UntypedSubscriberStream {} +unsafe impl std::marker::Send for UntypedSubscriberStream {} + +pub struct RawSubscriberStream { + pub rcl_handle: rcl_subscription_t, + waker: SubscriberStreamWaker, + msg_buf: rcl_serialized_message_t, +} + +impl std::marker::Unpin for RawSubscriberStream {} +unsafe impl std::marker::Send for RawSubscriberStream {} + +impl SubscriberStream { + pub fn new( + sub: rcl_subscription_t, waker: Arc>>, + gc: rcl_guard_condition_t, + ) -> Self { + SubscriberStream:: { + rcl_handle: sub, + waker: SubscriberStreamWaker::new(waker, gc), + stream_type: std::marker::PhantomData, + } } - fn handle_incoming(&mut self) -> bool { + fn receive_native_no_loaning(&mut self) -> Option> { + let mut msg_info = rmw_message_info_t::default(); // we dont care for now + let mut msg = WrappedNativeMsg::::new(); + let ret = unsafe { + rcl_take(&self.rcl_handle, msg.void_ptr_mut(), &mut msg_info, std::ptr::null_mut()) + }; + if ret == RCL_RET_OK as i32 { + Some(msg) + } else if ret == RCL_RET_SUBSCRIPTION_TAKE_FAILED as i32 { + // No message available + None + } else { + // An unexpected error while reading. The old code just ignored it. + // For now just panic, but we should think about this again + panic!("Error while reading message from subscription: {ret}"); + } + } + + fn receive(&mut self) -> Option { + self.receive_native_no_loaning() + .map(|msg| T::from_native(&msg)) + } + + fn receive_native(&mut self) -> Option> { let mut msg_info = rmw_message_info_t::default(); // we dont care for now - let msg = unsafe { + unsafe { if rcl_subscription_can_loan_messages(&self.rcl_handle) { let mut loaned_msg: *mut c_void = std::ptr::null_mut(); let ret = rcl_take_loaned_message( @@ -127,8 +186,12 @@ where &mut msg_info, std::ptr::null_mut(), ); + if ret == RCL_RET_SUBSCRIPTION_TAKE_FAILED as i32 { + // no message available + return None; + } if ret != RCL_RET_OK as i32 { - return false; + panic!("Error while reading message from subscription: {ret}"); } let handle_box = Box::new(self.rcl_handle); let deallocator = Box::new(|msg: *mut T::CStruct| { @@ -142,6 +205,7 @@ where let topic_str = rcl_subscription_get_topic_name(handle_ptr); let topic = CStr::from_ptr(topic_str); + drop(Box::from_raw(handle_ptr)); crate::log_error!( "r2r", @@ -151,51 +215,40 @@ where error_msg.to_str().expect("to_str() call failed") ); } - // drop(Box::from_raw(handle_ptr)); + drop(Box::from_raw(handle_ptr)); }); - WrappedNativeMsg::::from_loaned(loaned_msg as *mut T::CStruct, deallocator) + Some(WrappedNativeMsg::::from_loaned(loaned_msg as *mut T::CStruct, deallocator)) } else { - let mut new_msg = WrappedNativeMsg::::new(); - let ret = rcl_take( - &self.rcl_handle, - new_msg.void_ptr_mut(), - &mut msg_info, - std::ptr::null_mut(), - ); - if ret != RCL_RET_OK as i32 { - return false; - } - new_msg + self.receive_native_no_loaning() } - }; - if let Err(e) = self.sender.try_send(msg) { - if e.is_disconnected() { - // user dropped the handle to the stream, signal removal. - return true; - } - log::error!("error {:?}", e) } - false - } - - fn is_waiting(&self) -> bool { - // TODO(tobiasstark): Implement - true } +} - fn destroy(&mut self, node: &mut rcl_node_t) { - unsafe { - rcl_subscription_fini(&mut self.rcl_handle, node); +impl NativeSubscriberStream { + pub fn new( + sub: rcl_subscription_t, waker: Arc>>, + gc: rcl_guard_condition_t, + ) -> Self { + Self { + stream: SubscriberStream::::new(sub, waker, gc), } } } -impl Subscriber_ for UntypedSubscriber { - fn handle(&self) -> &rcl_subscription_t { - &self.rcl_handle +impl UntypedSubscriberStream { + pub fn new( + sub: rcl_subscription_t, waker: Arc>>, + gc: rcl_guard_condition_t, topic_type: String, + ) -> Self { + Self { + rcl_handle: sub, + waker: SubscriberStreamWaker::new(waker, gc), + topic_type, + } } - fn handle_incoming(&mut self) -> bool { + fn receive_json(&mut self) -> Option> { let mut msg_info = rmw_message_info_t::default(); // we dont care for now let mut msg = WrappedNativeMsgUntyped::new_from(&self.topic_type) .unwrap_or_else(|_| panic!("no typesupport for {}", self.topic_type)); @@ -203,26 +256,138 @@ impl Subscriber_ for UntypedSubscriber { rcl_take(&self.rcl_handle, msg.void_ptr_mut(), &mut msg_info, std::ptr::null_mut()) }; if ret == RCL_RET_OK as i32 { - let json = msg.to_json(); - if let Err(e) = self.sender.try_send(json) { - if e.is_disconnected() { - // user dropped the handle to the stream, signal removal. - return true; + Some( + msg.to_json() + .map_err(|e| crate::Error::SerdeError { err: e.to_string() }), + ) + } else if ret == RCL_RET_SUBSCRIPTION_TAKE_FAILED as i32 { + None + } else { + panic!("Failed to read from subscription: {ret}"); + } + } +} + +impl RawSubscriberStream { + fn new( + sub: rcl_subscription_t, waker: Arc>>, + gc: rcl_guard_condition_t, + ) -> Self { + Self { + rcl_handle: sub, + waker: SubscriberStreamWaker::new(waker, gc), + msg_buf: unsafe { rcutils_get_zero_initialized_uint8_array() }, + } + } + fn receive_raw(&mut self) -> Option> { + let mut msg_info = rmw_message_info_t::default(); // we dont care for now + let ret = unsafe { + rcl_take_serialized_message( + &self.rcl_handle, + &mut self.msg_buf as *mut rcl_serialized_message_t, + &mut msg_info, + std::ptr::null_mut(), + ) + }; + if ret == RCL_RET_OK as i32 { + Some(if self.msg_buf.buffer == std::ptr::null_mut() { + Vec::new() + } else { + unsafe { + std::slice::from_raw_parts(self.msg_buf.buffer, self.msg_buf.buffer_length) + .to_vec() } - log::debug!("error {:?}", e) + }) + } else if ret == RCL_RET_SUBSCRIPTION_TAKE_FAILED as i32 { + None + } else { + // An unexpected error while reading. The old code just ignored it. + // For now just panic, but we should think about this again + panic!("Error while reading message from subscription: {ret}"); + } + } +} + +impl Stream for SubscriberStream +where + T: WrappedTypesupport, +{ + type Item = T; + + // Required method + fn poll_next(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll> { + match self.receive() { + Some(msg) => { + self.waker.clear(); + Poll::Ready(Some(msg)) + } + None => { + self.waker.set_waker(cx.waker().clone()); + Poll::Pending } } - false } +} - fn is_waiting(&self) -> bool { - // TODO(tobiasstark): Implement - true +impl Stream for NativeSubscriberStream +where + T: WrappedTypesupport, +{ + type Item = WrappedNativeMsg; + + // Required method + fn poll_next( + mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, + ) -> Poll>> { + match self.stream.receive_native() { + Some(msg) => { + self.stream.waker.clear(); + Poll::Ready(Some(msg)) + } + None => { + self.stream.waker.set_waker(cx.waker().clone()); + Poll::Pending + } + } } +} - fn destroy(&mut self, node: &mut rcl_node_t) { - unsafe { - rcl_subscription_fini(&mut self.rcl_handle, node); +impl Stream for UntypedSubscriberStream { + type Item = Result; + + // Required method + fn poll_next( + mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, + ) -> Poll> { + match self.receive_json() { + Some(msg) => { + self.waker.clear(); + Poll::Ready(Some(msg)) + } + None => { + self.waker.set_waker(cx.waker().clone()); + Poll::Pending + } + } + } +} + +impl Stream for RawSubscriberStream { + type Item = Vec; + + // Required method + fn poll_next( + mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, + ) -> Poll> { + match self.receive_raw() { + Some(msg) => { + self.waker.clear(); + Poll::Ready(Some(msg)) + } + None => { + self.waker.set_waker(cx.waker().clone()); + Poll::Pending + } } } } @@ -303,45 +468,3 @@ pub fn create_subscription_helper( Err(Error::from_rcl_error(result)) } } - -impl Stream for SubscriberStream -where - T: WrappedTypesupport, -{ - type Item = T; - - // Required method - fn poll_next( - mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, - ) -> Poll> { - if let Some(msg) = self.receive() { - *self.waker.lock().unwrap() = None; - return Poll::Ready(Some(msg)); - } - - // Update the stored waker, depending on whether the subscriber is now pending or not - let was_waiting = { - let mut stored_waker = self.waker.lock().unwrap(); - let was_waiting = stored_waker.is_some(); - *stored_waker = Some(cx.waker().clone()); - was_waiting - }; - - // If the subscription goes from not-waiting to waiting, notify the waitset so it adds this subscription - if !was_waiting { - unsafe { - match Error::from_rcl_error(rcl_trigger_guard_condition( - &mut self.waiting_state_changed_gc, - )) { - Error::RCL_RET_OK => {} - e => { - // This can only fail if the guard condition object was invalid, so panic is the appropriate response - panic!("Failed to trigger guard condition: {e}"); - } - } - } - } - - Poll::Pending - } -} From 67ce71b601e8a8de69b689c80627411cab6d2f56 Mon Sep 17 00:00:00 2001 From: Tobias Stark Date: Mon, 17 Jun 2024 10:36:44 +0000 Subject: [PATCH 3/6] add tests --- r2r/Cargo.toml | 2 +- r2r/tests/tokio_testing.rs | 58 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 59 insertions(+), 1 deletion(-) diff --git a/r2r/Cargo.toml b/r2r/Cargo.toml index 2f1107cf0..be5ac6108 100644 --- a/r2r/Cargo.toml +++ b/r2r/Cargo.toml @@ -32,7 +32,7 @@ indexmap = "2.2.6" [dev-dependencies] serde_json = "1.0.89" futures = "0.3.25" -tokio = { version = "1.22.0", features = ["rt-multi-thread", "time", "macros"] } +tokio = { version = "1.22.0", features = ["rt-multi-thread", "macros", "time"] } rand = "0.8.5" cdr = "0.2.4" criterion = "0.5.1" diff --git a/r2r/tests/tokio_testing.rs b/r2r/tests/tokio_testing.rs index cec59d3d8..96407ec41 100644 --- a/r2r/tests/tokio_testing.rs +++ b/r2r/tests/tokio_testing.rs @@ -157,3 +157,61 @@ async fn tokio_testing() -> Result<(), Box> { Ok(()) } + +#[tokio::test] +async fn pub_sub_with_subscriber_after_publisher() -> Result<(), Box> { + // Test that pub-sub works if the publication happens before the subscriber waits + let ctx = r2r::Context::create()?; + let mut node = r2r::Node::create(ctx, "testnode", "")?; + let mut s_the_no = + node.subscribe::("/the_to", QosProfile::default())?; + let p_the_no = + node.create_publisher::("/the_to", QosProfile::default())?; + let state = Arc::new(Mutex::new(false)); + + // First publish a message + p_the_no + .publish(&r2r::std_msgs::msg::Int32 { data: 0xc0ffee })?; + + // Then check if that message arrived + let msg = s_the_no + .next() + .await + .expect("Awaiting the subscription yielded none"); + assert_eq!(msg.data, 0xc0ffee); + Ok(()) +} + +#[tokio::test] +async fn pub_sub_with_waiting_subscriber() -> Result<(), Box> { + // Test that pub-sub works if the subscriber is blocking before the publication happens + let ctx = r2r::Context::create()?; + let mut node = r2r::Node::create(ctx, "testnode", "")?; + let mut s_the_no = + node.subscribe::("/the_no", QosProfile::default())?; + let p_the_no = + node.create_publisher::("/the_no", QosProfile::default())?; + let state = Arc::new(Mutex::new(false)); + + // First spawn a subscriber + let handle = task::spawn({ + let state = state.clone(); + async move { + if let Some(msg) = s_the_no.next().await { + assert_eq!(msg.data, 0xc0ffee); + *state.lock().unwrap() = true; + } + }}); + + // Then wait some time to ensure the future is being polled + tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; + + // Then publish a message + p_the_no + .publish(&r2r::std_msgs::msg::Int32 { data: 0xc0ffee })?; + + // And check that the subscriber receives it + handle.await.expect("subscriber task panicked"); + assert!(*state.lock().unwrap()); + Ok(()) +} From d36cde75dd2b06c3e8367fefb3692339ba916269 Mon Sep 17 00:00:00 2001 From: Tobias Stark Date: Mon, 17 Jun 2024 12:55:11 +0000 Subject: [PATCH 4/6] Fix undefined behaviour in waitset handling --- r2r/src/nodes.rs | 27 ++++++++++++++++++++------- 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/r2r/src/nodes.rs b/r2r/src/nodes.rs index cc9a60836..5719c983f 100644 --- a/r2r/src/nodes.rs +++ b/r2r/src/nodes.rs @@ -916,6 +916,16 @@ impl Node { /// `timeout` is a duration specifying how long the spin should /// block for if there are no pending events. pub fn spin_once(&mut self, timeout: Duration) { + // like std::slice_from_raw_parts, but allows data to be nullptr if len is zero + unsafe fn slice_from_ptr<'a, T>(data : *const T, len : usize) -> &'a [T] { + if data.is_null() { + assert_eq!(len, 0); + std::slice::from_raw_parts(std::ptr::NonNull::::dangling().as_ptr(), 0) + } else { + std::slice::from_raw_parts(data, len) + } + } + // first handle any completed action cancellation responses for a in &mut self.action_servers { a.lock().unwrap().send_completed_cancel_requests(); @@ -1100,6 +1110,9 @@ impl Node { return; } + + let ws_subs = + unsafe { slice_from_ptr(ws.subscriptions, self.subscribers.len()) }; let mut subs_to_remove = vec![]; if ws.subscriptions != std::ptr::null_mut() { let ws_subs = @@ -1117,6 +1130,7 @@ impl Node { self.subscribers .retain(|s| !subs_to_remove.contains(s.handle())); + let ws_timers = unsafe { slice_from_ptr(ws.timers, self.timers.len()) }; let mut timers_to_remove = vec![]; if ws.timers != std::ptr::null_mut() { let ws_timers = unsafe { std::slice::from_raw_parts(ws.timers, self.timers.len()) }; @@ -1134,16 +1148,15 @@ impl Node { self.timers .retain(|t| !timers_to_remove.contains(&*t.timer_handle)); - if ws.clients != std::ptr::null_mut() { - let ws_clients = unsafe { std::slice::from_raw_parts(ws.clients, self.clients.len()) }; - for (s, ws_s) in self.clients.iter_mut().zip(ws_clients) { - if ws_s != &std::ptr::null() { - let mut s = s.lock().unwrap(); - s.handle_response(); - } + let ws_clients = unsafe { slice_from_ptr(ws.clients, self.clients.len()) }; + for (s, ws_s) in self.clients.iter_mut().zip(ws_clients) { + if ws_s != &std::ptr::null() { + let mut s = s.lock().unwrap(); + s.handle_response(); } } + let ws_services = unsafe { slice_from_ptr(ws.services, self.services.len()) }; let mut services_to_remove = vec![]; if ws.services != std::ptr::null_mut() { let ws_services = From 4cd395fa14106f993f0356b5b450868ca057f6d4 Mon Sep 17 00:00:00 2001 From: Tobias Stark Date: Mon, 15 Jul 2024 12:24:23 +0000 Subject: [PATCH 5/6] Avoid use-after-free in stream implementation --- r2r/src/nodes.rs | 28 +++-- r2r/src/subscribers.rs | 230 +++++++++++++++++------------------- r2r/tests/tokio_test_raw.rs | 9 +- r2r/tests/tokio_testing.rs | 11 +- 4 files changed, 140 insertions(+), 138 deletions(-) diff --git a/r2r/src/nodes.rs b/r2r/src/nodes.rs index 5719c983f..6302142fc 100644 --- a/r2r/src/nodes.rs +++ b/r2r/src/nodes.rs @@ -560,10 +560,10 @@ impl Node { let subscription_handle = create_subscription_helper(self.node_handle.as_mut(), topic, T::get_ts(), qos_profile)?; - let waker = Arc::new(std::sync::Mutex::new(None)); + let waker = Arc::new(SharedSubscriptionData::new()); let ws = TypedSubscriber { rcl_handle: subscription_handle, - waker: Arc::clone(&waker), + shared: Arc::clone(&waker), }; self.subscribers.push(Box::new(ws)); @@ -586,10 +586,10 @@ impl Node { let subscription_handle = create_subscription_helper(self.node_handle.as_mut(), topic, T::get_ts(), qos_profile)?; - let waker = Arc::new(std::sync::Mutex::new(None)); + let waker = Arc::new(SharedSubscriptionData::new()); let ws = TypedSubscriber { rcl_handle: subscription_handle, - waker: Arc::clone(&waker), + shared: Arc::clone(&waker), }; self.subscribers.push(Box::new(ws)); @@ -610,11 +610,11 @@ impl Node { let msg = WrappedNativeMsgUntyped::new_from(topic_type)?; let subscription_handle = create_subscription_helper(self.node_handle.as_mut(), topic, msg.ts, qos_profile)?; - let waker = Arc::new(std::sync::Mutex::new(None)); + let waker = Arc::new(SharedSubscriptionData::new()); let ws = UntypedSubscriber { rcl_handle: subscription_handle, - waker: waker.clone(), + shared: waker.clone(), }; self.subscribers.push(Box::new(ws)); @@ -633,7 +633,6 @@ impl Node { pub fn subscribe_raw( &mut self, topic: &str, topic_type: &str, qos_profile: QosProfile, ) -> Result> + Unpin> { - // TODO(tobias.stark): Port over to new approach // TODO is it possible to handle the raw message without type support? // // Passing null ts to rcl_subscription_init throws an error .. @@ -661,15 +660,20 @@ impl Node { let subscription_handle = create_subscription_helper(self.node_handle.as_mut(), topic, msg.ts, qos_profile)?; - let (sender, receiver) = mpsc::channel::>(10); - let ws = RawSubscriber { + let waker = Arc::new(SharedSubscriptionData::new()); + + let ws = UntypedSubscriber { rcl_handle: subscription_handle, - msg_buf, - sender, + shared: waker.clone(), }; + self.subscribers.push(Box::new(ws)); - Ok(receiver) + Ok(RawSubscriberStream::new( + subscription_handle, + waker, + self.waitset_elements_changed_gc, + )) } /// Create a ROS service. diff --git a/r2r/src/subscribers.rs b/r2r/src/subscribers.rs index 1ccbdec33..c0b35787e 100644 --- a/r2r/src/subscribers.rs +++ b/r2r/src/subscribers.rs @@ -1,4 +1,3 @@ -use futures::channel::mpsc; use std::ffi::CString; use crate::{error::*, msg_types::*, qos::QosProfile}; @@ -6,9 +5,9 @@ use futures::stream::Stream; use r2r_rcl::*; use std::ffi::{c_void, CStr}; use std::pin::Pin; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::task::Poll; -use futures::stream::Stream; pub trait Subscriber_ { fn handle(&self) -> &rcl_subscription_t; @@ -19,29 +18,39 @@ pub trait Subscriber_ { fn destroy(&mut self, node: &mut rcl_node_t); } +pub struct SharedSubscriptionData { + // A flag that is set to true when the subscription object is destroyed. + // This must be checked by stream objects before accessing the underlying rcl handle + subscription_is_dead: AtomicBool, + // The waker to call when new data is available + waker: std::sync::Mutex>, +} + +impl SharedSubscriptionData { + pub fn new() -> Self { + SharedSubscriptionData { + subscription_is_dead: AtomicBool::new(false), + waker: std::sync::Mutex::new(None), + } + } +} + pub struct TypedSubscriber { pub rcl_handle: rcl_subscription_t, - // The waker to call when new data is available - pub waker: Arc>>, + pub shared: Arc, } // Existing code distinguished these two kinds of subscribers, so keep that distinction in place // to reduce delta with upstream. pub type UntypedSubscriber = TypedSubscriber; -pub struct RawSubscriber { - pub rcl_handle: rcl_subscription_t, - pub msg_buf: rcl_serialized_message_t, - pub sender: mpsc::Sender>, -} - impl Subscriber_ for TypedSubscriber { fn handle(&self) -> &rcl_subscription_t { &self.rcl_handle } fn handle_incoming(&mut self) -> bool { - let locked_waker = self.waker.lock().unwrap(); + let locked_waker = self.shared.waker.lock().unwrap(); if let Some(ref waker) = *locked_waker { waker.wake_by_ref(); } @@ -49,57 +58,42 @@ impl Subscriber_ for TypedSubscriber { } fn is_waiting(&self) -> bool { - self.waker.lock().unwrap().is_some() + self.shared.waker.lock().unwrap().is_some() } fn destroy(&mut self, node: &mut rcl_node_t) { + self.shared + .subscription_is_dead + .store(true, Ordering::Release); unsafe { rcl_subscription_fini(&mut self.rcl_handle, node); } } } -struct SubscriberStreamWaker { - waker: Arc>>, - pub waiting_state_changed_gc: rcl_guard_condition_t, -} - -impl SubscriberStreamWaker { - fn new( - waker: Arc>>, gc: rcl_guard_condition_t, - ) -> Self { - SubscriberStreamWaker { - waker, - waiting_state_changed_gc: gc, - } - } - - fn set_waker(&mut self, waker: std::task::Waker) { - let was_waiting = { - let mut stored_waker = self.waker.lock().unwrap(); - let was_waiting = stored_waker.is_some(); - *stored_waker = Some(waker); - was_waiting - }; +fn set_waker( + shared: Arc, new_waker: std::task::Waker, + gc: &mut rcl_guard_condition_t, +) { + let was_waiting = { + let mut stored_waker = shared.waker.lock().unwrap(); + let was_waiting = stored_waker.is_some(); + *stored_waker = Some(new_waker); + was_waiting + }; - // If the subscription goes from not-waiting to waiting, notify the waitset so it adds this subscription - if !was_waiting { - unsafe { - match Error::from_rcl_error(rcl_trigger_guard_condition( - &mut self.waiting_state_changed_gc, - )) { - Error::RCL_RET_OK => {} - e => { - // This can only fail if the guard condition object was invalid, so panic is the appropriate response - panic!("Failed to trigger guard condition: {e}"); - } + // If the subscription goes from not-waiting to waiting, notify the waitset so it adds this subscription + if !was_waiting { + unsafe { + match Error::from_rcl_error(rcl_trigger_guard_condition(gc)) { + Error::RCL_RET_OK => {} + e => { + // This can only fail if the guard condition object was invalid, so panic is the appropriate response + panic!("Failed to trigger guard condition: {e}"); } } } } - fn clear(&mut self) { - *self.waker.lock().unwrap() = None; - } } pub struct SubscriberStream @@ -107,7 +101,8 @@ where T: WrappedTypesupport, { pub rcl_handle: rcl_subscription_t, - waker: SubscriberStreamWaker, + shared: Arc, + pub waiting_state_changed_gc: rcl_guard_condition_t, // suppress Rust's "unused type" error pub stream_type: std::marker::PhantomData, } @@ -124,7 +119,8 @@ where pub struct UntypedSubscriberStream { pub rcl_handle: rcl_subscription_t, - waker: SubscriberStreamWaker, + shared: Arc, + pub waiting_state_changed_gc: rcl_guard_condition_t, pub topic_type: String, } @@ -133,21 +129,29 @@ unsafe impl std::marker::Send for UntypedSubscriberStream {} pub struct RawSubscriberStream { pub rcl_handle: rcl_subscription_t, - waker: SubscriberStreamWaker, + shared: Arc, + pub waiting_state_changed_gc: rcl_guard_condition_t, msg_buf: rcl_serialized_message_t, } +impl Drop for RawSubscriberStream { + fn drop(&mut self) { + rcutils_uint8_array_fini(&mut self.msg_buf as *mut rcl_serialized_message_t); + } +} + impl std::marker::Unpin for RawSubscriberStream {} unsafe impl std::marker::Send for RawSubscriberStream {} impl SubscriberStream { pub fn new( - sub: rcl_subscription_t, waker: Arc>>, + sub: rcl_subscription_t, shared_sub_data: Arc, gc: rcl_guard_condition_t, ) -> Self { SubscriberStream:: { rcl_handle: sub, - waker: SubscriberStreamWaker::new(waker, gc), + shared: shared_sub_data, + waiting_state_changed_gc: gc, stream_type: std::marker::PhantomData, } } @@ -227,23 +231,24 @@ impl SubscriberStream { impl NativeSubscriberStream { pub fn new( - sub: rcl_subscription_t, waker: Arc>>, + sub: rcl_subscription_t, shared_sub_data: Arc, gc: rcl_guard_condition_t, ) -> Self { Self { - stream: SubscriberStream::::new(sub, waker, gc), + stream: SubscriberStream::::new(sub, shared_sub_data, gc), } } } impl UntypedSubscriberStream { pub fn new( - sub: rcl_subscription_t, waker: Arc>>, + sub: rcl_subscription_t, shared_sub_data: Arc, gc: rcl_guard_condition_t, topic_type: String, ) -> Self { Self { rcl_handle: sub, - waker: SubscriberStreamWaker::new(waker, gc), + shared: shared_sub_data, + waiting_state_changed_gc: gc, topic_type, } } @@ -269,13 +274,14 @@ impl UntypedSubscriberStream { } impl RawSubscriberStream { - fn new( - sub: rcl_subscription_t, waker: Arc>>, + pub fn new( + sub: rcl_subscription_t, shared_sub_data: Arc, gc: rcl_guard_condition_t, ) -> Self { Self { rcl_handle: sub, - waker: SubscriberStreamWaker::new(waker, gc), + shared: shared_sub_data, + waiting_state_changed_gc: gc, msg_buf: unsafe { rcutils_get_zero_initialized_uint8_array() }, } } @@ -316,13 +322,20 @@ where // Required method fn poll_next(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll> { + if self.shared.subscription_is_dead.load(Ordering::Acquire) { + return Poll::Ready(None); + } match self.receive() { Some(msg) => { - self.waker.clear(); + *self.shared.waker.lock().unwrap() = None; Poll::Ready(Some(msg)) } None => { - self.waker.set_waker(cx.waker().clone()); + set_waker( + Arc::clone(&self.shared), + cx.waker().clone(), + &mut self.waiting_state_changed_gc, + ); Poll::Pending } } @@ -339,13 +352,26 @@ where fn poll_next( mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> Poll>> { + if self + .stream + .shared + .subscription_is_dead + .load(Ordering::Acquire) + { + return Poll::Ready(None); + } + match self.stream.receive_native() { Some(msg) => { - self.stream.waker.clear(); + *self.stream.shared.waker.lock().unwrap() = None; Poll::Ready(Some(msg)) } None => { - self.stream.waker.set_waker(cx.waker().clone()); + set_waker( + Arc::clone(&self.stream.shared), + cx.waker().clone(), + &mut self.stream.waiting_state_changed_gc, + ); Poll::Pending } } @@ -359,13 +385,21 @@ impl Stream for UntypedSubscriberStream { fn poll_next( mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> Poll> { + if self.shared.subscription_is_dead.load(Ordering::Acquire) { + return Poll::Ready(None); + } + match self.receive_json() { Some(msg) => { - self.waker.clear(); + *self.shared.waker.lock().unwrap() = None; Poll::Ready(Some(msg)) } None => { - self.waker.set_waker(cx.waker().clone()); + set_waker( + Arc::clone(&self.shared), + cx.waker().clone(), + &mut self.waiting_state_changed_gc, + ); Poll::Pending } } @@ -379,71 +413,27 @@ impl Stream for RawSubscriberStream { fn poll_next( mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> Poll> { + if self.shared.subscription_is_dead.load(Ordering::Acquire) { + return Poll::Ready(None); + } + match self.receive_raw() { Some(msg) => { - self.waker.clear(); + *self.shared.waker.lock().unwrap() = None; Poll::Ready(Some(msg)) } None => { - self.waker.set_waker(cx.waker().clone()); + set_waker( + Arc::clone(&self.shared), + cx.waker().clone(), + &mut self.waiting_state_changed_gc, + ); Poll::Pending } } } } -impl Subscriber_ for RawSubscriber { - fn handle(&self) -> &rcl_subscription_t { - &self.rcl_handle - } - - fn handle_incoming(&mut self) -> bool { - let mut msg_info = rmw_message_info_t::default(); // we dont care for now - let ret = unsafe { - rcl_take_serialized_message( - &self.rcl_handle, - &mut self.msg_buf as *mut rcl_serialized_message_t, - &mut msg_info, - std::ptr::null_mut(), - ) - }; - if ret != RCL_RET_OK as i32 { - log::error!("failed to take serialized message"); - return false; - } - - let data_bytes = if self.msg_buf.buffer == std::ptr::null_mut() { - Vec::new() - } else { - unsafe { - std::slice::from_raw_parts(self.msg_buf.buffer, self.msg_buf.buffer_length).to_vec() - } - }; - - if let Err(e) = self.sender.try_send(data_bytes) { - if e.is_disconnected() { - // user dropped the handle to the stream, signal removal. - return true; - } - log::debug!("error {:?}", e) - } - - false - } - - fn is_waiting(&self) -> bool { - // TODO(tobiasstark): Implement - true - } - - fn destroy(&mut self, node: &mut rcl_node_t) { - unsafe { - rcl_subscription_fini(&mut self.rcl_handle, node); - rcutils_uint8_array_fini(&mut self.msg_buf as *mut rcl_serialized_message_t); - } - } -} - pub fn create_subscription_helper( node: &mut rcl_node_t, topic: &str, ts: *const rosidl_message_type_support_t, qos_profile: QosProfile, diff --git a/r2r/tests/tokio_test_raw.rs b/r2r/tests/tokio_test_raw.rs index 021df62bb..488f89b63 100644 --- a/r2r/tests/tokio_test_raw.rs +++ b/r2r/tests/tokio_test_raw.rs @@ -75,15 +75,16 @@ async fn tokio_subscribe_raw_testing() -> Result<(), Box> } }); - let handle = std::thread::spawn(move || { - for _ in 1..=30 { - node.spin_once(std::time::Duration::from_millis(100)); + let spin_task = tokio::task::spawn_blocking(move || { + loop { + node.spin_once(tokio::time::Duration::from_millis(10)); } }); sub_int_handle.await.unwrap(); sub_array_handle.await.unwrap(); - handle.join().unwrap(); + spin_task.abort(); + spin_task.await; println!("Going to drop tokio_subscribe_raw_testing iteration {i_cycle}"); } diff --git a/r2r/tests/tokio_testing.rs b/r2r/tests/tokio_testing.rs index 96407ec41..2c8a94c65 100644 --- a/r2r/tests/tokio_testing.rs +++ b/r2r/tests/tokio_testing.rs @@ -167,13 +167,13 @@ async fn pub_sub_with_subscriber_after_publisher() -> Result<(), Box("/the_to", QosProfile::default())?; let p_the_no = node.create_publisher::("/the_to", QosProfile::default())?; - let state = Arc::new(Mutex::new(false)); // First publish a message p_the_no .publish(&r2r::std_msgs::msg::Int32 { data: 0xc0ffee })?; - // Then check if that message arrived + // Then check if that message arrived. + // Note that there's no need to spin the node as no waiting is involved let msg = s_the_no .next() .await @@ -210,8 +210,15 @@ async fn pub_sub_with_waiting_subscriber() -> Result<(), Box Date: Mon, 15 Jul 2024 12:24:38 +0000 Subject: [PATCH 6/6] Merge conflicts --- r2r/src/nodes.rs | 27 +++++++-------------------- 1 file changed, 7 insertions(+), 20 deletions(-) diff --git a/r2r/src/nodes.rs b/r2r/src/nodes.rs index 6302142fc..1557349f9 100644 --- a/r2r/src/nodes.rs +++ b/r2r/src/nodes.rs @@ -920,16 +920,6 @@ impl Node { /// `timeout` is a duration specifying how long the spin should /// block for if there are no pending events. pub fn spin_once(&mut self, timeout: Duration) { - // like std::slice_from_raw_parts, but allows data to be nullptr if len is zero - unsafe fn slice_from_ptr<'a, T>(data : *const T, len : usize) -> &'a [T] { - if data.is_null() { - assert_eq!(len, 0); - std::slice::from_raw_parts(std::ptr::NonNull::::dangling().as_ptr(), 0) - } else { - std::slice::from_raw_parts(data, len) - } - } - // first handle any completed action cancellation responses for a in &mut self.action_servers { a.lock().unwrap().send_completed_cancel_requests(); @@ -1114,9 +1104,6 @@ impl Node { return; } - - let ws_subs = - unsafe { slice_from_ptr(ws.subscriptions, self.subscribers.len()) }; let mut subs_to_remove = vec![]; if ws.subscriptions != std::ptr::null_mut() { let ws_subs = @@ -1134,7 +1121,6 @@ impl Node { self.subscribers .retain(|s| !subs_to_remove.contains(s.handle())); - let ws_timers = unsafe { slice_from_ptr(ws.timers, self.timers.len()) }; let mut timers_to_remove = vec![]; if ws.timers != std::ptr::null_mut() { let ws_timers = unsafe { std::slice::from_raw_parts(ws.timers, self.timers.len()) }; @@ -1152,15 +1138,16 @@ impl Node { self.timers .retain(|t| !timers_to_remove.contains(&*t.timer_handle)); - let ws_clients = unsafe { slice_from_ptr(ws.clients, self.clients.len()) }; - for (s, ws_s) in self.clients.iter_mut().zip(ws_clients) { - if ws_s != &std::ptr::null() { - let mut s = s.lock().unwrap(); - s.handle_response(); + if ws.clients != std::ptr::null_mut() { + let ws_clients = unsafe { std::slice::from_raw_parts(ws.clients, self.clients.len()) }; + for (s, ws_s) in self.clients.iter_mut().zip(ws_clients) { + if ws_s != &std::ptr::null() { + let mut s = s.lock().unwrap(); + s.handle_response(); + } } } - let ws_services = unsafe { slice_from_ptr(ws.services, self.services.len()) }; let mut services_to_remove = vec![]; if ws.services != std::ptr::null_mut() { let ws_services =