From 0b2bde0efaf8d9d3bf506050c6425ed2748486c4 Mon Sep 17 00:00:00 2001 From: Chris Tsang Date: Thu, 10 Oct 2024 23:15:46 +0100 Subject: [PATCH] Support custom message field in Redis --- sea-streamer-file/src/file.rs | 1 + sea-streamer-file/src/messages.rs | 2 ++ sea-streamer-redis/src/consumer/mod.rs | 5 +++-- sea-streamer-redis/src/consumer/node.rs | 7 ++++++- sea-streamer-redis/src/consumer/options.rs | 3 ++- sea-streamer-redis/src/message.rs | 13 +++++++----- sea-streamer-redis/src/producer.rs | 8 +++++--- sea-streamer-redis/src/streamer.rs | 23 +++++++++++++++++++++- 8 files changed, 49 insertions(+), 13 deletions(-) diff --git a/sea-streamer-file/src/file.rs b/sea-streamer-file/src/file.rs index dbc5df3..929f390 100644 --- a/sea-streamer-file/src/file.rs +++ b/sea-streamer-file/src/file.rs @@ -224,6 +224,7 @@ impl AsyncFile { .seek(match to { SeqPos::Beginning => SeekFrom::Start(0), SeqPos::End => SeekFrom::End(0), + #[allow(clippy::useless_conversion)] SeqPos::At(to) => SeekFrom::Start(to.try_into().expect("SeqNo out of range")), }) .await diff --git a/sea-streamer-file/src/messages.rs b/sea-streamer-file/src/messages.rs index 53c38d9..ff84125 100644 --- a/sea-streamer-file/src/messages.rs +++ b/sea-streamer-file/src/messages.rs @@ -112,6 +112,7 @@ impl MessageSource { SeqPos::Beginning | SeqPos::At(0) => SeqPos::At(Header::size() as SeqNo), SeqPos::End => SeqPos::End, SeqPos::At(nth) => { + #[allow(clippy::unnecessary_cast)] let at = nth as u64 * self.beacon_interval(); if at < self.known_size() { SeqPos::At(at as SeqNo) @@ -130,6 +131,7 @@ impl MessageSource { SeqPos::Beginning | SeqPos::At(0) => unreachable!(), SeqPos::End => max, SeqPos::At(nth) => { + #[allow(clippy::unnecessary_cast)] let at = nth as u64 * self.beacon_interval(); if at < self.known_size() { at diff --git a/sea-streamer-redis/src/consumer/mod.rs b/sea-streamer-redis/src/consumer/mod.rs index 465dbad..b04f8a3 100644 --- a/sea-streamer-redis/src/consumer/mod.rs +++ b/sea-streamer-redis/src/consumer/mod.rs @@ -15,8 +15,8 @@ use flume::{bounded, unbounded, Receiver, Sender}; use std::{fmt::Debug, future::Future, sync::Arc, time::Duration}; use crate::{ - from_seq_no, get_message_id, host_id, MessageId, RedisCluster, RedisErr, RedisResult, - TimestampFormat, DEFAULT_TIMEOUT, MAX_MSG_ID, + from_seq_no, get_message_id, host_id, MessageField, MessageId, RedisCluster, RedisErr, + RedisResult, TimestampFormat, DEFAULT_TIMEOUT, MAX_MSG_ID, }; use sea_streamer_runtime::{spawn_task, timeout}; use sea_streamer_types::{ @@ -51,6 +51,7 @@ pub struct RedisConsumerOptions { shard_ownership: ShardOwnership, mkstream: bool, pub(crate) timestamp_format: TimestampFormat, + pub(crate) message_field: MessageField, } #[derive(Debug)] diff --git a/sea-streamer-redis/src/consumer/node.rs b/sea-streamer-redis/src/consumer/node.rs index c711f43..a1fbdbb 100644 --- a/sea-streamer-redis/src/consumer/node.rs +++ b/sea-streamer-redis/src/consumer/node.rs @@ -545,7 +545,11 @@ impl Node { assert!(self.buffer.is_empty()); match conn.req_packed_command(&cmd).await { Ok(value) => { - match StreamReadReply::from_redis_value(value, self.options.timestamp_format) { + match StreamReadReply::from_redis_value( + value, + self.options.timestamp_format, + self.options.message_field, + ) { Ok(StreamReadReply(mut mess)) => { log::trace!("Read {} messages", mess.len()); if mess.is_empty() { @@ -678,6 +682,7 @@ impl Node { claiming.stream.0.clone(), claiming.stream.1, self.options.timestamp_format, + self.options.message_field, ) { Ok(AutoClaimReply(mut mess)) => { log::trace!( diff --git a/sea-streamer-redis/src/consumer/options.rs b/sea-streamer-redis/src/consumer/options.rs index 9d6558a..8923a8d 100644 --- a/sea-streamer-redis/src/consumer/options.rs +++ b/sea-streamer-redis/src/consumer/options.rs @@ -1,5 +1,5 @@ use super::{constants::*, ConsumerConfig, RedisConsumerOptions}; -use crate::{RedisErr, RedisResult, TimestampFormat}; +use crate::{MessageField, RedisErr, RedisResult, TimestampFormat}; use sea_streamer_types::{ConsumerGroup, ConsumerId, ConsumerMode, ConsumerOptions, StreamErr}; use std::time::Duration; @@ -84,6 +84,7 @@ impl ConsumerOptions for RedisConsumerOptions { shard_ownership: ShardOwnership::Shared, mkstream: false, timestamp_format: TimestampFormat::default(), + message_field: MessageField::default(), } } diff --git a/sea-streamer-redis/src/message.rs b/sea-streamer-redis/src/message.rs index 97466c0..3dbb4a3 100644 --- a/sea-streamer-redis/src/message.rs +++ b/sea-streamer-redis/src/message.rs @@ -1,4 +1,4 @@ -use crate::{RedisErr, RedisResult, TimestampFormat as TsFmt, MSG, ZERO}; +use crate::{MessageField as MsgF, RedisErr, RedisResult, TimestampFormat as TsFmt, ZERO}; use redis::Value; use sea_streamer_types::{ MessageHeader, SeqNo, ShardId, SharedMessage, StreamErr, StreamKey, Timestamp, @@ -66,6 +66,7 @@ pub(crate) fn get_message_id(header: &MessageHeader) -> MessageId { #[inline] pub(crate) fn from_seq_no(seq_no: SeqNo) -> MessageId { + #[allow(clippy::unnecessary_cast)] ((seq_no >> 16) as u64, (seq_no & 0xFFFF) as u16) } @@ -85,7 +86,7 @@ impl RedisMessageId for RedisMessage { // LOL such nesting. This is still undesirable, as there are 5 layers of nested Vec. But at least we don't have to copy the bytes again. impl StreamReadReply { /// Like [`redis::FromRedisValue`], but taking ownership instead of copying. - pub(crate) fn from_redis_value(value: Value, ts_fmt: TsFmt) -> RedisResult { + pub(crate) fn from_redis_value(value: Value, ts_fmt: TsFmt, msg: MsgF) -> RedisResult { let mut messages = Vec::new(); if let Value::Bulk(values) = value { @@ -113,7 +114,7 @@ impl StreamReadReply { }; let stream_key = StreamKey::new(stream_key)?; if let Value::Bulk(values) = value_1 { - parse_messages(values, stream_key, shard, &mut messages, ts_fmt)?; + parse_messages(values, stream_key, shard, &mut messages, ts_fmt, msg)?; } } } @@ -129,6 +130,7 @@ impl AutoClaimReply { stream_key: StreamKey, shard: ShardId, ts_fmt: TsFmt, + msg: MsgF, ) -> RedisResult { let mut messages = Vec::new(); if let Value::Bulk(values) = value { @@ -139,7 +141,7 @@ impl AutoClaimReply { _ = values.next().unwrap(); let value = values.next().unwrap(); if let Value::Bulk(values) = value { - parse_messages(values, stream_key, shard, &mut messages, ts_fmt)?; + parse_messages(values, stream_key, shard, &mut messages, ts_fmt, msg)?; } else { return Err(err(value)); } @@ -154,6 +156,7 @@ fn parse_messages( shard: ShardId, messages: &mut Vec, ts_fmt: TsFmt, + msg: MsgF, ) -> RedisResult<()> { for value in values { if let Value::Bulk(values) = value { @@ -173,7 +176,7 @@ fn parse_messages( let field = values.next().unwrap(); let field = string_from_redis_value(field)?; let value = values.next().unwrap(); - if field == MSG { + if field == msg.0 { let bytes = bytes_from_redis_value(value)?; let length = bytes.len(); messages.push(RedisMessage::new( diff --git a/sea-streamer-redis/src/producer.rs b/sea-streamer-redis/src/producer.rs index 53fa9ef..aed5431 100644 --- a/sea-streamer-redis/src/producer.rs +++ b/sea-streamer-redis/src/producer.rs @@ -3,8 +3,8 @@ use redis::{aio::ConnectionLike, cmd as command, ErrorKind, Pipeline}; use std::{fmt::Debug, future::Future, sync::Arc, time::Duration}; use crate::{ - map_err, parse_message_id, string_from_redis_value, RedisCluster, RedisErr, RedisResult, - TimestampFormat, MSG, ZERO, + map_err, parse_message_id, string_from_redis_value, MessageField, RedisCluster, RedisErr, + RedisResult, TimestampFormat, ZERO, }; use sea_streamer_runtime::{sleep, spawn_task}; use sea_streamer_types::{ @@ -26,6 +26,7 @@ pub struct RedisProducer { pub struct RedisProducerOptions { sharder: Option>, pub(crate) timestamp_format: TimestampFormat, + pub(crate) message_field: MessageField, } impl Debug for RedisProducerOptions { @@ -190,6 +191,7 @@ pub(crate) async fn create_producer( let (sender, receiver) = unbounded(); let mut sharder = options.sharder.take().map(|a| a.init()); let timestamp_format = options.timestamp_format; + let message_field = options.message_field; // Redis commands are exclusive (`&mut self`), so we need a producer task spawn_task(async move { @@ -238,7 +240,7 @@ pub(crate) async fn create_producer( cmd.arg(&ts) } }; - let msg = [(MSG, bytes)]; + let msg = [(message_field.0, bytes)]; cmd.arg(&msg); let command = (redis_key.to_owned(), stream_key, shard, receipt); if batch.0.is_empty() || batch.0.last().unwrap().0 == command.0 { diff --git a/sea-streamer-redis/src/streamer.rs b/sea-streamer-redis/src/streamer.rs index 2c87fc9..3fc0aaa 100644 --- a/sea-streamer-redis/src/streamer.rs +++ b/sea-streamer-redis/src/streamer.rs @@ -2,7 +2,7 @@ use std::{sync::Arc, time::Duration}; use crate::{ create_consumer, create_producer, RedisCluster, RedisConsumer, RedisConsumerOptions, RedisErr, - RedisProducer, RedisProducerOptions, RedisResult, REDIS_PORT, + RedisProducer, RedisProducerOptions, RedisResult, MSG, REDIS_PORT, }; use sea_streamer_types::{ ConnectOptions, StreamErr, StreamKey, StreamUrlErr, Streamer, StreamerUri, @@ -25,6 +25,16 @@ pub struct RedisConnectOptions { enable_cluster: bool, disable_hostname_verification: bool, timestamp_format: TimestampFormat, + message_field: MessageField, +} + +#[derive(Debug, Copy, Clone)] +pub(crate) struct MessageField(pub &'static str); + +impl Default for MessageField { + fn default() -> Self { + Self(MSG) + } } #[derive(Debug, Default, Clone, Copy)] @@ -75,6 +85,7 @@ impl Streamer for RedisStreamer { mut options: Self::ProducerOptions, ) -> RedisResult { options.timestamp_format = self.options.timestamp_format; + options.message_field = self.options.message_field; let cluster = RedisCluster::new(self.uri.clone(), self.options.clone())?; create_producer(cluster, options).await } @@ -85,6 +96,7 @@ impl Streamer for RedisStreamer { mut options: Self::ConsumerOptions, ) -> RedisResult { options.timestamp_format = self.options.timestamp_format; + options.message_field = self.options.message_field; let cluster = RedisCluster::new(self.uri.clone(), self.options.clone())?; create_consumer(cluster, options, streams.to_vec()).await } @@ -160,4 +172,13 @@ impl RedisConnectOptions { self.timestamp_format = fmt; self } + + pub fn message_field(&self) -> &'static str { + self.message_field.0 + } + /// The field used to hold the message. Defaults to [`crate::MSG`]. + pub fn set_message_field(&mut self, field: &'static str) -> &mut Self { + self.message_field = MessageField(field); + self + } }