diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/pom.xml b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/pom.xml
index c8d41fe5eba..1ebaef2c45c 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/pom.xml
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/pom.xml
@@ -115,6 +115,19 @@ limitations under the License.
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>test-jar</id>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/ChangeLogJsonFormatFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/ChangeLogJsonFormatFactory.java
index cd2850316ee..ec336d1c50a 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/ChangeLogJsonFormatFactory.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/ChangeLogJsonFormatFactory.java
@@ -22,6 +22,7 @@
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.connectors.kafka.json.canal.CanalJsonSerializationSchema;
import org.apache.flink.cdc.connectors.kafka.json.debezium.DebeziumJsonSerializationSchema;
+import org.apache.flink.cdc.connectors.kafka.utils.JsonRowDataSerializationSchemaUtils;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.formats.json.JsonFormatOptions;
@@ -57,6 +58,9 @@ public static SerializationSchema<Event> createSerializationSchema(
final boolean encodeDecimalAsPlainNumber =
formatOptions.get(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
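+ // ENCODE_IGNORE_NULL_FIELDS only exists in Flink 1.20+, so it is resolved reflectively by the utility class.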
+ final boolean ignoreNullFields =
+ JsonRowDataSerializationSchemaUtils.enableIgnoreNullFields(formatOptions);
+
switch (type) {
case DEBEZIUM_JSON:
{
@@ -65,7 +69,8 @@ public static SerializationSchema<Event> createSerializationSchema(
mapNullKeyMode,
mapNullKeyLiteral,
zoneId,
- encodeDecimalAsPlainNumber);
+ encodeDecimalAsPlainNumber,
+ ignoreNullFields);
}
case CANAL_JSON:
{
@@ -74,7 +79,8 @@ public static SerializationSchema<Event> createSerializationSchema(
mapNullKeyMode,
mapNullKeyLiteral,
zoneId,
- encodeDecimalAsPlainNumber);
+ encodeDecimalAsPlainNumber,
+ ignoreNullFields);
}
default:
{
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchema.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchema.java
index d0c6179752b..6a75aaf892f 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchema.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchema.java
@@ -27,6 +27,7 @@
import org.apache.flink.cdc.common.types.utils.DataTypeUtils;
import org.apache.flink.cdc.common.utils.SchemaUtils;
import org.apache.flink.cdc.connectors.kafka.json.TableSchemaInfo;
+import org.apache.flink.cdc.connectors.kafka.utils.JsonRowDataSerializationSchemaUtils;
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.formats.json.JsonFormatOptions;
import org.apache.flink.formats.json.JsonRowDataSerializationSchema;
@@ -73,6 +74,8 @@ public class CanalJsonSerializationSchema implements SerializationSchema<Event>
private final boolean encodeDecimalAsPlainNumber;
+ private final boolean ignoreNullFields;
+
private final ZoneId zoneId;
private InitializationContext context;
@@ -82,13 +85,15 @@ public CanalJsonSerializationSchema(
JsonFormatOptions.MapNullKeyMode mapNullKeyMode,
String mapNullKeyLiteral,
ZoneId zoneId,
- boolean encodeDecimalAsPlainNumber) {
+ boolean encodeDecimalAsPlainNumber,
+ boolean ignoreNullFields) {
this.timestampFormat = timestampFormat;
this.mapNullKeyMode = mapNullKeyMode;
this.mapNullKeyLiteral = mapNullKeyLiteral;
this.encodeDecimalAsPlainNumber = encodeDecimalAsPlainNumber;
this.zoneId = zoneId;
jsonSerializers = new HashMap<>();
+ this.ignoreNullFields = ignoreNullFields;
}
@Override
@@ -114,12 +119,13 @@ public byte[] serialize(Event event) {
LogicalType rowType =
DataTypeUtils.toFlinkDataType(schema.toRowDataType()).getLogicalType();
JsonRowDataSerializationSchema jsonSerializer =
- new JsonRowDataSerializationSchema(
+ JsonRowDataSerializationSchemaUtils.createSerializationSchema(
createJsonRowType(fromLogicalToDataType(rowType)),
timestampFormat,
mapNullKeyMode,
mapNullKeyLiteral,
- encodeDecimalAsPlainNumber);
+ encodeDecimalAsPlainNumber,
+ ignoreNullFields);
try {
jsonSerializer.open(context);
} catch (Exception e) {
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java
index 15cecbc4f8e..238c2121daf 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java
@@ -27,6 +27,7 @@
import org.apache.flink.cdc.common.types.utils.DataTypeUtils;
import org.apache.flink.cdc.common.utils.SchemaUtils;
import org.apache.flink.cdc.connectors.kafka.json.TableSchemaInfo;
+import org.apache.flink.cdc.connectors.kafka.utils.JsonRowDataSerializationSchemaUtils;
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.formats.json.JsonFormatOptions;
import org.apache.flink.formats.json.JsonRowDataSerializationSchema;
@@ -72,6 +73,8 @@ public class DebeziumJsonSerializationSchema implements SerializationSchema<Event>
private final boolean encodeDecimalAsPlainNumber;
+ private final boolean ignoreNullFields;
+
private final ZoneId zoneId;
private InitializationContext context;
@@ -81,13 +84,15 @@ public DebeziumJsonSerializationSchema(
JsonFormatOptions.MapNullKeyMode mapNullKeyMode,
String mapNullKeyLiteral,
ZoneId zoneId,
- boolean encodeDecimalAsPlainNumber) {
+ boolean encodeDecimalAsPlainNumber,
+ boolean ignoreNullFields) {
this.timestampFormat = timestampFormat;
this.mapNullKeyMode = mapNullKeyMode;
this.mapNullKeyLiteral = mapNullKeyLiteral;
this.encodeDecimalAsPlainNumber = encodeDecimalAsPlainNumber;
this.zoneId = zoneId;
jsonSerializers = new HashMap<>();
+ this.ignoreNullFields = ignoreNullFields;
}
@Override
@@ -113,12 +118,13 @@ public byte[] serialize(Event event) {
LogicalType rowType =
DataTypeUtils.toFlinkDataType(schema.toRowDataType()).getLogicalType();
JsonRowDataSerializationSchema jsonSerializer =
- new JsonRowDataSerializationSchema(
+ JsonRowDataSerializationSchemaUtils.createSerializationSchema(
createJsonRowType(fromLogicalToDataType(rowType)),
timestampFormat,
mapNullKeyMode,
mapNullKeyLiteral,
- encodeDecimalAsPlainNumber);
+ encodeDecimalAsPlainNumber,
+ ignoreNullFields);
try {
jsonSerializer.open(context);
} catch (Exception e) {
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/serialization/JsonSerializationSchema.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/serialization/JsonSerializationSchema.java
index 5425d444ead..a9a0d845f58 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/serialization/JsonSerializationSchema.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/serialization/JsonSerializationSchema.java
@@ -33,6 +33,7 @@
import org.apache.flink.cdc.common.types.utils.DataTypeUtils;
import org.apache.flink.cdc.common.utils.SchemaUtils;
import org.apache.flink.cdc.connectors.kafka.json.TableSchemaInfo;
+import org.apache.flink.cdc.connectors.kafka.utils.JsonRowDataSerializationSchemaUtils;
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.formats.json.JsonFormatOptions;
import org.apache.flink.formats.json.JsonRowDataSerializationSchema;
@@ -61,6 +62,8 @@ public class JsonSerializationSchema implements SerializationSchema<Event> {
private final boolean encodeDecimalAsPlainNumber;
+ private final boolean ignoreNullFields;
+
private final ZoneId zoneId;
private InitializationContext context;
@@ -70,13 +73,15 @@ public JsonSerializationSchema(
JsonFormatOptions.MapNullKeyMode mapNullKeyMode,
String mapNullKeyLiteral,
ZoneId zoneId,
- boolean encodeDecimalAsPlainNumber) {
+ boolean encodeDecimalAsPlainNumber,
+ boolean ignoreNullFields) {
this.timestampFormat = timestampFormat;
this.mapNullKeyMode = mapNullKeyMode;
this.mapNullKeyLiteral = mapNullKeyLiteral;
this.encodeDecimalAsPlainNumber = encodeDecimalAsPlainNumber;
this.zoneId = zoneId;
jsonSerializers = new HashMap<>();
+ this.ignoreNullFields = ignoreNullFields;
}
@Override
@@ -131,11 +136,12 @@ private JsonRowDataSerializationSchema buildSerializationForPrimaryKey(Schema sc
// the row should never be null
DataType dataType = DataTypes.ROW(fields).notNull();
LogicalType rowType = DataTypeUtils.toFlinkDataType(dataType).getLogicalType();
- return new JsonRowDataSerializationSchema(
+ return JsonRowDataSerializationSchemaUtils.createSerializationSchema(
(RowType) rowType,
timestampFormat,
mapNullKeyMode,
mapNullKeyLiteral,
- encodeDecimalAsPlainNumber);
+ encodeDecimalAsPlainNumber,
+ ignoreNullFields);
}
}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KeySerializationFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KeySerializationFactory.java
index 76132d8e547..339e0a10639 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KeySerializationFactory.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/sink/KeySerializationFactory.java
@@ -21,6 +21,7 @@
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.connectors.kafka.serialization.CsvSerializationSchema;
import org.apache.flink.cdc.connectors.kafka.serialization.JsonSerializationSchema;
+import org.apache.flink.cdc.connectors.kafka.utils.JsonRowDataSerializationSchemaUtils;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.formats.common.TimestampFormat;
import org.apache.flink.formats.json.JsonFormatOptions;
@@ -54,12 +55,16 @@ public static SerializationSchema<Event> createSerializationSchema(
final boolean encodeDecimalAsPlainNumber =
formatOptions.get(ENCODE_DECIMAL_AS_PLAIN_NUMBER);
+ final boolean ignoreNullFields =
+ JsonRowDataSerializationSchemaUtils.enableIgnoreNullFields(
+ formatOptions);
return new JsonSerializationSchema(
timestampFormat,
mapNullKeyMode,
mapNullKeyLiteral,
zoneId,
- encodeDecimalAsPlainNumber);
+ encodeDecimalAsPlainNumber,
+ ignoreNullFields);
}
case CSV:
{
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/utils/JsonRowDataSerializationSchemaUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/utils/JsonRowDataSerializationSchemaUtils.java
new file mode 100644
index 00000000000..e021e0f1bdf
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/utils/JsonRowDataSerializationSchemaUtils.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.kafka.utils;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.formats.common.TimestampFormat;
+import org.apache.flink.formats.json.JsonFormatOptions;
+import org.apache.flink.formats.json.JsonRowDataSerializationSchema;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.util.Arrays;
+
+/**
+ * Utils for creating JsonRowDataSerializationSchema. TODO: remove this class after bumping to
+ * Flink 1.20 or higher.
+ */
+public class JsonRowDataSerializationSchemaUtils {
+
+ /**
+ * In Flink 1.20 and later, the constructor of JsonRowDataSerializationSchema takes 6 parameters;
+ * in older Flink versions it takes 5. This method picks the matching constructor via reflection.
+ */
+ public static JsonRowDataSerializationSchema createSerializationSchema(
+ RowType rowType,
+ TimestampFormat timestampFormat,
+ JsonFormatOptions.MapNullKeyMode mapNullKeyMode,
+ String mapNullKeyLiteral,
+ boolean encodeDecimalAsPlainNumber,
+ boolean ignoreNullFields) {
+ try {
+ Class<?>[] fullParams =
+ new Class[] {
+ RowType.class,
+ TimestampFormat.class,
+ JsonFormatOptions.MapNullKeyMode.class,
+ String.class,
+ boolean.class,
+ boolean.class
+ };
+
+ Object[] fullParamValues =
+ new Object[] {
+ rowType,
+ timestampFormat,
+ mapNullKeyMode,
+ mapNullKeyLiteral,
+ encodeDecimalAsPlainNumber,
+ ignoreNullFields
+ };
+
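+ // Try the 6-argument constructor (Flink 1.20+) first, then fall back to the 5-argument one (older Flink versions).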
+ for (int i = fullParams.length; i >= 5; i--) {
+ try {
+ Constructor<?> constructor =
+ JsonRowDataSerializationSchema.class.getConstructor(
+ Arrays.copyOfRange(fullParams, 0, i));
+
+ return (JsonRowDataSerializationSchema)
+ constructor.newInstance(Arrays.copyOfRange(fullParamValues, 0, i));
+ } catch (NoSuchMethodException ignored) {
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Failed to create JsonRowDataSerializationSchema, please check that your Flink version is 1.19 or 1.20.",
+ e);
+ }
+ throw new RuntimeException(
+ "Failed to find an appropriate constructor for JsonRowDataSerializationSchema, please check that your Flink version is 1.19 or 1.20.");
+ }
+
+ /** The ENCODE_IGNORE_NULL_FIELDS option only exists in Flink 1.20 and later. */
+ public static boolean enableIgnoreNullFields(ReadableConfig formatOptions) {
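+ // Look up JsonFormatOptions.ENCODE_IGNORE_NULL_FIELDS reflectively; on Flink versions without it, default to false.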
+ try {
+ Field field = JsonFormatOptions.class.getField("ENCODE_IGNORE_NULL_FIELDS");
+ ConfigOption<Boolean> encodeOption = (ConfigOption<Boolean>) field.get(null);
+ return formatOptions.get(encodeOption);
+ } catch (NoSuchFieldException | IllegalAccessException e) {
+ return false;
+ }
+ }
+}
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
index 31c4d2ec815..eb0e0f9aac4 100644
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
@@ -98,6 +98,13 @@ limitations under the License.
<type>test-jar</type>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-cdc-pipeline-connector-kafka</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-test-util</artifactId>
@@ -126,6 +133,13 @@ limitations under the License.
<version>${scala.version}</version>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>kafka</artifactId>
+ <version>${testcontainers.version}</version>
+ <scope>test</scope>
+ </dependency>
@@ -245,6 +259,16 @@ limitations under the License.
+ <artifactItem>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-cdc-pipeline-connector-kafka</artifactId>
+ <version>${project.version}</version>
+ <destFileName>kafka-cdc-pipeline-connector.jar</destFileName>
+ <type>jar</type>
+ <outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+ </artifactItem>
+
+ <artifactItem>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-pipeline-udf-examples</artifactId>
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlToKafkaE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlToKafkaE2eITCase.java
new file mode 100644
index 00000000000..bf306001963
--- /dev/null
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlToKafkaE2eITCase.java
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.pipeline.tests;
+
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.test.utils.TestUtils;
+import org.apache.flink.cdc.connectors.kafka.sink.KafkaUtil;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
+import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
+import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.shaded.org.apache.commons.lang3.StringUtils;
+
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.flink.util.DockerImageVersions.KAFKA;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** End-to-end tests for the MySQL CDC to Kafka pipeline job. */
+@RunWith(Parameterized.class)
+public class MysqlToKafkaE2eITCase extends PipelineTestEnvironment {
+ private static final Logger LOG = LoggerFactory.getLogger(MysqlToKafkaE2eITCase.class);
+
+ // ------------------------------------------------------------------------------------------
+ // MySQL Variables (we always use MySQL as the data source for easier verifying)
+ // ------------------------------------------------------------------------------------------
+ protected static final String MYSQL_TEST_USER = "mysqluser";
+ protected static final String MYSQL_TEST_PASSWORD = "mysqlpw";
+ protected static final String MYSQL_DRIVER_CLASS = "com.mysql.cj.jdbc.Driver";
+ protected static final String INTER_CONTAINER_MYSQL_ALIAS = "mysql";
+ protected static final long EVENT_WAITING_TIMEOUT = 60000L;
+
+ private static AdminClient admin;
+ private static final String INTER_CONTAINER_KAFKA_ALIAS = "kafka";
+ private static final int ZK_TIMEOUT_MILLIS = 30000;
+ private static final short TOPIC_REPLICATION_FACTOR = 1;
+ private TableId table;
+ private String topic;
+ private KafkaConsumer<byte[], byte[]> consumer;
+
+ @ClassRule
+ public static final MySqlContainer MYSQL =
+ (MySqlContainer)
+ new MySqlContainer(
+ MySqlVersion.V8_0) // v8 support both ARM and AMD architectures
+ .withConfigurationOverride("docker/mysql/my.cnf")
+ .withSetupSQL("docker/mysql/setup.sql")
+ .withDatabaseName("flink-test")
+ .withUsername("flinkuser")
+ .withPassword("flinkpw")
+ .withNetwork(NETWORK)
+ .withNetworkAliases(INTER_CONTAINER_MYSQL_ALIAS)
+ .withLogConsumer(new Slf4jLogConsumer(LOG));
+
+ @ClassRule
+ public static final KafkaContainer KAFKA_CONTAINER =
+ KafkaUtil.createKafkaContainer(KAFKA, LOG)
+ .withNetworkAliases("kafka")
+ .withEmbeddedZookeeper()
+ .withNetwork(NETWORK)
+ .withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS);
+
+ protected final UniqueDatabase mysqlInventoryDatabase =
+ new UniqueDatabase(MYSQL, "mysql_inventory", MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
+
+ @BeforeClass
+ public static void initializeContainers() {
+ LOG.info("Starting containers...");
+ Startables.deepStart(Stream.of(MYSQL)).join();
+ Startables.deepStart(Stream.of(KAFKA_CONTAINER)).join();
+ Map<String, Object> properties = new HashMap<>();
+ properties.put(
+ CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
+ KAFKA_CONTAINER.getBootstrapServers());
+ admin = AdminClient.create(properties);
+ LOG.info("Containers are started.");
+ }
+
+ @Before
+ public void before() throws Exception {
+ super.before();
+ createTestTopic(1, TOPIC_REPLICATION_FACTOR);
+ Properties properties = getKafkaClientConfiguration();
+ consumer = new KafkaConsumer<>(properties);
+ consumer.subscribe(Collections.singletonList(topic));
+ mysqlInventoryDatabase.createAndInitialize();
+ }
+
+ @After
+ public void after() {
+ super.after();
+ admin.deleteTopics(Collections.singletonList(topic));
+ consumer.close();
+ mysqlInventoryDatabase.dropDatabase();
+ }
+
+ @Test
+ public void testSyncWholeDatabaseWithDebeziumJson() throws Exception {
+ String pipelineJob =
+ String.format(
+ "source:\n"
+ + " type: mysql\n"
+ + " hostname: %s\n"
+ + " port: 3306\n"
+ + " username: %s\n"
+ + " password: %s\n"
+ + " tables: %s.\\.*\n"
+ + " server-id: 5400-5404\n"
+ + " server-time-zone: UTC\n"
+ + "\n"
+ + "sink:\n"
+ + " type: kafka\n"
+ + " properties.bootstrap.servers: kafka:9092\n"
+ + " topic: %s\n"
+ + "\n"
+ + "pipeline:\n"
+ + " parallelism: %d",
+ INTER_CONTAINER_MYSQL_ALIAS,
+ MYSQL_TEST_USER,
+ MYSQL_TEST_PASSWORD,
+ mysqlInventoryDatabase.getDatabaseName(),
+ topic,
+ parallelism);
+ Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
+ Path kafkaCdcJar = TestUtils.getResource("kafka-cdc-pipeline-connector.jar");
+ Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
+ submitPipelineJob(pipelineJob, mysqlCdcJar, kafkaCdcJar, mysqlDriverJar);
+ waitUntilJobRunning(Duration.ofSeconds(30));
+ LOG.info("Pipeline job is running");
+ List<ConsumerRecord<byte[], byte[]>> collectedRecords = new ArrayList<>();
+ int expectedEventCount = 13;
+ waitUntilSpecificEventCount(collectedRecords, expectedEventCount);
+ List<String> expectedRecords =
+ getExpectedRecords("expectedEvents/mysqlToKafka/debezium-json.txt");
+ assertThat(expectedRecords).containsAll(deserializeValues(collectedRecords));
+ LOG.info("Begin incremental reading stage.");
+ // generate binlogs
+ String mysqlJdbcUrl =
+ String.format(
+ "jdbc:mysql://%s:%s/%s",
+ MYSQL.getHost(),
+ MYSQL.getDatabasePort(),
+ mysqlInventoryDatabase.getDatabaseName());
+ try (Connection conn =
+ DriverManager.getConnection(
+ mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
+ Statement stat = conn.createStatement()) {
+ stat.execute("UPDATE products SET description='18oz carpenter hammer' WHERE id=106;");
+ stat.execute("UPDATE products SET weight='5.1' WHERE id=107;");
+
+ // modify table schema
+ stat.execute("ALTER TABLE products ADD COLUMN new_col INT;");
+ stat.execute(
+ "INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2, null, null, null, 1);"); // 110
+ stat.execute(
+ "INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18, null, null, null, 1);"); // 111
+ stat.execute(
+ "UPDATE products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;");
+ stat.execute("UPDATE products SET weight='5.17' WHERE id=111;");
+ stat.execute("DELETE FROM products WHERE id=111;");
+ } catch (SQLException e) {
+ LOG.error("Update table for CDC failed.", e);
+ throw e;
+ }
+
+ expectedEventCount = 20;
+ waitUntilSpecificEventCount(collectedRecords, expectedEventCount);
+ assertThat(expectedRecords)
+ .containsExactlyInAnyOrderElementsOf(deserializeValues(collectedRecords));
+ }
+
+ @Test
+ public void testSyncWholeDatabaseWithCanalJson() throws Exception {
+ String pipelineJob =
+ String.format(
+ "source:\n"
+ + " type: mysql\n"
+ + " hostname: %s\n"
+ + " port: 3306\n"
+ + " username: %s\n"
+ + " password: %s\n"
+ + " tables: %s.\\.*\n"
+ + " server-id: 5400-5404\n"
+ + " server-time-zone: UTC\n"
+ + "\n"
+ + "sink:\n"
+ + " type: kafka\n"
+ + " properties.bootstrap.servers: kafka:9092\n"
+ + " topic: %s\n"
+ + " value.format: canal-json\n"
+ + "\n"
+ + "pipeline:\n"
+ + " parallelism: %d",
+ INTER_CONTAINER_MYSQL_ALIAS,
+ MYSQL_TEST_USER,
+ MYSQL_TEST_PASSWORD,
+ mysqlInventoryDatabase.getDatabaseName(),
+ topic,
+ parallelism);
+ Path mysqlCdcJar = TestUtils.getResource("mysql-cdc-pipeline-connector.jar");
+ Path kafkaCdcJar = TestUtils.getResource("kafka-cdc-pipeline-connector.jar");
+ Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");
+ submitPipelineJob(pipelineJob, mysqlCdcJar, kafkaCdcJar, mysqlDriverJar);
+ waitUntilJobRunning(Duration.ofSeconds(30));
+ LOG.info("Pipeline job is running");
+ List<ConsumerRecord<byte[], byte[]>> collectedRecords = new ArrayList<>();
+ int expectedEventCount = 13;
+ waitUntilSpecificEventCount(collectedRecords, expectedEventCount);
+ List<String> expectedRecords =
+ getExpectedRecords("expectedEvents/mysqlToKafka/canal-json.txt");
+ assertThat(expectedRecords).containsAll(deserializeValues(collectedRecords));
+ LOG.info("Begin incremental reading stage.");
+ // generate binlogs
+ String mysqlJdbcUrl =
+ String.format(
+ "jdbc:mysql://%s:%s/%s",
+ MYSQL.getHost(),
+ MYSQL.getDatabasePort(),
+ mysqlInventoryDatabase.getDatabaseName());
+ try (Connection conn =
+ DriverManager.getConnection(
+ mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
+ Statement stat = conn.createStatement()) {
+ stat.execute("UPDATE products SET description='18oz carpenter hammer' WHERE id=106;");
+ stat.execute("UPDATE products SET weight='5.1' WHERE id=107;");
+
+ // modify table schema
+ stat.execute("ALTER TABLE products ADD COLUMN new_col INT;");
+ stat.execute(
+ "INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2, null, null, null, 1);"); // 110
+ stat.execute(
+ "INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18, null, null, null, 1);"); // 111
+ stat.execute(
+ "UPDATE products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;");
+ stat.execute("UPDATE products SET weight='5.17' WHERE id=111;");
+ stat.execute("DELETE FROM products WHERE id=111;");
+ } catch (SQLException e) {
+ LOG.error("Update table for CDC failed.", e);
+ throw e;
+ }
+
+ expectedEventCount = 20;
+ waitUntilSpecificEventCount(collectedRecords, expectedEventCount);
+ assertThat(expectedRecords)
+ .containsExactlyInAnyOrderElementsOf(deserializeValues(collectedRecords));
+ }
+
+ private void waitUntilSpecificEventCount(
+ List<ConsumerRecord<byte[], byte[]>> actualEvent, int expectedCount) throws Exception {
+ boolean result = false;
+ long endTimeout = System.currentTimeMillis() + MysqlToKafkaE2eITCase.EVENT_WAITING_TIMEOUT;
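+ // Poll Kafka in one-second batches until the expected number of records arrives or the timeout expires.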
+ while (System.currentTimeMillis() < endTimeout) {
+ ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
+ records.forEach(actualEvent::add);
+ if (actualEvent.size() == expectedCount) {
+ result = true;
+ break;
+ }
+ Thread.sleep(1000);
+ }
+ if (!result) {
+ throw new TimeoutException(
+ "failed to get specific event count: "
+ + expectedCount
+ + " from stdout: "
+ + actualEvent.size());
+ }
+ }
+
+ private static Properties getKafkaClientConfiguration() {
+ final Properties standardProps = new Properties();
+ standardProps.put("bootstrap.servers", KAFKA_CONTAINER.getBootstrapServers());
+ standardProps.put("group.id", UUID.randomUUID().toString());
+ standardProps.put("enable.auto.commit", false);
+ standardProps.put("auto.offset.reset", "earliest");
+ standardProps.put("max.partition.fetch.bytes", 256);
+ standardProps.put("zookeeper.session.timeout.ms", ZK_TIMEOUT_MILLIS);
+ standardProps.put("zookeeper.connection.timeout.ms", ZK_TIMEOUT_MILLIS);
+ standardProps.put("key.deserializer", ByteArrayDeserializer.class.getName());
+ standardProps.put("value.deserializer", ByteArrayDeserializer.class.getName());
+ return standardProps;
+ }
+
+ private void createTestTopic(int numPartitions, short replicationFactor)
+ throws ExecutionException, InterruptedException {
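+ // Use a randomly generated table id as the topic name so each test run gets its own topic.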
+ table =
+ TableId.tableId(
+ "default_namespace", "default_schema", UUID.randomUUID().toString());
+ topic = table.toString();
+ final CreateTopicsResult result =
+ admin.createTopics(
+ Collections.singletonList(
+ new NewTopic(topic, numPartitions, replicationFactor)));
+ result.all().get();
+ }
+
+ private List<String> deserializeValues(List<ConsumerRecord<byte[], byte[]>> records)
+ throws IOException {
+ List<String> result = new ArrayList<>();
+ for (ConsumerRecord<byte[], byte[]> record : records) {
+ result.add(new String(record.value(), "UTF-8"));
+ }
+ return result;
+ }
+
+ protected List<String> getExpectedRecords(String resourceDirFormat) throws Exception {
+ URL url =
+ MysqlToKafkaE2eITCase.class
+ .getClassLoader()
+ .getResource(String.format(resourceDirFormat));
+ return Files.readAllLines(Paths.get(url.toURI())).stream()
+ .filter(this::isValidJsonRecord)
+ .map(
+ line ->
+ line.replace(
+ "$databaseName", mysqlInventoryDatabase.getDatabaseName()))
+ .collect(Collectors.toList());
+ }
+
+ protected boolean isValidJsonRecord(String line) {
+ try {
+ ObjectMapper objectMapper = new ObjectMapper();
+ objectMapper.readTree(line);
+ return !StringUtils.isEmpty(line);
+ } catch (JsonProcessingException e) {
+ return false;
+ }
+ }
+}
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/expectedEvents/mysqlToKafka/canal-json.txt b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/expectedEvents/mysqlToKafka/canal-json.txt
new file mode 100644
index 00000000000..ed649e2fcf5
--- /dev/null
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/expectedEvents/mysqlToKafka/canal-json.txt
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"old":null,"data":[{"id":103,"name":"user_3","address":"Shanghai","phone_number":"123567891234"}],"type":"INSERT","database":"$databaseName","table":"customers","pkNames":["id"]}
+{"old":null,"data":[{"id":104,"name":"user_4","address":"Shanghai","phone_number":"123567891234"}],"type":"INSERT","database":"$databaseName","table":"customers","pkNames":["id"]}
+{"old":null,"data":[{"id":102,"name":"user_2","address":"Shanghai","phone_number":"123567891234"}],"type":"INSERT","database":"$databaseName","table":"customers","pkNames":["id"]}
+{"old":null,"data":[{"id":101,"name":"user_1","address":"Shanghai","phone_number":"123567891234"}],"type":"INSERT","database":"$databaseName","table":"customers","pkNames":["id"]}
+{"old":null,"data":[{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.3,"enum_c":null,"json_c":null,"point_c":null}],"type":"INSERT","database":"$databaseName","table":"products","pkNames":["id"]}
+{"old":null,"data":[{"id":108,"name":"jacket","description":"water resistent black wind breaker","weight":0.1,"enum_c":null,"json_c":null,"point_c":null}],"type":"INSERT","database":"$databaseName","table":"products","pkNames":["id"]}
+{"old":null,"data":[{"id":103,"name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":0.8,"enum_c":"red","json_c":"{\"key3\": \"value3\"}","point_c":"{\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}"}],"type":"INSERT","database":"$databaseName","table":"products","pkNames":["id"]}
+{"old":null,"data":[{"id":105,"name":"hammer","description":"14oz carpenter's hammer","weight":0.875,"enum_c":"red","json_c":"{\"k1\": \"v1\", \"k2\": \"v2\"}","point_c":"{\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}"}],"type":"INSERT","database":"$databaseName","table":"products","pkNames":["id"]}
+{"old":null,"data":[{"id":109,"name":"spare tire","description":"24 inch spare tire","weight":22.2,"enum_c":null,"json_c":null,"point_c":null}],"type":"INSERT","database":"$databaseName","table":"products","pkNames":["id"]}
+{"old":null,"data":[{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":3.14,"enum_c":"red","json_c":"{\"key1\": \"value1\"}","point_c":"{\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}"}],"type":"INSERT","database":"$databaseName","table":"products","pkNames":["id"]}
+{"old":null,"data":[{"id":106,"name":"hammer","description":"16oz carpenter's hammer","weight":1.0,"enum_c":null,"json_c":null,"point_c":null}],"type":"INSERT","database":"$databaseName","table":"products","pkNames":["id"]}
+{"old":null,"data":[{"id":102,"name":"car battery","description":"12V car battery","weight":8.1,"enum_c":"white","json_c":"{\"key2\": \"value2\"}","point_c":"{\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}"}],"type":"INSERT","database":"$databaseName","table":"products","pkNames":["id"]}
+{"old":null,"data":[{"id":104,"name":"hammer","description":"12oz carpenter's hammer","weight":0.75,"enum_c":"white","json_c":"{\"key4\": \"value4\"}","point_c":"{\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}"}],"type":"INSERT","database":"$databaseName","table":"products","pkNames":["id"]}
+{"old":[{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.3,"enum_c":null,"json_c":null,"point_c":null}],"data":[{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.1,"enum_c":null,"json_c":null,"point_c":null}],"type":"UPDATE","database":"$databaseName","table":"products","pkNames":["id"]}
+{"old":[{"id":106,"name":"hammer","description":"16oz carpenter's hammer","weight":1.0,"enum_c":null,"json_c":null,"point_c":null}],"data":[{"id":106,"name":"hammer","description":"18oz carpenter hammer","weight":1.0,"enum_c":null,"json_c":null,"point_c":null}],"type":"UPDATE","database":"$databaseName","table":"products","pkNames":["id"]}
+{"old":null,"data":[{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.18,"enum_c":null,"json_c":null,"point_c":null,"new_col":1}],"type":"INSERT","database":"$databaseName","table":"products","pkNames":["id"]}
+{"old":null,"data":[{"id":110,"name":"jacket","description":"water resistent white wind breaker","weight":0.2,"enum_c":null,"json_c":null,"point_c":null,"new_col":1}],"type":"INSERT","database":"$databaseName","table":"products","pkNames":["id"]}
+{"old":[{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.18,"enum_c":null,"json_c":null,"point_c":null,"new_col":1}],"data":[{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.17,"enum_c":null,"json_c":null,"point_c":null,"new_col":1}],"type":"UPDATE","database":"$databaseName","table":"products","pkNames":["id"]}
+{"old":[{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.17,"enum_c":null,"json_c":null,"point_c":null,"new_col":1}],"data":null,"type":"DELETE","database":"$databaseName","table":"products","pkNames":["id"]}
+{"old":[{"id":110,"name":"jacket","description":"water resistent white wind breaker","weight":0.2,"enum_c":null,"json_c":null,"point_c":null,"new_col":1}],"data":[{"id":110,"name":"jacket","description":"new water resistent white wind breaker","weight":0.5,"enum_c":null,"json_c":null,"point_c":null,"new_col":1}],"type":"UPDATE","database":"$databaseName","table":"products","pkNames":["id"]}
\ No newline at end of file
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/expectedEvents/mysqlToKafka/debezium-json.txt b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/expectedEvents/mysqlToKafka/debezium-json.txt
new file mode 100644
index 00000000000..13f689aea07
--- /dev/null
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/resources/expectedEvents/mysqlToKafka/debezium-json.txt
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{"before":null,"after":{"id":102,"name":"user_2","address":"Shanghai","phone_number":"123567891234"},"op":"c","source":{"db":"$databaseName","table":"customers"}}
+{"before":null,"after":{"id":101,"name":"user_1","address":"Shanghai","phone_number":"123567891234"},"op":"c","source":{"db":"$databaseName","table":"customers"}}
+{"before":null,"after":{"id":103,"name":"user_3","address":"Shanghai","phone_number":"123567891234"},"op":"c","source":{"db":"$databaseName","table":"customers"}}
+{"before":null,"after":{"id":104,"name":"user_4","address":"Shanghai","phone_number":"123567891234"},"op":"c","source":{"db":"$databaseName","table":"customers"}}
+{"before":null,"after":{"id":106,"name":"hammer","description":"16oz carpenter's hammer","weight":1.0,"enum_c":null,"json_c":null,"point_c":null},"op":"c","source":{"db":"$databaseName","table":"products"}}
+{"before":null,"after":{"id":105,"name":"hammer","description":"14oz carpenter's hammer","weight":0.875,"enum_c":"red","json_c":"{\"k1\": \"v1\", \"k2\": \"v2\"}","point_c":"{\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}"},"op":"c","source":{"db":"$databaseName","table":"products"}}
+{"before":null,"after":{"id":109,"name":"spare tire","description":"24 inch spare tire","weight":22.2,"enum_c":null,"json_c":null,"point_c":null},"op":"c","source":{"db":"$databaseName","table":"products"}}
+{"before":null,"after":{"id":102,"name":"car battery","description":"12V car battery","weight":8.1,"enum_c":"white","json_c":"{\"key2\": \"value2\"}","point_c":"{\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}"},"op":"c","source":{"db":"$databaseName","table":"products"}}
+{"before":null,"after":{"id":101,"name":"scooter","description":"Small 2-wheel scooter","weight":3.14,"enum_c":"red","json_c":"{\"key1\": \"value1\"}","point_c":"{\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}"},"op":"c","source":{"db":"$databaseName","table":"products"}}
+{"before":null,"after":{"id":108,"name":"jacket","description":"water resistent black wind breaker","weight":0.1,"enum_c":null,"json_c":null,"point_c":null},"op":"c","source":{"db":"$databaseName","table":"products"}}
+{"before":null,"after":{"id":104,"name":"hammer","description":"12oz carpenter's hammer","weight":0.75,"enum_c":"white","json_c":"{\"key4\": \"value4\"}","point_c":"{\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}"},"op":"c","source":{"db":"$databaseName","table":"products"}}
+{"before":null,"after":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.3,"enum_c":null,"json_c":null,"point_c":null},"op":"c","source":{"db":"$databaseName","table":"products"}}
+{"before":null,"after":{"id":103,"name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":0.8,"enum_c":"red","json_c":"{\"key3\": \"value3\"}","point_c":"{\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}"},"op":"c","source":{"db":"$databaseName","table":"products"}}
+{"before":{"id":106,"name":"hammer","description":"16oz carpenter's hammer","weight":1.0,"enum_c":null,"json_c":null,"point_c":null},"after":{"id":106,"name":"hammer","description":"18oz carpenter hammer","weight":1.0,"enum_c":null,"json_c":null,"point_c":null},"op":"u","source":{"db":"$databaseName","table":"products"}}
+{"before":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.3,"enum_c":null,"json_c":null,"point_c":null},"after":{"id":107,"name":"rocks","description":"box of assorted rocks","weight":5.1,"enum_c":null,"json_c":null,"point_c":null},"op":"u","source":{"db":"$databaseName","table":"products"}}
+{"before":null,"after":{"id":110,"name":"jacket","description":"water resistent white wind breaker","weight":0.2,"enum_c":null,"json_c":null,"point_c":null,"new_col":1},"op":"c","source":{"db":"$databaseName","table":"products"}}
+{"before":null,"after":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.18,"enum_c":null,"json_c":null,"point_c":null,"new_col":1},"op":"c","source":{"db":"$databaseName","table":"products"}}
+{"before":{"id":110,"name":"jacket","description":"water resistent white wind breaker","weight":0.2,"enum_c":null,"json_c":null,"point_c":null,"new_col":1},"after":{"id":110,"name":"jacket","description":"new water resistent white wind breaker","weight":0.5,"enum_c":null,"json_c":null,"point_c":null,"new_col":1},"op":"u","source":{"db":"$databaseName","table":"products"}}
+{"before":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.18,"enum_c":null,"json_c":null,"point_c":null,"new_col":1},"after":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.17,"enum_c":null,"json_c":null,"point_c":null,"new_col":1},"op":"u","source":{"db":"$databaseName","table":"products"}}
+{"before":{"id":111,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.17,"enum_c":null,"json_c":null,"point_c":null,"new_col":1},"after":null,"op":"d","source":{"db":"$databaseName","table":"products"}}