Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(connector): do not create span for parsing each row #13105

Merged
merged 2 commits into from
Oct 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 31 additions & 23 deletions src/connector/src/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,30 +143,32 @@ pub struct SourceStreamChunkRowWriter<'a> {
/// An optional meta data of the original message.
///
/// When this is set by `with_meta`, it'll be used to fill the columns of types other than [`SourceColumnType::Normal`].
row_meta: Option<MessageMeta>,
row_meta: Option<MessageMeta<'a>>,
}

/// The meta data of the original message for a row writer.
///
/// Extracted from the `SourceMessage`.
pub struct MessageMeta {
meta: SourceMeta,
offset: String,
#[derive(Clone, Copy)]
pub struct MessageMeta<'a> {
meta: &'a SourceMeta,
split_id: &'a str,
offset: &'a str,
}

impl MessageMeta {
impl MessageMeta<'_> {
/// Extract the value for the given column.
///
/// Returns `None` if the column is not a meta column.
fn value_for_column(&self, desc: &SourceColumnDesc) -> Option<Datum> {
fn value_for_column(self, desc: &SourceColumnDesc) -> Option<Datum> {
match desc.column_type {
// Row id columns are filled with `NULL` here and will be filled with the real
// row id generated by `RowIdGenExecutor` later.
SourceColumnType::RowId => Datum::None.into(),
// Extract the offset from the meta data.
SourceColumnType::Offset => Datum::Some(self.offset.as_str().into()).into(),
SourceColumnType::Offset => Datum::Some(self.offset.into()).into(),
// Extract custom meta data per connector.
SourceColumnType::Meta if let SourceMeta::Kafka(kafka_meta) = &self.meta => {
SourceColumnType::Meta if let SourceMeta::Kafka(kafka_meta) = self.meta => {
assert_eq!(desc.name.as_str(), KAFKA_TIMESTAMP_COLUMN_NAME, "unexpected meta column name");
kafka_meta.timestamp.map(|ts| {
risingwave_common::cast::i64_to_timestamptz(ts)
Expand Down Expand Up @@ -274,11 +276,11 @@ impl OpAction for OpActionUpdate {
}
}

impl SourceStreamChunkRowWriter<'_> {
impl<'a> SourceStreamChunkRowWriter<'a> {
/// Set the meta data of the original message for this row writer.
///
/// This should always be called except for tests.
fn with_meta(self, row_meta: MessageMeta) -> Self {
fn with_meta(self, row_meta: MessageMeta<'a>) -> Self {
Self {
row_meta: Some(row_meta),
..self
Expand Down Expand Up @@ -308,8 +310,12 @@ impl SourceStreamChunkRowWriter<'_> {
Err(error) => {
// TODO: figure out a way to fill in not-null default value if user specifies one
// TODO: decide whether the error should not be ignored (e.g., even not a valid Debezium message)
// TODO: not using tracing span to provide `split_id` and `offset` due to performance concern,
// see #13105
tracing::warn!(
%error,
split_id = self.row_meta.as_ref().map(|m| m.split_id),
offset = self.row_meta.as_ref().map(|m| m.offset),
column = desc.name,
"failed to parse non-pk column, padding with `NULL`"
);
Expand Down Expand Up @@ -527,7 +533,7 @@ async fn into_chunk_stream<P: ByteStreamSourceParser>(mut parser: P, data_stream
for (i, msg) in batch.into_iter().enumerate() {
if msg.key.is_none() && msg.payload.is_none() {
if parser.parser_format() == ParserFormat::Debezium {
tracing::debug!("heartbeat message {}, skip parser", msg.offset);
tracing::debug!(offset = msg.offset, "skip parsing of heartbeat message");
// empty payload means a heartbeat in cdc source
// heartbeat message offset should not overwrite data messages offset
split_offset_mapping
Expand All @@ -537,25 +543,20 @@ async fn into_chunk_stream<P: ByteStreamSourceParser>(mut parser: P, data_stream

continue;
}
let parse_span = tracing::info_span!(
"parse_one",
split_id = msg.split_id.as_ref(),
offset = msg.offset
);

split_offset_mapping.insert(msg.split_id, msg.offset.clone());
split_offset_mapping.insert(msg.split_id.clone(), msg.offset.clone());

let old_op_num = builder.op_num();
match parser
.parse_one_with_txn(
msg.key,
msg.payload,
builder.row_writer().with_meta(MessageMeta {
meta: msg.meta,
offset: msg.offset,
meta: &msg.meta,
split_id: &msg.split_id,
offset: &msg.offset,
}),
)
.instrument(parse_span.clone())
.await
{
// It's possible that parsing multiple rows in a single message PARTIALLY failed.
Expand All @@ -570,7 +571,14 @@ async fn into_chunk_stream<P: ByteStreamSourceParser>(mut parser: P, data_stream
}

if let Err(error) = res {
tracing::error!(parent: &parse_span, %error, "failed to parse message, skipping");
// TODO: not using tracing span to provide `split_id` and `offset` due to performance concern,
// see #13105
tracing::error!(
%error,
split_id = &*msg.split_id,
offset = msg.offset,
"failed to parse message, skipping"
);
parser.source_ctx().report_user_source_error(error);
}
}
Expand All @@ -579,14 +587,14 @@ async fn into_chunk_stream<P: ByteStreamSourceParser>(mut parser: P, data_stream
match txn_ctl {
TransactionControl::Begin { id } => {
if let Some(Transaction { id: current_id, .. }) = &current_transaction {
tracing::warn!(parent: &parse_span, current_id, id, "already in transaction");
tracing::warn!(current_id, id, "already in transaction");
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Transactions are only applicable to CDC sources, where split_id and offset are not that useful. So we simply omit them here.

}
current_transaction = Some(Transaction { id, len: 0 });
}
TransactionControl::Commit { id } => {
let current_id = current_transaction.as_ref().map(|t| &t.id);
if current_id != Some(&id) {
tracing::warn!(parent: &parse_span, ?current_id, id, "transaction id mismatch");
tracing::warn!(?current_id, id, "transaction id mismatch");
}
current_transaction = None;
}
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/source/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,7 @@ pub type SplitId = Arc<str>;
pub struct SourceMessage {
pub key: Option<Vec<u8>>,
pub payload: Option<Vec<u8>>,
pub offset: String,
pub offset: String, // TODO: use `Arc<str>`
pub split_id: SplitId,
pub meta: SourceMeta,
}
Expand Down
Loading