Skip to content

Commit

Permalink
Support nanosecond timestamp in Redis
Browse files Browse the repository at this point in the history
  • Loading branch information
tyt2y3 committed Oct 10, 2024
1 parent b913a51 commit 36530cf
Show file tree
Hide file tree
Showing 17 changed files with 131 additions and 76 deletions.
2 changes: 1 addition & 1 deletion sea-streamer-file/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ impl AsyncFile {
.seek(match to {
SeqPos::Beginning => SeekFrom::Start(0),
SeqPos::End => SeekFrom::End(0),
SeqPos::At(to) => SeekFrom::Start(to),
SeqPos::At(to) => SeekFrom::Start(to.try_into().expect("SeqNo out of range")),
})
.await
.map_err(FileErr::IoError)?;
Expand Down
8 changes: 5 additions & 3 deletions sea-streamer-file/src/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ use crate::{
ByteSink, ByteSource, Bytes, FileErr,
};
use sea_streamer_types::{
Buffer, Message as MessageTrait, OwnedMessage, ShardId, StreamKey, StreamKeyErr, Timestamp,
Buffer, Message as MessageTrait, OwnedMessage, SeqNo, ShardId, StreamKey, StreamKeyErr,
Timestamp,
};
#[cfg(feature = "serde")]
use serde::Serialize;
Expand Down Expand Up @@ -353,7 +354,7 @@ impl MessageHeader {
use sea_streamer_types::MessageHeader as Header;
let stream_key = StreamKey::new(ShortString::read_from(file).await?.string())?;
let shard_id = ShardId::new(U64::read_from(file).await?.0);
let sequence = U64::read_from(file).await?.0;
let sequence = U64::read_from(file).await?.0 as SeqNo;
let timestamp = UnixTimestamp::read_from(file).await?.0;
Ok(Self(Header::new(stream_key, shard_id, sequence, timestamp)))
}
Expand All @@ -363,7 +364,8 @@ impl MessageHeader {
let h = self.0;
sum += ShortString::new(h.stream_key().name().to_owned())?.write_to(sink)?;
sum += U64(h.shard_id().id()).write_to(sink)?;
sum += U64(*h.sequence()).write_to(sink)?;
sum += U64(TryInto::<u64>::try_into(*h.sequence()).expect("SeqNo out of range"))
.write_to(sink)?;
sum += UnixTimestamp(*h.timestamp()).write_to(sink)?;
Ok(sum)
}
Expand Down
22 changes: 11 additions & 11 deletions sea-streamer-file/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,12 @@ impl MessageSource {
/// Warning: This future must not be canceled.
pub async fn rewind(&mut self, target: SeqPos) -> Result<u32, FileErr> {
let pos = match target {
SeqPos::Beginning | SeqPos::At(0) => SeqPos::At(Header::size() as u64),
SeqPos::Beginning | SeqPos::At(0) => SeqPos::At(Header::size() as SeqNo),
SeqPos::End => SeqPos::End,
SeqPos::At(nth) => {
let at = nth * self.beacon_interval();
let at = nth as u64 * self.beacon_interval();
if at < self.known_size() {
SeqPos::At(at)
SeqPos::At(at as SeqNo)
} else {
SeqPos::End
}
Expand All @@ -130,7 +130,7 @@ impl MessageSource {
SeqPos::Beginning | SeqPos::At(0) => unreachable!(),
SeqPos::End => max,
SeqPos::At(nth) => {
let at = nth * self.beacon_interval();
let at = nth as u64 * self.beacon_interval();
if at < self.known_size() {
at
} else {
Expand All @@ -139,7 +139,7 @@ impl MessageSource {
}
};
if self.offset != pos {
self.offset = self.source.seek(SeqPos::At(pos)).await?;
self.offset = self.source.seek(SeqPos::At(pos as SeqNo)).await?;
}
}

Expand Down Expand Up @@ -175,7 +175,7 @@ impl MessageSource {
while let Ok(message) = Message::read_from(&mut buffer).await {
next += message.size() as u64;
}
self.offset = self.source.seek(SeqPos::At(next)).await?;
self.offset = self.source.seek(SeqPos::At(next as SeqNo)).await?;
}

Ok((self.offset / self.beacon_interval()) as u32)
Expand Down Expand Up @@ -226,7 +226,7 @@ impl MessageSource {
}
};
// now we know roughly where's the message
match self.rewind(SeqPos::At(pos as u64)).await {
match self.rewind(SeqPos::At(pos as SeqNo)).await {
Ok(_) => (),
Err(e) => {
break 'outer match e {
Expand Down Expand Up @@ -260,7 +260,7 @@ impl MessageSource {
self.source = source.switch_to(source_type).await?;

if res.is_err() {
self.source.seek(SeqPos::At(savepoint)).await?;
self.source.seek(SeqPos::At(savepoint as SeqNo)).await?;
self.buffer.clear();
self.pending.take();
}
Expand Down Expand Up @@ -399,7 +399,7 @@ impl BeaconReader for MessageSource {
fn survey(&mut self, at: NonZeroU32) -> Self::Future<'_> {
async move {
let at = at.get() as u64 * self.beacon_interval();
let offset = self.source.seek(SeqPos::At(at)).await?;
let offset = self.source.seek(SeqPos::At(at as SeqNo)).await?;
if at == offset {
let beacon = Beacon::read_from(&mut self.source).await?;
Ok(beacon)
Expand Down Expand Up @@ -462,7 +462,7 @@ impl MessageSink {
if nth > 0 {
// we need to rewind further backwards
nth -= 1;
source.rewind(SeqPos::At(nth as u64)).await?;
source.rewind(SeqPos::At(nth as SeqNo)).await?;
} else {
// we reached the start now
break;
Expand Down Expand Up @@ -492,7 +492,7 @@ impl MessageSink {
let has_beacon = source.has_beacon(offset).is_some();
if let DynFileSource::FileReader(reader) = source.source {
let (mut file, _, _) = reader.end();
assert_eq!(offset, file.seek(SeqPos::At(offset)).await?);
assert_eq!(offset, file.seek(SeqPos::At(offset as SeqNo)).await?);
let mut sink = FileSink::new(file, limit)?;

if has_beacon {
Expand Down
4 changes: 2 additions & 2 deletions sea-streamer-file/src/producer/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ impl Writer {
n -= 1;
}
// 2. go forward from there and read all messages up to started_from, recording the latest messages
source.rewind(SeqPos::At(n as u64)).await?;
source.rewind(SeqPos::At(n as SeqNo)).await?;
while source.offset() < sink.started_from() {
match source.next().await {
Ok(msg) => {
Expand Down Expand Up @@ -234,7 +234,7 @@ impl Writer {
_ => panic!("Must be FileReader"),
};
let (mut file, _, _) = reader.end();
file.seek(SeqPos::At(sink.offset())).await?; // restore offset
file.seek(SeqPos::At(sink.offset() as SeqNo)).await?; // restore offset
sink.use_file(FileSink::new(file, file_size_limit)?);
// now we've gone through the stream, we can safely assume the stream state
let entry = streams.entry(key.clone()).or_default();
Expand Down
33 changes: 19 additions & 14 deletions sea-streamer-file/tests/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ async fn consumer() -> anyhow::Result<()> {
};
use sea_streamer_types::{
export::futures::TryStreamExt, Buffer, Consumer, Message, MessageHeader, OwnedMessage,
ShardId, SharedMessage, StreamErr, StreamKey, Streamer, Timestamp,
SeqNo, ShardId, SharedMessage, StreamErr, StreamKey, Streamer, Timestamp,
};

const TEST: &str = "consumer";
Expand All @@ -33,13 +33,14 @@ async fn consumer() -> anyhow::Result<()> {
let shard = ShardId::new(1);

let message = |i: u64| -> OwnedMessage {
let header = MessageHeader::new(stream_key.clone(), shard, i, Timestamp::now_utc());
let header =
MessageHeader::new(stream_key.clone(), shard, i as SeqNo, Timestamp::now_utc());
OwnedMessage::new(header, format!("{}-{}", stream_key.name(), i).into_bytes())
};
let check = |i: u64, mess: SharedMessage| {
assert_eq!(mess.header().stream_key(), &stream_key);
assert_eq!(mess.header().shard_id(), &shard);
assert_eq!(mess.header().sequence(), &i);
assert_eq!(*mess.header().sequence() as u64, i);
assert_eq!(
mess.message().as_str().unwrap(),
format!("{}-{}", stream_key.name(), i)
Expand Down Expand Up @@ -119,8 +120,8 @@ async fn demux() -> anyhow::Result<()> {
DEFAULT_FILE_SIZE_LIMIT,
};
use sea_streamer_types::{
Buffer, Consumer, Message, MessageHeader, OwnedMessage, ShardId, SharedMessage, StreamKey,
Streamer, Timestamp,
Buffer, Consumer, Message, MessageHeader, OwnedMessage, SeqNo, ShardId, SharedMessage,
StreamKey, Streamer, Timestamp,
};

const TEST: &str = "demux";
Expand All @@ -139,16 +140,18 @@ async fn demux() -> anyhow::Result<()> {
let shard = ShardId::new(1);

let cat = |i: u64| -> OwnedMessage {
let header = MessageHeader::new(cat_key.clone(), shard, i, Timestamp::now_utc());
let header =
MessageHeader::new(cat_key.clone(), shard, i as SeqNo, Timestamp::now_utc());
OwnedMessage::new(header, format!("{}", i).into_bytes())
};
let dog = |i: u64| -> OwnedMessage {
let header = MessageHeader::new(dog_key.clone(), shard, i, Timestamp::now_utc());
let header =
MessageHeader::new(dog_key.clone(), shard, i as SeqNo, Timestamp::now_utc());
OwnedMessage::new(header, format!("{}", i).into_bytes())
};
let check = |i: u64, mess: &SharedMessage| {
assert_eq!(mess.header().shard_id(), &shard);
assert_eq!(mess.header().sequence(), &i);
assert_eq!(*mess.header().sequence() as u64, i);
assert_eq!(mess.message().as_str().unwrap(), format!("{}", i));
};
let is_cat = |i: u64, m: SharedMessage| {
Expand Down Expand Up @@ -217,7 +220,7 @@ async fn group() -> anyhow::Result<()> {
};
use sea_streamer_types::{
Buffer, Consumer, ConsumerGroup, ConsumerMode, ConsumerOptions, Message, MessageHeader,
OwnedMessage, ShardId, SharedMessage, StreamKey, Streamer, Timestamp,
OwnedMessage, SeqNo, ShardId, SharedMessage, StreamKey, Streamer, Timestamp,
};

const TEST: &str = "group";
Expand All @@ -236,13 +239,14 @@ async fn group() -> anyhow::Result<()> {
let group = ConsumerGroup::new("friends");

let message = |i: u64| -> OwnedMessage {
let header = MessageHeader::new(stream_key.clone(), shard, i, Timestamp::now_utc());
let header =
MessageHeader::new(stream_key.clone(), shard, i as SeqNo, Timestamp::now_utc());
OwnedMessage::new(header, format!("{}-{}", stream_key.name(), i).into_bytes())
};
let check = |i: u64, mess: SharedMessage| {
assert_eq!(mess.header().stream_key(), &stream_key);
assert_eq!(mess.header().shard_id(), &shard);
assert_eq!(mess.header().sequence(), &i);
assert_eq!(*mess.header().sequence() as u64, i);
assert_eq!(
mess.message().as_str().unwrap(),
format!("{}-{}", stream_key.name(), i)
Expand Down Expand Up @@ -325,7 +329,7 @@ async fn seek() -> anyhow::Result<()> {
};
use sea_streamer_types::{
Buffer, Consumer, ConsumerGroup, ConsumerMode, ConsumerOptions, Message, MessageHeader,
OwnedMessage, SeqPos, ShardId, SharedMessage, StreamKey, Streamer, Timestamp,
OwnedMessage, SeqNo, SeqPos, ShardId, SharedMessage, StreamKey, Streamer, Timestamp,
};

const TEST: &str = "seek";
Expand All @@ -344,13 +348,14 @@ async fn seek() -> anyhow::Result<()> {
let group = ConsumerGroup::new("group");

let message = |i: u64| -> OwnedMessage {
let header = MessageHeader::new(stream_key.clone(), shard, i, Timestamp::now_utc());
let header =
MessageHeader::new(stream_key.clone(), shard, i as SeqNo, Timestamp::now_utc());
OwnedMessage::new(header, format!("{}-{}", stream_key.name(), i).into_bytes())
};
let check = |i: u64, mess: SharedMessage| {
assert_eq!(mess.header().stream_key(), &stream_key);
assert_eq!(mess.header().shard_id(), &shard);
assert_eq!(mess.header().sequence(), &i);
assert_eq!(*mess.header().sequence() as u64, i);
assert_eq!(
mess.message().as_str().unwrap(),
format!("{}-{}", stream_key.name(), i)
Expand Down
4 changes: 2 additions & 2 deletions sea-streamer-kafka/src/producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub use rdkafka::{consumer::ConsumerGroupMetadata, producer::FutureRecord, Topic
use sea_streamer_runtime::spawn_blocking;
use sea_streamer_types::{
export::futures::FutureExt, runtime_error, Buffer, MessageHeader, Producer, ProducerOptions,
ShardId, StreamErr, StreamKey, StreamResult, StreamerUri, Timestamp,
SeqNo, ShardId, StreamErr, StreamKey, StreamResult, StreamerUri, Timestamp,
};

#[derive(Clone)]
Expand Down Expand Up @@ -373,7 +373,7 @@ impl Future for SendFuture {
Ok((part, offset)) => Ok(MessageHeader::new(
self.stream_key.take().expect("Must have stream_key"),
ShardId::new(part as u64),
offset as u64,
offset as SeqNo,
Timestamp::now_utc(),
)),
Err((err, _)) => Err(stream_err(err)),
Expand Down
1 change: 1 addition & 0 deletions sea-streamer-redis/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ runtime-async-std = ["async-std", "redis/async-std-comp", "sea-streamer-runtime/
runtime-tokio = ["tokio", "redis/tokio-comp", "sea-streamer-runtime/runtime-tokio"]
runtime-async-std-native-tls = ["runtime-async-std", "redis/async-std-native-tls-comp"]
runtime-tokio-native-tls = ["runtime-tokio", "redis/tokio-native-tls-comp"]
nanosecond-timestamp = ["sea-streamer-types/wide-seq-no"]

[[bin]]
name = "consumer"
Expand Down
3 changes: 2 additions & 1 deletion sea-streamer-redis/src/consumer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::{fmt::Debug, future::Future, sync::Arc, time::Duration};

use crate::{
from_seq_no, get_message_id, host_id, MessageId, RedisCluster, RedisErr, RedisResult,
DEFAULT_TIMEOUT, MAX_MSG_ID,
TimestampFormat, DEFAULT_TIMEOUT, MAX_MSG_ID,
};
use sea_streamer_runtime::{spawn_task, timeout};
use sea_streamer_types::{
Expand Down Expand Up @@ -50,6 +50,7 @@ pub struct RedisConsumerOptions {
batch_size: usize,
shard_ownership: ShardOwnership,
mkstream: bool,
pub(crate) timestamp_format: TimestampFormat,
}

#[derive(Debug)]
Expand Down
27 changes: 15 additions & 12 deletions sea-streamer-redis/src/consumer/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -544,20 +544,22 @@ impl Node {
log::trace!("XREAD ...");
assert!(self.buffer.is_empty());
match conn.req_packed_command(&cmd).await {
Ok(value) => match StreamReadReply::from_redis_value(value) {
Ok(StreamReadReply(mut mess)) => {
log::trace!("Read {} messages", mess.len());
if mess.is_empty() {
// If we receive an empty reply, it means if we were reading the pending list
// then the list is now empty
self.group.pending_state = false;
Ok(value) => {
match StreamReadReply::from_redis_value(value, self.options.timestamp_format) {
Ok(StreamReadReply(mut mess)) => {
log::trace!("Read {} messages", mess.len());
if mess.is_empty() {
// If we receive an empty reply, it means if we were reading the pending list
// then the list is now empty
self.group.pending_state = false;
}
mess.reverse();
self.buffer = mess;
Ok(ReadResult::Msg(self.buffer.len()))
}
mess.reverse();
self.buffer = mess;
Ok(ReadResult::Msg(self.buffer.len()))
Err(err) => self.send_error(err).await,
}
Err(err) => self.send_error(err).await,
},
}
Err(err) => {
let kind = err.kind();
if kind == ErrorKind::Moved {
Expand Down Expand Up @@ -675,6 +677,7 @@ impl Node {
value,
claiming.stream.0.clone(),
claiming.stream.1,
self.options.timestamp_format,
) {
Ok(AutoClaimReply(mut mess)) => {
log::trace!(
Expand Down
3 changes: 2 additions & 1 deletion sea-streamer-redis/src/consumer/options.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::{constants::*, ConsumerConfig, RedisConsumerOptions};
use crate::{RedisErr, RedisResult};
use crate::{RedisErr, RedisResult, TimestampFormat};
use sea_streamer_types::{ConsumerGroup, ConsumerId, ConsumerMode, ConsumerOptions, StreamErr};
use std::time::Duration;

Expand Down Expand Up @@ -83,6 +83,7 @@ impl ConsumerOptions for RedisConsumerOptions {
},
shard_ownership: ShardOwnership::Shared,
mkstream: false,
timestamp_format: TimestampFormat::default(),
}
}

Expand Down
Loading

0 comments on commit 36530cf

Please sign in to comment.