Skip to content

Commit

Permalink
merge cdc heartbeat chunk & data chunk
Browse files Browse the repository at this point in the history
Signed-off-by: Richard Chien <[email protected]>
  • Loading branch information
stdrc committed Dec 4, 2024
1 parent 0b7c43c commit 28c68f6
Showing 1 changed file with 10 additions and 18 deletions.
28 changes: 10 additions & 18 deletions src/connector/src/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -700,8 +700,7 @@ async fn into_chunk_stream_inner<P: ByteStreamSourceParser>(
) {
let columns = parser.columns().to_vec();

-	let mut heartbeat_builder = SourceStreamChunkBuilder::with_capacity(columns.clone(), 0);
-	let mut builder = SourceStreamChunkBuilder::with_capacity(columns, 0);
+	let mut chunk_builder = SourceStreamChunkBuilder::with_capacity(columns, 0);

struct Transaction {
id: Box<str>,
Expand All @@ -726,14 +725,14 @@ async fn into_chunk_stream_inner<P: ByteStreamSourceParser>(
"transaction is larger than {MAX_ROWS_FOR_TRANSACTION} rows, force commit"
);
*len = 0; // reset `len` while keeping `id`
-	yield builder.take(batch_len);
+	yield chunk_builder.take(batch_len);
} else {
last_batch_not_yielded = true
}
} else {
// Clean state. Reserve capacity for the builder.
-	assert!(builder.is_empty());
-	let _ = builder.take(batch_len);
+	assert!(chunk_builder.is_empty());
+	let _ = chunk_builder.take(batch_len);
}

let process_time_ms = chrono::Utc::now().timestamp_millis();
Expand All @@ -744,7 +743,7 @@ async fn into_chunk_stream_inner<P: ByteStreamSourceParser>(
"got a empty message, could be a heartbeat"
);
// Emit an empty invisible row for the heartbeat message.
-	parser.emit_empty_row(heartbeat_builder.row_writer().invisible().with_meta(
+	parser.emit_empty_row(chunk_builder.row_writer().invisible().with_meta(
MessageMeta {
meta: &msg.meta,
split_id: &msg.split_id,
Expand All @@ -769,12 +768,12 @@ async fn into_chunk_stream_inner<P: ByteStreamSourceParser>(
direct_cdc_event_lag_latency.observe(lag_ms as f64);
}

-	let old_len = builder.len();
+	let old_len = chunk_builder.len();
match parser
.parse_one_with_txn(
msg.key,
msg.payload,
-	builder.row_writer().with_meta(MessageMeta {
+	chunk_builder.row_writer().with_meta(MessageMeta {
meta: &msg.meta,
split_id: &msg.split_id,
offset: &msg.offset,
Expand All @@ -787,7 +786,7 @@ async fn into_chunk_stream_inner<P: ByteStreamSourceParser>(
res @ (Ok(ParseResult::Rows) | Err(_)) => {
// Aggregate the number of new rows into the current transaction.
if let Some(Transaction { len, .. }) = &mut current_transaction {
-	let n_new_rows = builder.len() - old_len;
+	let n_new_rows = chunk_builder.len() - old_len;
*len += n_new_rows;
}

Expand Down Expand Up @@ -834,7 +833,7 @@ async fn into_chunk_stream_inner<P: ByteStreamSourceParser>(
current_transaction = None;

if last_batch_not_yielded {
-	yield builder.take(batch_len - (i + 1));
+	yield chunk_builder.take(batch_len - (i + 1));
last_batch_not_yielded = false;
}
}
Expand Down Expand Up @@ -864,16 +863,9 @@ async fn into_chunk_stream_inner<P: ByteStreamSourceParser>(
}
}

-	// emit heartbeat for each message batch
-	// we must emit heartbeat chunk before the data chunk,
-	// otherwise the source offset could be backward due to the heartbeat
-	if !heartbeat_builder.is_empty() {
-	yield heartbeat_builder.take(0);
-	}

// If we are not in a transaction, we should yield the chunk now.
if current_transaction.is_none() {
-	yield builder.take(0);
+	yield chunk_builder.take(0);
}
}
}
Expand Down

0 comments on commit 28c68f6

Please sign in to comment.