Skip to content

Commit

Permalink
fix(heartbeat): stop heartbeat if connection is not active
Browse files Browse the repository at this point in the history
  • Loading branch information
slavik-pastushenko committed Apr 22, 2024
1 parent b49d907 commit 9f234cd
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 2 deletions.
9 changes: 9 additions & 0 deletions src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,11 @@ impl Channel {
self.waker.wake()
}

fn readable(&self) {
trace!(channel=%self.id, "readable");
self.waker.readable()
}

fn assert_channel0(&self, class_id: Identifier, method_id: Identifier) -> Result<()> {
if self.id == 0 {
Ok(())
Expand Down Expand Up @@ -374,6 +379,7 @@ impl Channel {
trace!(channel=%self.id, "send_frame");
self.frames.push(self.id, frame, resolver, expected_reply);
self.wake();
self.readable();
}

async fn send_method_frame_with_body(
Expand Down Expand Up @@ -405,6 +411,7 @@ impl Channel {
trace!(channel=%self.id, "send_frames");
let promise = self.frames.push_frames(frames);
self.wake();
self.readable();
promise.await?;
Ok(publisher_confirms_result
.unwrap_or_else(|| PublisherConfirm::not_requested(self.returned_messages.clone())))
Expand Down Expand Up @@ -842,6 +849,8 @@ impl Channel {
) -> Result<()> {
self.connection_status.unblock();
self.wake();
self.readable();

Ok(())
}

Expand Down
23 changes: 22 additions & 1 deletion src/channels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,10 +154,29 @@ impl Channels {
.all(|c| c.status().flow())
}

pub(crate) fn send_heartbeat(&self) {
pub(crate) fn send_heartbeat(&self) -> Result<()> {
debug!("send heartbeat");

if let Some(channel0) = self.get(0) {
debug!("connection status: {:?}", self.connection_status.state());

if !self.connection_status.connected() {
let error = AMQPError::new(
AMQPHardError::FRAMEERROR.into(),
"heartbeat frame was not received on channel 0".into(),
);

self.internal_rpc.register_internal_future(async move {
channel0
.connection_close(error.get_id(), error.get_message().as_str(), 0, 0)
.await
});

return Err(Error::InvalidConnectionState(
self.connection_status.state(),
));
}

let (promise, resolver) = Promise::new();

if level_enabled!(Level::TRACE) {
Expand All @@ -167,6 +186,8 @@ impl Channels {
channel0.send_frame(AMQPFrame::Heartbeat(0), resolver, None);
self.internal_rpc.register_internal_future(promise);
}

Ok(())
}

pub(crate) fn handle_frame(&self, f: AMQPFrame) -> Result<()> {
Expand Down
8 changes: 7 additions & 1 deletion src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::{
sync::Arc,
time::{Duration, Instant},
};
use tracing::error;

#[derive(Clone)]
pub struct Heartbeat {
Expand Down Expand Up @@ -86,7 +87,12 @@ impl Inner {
.unwrap_or_else(|| {
// Update last_write so that if we cannot write to the socket yet, we don't enqueue countless heartbeats
self.update_last_write();
channels.send_heartbeat();

if let Err(err) = channels.send_heartbeat() {
self.timeout = None;
error!("Failed to send heartbeat: {}", err);
}

timeout
})
})
Expand Down
6 changes: 6 additions & 0 deletions src/socket_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,13 @@ impl SocketStateHandle {
let _ = self.sender.send(event);
}

/// Wake the socket up
pub fn wake(&self) {
self.send(SocketEvent::Wake);
}

/// Notify that the socket is readable
pub fn readable(&self) {
self.send(SocketEvent::Readable);
}
}

0 comments on commit 9f234cd

Please sign in to comment.