From 7189292fe7e9d87517f7ede188c3bcc8a97706ff Mon Sep 17 00:00:00 2001 From: Lucas Walter Date: Sat, 16 Nov 2024 10:03:55 -0800 Subject: [PATCH] timeout on write_all for #206, but this holds up all the the still active subscribers- ought to collect the futures and await them separately, or restructure further to put every tcpstream write_all in a separate tokio task? --- roslibrust/src/ros1/publisher.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/roslibrust/src/ros1/publisher.rs b/roslibrust/src/ros1/publisher.rs index 3294d25..f8e7923 100644 --- a/roslibrust/src/ros1/publisher.rs +++ b/roslibrust/src/ros1/publisher.rs @@ -10,12 +10,15 @@ use std::{ marker::PhantomData, net::{Ipv4Addr, SocketAddr}, sync::Arc, + time::Duration, }; use tokio::{ io::AsyncWriteExt, sync::{mpsc, RwLock}, + time::timeout, }; + /// The regular Publisher representation returned by calling advertise on a [crate::ros1::NodeHandle]. pub struct Publisher { topic_name: String, @@ -45,7 +48,7 @@ impl Publisher { .send(data) .await .map_err(|_| PublisherError::StreamClosed)?; - debug!("Publishing data on topic {}", self.topic_name); + // debug!("Publishing data on topic {}", self.topic_name); Ok(()) } } @@ -214,7 +217,9 @@ impl Publication { let mut streams_to_remove = vec![]; // TODO: we're awaiting in a for loop... Could parallelize here for (stream_idx, stream) in streams.iter_mut().enumerate() { - if let Err(err) = stream.write_all(&msg_to_publish[..]).await { + // TODO(lucasw) distinguish between timeouts and other errors, want to be + // able to recover from the timeout + if let Err(err) = timeout(Duration::from_millis(50), stream.write_all(&msg_to_publish[..])).await { // TODO: A single failure between nodes that cross host boundaries is probably normal, should make this more robust perhaps debug!("Failed to send data to subscriber: {err}, removing"); streams_to_remove.push(stream_idx);