perf(connector): do not create span for parsing each row (#13105)
Signed-off-by: Bugen Zhao <[email protected]>
BugenZhao authored Oct 27, 2023
1 parent f60f5b6 commit a6f1714
Showing 2 changed files with 32 additions and 24 deletions.
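
For context, a rough sketch of the pattern this commit removes versus its replacement (hypothetical helper try_parse; assuming the tracing crate this codebase already uses). Building a span per message pays span-construction and field-recording costs on every row, even on the happy path where nothing is logged inside it; attaching split_id/offset directly to the rare warn/error events moves that cost to the failure path only.

use tracing::Instrument;

// Hypothetical stand-in for the real per-message parser call.
async fn try_parse() -> Result<(), std::io::Error> {
    Ok(())
}

// Before: a span is created for every message, whether or not anything
// is ever logged inside it.
async fn parse_with_span(split_id: &str, offset: &str) {
    let span = tracing::info_span!("parse_one", split_id, offset);
    let _ = try_parse().instrument(span).await;
}

// After: no per-row span; the contextual fields are recorded on the
// event itself, so the overhead is only paid when a parse fails.
async fn parse_without_span(split_id: &str, offset: &str) {
    if let Err(error) = try_parse().await {
        tracing::error!(%error, split_id, offset, "failed to parse message, skipping");
    }
}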
54 changes: 31 additions & 23 deletions src/connector/src/parser/mod.rs
@@ -143,30 +143,32 @@ pub struct SourceStreamChunkRowWriter<'a> {
/// An optional meta data of the original message.
///
/// When this is set by `with_meta`, it'll be used to fill the columns of types other than [`SourceColumnType::Normal`].
-row_meta: Option<MessageMeta>,
+row_meta: Option<MessageMeta<'a>>,
}

/// The meta data of the original message for a row writer.
///
/// Extracted from the `SourceMessage`.
-pub struct MessageMeta {
-meta: SourceMeta,
-offset: String,
+#[derive(Clone, Copy)]
+pub struct MessageMeta<'a> {
+meta: &'a SourceMeta,
+split_id: &'a str,
+offset: &'a str,
}

-impl MessageMeta {
+impl MessageMeta<'_> {
/// Extract the value for the given column.
///
/// Returns `None` if the column is not a meta column.
-fn value_for_column(&self, desc: &SourceColumnDesc) -> Option<Datum> {
+fn value_for_column(self, desc: &SourceColumnDesc) -> Option<Datum> {
match desc.column_type {
// Row id columns are filled with `NULL` here and will be filled with the real
// row id generated by `RowIdGenExecutor` later.
SourceColumnType::RowId => Datum::None.into(),
// Extract the offset from the meta data.
-SourceColumnType::Offset => Datum::Some(self.offset.as_str().into()).into(),
+SourceColumnType::Offset => Datum::Some(self.offset.into()).into(),
// Extract custom meta data per connector.
-SourceColumnType::Meta if let SourceMeta::Kafka(kafka_meta) = &self.meta => {
+SourceColumnType::Meta if let SourceMeta::Kafka(kafka_meta) = self.meta => {
assert_eq!(desc.name.as_str(), KAFKA_TIMESTAMP_COLUMN_NAME, "unexpected meta column name");
kafka_meta.timestamp.map(|ts| {
risingwave_common::cast::i64_to_timestamptz(ts)
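
A minimal sketch of why MessageMeta is now borrowed and Copy (simplified stand-in types, not the real connector definitions): constructing a MessageMeta<'_> copies three references, whereas the old owned form had to clone the offset String for every single row.

use std::sync::Arc;

// Simplified stand-ins for the real connector types.
pub enum SourceMeta {
    Empty,
}

pub struct SourceMessage {
    pub offset: String,
    pub split_id: Arc<str>,
    pub meta: SourceMeta,
}

#[derive(Clone, Copy)]
pub struct MessageMeta<'a> {
    pub meta: &'a SourceMeta,
    pub split_id: &'a str,
    pub offset: &'a str,
}

fn meta_of(msg: &SourceMessage) -> MessageMeta<'_> {
    // Only references are copied here; the previous owned version cloned
    // `offset` (a heap allocation) per message.
    MessageMeta {
        meta: &msg.meta,
        split_id: &msg.split_id,
        offset: &msg.offset,
    }
}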
@@ -274,11 +276,11 @@ impl OpAction for OpActionUpdate {
}
}

-impl SourceStreamChunkRowWriter<'_> {
+impl<'a> SourceStreamChunkRowWriter<'a> {
/// Set the meta data of the original message for this row writer.
///
/// This should always be called except for tests.
-fn with_meta(self, row_meta: MessageMeta) -> Self {
+fn with_meta(self, row_meta: MessageMeta<'a>) -> Self {
Self {
row_meta: Some(row_meta),
..self
@@ -308,8 +310,12 @@ impl SourceStreamChunkRowWriter<'_> {
Err(error) => {
// TODO: figure out a way to fill in not-null default value if user specifies one
// TODO: decide whether the error should not be ignored (e.g., even not a valid Debezium message)
+// TODO: not using tracing span to provide `split_id` and `offset` due to performance concern,
+// see #13105
tracing::warn!(
%error,
+split_id = self.row_meta.as_ref().map(|m| m.split_id),
+offset = self.row_meta.as_ref().map(|m| m.offset),
column = desc.name,
"failed to parse non-pk column, padding with `NULL`"
);
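
One detail in the new warn! fields above: tracing implements Value for Option<T>, recording the field only when it is Some, which is why the optional row_meta can be mapped straight into the event. A tiny standalone illustration (assuming the tracing-subscriber crate for output; the tuple stands in for the real MessageMeta):

fn main() {
    tracing_subscriber::fmt::init();

    // Stand-in for `self.row_meta`: (split_id, offset).
    let row_meta: Option<(&str, &str)> = Some(("split-0", "42"));

    // `Option` field values are recorded only when `Some`; a `None`
    // simply omits the field from the emitted event.
    tracing::warn!(
        split_id = row_meta.as_ref().map(|m| m.0),
        offset = row_meta.as_ref().map(|m| m.1),
        "failed to parse non-pk column, padding with `NULL`"
    );
}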
@@ -527,7 +533,7 @@ async fn into_chunk_stream<P: ByteStreamSourceParser>(mut parser: P, data_stream
for (i, msg) in batch.into_iter().enumerate() {
if msg.key.is_none() && msg.payload.is_none() {
if parser.parser_format() == ParserFormat::Debezium {
tracing::debug!("heartbeat message {}, skip parser", msg.offset);
tracing::debug!(offset = msg.offset, "skip parsing of heartbeat message");
// empty payload means a heartbeat in cdc source
// heartbeat message offset should not overwrite data messages offset
split_offset_mapping
@@ -537,25 +543,20 @@

continue;
}
-let parse_span = tracing::info_span!(
-"parse_one",
-split_id = msg.split_id.as_ref(),
-offset = msg.offset
-);

-split_offset_mapping.insert(msg.split_id, msg.offset.clone());
+split_offset_mapping.insert(msg.split_id.clone(), msg.offset.clone());

let old_op_num = builder.op_num();
match parser
.parse_one_with_txn(
msg.key,
msg.payload,
builder.row_writer().with_meta(MessageMeta {
-meta: msg.meta,
-offset: msg.offset,
+meta: &msg.meta,
+split_id: &msg.split_id,
+offset: &msg.offset,
}),
)
-.instrument(parse_span.clone())
.await
{
// It's possible that parsing multiple rows in a single message PARTIALLY failed.
@@ -570,7 +571,14 @@
}

if let Err(error) = res {
-tracing::error!(parent: &parse_span, %error, "failed to parse message, skipping");
+// TODO: not using tracing span to provide `split_id` and `offset` due to performance concern,
+// see #13105
+tracing::error!(
+%error,
+split_id = &*msg.split_id,
+offset = msg.offset,
+"failed to parse message, skipping"
+);
parser.source_ctx().report_user_source_error(error);
}
}
@@ -579,14 +587,14 @@
match txn_ctl {
TransactionControl::Begin { id } => {
if let Some(Transaction { id: current_id, .. }) = &current_transaction {
tracing::warn!(parent: &parse_span, current_id, id, "already in transaction");
tracing::warn!(current_id, id, "already in transaction");
}
current_transaction = Some(Transaction { id, len: 0 });
}
TransactionControl::Commit { id } => {
let current_id = current_transaction.as_ref().map(|t| &t.id);
if current_id != Some(&id) {
tracing::warn!(parent: &parse_span, ?current_id, id, "transaction id mismatch");
tracing::warn!(?current_id, id, "transaction id mismatch");
}
current_transaction = None;
}
2 changes: 1 addition & 1 deletion src/connector/src/source/base.rs
@@ -561,7 +561,7 @@ pub type SplitId = Arc<str>;
pub struct SourceMessage {
pub key: Option<Vec<u8>>,
pub payload: Option<Vec<u8>>,
-pub offset: String,
+pub offset: String, // TODO: use `Arc<str>`
pub split_id: SplitId,
pub meta: SourceMeta,
}
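
On the TODO above: the reason Arc<str> is attractive here is that cloning an Arc<str> is a reference-count bump, while cloning a String reallocates and copies the bytes, and this offset is cloned on the per-message path. A quick illustration:

use std::sync::Arc;

fn main() {
    let owned: String = "partition-0:12345".to_owned();
    let shared: Arc<str> = Arc::from(owned.as_str());

    let _o = owned.clone(); // heap allocation + byte copy each time
    let _s = Arc::clone(&shared); // atomic refcount increment only
}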