Skip to content

Commit

Permalink
feat(stream): support row count for arrangement backfill (#14836)
Browse files Browse the repository at this point in the history
Co-authored-by: August <[email protected]>
  • Loading branch information
kwannoel and yezizp2012 authored Jan 30, 2024
1 parent 1e26054 commit 63e5485
Show file tree
Hide file tree
Showing 8 changed files with 260 additions and 83 deletions.
2 changes: 1 addition & 1 deletion risedev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@ profile:
steps:
- use: minio
api-requests-max: 30
api-requests-deadline: 2s
api-requests-deadline: 3s
- use: etcd
unsafe-no-fsync: true
- use: meta-node
Expand Down
18 changes: 11 additions & 7 deletions src/stream/src/executor/backfill/arrangement_backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ where
let mut snapshot_read_epoch;

// Keep track of rows from the snapshot.
let mut total_snapshot_processed_rows: u64 = 0;
let mut total_snapshot_processed_rows: u64 = backfill_state.get_snapshot_row_count();

// Arrangement Backfill Algorithm:
//
Expand Down Expand Up @@ -278,9 +278,8 @@ where
// mark.
for chunk in upstream_chunk_buffer.drain(..) {
let chunk_cardinality = chunk.cardinality() as u64;
cur_barrier_snapshot_processed_rows +=
cur_barrier_upstream_processed_rows +=
chunk_cardinality;
total_snapshot_processed_rows += chunk_cardinality;
yield Message::Chunk(mapping_chunk(
chunk,
&self.output_indices,
Expand All @@ -290,6 +289,8 @@ where
break 'backfill_loop;
}
Some((vnode, chunk)) => {
let chunk_cardinality = chunk.cardinality() as u64;

// Raise the current position.
// As snapshot read streams are ordered by pk, so we can
// just use the last row to update `current_pos`.
Expand All @@ -298,9 +299,9 @@ where
&chunk,
&pk_in_output_indices,
&mut backfill_state,
chunk_cardinality,
)?;

let chunk_cardinality = chunk.cardinality() as u64;
cur_barrier_snapshot_processed_rows += chunk_cardinality;
total_snapshot_processed_rows += chunk_cardinality;
let chunk = Message::Chunk(mapping_chunk(
Expand Down Expand Up @@ -354,6 +355,7 @@ where
})
})) {
if let Some(chunk) = chunk {
let chunk_cardinality = chunk.cardinality() as u64;
// Raise the current position.
// As snapshot read streams are ordered by pk, so we can
// just use the last row to update `current_pos`.
Expand All @@ -362,9 +364,9 @@ where
&chunk,
&pk_in_output_indices,
&mut backfill_state,
chunk_cardinality,
)?;

let chunk_cardinality = chunk.cardinality() as u64;
cur_barrier_snapshot_processed_rows += chunk_cardinality;
total_snapshot_processed_rows += chunk_cardinality;
yield Message::Chunk(mapping_chunk(chunk, &self.output_indices));
Expand Down Expand Up @@ -585,8 +587,10 @@ where
let backfill_progress = backfill_state.get_progress(&vnode)?;
let current_pos = match backfill_progress {
BackfillProgressPerVnode::NotStarted => None,
BackfillProgressPerVnode::Completed(current_pos)
| BackfillProgressPerVnode::InProgress(current_pos) => Some(current_pos.clone()),
BackfillProgressPerVnode::Completed { current_pos, .. }
| BackfillProgressPerVnode::InProgress { current_pos, .. } => {
Some(current_pos.clone())
}
};

let range_bounds = compute_bounds(upstream_table.pk_indices(), current_pos.clone());
Expand Down
Loading

0 comments on commit 63e5485

Please sign in to comment.