diff --git a/src/connector/src/source/kinesis/enumerator/client.rs b/src/connector/src/source/kinesis/enumerator/client.rs index 423516fa5bd47..e9b03b7c2688e 100644 --- a/src/connector/src/source/kinesis/enumerator/client.rs +++ b/src/connector/src/source/kinesis/enumerator/client.rs @@ -87,8 +87,8 @@ impl SplitEnumerator for KinesisSplitEnumerator { .map(|x| KinesisSplit { shard_id: x.shard_id().to_string().into(), // handle start with position in reader part - start_position: KinesisOffset::None, - end_position: KinesisOffset::None, + next_offset: KinesisOffset::None, + end_offset: KinesisOffset::None, }) .collect()) } diff --git a/src/connector/src/source/kinesis/source/reader.rs b/src/connector/src/source/kinesis/source/reader.rs index 1089ed114dca2..a1f8db4578054 100644 --- a/src/connector/src/source/kinesis/source/reader.rs +++ b/src/connector/src/source/kinesis/source/reader.rs @@ -42,9 +42,9 @@ pub struct KinesisSplitReader { shard_id: SplitId, latest_offset: Option, shard_iter: Option, - start_position: KinesisOffset, + next_offset: KinesisOffset, #[expect(dead_code)] - end_position: KinesisOffset, + end_offset: KinesisOffset, split_id: SplitId, parser_config: ParserConfig, @@ -67,7 +67,7 @@ impl SplitReader for KinesisSplitReader { let split = splits.into_iter().next().unwrap(); - let start_position = match &split.start_position { + let next_offset = match &split.next_offset { KinesisOffset::None => match &properties.scan_startup_mode { None => KinesisOffset::Earliest, Some(mode) => match mode.as_str() { @@ -85,10 +85,10 @@ impl SplitReader for KinesisSplitReader { } }, }, - start_position => start_position.to_owned(), + next_offset => next_offset.to_owned(), }; - if !matches!(start_position, KinesisOffset::Timestamp(_)) + if !matches!(next_offset, KinesisOffset::Timestamp(_)) && properties.timestamp_offset.is_some() { // cannot bail! here because all new split readers will fail to start if user set 'scan.startup.mode' to 'timestamp' @@ -107,8 +107,8 @@ impl SplitReader for KinesisSplitReader { shard_id: split.shard_id, shard_iter: None, latest_offset: None, - start_position, - end_position: split.end_position, + next_offset, + end_offset: split.end_offset, split_id, parser_config, source_ctx, @@ -250,9 +250,9 @@ impl KinesisSplitReader { ShardIteratorType::AfterSequenceNumber, ) } else { - match &self.start_position { + match &self.next_offset { KinesisOffset::Earliest => (None, None, ShardIteratorType::TrimHorizon), - KinesisOffset::SequenceNumber(seq) => ( + KinesisOffset::AfterSequenceNumber(seq) => ( Some(seq.clone()), None, ShardIteratorType::AfterSequenceNumber, @@ -365,8 +365,8 @@ mod tests { properties.clone(), vec![KinesisSplit { shard_id: "shardId-000000000001".to_string().into(), - start_position: KinesisOffset::Earliest, - end_position: KinesisOffset::None, + next_offset: KinesisOffset::Earliest, + end_offset: KinesisOffset::None, }], Default::default(), SourceContext::dummy().into(), @@ -381,10 +381,10 @@ mod tests { properties.clone(), vec![KinesisSplit { shard_id: "shardId-000000000001".to_string().into(), - start_position: KinesisOffset::SequenceNumber( + next_offset: KinesisOffset::AfterSequenceNumber( "49629139817504901062972448413535783695568426186596941842".to_string(), ), - end_position: KinesisOffset::None, + end_offset: KinesisOffset::None, }], Default::default(), SourceContext::dummy().into(), diff --git a/src/connector/src/source/kinesis/split.rs b/src/connector/src/source/kinesis/split.rs index 441846f00a786..9143750ab1be8 100644 --- a/src/connector/src/source/kinesis/split.rs +++ b/src/connector/src/source/kinesis/split.rs @@ -18,20 +18,30 @@ use serde::{Deserialize, Serialize}; use crate::error::ConnectorResult; use crate::source::{SplitId, SplitMetaData}; +/// See for more details. #[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Hash)] pub enum KinesisOffset { + /// Corresponds to `TRIM_HORIZON`. Points the oldest record in the shard. Earliest, + /// Corresponds to `LATEST`. Points to the (still-nonexisting) record just after the most recent one in the shard. Latest, - SequenceNumber(String), + /// Corresponds to `AFTER_SEQUENCE_NUMBER`. Points the record just after the one with the given sequence number. + #[serde(alias = "SequenceNumber")] // for backward compatibility + AfterSequenceNumber(String), + /// Corresponds to `AT_TIMESTAMP`. Points to the (first) record right at or after the given timestamp. Timestamp(i64), + None, } #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Hash)] pub struct KinesisSplit { pub(crate) shard_id: SplitId, - pub(crate) start_position: KinesisOffset, - pub(crate) end_position: KinesisOffset, + + #[serde(alias = "start_position")] // for backward compatibility + pub(crate) next_offset: KinesisOffset, + #[serde(alias = "end_position")] // for backward compatibility + pub(crate) end_offset: KinesisOffset, } impl SplitMetaData for KinesisSplit { @@ -48,13 +58,7 @@ impl SplitMetaData for KinesisSplit { } fn update_offset(&mut self, last_seen_offset: String) -> ConnectorResult<()> { - let start_offset = if last_seen_offset.is_empty() { - KinesisOffset::Earliest - } else { - KinesisOffset::SequenceNumber(last_seen_offset) - }; - - self.start_position = start_offset; + self.next_offset = KinesisOffset::AfterSequenceNumber(last_seen_offset); Ok(()) } } @@ -62,13 +66,13 @@ impl SplitMetaData for KinesisSplit { impl KinesisSplit { pub fn new( shard_id: SplitId, - start_position: KinesisOffset, - end_position: KinesisOffset, + next_offset: KinesisOffset, + end_offset: KinesisOffset, ) -> KinesisSplit { KinesisSplit { shard_id, - start_position, - end_position, + next_offset, + end_offset, } } }