diff --git a/omniqueue/src/backends/gcp_pubsub.rs b/omniqueue/src/backends/gcp_pubsub.rs
index 16227b4..39dd0f3 100644
--- a/omniqueue/src/backends/gcp_pubsub.rs
+++ b/omniqueue/src/backends/gcp_pubsub.rs
@@ -15,6 +15,7 @@ use google_cloud_pubsub::subscription::Subscription;
 use serde::Serialize;
 use std::path::{Path, PathBuf};
 use std::sync::Arc;
+use std::time::Duration;
 use std::{any::TypeId, collections::HashMap};
 
 pub struct GcpPubSubBackend;
@@ -217,31 +218,75 @@ async fn subscription(client: &Client, subscription_id: &str) -> Result<Subscription, QueueError> {
-#[async_trait]
-impl QueueConsumer for GcpPubSubConsumer {
-    type Payload = Payload;
-
-    async fn receive(&mut self) -> Result<Delivery, QueueError> {
-        let subscription = subscription(&self.client, &self.subscription_id).await?;
-        let mut stream = subscription
-            .subscribe(None)
-            .await
-            .map_err(QueueError::generic)?;
-
-        let mut recv_msg = stream.next().await.ok_or_else(|| QueueError::NoData)?;
+impl GcpPubSubConsumer {
+    fn wrap_recv_msg(&self, mut recv_msg: ReceivedMessage) -> Delivery {
         // FIXME: would be nice to avoid having to move the data out here.
         // While it's possible to ack via a subscription and an ack_id, nack is only
         // possible via a `ReceiveMessage`. This means we either need to hold 2 copies of
         // the payload, or move the bytes out so they can be returned _outside of the Acker_.
         let payload = recv_msg.message.data.drain(..).collect();
-        Ok(Delivery {
+
+        Delivery {
             decoders: self.registry.clone(),
             acker: Box::new(GcpPubSubAcker {
                 recv_msg,
                 subscription_id: self.subscription_id.clone(),
             }),
             payload: Some(payload),
-        })
+        }
+    }
+}
+
+#[async_trait]
+impl QueueConsumer for GcpPubSubConsumer {
+    type Payload = Payload;
+
+    async fn receive(&mut self) -> Result<Delivery, QueueError> {
+        let subscription = subscription(&self.client, &self.subscription_id).await?;
+        let mut stream = subscription
+            .subscribe(None)
+            .await
+            .map_err(QueueError::generic)?;
+
+        let recv_msg = stream.next().await.ok_or_else(|| QueueError::NoData)?;
+
+        Ok(self.wrap_recv_msg(recv_msg))
+    }
+
+    async fn receive_all(
+        &mut self,
+        max_messages: usize,
+        deadline: Duration,
+    ) -> Result<Vec<Delivery>, QueueError> {
+        let subscription = subscription(&self.client, &self.subscription_id).await?;
+
+        let mut out = Vec::with_capacity(max_messages);
+
+        if let Ok(messages) = subscription.pull(max_messages as _, None).await {
+            out.extend(messages.into_iter().map(|m| self.wrap_recv_msg(m)));
+            if out.len() >= max_messages {
+                return Ok(out);
+            }
+
+            let mut interval = tokio::time::interval(deadline);
+            interval.tick().await;
+
+            loop {
+                tokio::select! {
+                    _ = interval.tick() => break,
+                    messages = subscription.pull(max_messages.saturating_sub(out.len()) as _, None) => {
+                        if let Ok(messages) = messages {
+                            out.extend(messages.into_iter().map(|m| self.wrap_recv_msg(m)));
+                            if out.len() >= max_messages {
+                                break;
+                            }
+                        }
+                    }
+                }
+            }
+        }
+
+        Ok(out)
     }
 }
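Reviewer note on the gcp impl above: the initial `pull` blocks until at least one message exists and returns whatever is already buffered; when that batch is short of `max_messages`, the loop only exits on the `interval.tick()` arm, so a partial batch always sleeps out the full `deadline`. The gcp tests further down assert exactly this (`now.elapsed() >= deadline` on partial batches).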
diff --git a/omniqueue/src/backends/memory_queue.rs b/omniqueue/src/backends/memory_queue.rs
index 0733805..005fb0a 100644
--- a/omniqueue/src/backends/memory_queue.rs
+++ b/omniqueue/src/backends/memory_queue.rs
@@ -1,3 +1,4 @@
+use std::time::Duration;
 use std::{any::TypeId, collections::HashMap};
 
 use async_trait::async_trait;
@@ -90,14 +91,9 @@ pub struct MemoryQueueConsumer {
     tx: broadcast::Sender<Vec<u8>>,
 }
 
-#[async_trait]
-impl QueueConsumer for MemoryQueueConsumer {
-    type Payload = Vec<u8>;
-
-    async fn receive(&mut self) -> Result<Delivery, QueueError> {
-        let payload = self.rx.recv().await.map_err(QueueError::generic)?;
-
-        Ok(Delivery {
+impl MemoryQueueConsumer {
+    fn wrap_payload(&self, payload: Vec<u8>) -> Delivery {
+        Delivery {
             payload: Some(payload.clone()),
             decoders: self.registry.clone(),
             acker: Box::new(MemoryQueueAcker {
@@ -105,7 +101,50 @@ impl QueueConsumer for MemoryQueueConsumer {
                 payload_copy: Some(payload),
                 alredy_acked_or_nacked: false,
             }),
-        })
+        }
+    }
+}
+
+#[async_trait]
+impl QueueConsumer for MemoryQueueConsumer {
+    type Payload = Vec<u8>;
+
+    async fn receive(&mut self) -> Result<Delivery, QueueError> {
+        let payload = self.rx.recv().await.map_err(QueueError::generic)?;
+        Ok(self.wrap_payload(payload))
+    }
+
+    async fn receive_all(
+        &mut self,
+        max_messages: usize,
+        deadline: Duration,
+    ) -> Result<Vec<Delivery>, QueueError> {
+        let mut out = Vec::with_capacity(max_messages);
+
+        // Await at least one delivery before starting the clock
+        let msg = self.rx.recv().await;
+        let delivery = msg
+            .map(|payload| self.wrap_payload(payload))
+            .map_err(QueueError::generic)?;
+        out.push(delivery);
+
+        let mut interval = tokio::time::interval(deadline);
+        // Skip the first tick which is instantaneous
+        interval.tick().await;
+        loop {
+            tokio::select! {
+                _ = interval.tick() => break,
+                msg = self.rx.recv() => {
+                    let delivery = msg
+                        .map(|payload| self.wrap_payload(payload)).map_err(QueueError::generic)?;
+                    out.push(delivery);
+                    if out.len() >= max_messages {
+                        break;
+                    }
+                }
+            }
+        }
+        Ok(out)
     }
 }
diff --git a/omniqueue/src/backends/rabbitmq.rs b/omniqueue/src/backends/rabbitmq.rs
index 059d0d2..f97ce82 100644
--- a/omniqueue/src/backends/rabbitmq.rs
+++ b/omniqueue/src/backends/rabbitmq.rs
@@ -1,3 +1,4 @@
+use std::time::Duration;
 use std::{any::TypeId, collections::HashMap};
 
 use async_trait::async_trait;
@@ -24,6 +25,7 @@ pub struct RabbitMqConfig {
     pub publish_exchange: String,
     pub publish_routing_key: String,
     pub publish_options: BasicPublishOptions,
+    // FIXME: typos
    pub publish_properites: BasicProperties,
 
     pub consume_queue: String,
@@ -168,6 +170,19 @@ pub struct RabbitMqConsumer {
     requeue_on_nack: bool,
 }
 
+impl RabbitMqConsumer {
+    fn wrap_delivery(&self, delivery: lapin::message::Delivery) -> Delivery {
+        Delivery {
+            decoders: self.registry.clone(),
+            payload: Some(delivery.data),
+            acker: Box::new(RabbitMqAcker {
+                acker: Some(delivery.acker),
+                requeue_on_nack: self.requeue_on_nack,
+            }),
+        }
+    }
+}
+
 #[async_trait]
 impl QueueConsumer for RabbitMqConsumer {
     type Payload = Vec<u8>;
@@ -178,19 +193,49 @@ impl QueueConsumer for RabbitMqConsumer {
     async fn receive(&mut self) -> Result<Delivery, QueueError> {
         let mut stream = self
             .consumer
             .clone()
             .map(|l: Result<lapin::message::Delivery, lapin::Error>| {
                 let l = l.map_err(QueueError::generic)?;
-
-                Ok(Delivery {
-                    decoders: self.registry.clone(),
-                    payload: Some(l.data),
-                    acker: Box::new(RabbitMqAcker {
-                        acker: Some(l.acker),
-                        requeue_on_nack: self.requeue_on_nack,
-                    }),
-                })
+                Ok(self.wrap_delivery(l))
             });
 
         stream.next().await.ok_or(QueueError::NoData)?
     }
+
+    async fn receive_all(
+        &mut self,
+        max_messages: usize,
+        deadline: Duration,
+    ) -> Result<Vec<Delivery>, QueueError> {
+        let mut stream = self.consumer.clone();
+        let mut out = Vec::with_capacity(max_messages);
+
+        // FIXME: the real way to do this is to set the pre-fetch count on the channel (which happens much earlier),
+        // e.g. `channel_rx.basic_qos(10, Default::default())?`.
+        // There is no config for controlling the timeout - it is up to each client impl.
+        // As written, this rabbit impl breaks the standard behavior of "return as soon as items are available".
+        // The tests have been modified to reflect this gap.
+        if let Some(delivery) = stream.next().await {
+            out.push(self.wrap_delivery(delivery.map_err(QueueError::generic)?));
+
+            let mut interval = tokio::time::interval(deadline);
+            // Skip the instant first period
+            interval.tick().await;
+
+            loop {
+                tokio::select! {
+                    _ = interval.tick() => break,
+                    delivery = stream.next() => {
+                        if let Some(delivery) = delivery {
+                            out.push(self.wrap_delivery(delivery.map_err(QueueError::generic)?));
+                            if out.len() >= max_messages {
+                                break;
+                            }
+                        }
+                    }
+                }
+            }
+        }
+
+        Ok(out)
+    }
 }
 
 pub struct RabbitMqAcker {
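Reviewer note on the FIXME in the rabbit receive_all above: a minimal sketch of the suggested prefetch fix, assuming access to the consuming channel during consumer setup (`channel_rx` is the FIXME's own name for that channel; nothing below is part of this patch):

    use lapin::{options::BasicQosOptions, Channel};

    // With a prefetch window, the broker stops pushing once `prefetch` deliveries
    // are outstanding (unacked) on the channel, which bounds how many messages a
    // single receive_all call can buffer without acking.
    async fn set_prefetch(channel_rx: &Channel, prefetch: u16) -> Result<(), lapin::Error> {
        channel_rx.basic_qos(prefetch, BasicQosOptions::default()).await
    }

This would cap in-flight deliveries, but it does not by itself restore "return as soon as items are available"; the select loop would still need an early-exit path for that.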
diff --git a/omniqueue/src/backends/redis/mod.rs b/omniqueue/src/backends/redis/mod.rs
index 8b48aa5..5480d89 100644
--- a/omniqueue/src/backends/redis/mod.rs
+++ b/omniqueue/src/backends/redis/mod.rs
@@ -1,9 +1,10 @@
+use std::time::Duration;
 use std::{any::TypeId, collections::HashMap, marker::PhantomData};
 
 use async_trait::async_trait;
 use bb8::ManageConnection;
 pub use bb8_redis::RedisMultiplexedConnectionManager;
-use redis::streams::{StreamReadOptions, StreamReadReply};
+use redis::streams::{StreamId, StreamReadOptions, StreamReadReply};
 
 use crate::{
     decoding::DecoderRegistry,
@@ -219,6 +220,31 @@ pub struct RedisStreamConsumer<M: ManageConnection> {
     payload_key: String,
 }
 
+impl<M> RedisStreamConsumer<M>
+where
+    M: ManageConnection,
+    M::Connection: redis::aio::ConnectionLike + Send + Sync,
+    M::Error: 'static + std::error::Error + Send + Sync,
+{
+    fn wrap_entry(&self, entry: StreamId) -> Result<Delivery, QueueError> {
+        let entry_id = entry.id.clone();
+        let payload = entry.map.get(&self.payload_key).ok_or(QueueError::NoData)?;
+        let payload: Vec<u8> = redis::from_redis_value(payload).map_err(QueueError::generic)?;
+
+        Ok(Delivery {
+            payload: Some(payload),
+            acker: Box::new(RedisStreamAcker {
+                redis: self.redis.clone(),
+                queue_key: self.queue_key.clone(),
+                consumer_group: self.consumer_group.clone(),
+                entry_id,
+                already_acked_or_nacked: false,
+            }),
+            decoders: self.registry.clone(),
+        })
+    }
+}
+
 #[async_trait]
 impl<M> QueueConsumer for RedisStreamConsumer<M>
 where
@@ -247,21 +273,47 @@ where
         let queue = read_out.keys.into_iter().next().ok_or(QueueError::NoData)?;
 
         let entry = queue.ids.into_iter().next().ok_or(QueueError::NoData)?;
+        self.wrap_entry(entry)
+    }
 
-        let entry_id = entry.id.clone();
-        let payload = entry.map.get(&self.payload_key).ok_or(QueueError::NoData)?;
-        let payload: Vec<u8> = redis::from_redis_value(payload).map_err(QueueError::generic)?;
+    async fn receive_all(
+        &mut self,
+        max_messages: usize,
+        deadline: Duration,
+    ) -> Result<Vec<Delivery>, QueueError> {
+        let mut conn = self.redis.get().await.map_err(QueueError::generic)?;
 
-        Ok(Delivery {
-            payload: Some(payload),
-            acker: Box::new(RedisStreamAcker {
-                redis: self.redis.clone(),
-                queue_key: self.queue_key.clone(),
-                consumer_group: self.consumer_group.clone(),
-                entry_id,
-                already_acked_or_nacked: false,
-            }),
-            decoders: self.registry.clone(),
-        })
+        // Ensure an empty vec is never returned
+        let queue = loop {
+            let read_out: StreamReadReply = redis::Cmd::xread_options(
+                &[&self.queue_key],
+                &[">"],
+                &StreamReadOptions::default()
+                    .group(&self.consumer_group, &self.consumer_name)
+                    .block(
+                        deadline
+                            .as_millis()
+                            .try_into()
+                            .map_err(QueueError::generic)?,
+                    )
+                    .count(max_messages),
+            )
+            .query_async(&mut *conn)
+            .await
+            .map_err(QueueError::generic)?;
+
+            if let Some(queue) = read_out.keys.into_iter().next() {
+                if !queue.ids.is_empty() {
+                    break queue;
+                }
+            }
+        };
+
+        let mut out = Vec::with_capacity(max_messages);
+        for entry in queue.ids {
+            out.push(self.wrap_entry(entry)?);
+        }
+
+        Ok(out)
     }
 }
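Reviewer note on the redis impl above: `XREADGROUP ... BLOCK <ms>` only blocks while there is nothing to read; as soon as at least one entry is available it returns immediately with up to COUNT entries. That is why this backend keeps the "return as soon as items are available" behavior that the gcp/rabbit impls give up, and why the redis tests assert `now.elapsed() <= deadline` for partial batches. The read in receive_all corresponds roughly to:

    XREADGROUP GROUP <consumer_group> <consumer_name> COUNT <max_messages> BLOCK <deadline_ms> STREAMS <queue_key> >

where `>` requests only entries never delivered to this consumer group. One consequence worth noting: a zero `deadline` would make the read wait indefinitely, since Redis treats `BLOCK 0` as "no timeout".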
diff --git a/omniqueue/src/backends/sqs.rs b/omniqueue/src/backends/sqs.rs
index 1a142e1..eb302ca 100644
--- a/omniqueue/src/backends/sqs.rs
+++ b/omniqueue/src/backends/sqs.rs
@@ -1,6 +1,8 @@
+use std::time::Duration;
 use std::{any::TypeId, collections::HashMap, sync::Arc};
 
 use async_trait::async_trait;
+use aws_sdk_sqs::types::Message;
 use aws_sdk_sqs::{
     operation::delete_message::DeleteMessageError, types::error::ReceiptHandleIsInvalid, Client,
 };
@@ -240,6 +242,21 @@ pub struct SqsQueueConsumer {
     queue_dsn: String,
 }
 
+impl SqsQueueConsumer {
+    fn wrap_message(&self, message: &Message) -> Delivery {
+        Delivery {
+            decoders: self.bytes_registry.clone(),
+            acker: Box::new(SqsAcker {
+                ack_client: self.client.clone(),
+                queue_dsn: self.queue_dsn.clone(),
+                receipt_handle: message.receipt_handle().map(ToOwned::to_owned),
+                has_been_acked_or_nacked: false,
+            }),
+            payload: Some(message.body().unwrap_or_default().as_bytes().to_owned()),
+        }
+    }
+}
+
 #[async_trait]
 impl QueueConsumer for SqsQueueConsumer {
     type Payload = String;
@@ -257,19 +274,42 @@ impl QueueConsumer for SqsQueueConsumer {
         out.messages()
             .unwrap_or_default()
             .iter()
-            .map(|message| -> Result<Delivery, QueueError> {
-                Ok(Delivery {
-                    decoders: self.bytes_registry.clone(),
-                    acker: Box::new(SqsAcker {
-                        ack_client: self.client.clone(),
-                        queue_dsn: self.queue_dsn.clone(),
-                        receipt_handle: message.receipt_handle().map(ToOwned::to_owned),
-                        has_been_acked_or_nacked: false,
-                    }),
-                    payload: Some(message.body().unwrap_or_default().as_bytes().to_owned()),
-                })
-            })
+            .map(|message| -> Result<Delivery, QueueError> { Ok(self.wrap_message(message)) })
             .next()
             .ok_or(QueueError::NoData)?
     }
+
+    async fn receive_all(
+        &mut self,
+        max_messages: usize,
+        deadline: Duration,
+    ) -> Result<Vec<Delivery>, QueueError> {
+        // Ensure that there's at least one message before returning regardless of timeout
+        let out = loop {
+            let out = self
+                .client
+                .receive_message()
+                .set_wait_time_seconds(Some(
+                    deadline.as_secs().try_into().map_err(QueueError::generic)?,
+                ))
+                .set_max_number_of_messages(Some(
+                    max_messages.try_into().map_err(QueueError::generic)?,
+                ))
+                .queue_url(&self.queue_dsn)
+                .send()
+                .await
+                .map_err(QueueError::generic)?;
+
+            if !out.messages().unwrap_or_default().is_empty() {
+                break out;
+            }
+        };
+
+        Ok(out
+            .messages()
+            .unwrap_or_default()
+            .iter()
+            .map(|message| -> Result<Delivery, QueueError> { Ok(self.wrap_message(message)) })
+            .collect::<Result<Vec<_>, _>>()?)
+    }
 }
diff --git a/omniqueue/src/queue/consumer.rs b/omniqueue/src/queue/consumer.rs
index 890b750..4f0bab3 100644
--- a/omniqueue/src/queue/consumer.rs
+++ b/omniqueue/src/queue/consumer.rs
@@ -1,4 +1,5 @@
 use async_trait::async_trait;
+use std::time::Duration;
 
 use crate::{decoding::DecoderRegistry, QueueError, QueuePayload};
 
@@ -10,6 +11,12 @@ pub trait QueueConsumer: Send + Sync {
 
     async fn receive(&mut self) -> Result<Delivery, QueueError>;
 
+    async fn receive_all(
+        &mut self,
+        max_messages: usize,
+        deadline: Duration,
+    ) -> Result<Vec<Delivery>, QueueError>;
+
     fn into_dyn(self, custom_decoders: DecoderRegistry<Vec<u8>>) -> DynConsumer
     where
         Self: 'static + Sized,
@@ -48,6 +55,28 @@ impl<P: QueuePayload, T: QueueConsumer<Payload = P>> QueueConsumer
         })
     }
 
+    async fn receive_all(
+        &mut self,
+        max_messages: usize,
+        deadline: Duration,
+    ) -> Result<Vec<Delivery>, QueueError> {
+        let xs = self.inner.receive_all(max_messages, deadline).await?;
+        let mut out = Vec::with_capacity(xs.len());
+        for mut t_payload in xs {
+            let bytes_payload: Option<Vec<u8>> = match t_payload.payload_custom() {
+                Ok(b) => b,
+                Err(QueueError::NoDecoderForThisType) => t_payload.take_payload(),
+                Err(e) => return Err(e),
+            };
+            out.push(Delivery {
+                payload: bytes_payload,
+                decoders: self.custom_decoders.clone(),
+                acker: t_payload.acker,
+            });
+        }
+        Ok(out)
+    }
+
     fn into_dyn(mut self, custom_decoders: DecoderRegistry<Vec<u8>>) -> DynConsumer
     where
         Self: Sized,
@@ -67,6 +96,14 @@ impl QueueConsumer for DynConsumer {
         self.0.receive().await
     }
 
+    async fn receive_all(
+        &mut self,
+        max_messages: usize,
+        deadline: Duration,
+    ) -> Result<Vec<Delivery>, QueueError> {
+        self.0.receive_all(max_messages, deadline).await
+    }
+
     fn into_dyn(self, _custom_decoders: DecoderRegistry<Vec<u8>>) -> DynConsumer
     where
         Self: Sized,
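Reviewer note: caller-side shape of the new trait method, as exercised by the tests below (a sketch only; `c` is a consumer from `build_pair` and `ExType` is the tests' payload type):

    use std::time::Duration;

    // Blocks until at least one delivery arrives, then batches whatever else
    // shows up, returning once `max_messages` (here 100) are collected or the
    // deadline handling of the backend lets the batch go.
    let deliveries = c.receive_all(100, Duration::from_secs(5)).await.unwrap();
    for d in deliveries {
        let payload: Option<ExType> = d.payload_serde_json().unwrap();
        // ... process `payload` ...
        d.ack().await.unwrap();
    }

The backend-specific wrinkle the tests pin down: some backends return a partial batch immediately, others hold it until the deadline lapses.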
diff --git a/omniqueue/tests/gcp_pubsub.rs b/omniqueue/tests/gcp_pubsub.rs
index 6a06316..1ff5752 100644
--- a/omniqueue/tests/gcp_pubsub.rs
+++ b/omniqueue/tests/gcp_pubsub.rs
@@ -54,6 +54,7 @@ use google_cloud_googleapis::pubsub::v1::DeadLetterPolicy;
 use google_cloud_pubsub::client::{Client, ClientConfig};
 use google_cloud_pubsub::subscription::SubscriptionConfig;
 
+use std::time::{Duration, Instant};
 use omniqueue::backends::gcp_pubsub::{GcpPubSubBackend, GcpPubSubConfig};
 use omniqueue::queue::{
@@ -196,3 +197,128 @@ async fn test_custom_send_recv() {
     d.payload_serde_json::<ExType>().unwrap_err();
     d.ack().await.unwrap();
 }
+
+/// ~~Consumer will return immediately if there are fewer than max messages to start with.~~
+/// GCP will wait the full deadline period for any partial batch.
+#[tokio::test]
+async fn test_send_recv_all_partial() {
+    let payload = ExType { a: 2 };
+    let (p, mut c) = make_test_queue().await.build_pair().await.unwrap();
+
+    p.send_serde_json(&payload).await.unwrap();
+    let deadline = Duration::from_secs(1);
+
+    let now = Instant::now();
+    let mut xs = c.receive_all(2, deadline).await.unwrap();
+    assert_eq!(xs.len(), 1);
+    let d = xs.remove(0);
+    assert_eq!(d.payload_serde_json::<ExType>().unwrap().unwrap(), payload);
+    d.ack().await.unwrap();
+    // XXX: this differs from the rest of the backends since gcp will wait the entire deadline.
+    assert!(now.elapsed() >= deadline);
+}
+
+/// Consumer should yield items immediately if there's a full batch ready on the first poll.
+#[tokio::test]
+async fn test_send_recv_all_full() {
+    let payload1 = ExType { a: 1 };
+    let payload2 = ExType { a: 2 };
+    let (p, mut c) = make_test_queue().await.build_pair().await.unwrap();
+
+    p.send_serde_json(&payload1).await.unwrap();
+    p.send_serde_json(&payload2).await.unwrap();
+    let deadline = Duration::from_secs(1);
+
+    let now = Instant::now();
+    let mut xs = c.receive_all(2, deadline).await.unwrap();
+    assert_eq!(xs.len(), 2);
+    let d1 = xs.remove(0);
+    assert_eq!(
+        d1.payload_serde_json::<ExType>().unwrap().unwrap(),
+        payload1
+    );
+    d1.ack().await.unwrap();
+
+    let d2 = xs.remove(0);
+    assert_eq!(
+        d2.payload_serde_json::<ExType>().unwrap().unwrap(),
+        payload2
+    );
+    d2.ack().await.unwrap();
+    // N.b. it's still possible this could turn up false if the test runs too slow.
+    assert!(now.elapsed() < deadline);
+}
+
+/// ~~Consumer will return the full batch immediately, but also return immediately if a partial batch is ready.~~
+/// GCP will wait the full deadline when it gets a partial batch.
+#[tokio::test]
+async fn test_send_recv_all_full_then_partial() {
+    let payload1 = ExType { a: 1 };
+    let payload2 = ExType { a: 2 };
+    let payload3 = ExType { a: 3 };
+    let (p, mut c) = make_test_queue().await.build_pair().await.unwrap();
+
+    p.send_serde_json(&payload1).await.unwrap();
+    p.send_serde_json(&payload2).await.unwrap();
+    p.send_serde_json(&payload3).await.unwrap();
+
+    let deadline = Duration::from_secs(1);
+    let now1 = Instant::now();
+    let mut xs = c.receive_all(2, deadline).await.unwrap();
+    assert_eq!(xs.len(), 2);
+    let d1 = xs.remove(0);
+    assert_eq!(
+        d1.payload_serde_json::<ExType>().unwrap().unwrap(),
+        payload1
+    );
+    d1.ack().await.unwrap();
+
+    let d2 = xs.remove(0);
+    assert_eq!(
+        d2.payload_serde_json::<ExType>().unwrap().unwrap(),
+        payload2
+    );
+    d2.ack().await.unwrap();
+    assert!(now1.elapsed() < deadline);
+
+    // 2nd call
+    let now2 = Instant::now();
+    let mut ys = c.receive_all(2, deadline).await.unwrap();
+    assert_eq!(ys.len(), 1);
+    let d3 = ys.remove(0);
+    assert_eq!(
+        d3.payload_serde_json::<ExType>().unwrap().unwrap(),
+        payload3
+    );
+    d3.ack().await.unwrap();
+    // XXX: this differs from the rest of the backends since gcp will wait the entire deadline.
+    assert!(now2.elapsed() >= deadline);
+}
+
+/// Consumer will wait indefinitely for at least one item.
+#[tokio::test]
+async fn test_send_recv_all_late_arriving_items() {
+    let payload1 = ExType { a: 1 };
+    let (p, mut c) = make_test_queue().await.build_pair().await.unwrap();
+
+    let deadline = Duration::from_secs(1);
+    let handle = tokio::spawn(async move {
+        let now = Instant::now();
+        let xs = c.receive_all(2, deadline).await.unwrap();
+        (now.elapsed(), xs)
+    });
+    // Wait longer than the deadline before sending any items.
+    tokio::time::sleep(deadline * 2).await;
+    p.send_serde_json(&payload1).await.unwrap();
+
+    let (elapsed, mut xs) = handle.await.unwrap();
+
+    assert_eq!(xs.len(), 1);
+    let d1 = xs.remove(0);
+    assert_eq!(
+        d1.payload_serde_json::<ExType>().unwrap().unwrap(),
+        payload1
+    );
+    d1.ack().await.unwrap();
+    // shows that the consumer waited well beyond the deadline
+    assert!(elapsed >= deadline * 2);
+}
diff --git a/omniqueue/tests/rabbitmq.rs b/omniqueue/tests/rabbitmq.rs
index 1c81d09..fad4f05 100644
--- a/omniqueue/tests/rabbitmq.rs
+++ b/omniqueue/tests/rabbitmq.rs
@@ -8,6 +8,7 @@ use omniqueue::{
     queue::{consumer::QueueConsumer, producer::QueueProducer, QueueBackend, QueueBuilder, Static},
 };
 use serde::{Deserialize, Serialize};
+use std::time::{Duration, Instant};
 
 const MQ_URI: &str = "amqp://guest:guest@localhost:5672/%2f";
 
@@ -126,3 +127,129 @@ async fn test_custom_send_recv() {
     d.payload_serde_json::<ExType>().unwrap_err();
     d.ack().await.unwrap();
 }
+
+/// ~~Consumer will return immediately if there are fewer than max messages to start with.~~
+/// Rabbit will wait the full deadline period for any partial batch.
+#[tokio::test]
+async fn test_send_recv_all_partial() {
+    let payload = ExType { a: 2 };
+    let (p, mut c) = make_test_queue(false).await.build_pair().await.unwrap();
+
+    p.send_serde_json(&payload).await.unwrap();
+    let deadline = Duration::from_secs(1);
+
+    let now = Instant::now();
+    let mut xs = c.receive_all(2, deadline).await.unwrap();
+    assert_eq!(xs.len(), 1);
+    let d = xs.remove(0);
+    assert_eq!(d.payload_serde_json::<ExType>().unwrap().unwrap(), payload);
+    d.ack().await.unwrap();
+    // XXX: this differs from the rest of the backends since rabbit will wait the entire deadline.
+    assert!(now.elapsed() >= deadline);
+}
+
+/// Consumer should yield items immediately if there's a full batch ready on the first poll.
+#[tokio::test]
+async fn test_send_recv_all_full() {
+    let payload1 = ExType { a: 1 };
+    let payload2 = ExType { a: 2 };
+    let (p, mut c) = make_test_queue(false).await.build_pair().await.unwrap();
+
+    p.send_serde_json(&payload1).await.unwrap();
+    p.send_serde_json(&payload2).await.unwrap();
+    let deadline = Duration::from_secs(1);
+
+    let now = Instant::now();
+    let mut xs = c.receive_all(2, deadline).await.unwrap();
+    assert_eq!(xs.len(), 2);
+    let d1 = xs.remove(0);
+    assert_eq!(
+        d1.payload_serde_json::<ExType>().unwrap().unwrap(),
+        payload1
+    );
+    d1.ack().await.unwrap();
+
+    let d2 = xs.remove(0);
+    assert_eq!(
+        d2.payload_serde_json::<ExType>().unwrap().unwrap(),
+        payload2
+    );
+    d2.ack().await.unwrap();
+    // N.b. it's still possible this could turn up false if the test runs too slow.
+    assert!(now.elapsed() < deadline);
+}
+
+/// ~~Consumer will return the full batch immediately, but also return immediately if a partial batch is ready.~~
+/// Again, rabbit will wait the full deadline when it gets a partial batch.
+#[tokio::test]
+async fn test_send_recv_all_full_then_partial() {
+    let payload1 = ExType { a: 1 };
+    let payload2 = ExType { a: 2 };
+    let payload3 = ExType { a: 3 };
+    let (p, mut c) = make_test_queue(false).await.build_pair().await.unwrap();
+
+    p.send_serde_json(&payload1).await.unwrap();
+    p.send_serde_json(&payload2).await.unwrap();
+    p.send_serde_json(&payload3).await.unwrap();
+
+    let deadline = Duration::from_secs(1);
+    let now1 = Instant::now();
+    let mut xs = c.receive_all(2, deadline).await.unwrap();
+    assert_eq!(xs.len(), 2);
+    let d1 = xs.remove(0);
+    assert_eq!(
+        d1.payload_serde_json::<ExType>().unwrap().unwrap(),
+        payload1
+    );
+    d1.ack().await.unwrap();
+
+    let d2 = xs.remove(0);
+    assert_eq!(
+        d2.payload_serde_json::<ExType>().unwrap().unwrap(),
+        payload2
+    );
+    d2.ack().await.unwrap();
+    assert!(now1.elapsed() < deadline);
+
+    // 2nd call
+    let now2 = Instant::now();
+    let mut ys = c.receive_all(2, deadline).await.unwrap();
+    assert_eq!(ys.len(), 1);
+    let d3 = ys.remove(0);
+    assert_eq!(
+        d3.payload_serde_json::<ExType>().unwrap().unwrap(),
+        payload3
+    );
+    d3.ack().await.unwrap();
+    // XXX: this differs from the rest of the backends since rabbit will wait the entire deadline.
+    assert!(now2.elapsed() >= deadline);
+}
+
+/// Consumer will wait indefinitely for at least one item.
+#[tokio::test]
+async fn test_send_recv_all_late_arriving_items() {
+    let payload1 = ExType { a: 1 };
+    let (p, mut c) = make_test_queue(false).await.build_pair().await.unwrap();
+
+    let deadline = Duration::from_secs(1);
+    let handle = tokio::spawn(async move {
+        let now = Instant::now();
+        let xs = c.receive_all(2, deadline).await.unwrap();
+        (now.elapsed(), xs)
+    });
+    // Wait longer than the deadline before sending any items.
+    tokio::time::sleep(deadline * 2).await;
+    p.send_serde_json(&payload1).await.unwrap();
+
+    let (elapsed, mut xs) = handle.await.unwrap();
+
+    assert_eq!(xs.len(), 1);
+    let d1 = xs.remove(0);
+    assert_eq!(
+        d1.payload_serde_json::<ExType>().unwrap().unwrap(),
+        payload1
+    );
+    d1.ack().await.unwrap();
+    // shows that the consumer waited well beyond the deadline
+    assert!(elapsed >= deadline * 2);
+}
diff --git a/omniqueue/tests/redis.rs b/omniqueue/tests/redis.rs
index 88369f2..7ae7529 100644
--- a/omniqueue/tests/redis.rs
+++ b/omniqueue/tests/redis.rs
@@ -4,6 +4,7 @@ use omniqueue::{
 };
 use redis::{AsyncCommands, Client, Commands};
 use serde::{Deserialize, Serialize};
+use std::time::{Duration, Instant};
 
 const ROOT_URL: &str = "redis://localhost";
 
@@ -123,3 +124,136 @@ async fn test_custom_send_recv() {
     d.payload_serde_json::<ExType>().unwrap_err();
     d.ack().await.unwrap();
 }
+
+/// Consumer will return immediately if there are fewer than max messages to start with.
+#[tokio::test]
+async fn test_send_recv_all_partial() {
+    let (builder, _drop) = make_test_queue().await;
+
+    let payload = ExType { a: 2 };
+    let (p, mut c) = builder.build_pair().await.unwrap();
+
+    p.send_serde_json(&payload).await.unwrap();
+    let deadline = Duration::from_secs(1);
+
+    let now = Instant::now();
+    let mut xs = c.receive_all(2, deadline).await.unwrap();
+    assert_eq!(xs.len(), 1);
+    let d = xs.remove(0);
+    assert_eq!(d.payload_serde_json::<ExType>().unwrap().unwrap(), payload);
+    d.ack().await.unwrap();
+    assert!(now.elapsed() <= deadline);
+}
+
+/// Consumer should yield items immediately if there's a full batch ready on the first poll.
+#[tokio::test]
+async fn test_send_recv_all_full() {
+    let payload1 = ExType { a: 1 };
+    let payload2 = ExType { a: 2 };
+
+    let (builder, _drop) = make_test_queue().await;
+
+    let (p, mut c) = builder.build_pair().await.unwrap();
+
+    p.send_serde_json(&payload1).await.unwrap();
+    p.send_serde_json(&payload2).await.unwrap();
+    let deadline = Duration::from_secs(1);
+
+    let now = Instant::now();
+    let mut xs = c.receive_all(2, deadline).await.unwrap();
+    assert_eq!(xs.len(), 2);
+    let d1 = xs.remove(0);
+    assert_eq!(
+        d1.payload_serde_json::<ExType>().unwrap().unwrap(),
+        payload1
+    );
+    d1.ack().await.unwrap();
+
+    let d2 = xs.remove(0);
+    assert_eq!(
+        d2.payload_serde_json::<ExType>().unwrap().unwrap(),
+        payload2
+    );
+    d2.ack().await.unwrap();
+    // N.b. it's still possible this could turn up false if the test runs too slow.
+    assert!(now.elapsed() < deadline);
+}
+
+/// Consumer will return the full batch immediately, but also return immediately if a partial batch is ready.
+#[tokio::test]
+async fn test_send_recv_all_full_then_partial() {
+    let payload1 = ExType { a: 1 };
+    let payload2 = ExType { a: 2 };
+    let payload3 = ExType { a: 3 };
+
+    let (builder, _drop) = make_test_queue().await;
+
+    let (p, mut c) = builder.build_pair().await.unwrap();
+
+    p.send_serde_json(&payload1).await.unwrap();
+    p.send_serde_json(&payload2).await.unwrap();
+    p.send_serde_json(&payload3).await.unwrap();
+
+    let deadline = Duration::from_secs(1);
+    let now1 = Instant::now();
+    let mut xs = c.receive_all(2, deadline).await.unwrap();
+    assert_eq!(xs.len(), 2);
+    let d1 = xs.remove(0);
+    assert_eq!(
+        d1.payload_serde_json::<ExType>().unwrap().unwrap(),
+        payload1
+    );
+    d1.ack().await.unwrap();
+
+    let d2 = xs.remove(0);
+    assert_eq!(
+        d2.payload_serde_json::<ExType>().unwrap().unwrap(),
+        payload2
+    );
+    d2.ack().await.unwrap();
+    assert!(now1.elapsed() < deadline);
+
+    // 2nd call
+    let now2 = Instant::now();
+    let mut ys = c.receive_all(2, deadline).await.unwrap();
+    assert_eq!(ys.len(), 1);
+    let d3 = ys.remove(0);
+    assert_eq!(
+        d3.payload_serde_json::<ExType>().unwrap().unwrap(),
+        payload3
+    );
+    d3.ack().await.unwrap();
+    assert!(now2.elapsed() < deadline);
+}
+
+/// Consumer will wait indefinitely for at least one item.
+#[tokio::test]
+async fn test_send_recv_all_late_arriving_items() {
+    let payload1 = ExType { a: 1 };
+
+    let (builder, _drop) = make_test_queue().await;
+
+    let (p, mut c) = builder.build_pair().await.unwrap();
+
+    let deadline = Duration::from_secs(1);
+    let handle = tokio::spawn(async move {
+        let now = Instant::now();
+        let xs = c.receive_all(2, deadline).await.unwrap();
+        (now.elapsed(), xs)
+    });
+    // Wait longer than the deadline before sending any items.
+    tokio::time::sleep(deadline * 2).await;
+    p.send_serde_json(&payload1).await.unwrap();
+
+    let (elapsed, mut xs) = handle.await.unwrap();
+
+    assert_eq!(xs.len(), 1);
+    let d1 = xs.remove(0);
+    assert_eq!(
+        d1.payload_serde_json::<ExType>().unwrap().unwrap(),
+        payload1
+    );
+    d1.ack().await.unwrap();
+    // shows that the consumer waited well beyond the deadline
+    assert!(elapsed >= deadline * 2);
+}
diff --git a/omniqueue/tests/redis_cluster.rs b/omniqueue/tests/redis_cluster.rs
index cadc55f..dce4b6c 100644
--- a/omniqueue/tests/redis_cluster.rs
+++ b/omniqueue/tests/redis_cluster.rs
@@ -4,6 +4,7 @@ use omniqueue::{
 };
 use redis::{cluster::ClusterClient, AsyncCommands, Commands};
 use serde::{Deserialize, Serialize};
+use std::time::{Duration, Instant};
 
 const ROOT_URL: &str = "redis://localhost:6380";
 
@@ -126,3 +127,136 @@ async fn test_custom_send_recv() {
     d.payload_serde_json::<ExType>().unwrap_err();
     d.ack().await.unwrap();
 }
+
+/// Consumer will return immediately if there are fewer than max messages to start with.
+#[tokio::test]
+async fn test_send_recv_all_partial() {
+    let (builder, _drop) = make_test_queue().await;
+
+    let payload = ExType { a: 2 };
+    let (p, mut c) = builder.build_pair().await.unwrap();
+
+    p.send_serde_json(&payload).await.unwrap();
+    let deadline = Duration::from_secs(1);
+
+    let now = Instant::now();
+    let mut xs = c.receive_all(2, deadline).await.unwrap();
+    assert_eq!(xs.len(), 1);
+    let d = xs.remove(0);
+    assert_eq!(d.payload_serde_json::<ExType>().unwrap().unwrap(), payload);
+    d.ack().await.unwrap();
+    assert!(now.elapsed() <= deadline);
+}
+
+/// Consumer should yield items immediately if there's a full batch ready on the first poll.
+#[tokio::test]
+async fn test_send_recv_all_full() {
+    let payload1 = ExType { a: 1 };
+    let payload2 = ExType { a: 2 };
+
+    let (builder, _drop) = make_test_queue().await;
+
+    let (p, mut c) = builder.build_pair().await.unwrap();
+
+    p.send_serde_json(&payload1).await.unwrap();
+    p.send_serde_json(&payload2).await.unwrap();
+    let deadline = Duration::from_secs(1);
+
+    let now = Instant::now();
+    let mut xs = c.receive_all(2, deadline).await.unwrap();
+    assert_eq!(xs.len(), 2);
+    let d1 = xs.remove(0);
+    assert_eq!(
+        d1.payload_serde_json::<ExType>().unwrap().unwrap(),
+        payload1
+    );
+    d1.ack().await.unwrap();
+
+    let d2 = xs.remove(0);
+    assert_eq!(
+        d2.payload_serde_json::<ExType>().unwrap().unwrap(),
+        payload2
+    );
+    d2.ack().await.unwrap();
+    // N.b. it's still possible this could turn up false if the test runs too slow.
+    assert!(now.elapsed() < deadline);
+}
+
+/// Consumer will return the full batch immediately, but also return immediately if a partial batch is ready.
+#[tokio::test]
+async fn test_send_recv_all_full_then_partial() {
+    let payload1 = ExType { a: 1 };
+    let payload2 = ExType { a: 2 };
+    let payload3 = ExType { a: 3 };
+
+    let (builder, _drop) = make_test_queue().await;
+
+    let (p, mut c) = builder.build_pair().await.unwrap();
+
+    p.send_serde_json(&payload1).await.unwrap();
+    p.send_serde_json(&payload2).await.unwrap();
+    p.send_serde_json(&payload3).await.unwrap();
+
+    let deadline = Duration::from_secs(1);
+    let now1 = Instant::now();
+    let mut xs = c.receive_all(2, deadline).await.unwrap();
+    assert_eq!(xs.len(), 2);
+    let d1 = xs.remove(0);
+    assert_eq!(
+        d1.payload_serde_json::<ExType>().unwrap().unwrap(),
+        payload1
+    );
+    d1.ack().await.unwrap();
+
+    let d2 = xs.remove(0);
+    assert_eq!(
+        d2.payload_serde_json::<ExType>().unwrap().unwrap(),
+        payload2
+    );
+    d2.ack().await.unwrap();
+    assert!(now1.elapsed() < deadline);
+
+    // 2nd call
+    let now2 = Instant::now();
+    let mut ys = c.receive_all(2, deadline).await.unwrap();
+    assert_eq!(ys.len(), 1);
+    let d3 = ys.remove(0);
+    assert_eq!(
+        d3.payload_serde_json::<ExType>().unwrap().unwrap(),
+        payload3
+    );
+    d3.ack().await.unwrap();
+    assert!(now2.elapsed() < deadline);
+}
+
+/// Consumer will wait indefinitely for at least one item.
+#[tokio::test]
+async fn test_send_recv_all_late_arriving_items() {
+    let payload1 = ExType { a: 1 };
+
+    let (builder, _drop) = make_test_queue().await;
+
+    let (p, mut c) = builder.build_pair().await.unwrap();
+
+    let deadline = Duration::from_secs(1);
+    let handle = tokio::spawn(async move {
+        let now = Instant::now();
+        let xs = c.receive_all(2, deadline).await.unwrap();
+        (now.elapsed(), xs)
+    });
+    // Wait longer than the deadline before sending any items.
+    tokio::time::sleep(deadline * 2).await;
+    p.send_serde_json(&payload1).await.unwrap();
+
+    let (elapsed, mut xs) = handle.await.unwrap();
+
+    assert_eq!(xs.len(), 1);
+    let d1 = xs.remove(0);
+    assert_eq!(
+        d1.payload_serde_json::<ExType>().unwrap().unwrap(),
+        payload1
+    );
+    d1.ack().await.unwrap();
+    // shows that the consumer waited well beyond the deadline
+    assert!(elapsed >= deadline * 2);
+}
diff --git a/omniqueue/tests/sqs.rs b/omniqueue/tests/sqs.rs
index d6fef06..76fb8de 100644
--- a/omniqueue/tests/sqs.rs
+++ b/omniqueue/tests/sqs.rs
@@ -4,6 +4,7 @@ use omniqueue::{
     queue::{consumer::QueueConsumer, producer::QueueProducer, QueueBackend, QueueBuilder, Static},
 };
 use serde::{Deserialize, Serialize};
+use std::time::{Duration, Instant};
 
 const ROOT_URL: &str = "http://localhost:9324";
 const DEFAULT_CFG: [(&str, &str); 3] = [
@@ -113,3 +114,125 @@ async fn test_custom_send_recv() {
     d.payload_serde_json::<ExType>().unwrap_err();
     d.ack().await.unwrap();
 }
+
+/// Consumer will return immediately if there are fewer than max messages to start with.
+#[tokio::test]
+async fn test_send_recv_all_partial() {
+    let payload = ExType { a: 2 };
+    let (p, mut c) = make_test_queue().await.build_pair().await.unwrap();
+
+    p.send_serde_json(&payload).await.unwrap();
+    let deadline = Duration::from_secs(1);
+
+    let now = Instant::now();
+    let mut xs = c.receive_all(2, deadline).await.unwrap();
+    assert_eq!(xs.len(), 1);
+    let d = xs.remove(0);
+    assert_eq!(d.payload_serde_json::<ExType>().unwrap().unwrap(), payload);
+    d.ack().await.unwrap();
+    assert!(now.elapsed() <= deadline);
+}
+
+/// Consumer should yield items immediately if there's a full batch ready on the first poll.
+#[tokio::test]
+async fn test_send_recv_all_full() {
+    let payload1 = ExType { a: 1 };
+    let payload2 = ExType { a: 2 };
+    let (p, mut c) = make_test_queue().await.build_pair().await.unwrap();
+
+    p.send_serde_json(&payload1).await.unwrap();
+    p.send_serde_json(&payload2).await.unwrap();
+    let deadline = Duration::from_secs(1);
+
+    let now = Instant::now();
+    let mut xs = c.receive_all(2, deadline).await.unwrap();
+    assert_eq!(xs.len(), 2);
+    let d1 = xs.remove(0);
+    assert_eq!(
+        d1.payload_serde_json::<ExType>().unwrap().unwrap(),
+        payload1
+    );
+    d1.ack().await.unwrap();
+
+    let d2 = xs.remove(0);
+    assert_eq!(
+        d2.payload_serde_json::<ExType>().unwrap().unwrap(),
+        payload2
+    );
+    d2.ack().await.unwrap();
+    // N.b. it's still possible this could turn up false if the test runs too slow.
+    assert!(now.elapsed() < deadline);
+}
+
+/// Consumer will return the full batch immediately, but also return immediately if a partial batch is ready.
+#[tokio::test]
+async fn test_send_recv_all_full_then_partial() {
+    let payload1 = ExType { a: 1 };
+    let payload2 = ExType { a: 2 };
+    let payload3 = ExType { a: 3 };
+    let (p, mut c) = make_test_queue().await.build_pair().await.unwrap();
+
+    p.send_serde_json(&payload1).await.unwrap();
+    p.send_serde_json(&payload2).await.unwrap();
+    p.send_serde_json(&payload3).await.unwrap();
+
+    let deadline = Duration::from_secs(1);
+    let now1 = Instant::now();
+    let mut xs = c.receive_all(2, deadline).await.unwrap();
+    assert_eq!(xs.len(), 2);
+    let d1 = xs.remove(0);
+    assert_eq!(
+        d1.payload_serde_json::<ExType>().unwrap().unwrap(),
+        payload1
+    );
+    d1.ack().await.unwrap();
+
+    let d2 = xs.remove(0);
+    assert_eq!(
+        d2.payload_serde_json::<ExType>().unwrap().unwrap(),
+        payload2
+    );
+    d2.ack().await.unwrap();
+    assert!(now1.elapsed() < deadline);
+
+    // 2nd call
+    let now2 = Instant::now();
+    let mut ys = c.receive_all(2, deadline).await.unwrap();
+    assert_eq!(ys.len(), 1);
+    let d3 = ys.remove(0);
+    assert_eq!(
+        d3.payload_serde_json::<ExType>().unwrap().unwrap(),
+        payload3
+    );
+    d3.ack().await.unwrap();
+    assert!(now2.elapsed() < deadline);
+}
+
+/// Consumer will wait indefinitely for at least one item.
+#[tokio::test]
+async fn test_send_recv_all_late_arriving_items() {
+    let payload1 = ExType { a: 1 };
+    let (p, mut c) = make_test_queue().await.build_pair().await.unwrap();
+
+    let deadline = Duration::from_secs(1);
+    let handle = tokio::spawn(async move {
+        let now = Instant::now();
+        let xs = c.receive_all(2, deadline).await.unwrap();
+        (now.elapsed(), xs)
+    });
+    // Wait longer than the deadline before sending any items.
+    tokio::time::sleep(deadline * 2).await;
+    p.send_serde_json(&payload1).await.unwrap();
+
+    let (elapsed, mut xs) = handle.await.unwrap();
+
+    assert_eq!(xs.len(), 1);
+    let d1 = xs.remove(0);
+    assert_eq!(
+        d1.payload_serde_json::<ExType>().unwrap().unwrap(),
+        payload1
+    );
+    d1.ack().await.unwrap();
+    // shows that the consumer waited well beyond the deadline
+    assert!(elapsed >= deadline * 2);
+}
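Reviewer note, summarizing the deadline semantics these tests pin down: memory, redis, redis-cluster, and sqs return a partial batch as soon as messages are available (asserting `now.elapsed() <= deadline`); gcp and rabbitmq hold a partial batch until the deadline lapses (asserting `now.elapsed() >= deadline`, per the XXX comments); and every backend blocks indefinitely for the first message (the late-arriving-items tests). The rabbit divergence is tracked by the FIXME on its receive_all.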