diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs
index 7362497bb2dcb..584e0e497d1f3 100644
--- a/src/connector/src/parser/mod.rs
+++ b/src/connector/src/parser/mod.rs
@@ -143,30 +143,32 @@ pub struct SourceStreamChunkRowWriter<'a> {
     /// An optional meta data of the original message.
     ///
     /// When this is set by `with_meta`, it'll be used to fill the columns of types other than [`SourceColumnType::Normal`].
-    row_meta: Option<MessageMeta>,
+    row_meta: Option<MessageMeta<'a>>,
 }
 
 /// The meta data of the original message for a row writer.
 ///
 /// Extracted from the `SourceMessage`.
-pub struct MessageMeta {
-    meta: SourceMeta,
-    offset: String,
+#[derive(Clone, Copy)]
+pub struct MessageMeta<'a> {
+    meta: &'a SourceMeta,
+    split_id: &'a str,
+    offset: &'a str,
 }
 
-impl MessageMeta {
+impl MessageMeta<'_> {
     /// Extract the value for the given column.
     ///
     /// Returns `None` if the column is not a meta column.
-    fn value_for_column(&self, desc: &SourceColumnDesc) -> Option<Datum> {
+    fn value_for_column(self, desc: &SourceColumnDesc) -> Option<Datum> {
         match desc.column_type {
             // Row id columns are filled with `NULL` here and will be filled with the real
             // row id generated by `RowIdGenExecutor` later.
             SourceColumnType::RowId => Datum::None.into(),
             // Extract the offset from the meta data.
-            SourceColumnType::Offset => Datum::Some(self.offset.as_str().into()).into(),
+            SourceColumnType::Offset => Datum::Some(self.offset.into()).into(),
             // Extract custom meta data per connector.
-            SourceColumnType::Meta if let SourceMeta::Kafka(kafka_meta) = &self.meta => {
+            SourceColumnType::Meta if let SourceMeta::Kafka(kafka_meta) = self.meta => {
                 assert_eq!(desc.name.as_str(), KAFKA_TIMESTAMP_COLUMN_NAME, "unexpected meta column name");
                 kafka_meta.timestamp.map(|ts| {
                     risingwave_common::cast::i64_to_timestamptz(ts)
@@ -274,11 +276,11 @@ impl OpAction for OpActionUpdate {
     }
 }
 
-impl SourceStreamChunkRowWriter<'_> {
+impl<'a> SourceStreamChunkRowWriter<'a> {
     /// Set the meta data of the original message for this row writer.
     ///
     /// This should always be called except for tests.
-    fn with_meta(self, row_meta: MessageMeta) -> Self {
+    fn with_meta(self, row_meta: MessageMeta<'a>) -> Self {
         Self {
             row_meta: Some(row_meta),
             ..self
@@ -308,8 +310,12 @@ impl SourceStreamChunkRowWriter<'_> {
             Err(error) => {
                 // TODO: figure out a way to fill in not-null default value if user specifies one
                 // TODO: decide whether the error should not be ignored (e.g., even not a valid Debezium message)
+                // TODO: not using tracing span to provide `split_id` and `offset` due to performance concern,
+                // see #13105
                 tracing::warn!(
                     %error,
+                    split_id = self.row_meta.as_ref().map(|m| m.split_id),
+                    offset = self.row_meta.as_ref().map(|m| m.offset),
                     column = desc.name,
                     "failed to parse non-pk column, padding with `NULL`"
                 );
@@ -527,7 +533,7 @@ async fn into_chunk_stream<P: ByteStreamSourceParser>(mut parser: P, data_stream
         for (i, msg) in batch.into_iter().enumerate() {
             if msg.key.is_none() && msg.payload.is_none() {
                 if parser.parser_format() == ParserFormat::Debezium {
-                    tracing::debug!("heartbeat message {}, skip parser", msg.offset);
+                    tracing::debug!(offset = msg.offset, "skip parsing of heartbeat message");
                     // empty payload means a heartbeat in cdc source
                     // heartbeat message offset should not overwrite data messages offset
                     split_offset_mapping
@@ -537,13 +543,8 @@ async fn into_chunk_stream<P: ByteStreamSourceParser>(mut parser: P, data_stream
                 continue;
             }
 
-            let parse_span = tracing::info_span!(
-                "parse_one",
-                split_id = msg.split_id.as_ref(),
-                offset = msg.offset
-            );
-            split_offset_mapping.insert(msg.split_id, msg.offset.clone());
+            split_offset_mapping.insert(msg.split_id.clone(), msg.offset.clone());
 
             let old_op_num = builder.op_num();
 
             match parser
@@ -551,11 +552,11 @@ async fn into_chunk_stream<P: ByteStreamSourceParser>(mut parser: P, data_stream
                     msg.key,
                     msg.payload,
                     builder.row_writer().with_meta(MessageMeta {
-                        meta: msg.meta,
-                        offset: msg.offset,
+                        meta: &msg.meta,
+                        split_id: &msg.split_id,
+                        offset: &msg.offset,
                     }),
                 )
-                .instrument(parse_span.clone())
                 .await
             {
                 // It's possible that parsing multiple rows in a single message PARTIALLY failed.
@@ -570,7 +571,14 @@ async fn into_chunk_stream<P: ByteStreamSourceParser>(mut parser: P, data_stream
                     }
 
                     if let Err(error) = res {
-                        tracing::error!(parent: &parse_span, %error, "failed to parse message, skipping");
+                        // TODO: not using tracing span to provide `split_id` and `offset` due to performance concern,
+                        // see #13105
+                        tracing::error!(
+                            %error,
+                            split_id = &*msg.split_id,
+                            offset = msg.offset,
+                            "failed to parse message, skipping"
+                        );
                         parser.source_ctx().report_user_source_error(error);
                     }
                 }
@@ -579,14 +587,14 @@ async fn into_chunk_stream<P: ByteStreamSourceParser>(mut parser: P, data_stream
                 match txn_ctl {
                     TransactionControl::Begin { id } => {
                         if let Some(Transaction { id: current_id, .. }) = &current_transaction {
-                            tracing::warn!(parent: &parse_span, current_id, id, "already in transaction");
+                            tracing::warn!(current_id, id, "already in transaction");
                         }
                         current_transaction = Some(Transaction { id, len: 0 });
                     }
                     TransactionControl::Commit { id } => {
                         let current_id = current_transaction.as_ref().map(|t| &t.id);
                         if current_id != Some(&id) {
-                            tracing::warn!(parent: &parse_span, ?current_id, id, "transaction id mismatch");
+                            tracing::warn!(?current_id, id, "transaction id mismatch");
                         }
                         current_transaction = None;
                     }
diff --git a/src/connector/src/source/base.rs b/src/connector/src/source/base.rs
index 49dc3b5d87119..e5193291a6d14 100644
--- a/src/connector/src/source/base.rs
+++ b/src/connector/src/source/base.rs
@@ -561,7 +561,7 @@ pub type SplitId = Arc<str>;
 pub struct SourceMessage {
     pub key: Option<Vec<u8>>,
     pub payload: Option<Vec<u8>>,
-    pub offset: String,
+    pub offset: String, // TODO: use `Arc<str>`
     pub split_id: SplitId,
     pub meta: SourceMeta,
 }
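
For context, here is a minimal standalone sketch of the pattern this diff adopts, assuming the `tracing` crate; the type definitions below are simplified stand-ins, not the actual RisingWave ones. Two ideas are at play: `MessageMeta` borrows from the `SourceMessage` instead of owning cloned `String`s, so it is `Copy` and cheap to hand to every row writer, and the per-message `tracing::info_span!` is dropped in favor of attaching `split_id`/`offset` as fields on the (rare) warn/error events themselves, so the hot path no longer pays for span creation.

use std::sync::Arc;

// Simplified stand-ins for the connector types touched by the diff.
enum SourceMeta {
    Kafka { timestamp: Option<i64> },
}

struct SourceMessage {
    offset: String,
    split_id: Arc<str>, // `SplitId` in the diff
    meta: SourceMeta,
}

// Borrowed view of the message metadata: `Copy`, so passing it by value
// to each row of a multi-row message costs nothing.
#[derive(Clone, Copy)]
struct MessageMeta<'a> {
    meta: &'a SourceMeta,
    split_id: &'a str,
    offset: &'a str,
}

fn handle_parse_error(msg: &SourceMessage, error: &str) {
    // Borrow rather than clone `split_id` / `offset` per message.
    let meta = MessageMeta {
        meta: &msg.meta,
        split_id: &msg.split_id,
        offset: &msg.offset,
    };
    // No span around parsing: the identifiers are only formatted when an
    // event actually fires, keeping the success path allocation-free.
    tracing::error!(
        error,
        split_id = meta.split_id,
        offset = meta.offset,
        "failed to parse message, skipping"
    );
}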