Skip to content

Commit

Permalink
feat(cdc): support default column in auto schema mapping (#18571)
Browse files Browse the repository at this point in the history
  • Loading branch information
StrikeW authored Sep 24, 2024
1 parent 75cc17b commit de73970
Show file tree
Hide file tree
Showing 6 changed files with 188 additions and 29 deletions.
40 changes: 40 additions & 0 deletions e2e_test/source/cdc_inline/auto_schema_map_mysql.slt
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,22 @@ mysql --protocol=tcp -u root mytest -e "
);
INSERT INTO mysql_types_test VALUES ( False, 0, null, null, -8388608, -2147483647, 9223372036854775806, -10.0, -9999.999999, -10000.0, 'c', 'd', '', '', '1001-01-01', '-838:59:59.000000', '2000-01-01 00:00:00.000000', null, 'happy', '[1,2]');
INSERT INTO mysql_types_test VALUES ( True, 1, -128, -32767, -8388608, -2147483647, -9223372036854775807, -10.0, -9999.999999, -10000.0, 'a', 'b', '', '', '1001-01-01', '00:00:00', '1998-01-01 00:00:00.000000', '1970-01-01 00:00:01', 'sad', '[3,4]');
CREATE TABLE IF NOT EXISTS test_default(
id int,
name varchar(255) DEFAULT 'default_name',
age int DEFAULT 18,
v1 real DEFAULT 1.1,
v2 double precision DEFAULT 2.2,
v3 decimal(5,2) DEFAULT 3.3,
v4 boolean DEFAULT false,
v5 date DEFAULT '2020-01-01',
v6 time DEFAULT '12:34:56',
v7 timestamp DEFAULT '2020-01-01 12:34:56',
v8 datetime DEFAULT '2020-01-01 12:34:56',
v9 VARCHAR(255) DEFAULT (UUID()),
PRIMARY KEY (id)
);
INSERT INTO test_default(id) VALUES (1),(2);
"

statement ok
Expand Down Expand Up @@ -70,6 +86,30 @@ HINT: Please define the schema manually
statement ok
ALTER SYSTEM SET license_key TO DEFAULT;

# Create the CDC table with an auto-mapped schema (`(*)`) from the upstream
# MySQL table `mytest.test_default`, whose columns declare DEFAULT values.
statement ok
create table test_default (*) from mysql_source table 'mytest.test_default';

# NOTE(review): presumably gives the CDC backfill time to load the upstream rows — confirm
sleep 3s

# The upstream fixture inserted ids 1 and 2. count(*) yields a single integer column.
query I
SELECT count(*) FROM test_default;
----
2

# Insert rows providing only `id`; every other column must be filled from the
# default values carried over by the auto schema mapping.
statement ok
insert into test_default(id) values (4),(5);

statement ok
FLUSH;


# The table has 12 columns (id, name, age, v1..v9).
# v9 is declared upstream as DEFAULT (UUID()); a uuid() default expression is
# not supported, so v9 is expected to read back as NULL.
query TTTTTTTTTTTT
SELECT * FROM test_default where id>=4 order by id;
----
4 default_name 18 1.1 2.2 3.3 0 2020-01-01 12:34:56 2020-01-01 12:34:56+00:00 2020-01-01 12:34:56 NULL
5 default_name 18 1.1 2.2 3.3 0 2020-01-01 12:34:56 2020-01-01 12:34:56+00:00 2020-01-01 12:34:56 NULL

statement ok
create table rw_customers (*) from mysql_source table 'mytest.customers';

