From 35c17b954578297731fdb1c229e281bad0ca1e75 Mon Sep 17 00:00:00 2001 From: Steve Date: Mon, 14 Feb 2022 14:29:40 -0800 Subject: [PATCH] don't close immediately in test thread Signed-off-by: Steve --- rpc-rs/tests/nats_sub.rs | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/rpc-rs/tests/nats_sub.rs b/rpc-rs/tests/nats_sub.rs index 8bfe1e3..e4f4c75 100644 --- a/rpc-rs/tests/nats_sub.rs +++ b/rpc-rs/tests/nats_sub.rs @@ -1,7 +1,10 @@ //! test nats subscriptions (queue and non-queue) with rpc_client #![cfg(test)] +const THREE_SEC: Duration = Duration::from_secs(3); + use std::{str::FromStr as _, time::Duration}; +use log::debug; use wasmbus_rpc::{ error::{RpcError, RpcResult}, rpc_client::RpcClient, @@ -111,23 +114,25 @@ async fn listen_queue( .queue_subscribe(&subject, &queue) .await .expect("group subscriber"); - println!("listening subj: {} queue: {}", &subject, &queue); + debug!("listening subj: {} queue: {}", &subject, &queue); while let Some(msg) = sub.next().await { let payload = String::from_utf8_lossy(&msg.data); if !pattern.is_match(payload.as_ref()) && &payload != "exit" { - println!("ERROR: payload on {}: {}", &subject, &payload); + debug!("ERROR: payload on {}: {}", &subject, &payload); break; } if let Some(reply_to) = msg.reply { + debug!("listener {} replying ok", &subject); client.publish(&reply_to, b"ok").await.expect("reply"); } if &payload == "exit" { - let _ = sub.close().await; + debug!("listener {} received 'exit'", &subject); + //let _ = sub.close().await; break; } count += 1; } - println!("exiting: {}", count); + println!("listener {} exiting with count {}", &subject, count); count }) } @@ -236,6 +241,7 @@ async fn queue_sub() -> Result<(), Box> { // This confirms that publishing to queue subscription divides the load, // and also confirms that a queue group name ('X') is only applicable // within a topic. + let _ = env_logger::try_init(); let sub_name = uuid::Uuid::new_v4().to_string(); let topic_one = format!("one_{}", &sub_name); let topic_two = format!("two_{}", &sub_name); @@ -260,16 +266,15 @@ async fn queue_sub() -> Result<(), Box> { check_ok(sender.request(&topic_one, b"exit").await?)?; check_ok(sender.request(&topic_two, b"exit").await?)?; - let v3 = wait_for(thread3, TWO_SEC).await??; - let v2 = wait_for(thread2, TWO_SEC).await??; - let v1 = wait_for(thread1, TWO_SEC).await??; + let v3 = wait_for(thread3, THREE_SEC).await??; + let v2 = wait_for(thread2, THREE_SEC).await??; + let v1 = wait_for(thread1, THREE_SEC).await??; assert_eq!(v1 + v2, SPLIT_TOTAL as u64, "no loss in queue"); assert_eq!(v3, SINGLE_TOTAL as u64, "no overlap between queues"); Ok(()) } -const TWO_SEC: Duration = Duration::from_secs(2); async fn wait_for>( f: F,