From 410e4fae57d2ac9d67d949a2fc01c537d3f3c57b Mon Sep 17 00:00:00 2001
From: "hele.kc"
Date: Wed, 16 Mar 2022 11:27:15 +0800
Subject: [PATCH] [mysql] Fall back to building schema by 'desc table' when DDL parsing fails (#949)

---
 .../connectors/mysql/schema/MySqlSchema.java  |  95 ++-
 .../mysql/schema/MySqlTableDefinition.java    | 147 +++++
 .../polardbx/PolardbxSourceTCase.java         | 550 ++++++++++++++++++
 .../test/resources/ddl/polardbx_ddl_test.sql  | 138 +++++
 4 files changed, 912 insertions(+), 18 deletions(-)
 create mode 100644 flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/schema/MySqlTableDefinition.java
 create mode 100644 flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/polardbx/PolardbxSourceTCase.java
 create mode 100644 flink-connector-mysql-cdc/src/test/resources/ddl/polardbx_ddl_test.sql

diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/schema/MySqlSchema.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/schema/MySqlSchema.java
index 6e5eb5049fa..79c33f87cd0 100644
--- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/schema/MySqlSchema.java
+++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/schema/MySqlSchema.java
@@ -28,9 +28,11 @@
 import io.debezium.relational.TableId;
 import io.debezium.relational.history.TableChanges.TableChange;
 import io.debezium.schema.SchemaChangeEvent;
+import org.apache.commons.lang3.StringUtils;
 
 import java.sql.SQLException;
 import java.time.Instant;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -40,6 +42,9 @@
 
 /** A component used to get schema by table path. */
 public class MySqlSchema {
+    private static final String SHOW_CREATE_TABLE = "SHOW CREATE TABLE ";
+    private static final String DESC_TABLE = "DESC ";
+
     private final MySqlConnectorConfig connectorConfig;
     private final MySqlDatabaseSchema databaseSchema;
     private final Map<TableId, TableChange> schemasByTableId;
@@ -58,7 +63,7 @@ public TableChange getTableSchema(JdbcConnection jdbc, TableId tableId) {
         // read schema from cache first
         TableChange schema = schemasByTableId.get(tableId);
         if (schema == null) {
-            schema = readTableSchema(jdbc, tableId);
+            schema = buildTableSchema(jdbc, tableId);
             schemasByTableId.put(tableId, schema);
         }
         return schema;
@@ -68,26 +73,34 @@ public TableChange getTableSchema(JdbcConnection jdbc, TableId tableId) {
     // Helpers
     // ------------------------------------------------------------------------------------------
 
-    private TableChange readTableSchema(JdbcConnection jdbc, TableId tableId) {
+    private TableChange buildTableSchema(JdbcConnection jdbc, TableId tableId) {
         final Map<TableId, TableChange> tableChangeMap = new HashMap<>();
-        final String sql = "SHOW CREATE TABLE " + quote(tableId);
+        String showCreateTable = SHOW_CREATE_TABLE + quote(tableId);
+        buildSchemaByShowCreateTable(jdbc, tableId, tableChangeMap);
+        if (!tableChangeMap.containsKey(tableId)) {
+            // fall back to "desc table"
+            String descTable = DESC_TABLE + quote(tableId);
+            buildSchemaByDescTable(jdbc, descTable, tableId, tableChangeMap);
+            if (!tableChangeMap.containsKey(tableId)) {
+                throw new FlinkRuntimeException(
+                        String.format(
+                                "Can't obtain schema for table %s by running %s and %s",
+                                tableId, showCreateTable, descTable));
+            }
+        }
+        return tableChangeMap.get(tableId);
+    }
+
+    private void buildSchemaByShowCreateTable(
+            JdbcConnection jdbc, TableId tableId, Map<TableId, TableChange> tableChangeMap) {
+        final String sql = SHOW_CREATE_TABLE + quote(tableId);
         try {
             jdbc.query(
                     sql,
                     rs -> {
                         if (rs.next()) {
                             final String ddl = rs.getString(2);
-                            final MySqlOffsetContext offsetContext =
-                                    MySqlOffsetContext.initial(connectorConfig);
-                            List<SchemaChangeEvent> schemaChangeEvents =
-                                    databaseSchema.parseSnapshotDdl(
-                                            ddl, tableId.catalog(), offsetContext, Instant.now());
-                            for (SchemaChangeEvent schemaChangeEvent : schemaChangeEvents) {
-                                for (TableChange tableChange :
-                                        schemaChangeEvent.getTableChanges()) {
-                                    tableChangeMap.put(tableId, tableChange);
-                                }
-                            }
+                            parseSchemaByDdl(ddl, tableId, tableChangeMap);
                         }
                     });
         } catch (SQLException e) {
@@ -95,11 +108,57 @@ private TableChange readTableSchema(JdbcConnection jdbc, TableId tableId) {
                     String.format("Failed to read schema for table %s by running %s", tableId, sql),
                     e);
         }
-        if (!tableChangeMap.containsKey(tableId)) {
-            throw new FlinkRuntimeException(
-                    String.format("Can't obtain schema for table %s by running %s", tableId, sql));
+    }
+
+    private void parseSchemaByDdl(
+            String ddl, TableId tableId, Map<TableId, TableChange> tableChangeMap) {
+        final MySqlOffsetContext offsetContext = MySqlOffsetContext.initial(connectorConfig);
+        List<SchemaChangeEvent> schemaChangeEvents =
+                databaseSchema.parseSnapshotDdl(
+                        ddl, tableId.catalog(), offsetContext, Instant.now());
+        for (SchemaChangeEvent schemaChangeEvent : schemaChangeEvents) {
+            for (TableChange tableChange : schemaChangeEvent.getTableChanges()) {
+                tableChangeMap.put(tableId, tableChange);
+            }
         }
+    }
 
-        return tableChangeMap.get(tableId);
+    private void buildSchemaByDescTable(
+            JdbcConnection jdbc,
+            String descTable,
+            TableId tableId,
+            Map<TableId, TableChange> tableChangeMap) {
+        List<MySqlFieldDefinition> fieldMetas = new ArrayList<>();
+        List<String> primaryKeys = new ArrayList<>();
+        try {
+            jdbc.query(
+                    descTable,
+                    rs -> {
+                        while (rs.next()) {
+                            MySqlFieldDefinition meta = new MySqlFieldDefinition();
+                            meta.setColumnName(rs.getString("Field"));
+                            meta.setColumnType(rs.getString("Type"));
+                            meta.setNullable(
+                                    StringUtils.equalsIgnoreCase(rs.getString("Null"), "YES"));
+                            meta.setKey("PRI".equalsIgnoreCase(rs.getString("Key")));
+                            meta.setUnique("UNI".equalsIgnoreCase(rs.getString("Key")));
+                            meta.setDefaultValue(rs.getString("Default"));
+                            meta.setExtra(rs.getString("Extra"));
+                            if (meta.isKey()) {
+                                primaryKeys.add(meta.getColumnName());
+                            }
+                            fieldMetas.add(meta);
+                        }
+                    });
+            parseSchemaByDdl(
+                    new MySqlTableDefinition(tableId, fieldMetas, primaryKeys).toDdl(),
+                    tableId,
+                    tableChangeMap);
+        } catch (SQLException e) {
+            throw new FlinkRuntimeException(
+                    String.format(
+                            "Failed to read schema for table %s by running %s", tableId, descTable),
+                    e);
+        }
     }
 }
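
A minimal sketch of the new lookup flow, for review convenience (the PolarDB-X DDL in the comments is illustrative, not captured from a real cluster):

    // buildTableSchema(jdbc, tableId):
    //   1. Try SHOW CREATE TABLE `db`.`orders` first. On PolarDB-X it may return DDL like
    //          CREATE TABLE orders (...) dbpartition by RANGE_HASH(buyer_id, order_id, 10) ...
    //      for which the Debezium MySQL DDL parser produces no TableChange,
    //      so tableChangeMap stays empty.
    //   2. Fall back to DESC `db`.`orders`, which returns plain rows of
    //      (Field, Type, Null, Key, Default, Extra).
    //   3. Rebuild vanilla DDL from those rows via MySqlTableDefinition#toDdl() and
    //      re-feed it to parseSchemaByDdl(...); only if tableChangeMap is still empty
    //      afterwards does buildTableSchema throw a FlinkRuntimeException.
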
diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/schema/MySqlTableDefinition.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/schema/MySqlTableDefinition.java
new file mode 100644
index 00000000000..812e83120b3
--- /dev/null
+++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/schema/MySqlTableDefinition.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.connectors.mysql.schema;
+
+import org.apache.flink.util.CollectionUtil;
+
+import com.ververica.cdc.connectors.mysql.source.utils.StatementUtils;
+import io.debezium.relational.TableId;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static com.ververica.cdc.connectors.mysql.source.utils.StatementUtils.quote;
+
+/** Used to generate a table definition in DDL from the output of "desc table". */
+public class MySqlTableDefinition {
+    TableId tableId;
+    List<MySqlFieldDefinition> fieldDefinitions;
+    List<String> primaryKeys;
+
+    public MySqlTableDefinition(
+            TableId tableId,
+            List<MySqlFieldDefinition> fieldDefinitions,
+            List<String> primaryKeys) {
+        this.tableId = tableId;
+        this.fieldDefinitions = fieldDefinitions;
+        this.primaryKeys = primaryKeys;
+    }
+
+    String toDdl() {
+        return String.format(
+                "CREATE TABLE %s (\n\t %s %s );",
+                quote(tableId), fieldDefinitions(), pkDefinition());
+    }
+
+    private String fieldDefinitions() {
+        return fieldDefinitions.stream()
+                .map(MySqlFieldDefinition::toDdl)
+                .collect(Collectors.joining(", \n\t"));
+    }
+
+    private String pkDefinition() {
+        StringBuilder pkDefinition = new StringBuilder();
+        if (!CollectionUtil.isNullOrEmpty(primaryKeys)) {
+            pkDefinition.append(",");
+            pkDefinition.append(
+                    String.format(
+                            "PRIMARY KEY ( %s )",
+                            primaryKeys.stream()
+                                    .map(StatementUtils::quote)
+                                    .collect(Collectors.joining(","))));
+        }
+        return pkDefinition.toString();
+    }
+}
+
+/** Used to generate a field definition in DDL from the output of "desc table". */
+class MySqlFieldDefinition {
+    private String columnName;
+    private String columnType;
+    private boolean nullable;
+    private boolean key;
+    private String defaultValue;
+    private String extra;
+    private boolean unique;
+
+    public String getColumnName() {
+        return columnName;
+    }
+
+    public void setColumnName(String columnName) {
+        this.columnName = columnName;
+    }
+
+    public String getColumnType() {
+        return columnType;
+    }
+
+    public void setColumnType(String columnType) {
+        this.columnType = columnType;
+    }
+
+    public void setNullable(boolean nullable) {
+        this.nullable = nullable;
+    }
+
+    public String getDefaultValue() {
+        return StringUtils.isEmpty(defaultValue) ? "" : "DEFAULT " + defaultValue;
+    }
+
+    public void setDefaultValue(String defaultValue) {
+        this.defaultValue = defaultValue;
+    }
+
+    public boolean isUnsigned() {
+        return StringUtils.containsIgnoreCase(columnType, "unsigned");
+    }
+
+    public boolean isNullable() {
+        return nullable;
+    }
+
+    public boolean isKey() {
+        return key;
+    }
+
+    public void setKey(boolean key) {
+        this.key = key;
+    }
+
+    public String getExtra() {
+        return extra;
+    }
+
+    public void setExtra(String extra) {
+        this.extra = extra;
+    }
+
+    public boolean isUnique() {
+        return unique;
+    }
+
+    public void setUnique(boolean unique) {
+        this.unique = unique;
+    }
+
+    public String toDdl() {
+        return quote(columnName) + " " + columnType + " " + (nullable ? "" : "NOT NULL");
+    }
+}
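
To make the generated fallback DDL concrete, a minimal usage sketch (same package assumed, since toDdl() is package-private; the column values are hypothetical, and the output shape follows toDdl() above):

    MySqlFieldDefinition id = new MySqlFieldDefinition();
    id.setColumnName("id");
    id.setColumnType("bigint(20)");
    id.setNullable(false);
    id.setKey(true);

    MySqlFieldDefinition sellerId = new MySqlFieldDefinition();
    sellerId.setColumnName("seller_id");
    sellerId.setColumnType("varchar(30)");
    sellerId.setNullable(true);

    String ddl =
            new MySqlTableDefinition(
                            TableId.parse("polardbx_ddl_test.orders"),
                            java.util.Arrays.asList(id, sellerId),
                            java.util.Collections.singletonList("id"))
                    .toDdl();
    // roughly:
    // CREATE TABLE `polardbx_ddl_test`.`orders` (
    //     `id` bigint(20) NOT NULL,
    //     `seller_id` varchar(30)  ,PRIMARY KEY ( `id` ) );

Note that toDdl() intentionally emits only name, type, and nullability (plus the primary key clause); that is sufficient for the Debezium parser to recover column structure.
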
diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/polardbx/PolardbxSourceTCase.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/polardbx/PolardbxSourceTCase.java
new file mode 100644
index 00000000000..e170fb348da
--- /dev/null
+++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/polardbx/PolardbxSourceTCase.java
@@ -0,0 +1,550 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.ververica.cdc.connectors.polardbx;
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+import com.github.dockerjava.api.model.ExposedPort;
+import com.github.dockerjava.api.model.PortBinding;
+import com.github.dockerjava.api.model.Ports;
+import com.ververica.cdc.connectors.mysql.schema.MySqlSchema;
+import org.apache.commons.lang3.StringUtils;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static java.lang.String.format;
+import static org.apache.flink.util.Preconditions.checkState;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * PolarDB-X supports the MySQL protocol, but some of its DDL features differ. We therefore added a
+ * fallback in {@link MySqlSchema} for when DDL parsing fails, and provide these cases to test it.
+ */
+public class PolardbxSourceTCase extends AbstractTestBase {
+    private static final Logger LOG = LoggerFactory.getLogger(PolardbxSourceTCase.class);
+    private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$");
+    private static final Integer PORT = 8527;
+    private static final String HOST_NAME = "127.0.0.1";
+    private static final String USER_NAME = "polardbx_root";
+    private static final String PASSWORD = "123456";
+    private static final String DATABASE = "polardbx_ddl_test";
+    private static final String IMAGE_VERSION = "2.0.1";
+    private static final DockerImageName POLARDBX_IMAGE =
+            DockerImageName.parse("polardbx/polardb-x:" + IMAGE_VERSION);
+
+    public static final GenericContainer<?> POLARDBX_CONTAINER =
+            new GenericContainer<>(POLARDBX_IMAGE)
+                    .withExposedPorts(PORT)
+                    .withLogConsumer(new Slf4jLogConsumer(LOG))
+                    .withStartupTimeout(Duration.ofMinutes(3))
+                    .withCreateContainerCmdModifier(
+                            c ->
+                                    c.withPortBindings(
+                                            new PortBinding(
+                                                    Ports.Binding.bindPort(PORT),
+                                                    new ExposedPort(PORT))));
+
+    @BeforeClass
+    public static void startContainers() throws InterruptedException {
+        // no need to start the container when port 8527 is already listening
+        if (!checkConnection()) {
+            LOG.info("PolarDB-X connection is not valid, so trying to start containers...");
+            Startables.deepStart(Stream.of(POLARDBX_CONTAINER)).join();
+            LOG.info("Containers are started.");
+            // wait 10s to make sure PolarDB-X is ready
+            Thread.sleep(10 * 1000);
+        }
+        initializePolardbxTables(DATABASE);
+    }
+
+    private static String getJdbcUrl() {
+        return String.format("jdbc:mysql://%s:%s", HOST_NAME, PORT);
+    }
+
+    protected static Connection getJdbcConnection() throws SQLException {
+        String jdbcUrl = getJdbcUrl();
+        LOG.info("jdbcUrl is: " + jdbcUrl);
+        return DriverManager.getConnection(jdbcUrl, USER_NAME, PASSWORD);
+    }
+
+    private static Boolean checkConnection() {
+        LOG.info("Checking PolarDB-X connection validity...");
+        try {
+            Connection connection = getJdbcConnection();
+            return connection.isValid(3);
+        } catch (SQLException e) {
+            LOG.warn("PolarDB-X connection is not valid... caused by: " + e.getMessage());
+            return false;
+        }
+    }
+
+    /** Initialize the database and tables with ${databaseName}.sql for testing. */
+    protected static void initializePolardbxTables(String databaseName)
+            throws InterruptedException {
+        final String ddlFile = String.format("ddl/%s.sql", databaseName);
+        final URL ddlTestFile = PolardbxSourceTCase.class.getClassLoader().getResource(ddlFile);
+        assertNotNull("Cannot locate " + ddlFile, ddlTestFile);
+        // sleep 1s to make sure the jdbc connection can be created
+        Thread.sleep(1000);
+        try (Connection connection = getJdbcConnection();
+                Statement statement = connection.createStatement()) {
+            statement.execute("drop database if exists " + databaseName);
+            statement.execute("create database if not exists " + databaseName);
+            statement.execute("use " + databaseName + ";");
+            final List<String> statements =
+                    Arrays.stream(
+                                    Files.readAllLines(Paths.get(ddlTestFile.toURI())).stream()
+                                            .map(String::trim)
+                                            .filter(x -> !x.startsWith("--") && !x.isEmpty())
+                                            .map(
+                                                    x -> {
+                                                        final Matcher m =
+                                                                COMMENT_PATTERN.matcher(x);
+                                                        return m.matches() ? m.group(1) : x;
+                                                    })
+                                            .collect(Collectors.joining("\n"))
+                                            .split(";"))
+                            .collect(Collectors.toList());
+            for (String stmt : statements) {
+                statement.execute(stmt);
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Test
+    public void testSingleKey() throws Exception {
+        int parallelism = 1;
+        String[] captureCustomerTables = new String[] {"orders"};
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        env.setParallelism(parallelism);
+        env.enableCheckpointing(200L);
+        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
+        String sourceDDL =
+                format(
+                        "CREATE TABLE orders_source ("
+                                + " id BIGINT NOT NULL,"
+                                + " seller_id STRING,"
+                                + " order_id STRING,"
+                                + " buyer_id STRING,"
+                                + " create_time TIMESTAMP,"
+                                + " primary key (id) not enforced"
+                                + ") WITH ("
+                                + " 'connector' = 'mysql-cdc',"
+                                + " 'scan.incremental.snapshot.enabled' = 'true',"
+                                + " 'hostname' = '%s',"
+                                + " 'port' = '%s',"
+                                + " 'username' = '%s',"
+                                + " 'password' = '%s',"
+                                + " 'database-name' = '%s',"
+                                + " 'table-name' = '%s',"
+                                + " 'scan.incremental.snapshot.chunk.size' = '100',"
+                                + " 'server-id' = '%s'"
+                                + ")",
+                        HOST_NAME,
+                        PORT,
+                        USER_NAME,
+                        PASSWORD,
+                        DATABASE,
+                        getTableNameRegex(captureCustomerTables),
+                        getServerId());
+
+        // first step: check the snapshot data
+        String[] snapshotForSingleTable =
+                new String[] {
+                    "+I[1, 1001, 1, 102, 2022-01-16T00:00]",
+                    "+I[2, 1002, 2, 105, 2022-01-16T00:00]",
+                    "+I[3, 1004, 3, 109, 2022-01-16T00:00]",
+                    "+I[4, 1002, 2, 106, 2022-01-16T00:00]",
+                    "+I[5, 1003, 1, 107, 2022-01-16T00:00]",
+                };
+        tEnv.executeSql(sourceDDL);
+        TableResult tableResult = tEnv.executeSql("select * from orders_source");
+        CloseableIterator<Row> iterator = tableResult.collect();
+        List<String> expectedSnapshotData = new ArrayList<>();
+        for (int i = 0; i < captureCustomerTables.length; i++) {
+            expectedSnapshotData.addAll(Arrays.asList(snapshotForSingleTable));
+        }
+
+        List<String> realSnapshotData = fetchRows(iterator, expectedSnapshotData.size());
+        assertEqualsInAnyOrder(expectedSnapshotData, realSnapshotData);
+
+        // second step: check the sink data
+        tEnv.executeSql(
+                "CREATE TABLE sink ("
+                        + " id BIGINT NOT NULL,"
+                        + " seller_id STRING,"
+                        + " order_id STRING,"
+                        + " buyer_id STRING,"
+                        + " create_time TIMESTAMP,"
+                        + " primary key (id) not enforced"
+                        + ") WITH ("
+                        + " 'connector' = 'values',"
+                        + " 'sink-insert-only' = 'false'"
+                        + ")");
+        tEnv.executeSql("insert into sink select * from orders_source");
+
+        waitForSinkSize("sink", realSnapshotData.size());
+        assertEqualsInAnyOrder(expectedSnapshotData, TestValuesTableFactory.getRawResults("sink"));
+
+        // third step: check dml events
+        try (Connection connection = getJdbcConnection();
+                Statement statement = connection.createStatement()) {
+            statement.execute("use " + DATABASE);
+            statement.execute("INSERT INTO orders VALUES (6, 1006, 1006, 1006, '2022-01-17');");
+            statement.execute("INSERT INTO orders VALUES (7, 1007, 1007, 1007, '2022-01-17');");
+            statement.execute("UPDATE orders SET seller_id=9999, order_id=9999 WHERE id=6;");
+            statement.execute("UPDATE orders SET seller_id=9999, order_id=9999 WHERE id=7;");
+            statement.execute("DELETE FROM orders WHERE id=7;");
+        }
+
+        String[] expectedBinlog =
+                new String[] {
+                    "+I[6, 1006, 1006, 1006, 2022-01-17T00:00]",
+                    "+I[7, 1007, 1007, 1007, 2022-01-17T00:00]",
+                    "-D[6, 1006, 1006, 1006, 2022-01-17T00:00]",
+                    "+I[6, 9999, 9999, 1006, 2022-01-17T00:00]",
+                    "-D[7, 1007, 1007, 1007, 2022-01-17T00:00]",
+                    "+I[7, 9999, 9999, 1007, 2022-01-17T00:00]",
+                    "-D[7, 9999, 9999, 1007, 2022-01-17T00:00]"
+                };
+        List<String> expectedBinlogData = new ArrayList<>();
+        for (int i = 0; i < captureCustomerTables.length; i++) {
+            expectedBinlogData.addAll(Arrays.asList(expectedBinlog));
+        }
+        List<String> realBinlog = fetchRows(iterator, expectedBinlog.length);
+        assertEqualsInOrder(expectedBinlogData, realBinlog);
+    }
+
+    @Test
+    public void testFullTypesDdl() {
+        int parallelism = 1;
+        String[] captureCustomerTables = new String[] {"polardbx_full_types"};
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        env.setParallelism(parallelism);
+        env.enableCheckpointing(200L);
+        String sourceDDL =
+                String.format(
+                        "CREATE TABLE polardbx_full_types (\n"
+                                + " `id` INT NOT NULL,\n"
+                                + " tiny_c TINYINT,\n"
+                                + " tiny_un_c SMALLINT,\n"
+                                + " small_c SMALLINT,\n"
+                                + " small_un_c INT,\n"
+                                + " medium_c INT,\n"
+                                + " medium_un_c INT,\n"
+                                + " int_c INT,\n"
+                                + " int_un_c BIGINT,\n"
+                                + " int11_c BIGINT,\n"
+                                + " big_c BIGINT,\n"
+                                + " big_un_c DECIMAL(20, 0),\n"
+                                + " varchar_c VARCHAR(255),\n"
+                                + " char_c CHAR(3),\n"
+                                + " real_c FLOAT,\n"
+                                + " float_c FLOAT,\n"
+                                + " double_c DOUBLE,\n"
+                                + " decimal_c DECIMAL(8, 4),\n"
+                                + " numeric_c DECIMAL(6, 0),\n"
+                                + " big_decimal_c STRING,\n"
+                                + " bit1_c BOOLEAN,\n"
+                                + " tiny1_c BOOLEAN,\n"
+                                + " boolean_c BOOLEAN,\n"
+                                + " date_c DATE,\n"
+                                + " time_c TIME(0),\n"
+                                + " datetime3_c TIMESTAMP(3),\n"
+                                + " datetime6_c TIMESTAMP(6),\n"
+                                + " timestamp_c TIMESTAMP(0),\n"
+                                + " file_uuid BYTES,\n"
+                                + " bit_c BINARY(8),\n"
+                                + " text_c STRING,\n"
+                                + " tiny_blob_c BYTES,\n"
+                                + " blob_c BYTES,\n"
+                                + " medium_blob_c BYTES,\n"
+                                + " long_blob_c BYTES,\n"
+                                + " year_c INT,\n"
+                                + " enum_c STRING,\n"
+                                + " set_c ARRAY<STRING>,\n"
+                                + " json_c STRING,\n"
+                                + " point_c STRING,\n"
+                                + " geometry_c STRING,\n"
+                                + " linestring_c STRING,\n"
+                                + " polygon_c STRING,\n"
+                                + " multipoint_c STRING,\n"
+                                + " multiline_c STRING,\n"
+                                + " multipolygon_c STRING,\n"
+                                + " geometrycollection_c STRING,\n"
+                                + " primary key (`id`) not enforced"
+                                + ") WITH ("
+                                + " 'connector' = 'mysql-cdc',"
+                                + " 'scan.incremental.snapshot.enabled' = 'true',"
+                                + " 'hostname' = '%s',"
+                                + " 'port' = '%s',"
+                                + " 'username' = '%s',"
+                                + " 'password' = '%s',"
+                                + " 'database-name' = '%s',"
+                                + " 'table-name' = '%s',"
+                                + " 'scan.incremental.snapshot.chunk.size' = '100',"
+                                + " 'server-id' = '%s'"
+                                + ")",
+                        HOST_NAME,
+                        PORT,
+                        USER_NAME,
+                        PASSWORD,
+                        DATABASE,
+                        getTableNameRegex(captureCustomerTables),
+                        getServerId());
+        tEnv.executeSql(sourceDDL);
+
+        TableResult tableResult = tEnv.executeSql("select * from polardbx_full_types");
+        CloseableIterator<Row> iterator = tableResult.collect();
+        List<String> realSnapshotData = fetchRows(iterator, 1);
+        realSnapshotData.forEach(System.out::println);
+        String[] expectedSnapshotData =
+                new String[] {
+                    "+I[100001, 127, 255, 32767, 65535, 8388607, 16777215, 2147483647, 4294967295, 2147483647, "
+                            + "9223372036854775807, 18446744073709551615, Hello World, abc, 123.102, 123.102, 404.4443, 123.4567,"
+                            + " 346, 34567892.1, false, true, true, 2020-07-17, 18:00:22, 2020-07-17T18:00:22.123, "
+                            + "2020-07-17T18:00:22.123456, 2020-07-17T18:00:22, [101, 26, -19, 8, 57, 15, 72, -109, -78, -15, 54,"
+                            + " -110, 62, 123, 116, 0], [4, 4, 4, 4, 4, 4, 4, 4], text, [16], [16], [16], [16], 2021, red, [a, "
+                            + "b], {\"key1\": \"value1\"}, {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}, "
+                            + "{\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0}, "
+                            + "{\"coordinates\":[[3,0],[3,3],[3,5]],\"type\":\"LineString\",\"srid\":0}, {\"coordinates\":[[[1,"
+                            + "1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0}, {\"coordinates\":[[1,1],[2,2]],"
+                            + "\"type\":\"MultiPoint\",\"srid\":0}, {\"coordinates\":[[[1,1],[2,2],[3,3]],[[4,4],[5,5]]],"
+                            + "\"type\":\"MultiLineString\",\"srid\":0}, {\"coordinates\":[[[[0,0],[10,0],[10,10],[0,10],[0,0]]],"
+                            + "[[[5,5],[7,5],[7,7],[5,7],[5,5]]]],\"type\":\"MultiPolygon\",\"srid\":0}, "
+                            + "{\"geometries\":[{\"type\":\"Point\",\"coordinates\":[10,10]},{\"type\":\"Point\","
+                            + "\"coordinates\":[30,30]},{\"type\":\"LineString\",\"coordinates\":[[15,15],[20,20]]}],"
+                            + "\"type\":\"GeometryCollection\",\"srid\":0}]",
+                };
+        assertEqualsInAnyOrder(Arrays.asList(expectedSnapshotData), realSnapshotData);
+    }
+
+    @Test
+    public void testMultiKeys() throws Exception {
+        int parallelism = 1;
+        String[] captureCustomerTables = new String[] {"orders_with_multi_pks"};
+
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        env.setParallelism(parallelism);
+        env.enableCheckpointing(200L);
+        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0));
+        String sourceDDL =
+                format(
+                        "CREATE TABLE orders_with_multi_pks ("
+                                + " id BIGINT NOT NULL,"
+                                + " seller_id STRING,"
+                                + " order_id STRING,"
+                                + " buyer_id STRING,"
+                                + " create_time TIMESTAMP,"
+                                + " primary key (id,order_id) not enforced"
+                                + ") WITH ("
+                                + " 'connector' = 'mysql-cdc',"
+                                + " 'scan.incremental.snapshot.enabled' = 'true',"
+                                + " 'hostname' = '%s',"
+                                + " 'port' = '%s',"
+                                + " 'username' = '%s',"
+                                + " 'password' = '%s',"
+                                + " 'database-name' = '%s',"
+                                + " 'table-name' = '%s',"
+                                + " 'scan.incremental.snapshot.chunk.size' = '100',"
+                                + " 'server-id' = '%s'"
+                                + ")",
+                        HOST_NAME,
+                        PORT,
+                        USER_NAME,
+                        PASSWORD,
+                        DATABASE,
+                        getTableNameRegex(captureCustomerTables),
+                        getServerId());
+
+        // first step: check the snapshot data
+        String[] snapshotForSingleTable =
+                new String[] {
+                    "+I[1, 1001, 1, 102, 2022-01-16T00:00]",
+                    "+I[2, 1002, 2, 105, 2022-01-16T00:00]",
+                    "+I[3, 1004, 3, 109, 2022-01-16T00:00]",
+                    "+I[4, 1002, 2, 106, 2022-01-16T00:00]",
+                    "+I[5, 1003, 1, 107, 2022-01-16T00:00]",
+                };
+        tEnv.executeSql(sourceDDL);
+        TableResult tableResult = tEnv.executeSql("select * from orders_with_multi_pks");
+        CloseableIterator<Row> iterator = tableResult.collect();
+        List<String> expectedSnapshotData = new ArrayList<>();
+        for (int i = 0; i < captureCustomerTables.length; i++) {
+            expectedSnapshotData.addAll(Arrays.asList(snapshotForSingleTable));
+        }
+
+        List<String> realSnapshotData = fetchRows(iterator, expectedSnapshotData.size());
+        assertEqualsInAnyOrder(expectedSnapshotData, realSnapshotData);
+
+        // second step: check the sink data
+        tEnv.executeSql(
+                "CREATE TABLE multi_key_sink ("
+                        + " id BIGINT NOT NULL,"
+                        + " seller_id STRING,"
+                        + " order_id STRING,"
+                        + " buyer_id STRING,"
+                        + " create_time TIMESTAMP,"
+                        + " primary key (id,order_id) not enforced"
+                        + ") WITH ("
+                        + " 'connector' = 'values',"
+                        + " 'sink-insert-only' = 'false'"
+                        + ")");
+
+        tEnv.executeSql("insert into multi_key_sink select * from orders_with_multi_pks");
+
+        waitForSinkSize("multi_key_sink", realSnapshotData.size());
+        assertEqualsInAnyOrder(
+                expectedSnapshotData, TestValuesTableFactory.getRawResults("multi_key_sink"));
+
+        // third step: check dml events
+        try (Connection connection = getJdbcConnection();
+                Statement statement = connection.createStatement()) {
+            statement.execute("use " + DATABASE);
+            statement.execute(
+                    "INSERT INTO orders_with_multi_pks VALUES (6, 1006, 1006, 1006, '2022-01-17');");
+            statement.execute(
+                    "INSERT INTO orders_with_multi_pks VALUES (7, 1007, 1007, 1007, '2022-01-17');");
+            statement.execute(
+                    "UPDATE orders_with_multi_pks SET seller_id=9999, order_id=9999 WHERE id=6;");
+            statement.execute(
+                    "UPDATE orders_with_multi_pks SET seller_id=9999, order_id=9999 WHERE id=7;");
+            statement.execute("DELETE FROM orders_with_multi_pks WHERE id=7;");
+        }
+
+        String[] expectedBinlog =
+                new String[] {
+                    "+I[6, 1006, 1006, 1006, 2022-01-17T00:00]",
+                    "+I[7, 1007, 1007, 1007, 2022-01-17T00:00]",
+                    "-D[6, 1006, 1006, 1006, 2022-01-17T00:00]",
+                    "+I[6, 9999, 9999, 1006, 2022-01-17T00:00]",
+                    "-D[7, 1007, 1007, 1007, 2022-01-17T00:00]",
+                    "+I[7, 9999, 9999, 1007, 2022-01-17T00:00]",
+                    "-D[7, 9999, 9999, 1007, 2022-01-17T00:00]"
+                };
+        List<String> realBinlog = fetchRows(iterator, expectedBinlog.length);
+        assertEqualsInAnyOrder(Arrays.asList(expectedBinlog), realBinlog);
+    }
+
+    private static List<String> fetchRows(Iterator<Row> iter, int size) {
+        List<String> rows = new ArrayList<>(size);
+        while (size > 0 && iter.hasNext()) {
+            Row row = iter.next();
+            rows.add(row.toString());
+            size--;
+        }
+        return rows;
+    }
+
+    private String getTableNameRegex(String[] captureCustomerTables) {
+        checkState(captureCustomerTables.length > 0);
+        if (captureCustomerTables.length == 1) {
+            return captureCustomerTables[0];
+        } else {
+            // pattern that matches multiple tables
+            return format("(%s)", StringUtils.join(captureCustomerTables, "|"));
+        }
+    }
+
+    private String getServerId() {
+        final Random random = new Random();
+        int serverId = random.nextInt(100) + 5400;
+        return serverId + "-" + (serverId + 4);
+    }
+
+    // ------------------------------------------------------------------------
+    //  test utilities
+    // ------------------------------------------------------------------------
+
+    private static void waitForSinkSize(String sinkName, int expectedSize)
+            throws InterruptedException {
+        while (sinkSize(sinkName) < expectedSize) {
+            Thread.sleep(100);
+        }
+    }
+
+    private static int sinkSize(String sinkName) {
+        synchronized (TestValuesTableFactory.class) {
+            try {
+                return TestValuesTableFactory.getRawResults(sinkName).size();
+            } catch (IllegalArgumentException e) {
+                // job is not started yet
+                return 0;
+            }
+        }
+    }
+
+    private static void assertEqualsInAnyOrder(List<String> expected, List<String> actual) {
+        assertTrue(expected != null && actual != null);
+        assertEqualsInOrder(
+                expected.stream().sorted().collect(Collectors.toList()),
+                actual.stream().sorted().collect(Collectors.toList()));
+    }
+
+    private static void assertEqualsInOrder(List<String> expected, List<String> actual) {
+        assertTrue(expected != null && actual != null);
+        assertEquals(expected.size(), actual.size());
+        assertArrayEquals(expected.toArray(new String[0]), actual.toArray(new String[0]));
+    }
+}
diff --git a/flink-connector-mysql-cdc/src/test/resources/ddl/polardbx_ddl_test.sql b/flink-connector-mysql-cdc/src/test/resources/ddl/polardbx_ddl_test.sql
new file mode 100644
index 00000000000..beb04f426c8
--- /dev/null
+++ b/flink-connector-mysql-cdc/src/test/resources/ddl/polardbx_ddl_test.sql
@@ -0,0 +1,138 @@
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements. See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership. The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License. You may obtain a copy of the License at
+--   http://www.apache.org/licenses/LICENSE-2.0
+-- Unless required by applicable law or agreed to in writing,
+-- software distributed under the License is distributed on an
+-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+-- KIND, either express or implied. See the License for the
+-- specific language governing permissions and limitations
+-- under the License.
+
+-- ----------------------------------------------------------------------------------------------------------------
+-- DATABASE: polardbx_ddl_test
+-- ----------------------------------------------------------------------------------------------------------------
+
+-- Create orders table with a single primary key
+create table orders (
+    id bigint not null auto_increment by group,
+    seller_id varchar(30) DEFAULT NULL,
+    order_id varchar(30) DEFAULT NULL,
+    buyer_id varchar(30) DEFAULT NULL,
+    create_time datetime DEFAULT NULL,
+    primary key(id),
+    GLOBAL INDEX `g_i_seller`(`seller_id`) dbpartition by hash(`seller_id`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8 dbpartition by RANGE_HASH(buyer_id, order_id, 10) tbpartition by RANGE_HASH(buyer_id, order_id, 10) tbpartitions 3;
+
+-- insert some orders for testing
+INSERT INTO orders
+VALUES (1, 1001, 1, 102, '2022-01-16'),
+       (2, 1002, 2, 105, '2022-01-16'),
+       (3, 1004, 3, 109, '2022-01-16'),
+       (4, 1002, 2, 106, '2022-01-16'),
+       (5, 1003, 1, 107, '2022-01-16');
+
+-- Create orders table with a composite primary key
+create table orders_with_multi_pks (
+    id bigint not null auto_increment by group,
+    seller_id varchar(30) DEFAULT NULL,
+    order_id varchar(30) NOT NULL,
+    buyer_id varchar(30) DEFAULT NULL,
+    create_time datetime DEFAULT NULL,
+    primary key(id,order_id),
+    GLOBAL INDEX `g_mi_seller`(`seller_id`) dbpartition by hash(`seller_id`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8 dbpartition by RANGE_HASH(buyer_id, order_id, 10) tbpartition by RANGE_HASH(buyer_id, order_id, 10) tbpartitions 3;
+
+-- insert some orders for testing
+INSERT INTO orders_with_multi_pks
+VALUES (1, 1001, 1, 102, '2022-01-16'),
+       (2, 1002, 2, 105, '2022-01-16'),
+       (3, 1004, 3, 109, '2022-01-16'),
+       (4, 1002, 2, 106, '2022-01-16'),
+       (5, 1003, 1, 107, '2022-01-16');
+
+
+-- Create table with full types
+CREATE TABLE polardbx_full_types (
+    id INT AUTO_INCREMENT BY GROUP,
+    tiny_c TINYINT,
+    tiny_un_c TINYINT UNSIGNED,
+    small_c SMALLINT,
+    small_un_c SMALLINT UNSIGNED,
+    medium_c MEDIUMINT,
+    medium_un_c MEDIUMINT UNSIGNED,
+    int_c INTEGER,
+    int_un_c INTEGER UNSIGNED,
+    int11_c INT(11) DEFAULT 0,
+    big_c BIGINT,
+    big_un_c BIGINT UNSIGNED,
+    varchar_c VARCHAR(255) DEFAULT '1',
+    char_c CHAR(3) DEFAULT '',
+    real_c REAL,
+    float_c FLOAT,
+    double_c DOUBLE,
+    decimal_c DECIMAL(8, 4),
+    numeric_c NUMERIC(6, 0),
+    big_decimal_c DECIMAL(65, 1),
+    bit1_c BIT,
+    tiny1_c TINYINT(1),
+    boolean_c BOOLEAN,
+    date_c DATE,
+    time_c TIME(0),
+    datetime3_c DATETIME(3),
+    datetime6_c DATETIME(6),
+    timestamp_c TIMESTAMP,
+    file_uuid BINARY(16),
+    bit_c BIT(64),
+    text_c TEXT,
+    tiny_blob_c TINYBLOB,
+    blob_c BLOB,
+    medium_blob_c MEDIUMBLOB,
+    long_blob_c LONGBLOB,
+    year_c YEAR,
+    enum_c enum('red', 'white') default 'red',
+    set_c SET('a', 'b'),
+    json_c JSON,
+    point_c POINT,
+    geometry_c GEOMETRY,
+    linestring_c LINESTRING,
+    polygon_c POLYGON,
+    multipoint_c MULTIPOINT,
+    multiline_c MULTILINESTRING,
+    multipolygon_c MULTIPOLYGON,
+    geometrycollection_c GEOMETRYCOLLECTION,
+    PRIMARY KEY (id),
+    GLOBAL INDEX `g_mit_seller`(`int_c`) dbpartition by hash(`int_c`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8 dbpartition by Hash(id);
+
+INSERT INTO polardbx_full_types VALUES (
+    DEFAULT, 127, 255, 32767, 65535, 8388607, 16777215, 2147483647, 4294967295, 2147483647, 9223372036854775807,
+    18446744073709551615,
+    'Hello World', 'abc', 123.102, 123.102, 404.4443, 123.4567, 345.6, 34567892.1, 0, 1, true,
+    '2020-07-17', '18:00:22', '2020-07-17 18:00:22.123', '2020-07-17 18:00:22.123456', '2020-07-17 18:00:22',
+    unhex(replace('651aed08-390f-4893-b2f1-36923e7b7400','-','')), b'0000010000000100000001000000010000000100000001000000010000000100',
+    'text',UNHEX(HEX(16)),UNHEX(HEX(16)),UNHEX(HEX(16)),UNHEX(HEX(16)), 2021,
+    'red', 'a,b,a', '{"key1": "value1"}',
+    ST_GeomFromText('POINT(1 1)'),
+    ST_GeomFromText('POLYGON((1 1, 2 1, 2 2, 1 2, 1 1))'),
+    ST_GeomFromText('LINESTRING(3 0, 3 3, 3 5)'),
+    ST_GeomFromText('POLYGON((1 1, 2 1, 2 2, 1 2, 1 1))'),
+    ST_GeomFromText('MULTIPOINT((1 1),(2 2))'),
+    ST_GeomFromText('MultiLineString((1 1,2 2,3 3),(4 4,5 5))'),
+    ST_GeomFromText('MULTIPOLYGON(((0 0, 10 0, 10 10, 0 10, 0 0)), ((5 5, 7 5, 7 7, 5 7, 5 5)))'),
+    ST_GeomFromText('GEOMETRYCOLLECTION(POINT(10 10), POINT(30 30), LINESTRING(15 15, 20 20))')
+);
+
+-- Create orders_sink for testing the flink-jdbc-connector sink
+create table orders_sink (
+    id bigint not null auto_increment by group,
+    seller_id varchar(30) DEFAULT NULL,
+    order_id varchar(30) DEFAULT NULL,
+    buyer_id varchar(30) DEFAULT NULL,
+    create_time datetime DEFAULT NULL,
+    primary key(id)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8 dbpartition by RANGE_HASH(buyer_id, order_id, 10) tbpartition by RANGE_HASH(buyer_id, order_id, 10) tbpartitions 3;
\ No newline at end of file
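
The PolarDB-X-specific constructs in this script are what exercises the fallback, e.g. (quoted from the tables above):

    id bigint not null auto_increment by group,
    GLOBAL INDEX `g_i_seller`(`seller_id`) dbpartition by hash(`seller_id`)
    ... dbpartition by RANGE_HASH(buyer_id, order_id, 10) tbpartition by RANGE_HASH(buyer_id, order_id, 10) tbpartitions 3;

SHOW CREATE TABLE echoes these clauses back verbatim, standard MySQL DDL parsing is not expected to produce a table change for them, and MySqlSchema then rebuilds the schema from the DESC output instead, as added in this patch.
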