
refactor(cdc source): merge cdc heartbeat chunk builder & data chunk builder #19671

Merged · 6 commits · Dec 6, 2024

Changes from all commits

2 changes: 1 addition & 1 deletion src/connector/src/parser/chunk_builder.rs
@@ -79,7 +79,7 @@ impl SourceStreamChunkBuilder {
     /// Resets the builder and returns a [`StreamChunk`], while reserving `next_cap` capacity for
     /// the builders of the next [`StreamChunk`].
     #[must_use]
-    pub fn take(&mut self, next_cap: usize) -> StreamChunk {
+    pub fn take_and_reserve(&mut self, next_cap: usize) -> StreamChunk {
         let descs = std::mem::take(&mut self.descs); // we don't use `descs` in `finish`
         let builder = std::mem::replace(self, Self::with_capacity(descs, next_cap));
         builder.finish()
125 changes: 78 additions & 47 deletions src/connector/src/parser/mod.rs
@@ -199,7 +199,7 @@ pub trait ByteStreamSourceParser: Send + Debug + Sized + 'static {
             .map_ok(|_| ParseResult::Rows)
     }
 
-    fn emit_empty_row<'a>(&'a mut self, mut writer: SourceStreamChunkRowWriter<'a>) {
+    fn append_empty_row<'a>(&'a mut self, mut writer: SourceStreamChunkRowWriter<'a>) {
         _ = writer.do_insert(|_column| Ok(Datum::None));
     }
 }
@@ -250,7 +250,7 @@ impl<P: ByteStreamSourceParser> P {
 
 /// Maximum number of rows in a transaction. If a transaction is larger than this, it will be force
 /// committed to avoid potential OOM.
-const MAX_ROWS_FOR_TRANSACTION: usize = 4096;
+const MAX_TRANSACTION_SIZE: usize = 4096;
 
 // TODO: when upsert is disabled, how to filter those empty payload
 // Currently, an err is returned for non upsert with empty payload
@@ -261,8 +261,7 @@ async fn into_chunk_stream_inner<P: ByteStreamSourceParser>(
 ) {
     let columns = parser.columns().to_vec();
 
-    let mut heartbeat_builder = SourceStreamChunkBuilder::with_capacity(columns.clone(), 0);
-    let mut builder = SourceStreamChunkBuilder::with_capacity(columns, 0);
+    let mut chunk_builder = SourceStreamChunkBuilder::with_capacity(columns, 0);
 
     struct Transaction {
         id: Box<str>,
@@ -273,45 +272,71 @@ async fn into_chunk_stream_inner<P: ByteStreamSourceParser>(
 
     #[for_await]
     for batch in data_stream {
+        // It's possible that the split is not active, which means the next batch may arrive
+        // very lately, so we should prefer emitting all records in current batch before the end
+        // of each iteration, instead of merging them with the next batch. An exception is when
+        // a transaction is not committed yet, in which yield when the transaction is committed.
+
         let batch = batch?;
         let batch_len = batch.len();
 
-        let mut last_batch_not_yielded = false;
-        if let Some(Transaction { len, id }) = &mut current_transaction {
-            // Dirty state. The last batch is not yielded due to uncommitted transaction.
-            if *len > MAX_ROWS_FOR_TRANSACTION {
-                // Large transaction. Force commit.
-                tracing::warn!(
-                    id,
-                    len,
-                    "transaction is larger than {MAX_ROWS_FOR_TRANSACTION} rows, force commit"
-                );
-                *len = 0; // reset `len` while keeping `id`
-                yield builder.take(batch_len);
-            } else {
-                last_batch_not_yielded = true
-            }
-        } else {
-            // Clean state. Reserve capacity for the builder.
-            assert!(builder.is_empty());
-            let _ = builder.take(batch_len);
-        }
+        if batch_len == 0 {
+            continue;
+        }
+
+        if batch.iter().all(|msg| msg.is_cdc_heartbeat()) {
+            // This `.iter().all(...)` will short-circuit after seeing the first `false`, so in
+            // normal cases, this should only involve a constant time cost.
+
+            // Now we know that there is no data message in the batch, let's just emit the latest
+            // heartbeat message. Note that all messages in `batch` should belong to the same
+            // split, so we don't have to do a split to heartbeats mapping here.
+
+            if let Some(Transaction { id, len }) = &mut current_transaction {
+                // if there's an ongoing transaction, something may be wrong
                tracing::warn!(
                    id,
                    len,
+                    "got a batch of empty messages during an ongoing transaction"
                );
+                // for the sake of simplicity, let's force emit the partial transaction chunk
+                if *len > 0 {
+                    *len = 0; // reset `len` while keeping `id`
+                    yield chunk_builder.take_and_reserve(1); // next chunk will only contain the heartbeat
+                }
+            }
+
+            // According to the invariant we mentioned at the beginning of the `for batch` loop,
+            // there should be no data of previous batch in `chunk_builder`.
+            assert!(chunk_builder.is_empty());
+
+            let heartbeat_msg = batch.last().unwrap();
+            tracing::debug!(
+                offset = heartbeat_msg.offset,
+                "emitting a heartbeat message"
+            );
+            // TODO(rc): should be `chunk_builder.append_heartbeat` instead, which is simpler
+            parser.append_empty_row(chunk_builder.row_writer().invisible().with_meta(
+                MessageMeta {
+                    meta: &heartbeat_msg.meta,
+                    split_id: &heartbeat_msg.split_id,
+                    offset: &heartbeat_msg.offset,
+                },
+            ));
+            yield chunk_builder.take_and_reserve(batch_len);
+
+            continue;
+        }
 
+        // When we reach here, there is at least one data message in the batch. We should ignore all
+        // heartbeat messages.
+
+        let mut txn_started_in_last_batch = current_transaction.is_some();
         let process_time_ms = chrono::Utc::now().timestamp_millis();
 
         for (i, msg) in batch.into_iter().enumerate() {
-            if msg.key.is_none() && msg.payload.is_none() {
-                tracing::debug!(
-                    offset = msg.offset,
-                    "got a empty message, could be a heartbeat"
-                );
-                // Emit an empty invisible row for the heartbeat message.
-                parser.emit_empty_row(heartbeat_builder.row_writer().invisible().with_meta(
-                    MessageMeta {
-                        meta: &msg.meta,
-                        split_id: &msg.split_id,
-                        offset: &msg.offset,
-                    },
-                ));
+            if msg.is_cdc_heartbeat() {
+                // ignore heartbeat messages
                 continue;
             }

@@ -330,12 +355,12 @@
                 direct_cdc_event_lag_latency.observe(lag_ms as f64);
             }
 
-            let old_len = builder.len();
+            let old_len = chunk_builder.len();
             match parser
                 .parse_one_with_txn(
                     msg.key,
                     msg.payload,
-                    builder.row_writer().with_meta(MessageMeta {
+                    chunk_builder.row_writer().with_meta(MessageMeta {
                         meta: &msg.meta,
                         split_id: &msg.split_id,
                         offset: &msg.offset,
@@ -348,7 +373,7 @@
                 res @ (Ok(ParseResult::Rows) | Err(_)) => {
                     // Aggregate the number of new rows into the current transaction.
                     if let Some(Transaction { len, .. }) = &mut current_transaction {
-                        let n_new_rows = builder.len() - old_len;
+                        let n_new_rows = chunk_builder.len() - old_len;
                         *len += n_new_rows;
                     }

@@ -394,9 +419,9 @@
                         tracing::debug!(id, "commit upstream transaction");
                         current_transaction = None;
 
-                        if last_batch_not_yielded {
-                            yield builder.take(batch_len - (i + 1));
-                            last_batch_not_yielded = false;
+                        if txn_started_in_last_batch {
+                            yield chunk_builder.take_and_reserve(batch_len - (i + 1));
+                            txn_started_in_last_batch = false;
                         }
                     }
                 },
@@ -425,16 +450,22 @@
             }
         }
 
-        // emit heartbeat for each message batch
-        // we must emit heartbeat chunk before the data chunk,
-        // otherwise the source offset could be backward due to the heartbeat
-        if !heartbeat_builder.is_empty() {
-            yield heartbeat_builder.take(0);
-        }
-
-        // If we are not in a transaction, we should yield the chunk now.
-        if current_transaction.is_none() {
-            yield builder.take(0);
+        if let Some(Transaction { len, id }) = &mut current_transaction {
+            // in transaction, check whether it's too large
+            if *len > MAX_TRANSACTION_SIZE {
+                // force commit
+                tracing::warn!(
+                    id,
+                    len,
+                    "transaction is larger than {MAX_TRANSACTION_SIZE} rows, force commit"
+                );
+                *len = 0; // reset `len` while keeping `id`
+                yield chunk_builder.take_and_reserve(batch_len); // use curr batch len as next capacity, just a hint
+            }
+            // TODO(rc): we will have better chunk size control later
+        } else if !chunk_builder.is_empty() {
+            // not in transaction, yield the chunk now
+            yield chunk_builder.take_and_reserve(batch_len); // use curr batch len as next capacity, just a hint
         }
     }
 }

2 changes: 1 addition & 1 deletion src/connector/src/parser/plain_parser.rs
@@ -451,7 +451,7 @@ mod tests {
             _ => panic!("unexpected parse result: {:?}", res),
         }
 
-        let output = builder.take(10);
+        let output = builder.take_and_reserve(10);
         assert_eq!(0, output.cardinality());
     }
 

5 changes: 5 additions & 0 deletions src/connector/src/source/base.rs
@@ -637,6 +637,11 @@ impl SourceMessage {
             meta: SourceMeta::Empty,
         }
     }
+
+    /// Check whether the source message is a CDC heartbeat message.
+    pub fn is_cdc_heartbeat(&self) -> bool {
+        self.key.is_none() && self.payload.is_none()

Contributor:
The payload of the postgres heartbeat message is not none after #19385. If you rely on the assumption that payload.is_none(), you should revisit your code logic.

Contributor:

The original logic on line 302 also relies on this, and it is the only place that appends to heartbeat_builder instead of builder. Do you mean that #19385 may cause the original logic to misplace heartbeat messages into builder as well?

@StrikeW (Contributor), Dec 6, 2024:

For the postgres source, the heartbeat message does not go into the heartbeat_builder, since the heartbeat message now always contains the payload now()::varchar, as shown in #19385, and so it can be seen as a normal data chunk.

@stdrc (Member, Author):

What if this now()::varchar row is at the end of a data chunk? Seems in this case the snapshot_done flag won't be updated correctly, just like what we saw in the CI failure.

@stdrc (Member, Author):

And, who will handle the now()::varchar row?

@StrikeW (Contributor), Dec 6, 2024:

> And, who will handle the now()::varchar row?

For the postgres source it is just the same as any other data chunk; the schema of the cdc source is payload jsonb. I think you don't need to be aware of these details.

@stdrc (Member, Author), Dec 6, 2024:

After some digging, I now understand this process. Let me try to summarize.

Why do we need Debezium heartbeat?

> The PostgreSQL instance contains multiple databases and one of them is a high-traffic database. Debezium captures changes in another database that is low-traffic in comparison to the other database. Debezium then cannot confirm the LSN as replication slots work per-database and Debezium is not invoked. As WAL is shared by all databases, the amount used tends to grow until an event is emitted by the database for which Debezium is capturing changes. To overcome this, it is necessary to:
>
>   • Enable periodic heartbeat record generation with the heartbeat.interval.ms connector configuration property.
>   • Regularly emit change events from the database for which Debezium is capturing changes.

As explained by the Debezium document (quoted above), when Debezium is capturing a low-traffic database A while there is another high-traffic database B in the same PG instance, Debezium cannot "confirm the LSN", because the low-traffic database may have no traffic and hence won't trigger Debezium. To resolve this problem, we have to tell Debezium to "make some changes in database A periodically" so that the "LSN confirmation" can be triggered. This periodic behavior is controlled by heartbeat.interval.ms.
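
As a concrete illustration, here is a minimal sketch of how such a property could be set when creating a CDC source in RisingWave. The debezium.-prefixed passthrough and all connection values below are my assumptions for illustration, not something taken from this PR:

-- Hypothetical sketch: ask Debezium to emit a heartbeat record every 60 seconds.
-- All connection values are placeholders.
CREATE SOURCE pg_cdc_source WITH (
    connector = 'postgres-cdc',
    hostname = 'localhost',
    port = '5432',
    username = 'postgres',
    password = 'postgres',
    database.name = 'mydb',
    debezium.heartbeat.interval.ms = '60000'
);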

What does a heartbeat message look like?

After we set heartbeat.interval.ms, RisingWave will receive heartbeat messages from Debezium periodically, which are SourceMessages like this:

SourceMessage {
  key: None,
  payload: None,
  offset: "{\"sourcePartition\":{\"server\":\"RW_CDC_6\"},\"sourceOffset\":{\"lsn\":120699456,\"txId\":1035,\"ts_usec\":1733494134595487},\"isHeartbeat\":true}",
  split_id: "6",
  meta: DebeziumCdc(DebeziumCdcMeta { db_name_prefix_len: 0, full_table_name: "", source_ts_ms: 0, msg_type: Heartbeat })
}

The key and payload are both None, no matter what heartbeat.action.query is (we will come to this in the next section). When seeing a heartbeat message, connector::parser::into_chunk_stream_inner should yield a chunk containing an invisible row with the offset in the message, so that SourceExecutor can then do update_in_place and hence update_offset for the DebeziumCdcSplit metadata.

However, like the document said, the LSN (inside sourceOffset field) will not change because the "LSN confirmation" process cannot be triggered.

How does Debezium "make some changes"?

So now we need Debezium to "make some changes".

> A separate process would then periodically update the table by either inserting a new row or repeatedly updating the same row. PostgreSQL then invokes Debezium, which confirms the latest LSN and allows the database to reclaim the WAL space. This task can be automated by means of the heartbeat.action.query connector configuration property.

As the same document says, the operation used to generate changes is configured via heartbeat.action.query. An example of this config given in the document is INSERT INTO test_heartbeat_table (text) VALUES ('test_heartbeat'), which just inserts a meaningless row into a pre-created, meaningless "heartbeat table".

In our case, we don't make changes by inserting into some "heartbeat table"; we use pg_logical_emit_message, which "emits a logical decoding message" to "pass generic messages to logical decoding plugins through WAL". This PG function is also mentioned in the Debezium document. When this function is called inside database A, a "message event" will be captured by Debezium, and then it can do the "LSN confirmation".
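
For reference, a minimal manual invocation looks like this; the signature is from the PostgreSQL documentation, not from this PR:

-- pg_logical_emit_message(transactional boolean, prefix text, content text) -> pg_lsn
-- Writes a logical decoding message into the WAL; logical decoding plugins
-- (and hence Debezium) observe it as a "message event".
SELECT pg_logical_emit_message(false, 'heartbeat', now()::varchar);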

With this config set to SELECT pg_logical_emit_message(false, 'heartbeat', now()::varchar), we receive the following two SourceMessages periodically:

SourceMessage {
  key: None,
  payload: None,
  offset: "{\"sourcePartition\":{\"server\":\"RW_CDC_6\"},\"sourceOffset\":{\"lsn\":125080800,\"txId\":1050,\"ts_usec\":1733496265175398},\"isHeartbeat\":true}",
  split_id: "6",
  meta: DebeziumCdc(DebeziumCdcMeta { db_name_prefix_len: 0, full_table_name: "", source_ts_ms: 0, msg_type: Heartbeat })
}

SourceMessage {
  key: Some([123, 34, 115, 99, 104, 101, 109, 97, 34, 58, 110, 117, 108, 108, 44, 34, 112, 97, 121, 108, 111, 97, 100, 34, 58, 123, 34, 112, 114, 101, 102, 105, 120, 34, 58, 34, 104, 101, 97, 114, 116, 98, 101, 97, 116, 34, 125, 125]),
  payload: Some([123, 34, 115, 99, 104, 101, 109, 97, 34, 58, 110, 117, 108, 108, 44, 34, 112, 97, 121, 108, 111, 97, 100, 34, 58, 123, 34, 111, 112, 34, 58, 34, 109, 34, 44, 34, 116, 115, 95, 109, 115, 34, 58, 49, 55, 51, 51, 52, 57, 54, 50, 55, 54, 48, 50, 56, 44, 34, 115, 111, 117, 114, 99, 101, 34, 58, 123, 34, 118, 101, 114, 115, 105, 111, 110, 34, 58, 34, 50, 46, 54, 46, 50, 46, 70, 105, 110, 97, 108, 34, 44, 34, 99, 111, 110, 110, 101, 99, 116, 111, 114, 34, 58, 34, 112, 111, 115, 116, 103, 114, 101, 115, 113, 108, 34, 44, 34, 110, 97, 109, 101, 34, 58, 34, 82, 87, 95, 67, 68, 67, 95, 54, 34, 44, 34, 116, 115, 95, 109, 115, 34, 58, 49, 55, 51, 51, 52, 57, 54, 50, 54, 53, 49, 55, 53, 44, 34, 115, 110, 97, 112, 115, 104, 111, 116, 34, 58, 34, 102, 97, 108, 115, 101, 34, 44, 34, 100, 98, 34, 58, 34, 112, 111, 115, 116, 103, 114, 101, 115, 34, 44, 34, 115, 101, 113, 117, 101, 110, 99, 101, 34, 58, 34, 91, 92, 34, 49, 50, 53, 48, 56, 49, 48, 52, 56, 92, 34, 44, 92, 34, 49, 50, 53, 48, 56, 49, 48, 52, 56, 92, 34, 93, 34, 44, 34, 116, 115, 95, 117, 115, 34, 58, 49, 55, 51, 51, 52, 57, 54, 50, 54, 53, 49, 55, 53, 51, 57, 56, 44, 34, 116, 115, 95, 110, 115, 34, 58, 49, 55, 51, 51, 52, 57, 54, 50, 54, 53, 49, 55, 53, 51, 57, 56, 48, 48, 48, 44, 34, 115, 99, 104, 101, 109, 97, 34, 58, 34, 34, 44, 34, 116, 97, 98, 108, 101, 34, 58, 34, 34, 44, 34, 116, 120, 73, 100, 34, 58, 110, 117, 108, 108, 44, 34, 108, 115, 110, 34, 58, 49, 50, 53, 48, 56, 49, 48, 52, 56, 44, 34, 120, 109, 105, 110, 34, 58, 110, 117, 108, 108, 125, 44, 34, 109, 101, 115, 115, 97, 103, 101, 34, 58, 123, 34, 112, 114, 101, 102, 105, 120, 34, 58, 34, 104, 101, 97, 114, 116, 98, 101, 97, 116, 34, 44, 34, 99, 111, 110, 116, 101, 110, 116, 34, 58, 34, 77, 106, 65, 121, 78, 67, 48, 120, 77, 105, 48, 119, 78, 105, 65, 121, 77, 106, 111, 48, 78, 68, 111, 122, 78, 83, 52, 49, 77, 84, 85, 52, 79, 84, 89, 114, 77, 68, 103, 61, 34, 125, 125, 125]),
  offset: "{\"sourcePartition\":{\"server\":\"RW_CDC_6\"},\"sourceOffset\":{\"lsn_proc\":125081048,\"messageType\":\"MESSAGE\",\"lsn_commit\":125081048,\"lsn\":125081048,\"ts_usec\":1733496265175398},\"isHeartbeat\":false}",
  split_id: "6",
  meta: DebeziumCdc(DebeziumCdcMeta { db_name_prefix_len: 0, full_table_name: "message", source_ts_ms: 1733496265175, msg_type: Data })
}

Note that we will receive both the empty-payloaded heartbeat message (with isHeartbeat: true) and the "message event" generated by pg_logical_emit_message, which belongs to the message table. And since Debezium captured a new event (the "message event"), it "confirms the LSN", so the LSN gets updated.

In our current implementation, the first message will be handled as a heartbeat message and emitted as an invisible empty row carrying the "offset"; the second will be parsed as a regular row, like:

| + | {"message": {"content": "MjAyNC0xMi0wNiAyMzo0MjowMC44MDAwMDErMDg=", "prefix": "heartbeat"}, "op": "m", "source": {"connector": "postgresql", "db": "postgres", "lsn": 125280216, "name": "RW_CDC_8", "schema": "", "sequence": "[\"125280216\",\"125280216\"]", "snapshot": "false", "table": "", "ts_ms": 1733499710495, "ts_ns": 1733499710495961000, "ts_us": 1733499710495961, "txId": null, "version": "2.6.2.Final", "xmin": null}, "ts_ms": 1733499721323} | {"sourcePartition":{"server":"RW_CDC_8"},"sourceOffset":{"lsn_proc":125280216,"messageType":"MESSAGE","lsn_commit":125280216,"lsn":125280216,"ts_usec":1733499710495961},"isHeartbeat":false} | message |   | 8 | {"sourcePartition":{"server":"RW_CDC_8"},"sourceOffset":{"lsn_proc":125280216,"messageType":"MESSAGE","lsn_commit":125280216,"lsn":125280216,"ts_usec":1733499710495961},"isHeartbeat":false} |

The MjAyNC0xMi0wNiAyMzo0MjowMC44MDAwMDErMDg= is the base64-encoded now()::varchar, and the prefix is heartbeat, which we set when calling pg_logical_emit_message. And because the full_table_name is message, which does not match any table we want to replicate, this row will be dismissed later (I didn't dig into this part, so correct me if I'm wrong).
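
As a quick sanity check, the content can be decoded directly in PostgreSQL:

-- Decode the base64 content carried by the heartbeat "message event".
SELECT convert_from(decode('MjAyNC0xMi0wNiAyMzo0MjowMC44MDAwMDErMDg=', 'base64'), 'UTF8');
-- => 2024-12-06 23:42:00.800001+08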

Both the offset of the heartbeat message and that of the second message will take effect when it comes to DebeziumCdcSplit::update_offset.

Now we can conclude that the value of now()::varchar simply does not matter at all, and we could actually change it to an empty string or any other string to avoid confusion. Also, PR #19385, which changed the first argument transactional from false to true, will not significantly change the SourceMessages mentioned above; it just generates additional transaction BEGIN and COMMIT messages before and after the second SourceMessage.

On PR #19385

For the issue that #19385 wanted to resolve (#16697), I guess a better fix might be to set the flush argument to true when calling pg_logical_emit_message, instead of setting transactional to true.

The PG document says:

> The flush parameter (default set to false) controls if the message is immediately flushed to WAL or not. flush has no effect with transactional, as the message's WAL record is flushed along with its transaction.

From my understanding, what we actually want is to immediately flush the emitted message to WAL so that Debezium can capture it.
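
A sketch of that alternative, assuming a PostgreSQL version that supports the optional fourth flush argument (my understanding is that this requires PostgreSQL 17+):

-- Emit a non-transactional heartbeat message and flush it to WAL immediately.
-- Assumed signature: pg_logical_emit_message(transactional, prefix, content, flush)
SELECT pg_logical_emit_message(false, 'heartbeat', '', true);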

+    }
 }

#[derive(Debug, Clone)]