diff --git a/sea-streamer-redis/Cargo.toml b/sea-streamer-redis/Cargo.toml
index 007a659..d4c84c6 100644
--- a/sea-streamer-redis/Cargo.toml
+++ b/sea-streamer-redis/Cargo.toml
@@ -51,3 +51,8 @@ required-features = ["executables"]
 name = "producer"
 path = "src/bin/producer.rs"
 required-features = ["executables"]
+
+[[bin]]
+name = "trim-stream"
+path = "src/bin/trim-stream.rs"
+required-features = ["executables"]
diff --git a/sea-streamer-redis/src/bin/trim-stream.rs b/sea-streamer-redis/src/bin/trim-stream.rs
new file mode 100644
index 0000000..10f122f
--- /dev/null
+++ b/sea-streamer-redis/src/bin/trim-stream.rs
@@ -0,0 +1,37 @@
+use anyhow::Result;
+use clap::Parser;
+use sea_streamer_redis::RedisStreamer;
+use sea_streamer_types::{StreamUrl, Streamer};
+
+#[derive(Debug, Parser)]
+struct Args {
+    #[clap(
+        long,
+        help = "Streamer URI with stream key, e.g. `redis://localhost/hello`",
+        env = "STREAM_URL"
+    )]
+    stream: StreamUrl,
+    #[clap(
+        long,
+        help = "Trim the stream down to this number of items (not exact)"
+    )]
+    maxlen: u32,
+}
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    env_logger::init();
+
+    let Args { stream, maxlen } = Args::parse();
+    let stream_key = stream.stream_key()?;
+
+    let streamer = RedisStreamer::connect(stream.streamer(), Default::default()).await?;
+    let producer = streamer.create_generic_producer(Default::default()).await?;
+
+    match producer.trim(&stream_key, maxlen).await {
+        Ok(trimmed) => log::info!("XTRIM {stream_key} trimmed {trimmed} entries"),
+        Err(err) => log::error!("{err:?}"),
+    }
+
+    Ok(())
+}
diff --git a/sea-streamer-redis/src/producer.rs b/sea-streamer-redis/src/producer.rs
index 1f15e8f..65aa071 100644
--- a/sea-streamer-redis/src/producer.rs
+++ b/sea-streamer-redis/src/producer.rs
@@ -1,5 +1,5 @@
 use flume::{bounded, r#async::RecvFut, unbounded, Sender};
-use redis::{aio::ConnectionLike, cmd as command, ErrorKind, Pipeline};
+use redis::{aio::ConnectionLike, cmd as command, ErrorKind, Pipeline, Value};
 use std::{fmt::Debug, future::Future, sync::Arc, time::Duration};
 
 use crate::{
@@ -138,6 +138,29 @@ impl Producer for RedisProducer {
 }
 
 impl RedisProducer {
+    /// Performs `XTRIM <stream_key> MAXLEN ~ <maxlen>`, returning the number of items trimmed.
+    ///
+    /// This method is not yet considered stable.
+    pub async fn trim(&self, stream_key: &StreamKey, maxlen: u32) -> RedisResult<u64> {
+        let (receipt, receiver) = bounded(1);
+
+        self.sender
+            .send(SendRequest {
+                stream_key: StreamKey::new(SEA_STREAMER_INTERNAL)?,
+                bytes: format!("XTRIM,{},{}", stream_key.name(), maxlen).into_bytes(),
+                receipt,
+            })
+            .map_err(|_| StreamErr::Backend(RedisErr::ProducerDied))?;
+
+        let res = receiver
+            .recv_async()
+            .await
+            .map_err(|_| StreamErr::Backend(RedisErr::ProducerDied))??;
+
+        #[allow(clippy::useless_conversion)]
+        Ok((*res.sequence()).try_into().expect("integer out of range"))
+    }
+
     /// Same as [`RedisProducer::flush`], but does not require `&mut self`.
     /// Technically the implementation here permits concurrent calls,
     /// but you are advised against it.
@@ -213,6 +236,7 @@ pub(crate) async fn create_producer(
         let mut batch: (Vec<(String, StreamKey, ShardId, Receipt)>, Pipeline) =
             Default::default();
         let mut next_batch = batch.clone();
+        let mut xtrims = Vec::new();
         while remaining > 0 {
             for SendRequest {
                 stream_key,
@@ -220,15 +244,40 @@ pub(crate) async fn create_producer(
                 receipt,
             } in requests.by_ref()
             {
-                if stream_key.name() == SEA_STREAMER_INTERNAL && bytes.is_empty() {
-                    // A signalling message
-                    next_batch.0.push((
-                        SEA_STREAMER_INTERNAL.to_owned(),
-                        stream_key,
-                        ZERO,
-                        receipt,
-                    ));
-                    break;
+                if stream_key.name() == SEA_STREAMER_INTERNAL {
+                    if bytes.is_empty() {
+                        // A signalling message
+                        next_batch.0.push((
+                            SEA_STREAMER_INTERNAL.to_owned(),
+                            stream_key,
+                            ZERO,
+                            receipt,
+                        ));
+                        break;
+                    } else if let Some(oper) = parse_internal_mess(&bytes) {
+                        match oper.op {
+                            "XTRIM" => {
+                                if let Ok(key) = StreamKey::new(oper.arg1) {
+                                    if let Ok(maxlen) = oper.arg2.parse::<u32>() {
+                                        xtrims.push((key, maxlen, receipt));
+                                    } else {
+                                        log::warn!("Invalid number {}", oper.arg2);
+                                    }
+                                } else {
+                                    log::warn!("Invalid Stream Key {}", oper.arg1);
+                                }
+                            }
+                            _ => log::warn!(
+                                "Unhandled SEA_STREAMER_INTERNAL operation {}",
+                                oper.op
+                            ),
+                        }
+
+                        remaining -= 1;
+                    } else {
+                        log::warn!("Unhandled SEA_STREAMER_INTERNAL message");
+                        remaining -= 1;
+                    }
                 } else {
                     let redis_stream_key;
                     let (redis_key, shard) = if let Some(sharder) = sharder.as_mut() {
@@ -367,6 +416,44 @@ pub(crate) async fn create_producer(
                 batch = next_batch;
                 next_batch = Default::default();
             }
+
+            if !xtrims.is_empty() {
+                for (stream_key, maxlen, receipt) in xtrims.drain(..) {
+                    let (_, conn) = match cluster.get_connection_for(stream_key.name()).await {
+                        Ok(conn) => conn,
+                        Err(err) => {
+                            log::warn!("{err:?}");
+                            break;
+                        }
+                    };
+
+                    let mut cmd = command("XTRIM");
+                    cmd.arg(stream_key.name())
+                        .arg("MAXLEN")
+                        .arg("~")
+                        .arg(maxlen);
+
+                    match conn.req_packed_command(&cmd).await {
+                        Ok(Value::Int(deleted)) => {
+                            receipt
+                                .send_async(Ok(MessageHeader::new(
+                                    StreamKey::new(SEA_STREAMER_INTERNAL).unwrap(),
+                                    Default::default(),
+                                    deleted as u64,
+                                    Timestamp::now_utc(),
+                                )))
+                                .await
+                                .ok();
+                        }
+                        Ok(res) => {
+                            log::warn!("XTRIM failed: {res:?}");
+                        }
+                        Err(err) => {
+                            receipt.send_async(Err(map_err(err))).await.ok();
+                        }
+                    }
+                }
+            }
         }
     });
@@ -423,3 +510,22 @@ impl Sharder for RoundRobinSharder {
         r as u64
     }
 }
+
+struct InternalMess<'a> {
+    op: &'static str,
+    arg1: &'a str,
+    arg2: &'a str,
+}
+
+fn parse_internal_mess(bytes: &[u8]) -> Option<InternalMess<'_>> {
+    let Ok(string) = std::str::from_utf8(bytes) else {
+        return None;
+    };
+    let (left, right) = string.split_once(',')?;
+    let op = match left {
+        "XTRIM" => "XTRIM",
+        _ => return None,
+    };
+    let (arg1, arg2) = right.split_once(',')?;
+    Some(InternalMess { op, arg1, arg2 })
+}
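For reference, here is how the new binary might be exercised against a local Redis, per the Cargo.toml target above (the stream key `hello` and the `maxlen` value are arbitrary examples; `--stream` can also be supplied via the `STREAM_URL` environment variable):

    cd sea-streamer-redis
    cargo run --features executables --bin trim-stream -- --stream redis://localhost/hello --maxlen 1000

Note that the command is issued as `XTRIM ... MAXLEN ~ <maxlen>`, so trimming is approximate: Redis only evicts whole internal macro nodes, which is cheaper than exact trimming but may leave slightly more than `maxlen` entries in the stream.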