Redis: added XTRIM
tyt2y3 committed Nov 5, 2024
1 parent 10c0139 commit 9946b2f
Showing 3 changed files with 158 additions and 10 deletions.
5 changes: 5 additions & 0 deletions sea-streamer-redis/Cargo.toml
@@ -51,3 +51,8 @@ required-features = ["executables"]
name = "producer"
path = "src/bin/producer.rs"
required-features = ["executables"]

[[bin]]
name = "trim-stream"
path = "src/bin/trim-stream.rs"
required-features = ["executables"]
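With this target registered, the trimmer runs like the crate's other executables; something like `cargo run --bin trim-stream --features executables -- --stream redis://localhost/hello --maxlen 1000` (the exact flags come from the argument definitions in the next file).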
37 changes: 37 additions & 0 deletions sea-streamer-redis/src/bin/trim-stream.rs
@@ -0,0 +1,37 @@
use anyhow::Result;
use clap::Parser;
use sea_streamer_redis::RedisStreamer;
use sea_streamer_types::{StreamUrl, Streamer};

#[derive(Debug, Parser)]
struct Args {
    #[clap(
        long,
        help = "Streamer URI with stream key, i.e. try `redis://localhost/hello`",
        env = "STREAM_URL"
    )]
    stream: StreamUrl,
    #[clap(
        long,
        help = "Trim the stream down to this number of items (not exact)"
    )]
    maxlen: u32,
}

#[tokio::main]
async fn main() -> Result<()> {
    env_logger::init();

    let Args { stream, maxlen } = Args::parse();
    let stream_key = stream.stream_key()?;

    let streamer = RedisStreamer::connect(stream.streamer(), Default::default()).await?;
    let producer = streamer.create_generic_producer(Default::default()).await?;

    match producer.trim(&stream_key, maxlen).await {
        Ok(trimmed) => log::info!("XTRIM {stream_key} trimmed {trimmed} entries"),
        Err(err) => log::error!("{err:?}"),
    }

    Ok(())
}
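For context, the `trim` call this binary drives amounts to Redis's `XTRIM <key> MAXLEN ~ <maxlen>`; the `~` is why the help text says "(not exact)". A minimal sketch of the same operation using the `redis` crate directly (the URL, key, and function name are illustrative, not part of this commit):

// Illustrative sketch: issue `XTRIM <key> MAXLEN ~ <maxlen>` through the
// `redis` crate directly, bypassing sea-streamer. Returns the number of
// entries Redis reports as trimmed.
fn trim_approx(url: &str, key: &str, maxlen: u32) -> redis::RedisResult<i64> {
    let client = redis::Client::open(url)?;
    let mut conn = client.get_connection()?;
    redis::cmd("XTRIM")
        .arg(key)
        .arg("MAXLEN")
        // `~` permits Redis to trim lazily, so slightly more than `maxlen`
        // entries may remain.
        .arg("~")
        .arg(maxlen)
        .query(&mut conn)
}

Calling `trim_approx("redis://localhost", "hello", 1000)` against a local instance should behave like the binary above.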
126 changes: 116 additions & 10 deletions sea-streamer-redis/src/producer.rs
@@ -1,5 +1,5 @@
 use flume::{bounded, r#async::RecvFut, unbounded, Sender};
-use redis::{aio::ConnectionLike, cmd as command, ErrorKind, Pipeline};
+use redis::{aio::ConnectionLike, cmd as command, ErrorKind, Pipeline, Value};
 use std::{fmt::Debug, future::Future, sync::Arc, time::Duration};

use crate::{
@@ -138,6 +138,29 @@ impl Producer for RedisProducer {
}

impl RedisProducer {
    /// Performs `XTRIM <stream_key> MAXLEN ~ <maxlen>`, returning the number of items trimmed.
    ///
    /// This method is not yet considered stable.
    pub async fn trim(&self, stream_key: &StreamKey, maxlen: u32) -> RedisResult<u64> {
        let (receipt, receiver) = bounded(1);

        self.sender
            .send(SendRequest {
                stream_key: StreamKey::new(SEA_STREAMER_INTERNAL)?,
                bytes: format!("XTRIM,{},{}", stream_key.name(), maxlen).into_bytes(),
                receipt,
            })
            .map_err(|_| StreamErr::Backend(RedisErr::ProducerDied))?;

        let res = receiver
            .recv_async()
            .await
            .map_err(|_| StreamErr::Backend(RedisErr::ProducerDied))??;

        #[allow(clippy::useless_conversion)]
        Ok((*res.sequence()).try_into().expect("integer out of range"))
    }

    /// Same as [`RedisProducer::flush`], but does not require `&mut self`.
    /// Technically the implementation here permits concurrent calls,
    /// but you are advised against it.
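Note the design: `trim` does not open its own connection; it encodes the request as `"XTRIM,<key>,<maxlen>"` under the internal `SEA_STREAMER_INTERNAL` stream key, pushes it through the ordinary send channel, and blocks on a one-shot receipt. The trimmed count rides back in the `sequence` field of the receipt's `MessageHeader`, hence the `try_into` on `res.sequence()` above.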
@@ -213,22 +236,48 @@ pub(crate) async fn create_producer(
             let mut batch: (Vec<(String, StreamKey, ShardId, Receipt)>, Pipeline) =
                 Default::default();
             let mut next_batch = batch.clone();
+            let mut xtrims = Vec::new();
             while remaining > 0 {
                 for SendRequest {
                     stream_key,
                     bytes,
                     receipt,
                 } in requests.by_ref()
                 {
-                    if stream_key.name() == SEA_STREAMER_INTERNAL && bytes.is_empty() {
-                        // A signalling message
-                        next_batch.0.push((
-                            SEA_STREAMER_INTERNAL.to_owned(),
-                            stream_key,
-                            ZERO,
-                            receipt,
-                        ));
-                        break;
+                    if stream_key.name() == SEA_STREAMER_INTERNAL {
+                        if bytes.is_empty() {
+                            // A signalling message
+                            next_batch.0.push((
+                                SEA_STREAMER_INTERNAL.to_owned(),
+                                stream_key,
+                                ZERO,
+                                receipt,
+                            ));
+                            break;
+                        } else if let Some(oper) = parse_internal_mess(&bytes) {
+                            match oper.op {
+                                "XTRIM" => {
+                                    if let Ok(key) = StreamKey::new(oper.arg1) {
+                                        if let Ok(maxlen) = oper.arg2.parse::<u32>() {
+                                            xtrims.push((key, maxlen, receipt));
+                                        } else {
+                                            log::warn!("Invalid number {}", oper.arg2);
+                                        }
+                                    } else {
+                                        log::warn!("Invalid Stream Key {}", oper.arg1);
+                                    }
+                                }
+                                _ => log::warn!(
+                                    "Unhandled SEA_STREAMER_INTERNAL operation {}",
+                                    oper.op
+                                ),
+                            }
+
+                            remaining -= 1;
+                        } else {
+                            log::warn!("Unhandled SEA_STREAMER_INTERNAL message");
+                            remaining -= 1;
+                        }
                     } else {
                         let redis_stream_key;
                         let (redis_key, shard) = if let Some(sharder) = sharder.as_mut() {
@@ -367,6 +416,44 @@ pub(crate) async fn create_producer(
                batch = next_batch;
                next_batch = Default::default();
            }

            if !xtrims.is_empty() {
                for (stream_key, maxlen, receipt) in xtrims.drain(..) {
                    let (_, conn) = match cluster.get_connection_for(stream_key.name()).await {
                        Ok(conn) => conn,
                        Err(err) => {
                            log::warn!("{err:?}");
                            break;
                        }
                    };

                    let mut cmd = command("XTRIM");
                    cmd.arg(stream_key.name())
                        .arg("MAXLEN")
                        .arg("~")
                        .arg(maxlen);

                    match conn.req_packed_command(&cmd).await {
                        Ok(Value::Int(deleted)) => {
                            receipt
                                .send_async(Ok(MessageHeader::new(
                                    StreamKey::new(SEA_STREAMER_INTERNAL).unwrap(),
                                    Default::default(),
                                    deleted as u64,
                                    Timestamp::now_utc(),
                                )))
                                .await
                                .ok();
                        }
                        Ok(res) => {
                            log::warn!("XTRIM failed: {res:?}");
                        }
                        Err(err) => {
                            receipt.send_async(Err(map_err(err))).await.ok();
                        }
                    }
                }
            }
        }
    });
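Two behaviors worth noting in the handler above: the XTRIM is issued on whichever node `cluster.get_connection_for` resolves for the stream key, so it works under cluster mode; and a non-`Int` reply only logs a warning without answering the receipt, which the waiting `trim` call will observe as `ProducerDied` once the channel drops.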

@@ -423,3 +510,22 @@ impl Sharder for RoundRobinSharder {
        r as u64
    }
}

struct InternalMess<'a> {
    op: &'static str,
    arg1: &'a str,
    arg2: &'a str,
}

fn parse_internal_mess(bytes: &[u8]) -> Option<InternalMess> {
    let Ok(string) = std::str::from_utf8(bytes) else {
        return None;
    };
    let (left, right) = string.split_once(',')?;
    let op = match left {
        "XTRIM" => "XTRIM",
        _ => return None,
    };
    let (arg1, arg2) = right.split_once(',')?;
    Some(InternalMess { op, arg1, arg2 })
}
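As a sanity check on the payload format: the producer writes `format!("XTRIM,{},{}", ...)` and `parse_internal_mess` splits it back on the first two commas (a stream key containing a comma would therefore not round-trip). A standalone check with made-up values:

// Round-trip check of the internal payload "XTRIM,<stream key>,<maxlen>".
// Values are made up for illustration.
fn main() {
    let bytes = format!("XTRIM,{},{}", "my-stream", 1000).into_bytes();
    let string = std::str::from_utf8(&bytes).expect("valid UTF-8");
    let (op, rest) = string.split_once(',').expect("an op");
    let (arg1, arg2) = rest.split_once(',').expect("two args");
    assert_eq!(op, "XTRIM");
    assert_eq!(arg1, "my-stream");
    assert_eq!(arg2.parse::<u32>().expect("a number"), 1000);
}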