Expand Down
37 changes: 37 additions & 0 deletions e2e_test/source/cdc_inline/auto_schema_map_pg.slt
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,23 @@ psql -c "
INSERT INTO postgres_types_test VALUES ( False, -32767, -2147483647, -9223372036854775807, -10.0, -9999.999999, -10000.0, 'd', '00'::bytea, '0001-01-01', '00:00:00', '2001-01-01 00:00:00'::timestamp, '2001-01-01 00:00:00-8'::timestamptz, interval '0 second', '{}', 'bb488f9b-330d-4012-b849-12adeb49e57e', 'happy', array[False::boolean]::boolean[], array[-32767::smallint]::smallint[], array[-2147483647::integer]::integer[], array[-9223372036854775807::bigint]::bigint[], array[-10.0::decimal]::decimal[], array[-9999.999999::real]::real[], array[-10000.0::double precision]::double precision[], array[''::varchar]::varchar[], array['00'::bytea]::bytea[], array['0001-01-01'::date]::date[], array['00:00:00'::time]::time[], array['2001-01-01 00:00:00'::timestamp::timestamp]::timestamp[], array['2001-01-01 00:00:00-8'::timestamptz::timestamptz]::timestamptz[], array[interval '0 second'::interval]::interval[], array['{}'::jsonb]::jsonb[], '{bb488f9b-330d-4012-b849-12adeb49e57e}', '{happy,ok,sad}');
INSERT INTO postgres_types_test VALUES ( False, 1, 123, 1234567890, 123.45, 123.45, 123.456, 'a_varchar', 'DEADBEEF'::bytea, '0024-01-01', '12:34:56', '2024-05-19 12:34:56', '2024-05-19 12:34:56+00', INTERVAL '1 day', to_jsonb('hello'::text), '123e4567-e89b-12d3-a456-426614174000', 'happy', ARRAY[NULL, TRUE]::boolean[], ARRAY[NULL, 1::smallint], ARRAY[NULL, 123], ARRAY[NULL, 1234567890], ARRAY[NULL, 123.45::numeric], ARRAY[NULL, 123.45::real], ARRAY[NULL, 123.456], ARRAY[NULL, 'a_varchar'], ARRAY[NULL, 'DEADBEEF'::bytea], ARRAY[NULL, '2024-05-19'::date], ARRAY[NULL, '12:34:56'::time], ARRAY[NULL, '2024-05-19 12:34:56'::timestamp], ARRAY[NULL, '2024-05-19 12:34:56+00'::timestamptz], ARRAY[NULL, INTERVAL '1 day'], ARRAY[NULL, to_jsonb('hello'::text)], ARRAY[NULL, '123e4567-e89b-12d3-a456-426614174000'::uuid], ARRAY[NULL, 'happy'::mood]);
INSERT INTO postgres_types_test VALUES ( False, NULL, NULL, 1, NULL, NULL, NULL, NULL, NULL, '0024-05-19', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL);
CREATE TABLE IF NOT EXISTS test_default(
id int,
name varchar(255) DEFAULT 'default_name',
age int DEFAULT 18,
v1 real DEFAULT 1.1,
v2 double precision DEFAULT 2.2,
v3 numeric DEFAULT 3.3,
v4 boolean DEFAULT false,
v5 date DEFAULT '2020-01-01',
v6 time DEFAULT '12:34:56',
v7 timestamp DEFAULT '2020-01-01 12:34:56',
v8 timestamptz DEFAULT '2020-01-01 12:34:56+00',
v9 interval DEFAULT '1 day',
v10 jsonb DEFAULT '{}',
PRIMARY KEY (id)
);
INSERT INTO test_default(id,name,age) VALUES (1, 'name1', 20), (2, 'name2', 21), (3, 'name3', 22);
"

statement ok
Expand All @@ -58,6 +75,26 @@ create source pg_source with (
slot.name = 'pg_slot'
);

# Create the CDC table with an auto-mapped schema (`(*)`) from the upstream
# Postgres table `public.test_default`, whose columns declare DEFAULT values.
statement ok
create table test_default (*) from pg_source table 'public.test_default';

# NOTE(review): presumably gives the CDC backfill time to load the upstream rows — confirm
sleep 3s

# Insert rows providing only `id`; every other column must be filled from the
# default values carried over by the auto schema mapping.
statement ok
insert into test_default(id) values (4),(5);

statement ok
FLUSH;

# Rows 1-3 come from the upstream fixture, which set id, name and age explicitly.
# Rows 4-5 were inserted above with only `id`, so name/age and v1..v10 all show
# the declared defaults. The table has 13 columns (id, name, age, v1..v10).
query TTTTTTTTTTTTT
SELECT * from test_default order by id;
----
1 name1 20 1.1 2.2 3.3 f 2020-01-01 12:34:56 2020-01-01 12:34:56 2020-01-01 12:34:56+00:00 1 day {}
2 name2 21 1.1 2.2 3.3 f 2020-01-01 12:34:56 2020-01-01 12:34:56 2020-01-01 12:34:56+00:00 1 day {}
3 name3 22 1.1 2.2 3.3 f 2020-01-01 12:34:56 2020-01-01 12:34:56 2020-01-01 12:34:56+00:00 1 day {}
4 default_name 18 1.1 2.2 3.3 f 2020-01-01 12:34:56 2020-01-01 12:34:56 2020-01-01 12:34:56+00:00 1 day {}
5 default_name 18 1.1 2.2 3.3 f 2020-01-01 12:34:56 2020-01-01 12:34:56 2020-01-01 12:34:56+00:00 1 day {}


statement ok
create table rw_postgres_types_test (*) from pg_source table 'public.postgres_types_test';
Expand Down
16 changes: 14 additions & 2 deletions src/common/src/catalog/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
use std::borrow::Cow;

