Skip to content

Commit

Permalink
Merge pull request RosLibRust#192 from lucasw/subscriber_any2
Browse files Browse the repository at this point in the history
subscribe to any topic of any type and get received messages as a raw vector of bytes
  • Loading branch information
Carter12s authored Aug 15, 2024
2 parents b9c6d91 + cd953b6 commit f9ccf04
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- ROS1 Native Publishers now support latching behavior
- The XML RPC client for interacting directly with the rosmaster server has been exposed as a public API
- Experimental: Initial support for writing generic clients that can be compile time specialized for rosbridge or ros1
- Can subscribe to any topic and get raw bytes instead of a deserialized message of known type

### Fixed

Expand Down
14 changes: 13 additions & 1 deletion roslibrust/src/ros1/node/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use super::actor::{Node, NodeServerHandle};
use crate::{
ros1::{
names::Name, publisher::Publisher, service_client::ServiceClient, subscriber::Subscriber,
NodeError, ServiceServer,
subscriber::SubscriberAny, NodeError, ServiceServer,
},
ServiceFn,
};
Expand Down Expand Up @@ -75,6 +75,18 @@ impl NodeHandle {
Ok(Publisher::new(topic_name, sender))
}

pub async fn subscribe_any(
&self,
topic_name: &str,
queue_size: usize,
) -> Result<SubscriberAny, NodeError> {
let receiver = self
.inner
.register_subscriber::<roslibrust_codegen::ShapeShifter>(topic_name, queue_size)
.await?;
Ok(SubscriberAny::new(receiver))
}

pub async fn subscribe<T: roslibrust_codegen::RosMessageType>(
&self,
topic_name: &str,
Expand Down
30 changes: 28 additions & 2 deletions roslibrust/src/ros1/subscriber.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::ros1::{names::Name, tcpros::ConnectionHeader};
use abort_on_drop::ChildTask;
use roslibrust_codegen::RosMessageType;
use roslibrust_codegen::{RosMessageType, ShapeShifter};
use std::{marker::PhantomData, sync::Arc};
use tokio::{
io::AsyncWriteExt,
Expand Down Expand Up @@ -39,6 +39,30 @@ impl<T: RosMessageType> Subscriber<T> {
}
}

pub struct SubscriberAny {
receiver: broadcast::Receiver<Vec<u8>>,
_phantom: PhantomData<ShapeShifter>,
}

impl SubscriberAny {
pub(crate) fn new(receiver: broadcast::Receiver<Vec<u8>>) -> Self {
Self {
receiver,
_phantom: PhantomData,
}
}

// pub async fn next(&mut self) -> Option<Result<ShapeShifter, SubscriberError>> {
pub async fn next(&mut self) -> Option<Result<Vec<u8>, SubscriberError>> {
let data = match self.receiver.recv().await {
Ok(v) => v,
Err(RecvError::Closed) => return None,
Err(RecvError::Lagged(n)) => return Some(Err(SubscriberError::Lagged(n))),
};
Some(Ok(data))
}
}

pub struct Subscription {
subscription_tasks: Vec<ChildTask<()>>,
_msg_receiver: broadcast::Receiver<Vec<u8>>,
Expand Down Expand Up @@ -154,7 +178,9 @@ async fn establish_publisher_connection(
stream.write_all(&conn_header_bytes[..]).await?;

if let Ok(responded_header) = tcpros::receive_header(&mut stream).await {
if conn_header.md5sum == responded_header.md5sum {
if conn_header.md5sum == Some("*".to_string())
|| conn_header.md5sum == responded_header.md5sum
{
log::debug!(
"Established connection with publisher for {:?}",
conn_header.topic
Expand Down
27 changes: 27 additions & 0 deletions roslibrust/tests/ros1_native_integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,33 @@ mod tests {
"assets/ros1_common_interfaces"
);

#[test_log::test(tokio::test)]
async fn test_subscribe_any() {
// get a single message in raw bytes and test the bytes are as expected
let nh = NodeHandle::new("http://localhost:11311", "test_subscribe_any")
.await
.unwrap();

let publisher = nh
.advertise::<std_msgs::String>("/test_subscribe_any", 1, true)
.await
.unwrap();

let mut subscriber = nh.subscribe_any("/test_subscribe_any", 1).await.unwrap();

publisher
.publish(&std_msgs::String {
data: "test".to_owned(),
})
.await
.unwrap();

let res =
tokio::time::timeout(tokio::time::Duration::from_millis(250), subscriber.next()).await;
let res = res.unwrap().unwrap().unwrap();
assert!(res == vec![8, 0, 0, 0, 4, 0, 0, 0, 116, 101, 115, 116]);
}

#[test_log::test(tokio::test)]
async fn test_latching() {
let nh = NodeHandle::new("http://localhost:11311", "test_latching")
Expand Down
10 changes: 10 additions & 0 deletions roslibrust_codegen/src/integral_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,16 @@ impl RosMessageType for Time {
const DEFINITION: &'static str = "";
}

#[derive(:: serde :: Deserialize, :: serde :: Serialize, Debug, Default, Clone, PartialEq)]
pub struct ShapeShifter(Vec<u8>);

// The equivalent of rospy AnyMsg or C++ ShapeShifter, subscribe_any() uses this type
impl RosMessageType for ShapeShifter {
const ROS_TYPE_NAME: &'static str = "*";
const MD5SUM: &'static str = "*";
const DEFINITION: &'static str = "";
}

// TODO provide chrono conversions here behind a cfg flag

/// Matches the integral ros1 duration type, with extensions for ease of use
Expand Down

0 comments on commit f9ccf04

Please sign in to comment.