Skip to content

Commit

Permalink
add examples testing publisher recovery works properly
Browse files Browse the repository at this point in the history
Signed-off-by: Marc-Antoine Perennou <[email protected]>
  • Loading branch information
Keruspe committed Oct 1, 2024
1 parent 3a2cb83 commit 4b27a4a
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 1 deletion.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ waker-fn = "^1.1"

[dev-dependencies]
async-global-executor = "^2.0"
async-io = "^2.0"
futures-lite = "^2.0"
serde_json = "^1.0"
waker-fn = "^1.1"
Expand Down
58 changes: 58 additions & 0 deletions examples/c.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
use futures_lite::StreamExt;
use lapin::{options::*, types::FieldTable, Connection, ConnectionProperties};
use tracing::info;

fn main() {
if std::env::var("RUST_LOG").is_err() {
unsafe { std::env::set_var("RUST_LOG", "info") };

Check warning on line 7 in examples/c.rs

View workflow job for this annotation

GitHub Actions / build_and_test (ubuntu-latest, 1.74.0)

unnecessary `unsafe` block

Check warning on line 7 in examples/c.rs

View workflow job for this annotation

GitHub Actions / build_and_test (ubuntu-latest, 1.74.0)

unnecessary `unsafe` block

Check warning on line 7 in examples/c.rs

View workflow job for this annotation

GitHub Actions / build_and_test (ubuntu-latest, 1.74.0)

unnecessary `unsafe` block
}

tracing_subscriber::fmt::init();

let addr = std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into());

async_global_executor::block_on(async {
let conn = Connection::connect(&addr, ConnectionProperties::default())
.await
.expect("connection error");

info!("CONNECTED");

//receive channel
let channel = conn.create_channel().await.expect("create_channel");
info!(state=?conn.status().state());

let queue = channel
.queue_declare(
"hello-recover",
QueueDeclareOptions::default(),
FieldTable::default(),
)
.await
.expect("queue_declare");
info!(state=?conn.status().state());
info!(?queue, "Declared queue");

info!("will consume");
let mut consumer = channel
.basic_consume(
"hello-recover",
"my_consumer",
BasicConsumeOptions::default(),
FieldTable::default(),
)
.await
.expect("basic_consume");
info!(state=?conn.status().state());

while let Some(delivery) = consumer.next().await {
info!(message=?delivery, "received message");
if let Ok(delivery) = delivery {
delivery
.ack(BasicAckOptions::default())
.await
.expect("basic_ack");
}
}
})
}
104 changes: 104 additions & 0 deletions examples/p.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
use lapin::{
options::*, types::FieldTable, BasicProperties, ChannelState, Connection, ConnectionProperties,
Error,
};
use tracing::info;

fn main() {
if std::env::var("RUST_LOG").is_err() {
std::env::set_var("RUST_LOG", "info");
}

tracing_subscriber::fmt::init();

let addr = std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into());
let recovery_config = lapin::experimental::RecoveryConfig {
auto_recover_channels: true,
};

async_global_executor::block_on(async {
let conn = Connection::connect(
&addr,
ConnectionProperties::default().with_experimental_recovery_config(recovery_config),
)
.await
.expect("connection error");

info!("CONNECTED");

let channel1 = conn.create_channel().await.expect("create_channel");
channel1
.confirm_select(ConfirmSelectOptions::default())
.await
.expect("confirm_select");
channel1
.queue_declare(
"hello-recover",
QueueDeclareOptions::default(),
FieldTable::default(),
)
.await
.expect("queue_declare");

let ch = channel1.clone();
async_global_executor::spawn(async move {
loop {
async_io::Timer::after(std::time::Duration::from_secs(1)).await;
info!("Trigger failure");
assert!(ch
.queue_declare(
"fake queue",
QueueDeclareOptions {
passive: true,
..QueueDeclareOptions::default()
},
FieldTable::default(),
)
.await
.is_err());
}
})
.detach();

let mut published = 0;
let mut errors = 0;
info!("will publish");
loop {
let res = channel1
.basic_publish(
"",
"recover-test",
BasicPublishOptions::default(),
b"before",
BasicProperties::default(),
)
.await;
let res = if let Ok(res) = res {
res.await.map(|_| ())
} else {
res.map(|_| ())
};
match res {
Ok(()) => {
println!("GOT OK");
published += 1;
}
Err(err) => {
println!("GOT ERROR");
match err {
Error::InvalidChannelState(ChannelState::Reconnecting, Some(notifier)) => {
notifier.await
}
err => {
if !err.is_amqp_soft_error() {
panic!("{}", err);
}
errors += 1;
}
}
}
}
println!("Published {} with {} errors", published, errors);
}
});
}
2 changes: 1 addition & 1 deletion src/id_sequence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ impl<
// self.id is actually the next (so that first call to next returns 0
// if we're 0 (or 1 if 0 is not allowed), either we haven't started yet, or last number we yielded (current one) is
// the max.
if self.id == self.first() {
if self.id <= self.first() {
self.max
} else {
Some(self.id - self.one)
Expand Down

0 comments on commit 4b27a4a

Please sign in to comment.