Skip to content

Commit

Permalink
timeout on write_all for RosLibRust#206, but this holds up all the th…
Browse files Browse the repository at this point in the history
…e still active subscribers- ought to collect the futures and await them separately, or restructure further to put every tcpstream write_all in a separate tokio task?
  • Loading branch information
lucasw committed Nov 16, 2024
1 parent b6735a6 commit 7189292
Showing 1 changed file with 7 additions and 2 deletions.
9 changes: 7 additions & 2 deletions roslibrust/src/ros1/publisher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,15 @@ use std::{
marker::PhantomData,
net::{Ipv4Addr, SocketAddr},
sync::Arc,
time::Duration,
};
use tokio::{
io::AsyncWriteExt,
sync::{mpsc, RwLock},
time::timeout,
};


/// The regular Publisher representation returned by calling advertise on a [crate::ros1::NodeHandle].
pub struct Publisher<T> {
topic_name: String,
Expand Down Expand Up @@ -45,7 +48,7 @@ impl<T: RosMessageType> Publisher<T> {
.send(data)
.await
.map_err(|_| PublisherError::StreamClosed)?;
debug!("Publishing data on topic {}", self.topic_name);
// debug!("Publishing data on topic {}", self.topic_name);
Ok(())
}
}
Expand Down Expand Up @@ -214,7 +217,9 @@ impl Publication {
let mut streams_to_remove = vec![];
// TODO: we're awaiting in a for loop... Could parallelize here
for (stream_idx, stream) in streams.iter_mut().enumerate() {
if let Err(err) = stream.write_all(&msg_to_publish[..]).await {
// TODO(lucasw) distinguish between timeouts and other errors, want to be
// able to recover from the timeout
if let Err(err) = timeout(Duration::from_millis(50), stream.write_all(&msg_to_publish[..])).await {
// TODO: A single failure between nodes that cross host boundaries is probably normal, should make this more robust perhaps
debug!("Failed to send data to subscriber: {err}, removing");
streams_to_remove.push(stream_idx);
Expand Down

0 comments on commit 7189292

Please sign in to comment.