refactor: add target_offsets to determine if source backfill finished
xxchan committed Sep 2, 2024
1 parent e788323 commit 272a7a4
Showing 1 changed file with 133 additions and 82 deletions.
src/stream/src/executor/source/source_backfill_executor.rs (133 additions, 82 deletions)
@@ -13,7 +13,6 @@
// limitations under the License.

use std::cmp::Ordering;
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};
use std::time::Instant;

@@ -45,7 +44,7 @@ use crate::executor::{AddMutation, UpdateMutation};
pub enum BackfillState {
/// `None` means not started yet. It's the initial state.
Backfilling(Option<String>),
/// Backfill is stopped at this offset. Source needs to filter out messages before this offset.
/// Backfill is stopped at this offset (inclusive). Source needs to filter out messages before this offset.
SourceCachingUp(String),
Finished,
}
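// A sketch of the per-split state machine implemented by `handle_upstream_row`
// and `handle_backfill_row` below (`b` = last backfilled offset, `t` = latest
// upstream offset seen, i.e. the target):
//
//   Backfilling(None)    --backfill row at o--------->  Backfilling(Some(o))
//   Backfilling(Some(b)) --backfill row at o >= t---->  SourceCachingUp(o)
//   Backfilling(Some(b)) --upstream row at b--------->  Finished
//   Backfilling(Some(b)) --upstream row at o < b----->  SourceCachingUp(b)
//   SourceCachingUp(b)   --upstream row at o >= b---->  Finished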
@@ -59,54 +58,6 @@ impl BackfillState {
pub fn restore_from_json(value: JsonbVal) -> anyhow::Result<Self> {
serde_json::from_value(value.take()).map_err(|e| anyhow!(e))
}

/// Returns whether the row from upstream `SourceExecutor` is visible.
fn handle_upstream_row(&mut self, offset: &str) -> bool {
let mut vis = false;
match self {
BackfillState::Backfilling(None) => {
// backfilling for this split has not started yet. Ignore this row
}
BackfillState::Backfilling(Some(backfill_offset)) => {
match compare_kafka_offset(backfill_offset, offset) {
Ordering::Less => {
// continue backfilling. Ignore this row
}
Ordering::Equal => {
// backfilling for this split finished exactly at this offset.
*self = BackfillState::Finished;
}
Ordering::Greater => {
// backfilling for this split produced more data than current source's progress.
// We should stop backfilling, and filter out rows from upstream with offset <= backfill_offset.
*self = BackfillState::SourceCachingUp(backfill_offset.clone());
}
}
}
BackfillState::SourceCachingUp(backfill_offset) => {
match compare_kafka_offset(backfill_offset, offset) {
Ordering::Less => {
// Source caught up, but doesn't contain the last backfilled row.
// This may happen e.g., if Kafka performed compaction.
vis = true;
*self = BackfillState::Finished;
}
Ordering::Equal => {
// Source just caught up with backfilling.
*self = BackfillState::Finished;
}
Ordering::Greater => {
// Source is still behind backfilling.
}
}
}
BackfillState::Finished => {
vis = true;
// This split's backfilling is finished; we are waiting for other splits
}
}
vis
}
}
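// A sketch (not part of this commit) of how the state round-trips through JSON
// for checkpointing, assuming `BackfillState` derives `Serialize`, `Deserialize`,
// `PartialEq`, and `Debug` (implied by `restore_from_json` and the assertions below):
#[cfg(test)]
#[test]
fn backfill_state_json_roundtrip_sketch() {
    let state = BackfillState::SourceCachingUp("42".to_owned());
    // Persist to a JSON value and restore it, mirroring `restore_from_json`.
    let json = serde_json::to_value(&state).unwrap();
    let restored: BackfillState = serde_json::from_value(json).unwrap();
    assert_eq!(state, restored);
}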

pub struct SourceBackfillExecutor<S: StateStore> {
@@ -138,16 +89,37 @@ pub struct SourceBackfillExecutorInner<S: StateStore> {
}

/// Local variables used in the backfill stage.
///
/// Note: all of the fields should contain all available splits, so we can safely `unwrap()` the result of `get()`.
#[derive(Debug)]
struct BackfillStage {
states: BackfillStates,
/// A copy of all splits (including unfinished and finished ones) assigned to the actor.
///
/// Note: the offsets in these splits are not kept up to date. Use the offsets in `states` to update them before use (see `get_latest_unfinished_splits`).
splits: Vec<SplitImpl>,
/// The latest offset from upstream (inclusive). After we reach this offset, we can stop backfilling.
/// TODO: initialize this with high watermark so that we can finish backfilling even when upstream
/// doesn't emit any data.
target_offsets: HashMap<SplitId, Option<String>>,
}

impl BackfillStage {
fn debug_assert_consistent(&self) {
if cfg!(debug_assertions) {
let all_splits: HashSet<_> =
self.splits.iter().map(|split| split.id().clone()).collect();
assert_eq!(
self.states.keys().cloned().collect::<HashSet<_>>(),
all_splits
);
assert_eq!(
self.target_offsets.keys().cloned().collect::<HashSet<_>>(),
all_splits
);
}
}

/// Get unfinished splits with latest offsets according to the backfill states.
fn get_latest_unfinished_splits(&self) -> StreamExecutorResult<Vec<SplitImpl>> {
let mut unfinished_splits = Vec::new();
@@ -165,6 +137,92 @@ impl BackfillStage {
}
Ok(unfinished_splits)
}

/// Updates backfill states and `target_offsets` and returns whether the row from upstream `SourceExecutor` is visible.
fn handle_upstream_row(&mut self, split_id: &str, offset: &str) -> bool {
let mut vis = false;
let state = self.states.get_mut(split_id).unwrap();
match state {
BackfillState::Backfilling(None) => {
// backfilling for this split has not started yet. Ignore this row
}
BackfillState::Backfilling(Some(backfill_offset)) => {
match compare_kafka_offset(backfill_offset, offset) {
Ordering::Less => {
// continue backfilling. Ignore this row
}
Ordering::Equal => {
// backfilling for this split finished exactly at this offset.
*state = BackfillState::Finished;
}
Ordering::Greater => {
// backfilling for this split produced more data than current source's progress.
// We should stop backfilling, and filter out rows from upstream with offset <= backfill_offset.
*state = BackfillState::SourceCachingUp(backfill_offset.clone());
}
}
}
BackfillState::SourceCachingUp(backfill_offset) => {
match compare_kafka_offset(backfill_offset, offset) {
Ordering::Less => {
// Source caught up, but doesn't contain the last backfilled row.
// This may happen e.g., if Kafka performed compaction.
vis = true;
*state = BackfillState::Finished;
}
Ordering::Equal => {
// Source just caught up with backfilling.
*state = BackfillState::Finished;
}
Ordering::Greater => {
// Source is still behind backfilling.
}
}
}
BackfillState::Finished => {
vis = true;
// This split's backfilling is finished; we are waiting for other splits
}
}
if matches!(state, BackfillState::Backfilling(_)) {
*self.target_offsets.get_mut(split_id).unwrap() = Some(offset.to_string());
}
if vis {
debug_assert_eq!(*state, BackfillState::Finished);
}
vis
}
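// `compare_kafka_offset` is defined elsewhere in this file. A minimal sketch of
// the comparison it performs, assuming Kafka offsets are i64 values encoded as
// strings:
//
//     fn compare_kafka_offset(a: &str, b: &str) -> Ordering {
//         let a: i64 = a.parse().expect("Kafka offset should be an integer string");
//         let b: i64 = b.parse().expect("Kafka offset should be an integer string");
//         a.cmp(&b)
//     }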

/// Updates backfill states and returns whether the row from upstream `SourceExecutor` is visible.
fn handle_backfill_row(&mut self, split_id: &str, offset: &str) -> bool {
let state = self.states.get_mut(split_id).unwrap();
match state {
BackfillState::Backfilling(_old_offset) => {
let target_offset = self.target_offsets.get(split_id).unwrap();
if let Some(target_offset) = target_offset
&& compare_kafka_offset(offset, target_offset).is_ge()
{
// Note1: If target_offset == offset, it may seem we could mark the state as Finished directly,
// without waiting for the upstream to catch up and without dropping duplicated messages.
// But that doesn't hold if target_offset is fetched from elsewhere, like the Kafka high watermark:
// in that case, the upstream hasn't reached the target_offset yet.
//
// Note2: after this, all following rows in the current chunk will be invisible.
//
// Note3: if target_offset is None (e.g., when upstream doesn't emit messages at all), we will
// keep backfilling.
*state = BackfillState::SourceCachingUp(offset.to_string());
} else {
*state = BackfillState::Backfilling(Some(offset.to_string()));
}
true
}
BackfillState::SourceCachingUp(_) | BackfillState::Finished => {
// backfilling stopped. ignore
false
}
}
}
}
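// An illustrative trace of the two handlers above (hypothetical; a single split
// "0" that starts as `Backfilling(None)` with target offset `None`):
//
//     stage.handle_upstream_row("0", "10"); // -> false: backfill not started, row
//                                           //    dropped; target_offsets["0"] = Some("10")
//     stage.handle_backfill_row("0", "5");  // -> true:  Backfilling(Some("5"))
//     stage.handle_backfill_row("0", "10"); // -> true:  reached the target, so
//                                           //    SourceCachingUp("10")
//     stage.handle_upstream_row("0", "11"); // -> true:  source passed the cached
//                                           //    offset; state becomes Finished
//     stage.handle_upstream_row("0", "12"); // -> true:  Finished, rows pass through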

impl<S: StateStore> SourceBackfillExecutorInner<S> {
@@ -275,9 +333,15 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
backfill_states.insert(split_id, backfill_state);
}
let mut backfill_stage = BackfillStage {
// init with None
target_offsets: backfill_states
.keys()
.map(|split_id| (split_id.clone(), None))
.collect(),
states: backfill_states,
splits: owned_splits,
};
backfill_stage.debug_assert_consistent();
tracing::debug!(?backfill_stage, "source backfill started");

// Return the ownership of `stream_source_core` to the source executor.
@@ -348,6 +412,7 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
let mut last_barrier_time = Instant::now();
let mut self_paused = false;

// The main logic of the loop is in `handle_upstream_row` and `handle_backfill_row`.
'backfill_loop: while let Some(either) = backfill_stream.next().await {
match either {
// Upstream
@@ -485,9 +550,7 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
for (i, (_, row)) in chunk.rows().enumerate() {
let split = row.datum_at(split_idx).unwrap().into_utf8();
let offset = row.datum_at(offset_idx).unwrap().into_utf8();
let backfill_state =
backfill_stage.states.get_mut(split).unwrap();
let vis = backfill_state.handle_upstream_row(offset);
let vis = backfill_stage.handle_upstream_row(split, offset);
new_vis.set(i, vis);
}
// emit chunk if vis is not empty. i.e., some splits finished backfilling.
@@ -527,36 +590,12 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
self.system_params.load().barrier_interval_ms() as u128
* WAIT_BARRIER_MULTIPLE_TIMES;
}
// TODO(optimize): actually each msg is from one split. We can
// take the split id from the message itself and avoid iterating over all rows.
let mut new_vis = BitmapBuilder::zeroed(chunk.visibility().len());

for (i, (_, row)) in chunk.rows().enumerate() {
let split_id: Arc<str> =
row.datum_at(split_idx).unwrap().into_utf8().into();
let offset: String =
row.datum_at(offset_idx).unwrap().into_utf8().into();
// update backfill progress
let mut vis = true;
match backfill_stage.states.entry(split_id.clone()) {
Entry::Occupied(mut entry) => {
let state = entry.get_mut();
match state {
BackfillState::Backfilling(_) => {
*state =
BackfillState::Backfilling(Some(offset.clone()));
}
BackfillState::SourceCachingUp(_)
| BackfillState::Finished => {
// backfilling stopped. ignore
vis = false
}
}
}
Entry::Vacant(entry) => {
entry.insert(BackfillState::Backfilling(Some(offset.clone())));
}
}
let split_id = row.datum_at(split_idx).unwrap().into_utf8();
let offset = row.datum_at(offset_idx).unwrap().into_utf8();
let vis = backfill_stage.handle_backfill_row(split_id, offset);
new_vis.set(i, vis);
}

@@ -678,7 +717,7 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
// Iterate over the target (assigned) splits
// - check if any new splits are added
// - build target_state
for split in target_splits {
for split in &target_splits {
let split_id = split.id();
if let Some(s) = old_states.get(&split_id) {
target_state.insert(split_id, s.clone());
@@ -727,7 +766,19 @@ impl<S: StateStore> SourceBackfillExecutorInner<S> {
debug_assert_eq!(old_states, target_state);
}
stage.states = target_state;

stage.splits = target_splits;
let old_target_offsets = std::mem::take(&mut stage.target_offsets);
stage.target_offsets = stage
.states
.keys()
.map(|split_id| {
(
split_id.clone(),
old_target_offsets.get(split_id).cloned().flatten(),
)
})
.collect();
stage.debug_assert_consistent();
Ok(split_changed)
}
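// A minimal sketch of the `target_offsets` rebuild above, assuming plain `String`
// split ids. A newly added split is absent from the old map, so
// `get(..).cloned().flatten()` starts it at `None`:
//
//     fn rebuild_target_offsets(
//         new_split_ids: &[String],
//         old: &HashMap<String, Option<String>>,
//     ) -> HashMap<String, Option<String>> {
//         new_split_ids
//             .iter()
//             .map(|id| (id.clone(), old.get(id).cloned().flatten()))
//             .collect()
//     }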

