Skip to content

Commit

Permalink
don't close immediately in test thread
Browse files Browse the repository at this point in the history
Signed-off-by: Steve <[email protected]>
  • Loading branch information
stevelr committed Feb 14, 2022
1 parent 8beae75 commit 35c17b9
Showing 1 changed file with 13 additions and 8 deletions.
21 changes: 13 additions & 8 deletions rpc-rs/tests/nats_sub.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
//! test nats subscriptions (queue and non-queue) with rpc_client
#![cfg(test)]

const THREE_SEC: Duration = Duration::from_secs(3);

use std::{str::FromStr as _, time::Duration};
use log::debug;
use wasmbus_rpc::{
error::{RpcError, RpcResult},
rpc_client::RpcClient,
Expand Down Expand Up @@ -111,23 +114,25 @@ async fn listen_queue(
.queue_subscribe(&subject, &queue)
.await
.expect("group subscriber");
println!("listening subj: {} queue: {}", &subject, &queue);
debug!("listening subj: {} queue: {}", &subject, &queue);
while let Some(msg) = sub.next().await {
let payload = String::from_utf8_lossy(&msg.data);
if !pattern.is_match(payload.as_ref()) && &payload != "exit" {
println!("ERROR: payload on {}: {}", &subject, &payload);
debug!("ERROR: payload on {}: {}", &subject, &payload);
break;
}
if let Some(reply_to) = msg.reply {
debug!("listener {} replying ok", &subject);
client.publish(&reply_to, b"ok").await.expect("reply");
}
if &payload == "exit" {
let _ = sub.close().await;
debug!("listener {} received 'exit'", &subject);
//let _ = sub.close().await;
break;
}
count += 1;
}
println!("exiting: {}", count);
println!("listener {} exiting with count {}", &subject, count);
count
})
}
Expand Down Expand Up @@ -236,6 +241,7 @@ async fn queue_sub() -> Result<(), Box<dyn std::error::Error>> {
// This confirms that publishing to queue subscription divides the load,
// and also confirms that a queue group name ('X') is only applicable
// within a topic.
let _ = env_logger::try_init();
let sub_name = uuid::Uuid::new_v4().to_string();
let topic_one = format!("one_{}", &sub_name);
let topic_two = format!("two_{}", &sub_name);
Expand All @@ -260,16 +266,15 @@ async fn queue_sub() -> Result<(), Box<dyn std::error::Error>> {
check_ok(sender.request(&topic_one, b"exit").await?)?;
check_ok(sender.request(&topic_two, b"exit").await?)?;

let v3 = wait_for(thread3, TWO_SEC).await??;
let v2 = wait_for(thread2, TWO_SEC).await??;
let v1 = wait_for(thread1, TWO_SEC).await??;
let v3 = wait_for(thread3, THREE_SEC).await??;
let v2 = wait_for(thread2, THREE_SEC).await??;
let v1 = wait_for(thread1, THREE_SEC).await??;

assert_eq!(v1 + v2, SPLIT_TOTAL as u64, "no loss in queue");
assert_eq!(v3, SINGLE_TOTAL as u64, "no overlap between queues");
Ok(())
}

const TWO_SEC: Duration = Duration::from_secs(2);

async fn wait_for<O, F: futures::Future<Output = O>>(
f: F,
Expand Down

0 comments on commit 35c17b9

Please sign in to comment.