From 28c68f6b80c44f96b136d157ed9f2f73f5f0a9b6 Mon Sep 17 00:00:00 2001 From: Richard Chien Date: Wed, 4 Dec 2024 14:36:06 +0800 Subject: [PATCH] merge cdc heartbeat chunk & data chunk Signed-off-by: Richard Chien --- src/connector/src/parser/mod.rs | 28 ++++++++++------------------ 1 file changed, 10 insertions(+), 18 deletions(-) diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index 22cce134906c..1dd5adb9ff3f 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -700,8 +700,7 @@ async fn into_chunk_stream_inner( ) { let columns = parser.columns().to_vec(); - let mut heartbeat_builder = SourceStreamChunkBuilder::with_capacity(columns.clone(), 0); - let mut builder = SourceStreamChunkBuilder::with_capacity(columns, 0); + let mut chunk_builder = SourceStreamChunkBuilder::with_capacity(columns, 0); struct Transaction { id: Box, @@ -726,14 +725,14 @@ async fn into_chunk_stream_inner( "transaction is larger than {MAX_ROWS_FOR_TRANSACTION} rows, force commit" ); *len = 0; // reset `len` while keeping `id` - yield builder.take(batch_len); + yield chunk_builder.take(batch_len); } else { last_batch_not_yielded = true } } else { // Clean state. Reserve capacity for the builder. - assert!(builder.is_empty()); - let _ = builder.take(batch_len); + assert!(chunk_builder.is_empty()); + let _ = chunk_builder.take(batch_len); } let process_time_ms = chrono::Utc::now().timestamp_millis(); @@ -744,7 +743,7 @@ async fn into_chunk_stream_inner( "got a empty message, could be a heartbeat" ); // Emit an empty invisible row for the heartbeat message. - parser.emit_empty_row(heartbeat_builder.row_writer().invisible().with_meta( + parser.emit_empty_row(chunk_builder.row_writer().invisible().with_meta( MessageMeta { meta: &msg.meta, split_id: &msg.split_id, @@ -769,12 +768,12 @@ async fn into_chunk_stream_inner( direct_cdc_event_lag_latency.observe(lag_ms as f64); } - let old_len = builder.len(); + let old_len = chunk_builder.len(); match parser .parse_one_with_txn( msg.key, msg.payload, - builder.row_writer().with_meta(MessageMeta { + chunk_builder.row_writer().with_meta(MessageMeta { meta: &msg.meta, split_id: &msg.split_id, offset: &msg.offset, @@ -787,7 +786,7 @@ async fn into_chunk_stream_inner( res @ (Ok(ParseResult::Rows) | Err(_)) => { // Aggregate the number of new rows into the current transaction. if let Some(Transaction { len, .. }) = &mut current_transaction { - let n_new_rows = builder.len() - old_len; + let n_new_rows = chunk_builder.len() - old_len; *len += n_new_rows; } @@ -834,7 +833,7 @@ async fn into_chunk_stream_inner( current_transaction = None; if last_batch_not_yielded { - yield builder.take(batch_len - (i + 1)); + yield chunk_builder.take(batch_len - (i + 1)); last_batch_not_yielded = false; } } @@ -864,16 +863,9 @@ async fn into_chunk_stream_inner( } } - // emit heartbeat for each message batch - // we must emit heartbeat chunk before the data chunk, - // otherwise the source offset could be backward due to the heartbeat - if !heartbeat_builder.is_empty() { - yield heartbeat_builder.take(0); - } - // If we are not in a transaction, we should yield the chunk now. if current_transaction.is_none() { - yield builder.take(0); + yield chunk_builder.take(0); } } }