Skip to content

Commit

Permalink
Added RedisProducer::flush_immut
Browse files Browse the repository at this point in the history
  • Loading branch information
tyt2y3 committed Oct 11, 2024
1 parent 0b2bde0 commit 1310558
Showing 1 changed file with 15 additions and 6 deletions.
21 changes: 15 additions & 6 deletions sea-streamer-redis/src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,7 @@ impl Producer for RedisProducer {

#[inline]
async fn flush(&mut self) -> RedisResult<()> {
// The trick here is to send a signal message and wait for the receipt.
// By the time it returns a receipt, everything before should have already been sent.
let null = [];
self.send_to(&StreamKey::new(SEA_STREAMER_INTERNAL)?, null.as_slice())?
.await?;
Ok(())
self.flush_immut().await
}

fn anchor(&mut self, stream: StreamKey) -> RedisResult<()> {
Expand All @@ -142,6 +137,20 @@ impl Producer for RedisProducer {
}
}

impl RedisProducer {
/// Same as [`RedisProducer::flush`], but not require `&mut self`.
/// Technically the implementation here permits concurrent calls,
/// but you are advised against it.
pub async fn flush_immut(&self) -> RedisResult<()> {
// The trick here is to send a signal message and wait for the receipt.
// By the time it returns a receipt, everything before should have already been sent.
let null = [];
self.send_to(&StreamKey::new(SEA_STREAMER_INTERNAL)?, null.as_slice())?
.await?;
Ok(())
}
}

impl ProducerOptions for RedisProducerOptions {}

impl RedisProducerOptions {
Expand Down

0 comments on commit 1310558

Please sign in to comment.