diff --git a/src/stream/src/executor/source/source_backfill_executor.rs b/src/stream/src/executor/source/source_backfill_executor.rs index 39a458b28ff47..6b22293331306 100644 --- a/src/stream/src/executor/source/source_backfill_executor.rs +++ b/src/stream/src/executor/source/source_backfill_executor.rs @@ -13,7 +13,6 @@ // limitations under the License. use std::cmp::Ordering; -use std::collections::hash_map::Entry; use std::collections::{HashMap, HashSet}; use std::time::Instant; @@ -45,7 +44,7 @@ use crate::executor::{AddMutation, UpdateMutation}; pub enum BackfillState { /// `None` means not started yet. It's the initial state. Backfilling(Option), - /// Backfill is stopped at this offset. Source needs to filter out messages before this offset. + /// Backfill is stopped at this offset (inclusive). Source needs to filter out messages before this offset. SourceCachingUp(String), Finished, } @@ -59,54 +58,6 @@ impl BackfillState { pub fn restore_from_json(value: JsonbVal) -> anyhow::Result { serde_json::from_value(value.take()).map_err(|e| anyhow!(e)) } - - /// Returns whether the row from upstream `SourceExecutor` is visible. - fn handle_upstream_row(&mut self, offset: &str) -> bool { - let mut vis = false; - match self { - BackfillState::Backfilling(None) => { - // backfilling for this split is not started yet. Ignore this row - } - BackfillState::Backfilling(Some(backfill_offset)) => { - match compare_kafka_offset(backfill_offset, offset) { - Ordering::Less => { - // continue backfilling. Ignore this row - } - Ordering::Equal => { - // backfilling for this split is finished just right. - *self = BackfillState::Finished; - } - Ordering::Greater => { - // backfilling for this split produced more data than current source's progress. - // We should stop backfilling, and filter out rows from upstream with offset <= backfill_offset. - *self = BackfillState::SourceCachingUp(backfill_offset.clone()); - } - } - } - BackfillState::SourceCachingUp(backfill_offset) => { - match compare_kafka_offset(backfill_offset, offset) { - Ordering::Less => { - // Source caught up, but doesn't contain the last backfilled row. - // This may happen e.g., if Kafka performed compaction. - vis = true; - *self = BackfillState::Finished; - } - Ordering::Equal => { - // Source just caught up with backfilling. - *self = BackfillState::Finished; - } - Ordering::Greater => { - // Source is still behind backfilling. - } - } - } - BackfillState::Finished => { - vis = true; - // This split's backfilling is finisehd, we are waiting for other splits - } - } - vis - } } pub struct SourceBackfillExecutor { @@ -138,6 +89,8 @@ pub struct SourceBackfillExecutorInner { } /// Local variables used in the backfill stage. +/// +/// Note: all off the fields should contain all available splits, and we can `unwrap()` safely when `get()`. #[derive(Debug)] struct BackfillStage { states: BackfillStates, @@ -145,9 +98,28 @@ struct BackfillStage { /// /// Note: the offsets are not updated. Should use `state`'s offset to update before using it (`get_latest_unfinished_splits`). splits: Vec, + /// The latest offset from upstream (inclusive). After we reach this offset, we can stop backfilling. + /// TODO: initialize this with high watermark so that we can finish backfilling even when upstream + /// doesn't emit any data. + target_offsets: HashMap>, } impl BackfillStage { + fn debug_assert_consistent(&self) { + if cfg!(debug_assertions) { + let all_splits: HashSet<_> = + self.splits.iter().map(|split| split.id().clone()).collect(); + assert_eq!( + self.states.keys().cloned().collect::>(), + all_splits + ); + assert_eq!( + self.target_offsets.keys().cloned().collect::>(), + all_splits + ); + } + } + /// Get unfinished splits with latest offsets according to the backfill states. fn get_latest_unfinished_splits(&self) -> StreamExecutorResult> { let mut unfinished_splits = Vec::new(); @@ -165,6 +137,92 @@ impl BackfillStage { } Ok(unfinished_splits) } + + /// Updates backfill states and `target_offsets` and returns whether the row from upstream `SourceExecutor` is visible. + fn handle_upstream_row(&mut self, split_id: &str, offset: &str) -> bool { + let mut vis = false; + let state = self.states.get_mut(split_id).unwrap(); + match state { + BackfillState::Backfilling(None) => { + // backfilling for this split is not started yet. Ignore this row + } + BackfillState::Backfilling(Some(backfill_offset)) => { + match compare_kafka_offset(backfill_offset, offset) { + Ordering::Less => { + // continue backfilling. Ignore this row + } + Ordering::Equal => { + // backfilling for this split is finished just right. + *state = BackfillState::Finished; + } + Ordering::Greater => { + // backfilling for this split produced more data than current source's progress. + // We should stop backfilling, and filter out rows from upstream with offset <= backfill_offset. + *state = BackfillState::SourceCachingUp(backfill_offset.clone()); + } + } + } + BackfillState::SourceCachingUp(backfill_offset) => { + match compare_kafka_offset(backfill_offset, offset) { + Ordering::Less => { + // Source caught up, but doesn't contain the last backfilled row. + // This may happen e.g., if Kafka performed compaction. + vis = true; + *state = BackfillState::Finished; + } + Ordering::Equal => { + // Source just caught up with backfilling. + *state = BackfillState::Finished; + } + Ordering::Greater => { + // Source is still behind backfilling. + } + } + } + BackfillState::Finished => { + vis = true; + // This split's backfilling is finished, we are waiting for other splits + } + } + if matches!(state, BackfillState::Backfilling(_)) { + *self.target_offsets.get_mut(split_id).unwrap() = Some(offset.to_string()); + } + if vis { + debug_assert_eq!(*state, BackfillState::Finished); + } + vis + } + + /// Updates backfill states and returns whether the row from upstream `SourceExecutor` is visible. + fn handle_backfill_row(&mut self, split_id: &str, offset: &str) -> bool { + let state = self.states.get_mut(split_id).unwrap(); + match state { + BackfillState::Backfilling(_old_offset) => { + let target_offset = self.target_offsets.get(split_id).unwrap(); + if let Some(target_offset) = target_offset + && compare_kafka_offset(offset, target_offset).is_ge() + { + // Note1: If target_offset = offset, it seems we can mark the state as Finished without waiting for upstream to catch up + // and dropping duplicated messages. + // But it's not true if target_offset is fetched from other places, like Kafka high watermark. + // In this case, upstream hasn't reached the target_offset yet. + // + // Note2: after this, all following rows in the current chunk will be invisible. + // + // Note3: if target_offset is None (e.g., when upstream doesn't emit messages at all), we will + // keep backfilling. + *state = BackfillState::SourceCachingUp(offset.to_string()); + } else { + *state = BackfillState::Backfilling(Some(offset.to_string())); + } + true + } + BackfillState::SourceCachingUp(_) | BackfillState::Finished => { + // backfilling stopped. ignore + false + } + } + } } impl SourceBackfillExecutorInner { @@ -275,9 +333,15 @@ impl SourceBackfillExecutorInner { backfill_states.insert(split_id, backfill_state); } let mut backfill_stage = BackfillStage { + // init with None + target_offsets: backfill_states + .keys() + .map(|split_id| (split_id.clone(), None)) + .collect(), states: backfill_states, splits: owned_splits, }; + backfill_stage.debug_assert_consistent(); tracing::debug!(?backfill_stage, "source backfill started"); // Return the ownership of `stream_source_core` to the source executor. @@ -348,6 +412,7 @@ impl SourceBackfillExecutorInner { let mut last_barrier_time = Instant::now(); let mut self_paused = false; + // The main logic of the loop is in handle_upstream_row and handle_backfill_row. 'backfill_loop: while let Some(either) = backfill_stream.next().await { match either { // Upstream @@ -485,9 +550,7 @@ impl SourceBackfillExecutorInner { for (i, (_, row)) in chunk.rows().enumerate() { let split = row.datum_at(split_idx).unwrap().into_utf8(); let offset = row.datum_at(offset_idx).unwrap().into_utf8(); - let backfill_state = - backfill_stage.states.get_mut(split).unwrap(); - let vis = backfill_state.handle_upstream_row(offset); + let vis = backfill_stage.handle_upstream_row(split, offset); new_vis.set(i, vis); } // emit chunk if vis is not empty. i.e., some splits finished backfilling. @@ -527,36 +590,12 @@ impl SourceBackfillExecutorInner { self.system_params.load().barrier_interval_ms() as u128 * WAIT_BARRIER_MULTIPLE_TIMES; } - // TODO(optimize): actually each msg is from one split. We can - // include split from the message and avoid iterating over all rows. let mut new_vis = BitmapBuilder::zeroed(chunk.visibility().len()); for (i, (_, row)) in chunk.rows().enumerate() { - let split_id: Arc = - row.datum_at(split_idx).unwrap().into_utf8().into(); - let offset: String = - row.datum_at(offset_idx).unwrap().into_utf8().into(); - // update backfill progress - let mut vis = true; - match backfill_stage.states.entry(split_id.clone()) { - Entry::Occupied(mut entry) => { - let state = entry.get_mut(); - match state { - BackfillState::Backfilling(_) => { - *state = - BackfillState::Backfilling(Some(offset.clone())); - } - BackfillState::SourceCachingUp(_) - | BackfillState::Finished => { - // backfilling stopped. ignore - vis = false - } - } - } - Entry::Vacant(entry) => { - entry.insert(BackfillState::Backfilling(Some(offset.clone()))); - } - } + let split_id = row.datum_at(split_idx).unwrap().into_utf8(); + let offset = row.datum_at(offset_idx).unwrap().into_utf8(); + let vis = backfill_stage.handle_backfill_row(split_id, offset); new_vis.set(i, vis); } @@ -678,7 +717,7 @@ impl SourceBackfillExecutorInner { // Iterate over the target (assigned) splits // - check if any new splits are added // - build target_state - for split in target_splits { + for split in &target_splits { let split_id = split.id(); if let Some(s) = old_states.get(&split_id) { target_state.insert(split_id, s.clone()); @@ -727,7 +766,19 @@ impl SourceBackfillExecutorInner { debug_assert_eq!(old_states, target_state); } stage.states = target_state; - + stage.splits = target_splits; + let old_target_offsets = std::mem::take(&mut stage.target_offsets); + stage.target_offsets = stage + .states + .keys() + .map(|split_id| { + ( + split_id.clone(), + old_target_offsets.get(split_id).cloned().flatten(), + ) + }) + .collect(); + stage.debug_assert_consistent(); Ok(split_changed) }