Skip to content

Commit

Permalink
Support custom message field in Redis
Browse files Browse the repository at this point in the history
  • Loading branch information
tyt2y3 committed Oct 10, 2024
1 parent 36530cf commit 0b2bde0
Show file tree
Hide file tree
Showing 8 changed files with 49 additions and 13 deletions.
1 change: 1 addition & 0 deletions sea-streamer-file/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ impl AsyncFile {
.seek(match to {
SeqPos::Beginning => SeekFrom::Start(0),
SeqPos::End => SeekFrom::End(0),
#[allow(clippy::useless_conversion)]
SeqPos::At(to) => SeekFrom::Start(to.try_into().expect("SeqNo out of range")),
})
.await
Expand Down
2 changes: 2 additions & 0 deletions sea-streamer-file/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ impl MessageSource {
SeqPos::Beginning | SeqPos::At(0) => SeqPos::At(Header::size() as SeqNo),
SeqPos::End => SeqPos::End,
SeqPos::At(nth) => {
#[allow(clippy::unnecessary_cast)]
let at = nth as u64 * self.beacon_interval();
if at < self.known_size() {
SeqPos::At(at as SeqNo)
Expand All @@ -130,6 +131,7 @@ impl MessageSource {
SeqPos::Beginning | SeqPos::At(0) => unreachable!(),
SeqPos::End => max,
SeqPos::At(nth) => {
#[allow(clippy::unnecessary_cast)]
let at = nth as u64 * self.beacon_interval();
if at < self.known_size() {
at
Expand Down
5 changes: 3 additions & 2 deletions sea-streamer-redis/src/consumer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ use flume::{bounded, unbounded, Receiver, Sender};
use std::{fmt::Debug, future::Future, sync::Arc, time::Duration};

use crate::{
from_seq_no, get_message_id, host_id, MessageId, RedisCluster, RedisErr, RedisResult,
TimestampFormat, DEFAULT_TIMEOUT, MAX_MSG_ID,
from_seq_no, get_message_id, host_id, MessageField, MessageId, RedisCluster, RedisErr,
RedisResult, TimestampFormat, DEFAULT_TIMEOUT, MAX_MSG_ID,
};
use sea_streamer_runtime::{spawn_task, timeout};
use sea_streamer_types::{
Expand Down Expand Up @@ -51,6 +51,7 @@ pub struct RedisConsumerOptions {
shard_ownership: ShardOwnership,
mkstream: bool,
pub(crate) timestamp_format: TimestampFormat,
pub(crate) message_field: MessageField,
}

#[derive(Debug)]
Expand Down
7 changes: 6 additions & 1 deletion sea-streamer-redis/src/consumer/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,11 @@ impl Node {
assert!(self.buffer.is_empty());
match conn.req_packed_command(&cmd).await {
Ok(value) => {
match StreamReadReply::from_redis_value(value, self.options.timestamp_format) {
match StreamReadReply::from_redis_value(
value,
self.options.timestamp_format,
self.options.message_field,
) {
Ok(StreamReadReply(mut mess)) => {
log::trace!("Read {} messages", mess.len());
if mess.is_empty() {
Expand Down Expand Up @@ -678,6 +682,7 @@ impl Node {
claiming.stream.0.clone(),
claiming.stream.1,
self.options.timestamp_format,
self.options.message_field,
) {
Ok(AutoClaimReply(mut mess)) => {
log::trace!(
Expand Down
3 changes: 2 additions & 1 deletion sea-streamer-redis/src/consumer/options.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::{constants::*, ConsumerConfig, RedisConsumerOptions};
use crate::{RedisErr, RedisResult, TimestampFormat};
use crate::{MessageField, RedisErr, RedisResult, TimestampFormat};
use sea_streamer_types::{ConsumerGroup, ConsumerId, ConsumerMode, ConsumerOptions, StreamErr};
use std::time::Duration;

Expand Down Expand Up @@ -84,6 +84,7 @@ impl ConsumerOptions for RedisConsumerOptions {
shard_ownership: ShardOwnership::Shared,
mkstream: false,
timestamp_format: TimestampFormat::default(),
message_field: MessageField::default(),
}
}

Expand Down
13 changes: 8 additions & 5 deletions sea-streamer-redis/src/message.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{RedisErr, RedisResult, TimestampFormat as TsFmt, MSG, ZERO};
use crate::{MessageField as MsgF, RedisErr, RedisResult, TimestampFormat as TsFmt, ZERO};
use redis::Value;
use sea_streamer_types::{
MessageHeader, SeqNo, ShardId, SharedMessage, StreamErr, StreamKey, Timestamp,
Expand Down Expand Up @@ -66,6 +66,7 @@ pub(crate) fn get_message_id(header: &MessageHeader) -> MessageId {

#[inline]
pub(crate) fn from_seq_no(seq_no: SeqNo) -> MessageId {
#[allow(clippy::unnecessary_cast)]
((seq_no >> 16) as u64, (seq_no & 0xFFFF) as u16)
}

Expand All @@ -85,7 +86,7 @@ impl RedisMessageId for RedisMessage {
// LOL such nesting. This is still undesirable, as there are 5 layers of nested Vec. But at least we don't have to copy the bytes again.
impl StreamReadReply {
/// Like [`redis::FromRedisValue`], but taking ownership instead of copying.
pub(crate) fn from_redis_value(value: Value, ts_fmt: TsFmt) -> RedisResult<Self> {
pub(crate) fn from_redis_value(value: Value, ts_fmt: TsFmt, msg: MsgF) -> RedisResult<Self> {
let mut messages = Vec::new();

if let Value::Bulk(values) = value {
Expand Down Expand Up @@ -113,7 +114,7 @@ impl StreamReadReply {
};
let stream_key = StreamKey::new(stream_key)?;
if let Value::Bulk(values) = value_1 {
parse_messages(values, stream_key, shard, &mut messages, ts_fmt)?;
parse_messages(values, stream_key, shard, &mut messages, ts_fmt, msg)?;
}
}
}
Expand All @@ -129,6 +130,7 @@ impl AutoClaimReply {
stream_key: StreamKey,
shard: ShardId,
ts_fmt: TsFmt,
msg: MsgF,
) -> RedisResult<Self> {
let mut messages = Vec::new();
if let Value::Bulk(values) = value {
Expand All @@ -139,7 +141,7 @@ impl AutoClaimReply {
_ = values.next().unwrap();
let value = values.next().unwrap();
if let Value::Bulk(values) = value {
parse_messages(values, stream_key, shard, &mut messages, ts_fmt)?;
parse_messages(values, stream_key, shard, &mut messages, ts_fmt, msg)?;
} else {
return Err(err(value));
}
Expand All @@ -154,6 +156,7 @@ fn parse_messages(
shard: ShardId,
messages: &mut Vec<RedisMessage>,
ts_fmt: TsFmt,
msg: MsgF,
) -> RedisResult<()> {
for value in values {
if let Value::Bulk(values) = value {
Expand All @@ -173,7 +176,7 @@ fn parse_messages(
let field = values.next().unwrap();
let field = string_from_redis_value(field)?;
let value = values.next().unwrap();
if field == MSG {
if field == msg.0 {
let bytes = bytes_from_redis_value(value)?;
let length = bytes.len();
messages.push(RedisMessage::new(
Expand Down
8 changes: 5 additions & 3 deletions sea-streamer-redis/src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use redis::{aio::ConnectionLike, cmd as command, ErrorKind, Pipeline};
use std::{fmt::Debug, future::Future, sync::Arc, time::Duration};

use crate::{
map_err, parse_message_id, string_from_redis_value, RedisCluster, RedisErr, RedisResult,
TimestampFormat, MSG, ZERO,
map_err, parse_message_id, string_from_redis_value, MessageField, RedisCluster, RedisErr,
RedisResult, TimestampFormat, ZERO,
};
use sea_streamer_runtime::{sleep, spawn_task};
use sea_streamer_types::{
Expand All @@ -26,6 +26,7 @@ pub struct RedisProducer {
pub struct RedisProducerOptions {
sharder: Option<Arc<dyn SharderConfig>>,
pub(crate) timestamp_format: TimestampFormat,
pub(crate) message_field: MessageField,
}

impl Debug for RedisProducerOptions {
Expand Down Expand Up @@ -190,6 +191,7 @@ pub(crate) async fn create_producer(
let (sender, receiver) = unbounded();
let mut sharder = options.sharder.take().map(|a| a.init());
let timestamp_format = options.timestamp_format;
let message_field = options.message_field;

// Redis commands are exclusive (`&mut self`), so we need a producer task
spawn_task(async move {
Expand Down Expand Up @@ -238,7 +240,7 @@ pub(crate) async fn create_producer(
cmd.arg(&ts)
}
};
let msg = [(MSG, bytes)];
let msg = [(message_field.0, bytes)];
cmd.arg(&msg);
let command = (redis_key.to_owned(), stream_key, shard, receipt);
if batch.0.is_empty() || batch.0.last().unwrap().0 == command.0 {
Expand Down
23 changes: 22 additions & 1 deletion sea-streamer-redis/src/streamer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::{sync::Arc, time::Duration};

use crate::{
create_consumer, create_producer, RedisCluster, RedisConsumer, RedisConsumerOptions, RedisErr,
RedisProducer, RedisProducerOptions, RedisResult, REDIS_PORT,
RedisProducer, RedisProducerOptions, RedisResult, MSG, REDIS_PORT,
};
use sea_streamer_types::{
ConnectOptions, StreamErr, StreamKey, StreamUrlErr, Streamer, StreamerUri,
Expand All @@ -25,6 +25,16 @@ pub struct RedisConnectOptions {
enable_cluster: bool,
disable_hostname_verification: bool,
timestamp_format: TimestampFormat,
message_field: MessageField,
}

#[derive(Debug, Copy, Clone)]
pub(crate) struct MessageField(pub &'static str);

impl Default for MessageField {
fn default() -> Self {
Self(MSG)
}
}

#[derive(Debug, Default, Clone, Copy)]
Expand Down Expand Up @@ -75,6 +85,7 @@ impl Streamer for RedisStreamer {
mut options: Self::ProducerOptions,
) -> RedisResult<Self::Producer> {
options.timestamp_format = self.options.timestamp_format;
options.message_field = self.options.message_field;
let cluster = RedisCluster::new(self.uri.clone(), self.options.clone())?;
create_producer(cluster, options).await
}
Expand All @@ -85,6 +96,7 @@ impl Streamer for RedisStreamer {
mut options: Self::ConsumerOptions,
) -> RedisResult<Self::Consumer> {
options.timestamp_format = self.options.timestamp_format;
options.message_field = self.options.message_field;
let cluster = RedisCluster::new(self.uri.clone(), self.options.clone())?;
create_consumer(cluster, options, streams.to_vec()).await
}
Expand Down Expand Up @@ -160,4 +172,13 @@ impl RedisConnectOptions {
self.timestamp_format = fmt;
self
}

pub fn message_field(&self) -> &'static str {
self.message_field.0
}
/// The field used to hold the message. Defaults to [`crate::MSG`].
pub fn set_message_field(&mut self, field: &'static str) -> &mut Self {
self.message_field = MessageField(field);
self
}
}

0 comments on commit 0b2bde0

Please sign in to comment.