-
Notifications
You must be signed in to change notification settings - Fork 587
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor(cdc source): merge cdc heartbeat chunk builder & data chunk builder #19671
Conversation
This stack of pull requests is managed by Graphite. Learn more about stacking. |
The context is a little vague, may I know more the context and background? |
src/connector/src/parser/mod.rs
Outdated
continue; | ||
} | ||
|
||
if batch.iter().all(|msg| msg.is_empty()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
batch.iter().all
will short-circuit after seeing the first false
, so in normal cases, this check won't cause an additional O(n)
cost.
Just renamed the PR title and changed description to make it more understandable. |
4001bfe
to
4e29427
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The updated implementation does what the description says:
- If the input batch contains only heartbeat, emit a chunk containing a single row of the last heartbeat msg.
- If the input batch contains data, emit a chunk skipping heartbeat msgs in it.
Also discussed offline about the rationale:
It is okay that heartbeat and data msgs are interleaved, except that heartbeat msg cannot be the last row of a chunk. The old logic of separating them into 2 chunks (and emitting heartbeat first) was due to a limitation of the old builder API.
With the enhanced builder API it is unnecessary to shift all heartbeats to be beginning. And to avoid heartbeat being the last row, they could be simply ignored when data is present.
|
||
/// Check whether the source message is a CDC heartbeat message. | ||
pub fn is_cdc_heartbeat(&self) -> bool { | ||
self.key.is_none() && self.payload.is_none() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The payload of postgres heartbeat message is not none after #19385. If you rely on the assumption that payload.is_none()
, you should revisit your code logic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The original logic on line 302 also relies on this, and it is the only place that appends to heartbeat_builder
instead of builder
. Do you mean that #19385 may cause the original logic to misplace heartbeat msgs into builder
as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For postgres source, the heartbeat message would not goes into the heartbeat_builder
, since every time the heartbeat message will contain the payload now()::varchar
as show in #19385, which can be seen as a normal data chunk.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if this now()::varchar
row is at the end of a data chunk? Seems in this case the snapshot_done
flag won't be update correctly, just like what we saw in the CI failure.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And, who will handle the now()::varchar
row?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And, who will handle the
now()::varchar
row?
For postgres source it just same as other data chunk, the schema of cdc source is payload jsonb
. I think you don't need to aware these details.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After some diving, now I understand this process. Let me try to summarize.
Why we need Debezium heartbeat?
The PostgreSQL instance contains multiple databases and one of them is a high-traffic database. Debezium captures changes in another database that is low-traffic in comparison to the other database. Debezium then cannot confirm the LSN as replication slots work per-database and Debezium is not invoked. As WAL is shared by all databases, the amount used tends to grow until an event is emitted by the database for which Debezium is capturing changes. To overcome this, it is necessary to:
- Enable periodic heartbeat record generation with the
heartbeat.interval.ms
connector configuration property.- Regularly emit change events from the database for which Debezium is capturing changes.
As explained by Debezium document (quoted above), when Debezium is capturing a low-traffic database A, and at the same time there's another high-traffic database B in the PG instance, Debezium cannot "confirm the LSN", because the low-traffic database may have no traffic and hence won't trigger Debezium. To resolve this problem, we have to tell Debezium to "make some changes in database A periodically" so that the "LSN confirmation" can be triggered. This "periodic" behavior is controlled by heartbeat.interval.ms
.
How is a heartbeat message like?
After we set heartbeat.interval.ms
, RisingWave will receive heartbeat message from Debezium periodically, which are SourceMessage
s like this:
SourceMessage {
key: None,
payload: None,
offset: "{\"sourcePartition\":{\"server\":\"RW_CDC_6\"},\"sourceOffset\":{\"lsn\":120699456,\"txId\":1035,\"ts_usec\":1733494134595487},\"isHeartbeat\":true}",
split_id: "6",
meta: DebeziumCdc(DebeziumCdcMeta { db_name_prefix_len: 0, full_table_name: "", source_ts_ms: 0, msg_type: Heartbeat })
}
The key
and payload
are both None
, no matter what heartbeat.action.query
is (we will come to this in the next section). When seeing a heartbeat message, connector::parser::into_chunk_stream_inner
should yield a chunk containing an invisible row with the offset
in the message, so that SourceExecutor
can then do update_in_place
and hence update_offset
for the DebeziumCdcSplit
metadata.
However, like the document said, the LSN (inside sourceOffset
field) will not change because the "LSN confirmation" process cannot be triggered.
How does Debezium "make some changes"?
So now we need Debezium to "make some changes".
A separate process would then periodically update the table by either inserting a new row or repeatedly updating the same row. PostgreSQL then invokes Debezium, which confirms the latest LSN and allows the database to reclaim the WAL space. This task can be automated by means of the
heartbeat.action.query
connector configuration property.
As the same document said, the operation used to generate changes is configured via heartbeat.action.query
. An example for this config given in the document is INSERT INTO test_heartbeat_table (text) VALUES ('test_heartbeat')
, which just inserts a meaningless row to a pre-created meaningless "heartbeat table".
In our case, we don't make changes by inserting into some "heartbeat table", we use pg_logical_emit_message
, which "emits a logical decoding message" to "pass generic messages to logical decoding plugins through WAL". This PG function is also mentioned in Debezium document. When this function is called inside database A, a "message event" will be captured by Debezium, then it can do "LSN confirmation".
With this config set to SELECT pg_logical_emit_message(false, 'heartbeat', now()::varchar)
, we receive the following two SourceMessage
s periodically:
SourceMessage {
key: None,
payload: None,
offset: "{\"sourcePartition\":{\"server\":\"RW_CDC_6\"},\"sourceOffset\":{\"lsn\":125080800,\"txId\":1050,\"ts_usec\":1733496265175398},\"isHeartbeat\":true}",
split_id: "6",
meta: DebeziumCdc(DebeziumCdcMeta { db_name_prefix_len: 0, full_table_name: "", source_ts_ms: 0, msg_type: Heartbeat })
}
SourceMessage {
key: Some([123, 34, 115, 99, 104, 101, 109, 97, 34, 58, 110, 117, 108, 108, 44, 34, 112, 97, 121, 108, 111, 97, 100, 34, 58, 123, 34, 112, 114, 101, 102, 105, 120, 34, 58, 34, 104, 101, 97, 114, 116, 98, 101, 97, 116, 34, 125, 125]),
payload: Some([123, 34, 115, 99, 104, 101, 109, 97, 34, 58, 110, 117, 108, 108, 44, 34, 112, 97, 121, 108, 111, 97, 100, 34, 58, 123, 34, 111, 112, 34, 58, 34, 109, 34, 44, 34, 116, 115, 95, 109, 115, 34, 58, 49, 55, 51, 51, 52, 57, 54, 50, 55, 54, 48, 50, 56, 44, 34, 115, 111, 117, 114, 99, 101, 34, 58, 123, 34, 118, 101, 114, 115, 105, 111, 110, 34, 58, 34, 50, 46, 54, 46, 50, 46, 70, 105, 110, 97, 108, 34, 44, 34, 99, 111, 110, 110, 101, 99, 116, 111, 114, 34, 58, 34, 112, 111, 115, 116, 103, 114, 101, 115, 113, 108, 34, 44, 34, 110, 97, 109, 101, 34, 58, 34, 82, 87, 95, 67, 68, 67, 95, 54, 34, 44, 34, 116, 115, 95, 109, 115, 34, 58, 49, 55, 51, 51, 52, 57, 54, 50, 54, 53, 49, 55, 53, 44, 34, 115, 110, 97, 112, 115, 104, 111, 116, 34, 58, 34, 102, 97, 108, 115, 101, 34, 44, 34, 100, 98, 34, 58, 34, 112, 111, 115, 116, 103, 114, 101, 115, 34, 44, 34, 115, 101, 113, 117, 101, 110, 99, 101, 34, 58, 34, 91, 92, 34, 49, 50, 53, 48, 56, 49, 48, 52, 56, 92, 34, 44, 92, 34, 49, 50, 53, 48, 56, 49, 48, 52, 56, 92, 34, 93, 34, 44, 34, 116, 115, 95, 117, 115, 34, 58, 49, 55, 51, 51, 52, 57, 54, 50, 54, 53, 49, 55, 53, 51, 57, 56, 44, 34, 116, 115, 95, 110, 115, 34, 58, 49, 55, 51, 51, 52, 57, 54, 50, 54, 53, 49, 55, 53, 51, 57, 56, 48, 48, 48, 44, 34, 115, 99, 104, 101, 109, 97, 34, 58, 34, 34, 44, 34, 116, 97, 98, 108, 101, 34, 58, 34, 34, 44, 34, 116, 120, 73, 100, 34, 58, 110, 117, 108, 108, 44, 34, 108, 115, 110, 34, 58, 49, 50, 53, 48, 56, 49, 48, 52, 56, 44, 34, 120, 109, 105, 110, 34, 58, 110, 117, 108, 108, 125, 44, 34, 109, 101, 115, 115, 97, 103, 101, 34, 58, 123, 34, 112, 114, 101, 102, 105, 120, 34, 58, 34, 104, 101, 97, 114, 116, 98, 101, 97, 116, 34, 44, 34, 99, 111, 110, 116, 101, 110, 116, 34, 58, 34, 77, 106, 65, 121, 78, 67, 48, 120, 77, 105, 48, 119, 78, 105, 65, 121, 77, 106, 111, 48, 78, 68, 111, 122, 78, 83, 52, 49, 77, 84, 85, 52, 79, 84, 89, 114, 77, 68, 103, 61, 34, 125, 125, 125]),
offset: "{\"sourcePartition\":{\"server\":\"RW_CDC_6\"},\"sourceOffset\":{\"lsn_proc\":125081048,\"messageType\":\"MESSAGE\",\"lsn_commit\":125081048,\"lsn\":125081048,\"ts_usec\":1733496265175398},\"isHeartbeat\":false}",
split_id: "6",
meta: DebeziumCdc(DebeziumCdcMeta { db_name_prefix_len: 0, full_table_name: "message", source_ts_ms: 1733496265175, msg_type: Data })
}
Note that, we will receive both the empty-payloaded heartbeat message (with isHeartbeat: true
), and the "message event" generated by pg_logical_emit_message
belonging to the message
table. And since Debezium captured a new event (the "message event"), it "confirms LSN", so the LSN
got updated.
In our current implementation, the first message will be handled as heartbeat message and emitted as an invisible empty row with "offset", the second will be parsed as a regular row, like:
| + | {"message": {"content": "MjAyNC0xMi0wNiAyMzo0MjowMC44MDAwMDErMDg=", "prefix": "heartbeat"}, "op": "m", "source": {"connector": "postgresql", "db": "postgres", "lsn": 125280216, "name": "RW_CDC_8", "schema": "", "sequence": "[\"125280216\",\"125280216\"]", "snapshot": "false", "table": "", "ts_ms": 1733499710495, "ts_ns": 1733499710495961000, "ts_us": 1733499710495961, "txId": null, "version": "2.6.2.Final", "xmin": null}, "ts_ms": 1733499721323} | {"sourcePartition":{"server":"RW_CDC_8"},"sourceOffset":{"lsn_proc":125280216,"messageType":"MESSAGE","lsn_commit":125280216,"lsn":125280216,"ts_usec":1733499710495961},"isHeartbeat":false} | message | | 8 | {"sourcePartition":{"server":"RW_CDC_8"},"sourceOffset":{"lsn_proc":125280216,"messageType":"MESSAGE","lsn_commit":125280216,"lsn":125280216,"ts_usec":1733499710495961},"isHeartbeat":false} |
The MjAyNC0xMi0wNiAyMzo0MjowMC44MDAwMDErMDg=
is the base64-encoded now()::varchar
, and the prefix
is heartbeat
which we set when calling pg_logical_emit_message
. And because the full_table_name
is message
which does not match any table we want to replicate, this row will be dismissed later (I didn't dive in this part, so sorrect me if I'm wrong.)
Both the offset
of heartbeat message and of the second message will take effect when it comes to DebeziumCdcSplit::update_offset
.
Now we can conclude that, the value of now()::varchar
simply does not matter at all, and we can actually change it to an empty string or any other string to avoid confusion. And, the PR #19385, changing the first argument transactional
from false
to true
, will not significantly change the SourceMessage
s we mentioned above, instead, it just generates additional transaction BEGIN
and COMMIT
messages before and after the second SourceMessage
.
On PR #19385
For the issue which #19385 wanted to resolve (#16697), I guess maybe a better fix will be to set the flush
argument to true
when calling pg_logical_emit_message
, instead of setting transactional
to true.
The PG document says:
The
flush
parameter (default set tofalse
) controls if the message is immediately flushed to WAL or not.flush
has no effect withtransactional
, as the message's WAL record is flushed along with its transaction.
From my understanding, what we actually want is to immediately flush the emitted message to WAL so that Debezium can capture it.
Signed-off-by: Richard Chien <[email protected]>
Signed-off-by: Richard Chien <[email protected]>
Signed-off-by: Richard Chien <[email protected]>
Signed-off-by: Richard Chien <[email protected]>
Signed-off-by: Richard Chien <[email protected]>
Signed-off-by: Richard Chien <[email protected]>
a8d2c05
to
185defc
Compare
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
Due to some historical reason, we build data chunk and heartbeat chunk for CDC source with different
SourceStreamChunkBuilder
s, however, it seems that:Merging heartbeat chunk builder with data chunk builder seems do no harm.
Checklist
./risedev check
(or alias,./risedev c
)Documentation
Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.