Skip to content

Commit

Permalink
Impl serde for StreamKey
Browse files Browse the repository at this point in the history
  • Loading branch information
tyt2y3 committed Oct 24, 2024
1 parent a01e95c commit 10c0139
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 12 deletions.
4 changes: 2 additions & 2 deletions sea-streamer-types/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ futures = { version = "0.3", default-features = false, features = ["std", "alloc
thiserror = { version = "1", default-features = false }
time = { version = "0.3", default-features = false, features = ["std", "macros", "formatting"] }
url = { version = "2.2", default-features = false }
serde = { version = "1", default-features = false, optional = true, features = ["derive"] }
serde = { version = "1", default-features = false, optional = true }
serde_json = { version = "1", optional = true }

[features]
json = ["serde", "serde_json"]
json = ["serde/derive", "serde_json"]
wide-seq-no = []
19 changes: 9 additions & 10 deletions sea-streamer-types/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,6 @@ pub struct MessageHeader {
timestamp: Timestamp,
}

#[cfg(feature = "serde")]
#[derive(serde::Serialize)]
struct HeaderJson<'a> {
stream_key: &'a str,
shard_id: u64,
sequence: u64,
timestamp: String,
}

/// Common interface of byte containers.
pub trait Buffer {
fn size(&self) -> usize;
Expand Down Expand Up @@ -367,12 +358,20 @@ impl<'a> IntoBytesOrStr<'a> for &'a [u8] {
}
}

#[cfg(feature = "serde")]
#[cfg(feature = "json")]
impl serde::Serialize for MessageHeader {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
#[derive(serde::Serialize)]
struct HeaderJson<'a> {
stream_key: &'a str,
shard_id: u64,
sequence: SeqNo,
timestamp: String,
}

HeaderJson {
timestamp: self
.timestamp
Expand Down
24 changes: 24 additions & 0 deletions sea-streamer-types/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,27 @@ pub fn is_valid_stream_key_char(c: char) -> bool {
// https://stackoverflow.com/questions/37062904/what-are-apache-kafka-topic-name-limitations
c.is_ascii_alphanumeric() || matches!(c, '.' | '_' | '-')
}

#[cfg(feature = "json")]
mod impl_serde {
use super::StreamKey;

impl<'de> serde::Deserialize<'de> for StreamKey {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let s = <&str>::deserialize(deserializer)?;
s.parse().map_err(serde::de::Error::custom)
}
}

impl serde::Serialize for StreamKey {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.serialize_str(self.name())
}
}
}

0 comments on commit 10c0139

Please sign in to comment.