Skip to content

Commit

Permalink
refactor(kinesis): rename fields of KinesisOffset and `KinesisSplit…
Browse files Browse the repository at this point in the history
…` to make everything explicit (#18704)

Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc authored Sep 25, 2024
1 parent 1db23de commit e535aa7
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 29 deletions.
4 changes: 2 additions & 2 deletions src/connector/src/source/kinesis/enumerator/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ impl SplitEnumerator for KinesisSplitEnumerator {
.map(|x| KinesisSplit {
shard_id: x.shard_id().to_string().into(),
// the actual start position is resolved later, in the split reader
start_position: KinesisOffset::None,
end_position: KinesisOffset::None,
next_offset: KinesisOffset::None,
end_offset: KinesisOffset::None,
})
.collect())
}
Expand Down
26 changes: 13 additions & 13 deletions src/connector/src/source/kinesis/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ pub struct KinesisSplitReader {
shard_id: SplitId,
latest_offset: Option<String>,
shard_iter: Option<String>,
start_position: KinesisOffset,
next_offset: KinesisOffset,
#[expect(dead_code)]
end_position: KinesisOffset,
end_offset: KinesisOffset,

split_id: SplitId,
parser_config: ParserConfig,
Expand All @@ -67,7 +67,7 @@ impl SplitReader for KinesisSplitReader {

let split = splits.into_iter().next().unwrap();

let start_position = match &split.start_position {
let next_offset = match &split.next_offset {
KinesisOffset::None => match &properties.scan_startup_mode {
None => KinesisOffset::Earliest,
Some(mode) => match mode.as_str() {
Expand All @@ -85,10 +85,10 @@ impl SplitReader for KinesisSplitReader {
}
},
},
start_position => start_position.to_owned(),
next_offset => next_offset.to_owned(),
};

if !matches!(start_position, KinesisOffset::Timestamp(_))
if !matches!(next_offset, KinesisOffset::Timestamp(_))
&& properties.timestamp_offset.is_some()
{
// cannot `bail!` here because all new split readers will fail to start if the user sets 'scan.startup.mode' to 'timestamp'
Expand All @@ -107,8 +107,8 @@ impl SplitReader for KinesisSplitReader {
shard_id: split.shard_id,
shard_iter: None,
latest_offset: None,
start_position,
end_position: split.end_position,
next_offset,
end_offset: split.end_offset,
split_id,
parser_config,
source_ctx,
Expand Down Expand Up @@ -250,9 +250,9 @@ impl KinesisSplitReader {
ShardIteratorType::AfterSequenceNumber,
)
} else {
match &self.start_position {
match &self.next_offset {
KinesisOffset::Earliest => (None, None, ShardIteratorType::TrimHorizon),
KinesisOffset::SequenceNumber(seq) => (
KinesisOffset::AfterSequenceNumber(seq) => (
Some(seq.clone()),
None,
ShardIteratorType::AfterSequenceNumber,
Expand Down Expand Up @@ -365,8 +365,8 @@ mod tests {
properties.clone(),
vec![KinesisSplit {
shard_id: "shardId-000000000001".to_string().into(),
start_position: KinesisOffset::Earliest,
end_position: KinesisOffset::None,
next_offset: KinesisOffset::Earliest,
end_offset: KinesisOffset::None,
}],
Default::default(),
SourceContext::dummy().into(),
Expand All @@ -381,10 +381,10 @@ mod tests {
properties.clone(),
vec![KinesisSplit {
shard_id: "shardId-000000000001".to_string().into(),
start_position: KinesisOffset::SequenceNumber(
next_offset: KinesisOffset::AfterSequenceNumber(
"49629139817504901062972448413535783695568426186596941842".to_string(),
),
end_position: KinesisOffset::None,
end_offset: KinesisOffset::None,
}],
Default::default(),
SourceContext::dummy().into(),
Expand Down
32 changes: 18 additions & 14 deletions src/connector/src/source/kinesis/split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,30 @@ use serde::{Deserialize, Serialize};
use crate::error::ConnectorResult;
use crate::source::{SplitId, SplitMetaData};

/// See <https://docs.aws.amazon.com/kinesis/latest/APIReference/API_StartingPosition.html> for more details.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Hash)]
pub enum KinesisOffset {
/// Corresponds to `TRIM_HORIZON`. Points to the oldest record in the shard.
Earliest,
/// Corresponds to `LATEST`. Points to the (not-yet-existing) record just after the most recent one in the shard.
Latest,
SequenceNumber(String),
/// Corresponds to `AFTER_SEQUENCE_NUMBER`. Points to the record just after the one with the given sequence number.
#[serde(alias = "SequenceNumber")] // for backward compatibility
AfterSequenceNumber(String),
/// Corresponds to `AT_TIMESTAMP`. Points to the (first) record right at or after the given timestamp.
Timestamp(i64),

None,
}

#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Hash)]
pub struct KinesisSplit {
pub(crate) shard_id: SplitId,
pub(crate) start_position: KinesisOffset,
pub(crate) end_position: KinesisOffset,

#[serde(alias = "start_position")] // for backward compatibility
pub(crate) next_offset: KinesisOffset,
#[serde(alias = "end_position")] // for backward compatibility
pub(crate) end_offset: KinesisOffset,
}

impl SplitMetaData for KinesisSplit {
Expand All @@ -48,27 +58,21 @@ impl SplitMetaData for KinesisSplit {
}

fn update_offset(&mut self, last_seen_offset: String) -> ConnectorResult<()> {
let start_offset = if last_seen_offset.is_empty() {
KinesisOffset::Earliest
} else {
KinesisOffset::SequenceNumber(last_seen_offset)
};

self.start_position = start_offset;
self.next_offset = KinesisOffset::AfterSequenceNumber(last_seen_offset);
Ok(())
}
}

impl KinesisSplit {
pub fn new(
shard_id: SplitId,
start_position: KinesisOffset,
end_position: KinesisOffset,
next_offset: KinesisOffset,
end_offset: KinesisOffset,
) -> KinesisSplit {
KinesisSplit {
shard_id,
start_position,
end_position,
next_offset,
end_offset,
}
}
}

0 comments on commit e535aa7

Please sign in to comment.