-
Notifications
You must be signed in to change notification settings - Fork 589
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor: unify to subscribe mutation via barrier sender #18255
Conversation
…ing/remove-separate-mutation-subscriber
…ing/remove-separate-mutation-subscriber
let upstreams = std::mem::take(&mut self.blocked); | ||
if barrier.is_stop(self.actor_id) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's safe to ignore the is_stop
in the SelectReceiver
, because the merge executor stops when seeing the stop barrier, and should not poll the SelectReceiver
later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM for changes in barrier manager and snapshot backfill.
@BugenZhao Can you help review the changes in executor/merge.rs
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Generally LGTM (cause I may not have sufficient knowledge on the background).
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
After #18210
Previously in #17612 we introduced the
subscribe_barrier_mutation
method inLocalBarrierManager
, so that inputs of actors with upstream can subscribe the mutation of barrier from local barrier worker. In #18104, we introduced a similarsubscribe_barrier
for actors without upstream to register barrier receiver to local barrier worker. The two methods have similar functionality, and therefore in this PR, we will try to unify to callsubscribe_barrier
only.The difference between the two methods is that, in
subscribe_barrier_mutation
we can specify the starting epoch to subscribe mutation, whilesubscribe_barrier
subscribe to mutation of all barriers to be collected from the actors. We specify the starting epoch insubscribe_barrier_mutation
because, it's called in the input(both remote and local input) of actors, and when scale happens, it's possible that an input subscribes barrier mutation at the epoch that scale happens rather than first epoch of the actor. Though inputs may start at epoch later than the first epoch, the merge executor (or receiver executor) that merges all input should start at the first epoch of the actor. Therefore, in this PR, instead of waiting to receive barrier mutation in local and remote input, we change to wait to receive the barrier mutation in the merge executor and receiver executor.For
SnapshotBackfillExecutor
, we also introduced a pre-sync mutation mechanism so that the input won't be blocked at fetching mutation of future barrier. The pre-sync mutation mechanism depends onsubscribe_barrier_mutation
. To removesubscribe_barrier_mutation
, we will also remote the pre-sync mutation mechanism. In #18210, we have done some prerequisite works. In this PR, theSnapshotBackfillExecutor
will be handled specially when we build actors. TheSnapshotBackfillExecutor
will hold the raw inputs instead of merge executor at the beginning. During snapshot backfill, it receivesDispatcherBarrier
from the raw inputs, and all upstream mutations will be ignored inSnapshotBackfillExecutor
during backfill. After snapshot backfill finishes, we will turn the raw inputs into merge executor, and then we consume the upstream as usual. Some refactors are done accordingly in merge executor and receiver executor.Checklist
./risedev check
(or alias,./risedev c
)Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.