Skip to content

Commit

Permalink
minor improvement: avoid compare option string
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Feb 22, 2024
1 parent 732a1c5 commit 1a54a30
Showing 1 changed file with 11 additions and 13 deletions.
24 changes: 11 additions & 13 deletions src/stream/src/executor/source/kafka_backfill_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,11 @@ impl BackfillState {
) -> bool {
let mut vis = false;
match self {
BackfillState::Backfilling(backfill_offset) => {
match compare_kafka_offset(backfill_offset.as_ref(), offset) {
BackfillState::Backfilling(None) => {
// backfilling for this split is not started yet. Ignore this row
}
BackfillState::Backfilling(Some(backfill_offset)) => {
match compare_kafka_offset(backfill_offset, offset) {
Ordering::Less => {
// continue backfilling. Ignore this row
}
Expand All @@ -81,13 +84,13 @@ impl BackfillState {
Ordering::Greater => {
// backfilling for this split produced more data than current source's progress.
// We should stop backfilling, and filter out rows from upstream with offset <= backfill_offset.
*self = BackfillState::SourceCachingUp(backfill_offset.clone().unwrap());
*self = BackfillState::SourceCachingUp(backfill_offset.clone());
abort_handles.get(split).unwrap().abort();
}
}
}
BackfillState::SourceCachingUp(backfill_offset) => {
match compare_kafka_offset(Some(backfill_offset), offset) {
match compare_kafka_offset(backfill_offset, offset) {
Ordering::Less => {
// XXX: Is this possible? i.e., Source caught up, but doesn't contain the
// last backfilled row.
Expand Down Expand Up @@ -796,15 +799,10 @@ impl<S: StateStore> KafkaBackfillExecutorInner<S> {
}
}

fn compare_kafka_offset(a: Option<&String>, b: &str) -> Ordering {
match a {
Some(a) => {
let a = a.parse::<i64>().unwrap();
let b = b.parse::<i64>().unwrap();
a.cmp(&b)
}
None => Ordering::Less,
}
fn compare_kafka_offset(a: &str, b: &str) -> Ordering {
let a = a.parse::<i64>().unwrap();
let b = b.parse::<i64>().unwrap();
a.cmp(&b)
}

impl<S: StateStore> Executor for KafkaBackfillExecutor<S> {
Expand Down

0 comments on commit 1a54a30

Please sign in to comment.