From 1310558ddc6fb44402ea3a853bf0a06d610ea29b Mon Sep 17 00:00:00 2001 From: Chris Tsang Date: Fri, 11 Oct 2024 11:48:07 +0100 Subject: [PATCH] Added `RedisProducer::flush_immut` --- sea-streamer-redis/src/producer.rs | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/sea-streamer-redis/src/producer.rs b/sea-streamer-redis/src/producer.rs index aed5431..1f15e8f 100644 --- a/sea-streamer-redis/src/producer.rs +++ b/sea-streamer-redis/src/producer.rs @@ -116,12 +116,7 @@ impl Producer for RedisProducer { #[inline] async fn flush(&mut self) -> RedisResult<()> { - // The trick here is to send a signal message and wait for the receipt. - // By the time it returns a receipt, everything before should have already been sent. - let null = []; - self.send_to(&StreamKey::new(SEA_STREAMER_INTERNAL)?, null.as_slice())? - .await?; - Ok(()) + self.flush_immut().await } fn anchor(&mut self, stream: StreamKey) -> RedisResult<()> { @@ -142,6 +137,20 @@ impl Producer for RedisProducer { } } +impl RedisProducer { + /// Same as [`RedisProducer::flush`], but not require `&mut self`. + /// Technically the implementation here permits concurrent calls, + /// but you are advised against it. + pub async fn flush_immut(&self) -> RedisResult<()> { + // The trick here is to send a signal message and wait for the receipt. + // By the time it returns a receipt, everything before should have already been sent. + let null = []; + self.send_to(&StreamKey::new(SEA_STREAMER_INTERNAL)?, null.as_slice())? + .await?; + Ok(()) + } +} + impl ProducerOptions for RedisProducerOptions {} impl RedisProducerOptions {