Skip to content

Commit

Permalink
fix(mysql-cdc): add missing data types when parse schema change event (
Browse files Browse the repository at this point in the history
  • Loading branch information
StrikeW authored Sep 25, 2024
1 parent 2c89c24 commit 0679b2f
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 18 deletions.
19 changes: 17 additions & 2 deletions src/connector/src/parser/unified/debezium.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ use crate::parser::debezium::schema_change::{SchemaChangeEnvelope, TableSchemaCh
use crate::parser::schema_change::TableChangeType;
use crate::parser::TransactionControl;
use crate::source::cdc::build_cdc_table_id;
use crate::source::cdc::external::mysql::{mysql_type_to_rw_type, type_name_to_mysql_type};
use crate::source::cdc::external::mysql::{
mysql_type_to_rw_type, timestamp_val_to_timestamptz, type_name_to_mysql_type,
};
use crate::source::{ConnectorProperties, SourceColumnDesc};

// Example of Debezium JSON value:
Expand Down Expand Up @@ -224,7 +226,20 @@ pub fn parse_schema_change(
// handle default value expression, currently we only support constant expression
let column_desc = match col.access_object_field("defaultValueExpression") {
Some(default_val_expr_str) if !default_val_expr_str.is_jsonb_null() => {
let value_text = default_val_expr_str.as_string().unwrap();
let mut value_text = default_val_expr_str.as_string().unwrap();
// mysql timestamp is mapped to timestamptz, we use UTC timezone to
// interpret its value
if data_type == DataType::Timestamptz
&& matches!(*connector_props, ConnectorProperties::MysqlCdc(_))
{
value_text = timestamp_val_to_timestamptz(value_text.as_str()).map_err(|err| {
tracing::error!(target: "auto_schema_change", error=%err.as_report(), "failed to convert timestamp value to timestamptz");
AccessError::TypeError {
expected: "timestamp in YYYY-MM-DD HH:MM:SS".into(),
got: data_type.to_string(),
value: value_text,
}})?;
}
let snapshot_value: Datum = Some(
ScalarImpl::from_text(value_text.as_str(), &data_type).map_err(
|err| {
Expand Down
33 changes: 17 additions & 16 deletions src/connector/src/source/cdc/external/mysql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,22 +137,7 @@ impl MySqlExternalTable {
// mysql timestamp is mapped to timestamptz, we use UTC timezone to
// interpret its value
if data_type == DataType::Timestamptz {
let format = "%Y-%m-%d %H:%M:%S";
let naive_datetime = NaiveDateTime::parse_from_str(
val.as_str(),
format,
)
.map_err(|err| {
anyhow!("failed to parse mysql timestamp value").context(err)
})?;
let postgres_timestamptz: DateTime<chrono::Utc> =
DateTime::<chrono::Utc>::from_naive_utc_and_offset(
naive_datetime,
chrono::Utc,
);
val = postgres_timestamptz
.format("%Y-%m-%d %H:%M:%S%:z")
.to_string();
val = timestamp_val_to_timestamptz(val.as_str())?;
}
match ScalarImpl::from_text(val.as_str(), &data_type) {
Ok(scalar) => Some(scalar),
Expand Down Expand Up @@ -203,6 +188,17 @@ impl MySqlExternalTable {
}
}

pub fn timestamp_val_to_timestamptz(value_text: &str) -> ConnectorResult<String> {
let format = "%Y-%m-%d %H:%M:%S";
let naive_datetime = NaiveDateTime::parse_from_str(value_text, format)
.map_err(|err| anyhow!("failed to parse mysql timestamp value").context(err))?;
let postgres_timestamptz: DateTime<chrono::Utc> =
DateTime::<chrono::Utc>::from_naive_utc_and_offset(naive_datetime, chrono::Utc);
Ok(postgres_timestamptz
.format("%Y-%m-%d %H:%M:%S%:z")
.to_string())
}

pub fn type_name_to_mysql_type(ty_name: &str) -> Option<ColumnType> {
macro_rules! column_type {
($($name:literal => $variant:ident),* $(,)?) => {
Expand All @@ -211,6 +207,11 @@ pub fn type_name_to_mysql_type(ty_name: &str) -> Option<ColumnType> {
$name => Some(ColumnType::$variant(Default::default())),
)*
"json" => Some(ColumnType::Json),
"date" => Some(ColumnType::Date),
"bool" => Some(ColumnType::Bool),
"tinyblob" => Some(ColumnType::TinyBlob),
"mediumblob" => Some(ColumnType::MediumBlob),
"longblob" => Some(ColumnType::LongBlob),
_ => None,
}
};
Expand Down

0 comments on commit 0679b2f

Please sign in to comment.