use itertools::Itertools;
use risingwave_common::types::Datum;
use risingwave_pb::expr::expr_node::{RexNode, Type as ExprType};
use risingwave_pb::expr::ExprNode;
use risingwave_pb::plan_common::column_desc::GeneratedOrDefaultColumn;
use risingwave_pb::plan_common::{
Expand All @@ -24,6 +26,7 @@ use risingwave_pb::plan_common::{
use super::{row_id_column_desc, USER_COLUMN_ID_OFFSET};
use crate::catalog::{cdc_table_name_column_desc, offset_column_desc, Field, ROW_ID_COLUMN_ID};
use crate::types::DataType;
use crate::util::value_encoding::DatumToProtoExt;

/// Column ID is the unique identifier of a column in a table. Different from table ID, column ID is
/// not globally unique.
Expand Down Expand Up @@ -144,10 +147,19 @@ impl ColumnDesc {
name: impl Into<String>,
column_id: ColumnId,
data_type: DataType,
default_val: DefaultColumnDesc,
snapshot_value: Datum,
) -> ColumnDesc {
let default_col = DefaultColumnDesc {
expr: Some(ExprNode {
// equivalent to `Literal::to_expr_proto`
function_type: ExprType::Unspecified as i32,
return_type: Some(data_type.to_protobuf()),
rex_node: Some(RexNode::Constant(snapshot_value.to_protobuf())),
}),
snapshot_value: Some(snapshot_value.to_protobuf()),
};
ColumnDesc {
generated_or_default_column: Some(GeneratedOrDefaultColumn::DefaultColumn(default_val)),
generated_or_default_column: Some(GeneratedOrDefaultColumn::DefaultColumn(default_col)),
..Self::named(name, column_id, data_type)
}
}
Expand Down
15 changes: 1 addition & 14 deletions src/connector/src/parser/unified/debezium.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,8 @@ use risingwave_common::types::{
DataType, Datum, DatumCow, Scalar, ScalarImpl, ScalarRefImpl, Timestamptz, ToDatumRef,
ToOwnedDatum,
};
use risingwave_common::util::value_encoding::DatumToProtoExt;
use risingwave_connector_codec::decoder::AccessExt;
use risingwave_pb::expr::expr_node::{RexNode, Type as ExprType};
use risingwave_pb::expr::ExprNode;
use risingwave_pb::plan_common::additional_column::ColumnType;
use risingwave_pb::plan_common::DefaultColumnDesc;
use thiserror_ext::AsReport;

use super::{Access, AccessError, AccessResult, ChangeEvent, ChangeEventOperation};
Expand Down Expand Up @@ -240,20 +236,11 @@ pub fn parse_schema_change(
}},
)?,
);
// equivalent to `Literal::to_expr_proto`
let default_val_expr_node = ExprNode {
function_type: ExprType::Unspecified as i32,
return_type: Some(data_type.to_protobuf()),
rex_node: Some(RexNode::Constant(snapshot_value.to_protobuf())),
};
ColumnDesc::named_with_default_value(
name,
ColumnId::placeholder(),
data_type,
DefaultColumnDesc {
expr: Some(default_val_expr_node),
snapshot_value: Some(snapshot_value.to_protobuf()),
},
snapshot_value,
)
}
_ => ColumnDesc::named(name, ColumnId::placeholder(), data_type),
Expand Down
76 changes: 69 additions & 7 deletions src/connector/src/source/cdc/external/mysql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use std::collections::HashMap;

use anyhow::{anyhow, Context};
use chrono::{DateTime, NaiveDateTime};
use futures::stream::BoxStream;
use futures::{pin_mut, StreamExt};
use futures_async_stream::try_stream;
Expand All @@ -25,15 +26,16 @@ use mysql_common::value::Value;
use risingwave_common::bail;
use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema, OFFSET_COLUMN_NAME};
use risingwave_common::row::OwnedRow;
use risingwave_common::types::DataType;
use risingwave_common::types::{DataType, Decimal, ScalarImpl, F32};
use risingwave_common::util::iter_util::ZipEqFast;
use sea_schema::mysql::def::{ColumnKey, ColumnType};
use sea_schema::mysql::def::{ColumnDefault, ColumnKey, ColumnType};
use sea_schema::mysql::discovery::SchemaDiscovery;
use sea_schema::mysql::query::SchemaQueryBuilder;
use sea_schema::sea_query::{Alias, IntoIden};
use serde_derive::{Deserialize, Serialize};
use sqlx::mysql::MySqlConnectOptions;
use sqlx::MySqlPool;
use thiserror_ext::AsReport;

