-
Notifications
You must be signed in to change notification settings - Fork 596
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(bench): Add sink bench tool #14064
Conversation
Codecov ReportAttention:
Additional details and impacted files@@ Coverage Diff @@
## main #14064 +/- ##
==========================================
- Coverage 67.90% 67.82% -0.08%
==========================================
Files 1554 1555 +1
Lines 268655 268967 +312
==========================================
- Hits 182417 182416 -1
- Misses 86238 86551 +313
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. |
src/connector/src/sink/mod.rs
Outdated
@@ -297,6 +297,17 @@ pub trait Sink: TryFrom<SinkParam, Error = SinkError> { | |||
|
|||
async fn validate(&self) -> Result<()>; | |||
async fn new_log_sinker(&self, writer_param: SinkWriterParam) -> Result<Self::LogSinker>; | |||
#[cfg(feature = "sink_bench")] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIUC, we introduce this new method because in benchmark we are not able to create a grpc meta client. If so, instead of adding a cargo feature and a new method in the sink trait, we can change the meta_client
in SinkWriterParam
to an enum that can be either grpc meta client, or our mocked meta client.
The call path of meta client in sink coordination is
let sink_coordinate_client = meta_client.sink_coordinate_client();
let stream_handle = StreamHandle::new(sink_coordinate_client);
only the stream_handle
at the end will be stored as a field in some sink writer. The BidiStreamHandle
contains only a request channel sender and a box response stream, which can be easily mocked.
Therefore, we can have enum for meta_client
and coordinate_client
. The meta_client
enum has method fn sink_coordinate_client(&self)
to create the corresponding sink_coordinate_client
enum. The sink_coordinate_client
enum has method fn new_stream_handle(self, PbSinkParam, Bitmap)
to create the stream handle. The grpc variant of the sink_coordinate_client
enum will follow the original call path, while the mock variant will create a mocked bidi stream handle.
Besides, since we only bench the performance of writer, we don't really need to create the coordinator to commit the metadata. When creating the mocked bidi stream handle, we can just spawn a tokio task to receive the commit request and return with a commit response immediately.
In this way, we will not have to have a new cargo feature and introduce so many hacks in the concrete sink implementation.
@@ -122,6 +124,7 @@ private void write_append_only(Iterator<SinkRow> rows) { | |||
.withDescription("Unknown operation: " + op) | |||
.asRuntimeException(); | |||
} | |||
tryCommit(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This optimization of cassandra sink can be put in a separate PR and we can paste improvement gained from the optimization in the PR description.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a bug, fixed in this pr by the way, the exact description has been added to the pr description
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The benchmark output of the sink in the current PR description is not readable. Can we port to some benchmark framework to better virtualize the output? Such as the criterion framework.
@@ -166,6 +166,31 @@ impl SinkFormatDesc { | |||
})) | |||
} | |||
|
|||
#[cfg(feature = "sink_bench")] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the purpose for this mock method?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method is to be compatible with sink's two syntaxes, and our redis is only support format syntax, so there is no redis in the method, but the mock needs to be converted to format by redis's option, I will move this method to the bench directory.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
license-eye has totally checked 4686 files.
Valid | Invalid | Ignored | Fixed |
---|---|---|---|
2048 | 1 | 2637 | 0 |
Click to see the invalid file list
- src/bench/sink_bench/main.rs
src/bench/sink_bench/main.rs
Outdated
@@ -0,0 +1,426 @@ | |||
// Copyright 2023 RisingWave Labs |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// Copyright 2023 RisingWave Labs | |
// Copyright 2024 RisingWave Labs | |
// | |
// Licensed under the Apache License, Version 2.0 (the "License"); | |
// you may not use this file except in compliance with the License. | |
// You may obtain a copy of the License at | |
// | |
// http://www.apache.org/licenses/LICENSE-2.0 | |
// | |
// Unless required by applicable law or agreed to in writing, software | |
// distributed under the License is distributed on an "AS IS" BASIS, | |
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
// See the License for the specific language governing permissions and | |
// limitations under the License. | |
// Copyright 2023 RisingWave Labs |
2374951
to
ea658e9
Compare
I guess something wrong happened in the rebase. There are many unrelated diff in the |
Yes, I am working on it |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM for Cargo.toml
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
resolve #13551
In this pr. We add a sink bench tool, We use datagen create data and write it into our sink. We can use it with
schema.yml
and add sink option insink_option.yml
We set barrier latency is 1000ms, and throughput is calculated every 500ms, A total of 20s
We run bench in ec2(36cpu), where datagen has a thread count of 20 and sink has a parallelism of 1
The result is(rows/s)
At the same time we fix a bug in this pr cassandra sink, cassandra api commit each time the number of rows has a maximum value. With higher throughput, can not be simply according to the checkpoint for the commit
Checklist
./risedev check
(or alias,./risedev c
)Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.