Skip to content

Commit

Permalink
heartbeat: monitor incoming heartbeats
Browse files Browse the repository at this point in the history
Fixes #401 #403
  • Loading branch information
Keruspe committed Apr 22, 2024
1 parent b49d907 commit 33ace98
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 4 deletions.
37 changes: 33 additions & 4 deletions src/channels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@ use crate::{
use amq_protocol::frame::{AMQPFrame, ProtocolVersion};
use executor_trait::FullExecutor;
use parking_lot::Mutex;
use std::{collections::HashMap, fmt, sync::Arc};
use std::{
collections::HashMap,
fmt,
sync::Arc,
time::{Duration, Instant},
};
use tracing::{debug, error, level_enabled, trace, Level};

#[derive(Clone)]
Expand Down Expand Up @@ -80,6 +85,12 @@ impl Channels {
self.inner.lock().channels.get(&id).cloned()
}

pub(crate) fn get_for_frame(&self, id: ChannelId) -> Option<Channel> {
let mut inner = self.inner.lock();
inner.frame_received();
inner.channels.get(&id).cloned()
}

pub(crate) fn remove(&self, id: ChannelId, error: Error) -> Result<()> {
self.frames.clear_expected_replies(id, error);
if self.inner.lock().channels.remove(&id).is_some() {
Expand All @@ -90,7 +101,7 @@ impl Channels {
}

pub(crate) fn receive_method(&self, id: ChannelId, method: AMQPClass) -> Result<()> {
self.get(id)
self.get_for_frame(id)
.map(|channel| channel.receive_method(method))
.unwrap_or_else(|| Err(Error::InvalidChannel(id)))
}
Expand All @@ -102,13 +113,13 @@ impl Channels {
size: PayloadSize,
properties: BasicProperties,
) -> Result<()> {
self.get(id)
self.get_for_frame(id)
.map(|channel| channel.handle_content_header_frame(class_id, size, properties))
.unwrap_or_else(|| Err(Error::InvalidChannel(id)))
}

pub(crate) fn handle_body_frame(&self, id: ChannelId, payload: Vec<u8>) -> Result<()> {
self.get(id)
self.get_for_frame(id)
.map(|channel| channel.handle_body_frame(payload))
.unwrap_or_else(|| Err(Error::InvalidChannel(id)))
}
Expand Down Expand Up @@ -199,6 +210,7 @@ impl Channels {
AMQPFrame::Heartbeat(channel_id) => {
if channel_id == 0 {
debug!("received heartbeat from server");
self.inner.lock().frame_received();
} else {
error!(channel=%channel_id, "received invalid heartbeat");
let error = AMQPError::new(
Expand Down Expand Up @@ -271,6 +283,15 @@ impl Channels {
.map(Channel::topology)
.collect()
}

pub(crate) fn check_connection(&self, timeout: Duration) {
let latest = self.inner.lock().latest_received_frame;
if latest.elapsed() > timeout {
// We didn't get any frame nor heartbeat from the server for too long
self.internal_rpc
.set_connection_error(Error::MissingHeartbeatError);
}
}
}

impl fmt::Debug for Channels {
Expand All @@ -294,6 +315,7 @@ struct Inner {
channels: HashMap<ChannelId, Channel>,
channel_id: IdSequence<ChannelId>,
configuration: Configuration,
latest_received_frame: Instant,
waker: SocketStateHandle,
}

Expand All @@ -303,6 +325,9 @@ impl Inner {
channels: HashMap::default(),
channel_id: IdSequence::new(false),
configuration,
// Let's consider we just received a frame when setting everything up.
// This will get updated with the connection frames anyways.
latest_received_frame: Instant::now(),
waker,
}
}
Expand Down Expand Up @@ -369,4 +394,8 @@ impl Inner {
}
Err(Error::ChannelsLimitReached)
}

fn frame_received(&mut self) {
self.latest_received_frame = Instant::now();
}
}
6 changes: 6 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ pub enum Error {
ParsingError(ParserError),
ProtocolError(AMQPError),
SerialisationError(Arc<GenError>),

MissingHeartbeatError,
}

impl Error {
Expand Down Expand Up @@ -67,6 +69,10 @@ impl fmt::Display for Error {
Error::ParsingError(e) => write!(f, "failed to parse: {}", e),
Error::ProtocolError(e) => write!(f, "protocol error: {}", e),
Error::SerialisationError(e) => write!(f, "failed to serialise: {}", e),

Error::MissingHeartbeatError => {
write!(f, "no heartbeat received from server for too long")
}
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ impl Default for Inner {
impl Inner {
fn poll_timeout(&mut self, channels: &Channels) -> Option<Duration> {
self.timeout.map(|timeout| {
// The value stored in timeout is half the configured heartbeat value as the spec
// recommends to send heartbeats at twice the configured pace.
// The specs tells us to close the connection after once twice the configured interval
// has passed.
channels.check_connection(timeout * 4);
timeout
.checked_sub(self.last_write.elapsed())
.map(|timeout| timeout.max(Duration::from_millis(1)))
Expand Down

0 comments on commit 33ace98

Please sign in to comment.