diff --git a/src/stream/src/executor/source/kafka_backfill_executor.rs b/src/stream/src/executor/source/kafka_backfill_executor.rs
index eed0bf7145480..a1a3ea65136e8 100644
--- a/src/stream/src/executor/source/kafka_backfill_executor.rs
+++ b/src/stream/src/executor/source/kafka_backfill_executor.rs
@@ -68,8 +68,11 @@ impl BackfillState {
     ) -> bool {
         let mut vis = false;
         match self {
-            BackfillState::Backfilling(backfill_offset) => {
-                match compare_kafka_offset(backfill_offset.as_ref(), offset) {
+            BackfillState::Backfilling(None) => {
+                // backfilling for this split is not started yet. Ignore this row
+            }
+            BackfillState::Backfilling(Some(backfill_offset)) => {
+                match compare_kafka_offset(backfill_offset, offset) {
                     Ordering::Less => {
                         // continue backfilling. Ignore this row
                     }
@@ -81,13 +84,13 @@ impl BackfillState {
                     Ordering::Greater => {
                         // backfilling for this split produced more data than current source's progress.
                         // We should stop backfilling, and filter out rows from upstream with offset <= backfill_offset.
-                        *self = BackfillState::SourceCachingUp(backfill_offset.clone().unwrap());
+                        *self = BackfillState::SourceCachingUp(backfill_offset.clone());
                         abort_handles.get(split).unwrap().abort();
                     }
                 }
             }
             BackfillState::SourceCachingUp(backfill_offset) => {
-                match compare_kafka_offset(Some(backfill_offset), offset) {
+                match compare_kafka_offset(backfill_offset, offset) {
                     Ordering::Less => {
                         // XXX: Is this possible? i.e., Source caught up, but doesn't contain the
                         // last backfilled row.
@@ -796,15 +799,10 @@ impl KafkaBackfillExecutorInner {
     }
 }
 
-fn compare_kafka_offset(a: Option<&String>, b: &str) -> Ordering {
-    match a {
-        Some(a) => {
-            let a = a.parse::<i64>().unwrap();
-            let b = b.parse::<i64>().unwrap();
-            a.cmp(&b)
-        }
-        None => Ordering::Less,
-    }
+fn compare_kafka_offset(a: &str, b: &str) -> Ordering {
+    let a = a.parse::<i64>().unwrap();
+    let b = b.parse::<i64>().unwrap();
+    a.cmp(&b)
 }
 
 impl Executor for KafkaBackfillExecutor {
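
For context during review, here is a minimal, self-contained sketch of the visibility state machine these hunks modify. It is an illustration, not the executor's actual code: `handle_upstream_row` is a hypothetical stand-alone method, a `bool` flag replaces the per-split abort handles, the `Ordering::Equal` arm (which falls between the two hunks above) is assumed to behave like the `Greater` arm, and later state transitions are omitted.

use std::cmp::Ordering;

// Per-split progress, mirroring the two states touched by the diff.
#[derive(Debug)]
enum BackfillState {
    // `None` until the backfill reader emits its first row for the split.
    Backfilling(Option<String>),
    // Backfill stopped at the recorded offset; upstream rows stay hidden
    // until they move past it.
    SourceCachingUp(String),
}

// Kafka offsets are i64 positions within a partition, so comparing the
// parsed integers compares positions directly.
fn compare_kafka_offset(a: &str, b: &str) -> Ordering {
    let a = a.parse::<i64>().unwrap();
    let b = b.parse::<i64>().unwrap();
    a.cmp(&b)
}

impl BackfillState {
    // Decide whether an upstream row at `offset` is visible, updating the
    // state like the match arms in the diff. `abort_backfill` stands in for
    // `abort_handles.get(split).unwrap().abort()`.
    fn handle_upstream_row(&mut self, offset: &str, abort_backfill: &mut bool) -> bool {
        match self {
            BackfillState::Backfilling(None) => {
                // backfill not started for this split yet: drop the row
                false
            }
            BackfillState::Backfilling(Some(backfill_offset)) => {
                match compare_kafka_offset(backfill_offset, offset) {
                    Ordering::Less => {
                        // backfill is behind upstream: keep backfilling, drop the row
                        false
                    }
                    Ordering::Equal | Ordering::Greater => {
                        // Backfill has already covered this offset, so stop it and
                        // let upstream catch up. (The diff only shows the Greater
                        // arm; folding Equal in here is an assumption.)
                        *abort_backfill = true;
                        *self = BackfillState::SourceCachingUp(backfill_offset.clone());
                        false
                    }
                }
            }
            BackfillState::SourceCachingUp(backfill_offset) => {
                // Only rows strictly past the last backfilled offset become
                // visible; the transition out of this state is omitted here.
                compare_kafka_offset(backfill_offset, offset) == Ordering::Less
            }
        }
    }
}

fn main() {
    let mut abort = false;
    let mut state = BackfillState::Backfilling(Some("100".to_string()));
    // backfill (at 100) is behind this upstream row (150): row is dropped
    assert!(!state.handle_upstream_row("150", &mut abort));
    // backfill is ahead of this upstream row (42): stop backfilling
    assert!(!state.handle_upstream_row("42", &mut abort));
    assert!(abort);
    // upstream rows stay hidden up to the recorded offset ...
    assert!(!state.handle_upstream_row("100", &mut abort));
    // ... and become visible past it
    assert!(state.handle_upstream_row("101", &mut abort));
    println!("final state: {:?}", state);
}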