feat: compact stream chunk with sink key when sink key mismatch #15345
Conversation
Co-authored-by: Yuhao Su <[email protected]>
```diff
@@ -114,8 +114,13 @@ impl<'a, 'b> RowOpMap<'a, 'b> {
             RowOp::Delete(ref old_v) => {
                 e.insert(RowOp::Update((*old_v, v)));
             }
-            RowOp::Insert(_) | RowOp::Update(_) => {
-                tracing::warn!("double insert");
+            RowOp::Insert(_) => {
```
Can we log more information, e.g. the pk, to help debugging?
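The compaction behavior under discussion can be sketched with a simplified, hypothetical map (string keys and values rather than RisingWave's actual row references in `RowOpMap`): a pending Delete followed by an Insert on the same key compacts into an Update, and a double Insert is logged together with its key, as the review comment suggests.

```rust
use std::collections::HashMap;

// Hypothetical simplified RowOp over string values; the real RowOpMap
// operates on row references within a stream chunk.
#[derive(Debug, Clone, PartialEq)]
enum RowOp {
    Insert(String),
    Delete(String),
    Update((String, String)),
}

struct RowOpMap {
    map: HashMap<String, RowOp>,
}

impl RowOpMap {
    fn new() -> Self {
        Self { map: HashMap::new() }
    }

    fn insert(&mut self, key: String, v: String) {
        match self.map.get(&key).cloned() {
            None => {
                self.map.insert(key, RowOp::Insert(v));
            }
            // A pending Delete followed by an Insert on the same key
            // compacts into a single Update.
            Some(RowOp::Delete(old_v)) => {
                self.map.insert(key, RowOp::Update((old_v, v)));
            }
            // Double insert: log the key so the inconsistency can be debugged.
            Some(RowOp::Insert(_)) | Some(RowOp::Update(_)) => {
                eprintln!("double insert for key {:?}", key);
                self.map.insert(key, RowOp::Insert(v));
            }
        }
    }

    fn delete(&mut self, key: String, old_v: String) {
        self.map.insert(key, RowOp::Delete(old_v));
    }
}
```

Calling `delete("k", "v0")` and then `insert("k", "v1")` leaves a single `Update(("v0", "v1"))` entry for `"k"`, which is the delete+insert merging the diff above implements.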
```diff
@@ -88,6 +91,8 @@ impl<F: LogStoreFactory> SinkExecutor<F> {
         sink_param: SinkParam,
         columns: Vec<ColumnCatalog>,
         log_store_factory: F,
+        chunk_size: usize,
+        input_data_types: Vec<DataType>,
```
Maybe we can get the `input_data_types` from `executor.schema()` instead of passing it separately?
after #15167 I think it is banned
```rust
        stream_key_sink_pk_mismatch && self.sink_param.sink_type != SinkType::AppendOnly;
    // NOTE(st1page): reconstruct with sink pk need extra cost to buffer a barrier's data, so currently we bind it with mismatch case.
    let re_construct_with_sink_pk = need_advance_delete
        && self.sink_param.sink_type == SinkType::Upsert
```
Is this `== SinkType::Upsert` necessary? Because if `need_advance_delete`, it must be `!= SinkType::AppendOnly`.
It is for the forceAppendOnly case.
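The interaction can be sketched with plain booleans. The `SinkType` variants and the flag derivation below follow the diff and this reply, but the function itself is my simplification, not the executor code: a force-append-only sink is not `AppendOnly`, so `need_advance_delete` can hold while the extra `== Upsert` check keeps `re_construct_with_sink_pk` off.

```rust
// Sketch of the two flags discussed above; `flags` is a hypothetical helper.
#[derive(Clone, Copy, PartialEq, Debug)]
enum SinkType {
    AppendOnly,
    ForceAppendOnly,
    Upsert,
}

fn flags(
    stream_key_sink_pk_mismatch: bool,
    sink_type: SinkType,
    downstream_pk_empty: bool,
) -> (bool, bool) {
    // Correctness requirement: deletes must be advanced before inserts
    // whenever the keys mismatch and the sink is not append-only.
    let need_advance_delete =
        stream_key_sink_pk_mismatch && sink_type != SinkType::AppendOnly;
    // Reconstruction with the sink pk buffers a barrier's worth of data,
    // so it is bound to the mismatch case and to genuine upsert sinks only.
    let re_construct_with_sink_pk = need_advance_delete
        && sink_type == SinkType::Upsert
        && !downstream_pk_empty;
    (need_advance_delete, re_construct_with_sink_pk)
}
```

For a force-append-only sink with mismatched keys this yields `(true, false)`: deletes are still advanced for correctness, but no reconstruction by sink pk happens.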
```rust
    // NOTE(st1page): reconstruct with sink pk need extra cost to buffer a barrier's data, so currently we bind it with mismatch case.
    let re_construct_with_sink_pk = need_advance_delete
        && self.sink_param.sink_type == SinkType::Upsert
        && !self.sink_param.downstream_pk.is_empty();
```
Should we assert that `downstream_pk` must be non-empty here? IIRC, for upsert sinks we require users to set a downstream pk.
I'd rather not assert it in a code branch like this; there should be an assertion elsewhere. This is just defensive programming to prevent overly strange behavior here.
I think we are still mixing two independent concepts that serve different purposes here. Let me leave a note to clarify the behavior:

**Compaction by stream_key (i.e. `need_advance_delete`)**

This is a correctness requirement, not an optimization. In other words, compaction by stream_key is a must and cannot be skipped when `sink_pk != stream_key`. Compaction by stream_key advances the DELETE messages before the INSERT messages for the same stream_key within the same epoch. This is the only way to ensure correctness.

**Compaction by sink_pk (i.e. `re_construct_with_sink_pk`)**

This is an optimization, not a correctness requirement. In other words, regardless of the relationship between `sink_pk` and `stream_key`, compaction by sink_pk is optional. Given that it is an optimization with buffering and compaction overhead, I think it is eventually better to let the user choose whether to enable it. I am okay with not providing this option and enabling it forcibly when `sink_pk != stream_key` for now, though.

Compaction by sink_pk has two potential implementations:

- Ensure there is only one change emitted per `sink_pk` in one epoch. Buffering all messages within an epoch is required, and this is what this PR does.
- Ensure there are no duplicate updates per `sink_pk`. This can be done by buffering delete messages on demand (idea from @wenym1). If most duplicates are caused by delete + insert, this approach only needs to maintain a very small buffer.

Agreed, so there is remaining work for compaction by sink_pk; I will create issues:

- add a session variable to force-enable sink pk compaction
- optimize it by only buffering delete messages, which is reasonable because there are fewer DELETE operations

> I am okay with not providing this option and enabling it forcibly when sink_pk != stream_key for now though

And I also think it is OK to merge this PR first, because the main side effect of "compaction by stream_key" and "compaction by sink_pk" is the same: buffering all messages during the epoch.
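The two kinds of compaction can be illustrated with a toy model (a hypothetical simplification over `(key, value)` ops, not the executor code): compaction by stream_key reorders all DELETEs in an epoch ahead of the INSERTs, while compaction by sink_pk additionally emits at most one change per key per epoch.

```rust
use std::collections::BTreeMap;

// Toy change ops on (key, value) pairs for one epoch.
#[derive(Debug, Clone, PartialEq)]
enum Op {
    Insert(&'static str, i32),
    Delete(&'static str, i32),
}

// Compaction by stream_key: advance all DELETEs before the INSERTs within
// the epoch, so a delete + re-insert of the same key cannot be observed
// as a transient missing row downstream.
fn advance_deletes(epoch: &[Op]) -> Vec<Op> {
    let mut out: Vec<Op> = epoch
        .iter()
        .filter(|op| matches!(op, Op::Delete(..)))
        .cloned()
        .collect();
    out.extend(
        epoch
            .iter()
            .filter(|op| matches!(op, Op::Insert(..)))
            .cloned(),
    );
    out
}

// Compaction by sink_pk (first implementation above): buffer the whole
// epoch and keep only the final op per key, so at most one change is
// emitted per key per epoch.
fn compact_by_key(epoch: &[Op]) -> Vec<Op> {
    let mut last: BTreeMap<&'static str, Op> = BTreeMap::new();
    for op in epoch {
        let key = match op {
            Op::Insert(k, _) | Op::Delete(k, _) => *k,
        };
        last.insert(key, op.clone());
    }
    last.into_values().collect()
}
```

Note the cost difference matching the discussion: `advance_deletes` is a reorder, while `compact_by_key` must buffer the entire epoch before emitting anything.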
+1. Rest LGTM!
…) (#15860) Co-authored-by: stonepage <[email protected]> Co-authored-by: Yuhao Su <[email protected]>
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
In the sink, reconstruct the stream chunk and reorder the RowOps so that the delete and insert on the same key are adjacent. This lets the SinkImpl identify the update operation and omit the delete record for some downstream systems. Because this is expensive, we currently only do it when the sink pk and the stream key mismatch.
close #13138
Checklist

- `./risedev check` (or alias, `./risedev c`)

Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.