feat(meta): inject and collect barrier in bidi stream #14887
Can you add some comments on the motivation behind this PR? |
The main motivation behind this PR is to support partial checkpoint. In the design of partial checkpoint, barrier collection is reported to meta at the granularity of a single MV or actor, and more control messages are exchanged between meta and CN. A single RPC that collects a barrier from all actors is therefore no longer suitable. Apart from supporting partial checkpoint, replacing one-shot RPC calls with a single bidi stream has some general benefits. First, it avoids out-of-order messages. Currently, when injecting a barrier, we must wait for the inject response before injecting the next one, because the in-flight messages of multiple concurrent RPCs can arrive out of order. With a bidi stream, messages are delivered in the order they were sent. Second, it avoids stale requests. In the current code, a stale RPC may still be in flight when recovery happens and may be handled after recovery, so extra effort is needed to handle such requests. After this PR, a new control stream is established during recovery and all requests previously in flight on the old stream are discarded, which gets rid of stale requests.
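The ordering and stale-request points can be illustrated with a minimal sketch. It uses a `std::sync::mpsc` channel as a stand-in for one direction of the gRPC stream; `ControlRequest`, `ComputeNode`, `reset` and `drain_epochs` are hypothetical names chosen for illustration and are not taken from the RisingWave code.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical control message from meta to a compute node (CN).
#[derive(Debug)]
enum ControlRequest {
    InjectBarrier { epoch: u64 },
}

/// Hypothetical CN side of the control stream.
struct ComputeNode {
    requests: Receiver<ControlRequest>,
}

impl ComputeNode {
    /// Establish a new stream. The old receiver is dropped here, together
    /// with every request still queued on it.
    fn reset(&mut self) -> Sender<ControlRequest> {
        let (tx, rx) = channel();
        self.requests = rx;
        tx
    }

    /// Handle everything currently queued, in the order it was sent.
    fn drain_epochs(&self) -> Vec<u64> {
        self.requests
            .try_iter()
            .map(|req| match req {
                ControlRequest::InjectBarrier { epoch } => epoch,
            })
            .collect()
    }
}

/// Returns (epochs handled before recovery, whether a send on the old
/// stream failed after recovery, epochs handled after recovery).
fn demo() -> (Vec<u64>, bool, Vec<u64>) {
    let (old_tx, rx) = channel();
    let mut cn = ComputeNode { requests: rx };

    // Pipelined injection: no need to wait for a response in between.
    old_tx.send(ControlRequest::InjectBarrier { epoch: 1 }).unwrap();
    old_tx.send(ControlRequest::InjectBarrier { epoch: 2 }).unwrap();
    let before = cn.drain_epochs();

    // Epoch 3 is still in flight when recovery replaces the stream.
    old_tx.send(ControlRequest::InjectBarrier { epoch: 3 }).unwrap();
    let new_tx = cn.reset();
    // A late request on the old stream can no longer reach the CN.
    let stale_rejected = old_tx
        .send(ControlRequest::InjectBarrier { epoch: 4 })
        .is_err();

    new_tx.send(ControlRequest::InjectBarrier { epoch: 5 }).unwrap();
    let after = cn.drain_epochs();
    (before, stale_rejected, after)
}

fn main() {
    let (before, stale_rejected, after) = demo();
    println!("before={before:?} stale_rejected={stale_rejected} after={after:?}");
}
```

In this sketch the request for epoch 3 is never handled, because it was queued on the stream that recovery replaced.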
@hzxa21 @BugenZhao @kwannoel @yezizp2012 @shanicky The PR is ready for review now. PTAL~ |
Could you please explain why a bidi stream is introduced to solve the problem? What are the limitations of the original design? Thanks.
LGTM!
     self.context.update_actors(&info).await.inspect_err(|err| {
         warn!(error = %err.as_report(), "update actors failed");
     })?;
-    self.build_actors(&info).await.inspect_err(|err| {
+    self.context.build_actors(&info).await.inspect_err(|err| {
Just curious: will it be beneficial if we move update_actors and build_actors into the bidi-streaming rpc?
@@ -175,6 +175,8 @@ pub enum Command {
     RescheduleFragment {
         reschedules: HashMap<FragmentId, Reschedule>,
         table_parallelism: HashMap<TableId, TableParallelism>,
+        // should contain the actor ids in upstream and downstream fragment of `reschedules`
+        fragment_actors: HashMap<FragmentId, HashSet<ActorId>>,
Minor comment, I suppose `RescheduleFragment` is actually `RescheduleFragments`? Hence we have a hashmap here, rather than just the actor ids in upstream and downstream fragment of `reschedules`.
Also, why do we refactor and add the field `fragment_actors` here?
I guess it is to make to_mutation non-async
Yes. It's for making it non-async.
@@ -489,7 +496,7 @@ impl GlobalBarrierManager {
     }

     /// Check whether we should pause on bootstrap from the system parameter and reset it.
-    async fn take_pause_on_bootstrap(&self) -> MetaResult<bool> {
+    async fn take_pause_on_bootstrap(&mut self) -> MetaResult<bool> {
What's this change for?
If we take `&self`, the returned future holds an immutable reference to `self`. The future is `Send` only when `&self` is `Send`, which in turn requires `self` to be `Sync`. However, after this PR, `self` holds the bidi stream returned from the RPC client, which is `Send` but not `Sync`. Without this change, the code fails to compile.
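A minimal, self-contained sketch of this `Send`/`Sync` interaction follows. `Manager` is a hypothetical stand-in for the barrier manager, and a `Cell<bool>` plays the role of the field that is `Send` but not `Sync`; `assert_send`, `block_on` and `NoopWaker` are helper names introduced only for this example.

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for the manager. `Cell<bool>` is `Send` but not
/// `Sync`, so `Manager` is `Send` but not `Sync` either.
struct Manager {
    pause_on_bootstrap: Cell<bool>,
}

impl Manager {
    // With `&self` here, the future would capture `&Manager`, which is
    // `Send` only if `Manager: Sync`, and `assert_send` below would then
    // be rejected by the compiler. `&mut Manager` only needs `Manager: Send`.
    async fn take_pause_on_bootstrap(&mut self) -> bool {
        std::future::ready(()).await;
        self.pause_on_bootstrap.replace(false)
    }
}

/// Compiles only if the given future is `Send`.
fn assert_send<F: Future + Send>(fut: F) -> F {
    fut
}

/// Waker that does nothing; enough for futures that never return `Pending`
/// for long.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Tiny busy-polling executor so the example needs no external crates.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

/// Takes the flag twice: the first call sees `true`, the second `false`.
fn demo() -> (bool, bool) {
    let mut manager = Manager {
        pause_on_bootstrap: Cell::new(true),
    };
    let first = block_on(assert_send(manager.take_pause_on_bootstrap()));
    let second = block_on(assert_send(manager.take_pause_on_bootstrap()));
    (first, second)
}

fn main() {
    println!("{:?}", demo());
}
```

Changing the receiver to `&self` in this sketch is expected to produce a compile error at the `assert_send` calls, which mirrors the failure described above.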
Interesting. This sounds like the problem that `Exclusive` is designed for.
type ResponseStreamFuture = impl Future<
        Output = (
            WorkerId,
            BoxStream<
                'static,
                risingwave_rpc_client::error::Result<StreamingControlStreamResponse>,
            >,
            MetaResult<StreamingControlStreamResponse>,
        ),
    > + 'static;
Wow, so complicated. 🫨
Prior to this PR, we have await-tree instrumentation for all RPC calls. The tree for |
It's not supported in the PR currently. Do you have any recommendation on how to support it? Is it feasible to store an await-tree tracing span in the in-flight barrier queue?
I don't have an idea. #4581 introduces a middleware for non-streaming gRPC calls so that they can be instrumented transparently. Now it seems that we have to find a way to do that manually. |
Split out, to be implemented in #15608, given that the current PR is quite large.
This reverts commit 72d37ef.
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
In this PR, we replace the `inject_barrier`, `collect_barrier` and `force_stop_actors` RPCs with a bidi stream called `StreamingControlStream`.
When a new stream between meta and a CN is established, the CN is reset, similar to the original `force_stop_actors`. After the stream is established, the meta node injects a barrier by sending an `InjectBarrierRequest` on the stream, and when the barrier is collected on the CN, the CN responds with a `BarrierCompleteResponse`. `InjectBarrierResponse` and `BarrierCompleteRequest` are no longer needed and are removed in this PR.
Any CN failure, such as a failure to collect a barrier or a CN crash, is reflected to the meta node as an error on the stream. On error, the meta node triggers a recovery, which re-establishes the stream between meta and each CN and resets each CN.
On the meta side, `CheckpointControl` previously had an enum `BarrierEpochState` to represent the state of a barrier, which could be either `Inflight`, meaning the barrier has been injected but not yet completed, or `Completed`, meaning the barrier has been collected from all CNs. In this PR, a barrier may have been collected from only a subset of CNs, with some CNs still pending collection, so we change it to a struct that stores the nodes still to be collected and the responses collected so far. When `node_to_collect` becomes empty, the barrier has been collected from all CNs, and we can continue to complete the barrier.
Checklist
`./risedev check` (or alias, `./risedev c`)
Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.
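For illustration, the per-barrier collection state described under "What's changed" might be sketched as below. Only the field name `node_to_collect` comes from the description; `InflightBarrier`, `collected`, `collect`, the `u32` alias for `WorkerId` and the reduced `BarrierCompleteResponse` are assumptions made for this example, and the struct in the PR may differ.

```rust
use std::collections::HashSet;

/// Assumed to be an integer id; the real type alias may differ.
type WorkerId = u32;

/// Hypothetical, reduced response; the real message carries more fields.
#[derive(Debug)]
struct BarrierCompleteResponse {
    worker_id: WorkerId,
}

/// Hypothetical per-barrier state replacing the two-variant enum.
struct InflightBarrier {
    /// Nodes that have not yet reported this barrier as collected.
    node_to_collect: HashSet<WorkerId>,
    /// Responses received so far.
    collected: Vec<BarrierCompleteResponse>,
}

impl InflightBarrier {
    fn new(workers: impl IntoIterator<Item = WorkerId>) -> Self {
        Self {
            node_to_collect: workers.into_iter().collect(),
            collected: Vec::new(),
        }
    }

    /// Records a response and returns `true` once every node has reported.
    /// Responses from nodes that are not awaited are ignored.
    fn collect(&mut self, resp: BarrierCompleteResponse) -> bool {
        if self.node_to_collect.remove(&resp.worker_id) {
            self.collected.push(resp);
        }
        self.node_to_collect.is_empty()
    }
}

/// Collects from two workers in turn; returns the two `collect` results
/// and the number of stored responses.
fn demo() -> (bool, bool, usize) {
    let mut barrier = InflightBarrier::new([1, 2]);
    let after_first = barrier.collect(BarrierCompleteResponse { worker_id: 1 });
    let after_second = barrier.collect(BarrierCompleteResponse { worker_id: 2 });
    (after_first, after_second, barrier.collected.len())
}

fn main() {
    println!("{:?}", demo());
}
```

The barrier is considered ready to complete only when the second `collect` call empties `node_to_collect`.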