feat: track progress for SourceBackfill (blocking DDL) #18112
Conversation
This stack of pull requests is managed by Graphite. Learn more about stacking.
& (FragmentTypeFlag::Values as u32
    | FragmentTypeFlag::StreamScan as u32
    | FragmentTypeFlag::SourceScan as u32))
This is core change 1
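Below is a minimal sketch of what this widened mask means, using a local stand-in enum rather than the actual `FragmentTypeFlag` from risingwave_pb: fragments whose type mask contains `SourceScan` are now also selected when collecting the actors whose backfill progress is tracked (`tracking_progress_actor_ids`).

```rust
// Local stand-in enum (not the actual `FragmentTypeFlag` from risingwave_pb).
enum FragmentTypeFlag {
    Values = 1 << 0,
    StreamScan = 1 << 1,
    SourceScan = 1 << 2,
}

fn tracks_progress(fragment_type_mask: u32) -> bool {
    // The mask from the diff above: `SourceScan` is now included.
    let tracked = FragmentTypeFlag::Values as u32
        | FragmentTypeFlag::StreamScan as u32
        | FragmentTypeFlag::SourceScan as u32;
    fragment_type_mask & tracked != 0
}

fn main() {
    // A pure source-backfill fragment is now tracked as well.
    assert!(tracks_progress(FragmentTypeFlag::SourceScan as u32));
    assert!(!tracks_progress(0));
}
```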
true,
false,
This is a minor bug (not related to this PR).
if !self.backfill_finished(&backfill_stage.states).await? {
{
Remove the early exit condition and let it exit in only one place, to make it safer & simpler.
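A minimal, generic sketch of the single-exit-point idea (hypothetical loop, not the actual executor code): the finish condition is checked in exactly one place per iteration instead of being duplicated across branches.

```rust
// Hypothetical backfill loop with a single exit point.
fn run_backfill(chunks: impl IntoIterator<Item = u64>, target_rows: u64) -> u64 {
    let mut consumed = 0;
    for rows in chunks {
        consumed += rows;
        // Single exit point of the backfill loop.
        if consumed >= target_rows {
            break;
        }
    }
    consumed
}

fn main() {
    assert_eq!(run_backfill([100, 200, 300], 500), 600);
}
```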
Generally LGTM
// For now, we just rely on the same code path, and for source backfill, the progress will always be 99.99%.
tracing::info!("progress finish");
let epoch = barrier.epoch;
self.progress.finish(epoch, 114514);
The 2nd argument of `progress.finish()` is `current_consumed_rows`, which I think is easy to count, so we can report an accurate number. Why not do it?
You are right. I didn't do it just because there is no `total_consumed_rows`, so I thought it would be meaningless.
Following the #18112 (comment) will make it more meaningful. I think we can do the counting part in this PR.
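A minimal sketch of what the counting could look like, assuming a running counter updated per emitted chunk and passed on finish (stand-in `Progress` type, not the actual progress reporter in RisingWave):

```rust
// Stand-in progress reporter: keep a running row count and report it on finish,
// instead of a placeholder value like 114514.
struct Progress {
    consumed_rows: u64,
}

impl Progress {
    fn update(&mut self, chunk_cardinality: u64) {
        self.consumed_rows += chunk_cardinality;
    }

    // Stands in for `progress.finish(epoch, current_consumed_rows)`.
    fn finish(&self, epoch: u64) {
        println!(
            "backfill finished at epoch {epoch}, consumed {} rows",
            self.consumed_rows
        );
    }
}

fn main() {
    let mut progress = Progress { consumed_rows: 0 };
    for chunk_rows in [128u64, 256, 512] {
        progress.update(chunk_rows); // count rows of each emitted chunk
    }
    progress.finish(42);
}
```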
if self.backfill_finished(&backfill_stage.states).await? {
    break 'backfill_loop;
if self.should_report_finished(&backfill_stage.states) {
    // TODO: use a specialized progress for source
An idea: we may report the processed and total rows in the progress as well.

Current (`show jobs`):

  Id  |                     Statement                      | Progress
------+-----------------------------------------------------+----------
 1010 | CREATE MATERIALIZED VIEW mv1 AS SELECT * FROM mv1   | 2.21%

Expected:

  Id  |                       Statement                        | Processed | Total | Progress
------+----------------------------------------------------------+-----------+-------+---------
 1010 | CREATE MATERIALIZED VIEW mv1 AS SELECT * FROM mv1       | 221       | 1000  | 2.21%
 1012 | CREATE MATERIALIZED VIEW mv2 AS SELECT * FROM source1   | 114514    |       |
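A hypothetical sketch of how the extra columns could be carried (made-up struct and field names, not the actual `DdlProgress` message):

```rust
// Made-up struct carrying the raw counters behind the percentage, so that
// `SHOW JOBS` could render the Processed/Total columns sketched above.
struct JobProgress {
    id: u64,
    statement: String,
    processed_rows: u64,
    /// Unknown for source backfill, hence the empty `Total` cell above.
    total_rows: Option<u64>,
}

impl JobProgress {
    fn progress_string(&self) -> String {
        match self.total_rows {
            Some(total) if total > 0 => {
                format!("{:.2}%", self.processed_rows as f64 / total as f64 * 100.0)
            }
            // No total known: show nothing (or only the processed count).
            _ => String::new(),
        }
    }
}

fn main() {
    let mv = JobProgress {
        id: 1010,
        statement: "CREATE MATERIALIZED VIEW mv1 AS SELECT * FROM mv1".into(),
        processed_rows: 221,
        total_rows: Some(10_000),
    };
    println!("{} | {} | {}", mv.id, mv.statement, mv.progress_string()); // 2.21%
}
```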
I found #17735 already did a similar thing when `streaming_use_snapshot_backfill = true` is set. It just changed the `Progress` string.
dev=> show jobs;
┌────┬────────────────────────────────────────────────┬───────────────────┐
│ Id │ Statement │ Progress │
├────┼────────────────────────────────────────────────┼───────────────────┤
│ 29 │ CREATE MATERIALIZED VIEW mv AS SELECT * FROM t │ Snapshot [32.08%] │
└────┴────────────────────────────────────────────────┴───────────────────┘
(1 row)
risingwave/src/meta/src/barrier/creating_job/mod.rs
Lines 173 to 227 in 3133a72
pub(super) fn gen_ddl_progress(&self) -> DdlProgress {
    let progress = match &self.status {
        CreatingStreamingJobStatus::ConsumingSnapshot {
            create_mview_tracker,
            ..
        } => {
            if create_mview_tracker.has_pending_finished_jobs() {
                "Snapshot finished".to_string()
            } else {
                let progress = create_mview_tracker
                    .gen_ddl_progress()
                    .remove(&self.info.table_fragments.table_id().table_id)
                    .expect("should exist");
                format!("Snapshot [{}]", progress.progress)
            }
        }
        CreatingStreamingJobStatus::ConsumingLogStore {
            start_consume_log_store_epoch,
            ..
        } => {
            let max_collected_epoch = max(
                self.barrier_control.max_collected_epoch().unwrap_or(0),
                self.backfill_epoch,
            );
            let lag = Duration::from_millis(
                Epoch(*start_consume_log_store_epoch)
                    .physical_time()
                    .saturating_sub(Epoch(max_collected_epoch).physical_time()),
            );
            format!(
                "LogStore [remain lag: {:?}, epoch cnt: {}]",
                lag,
                self.barrier_control.inflight_barrier_count()
            )
        }
        CreatingStreamingJobStatus::ConsumingUpstream { .. } => {
            format!(
                "Upstream [unattached: {}, epoch cnt: {}]",
                self.barrier_control.unattached_epochs().count(),
                self.barrier_control.inflight_barrier_count(),
            )
        }
        CreatingStreamingJobStatus::Finishing { .. } => {
            format!(
                "Finishing [epoch count: {}]",
                self.barrier_control.inflight_barrier_count()
            )
        }
    };
    DdlProgress {
        id: self.info.table_fragments.table_id().table_id as u64,
        statement: self.info.definition.clone(),
        progress,
    }
}
Looks good!
Let me do this in a separate PR, because it's more complex than I expected: one MV may have multiple backfill upstreams, and I want to distinguish MV backfill rows from source backfill rows.
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
Support blocking DDL for source #18338

Core changes:
- Add `SourceScan` to `tracking_progress_actor_ids` of a MV. So a MV will not enter the `Created` state until the `SourceBackfillExecutor`'s actor has reported progress.
- In `SourceBackfillExecutor`, the progress reporting is a dummy one: it simply reuses the MV's mechanism, and only uses `finish`.

A subtle case to consider: should we report `finish` after entering the `SourceCatchingUp` state, or after the `Finished` state?

`Finished` might be more intuitive and safe. However, it may make a foreground DDL blocked forever until upstream messages come. (See the state diagram in "Let source backfill finish when there's no data from upstream" #18299; we will be in `SourceCatchingUp`.) It may seem strange: why, even with "refactor: use high watermark to finish backfill faster" #18342 (end at high watermark), can we still not enter the `Finished` state? Or why do no messages come from upstream when we have already backfilled to the high watermark?

Here's an illustration: assume there are no messages from upstream, and we have reached `high_watermark`. Now we cannot distinguish whether upstream has reached `high_watermark`, so we enter the `SourceCatchingUp` state. If upstream hasn't caught up, we can enter `Finished` when it catches up. If it's already at the latest, we will not be able to transition state. (#18299's point 4 mentioned that we might distinguish this situation if we could do `has_message_available`.)

So it seems we'd better end at `SourceCatchingUp` to avoid blocking forever. It's also reasonable, since at that point we have already exited the backfill stream. But the problem is that after reporting `finish` and the DDL returns, online scaling may happen, and we need to handle it carefully ("Problem of online scaling for source backfill" #18300).
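A minimal sketch of the dilemma above, with hypothetical names rather than the actual RisingWave types:

```rust
// Hypothetical state enum mirroring the description above.
enum BackfillState {
    Backfilling,
    /// Reached the high watermark, but whether upstream has more data is unknown.
    SourceCatchingUp,
    /// Confirmed we have caught up with the upstream offset.
    Finished,
}

/// Report `finish` (and let the blocking DDL return) as soon as we have left
/// the backfill stream, i.e. in `SourceCatchingUp` or later. Waiting for
/// `Finished` could block forever on an idle upstream.
fn should_report_finish(state: &BackfillState) -> bool {
    !matches!(state, BackfillState::Backfilling)
}

fn main() {
    assert!(!should_report_finish(&BackfillState::Backfilling));
    // Even if upstream never sends another message and we stay here forever,
    // the DDL still unblocks.
    assert!(should_report_finish(&BackfillState::SourceCatchingUp));
    assert!(should_report_finish(&BackfillState::Finished));
}
```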
finished
. Now we do want to finish earlier! But need to make it scaling-safe.So the solution is to decouple 2 timings:
scan
the state table, and exit loop. This PR usedtry_wait_epoch
to fix the problem described in Problem of online scaling for source backfill #18300)Checklist
./risedev check
(or alias,./risedev c
)Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.