From 9cce951e68efd5c92ead0250bf69720b22911a39 Mon Sep 17 00:00:00 2001 From: Lucas Walter Date: Tue, 6 Aug 2024 08:51:03 -0700 Subject: [PATCH 1/3] subscribe to any topic of any type and get received messages as a raw vector of bytes --- CHANGELOG.md | 1 + roslibrust/src/ros1/node/handle.rs | 13 ++++++++ roslibrust/src/ros1/subscriber.rs | 25 +++++++++++++++- .../tests/ros1_native_integration_tests.rs | 30 +++++++++++++++++++ roslibrust_codegen/src/integral_types.rs | 7 +++++ 5 files changed, 75 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5c39fd56..9215f28d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - ROS1 Native Publishers now support latching behavior - The XML RPC client for interacting directly with the rosmaster server has been exposed as a public API - Experimental: Initial support for writing generic clients that can be compile time specialized for rosbridge or ros1 +- Can subscribe to any topic and get raw bytes instead of a deserialized message of known type ### Fixed diff --git a/roslibrust/src/ros1/node/handle.rs b/roslibrust/src/ros1/node/handle.rs index 33d02efb..42166cf0 100644 --- a/roslibrust/src/ros1/node/handle.rs +++ b/roslibrust/src/ros1/node/handle.rs @@ -2,6 +2,7 @@ use super::actor::{Node, NodeServerHandle}; use crate::{ ros1::{ names::Name, publisher::Publisher, service_client::ServiceClient, subscriber::Subscriber, + subscriber::SubscriberAny, NodeError, ServiceServer, }, ServiceFn, @@ -75,6 +76,18 @@ impl NodeHandle { Ok(Publisher::new(topic_name, sender)) } + pub async fn subscribe_any( + &self, + topic_name: &str, + queue_size: usize, + ) -> Result { + let receiver = self + .inner + .register_subscriber::>(topic_name, queue_size) + .await?; + Ok(SubscriberAny::new(receiver)) + } + pub async fn subscribe( &self, topic_name: &str, diff --git a/roslibrust/src/ros1/subscriber.rs b/roslibrust/src/ros1/subscriber.rs index a19058df..4d4861e4 100644 --- a/roslibrust/src/ros1/subscriber.rs +++ b/roslibrust/src/ros1/subscriber.rs @@ -39,6 +39,29 @@ impl Subscriber { } } +pub struct SubscriberAny { + receiver: broadcast::Receiver>, + _phantom: PhantomData>, +} + +impl SubscriberAny { + pub(crate) fn new(receiver: broadcast::Receiver>) -> Self { + Self { + receiver, + _phantom: PhantomData, + } + } + + pub async fn next(&mut self) -> Option, SubscriberError>> { + let data = match self.receiver.recv().await { + Ok(v) => v, + Err(RecvError::Closed) => return None, + Err(RecvError::Lagged(n)) => return Some(Err(SubscriberError::Lagged(n))), + }; + Some(Ok(data)) + } +} + pub struct Subscription { subscription_tasks: Vec>, _msg_receiver: broadcast::Receiver>, @@ -154,7 +177,7 @@ async fn establish_publisher_connection( stream.write_all(&conn_header_bytes[..]).await?; if let Ok(responded_header) = tcpros::receive_header(&mut stream).await { - if conn_header.md5sum == responded_header.md5sum { + if conn_header.md5sum == Some("*".to_string()) || conn_header.md5sum == responded_header.md5sum { log::debug!( "Established connection with publisher for {:?}", conn_header.topic diff --git a/roslibrust/tests/ros1_native_integration_tests.rs b/roslibrust/tests/ros1_native_integration_tests.rs index b4cc3ec6..42a9c7d8 100644 --- a/roslibrust/tests/ros1_native_integration_tests.rs +++ b/roslibrust/tests/ros1_native_integration_tests.rs @@ -12,6 +12,36 @@ mod tests { "assets/ros1_common_interfaces" ); + #[test_log::test(tokio::test)] + async fn test_subscribe_any() { + // get a single message in raw bytes and test the bytes are as expected + let nh = NodeHandle::new("http://localhost:11311", "test_subscribe_any") + .await + .unwrap(); + + let publisher = nh + .advertise::("/test_subscribe_any", 1, true) + .await + .unwrap(); + + let mut subscriber = nh + .subscribe_any("/test_subscribe_any", 1) + .await + .unwrap(); + + publisher + .publish(&std_msgs::String { + data: "test".to_owned(), + }) + .await + .unwrap(); + + let res = + tokio::time::timeout(tokio::time::Duration::from_millis(250), subscriber.next()).await; + let res = res.unwrap().unwrap().unwrap(); + assert!(res == vec![8, 0, 0, 0, 4, 0, 0, 0, 116, 101, 115, 116]); + } + #[test_log::test(tokio::test)] async fn test_latching() { let nh = NodeHandle::new("http://localhost:11311", "test_latching") diff --git a/roslibrust_codegen/src/integral_types.rs b/roslibrust_codegen/src/integral_types.rs index 230fe8ea..f7d7b7bc 100644 --- a/roslibrust_codegen/src/integral_types.rs +++ b/roslibrust_codegen/src/integral_types.rs @@ -36,6 +36,13 @@ impl RosMessageType for Time { const DEFINITION: &'static str = ""; } +// The equivalent of rospy AnyMsg or C++ ShapeShifter, subscribe_any() uses this type +impl RosMessageType for Vec { + const ROS_TYPE_NAME: &'static str = "*"; + const MD5SUM: &'static str = "*"; + const DEFINITION: &'static str = ""; +} + // TODO provide chrono conversions here behind a cfg flag /// Matches the integral ros1 duration type, with extensions for ease of use From 09b3c5e428094243e40231d904edbe1508dd6a41 Mon Sep 17 00:00:00 2001 From: Lucas Walter Date: Wed, 14 Aug 2024 15:37:21 -0700 Subject: [PATCH 2/3] newtype ShapeShifter instead of raw Vec, but still return Vec from next() on SubscriberAny --- roslibrust/src/ros1/node/handle.rs | 2 +- roslibrust/src/ros1/subscriber.rs | 5 +++-- roslibrust_codegen/src/integral_types.rs | 5 ++++- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/roslibrust/src/ros1/node/handle.rs b/roslibrust/src/ros1/node/handle.rs index 42166cf0..a8b47626 100644 --- a/roslibrust/src/ros1/node/handle.rs +++ b/roslibrust/src/ros1/node/handle.rs @@ -83,7 +83,7 @@ impl NodeHandle { ) -> Result { let receiver = self .inner - .register_subscriber::>(topic_name, queue_size) + .register_subscriber::(topic_name, queue_size) .await?; Ok(SubscriberAny::new(receiver)) } diff --git a/roslibrust/src/ros1/subscriber.rs b/roslibrust/src/ros1/subscriber.rs index 4d4861e4..057c016c 100644 --- a/roslibrust/src/ros1/subscriber.rs +++ b/roslibrust/src/ros1/subscriber.rs @@ -1,6 +1,6 @@ use crate::ros1::{names::Name, tcpros::ConnectionHeader}; use abort_on_drop::ChildTask; -use roslibrust_codegen::RosMessageType; +use roslibrust_codegen::{RosMessageType, ShapeShifter}; use std::{marker::PhantomData, sync::Arc}; use tokio::{ io::AsyncWriteExt, @@ -41,7 +41,7 @@ impl Subscriber { pub struct SubscriberAny { receiver: broadcast::Receiver>, - _phantom: PhantomData>, + _phantom: PhantomData, } impl SubscriberAny { @@ -52,6 +52,7 @@ impl SubscriberAny { } } + // pub async fn next(&mut self) -> Option> { pub async fn next(&mut self) -> Option, SubscriberError>> { let data = match self.receiver.recv().await { Ok(v) => v, diff --git a/roslibrust_codegen/src/integral_types.rs b/roslibrust_codegen/src/integral_types.rs index f7d7b7bc..736c419a 100644 --- a/roslibrust_codegen/src/integral_types.rs +++ b/roslibrust_codegen/src/integral_types.rs @@ -36,8 +36,11 @@ impl RosMessageType for Time { const DEFINITION: &'static str = ""; } +#[derive(:: serde :: Deserialize, :: serde :: Serialize, Debug, Default, Clone, PartialEq)] +pub struct ShapeShifter(Vec); + // The equivalent of rospy AnyMsg or C++ ShapeShifter, subscribe_any() uses this type -impl RosMessageType for Vec { +impl RosMessageType for ShapeShifter { const ROS_TYPE_NAME: &'static str = "*"; const MD5SUM: &'static str = "*"; const DEFINITION: &'static str = ""; From cd953b6f6353bdc9d2b18c1593e6b39cdaf33699 Mon Sep 17 00:00:00 2001 From: Carter Date: Thu, 15 Aug 2024 14:57:52 -0600 Subject: [PATCH 3/3] Run cargo fmt --- roslibrust/src/ros1/node/handle.rs | 3 +-- roslibrust/src/ros1/subscriber.rs | 4 +++- roslibrust/tests/ros1_native_integration_tests.rs | 5 +---- 3 files changed, 5 insertions(+), 7 deletions(-) diff --git a/roslibrust/src/ros1/node/handle.rs b/roslibrust/src/ros1/node/handle.rs index a8b47626..3ef3e6ea 100644 --- a/roslibrust/src/ros1/node/handle.rs +++ b/roslibrust/src/ros1/node/handle.rs @@ -2,8 +2,7 @@ use super::actor::{Node, NodeServerHandle}; use crate::{ ros1::{ names::Name, publisher::Publisher, service_client::ServiceClient, subscriber::Subscriber, - subscriber::SubscriberAny, - NodeError, ServiceServer, + subscriber::SubscriberAny, NodeError, ServiceServer, }, ServiceFn, }; diff --git a/roslibrust/src/ros1/subscriber.rs b/roslibrust/src/ros1/subscriber.rs index 057c016c..900453b2 100644 --- a/roslibrust/src/ros1/subscriber.rs +++ b/roslibrust/src/ros1/subscriber.rs @@ -178,7 +178,9 @@ async fn establish_publisher_connection( stream.write_all(&conn_header_bytes[..]).await?; if let Ok(responded_header) = tcpros::receive_header(&mut stream).await { - if conn_header.md5sum == Some("*".to_string()) || conn_header.md5sum == responded_header.md5sum { + if conn_header.md5sum == Some("*".to_string()) + || conn_header.md5sum == responded_header.md5sum + { log::debug!( "Established connection with publisher for {:?}", conn_header.topic diff --git a/roslibrust/tests/ros1_native_integration_tests.rs b/roslibrust/tests/ros1_native_integration_tests.rs index 42a9c7d8..710be2fc 100644 --- a/roslibrust/tests/ros1_native_integration_tests.rs +++ b/roslibrust/tests/ros1_native_integration_tests.rs @@ -24,10 +24,7 @@ mod tests { .await .unwrap(); - let mut subscriber = nh - .subscribe_any("/test_subscribe_any", 1) - .await - .unwrap(); + let mut subscriber = nh.subscribe_any("/test_subscribe_any", 1).await.unwrap(); publisher .publish(&std_msgs::String {