use crate::error::{ConnectorError, ConnectorResult};
use crate::source::cdc::external::{
Expand Down Expand Up @@ -112,11 +114,71 @@ impl MySqlExternalTable {
let data_type = mysql_type_to_rw_type(&col.col_type)?;
// column name in mysql is case-insensitive, convert to lowercase
let col_name = col.name.to_lowercase();
column_descs.push(ColumnDesc::named(
col_name.clone(),
ColumnId::placeholder(),
data_type,
));
let column_desc = if let Some(default) = col.default {
let snapshot_value = match default {
ColumnDefault::Null => None,
ColumnDefault::Int(val) => match data_type {
DataType::Int16 => Some(ScalarImpl::Int16(val as _)),
DataType::Int32 => Some(ScalarImpl::Int32(val as _)),
DataType::Int64 => Some(ScalarImpl::Int64(val)),
_ => Err(anyhow!("unexpected default value type for integer column"))?,
},
ColumnDefault::Real(val) => match data_type {
DataType::Float32 => Some(ScalarImpl::Float32(F32::from(val as f32))),
DataType::Float64 => Some(ScalarImpl::Float64(val.into())),
DataType::Decimal => Some(ScalarImpl::Decimal(
Decimal::try_from(val).map_err(|err| {
anyhow!("failed to convert default value to decimal").context(err)
})?,
)),
_ => Err(anyhow!("unexpected default value type for float column"))?,
},
ColumnDefault::String(mut val) => {
// mysql timestamp is mapped to timestamptz, we use UTC timezone to
// interpret its value
if data_type == DataType::Timestamptz {
let format = "%Y-%m-%d %H:%M:%S";
let naive_datetime = NaiveDateTime::parse_from_str(
val.as_str(),
format,
)
.map_err(|err| {
anyhow!("failed to parse mysql timestamp value").context(err)
})?;
let postgres_timestamptz: DateTime<chrono::Utc> =
DateTime::<chrono::Utc>::from_naive_utc_and_offset(
naive_datetime,
chrono::Utc,
);
val = postgres_timestamptz
.format("%Y-%m-%d %H:%M:%S%:z")
.to_string();
}
match ScalarImpl::from_text(val.as_str(), &data_type) {
Ok(scalar) => Some(scalar),
Err(err) => {
tracing::warn!(error=%err.as_report(), "failed to parse mysql default value expression, only constant is supported");
None
}
}
}
ColumnDefault::CurrentTimestamp | ColumnDefault::CustomExpr(_) => {
tracing::warn!("MySQL CURRENT_TIMESTAMP and custom expression default value not supported");
None
}
};

ColumnDesc::named_with_default_value(
col_name.clone(),
ColumnId::placeholder(),
data_type.clone(),
snapshot_value,
)
} else {
ColumnDesc::named(col_name.clone(), ColumnId::placeholder(), data_type)
};

column_descs.push(column_desc);
if matches!(col.key, ColumnKey::Primary) {
pk_names.push(col_name);
}
Expand Down
33 changes: 27 additions & 6 deletions src/connector/src/source/cdc/external/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode};
use postgres_openssl::MakeTlsConnector;
use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema};
use risingwave_common::row::{OwnedRow, Row};
use risingwave_common::types::{DataType, StructType};
use risingwave_common::types::{DataType, ScalarImpl, StructType};
use risingwave_common::util::iter_util::ZipEqFast;
use sea_schema::postgres::def::{ColumnType, TableInfo};
use sea_schema::postgres::discovery::SchemaDiscovery;
Expand Down Expand Up @@ -123,11 +123,32 @@ impl PostgresExternalTable {
let mut column_descs = vec![];
for col in &table_schema.columns {
let data_type = type_to_rw_type(&col.col_type)?;
column_descs.push(ColumnDesc::named(
col.name.clone(),
ColumnId::placeholder(),
data_type,
));
let column_desc = if let Some(ref default_expr) = col.default {
// parse the value of "column_default" field in information_schema.columns,
// non number data type will be stored as "'value'::type"
let val_text = default_expr
.0
.split("::")
.map(|s| s.trim_matches('\''))
.next()
.expect("default value expression");

match ScalarImpl::from_text(val_text, &data_type) {
Ok(scalar) => ColumnDesc::named_with_default_value(
col.name.clone(),
ColumnId::placeholder(),
data_type.clone(),
Some(scalar),
),
Err(err) => {
tracing::warn!(error=%err.as_report(), "failed to parse postgres default value expression, only constant is supported");
ColumnDesc::named(col.name.clone(), ColumnId::placeholder(), data_type)
}
}
} else {
ColumnDesc::named(col.name.clone(), ColumnId::placeholder(), data_type)
};
column_descs.push(column_desc);
}

if table_schema.primary_key_constraints.is_empty() {
Expand Down

0 comments on commit de73970

Please sign in to comment.