From b106d6ae780330f65080d50ca987414d44a3d77f Mon Sep 17 00:00:00 2001 From: wudi <676366545@qq.com> Date: Thu, 28 Nov 2024 11:12:11 +0800 Subject: [PATCH] support mysql sync part columns --- .../task/MySqlSnapshotSplitReadTask.java | 44 ++++++++++++- .../mysql/source/utils/StatementUtils.java | 17 +++-- .../mysql/table/MySqlConnectorITCase.java | 66 +++++++++++++++++++ 3 files changed, 122 insertions(+), 5 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTask.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTask.java index f729f59d05f..dda17d17c38 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTask.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTask.java @@ -40,10 +40,12 @@ import io.debezium.pipeline.spi.ChangeRecordEmitter; import io.debezium.pipeline.spi.SnapshotResult; import io.debezium.relational.Column; +import io.debezium.relational.ColumnFilterMode; import io.debezium.relational.RelationalSnapshotChangeEventSource; import io.debezium.relational.SnapshotChangeRecordEmitter; import io.debezium.relational.Table; import io.debezium.relational.TableId; +import io.debezium.relational.Tables; import io.debezium.schema.TopicSelector; import io.debezium.util.Clock; import io.debezium.util.ColumnUtils; @@ -61,6 +63,11 @@ import java.sql.Types; import java.time.Duration; import java.util.Calendar; +import java.util.List; +import java.util.stream.Collectors; + +import static io.debezium.relational.RelationalDatabaseConnectorConfig.COLUMN_EXCLUDE_LIST; +import static io.debezium.relational.RelationalDatabaseConnectorConfig.COLUMN_INCLUDE_LIST; /** Task to read snapshot split of table. */ public class MySqlSnapshotSplitReadTask @@ -83,6 +90,7 @@ public class MySqlSnapshotSplitReadTask private final SnapshotPhaseHooks hooks; private final boolean isBackfillSkipped; + private final Tables.ColumnNameFilter columnFilter; public MySqlSnapshotSplitReadTask( MySqlSourceConfig sourceConfig, @@ -109,6 +117,21 @@ public MySqlSnapshotSplitReadTask( this.snapshotChangeEventSourceMetrics = snapshotChangeEventSourceMetrics; this.hooks = hooks; this.isBackfillSkipped = isBackfillSkipped; + + // setting column filter + String columnIncludeList = + sourceConfig.getDbzConfiguration().getString(COLUMN_INCLUDE_LIST); + String columnExcludeList = + sourceConfig.getDbzConfiguration().getString(COLUMN_EXCLUDE_LIST); + if (columnIncludeList != null) { + this.columnFilter = + Tables.ColumnNameFilterFactory.createIncludeListFilter( + columnIncludeList, ColumnFilterMode.CATALOG); + } else { + this.columnFilter = + Tables.ColumnNameFilterFactory.createExcludeListFilter( + columnExcludeList, ColumnFilterMode.CATALOG); + } } @Override @@ -247,7 +270,8 @@ private void createDataEventsForTable( snapshotSplit.getTableId(), snapshotSplit.getSplitKeyType(), snapshotSplit.getSplitStart() == null, - snapshotSplit.getSplitEnd() == null); + snapshotSplit.getSplitEnd() == null, + getScanColumns(table)); LOG.info( "For split '{}' of table {} using select statement: '{}'", snapshotSplit.splitId(), @@ -305,6 +329,24 @@ private void createDataEventsForTable( } } + private String getScanColumns(Table table) { + List columnNames = + table.retrieveColumnNames().stream() + .filter( + columnName -> + columnFilter.matches( + table.id().catalog(), + table.id().schema(), + table.id().table(), + columnName)) + .map(columnName -> jdbcConnection.quotedColumnIdString(columnName)) + .collect(Collectors.toList()); + if (columnNames.isEmpty()) { + columnNames.add("*"); + } + return String.join(", ", columnNames); + } + protected ChangeRecordEmitter getChangeRecordEmitter( MySqlSnapshotContext snapshotContext, TableId tableId, Object[] row) { snapshotContext.offset.event(tableId, clock.currentTime()); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/StatementUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/StatementUtils.java index 6e621e0a5fb..43ec8332f87 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/StatementUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/StatementUtils.java @@ -131,8 +131,12 @@ public static Object queryNextChunkMax( } public static String buildSplitScanQuery( - TableId tableId, RowType pkRowType, boolean isFirstSplit, boolean isLastSplit) { - return buildSplitQuery(tableId, pkRowType, isFirstSplit, isLastSplit, -1, true); + TableId tableId, + RowType pkRowType, + boolean isFirstSplit, + boolean isLastSplit, + String projection) { + return buildSplitQuery(tableId, pkRowType, isFirstSplit, isLastSplit, -1, true, projection); } private static String buildSplitQuery( @@ -141,7 +145,8 @@ private static String buildSplitQuery( boolean isFirstSplit, boolean isLastSplit, int limitSize, - boolean isScanningData) { + boolean isScanningData, + String projection) { final String condition; if (isFirstSplit && isLastSplit) { @@ -174,7 +179,11 @@ private static String buildSplitQuery( if (isScanningData) { return buildSelectWithRowLimits( - tableId, limitSize, "*", Optional.ofNullable(condition), Optional.empty()); + tableId, + limitSize, + projection, + Optional.ofNullable(condition), + Optional.empty()); } else { final String orderBy = pkRowType.getFieldNames().stream().collect(Collectors.joining(", ")); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlConnectorITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlConnectorITCase.java index 799e96ffa60..c10f707a94a 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlConnectorITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlConnectorITCase.java @@ -2351,4 +2351,70 @@ public void testBinaryHandlingModeWithBase64() throws Exception { assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length)); result.getJobClient().get().cancel().get(); } + + @Test + public void testColumnFilters() throws Exception { + customerDatabase.createAndInitialize(); + String sourceDDL = + String.format( + "CREATE TABLE address (" + + " `id` DECIMAL(20, 0) NOT NULL," + + " country STRING," + + " city STRING," + + " detail_address STRING," + + " primary key (`id`) not enforced" + + ") WITH (" + + " 'connector' = 'mysql-cdc'," + + " 'hostname' = '%s'," + + " 'port' = '%s'," + + " 'username' = '%s'," + + " 'password' = '%s'," + + " 'database-name' = '%s'," + + " 'table-name' = '%s'," + + " 'debezium.column.include.list' = '%s.%s.id,%s.%s.country', " + + " 'server-time-zone' = 'UTC'," + + " 'server-id' = '%s'" + + ")", + MYSQL_CONTAINER.getHost(), + MYSQL_CONTAINER.getDatabasePort(), + customerDatabase.getUsername(), + customerDatabase.getPassword(), + customerDatabase.getDatabaseName(), + "address", + customerDatabase.getDatabaseName(), + "address", + customerDatabase.getDatabaseName(), + "address", + getServerId()); + tEnv.executeSql(sourceDDL); + // async submit job + TableResult result = tEnv.executeSql("SELECT * from address"); + + CloseableIterator iterator = result.collect(); + waitForSnapshotStarted(iterator); + + try (Connection connection = customerDatabase.getJdbcConnection(); + Statement statement = connection.createStatement()) { + statement.execute("UPDATE address SET city = 'Hangzhou' WHERE id=416927583791428523;"); + statement.execute( + "INSERT INTO address VALUES(418257940021724075, 'Germany', 'Berlin', 'West Town address 3')"); + } + + String[] expected = + new String[] { + "+I[417271541558096811, America, null, null]", + "+I[417272886855938987, America, null, null]", + "+I[417111867899200427, America, null, null]", + "+I[417420106184475563, Germany, null, null]", + "+I[418161258277847979, Germany, null, null]", + "+I[416874195632735147, China, null, null]", + "+I[416927583791428523, China, null, null]", + "+I[417022095255614379, China, null, null]", + "-U[416927583791428523, China, null, null]", + "+U[416927583791428523, China, null, null]", + "+I[418257940021724075, Germany, null, null]" + }; + assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length)); + result.getJobClient().get().cancel().get(); + } }