From 9439306004f406a9814fc7c32e1c7c8ec0f7c3ef Mon Sep 17 00:00:00 2001 From: MOBIN-F <18814118038@163.com> Date: Mon, 9 Dec 2024 16:15:46 +0800 Subject: [PATCH 1/3] Fix compatibility with Flink 1.20 JsonRowDataSerializationSchema --- .../json/ChangeLogJsonFormatFactory.java | 10 +- .../canal/CanalJsonSerializationSchema.java | 12 ++- .../DebeziumJsonSerializationSchema.java | 12 ++- .../JsonSerializationSchema.java | 12 ++- .../kafka/sink/KeySerializationFactory.java | 7 +- .../JsonRowDataSerializationSchemaUtils.java | 94 +++++++++++++++++++ 6 files changed, 135 insertions(+), 12 deletions(-) create mode 100644 flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/utils/JsonRowDataSerializationSchemaUtils.java diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/ChangeLogJsonFormatFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/ChangeLogJsonFormatFactory.java index cd2850316ee..ec336d1c50a 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/ChangeLogJsonFormatFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/ChangeLogJsonFormatFactory.java @@ -22,6 +22,7 @@ import org.apache.flink.cdc.common.event.Event; import org.apache.flink.cdc.connectors.kafka.json.canal.CanalJsonSerializationSchema; import org.apache.flink.cdc.connectors.kafka.json.debezium.DebeziumJsonSerializationSchema; +import org.apache.flink.cdc.connectors.kafka.utils.JsonRowDataSerializationSchemaUtils; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.formats.common.TimestampFormat; import org.apache.flink.formats.json.JsonFormatOptions; @@ -57,6 +58,9 @@ public static SerializationSchema createSerializationSchema( final boolean encodeDecimalAsPlainNumber = formatOptions.get(ENCODE_DECIMAL_AS_PLAIN_NUMBER); + final boolean ignoreNullFields = + JsonRowDataSerializationSchemaUtils.enableIgnoreNullFields(formatOptions); + switch (type) { case DEBEZIUM_JSON: { @@ -65,7 +69,8 @@ public static SerializationSchema createSerializationSchema( mapNullKeyMode, mapNullKeyLiteral, zoneId, - encodeDecimalAsPlainNumber); + encodeDecimalAsPlainNumber, + ignoreNullFields); } case CANAL_JSON: { @@ -74,7 +79,8 @@ public static SerializationSchema createSerializationSchema( mapNullKeyMode, mapNullKeyLiteral, zoneId, - encodeDecimalAsPlainNumber); + encodeDecimalAsPlainNumber, + ignoreNullFields); } default: { diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchema.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchema.java index d0c6179752b..6a75aaf892f 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchema.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchema.java @@ -27,6 +27,7 @@ import org.apache.flink.cdc.common.types.utils.DataTypeUtils; import org.apache.flink.cdc.common.utils.SchemaUtils; import org.apache.flink.cdc.connectors.kafka.json.TableSchemaInfo; +import org.apache.flink.cdc.connectors.kafka.utils.JsonRowDataSerializationSchemaUtils; import org.apache.flink.formats.common.TimestampFormat; import org.apache.flink.formats.json.JsonFormatOptions; import org.apache.flink.formats.json.JsonRowDataSerializationSchema; @@ -73,6 +74,8 @@ public class CanalJsonSerializationSchema implements SerializationSchema private final boolean encodeDecimalAsPlainNumber; + private final boolean ignoreNullFields; + private final ZoneId zoneId; private InitializationContext context; @@ -82,13 +85,15 @@ public CanalJsonSerializationSchema( JsonFormatOptions.MapNullKeyMode mapNullKeyMode, String mapNullKeyLiteral, ZoneId zoneId, - boolean encodeDecimalAsPlainNumber) { + boolean encodeDecimalAsPlainNumber, + boolean ignoreNullFields) { this.timestampFormat = timestampFormat; this.mapNullKeyMode = mapNullKeyMode; this.mapNullKeyLiteral = mapNullKeyLiteral; this.encodeDecimalAsPlainNumber = encodeDecimalAsPlainNumber; this.zoneId = zoneId; jsonSerializers = new HashMap<>(); + this.ignoreNullFields = ignoreNullFields; } @Override @@ -114,12 +119,13 @@ public byte[] serialize(Event event) { LogicalType rowType = DataTypeUtils.toFlinkDataType(schema.toRowDataType()).getLogicalType(); JsonRowDataSerializationSchema jsonSerializer = - new JsonRowDataSerializationSchema( + JsonRowDataSerializationSchemaUtils.createSerializationSchema( createJsonRowType(fromLogicalToDataType(rowType)), timestampFormat, mapNullKeyMode, mapNullKeyLiteral, - encodeDecimalAsPlainNumber); + encodeDecimalAsPlainNumber, + ignoreNullFields); try { jsonSerializer.open(context); } catch (Exception e) { diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java index 15cecbc4f8e..238c2121daf 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java @@ -27,6 +27,7 @@ import org.apache.flink.cdc.common.types.utils.DataTypeUtils; import org.apache.flink.cdc.common.utils.SchemaUtils; import org.apache.flink.cdc.connectors.kafka.json.TableSchemaInfo; +import org.apache.flink.cdc.connectors.kafka.utils.JsonRowDataSerializationSchemaUtils; import org.apache.flink.formats.common.TimestampFormat; import org.apache.flink.formats.json.JsonFormatOptions; import org.apache.flink.formats.json.JsonRowDataSerializationSchema; @@ -72,6 +73,8 @@ public class DebeziumJsonSerializationSchema implements SerializationSchema(); + this.ignoreNullFields = ignoreNullFields; } @Override @@ -113,12 +118,13 @@ public byte[] serialize(Event event) { LogicalType rowType = DataTypeUtils.toFlinkDataType(schema.toRowDataType()).getLogicalType(); JsonRowDataSerializationSchema jsonSerializer = - new JsonRowDataSerializationSchema( + JsonRowDataSerializationSchemaUtils.createSerializationSchema( createJsonRowType(fromLogicalToDataType(rowType)), timestampFormat, mapNullKeyMode, mapNullKeyLiteral, - encodeDecimalAsPlainNumber); + encodeDecimalAsPlainNumber, + ignoreNullFields); try { jsonSerializer.open(context); } catch (Exception e) { diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/serialization/JsonSerializationSchema.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/serialization/JsonSerializationSchema.java index 5425d444ead..a9a0d845f58 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/serialization/JsonSerializationSchema.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/serialization/JsonSerializationSchema.java @@ -33,6 +33,7 @@ import org.apache.flink.cdc.common.types.utils.DataTypeUtils; import org.apache.flink.cdc.common.utils.SchemaUtils; import org.apache.flink.cdc.connectors.kafka.json.TableSchemaInfo; +import org.apache.flink.cdc.connectors.kafka.utils.JsonRowDataSerializationSchemaUtils; import org.apache.flink.formats.common.TimestampFormat; import org.apache.flink.formats.json.JsonFormatOptions; import org.apache.flink.formats.json.JsonRowDataSerializationSchema; @@ -61,6 +62,8 @@ public class JsonSerializationSchema implements SerializationSchema { private final boolean encodeDecimalAsPlainNumber; + private final boolean ignoreNullFields; + private final ZoneId zoneId; private InitializationContext context; @@ -70,13 +73,15 @@ public JsonSerializationSchema( JsonFormatOptions.MapNullKeyMode mapNullKeyMode, String mapNullKeyLiteral, ZoneId zoneId, - boolean encodeDecimalAsPlainNumber) { + boolean encodeDecimalAsPlainNumber, + boolean ignoreNullFields) { this.timestampFormat = timestampFormat; this.mapNullKeyMode = mapNullKeyMode; this.mapNullKeyLiteral = mapNullKeyLiteral; this.encodeDecimalAsPlainNumber = encodeDecimalAsPlainNumber; this.zoneId = zoneId; jsonSerializers = new HashMap<>(); + this.ignoreNullFields = ignoreNullFields; } @Override @@ -131,11 +136,12 @@ private JsonRowDataSerializationSchema buildSerializationForPrimaryKey(Schema sc // the row should never be null DataType dataType = DataTypes.ROW(fields).notNull(); LogicalType rowType = DataTypeUtils.toFlinkDataType(dataType).getLogicalType(); - return new JsonRowDataSerializationSchema( + return JsonRowDataSerializationSchemaUtils.createSerializationSchema( (RowType) rowType, timestampFormat, mapNullKeyMode, mapNullKeyLiteral, - encodeDecimalAsPlainNumber); + encodeDecimalAsPlainNumber, + ignoreNullFields); } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KeySerializationFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KeySerializationFactory.java index 76132d8e547..339e0a10639 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KeySerializationFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KeySerializationFactory.java @@ -21,6 +21,7 @@ import org.apache.flink.cdc.common.event.Event; import org.apache.flink.cdc.connectors.kafka.serialization.CsvSerializationSchema; import org.apache.flink.cdc.connectors.kafka.serialization.JsonSerializationSchema; +import org.apache.flink.cdc.connectors.kafka.utils.JsonRowDataSerializationSchemaUtils; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.formats.common.TimestampFormat; import org.apache.flink.formats.json.JsonFormatOptions; @@ -54,12 +55,16 @@ public static SerializationSchema createSerializationSchema( final boolean encodeDecimalAsPlainNumber = formatOptions.get(ENCODE_DECIMAL_AS_PLAIN_NUMBER); + final boolean ignoreNullFields = + JsonRowDataSerializationSchemaUtils.enableIgnoreNullFields( + formatOptions); return new JsonSerializationSchema( timestampFormat, mapNullKeyMode, mapNullKeyLiteral, zoneId, - encodeDecimalAsPlainNumber); + encodeDecimalAsPlainNumber, + ignoreNullFields); } case CSV: { diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/utils/JsonRowDataSerializationSchemaUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/utils/JsonRowDataSerializationSchemaUtils.java new file mode 100644 index 00000000000..dbcd879a219 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/utils/JsonRowDataSerializationSchemaUtils.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.kafka.utils; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.formats.common.TimestampFormat; +import org.apache.flink.formats.json.JsonFormatOptions; +import org.apache.flink.formats.json.JsonRowDataSerializationSchema; +import org.apache.flink.table.types.logical.RowType; + +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.util.Arrays; + +/** Utils for creating JsonRowDataSerializationSchema. */ +public class JsonRowDataSerializationSchemaUtils { + + /** + * In flink>=1.20, the constructor of JsonRowDataSerializationSchema has 6 parameters, and in + * flink<1.20, the constructor of JsonRowDataSerializationSchema has 5 parameters. + */ + public static JsonRowDataSerializationSchema createSerializationSchema( + RowType rowType, + TimestampFormat timestampFormat, + JsonFormatOptions.MapNullKeyMode mapNullKeyMode, + String mapNullKeyLiteral, + boolean encodeDecimalAsPlainNumber, + boolean ignoreNullFields) { + try { + Class[] fullParams = + new Class[] { + RowType.class, + TimestampFormat.class, + JsonFormatOptions.MapNullKeyMode.class, + String.class, + boolean.class, + boolean.class + }; + + Object[] fullParamValues = + new Object[] { + rowType, + timestampFormat, + mapNullKeyMode, + mapNullKeyLiteral, + encodeDecimalAsPlainNumber, + ignoreNullFields + }; + + for (int i = fullParams.length; i >= 5; i--) { + try { + Constructor constructor = + JsonRowDataSerializationSchema.class.getConstructor( + Arrays.copyOfRange(fullParams, 0, i)); + + return (JsonRowDataSerializationSchema) + constructor.newInstance(Arrays.copyOfRange(fullParamValues, 0, i)); + } catch (NoSuchMethodException ignored) { + } + } + } catch (Exception e) { + throw new RuntimeException("Failed to create JsonRowDataSerializationSchema", e); + } + throw new RuntimeException( + "Failed to find appropriate constructor for JsonRowDataSerializationSchema"); + } + + /** flink>=1.20 only has the ENCODE_IGNORE_NULL_FIELDS parameter. */ + public static boolean enableIgnoreNullFields(ReadableConfig formatOptions) { + try { + Field field = JsonFormatOptions.class.getField("ENCODE_IGNORE_NULL_FIELDS"); + ConfigOption encodeOption = (ConfigOption) field.get(null); + return formatOptions.get(encodeOption); + } catch (NoSuchFieldException | IllegalAccessException e) { + return false; + } + } +} From c6d34cbb7bfa5bfa66ec9f0206bfe1f0f44824ad Mon Sep 17 00:00:00 2001 From: MOBIN-F <18814118038@163.com> Date: Mon, 9 Dec 2024 16:16:29 +0800 Subject: [PATCH 2/3] add MysqlToKafkaE2eITCase --- .../pom.xml | 13 + .../flink-cdc-pipeline-e2e-tests/pom.xml | 24 ++ .../pipeline/tests/MysqlToKafkaE2eITCase.java | 388 ++++++++++++++++++ .../mysqlToKafka/canal-json.txt | 37 ++ .../mysqlToKafka/debezium-json.txt | 37 ++ 5 files changed, 499 insertions(+) create mode 100644 flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlToKafkaE2eITCase.java create mode 100644 flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/expectedEvents/mysqlToKafka/canal-json.txt create mode 100644 flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/expectedEvents/mysqlToKafka/debezium-json.txt diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/pom.xml b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/pom.xml index 72065d657ba..8e45607c807 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/pom.xml +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/pom.xml @@ -115,6 +115,19 @@ limitations under the License. + + + org.apache.maven.plugins + maven-jar-plugin + + + test-jar + + test-jar + + + + diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml index 31c4d2ec815..eb0e0f9aac4 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml @@ -98,6 +98,13 @@ limitations under the License. test-jar test + + org.apache.flink + flink-cdc-pipeline-connector-kafka + ${project.version} + test-jar + test + org.apache.flink flink-connector-test-util @@ -126,6 +133,13 @@ limitations under the License. ${scala.version} test + + + org.testcontainers + kafka + ${testcontainers.version} + test + @@ -245,6 +259,16 @@ limitations under the License. + + org.apache.flink + flink-cdc-pipeline-connector-kafka + ${project.version} + kafka-cdc-pipeline-connector.jar + jar + ${project.build.directory}/dependencies + + + org.apache.flink flink-cdc-pipeline-udf-examples diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlToKafkaE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlToKafkaE2eITCase.java new file mode 100644 index 00000000000..bf306001963 --- /dev/null +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlToKafkaE2eITCase.java @@ -0,0 +1,388 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.pipeline.tests; + +import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.test.utils.TestUtils; +import org.apache.flink.cdc.connectors.kafka.sink.KafkaUtil; +import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer; +import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion; +import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase; +import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.CreateTopicsResult; +import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.junit.After; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.shaded.org.apache.commons.lang3.StringUtils; + +import java.io.IOException; +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.apache.flink.util.DockerImageVersions.KAFKA; +import static org.assertj.core.api.Assertions.assertThat; + +/** End-to-end tests for mysql cdc to Kafka pipeline job. */ +@RunWith(Parameterized.class) +public class MysqlToKafkaE2eITCase extends PipelineTestEnvironment { + private static final Logger LOG = LoggerFactory.getLogger(MysqlToKafkaE2eITCase.class); + + // ------------------------------------------------------------------------------------------ + // MySQL Variables (we always use MySQL as the data source for easier verifying) + // ------------------------------------------------------------------------------------------ + protected static final String MYSQL_TEST_USER = "mysqluser"; + protected static final String MYSQL_TEST_PASSWORD = "mysqlpw"; + protected static final String MYSQL_DRIVER_CLASS = "com.mysql.cj.jdbc.Driver"; + protected static final String INTER_CONTAINER_MYSQL_ALIAS = "mysql"; + protected static final long EVENT_WAITING_TIMEOUT = 60000L; + + private static AdminClient admin; + private static final String INTER_CONTAINER_KAFKA_ALIAS = "kafka"; + private static final int ZK_TIMEOUT_MILLIS = 30000; + private static final short TOPIC_REPLICATION_FACTOR = 1; + private TableId table; + private String topic; + private KafkaConsumer consumer; + + @ClassRule + public static final MySqlContainer MYSQL = + (MySqlContainer) + new MySqlContainer( + MySqlVersion.V8_0) // v8 support both ARM and AMD architectures + .withConfigurationOverride("docker/mysql/my.cnf") + .withSetupSQL("docker/mysql/setup.sql") + .withDatabaseName("flink-test") + .withUsername("flinkuser") + .withPassword("flinkpw") + .withNetwork(NETWORK) + .withNetworkAliases(INTER_CONTAINER_MYSQL_ALIAS) + .withLogConsumer(new Slf4jLogConsumer(LOG)); + + @ClassRule + public static final KafkaContainer KAFKA_CONTAINER = + KafkaUtil.createKafkaContainer(KAFKA, LOG) + .withNetworkAliases("kafka") + .withEmbeddedZookeeper() + .withNetwork(NETWORK) + .withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS); + + protected final UniqueDatabase mysqlInventoryDatabase = + new UniqueDatabase(MYSQL, "mysql_inventory", MYSQL_TEST_USER, MYSQL_TEST_PASSWORD); + + @BeforeClass + public static void initializeContainers() { + LOG.info("Starting containers..."); + Startables.deepStart(Stream.of(MYSQL)).join(); + Startables.deepStart(Stream.of(KAFKA_CONTAINER)).join(); + Map properties = new HashMap<>(); + properties.put( + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, + KAFKA_CONTAINER.getBootstrapServers()); + admin = AdminClient.create(properties); + LOG.info("Containers are started."); + } + + @Before + public void before() throws Exception { + super.before(); + createTestTopic(1, TOPIC_REPLICATION_FACTOR); + Properties properties = getKafkaClientConfiguration(); + consumer = new KafkaConsumer<>(properties); + consumer.subscribe(Collections.singletonList(topic)); + mysqlInventoryDatabase.createAndInitialize(); + } + + @After + public void after() { + super.after(); + admin.deleteTopics(Collections.singletonList(topic)); + consumer.close(); + mysqlInventoryDatabase.dropDatabase(); + } + + @Test + public void testSyncWholeDatabaseWithDebeziumJson() throws Exception { + String pipelineJob = + String.format( + "source:\n" + + " type: mysql\n" + + " hostname: %s\n" + + " port: 3306\n" + + " username: %s\n" + + " password: %s\n" + + " tables: %s.\\.*\n" + + " server-id: 5400-5404\n" + + " server-time-zone: UTC\n" + + "\n" + + "sink:\n" + + " type: kafka\n" + + " properties.bootstrap.servers: kafka:9092\n" + + " topic: %s\n" + + "\n" + + "pipeline:\n" + + " parallelism: %d", + INTER_CONTAINER_MYSQL_ALIAS, + MYSQL_TEST_USER, + MYSQL_TEST_PASSWORD, + mysqlInventoryDatabase.getDatabaseName(), + topic, + parallelism); + Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar"); + Path kafkaCdcJar = TestUtils.getResource("kafka-cdc-pipeline-connector.jar"); + Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar"); + submitPipelineJob(pipelineJob, mysqlCdcJar, kafkaCdcJar, mysqlDriverJar); + waitUntilJobRunning(Duration.ofSeconds(30)); + LOG.info("Pipeline job is running"); + List> collectedRecords = new ArrayList<>(); + int expectedEventCount = 13; + waitUntilSpecificEventCount(collectedRecords, expectedEventCount); + List expectedRecords = + getExpectedRecords("expectedEvents/mysqlToKafka/debezium-json.txt"); + assertThat(expectedRecords).containsAll(deserializeValues(collectedRecords)); + LOG.info("Begin incremental reading stage."); + // generate binlogs + String mysqlJdbcUrl = + String.format( + "jdbc:mysql://%s:%s/%s", + MYSQL.getHost(), + MYSQL.getDatabasePort(), + mysqlInventoryDatabase.getDatabaseName()); + try (Connection conn = + DriverManager.getConnection( + mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD); + Statement stat = conn.createStatement()) { + stat.execute("UPDATE products SET description='18oz carpenter hammer' WHERE id=106;"); + stat.execute("UPDATE products SET weight='5.1' WHERE id=107;"); + + // modify table schema + stat.execute("ALTER TABLE products ADD COLUMN new_col INT;"); + stat.execute( + "INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2, null, null, null, 1);"); // 110 + stat.execute( + "INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18, null, null, null, 1);"); // 111 + stat.execute( + "UPDATE products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;"); + stat.execute("UPDATE products SET weight='5.17' WHERE id=111;"); + stat.execute("DELETE FROM products WHERE id=111;"); + } catch (SQLException e) { + LOG.error("Update table for CDC failed.", e); + throw e; + } + + expectedEventCount = 20; + waitUntilSpecificEventCount(collectedRecords, expectedEventCount); + assertThat(expectedRecords) + .containsExactlyInAnyOrderElementsOf(deserializeValues(collectedRecords)); + } + + @Test + public void testSyncWholeDatabaseWithCanalJson() throws Exception { + String pipelineJob = + String.format( + "source:\n" + + " type: mysql\n" + + " hostname: %s\n" + + " port: 3306\n" + + " username: %s\n" + + " password: %s\n" + + " tables: %s.\\.*\n" + + " server-id: 5400-5404\n" + + " server-time-zone: UTC\n" + + "\n" + + "sink:\n" + + " type: kafka\n" + + " properties.bootstrap.servers: kafka:9092\n" + + " topic: %s\n" + + " value.format: canal-json\n" + + "\n" + + "pipeline:\n" + + " parallelism: %d", + INTER_CONTAINER_MYSQL_ALIAS, + MYSQL_TEST_USER, + MYSQL_TEST_PASSWORD, + mysqlInventoryDatabase.getDatabaseName(), + topic, + parallelism); + Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar"); + Path kafkaCdcJar = TestUtils.getResource("kafka-cdc-pipeline-connector.jar"); + Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar"); + submitPipelineJob(pipelineJob, mysqlCdcJar, kafkaCdcJar, mysqlDriverJar); + waitUntilJobRunning(Duration.ofSeconds(30)); + LOG.info("Pipeline job is running"); + List> collectedRecords = new ArrayList<>(); + int expectedEventCount = 13; + waitUntilSpecificEventCount(collectedRecords, expectedEventCount); + List expectedRecords = + getExpectedRecords("expectedEvents/mysqlToKafka/canal-json.txt"); + assertThat(expectedRecords).containsAll(deserializeValues(collectedRecords)); + LOG.info("Begin incremental reading stage."); + // generate binlogs + String mysqlJdbcUrl = + String.format( + "jdbc:mysql://%s:%s/%s", + MYSQL.getHost(), + MYSQL.getDatabasePort(), + mysqlInventoryDatabase.getDatabaseName()); + try (Connection conn = + DriverManager.getConnection( + mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD); + Statement stat = conn.createStatement()) { + stat.execute("UPDATE products SET description='18oz carpenter hammer' WHERE id=106;"); + stat.execute("UPDATE products SET weight='5.1' WHERE id=107;"); + + // modify table schema + stat.execute("ALTER TABLE products ADD COLUMN new_col INT;"); + stat.execute( + "INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2, null, null, null, 1);"); // 110 + stat.execute( + "INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18, null, null, null, 1);"); // 111 + stat.execute( + "UPDATE products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;"); + stat.execute("UPDATE products SET weight='5.17' WHERE id=111;"); + stat.execute("DELETE FROM products WHERE id=111;"); + } catch (SQLException e) { + LOG.error("Update table for CDC failed.", e); + throw e; + } + + expectedEventCount = 20; + waitUntilSpecificEventCount(collectedRecords, expectedEventCount); + assertThat(expectedRecords) + .containsExactlyInAnyOrderElementsOf(deserializeValues(collectedRecords)); + } + + private void waitUntilSpecificEventCount( + List> actualEvent, int expectedCount) throws Exception { + boolean result = false; + long endTimeout = System.currentTimeMillis() + MysqlToKafkaE2eITCase.EVENT_WAITING_TIMEOUT; + while (System.currentTimeMillis() < endTimeout) { + ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); + records.forEach(actualEvent::add); + if (actualEvent.size() == expectedCount) { + result = true; + break; + } + Thread.sleep(1000); + } + if (!result) { + throw new TimeoutException( + "failed to get specific event count: " + + expectedCount + + " from stdout: " + + actualEvent.size()); + } + } + + private static Properties getKafkaClientConfiguration() { + final Properties standardProps = new Properties(); + standardProps.put("bootstrap.servers", KAFKA_CONTAINER.getBootstrapServers()); + standardProps.put("group.id", UUID.randomUUID().toString()); + standardProps.put("enable.auto.commit", false); + standardProps.put("auto.offset.reset", "earliest"); + standardProps.put("max.partition.fetch.bytes", 256); + standardProps.put("zookeeper.session.timeout.ms", ZK_TIMEOUT_MILLIS); + standardProps.put("zookeeper.connection.timeout.ms", ZK_TIMEOUT_MILLIS); + standardProps.put("key.deserializer", ByteArrayDeserializer.class.getName()); + standardProps.put("value.deserializer", ByteArrayDeserializer.class.getName()); + return standardProps; + } + + private void createTestTopic(int numPartitions, short replicationFactor) + throws ExecutionException, InterruptedException { + table = + TableId.tableId( + "default_namespace", "default_schema", UUID.randomUUID().toString()); + topic = table.toString(); + final CreateTopicsResult result = + admin.createTopics( + Collections.singletonList( + new NewTopic(topic, numPartitions, replicationFactor))); + result.all().get(); + } + + private List deserializeValues(List> records) + throws IOException { + List result = new ArrayList<>(); + for (ConsumerRecord record : records) { + result.add(new String(record.value(), "UTF-8")); + } + return result; + } + + protected List getExpectedRecords(String resourceDirFormat) throws Exception { + URL url = + MysqlToKafkaE2eITCase.class + .getClassLoader() + .getResource(String.format(resourceDirFormat)); + return Files.readAllLines(Paths.get(url.toURI())).stream() + .filter(this::isValidJsonRecord) + .map( + line -> + line.replace( + "$databaseName", mysqlInventoryDatabase.getDatabaseName())) + .collect(Collectors.toList()); + } + + protected boolean isValidJsonRecord(String line) { + try { + ObjectMapper objectMapper = new ObjectMapper(); + objectMapper.readTree(line); + return !StringUtils.isEmpty(line); + } catch (JsonProcessingException e) { + return false; + } + } +} diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/expectedEvents/mysqlToKafka/canal-json.txt b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/expectedEvents/mysqlToKafka/canal-json.txt new file mode 100644 index 00000000000..ed649e2fcf5 --- /dev/null +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/expectedEvents/mysqlToKafka/canal-json.txt @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{"old":null,"data":[{"id":103,"name":"user_3","address":"Shanghai","phone_number":"123567891234"}],"type":"INSERT","database":"$databaseName","table":"customers","pkNames":["id"]} +{"old":null,"data":[{"id":104,"name":"user_4","address":"Shanghai","phone_number":"123567891234"}],"type":"INSERT","database":"$databaseName","table":"customers","pkNames":["id"]} +{"old":null,"data":[{"id":102,"name":"user_2","address":"Shanghai","phone_number":"123567891234"}],"type":"INSERT","database":"$databaseName","table":"customers","pkNames":["id"]} +{"old":null,"data":[{"id":101,"name":"user_1","address":"Shanghai","phone_number":"123567891234"}],"type":"INSERT","database":"$databaseName","table":"customers","pkNames":["id"]} +{"old":null,"data":[{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.3,"enum_c":null,"json_c":null,"point_c":null}],"type":"INSERT","database":"$databaseName","table":"products","pkNames":["id"]} +{"old":null,"data":[{"id":108,"name":"jacket","description":"water resistent black wind breaker","weight":0.1,"enum_c":null,"json_c":null,"point_c":null}],"type":"INSERT","database":"$databaseName","table":"products","pkNames":["id"]} +{"old":null,"data":[{"id":103,"name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":0.8,"enum_c":"red","json_c":"{\"key3\": \"value3\"}","point_c":"{\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}"}],"type":"INSERT","database":"$databaseName","table":"products","pkNames":["id"]} +{"old":null,"data":[{"id":105,"name":"hammer","description":"14oz carpenter's hammer","weight":0.875,"enum_c":"red","json_c":"{\"k1\": \"v1\", \"k2\": \"v2\"}","point_c":"{\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}"}],"type":"INSERT","database":"$databaseName","table":"products","pkNames":["id"]} +{"old":null,"data":[{"id":109,"name":"spare tire","description":"24 inch spare tire","weight":22.2,"enum_c":null,"json_c":null,"point_c":null}],"type":"INSERT","database":"$databaseName","table":"products","pkNames":["id"]} +{"old":null,"data":[{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":3.14,"enum_c":"red","json_c":"{\"key1\": \"value1\"}","point_c":"{\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}"}],"type":"INSERT","database":"$databaseName","table":"products","pkNames":["id"]} +{"old":null,"data":[{"id":106,"name":"hammer","description":"16oz carpenter's hammer","weight":1.0,"enum_c":null,"json_c":null,"point_c":null}],"type":"INSERT","database":"$databaseName","table":"products","pkNames":["id"]} +{"old":null,"data":[{"id":102,"name":"car battery","description":"12V car battery","weight":8.1,"enum_c":"white","json_c":"{\"key2\": \"value2\"}","point_c":"{\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}"}],"type":"INSERT","database":"$databaseName","table":"products","pkNames":["id"]} +{"old":null,"data":[{"id":104,"name":"hammer","description":"12oz carpenter's hammer","weight":0.75,"enum_c":"white","json_c":"{\"key4\": \"value4\"}","point_c":"{\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}"}],"type":"INSERT","database":"$databaseName","table":"products","pkNames":["id"]} +{"old":[{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.3,"enum_c":null,"json_c":null,"point_c":null}],"data":[{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.1,"enum_c":null,"json_c":null,"point_c":null}],"type":"UPDATE","database":"$databaseName","table":"products","pkNames":["id"]} +{"old":[{"id":106,"name":"hammer","description":"16oz carpenter's hammer","weight":1.0,"enum_c":null,"json_c":null,"point_c":null}],"data":[{"id":106,"name":"hammer","description":"18oz carpenter hammer","weight":1.0,"enum_c":null,"json_c":null,"point_c":null}],"type":"UPDATE","database":"$databaseName","table":"products","pkNames":["id"]} +{"old":null,"data":[{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.18,"enum_c":null,"json_c":null,"point_c":null,"new_col":1}],"type":"INSERT","database":"$databaseName","table":"products","pkNames":["id"]} +{"old":null,"data":[{"id":110,"name":"jacket","description":"water resistent white wind breaker","weight":0.2,"enum_c":null,"json_c":null,"point_c":null,"new_col":1}],"type":"INSERT","database":"$databaseName","table":"products","pkNames":["id"]} +{"old":[{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.18,"enum_c":null,"json_c":null,"point_c":null,"new_col":1}],"data":[{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.17,"enum_c":null,"json_c":null,"point_c":null,"new_col":1}],"type":"UPDATE","database":"$databaseName","table":"products","pkNames":["id"]} +{"old":[{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.17,"enum_c":null,"json_c":null,"point_c":null,"new_col":1}],"data":null,"type":"DELETE","database":"$databaseName","table":"products","pkNames":["id"]} +{"old":[{"id":110,"name":"jacket","description":"water resistent white wind breaker","weight":0.2,"enum_c":null,"json_c":null,"point_c":null,"new_col":1}],"data":[{"id":110,"name":"jacket","description":"new water resistent white wind breaker","weight":0.5,"enum_c":null,"json_c":null,"point_c":null,"new_col":1}],"type":"UPDATE","database":"$databaseName","table":"products","pkNames":["id"]} \ No newline at end of file diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/expectedEvents/mysqlToKafka/debezium-json.txt b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/expectedEvents/mysqlToKafka/debezium-json.txt new file mode 100644 index 00000000000..13f689aea07 --- /dev/null +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/expectedEvents/mysqlToKafka/debezium-json.txt @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{"before":null,"after":{"id":102,"name":"user_2","address":"Shanghai","phone_number":"123567891234"},"op":"c","source":{"db":"$databaseName","table":"customers"}} +{"before":null,"after":{"id":101,"name":"user_1","address":"Shanghai","phone_number":"123567891234"},"op":"c","source":{"db":"$databaseName","table":"customers"}} +{"before":null,"after":{"id":103,"name":"user_3","address":"Shanghai","phone_number":"123567891234"},"op":"c","source":{"db":"$databaseName","table":"customers"}} +{"before":null,"after":{"id":104,"name":"user_4","address":"Shanghai","phone_number":"123567891234"},"op":"c","source":{"db":"$databaseName","table":"customers"}} +{"before":null,"after":{"id":106,"name":"hammer","description":"16oz carpenter's hammer","weight":1.0,"enum_c":null,"json_c":null,"point_c":null},"op":"c","source":{"db":"$databaseName","table":"products"}} +{"before":null,"after":{"id":105,"name":"hammer","description":"14oz carpenter's hammer","weight":0.875,"enum_c":"red","json_c":"{\"k1\": \"v1\", \"k2\": \"v2\"}","point_c":"{\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}"},"op":"c","source":{"db":"$databaseName","table":"products"}} +{"before":null,"after":{"id":109,"name":"spare tire","description":"24 inch spare tire","weight":22.2,"enum_c":null,"json_c":null,"point_c":null},"op":"c","source":{"db":"$databaseName","table":"products"}} +{"before":null,"after":{"id":102,"name":"car battery","description":"12V car battery","weight":8.1,"enum_c":"white","json_c":"{\"key2\": \"value2\"}","point_c":"{\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}"},"op":"c","source":{"db":"$databaseName","table":"products"}} +{"before":null,"after":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":3.14,"enum_c":"red","json_c":"{\"key1\": \"value1\"}","point_c":"{\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}"},"op":"c","source":{"db":"$databaseName","table":"products"}} +{"before":null,"after":{"id":108,"name":"jacket","description":"water resistent black wind breaker","weight":0.1,"enum_c":null,"json_c":null,"point_c":null},"op":"c","source":{"db":"$databaseName","table":"products"}} +{"before":null,"after":{"id":104,"name":"hammer","description":"12oz carpenter's hammer","weight":0.75,"enum_c":"white","json_c":"{\"key4\": \"value4\"}","point_c":"{\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}"},"op":"c","source":{"db":"$databaseName","table":"products"}} +{"before":null,"after":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.3,"enum_c":null,"json_c":null,"point_c":null},"op":"c","source":{"db":"$databaseName","table":"products"}} +{"before":null,"after":{"id":103,"name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":0.8,"enum_c":"red","json_c":"{\"key3\": \"value3\"}","point_c":"{\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}"},"op":"c","source":{"db":"$databaseName","table":"products"}} +{"before":{"id":106,"name":"hammer","description":"16oz carpenter's hammer","weight":1.0,"enum_c":null,"json_c":null,"point_c":null},"after":{"id":106,"name":"hammer","description":"18oz carpenter hammer","weight":1.0,"enum_c":null,"json_c":null,"point_c":null},"op":"u","source":{"db":"$databaseName","table":"products"}} +{"before":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.3,"enum_c":null,"json_c":null,"point_c":null},"after":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.1,"enum_c":null,"json_c":null,"point_c":null},"op":"u","source":{"db":"$databaseName","table":"products"}} +{"before":null,"after":{"id":110,"name":"jacket","description":"water resistent white wind breaker","weight":0.2,"enum_c":null,"json_c":null,"point_c":null,"new_col":1},"op":"c","source":{"db":"$databaseName","table":"products"}} +{"before":null,"after":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.18,"enum_c":null,"json_c":null,"point_c":null,"new_col":1},"op":"c","source":{"db":"$databaseName","table":"products"}} +{"before":{"id":110,"name":"jacket","description":"water resistent white wind breaker","weight":0.2,"enum_c":null,"json_c":null,"point_c":null,"new_col":1},"after":{"id":110,"name":"jacket","description":"new water resistent white wind breaker","weight":0.5,"enum_c":null,"json_c":null,"point_c":null,"new_col":1},"op":"u","source":{"db":"$databaseName","table":"products"}} +{"before":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.18,"enum_c":null,"json_c":null,"point_c":null,"new_col":1},"after":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.17,"enum_c":null,"json_c":null,"point_c":null,"new_col":1},"op":"u","source":{"db":"$databaseName","table":"products"}} +{"before":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.17,"enum_c":null,"json_c":null,"point_c":null,"new_col":1},"after":null,"op":"d","source":{"db":"$databaseName","table":"products"}} From aed3a7a071b40be86159855a9212ed13f6bd3fe4 Mon Sep 17 00:00:00 2001 From: MOBIN-F <18814118038@163.com> Date: Thu, 12 Dec 2024 18:25:24 +0800 Subject: [PATCH 3/3] fix comment --- .../utils/JsonRowDataSerializationSchemaUtils.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/utils/JsonRowDataSerializationSchemaUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/utils/JsonRowDataSerializationSchemaUtils.java index dbcd879a219..e021e0f1bdf 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/utils/JsonRowDataSerializationSchemaUtils.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/utils/JsonRowDataSerializationSchemaUtils.java @@ -28,7 +28,10 @@ import java.lang.reflect.Field; import java.util.Arrays; -/** Utils for creating JsonRowDataSerializationSchema. */ +/** + * Utils for creating JsonRowDataSerializationSchema.TODO: Remove this class after bump to Flink + * 1.20 or higher. + */ public class JsonRowDataSerializationSchemaUtils { /** @@ -75,10 +78,12 @@ public static JsonRowDataSerializationSchema createSerializationSchema( } } } catch (Exception e) { - throw new RuntimeException("Failed to create JsonRowDataSerializationSchema", e); + throw new RuntimeException( + "Failed to create JsonRowDataSerializationSchema,please check your Flink version is 1.19 or 1.20.", + e); } throw new RuntimeException( - "Failed to find appropriate constructor for JsonRowDataSerializationSchema"); + "Failed to find appropriate constructor for JsonRowDataSerializationSchema,please check your Flink version is 1.19 or 1.20."); } /** flink>=1.20 only has the ENCODE_IGNORE_NULL_FIELDS parameter. */