From c09d2647ef245d214da3e5b1826ae593a0b16d41 Mon Sep 17 00:00:00 2001 From: StrikeW Date: Fri, 2 Aug 2024 18:32:23 +0800 Subject: [PATCH] feat(cdc): parse debezium schema event for mysql (#17707) --- .../connector/source/core/DbzCdcEngine.java | 1 + .../source/core/DbzChangeEventConsumer.java | 67 +++++- proto/connector_service.proto | 11 +- proto/ddl_service.proto | 18 ++ src/common/src/types/jsonb.rs | 8 + src/connector/codec/src/decoder/mod.rs | 3 + src/connector/src/parser/debezium/mod.rs | 2 + .../src/parser/debezium/schema_change.rs | 94 ++++++++ .../src/parser/debezium/simd_json_parser.rs | 7 + src/connector/src/parser/mod.rs | 8 + src/connector/src/parser/plain_parser.rs | 210 ++++++++++++++++-- src/connector/src/parser/unified/debezium.rs | 98 ++++++++ .../src/source/cdc/external/mysql.rs | 53 ++++- .../src/source/cdc/source/message.rs | 38 +++- 14 files changed, 585 insertions(+), 33 deletions(-) create mode 100644 src/connector/src/parser/debezium/schema_change.rs diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEngine.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEngine.java index ed22fe36416ec..9227d8225a65c 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEngine.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzCdcEngine.java @@ -47,6 +47,7 @@ public DbzCdcEngine( sourceId, heartbeatTopicPrefix, transactionTopic, + topicPrefix, new ArrayBlockingQueue<>(DEFAULT_QUEUE_CAPACITY)); // Builds a debezium engine but not start it diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzChangeEventConsumer.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzChangeEventConsumer.java index 98a0a171ec4cc..375b4d4a3ad62 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzChangeEventConsumer.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/core/DbzChangeEventConsumer.java @@ -17,6 +17,7 @@ import com.risingwave.connector.api.source.SourceTypeE; import com.risingwave.connector.cdc.debezium.internal.DebeziumOffset; import com.risingwave.connector.cdc.debezium.internal.DebeziumOffsetSerializer; +import com.risingwave.connector.source.common.CdcConnectorException; import com.risingwave.proto.ConnectorServiceProto.CdcMessage; import com.risingwave.proto.ConnectorServiceProto.GetEventStreamResponse; import io.debezium.connector.postgresql.PostgresOffsetContext; @@ -43,6 +44,7 @@ enum EventType { HEARTBEAT, TRANSACTION, DATA, + SCHEMA_CHANGE, } public class DbzChangeEventConsumer @@ -57,6 +59,7 @@ public class DbzChangeEventConsumer private final JsonConverter keyConverter; private final String heartbeatTopicPrefix; private final String transactionTopic; + private final String schemaChangeTopic; private volatile DebeziumEngine.RecordCommitter> currentRecordCommitter; @@ -66,12 +69,14 @@ public class DbzChangeEventConsumer long sourceId, String heartbeatTopicPrefix, String transactionTopic, + String schemaChangeTopic, BlockingQueue queue) { this.connector = connector; this.sourceId = sourceId; this.outputChannel = queue; this.heartbeatTopicPrefix = heartbeatTopicPrefix; this.transactionTopic = transactionTopic; + this.schemaChangeTopic = schemaChangeTopic; LOG.info("heartbeat topic: {}, trnx topic: {}", heartbeatTopicPrefix, transactionTopic); // The default JSON converter will output the schema field in the JSON which is unnecessary @@ -105,6 +110,8 @@ private EventType getEventType(SourceRecord record) { return EventType.HEARTBEAT; } else if (isTransactionMetaEvent(record)) { return EventType.TRANSACTION; + } else if (isSchemaChangeEvent(record)) { + return EventType.SCHEMA_CHANGE; } else { return EventType.DATA; } @@ -122,6 +129,11 @@ private boolean isTransactionMetaEvent(SourceRecord record) { return topic != null && topic.equals(transactionTopic); } + private boolean isSchemaChangeEvent(SourceRecord record) { + String topic = record.topic(); + return topic != null && topic.equals(schemaChangeTopic); + } + @Override public void handleBatch( List> events, @@ -155,7 +167,8 @@ var record = event.value(); switch (eventType) { case HEARTBEAT: { - var message = msgBuilder.build(); + var message = + msgBuilder.setMsgType(CdcMessage.CdcMessageType.HEARTBEAT).build(); LOG.debug("heartbeat => {}", message.getOffset()); respBuilder.addEvents(message); break; @@ -168,7 +181,7 @@ var record = event.value(); record.topic(), record.valueSchema(), record.value()); var message = msgBuilder - .setIsTransactionMeta(true) + .setMsgType(CdcMessage.CdcMessageType.TRANSACTION_META) .setPayload(new String(payload, StandardCharsets.UTF_8)) .setSourceTsMs(trxTs) .build(); @@ -176,6 +189,46 @@ var record = event.value(); respBuilder.addEvents(message); break; } + + case SCHEMA_CHANGE: + { + var sourceStruct = ((Struct) record.value()).getStruct("source"); + if (sourceStruct == null) { + throw new CdcConnectorException( + "source field is missing in schema change event"); + } + + // upstream event time + long sourceTsMs = sourceStruct.getInt64("ts_ms"); + byte[] payload = + payloadConverter.fromConnectData( + record.topic(), record.valueSchema(), record.value()); + + // We intentionally don't set the fullTableName for schema change event, + // since it doesn't need to be routed to a specific cdc table + var message = + msgBuilder + .setMsgType(CdcMessage.CdcMessageType.SCHEMA_CHANGE) + .setPayload(new String(payload, StandardCharsets.UTF_8)) + .setSourceTsMs(sourceTsMs) + .build(); + LOG.debug( + "offset => {}, key => {}, payload => {}", + message.getOffset(), + message.getKey(), + message.getPayload()); + respBuilder.addEvents(message); + + // emit the schema change event as a single response + respBuilder.setSourceId(sourceId); + var response = respBuilder.build(); + outputChannel.put(response); + + // reset the response builder + respBuilder = GetEventStreamResponse.newBuilder(); + break; + } + case DATA: { // Topic naming conventions @@ -192,10 +245,11 @@ var record = event.value(); } // get upstream event time from the "source" field var sourceStruct = ((Struct) record.value()).getStruct("source"); - long sourceTsMs = - sourceStruct == null - ? System.currentTimeMillis() - : sourceStruct.getInt64("ts_ms"); + if (sourceStruct == null) { + throw new CdcConnectorException( + "source field is missing in data change event"); + } + long sourceTsMs = sourceStruct.getInt64("ts_ms"); byte[] payload = payloadConverter.fromConnectData( record.topic(), record.valueSchema(), record.value()); @@ -208,6 +262,7 @@ var record = event.value(); String msgKey = key == null ? "" : new String(key, StandardCharsets.UTF_8); var message = msgBuilder + .setMsgType(CdcMessage.CdcMessageType.DATA) .setFullTableName(fullTableName) .setPayload(msgPayload) .setKey(msgKey) diff --git a/proto/connector_service.proto b/proto/connector_service.proto index da2c2b88087ea..cf549a8e2e493 100644 --- a/proto/connector_service.proto +++ b/proto/connector_service.proto @@ -147,13 +147,22 @@ message SinkCoordinatorStreamResponse { /* Source Service */ message CdcMessage { + enum CdcMessageType { + UNSPECIFIED = 0; + HEARTBEAT = 1; + DATA = 2; + TRANSACTION_META = 3; + SCHEMA_CHANGE = 4; + } + // The value of the Debezium message string payload = 1; string partition = 2; string offset = 3; string full_table_name = 4; int64 source_ts_ms = 5; - bool is_transaction_meta = 6; + CdcMessageType msg_type = 6; + // The key of the Debezium message, which only used by `mongodb-cdc` connector. string key = 7; } diff --git a/proto/ddl_service.proto b/proto/ddl_service.proto index 1b4f4e423949e..f78c08e2a9b52 100644 --- a/proto/ddl_service.proto +++ b/proto/ddl_service.proto @@ -5,6 +5,7 @@ package ddl_service; import "catalog.proto"; import "common.proto"; import "meta.proto"; +import "plan_common.proto"; import "stream_plan.proto"; option java_package = "com.risingwave.proto"; @@ -444,6 +445,23 @@ message CommentOnResponse { uint64 version = 2; } +message TableSchemaChange { + enum TableChangeType { + UNSPECIFIED = 0; + ALTER = 1; + CREATE = 2; + DROP = 3; + } + + TableChangeType change_type = 1; + string cdc_table_name = 2; + repeated plan_common.ColumnCatalog columns = 3; +} + +message SchemaChangeEnvelope { + repeated TableSchemaChange table_changes = 1; +} + service DdlService { rpc CreateDatabase(CreateDatabaseRequest) returns (CreateDatabaseResponse); rpc DropDatabase(DropDatabaseRequest) returns (DropDatabaseResponse); diff --git a/src/common/src/types/jsonb.rs b/src/common/src/types/jsonb.rs index 642b363a8c67e..fa80069080ff4 100644 --- a/src/common/src/types/jsonb.rs +++ b/src/common/src/types/jsonb.rs @@ -301,6 +301,14 @@ impl<'a> JsonbRef<'a> { .ok_or_else(|| format!("cannot cast jsonb {} to type boolean", self.type_name())) } + /// If the JSON is a string, returns the associated string. + pub fn as_string(&self) -> Result { + self.0 + .as_str() + .map(|s| s.to_owned()) + .ok_or_else(|| format!("cannot cast jsonb {} to type string", self.type_name())) + } + /// Attempt to read jsonb as a JSON number. /// /// According to RFC 8259, only number within IEEE 754 binary64 (double precision) has good diff --git a/src/connector/codec/src/decoder/mod.rs b/src/connector/codec/src/decoder/mod.rs index cd7fe14ab74ea..814e06a166c6c 100644 --- a/src/connector/codec/src/decoder/mod.rs +++ b/src/connector/codec/src/decoder/mod.rs @@ -44,6 +44,9 @@ pub enum AccessError { #[error(transparent)] NotImplemented(#[from] NotImplemented), + // NOTE: We intentionally don't embed `anyhow::Error` in `AccessError` since it happens + // in record-level and it might be too heavy to capture the backtrace + // when creating a new `anyhow::Error`. } pub type AccessResult = std::result::Result; diff --git a/src/connector/src/parser/debezium/mod.rs b/src/connector/src/parser/debezium/mod.rs index 5b5416e647268..8852bfc25eb41 100644 --- a/src/connector/src/parser/debezium/mod.rs +++ b/src/connector/src/parser/debezium/mod.rs @@ -17,7 +17,9 @@ mod avro_parser; mod debezium_parser; mod mongo_json_parser; +pub mod schema_change; pub mod simd_json_parser; + pub use avro_parser::*; pub use debezium_parser::*; pub use mongo_json_parser::DebeziumMongoJsonParser; diff --git a/src/connector/src/parser/debezium/schema_change.rs b/src/connector/src/parser/debezium/schema_change.rs new file mode 100644 index 0000000000000..4c61b52caaba9 --- /dev/null +++ b/src/connector/src/parser/debezium/schema_change.rs @@ -0,0 +1,94 @@ +// Copyright 2024 RisingWave Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_common::catalog::ColumnCatalog; +use risingwave_pb::ddl_service::table_schema_change::TableChangeType as PbTableChangeType; +use risingwave_pb::ddl_service::{ + SchemaChangeEnvelope as PbSchemaChangeEnvelope, TableSchemaChange as PbTableSchemaChange, +}; + +#[derive(Debug)] +pub struct SchemaChangeEnvelope { + pub table_changes: Vec, +} + +#[derive(Debug, Clone, Copy, PartialEq)] +pub(crate) enum TableChangeType { + Unspecified, + Alter, + Create, + Drop, +} + +impl TableChangeType { + #[allow(dead_code)] + pub fn from_proto(value: PbTableChangeType) -> Self { + match value { + PbTableChangeType::Alter => TableChangeType::Alter, + PbTableChangeType::Create => TableChangeType::Create, + PbTableChangeType::Drop => TableChangeType::Drop, + PbTableChangeType::Unspecified => TableChangeType::Unspecified, + } + } + + pub fn to_proto(self) -> PbTableChangeType { + match self { + TableChangeType::Alter => PbTableChangeType::Alter, + TableChangeType::Create => PbTableChangeType::Create, + TableChangeType::Drop => PbTableChangeType::Drop, + TableChangeType::Unspecified => PbTableChangeType::Unspecified, + } + } +} + +impl From<&str> for TableChangeType { + fn from(value: &str) -> Self { + match value { + "ALTER" => TableChangeType::Alter, + "CREATE" => TableChangeType::Create, + "DROP" => TableChangeType::Drop, + _ => TableChangeType::Unspecified, + } + } +} + +#[derive(Debug)] +pub struct TableSchemaChange { + pub(crate) cdc_table_name: String, + pub(crate) columns: Vec, + pub(crate) change_type: TableChangeType, +} + +impl SchemaChangeEnvelope { + pub fn to_protobuf(&self) -> PbSchemaChangeEnvelope { + let table_changes = self + .table_changes + .iter() + .map(|table_change| { + let columns = table_change + .columns + .iter() + .map(|column| column.to_protobuf()) + .collect(); + PbTableSchemaChange { + change_type: table_change.change_type.to_proto() as _, + cdc_table_name: table_change.cdc_table_name.clone(), + columns, + } + }) + .collect(); + + PbSchemaChangeEnvelope { table_changes } + } +} diff --git a/src/connector/src/parser/debezium/simd_json_parser.rs b/src/connector/src/parser/debezium/simd_json_parser.rs index f9738eb9e357e..08dd4ef7c2bdc 100644 --- a/src/connector/src/parser/debezium/simd_json_parser.rs +++ b/src/connector/src/parser/debezium/simd_json_parser.rs @@ -37,6 +37,13 @@ impl DebeziumJsonAccessBuilder { json_parse_options: JsonParseOptions::new_for_debezium(timestamptz_handling), }) } + + pub fn new_for_schema_event() -> ConnectorResult { + Ok(Self { + value: None, + json_parse_options: JsonParseOptions::default(), + }) + } } impl AccessBuilder for DebeziumJsonAccessBuilder { diff --git a/src/connector/src/parser/mod.rs b/src/connector/src/parser/mod.rs index 25a7d8c169ab5..055eab777be5b 100644 --- a/src/connector/src/parser/mod.rs +++ b/src/connector/src/parser/mod.rs @@ -89,6 +89,7 @@ mod unified; mod upsert_parser; mod util; +use debezium::schema_change::SchemaChangeEnvelope; pub use debezium::DEBEZIUM_IGNORE_KEY; use risingwave_common::bitmap::BitmapBuilder; pub use unified::{AccessError, AccessResult}; @@ -579,6 +580,9 @@ pub enum ParseResult { Rows, /// A transaction control message is parsed. TransactionControl(TransactionControl), + + /// A schema change message is parsed. + SchemaChange(SchemaChangeEnvelope), } #[derive(Clone, Copy, Debug, PartialEq)] @@ -829,6 +833,10 @@ async fn into_chunk_stream_inner( } } }, + + Ok(ParseResult::SchemaChange(_)) => { + // TODO + } } } diff --git a/src/connector/src/parser/plain_parser.rs b/src/connector/src/parser/plain_parser.rs index 1454ca8ade1fc..663fcb30e6ac9 100644 --- a/src/connector/src/parser/plain_parser.rs +++ b/src/connector/src/parser/plain_parser.rs @@ -23,10 +23,11 @@ use super::{ use crate::error::ConnectorResult; use crate::parser::bytes_parser::BytesAccessBuilder; use crate::parser::simd_json_parser::DebeziumJsonAccessBuilder; -use crate::parser::unified::debezium::parse_transaction_meta; +use crate::parser::unified::debezium::{parse_schema_change, parse_transaction_meta}; use crate::parser::unified::AccessImpl; use crate::parser::upsert_parser::get_key_column_name; use crate::parser::{BytesProperties, ParseResult, ParserFormat}; +use crate::source::cdc::CdcMessageType; use crate::source::{SourceColumnDesc, SourceContext, SourceContextRef, SourceMeta}; /// Parser for `FORMAT PLAIN`, i.e., append-only source. @@ -38,6 +39,7 @@ pub struct PlainParser { pub source_ctx: SourceContextRef, // parsing transaction metadata for shared cdc source pub transaction_meta_builder: Option, + pub schema_change_builder: Option, } impl PlainParser { @@ -69,12 +71,18 @@ impl PlainParser { let transaction_meta_builder = Some(AccessBuilderImpl::DebeziumJson( DebeziumJsonAccessBuilder::new(TimestamptzHandling::GuessNumberUnit)?, )); + + let schema_change_builder = Some(AccessBuilderImpl::DebeziumJson( + DebeziumJsonAccessBuilder::new_for_schema_event()?, + )); + Ok(Self { key_builder, payload_builder, rw_columns, source_ctx, transaction_meta_builder, + schema_change_builder, }) } @@ -82,26 +90,62 @@ impl PlainParser { &mut self, key: Option>, payload: Option>, - mut writer: SourceStreamChunkRowWriter<'_>, + writer: SourceStreamChunkRowWriter<'_>, ) -> ConnectorResult { - // if the message is transaction metadata, parse it and return + // plain parser also used in the shared cdc source, + // we need to handle transaction metadata and schema change messages here if let Some(msg_meta) = writer.row_meta && let SourceMeta::DebeziumCdc(cdc_meta) = msg_meta.meta - && cdc_meta.is_transaction_meta && let Some(data) = payload { - let accessor = self - .transaction_meta_builder - .as_mut() - .expect("expect transaction metadata access builder") - .generate_accessor(data) - .await?; - return match parse_transaction_meta(&accessor, &self.source_ctx.connector_props) { - Ok(transaction_control) => Ok(ParseResult::TransactionControl(transaction_control)), - Err(err) => Err(err)?, - }; + match cdc_meta.msg_type { + CdcMessageType::Data | CdcMessageType::Heartbeat => { + return self.parse_rows(key, Some(data), writer).await; + } + CdcMessageType::TransactionMeta => { + let accessor = self + .transaction_meta_builder + .as_mut() + .expect("expect transaction metadata access builder") + .generate_accessor(data) + .await?; + return match parse_transaction_meta(&accessor, &self.source_ctx.connector_props) + { + Ok(transaction_control) => { + Ok(ParseResult::TransactionControl(transaction_control)) + } + Err(err) => Err(err)?, + }; + } + CdcMessageType::SchemaChange => { + let accessor = self + .schema_change_builder + .as_mut() + .expect("expect schema change access builder") + .generate_accessor(data) + .await?; + + return match parse_schema_change(&accessor, &self.source_ctx.connector_props) { + Ok(schema_change) => Ok(ParseResult::SchemaChange(schema_change)), + Err(err) => Err(err)?, + }; + } + CdcMessageType::Unspecified => { + unreachable!() + } + } } + // for non-cdc source messages + self.parse_rows(key, payload, writer).await + } + + async fn parse_rows( + &mut self, + key: Option>, + payload: Option>, + mut writer: SourceStreamChunkRowWriter<'_>, + ) -> ConnectorResult { let mut row_op: KvEvent, AccessImpl<'_>> = KvEvent::default(); if let Some(data) = key @@ -158,11 +202,13 @@ mod tests { use std::ops::Deref; use std::sync::Arc; + use expect_test::expect; use futures::executor::block_on; use futures::StreamExt; use futures_async_stream::try_stream; use itertools::Itertools; use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId}; + use risingwave_pb::connector_service::cdc_message; use super::*; use crate::parser::{MessageMeta, SourceStreamChunkBuilder, TransactionControl}; @@ -281,7 +327,11 @@ mod tests { meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta::new( "orders".to_string(), 0, - transactional, + if transactional { + cdc_message::CdcMessageType::TransactionMeta + } else { + cdc_message::CdcMessageType::Data + }, )), split_id: SplitId::from("1001"), offset: "0".into(), @@ -295,7 +345,7 @@ mod tests { meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta::new( "orders".to_string(), 0, - false, + cdc_message::CdcMessageType::Data, )), split_id: SplitId::from("1001"), offset: "0".into(), @@ -309,7 +359,11 @@ mod tests { meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta::new( "orders".to_string(), 0, - transactional, + if transactional { + cdc_message::CdcMessageType::TransactionMeta + } else { + cdc_message::CdcMessageType::Data + }, )), split_id: SplitId::from("1001"), offset: "0".into(), @@ -355,7 +409,11 @@ mod tests { let begin_msg = r#"{"schema":null,"payload":{"status":"BEGIN","id":"3E11FA47-71CA-11E1-9E33-C80AA9429562:23","event_count":null,"data_collections":null,"ts_ms":1704269323180}}"#; let commit_msg = r#"{"schema":null,"payload":{"status":"END","id":"3E11FA47-71CA-11E1-9E33-C80AA9429562:23","event_count":11,"data_collections":[{"data_collection":"public.orders_tx","event_count":5},{"data_collection":"public.person","event_count":6}],"ts_ms":1704269323180}}"#; - let cdc_meta = SourceMeta::DebeziumCdc(DebeziumCdcMeta::new("orders".to_string(), 0, true)); + let cdc_meta = SourceMeta::DebeziumCdc(DebeziumCdcMeta::new( + "orders".to_string(), + 0, + cdc_message::CdcMessageType::TransactionMeta, + )); let msg_meta = MessageMeta { meta: &cdc_meta, split_id: "1001", @@ -393,4 +451,120 @@ mod tests { let output = builder.take(10); assert_eq!(0, output.cardinality()); } + + #[tokio::test] + async fn test_parse_schema_change() { + let schema = vec![ + ColumnCatalog { + column_desc: ColumnDesc::named("payload", ColumnId::placeholder(), DataType::Jsonb), + is_hidden: false, + }, + ColumnCatalog::offset_column(), + ColumnCatalog::cdc_table_name_column(), + ]; + + let columns = schema + .iter() + .map(|c| SourceColumnDesc::from(&c.column_desc)) + .collect::>(); + + // format plain encode json parser + let source_ctx = SourceContext { + connector_props: ConnectorProperties::MysqlCdc(Box::default()), + ..SourceContext::dummy() + }; + let mut parser = PlainParser::new( + SpecificParserConfig::DEFAULT_PLAIN_JSON, + columns.clone(), + Arc::new(source_ctx), + ) + .await + .unwrap(); + let mut builder = SourceStreamChunkBuilder::with_capacity(columns, 0); + + let msg = r#"{"schema":null,"payload": { "databaseName": "mydb", "ddl": "ALTER TABLE test add column v2 varchar(32)", "schemaName": null, "source": { "connector": "mysql", "db": "mydb", "file": "binlog.000065", "gtid": null, "name": "RW_CDC_1", "pos": 234, "query": null, "row": 0, "sequence": null, "server_id": 1, "snapshot": "false", "table": "test", "thread": null, "ts_ms": 1718354727000, "version": "2.4.2.Final" }, "tableChanges": [ { "id": "\"mydb\".\"test\"", "table": { "columns": [ { "autoIncremented": false, "charsetName": null, "comment": null, "defaultValueExpression": null, "enumValues": null, "generated": false, "jdbcType": 4, "length": null, "name": "id", "nativeType": null, "optional": false, "position": 1, "scale": null, "typeExpression": "INT", "typeName": "INT" }, { "autoIncremented": false, "charsetName": null, "comment": null, "defaultValueExpression": null, "enumValues": null, "generated": false, "jdbcType": 2014, "length": null, "name": "v1", "nativeType": null, "optional": true, "position": 2, "scale": null, "typeExpression": "TIMESTAMP", "typeName": "TIMESTAMP" }, { "autoIncremented": false, "charsetName": "utf8mb4", "comment": null, "defaultValueExpression": null, "enumValues": null, "generated": false, "jdbcType": 12, "length": 32, "name": "v2", "nativeType": null, "optional": true, "position": 3, "scale": null, "typeExpression": "VARCHAR", "typeName": "VARCHAR" } ], "comment": null, "defaultCharsetName": "utf8mb4", "primaryKeyColumnNames": [ "id" ] }, "type": "ALTER" } ], "ts_ms": 1718354727594 }}"#; + let cdc_meta = SourceMeta::DebeziumCdc(DebeziumCdcMeta::new( + "mydb.test".to_string(), + 0, + cdc_message::CdcMessageType::SchemaChange, + )); + let msg_meta = MessageMeta { + meta: &cdc_meta, + split_id: "1001", + offset: "", + }; + + let res = parser + .parse_one_with_txn( + None, + Some(msg.as_bytes().to_vec()), + builder.row_writer().with_meta(msg_meta), + ) + .await; + + let res = res.unwrap(); + expect![[r#" + SchemaChange( + SchemaChangeEnvelope { + table_changes: [ + TableSchemaChange { + cdc_table_name: "mydb.test", + columns: [ + ColumnCatalog { + column_desc: ColumnDesc { + data_type: Int32, + column_id: #2147483646, + name: "id", + field_descs: [], + type_name: "", + generated_or_default_column: None, + description: None, + additional_column: AdditionalColumn { + column_type: None, + }, + version: Pr13707, + }, + is_hidden: false, + }, + ColumnCatalog { + column_desc: ColumnDesc { + data_type: Timestamptz, + column_id: #2147483646, + name: "v1", + field_descs: [], + type_name: "", + generated_or_default_column: None, + description: None, + additional_column: AdditionalColumn { + column_type: None, + }, + version: Pr13707, + }, + is_hidden: false, + }, + ColumnCatalog { + column_desc: ColumnDesc { + data_type: Varchar, + column_id: #2147483646, + name: "v2", + field_descs: [], + type_name: "", + generated_or_default_column: None, + description: None, + additional_column: AdditionalColumn { + column_type: None, + }, + version: Pr13707, + }, + is_hidden: false, + }, + ], + change_type: Alter, + }, + ], + }, + ) + "#]] + .assert_debug_eq(&res); + } } diff --git a/src/connector/src/parser/unified/debezium.rs b/src/connector/src/parser/unified/debezium.rs index d90463698577d..e4ec3f9870b43 100644 --- a/src/connector/src/parser/unified/debezium.rs +++ b/src/connector/src/parser/unified/debezium.rs @@ -12,14 +12,20 @@ // See the License for the specific language governing permissions and // limitations under the License. +use itertools::Itertools; +use risingwave_common::catalog::{ColumnCatalog, ColumnDesc, ColumnId}; use risingwave_common::types::{ DataType, Datum, DatumCow, Scalar, ScalarImpl, ScalarRefImpl, Timestamptz, ToDatumRef, }; use risingwave_connector_codec::decoder::AccessExt; use risingwave_pb::plan_common::additional_column::ColumnType; +use thiserror_ext::AsReport; use super::{Access, AccessError, AccessResult, ChangeEvent, ChangeEventOperation}; +use crate::parser::debezium::schema_change::{SchemaChangeEnvelope, TableSchemaChange}; +use crate::parser::schema_change::TableChangeType; use crate::parser::TransactionControl; +use crate::source::cdc::external::mysql::{mysql_type_to_rw_type, type_name_to_mysql_type}; use crate::source::{ConnectorProperties, SourceColumnDesc}; // Example of Debezium JSON value: @@ -75,6 +81,8 @@ const OP: &str = "op"; pub const TRANSACTION_STATUS: &str = "status"; pub const TRANSACTION_ID: &str = "id"; +pub const TABLE_CHANGES: &str = "tableChanges"; + pub const DEBEZIUM_READ_OP: &str = "r"; pub const DEBEZIUM_CREATE_OP: &str = "c"; pub const DEBEZIUM_UPDATE_OP: &str = "u"; @@ -129,6 +137,96 @@ pub fn parse_transaction_meta( }) } +macro_rules! jsonb_access_field { + ($col:expr, $field:expr, $as_type:tt) => { + $crate::paste! { + $col.access_object_field($field).unwrap().[]().unwrap() + } + }; +} + +pub fn parse_schema_change( + accessor: &impl Access, + connector_props: &ConnectorProperties, +) -> AccessResult { + let mut schema_changes = vec![]; + + if let Some(ScalarRefImpl::List(table_changes)) = accessor + .access(&[TABLE_CHANGES], &DataType::List(Box::new(DataType::Jsonb)))? + .to_datum_ref() + { + for datum in table_changes.iter() { + let jsonb = match datum { + Some(ScalarRefImpl::Jsonb(jsonb)) => jsonb, + _ => unreachable!(""), + }; + + let id = jsonb_access_field!(jsonb, "id", string); + let ty = jsonb_access_field!(jsonb, "type", string); + let ddl_type: TableChangeType = ty.as_str().into(); + if matches!(ddl_type, TableChangeType::Create | TableChangeType::Drop) { + tracing::debug!("skip table schema change for create/drop command"); + continue; + } + + let mut column_descs: Vec = vec![]; + if let Some(table) = jsonb.access_object_field("table") + && let Some(columns) = table.access_object_field("columns") + { + for col in columns.array_elements().unwrap() { + let name = jsonb_access_field!(col, "name", string); + let type_name = jsonb_access_field!(col, "typeName", string); + + let data_type = match *connector_props { + ConnectorProperties::PostgresCdc(_) => { + unimplemented!() + } + ConnectorProperties::MysqlCdc(_) => { + let ty = type_name_to_mysql_type(type_name.as_str()); + match ty { + Some(ty) => mysql_type_to_rw_type(&ty).map_err(|err| { + tracing::warn!(error=%err.as_report(), "unsupported mysql type in schema change message"); + AccessError::UnsupportedType { + ty: type_name.clone(), + } + })?, + None => { + Err(AccessError::UnsupportedType { ty: type_name })? + } + } + } + _ => { + unreachable!() + } + }; + + column_descs.push(ColumnDesc::named(name, ColumnId::placeholder(), data_type)); + } + } + schema_changes.push(TableSchemaChange { + cdc_table_name: id.replace('"', ""), // remove the double quotes + columns: column_descs + .into_iter() + .map(|column_desc| ColumnCatalog { + column_desc, + is_hidden: false, + }) + .collect_vec(), + change_type: ty.as_str().into(), + }); + } + + Ok(SchemaChangeEnvelope { + table_changes: schema_changes, + }) + } else { + Err(AccessError::Undefined { + name: "table schema change".into(), + path: TABLE_CHANGES.into(), + }) + } +} + impl DebeziumChangeEvent where A: Access, diff --git a/src/connector/src/source/cdc/external/mysql.rs b/src/connector/src/source/cdc/external/mysql.rs index e5f53720dd6ee..6947ba7a46d6b 100644 --- a/src/connector/src/source/cdc/external/mysql.rs +++ b/src/connector/src/source/cdc/external/mysql.rs @@ -106,7 +106,7 @@ impl MySqlExternalTable { let mut column_descs = vec![]; let mut pk_names = vec![]; for col in columns { - let data_type = type_to_rw_type(&col.col_type)?; + let data_type = mysql_type_to_rw_type(&col.col_type)?; // column name in mysql is case-insensitive, convert to lowercase let col_name = col.name.to_lowercase(); column_descs.push(ColumnDesc::named( @@ -138,7 +138,56 @@ impl MySqlExternalTable { } } -fn type_to_rw_type(col_type: &ColumnType) -> ConnectorResult { +pub fn type_name_to_mysql_type(ty_name: &str) -> Option { + macro_rules! column_type { + ($($name:literal => $variant:ident),* $(,)?) => { + match ty_name.to_lowercase().as_str() { + $( + $name => Some(ColumnType::$variant(Default::default())), + )* + _ => None, + } + }; + } + + column_type! { + "bit" => Bit, + "tinyint" => TinyInt, + "smallint" => SmallInt, + "mediumint" => MediumInt, + "int" => Int, + "bigint" => BigInt, + "decimal" => Decimal, + "float" => Float, + "double" => Double, + "time" => Time, + "datetime" => DateTime, + "timestamp" => Timestamp, + "char" => Char, + "nchar" => NChar, + "varchar" => Varchar, + "nvarchar" => NVarchar, + "binary" => Binary, + "varbinary" => Varbinary, + "text" => Text, + "tinytext" => TinyText, + "mediumtext" => MediumText, + "longtext" => LongText, + "blob" => Blob, + "enum" => Enum, + "set" => Set, + "geometry" => Geometry, + "point" => Point, + "linestring" => LineString, + "polygon" => Polygon, + "multipoint" => MultiPoint, + "multilinestring" => MultiLineString, + "multipolygon" => MultiPolygon, + "geometrycollection" => GeometryCollection, + } +} + +pub fn mysql_type_to_rw_type(col_type: &ColumnType) -> ConnectorResult { let dtype = match col_type { ColumnType::Serial => DataType::Int32, ColumnType::Bit(attr) => { diff --git a/src/connector/src/source/cdc/source/message.rs b/src/connector/src/source/cdc/source/message.rs index f12d18339b527..831e242fe78de 100644 --- a/src/connector/src/source/cdc/source/message.rs +++ b/src/connector/src/source/cdc/source/message.rs @@ -13,11 +13,32 @@ // limitations under the License. use risingwave_common::types::{DatumRef, ScalarRefImpl, Timestamptz}; -use risingwave_pb::connector_service::CdcMessage; +use risingwave_pb::connector_service::{cdc_message, CdcMessage}; use crate::source::base::SourceMessage; use crate::source::SourceMeta; +#[derive(Clone, Debug)] +pub enum CdcMessageType { + Unspecified, + Heartbeat, + Data, + TransactionMeta, + SchemaChange, +} + +impl From for CdcMessageType { + fn from(msg_type: cdc_message::CdcMessageType) -> Self { + match msg_type { + cdc_message::CdcMessageType::Data => CdcMessageType::Data, + cdc_message::CdcMessageType::Heartbeat => CdcMessageType::Heartbeat, + cdc_message::CdcMessageType::TransactionMeta => CdcMessageType::TransactionMeta, + cdc_message::CdcMessageType::SchemaChange => CdcMessageType::SchemaChange, + cdc_message::CdcMessageType::Unspecified => CdcMessageType::Unspecified, + } + } +} + #[derive(Debug, Clone)] pub struct DebeziumCdcMeta { db_name_prefix_len: usize, @@ -25,11 +46,11 @@ pub struct DebeziumCdcMeta { pub full_table_name: String, // extracted from `payload.source.ts_ms`, the time that the change event was made in the database pub source_ts_ms: i64, - // Whether the message is a transaction metadata - pub is_transaction_meta: bool, + pub msg_type: CdcMessageType, } impl DebeziumCdcMeta { + // These `extract_xxx` methods are used to support the `INCLUDE TIMESTAMP/DATABASE_NAME/TABLE_NAME` feature pub fn extract_timestamp(&self) -> DatumRef<'_> { Some(ScalarRefImpl::Timestamptz( Timestamptz::from_millis(self.source_ts_ms).unwrap(), @@ -48,20 +69,25 @@ impl DebeziumCdcMeta { )) } - pub fn new(full_table_name: String, source_ts_ms: i64, is_transaction_meta: bool) -> Self { + pub fn new( + full_table_name: String, + source_ts_ms: i64, + msg_type: cdc_message::CdcMessageType, + ) -> Self { // full_table_name is in the format of `database_name.table_name` let db_name_prefix_len = full_table_name.as_str().find('.').unwrap_or(0); Self { db_name_prefix_len, full_table_name, source_ts_ms, - is_transaction_meta, + msg_type: msg_type.into(), } } } impl From for SourceMessage { fn from(message: CdcMessage) -> Self { + let msg_type = message.get_msg_type().expect("invalid message type"); SourceMessage { key: if message.key.is_empty() { None // only data message has key @@ -78,7 +104,7 @@ impl From for SourceMessage { meta: SourceMeta::DebeziumCdc(DebeziumCdcMeta::new( message.full_table_name, message.source_ts_ms, - message.is_transaction_meta, + msg_type, )), } }