From 9ceb32624dbe780b1c2ea3a7c0e06718a8e48b62 Mon Sep 17 00:00:00 2001 From: "guiyuan.hx" Date: Thu, 12 Dec 2024 11:44:03 +0800 Subject: [PATCH] [FLINK-36811][mysql] MySQL cdc setIsProcessingBacklog in snapshot phase and exit when snapshot phase finished --- .../connectors/mysql/source/MySqlSource.java | 6 +- .../assigners/MySqlHybridSplitAssigner.java | 18 +++-- .../assigners/MySqlSnapshotSplitAssigner.java | 50 +++++++++----- .../reader/BinlogSplitReaderTest.java | 7 +- .../reader/SnapshotSplitReaderTest.java | 7 +- .../MySqlHybridSplitAssignerTest.java | 66 ++++++++++++++++++- .../MySqlSnapshotSplitAssignerTest.java | 19 +++++- .../source/reader/MySqlSourceReaderTest.java | 7 +- ...MySqlSplitEnumeratorEnumeratorContext.java | 40 +++++++++++ .../mysql/testutils/MetricsUtils.java | 27 ++++++++ 10 files changed, 216 insertions(+), 31 deletions(-) create mode 100644 flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/utils/MockMySqlSplitEnumeratorEnumeratorContext.java create mode 100644 flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/testutils/MetricsUtils.java diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java index 47c3af92b6e..421e1b7a900 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java @@ -208,7 +208,8 @@ public SplitEnumerator createEnumerator( sourceConfig, enumContext.currentParallelism(), new ArrayList<>(), - isTableIdCaseSensitive); + isTableIdCaseSensitive, + enumContext); } catch (Exception e) { throw new FlinkRuntimeException( "Failed to discover captured tables for enumerator", e); @@ -233,7 +234,8 @@ public SplitEnumerator restoreEnumerator( new MySqlHybridSplitAssigner( sourceConfig, enumContext.currentParallelism(), - (HybridPendingSplitsState) checkpoint); + (HybridPendingSplitsState) checkpoint, + enumContext); } else if (checkpoint instanceof BinlogPendingSplitsState) { splitAssigner = new MySqlBinlogSplitAssigner( diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssigner.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssigner.java index fb654687939..d66e6f3da1f 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssigner.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssigner.java @@ -17,6 +17,7 @@ package org.apache.flink.cdc.connectors.mysql.source.assigners; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; import org.apache.flink.cdc.connectors.mysql.source.assigners.state.HybridPendingSplitsState; import org.apache.flink.cdc.connectors.mysql.source.assigners.state.PendingSplitsState; import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig; @@ -59,11 +60,16 @@ public MySqlHybridSplitAssigner( MySqlSourceConfig sourceConfig, int currentParallelism, List remainingTables, - boolean isTableIdCaseSensitive) { + boolean isTableIdCaseSensitive, + SplitEnumeratorContext enumeratorContext) { this( sourceConfig, new MySqlSnapshotSplitAssigner( - sourceConfig, currentParallelism, remainingTables, isTableIdCaseSensitive), + sourceConfig, + currentParallelism, + remainingTables, + isTableIdCaseSensitive, + enumeratorContext), false, sourceConfig.getSplitMetaGroupSize()); } @@ -71,11 +77,15 @@ public MySqlHybridSplitAssigner( public MySqlHybridSplitAssigner( MySqlSourceConfig sourceConfig, int currentParallelism, - HybridPendingSplitsState checkpoint) { + HybridPendingSplitsState checkpoint, + SplitEnumeratorContext enumeratorContext) { this( sourceConfig, new MySqlSnapshotSplitAssigner( - sourceConfig, currentParallelism, checkpoint.getSnapshotPendingSplits()), + sourceConfig, + currentParallelism, + checkpoint.getSnapshotPendingSplits(), + enumeratorContext), checkpoint.isBinlogSplitAssigned(), sourceConfig.getSplitMetaGroupSize()); } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java index 0382cf6e7d7..f65d96c6091 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java @@ -17,6 +17,7 @@ package org.apache.flink.cdc.connectors.mysql.source.assigners; +import org.apache.flink.api.connector.source.SplitEnumeratorContext; import org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils; import org.apache.flink.cdc.connectors.mysql.schema.MySqlSchema; import org.apache.flink.cdc.connectors.mysql.source.assigners.state.ChunkSplitterState; @@ -79,6 +80,7 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner { private final int currentParallelism; private final List remainingTables; private final boolean isRemainingTablesCheckpointed; + private final SplitEnumeratorContext enumeratorContext; private final MySqlPartition partition; private final Object lock = new Object(); @@ -95,7 +97,8 @@ public MySqlSnapshotSplitAssigner( MySqlSourceConfig sourceConfig, int currentParallelism, List remainingTables, - boolean isTableIdCaseSensitive) { + boolean isTableIdCaseSensitive, + SplitEnumeratorContext enumeratorContext) { this( sourceConfig, currentParallelism, @@ -108,13 +111,15 @@ public MySqlSnapshotSplitAssigner( remainingTables, isTableIdCaseSensitive, true, - ChunkSplitterState.NO_SPLITTING_TABLE_STATE); + ChunkSplitterState.NO_SPLITTING_TABLE_STATE, + enumeratorContext); } public MySqlSnapshotSplitAssigner( MySqlSourceConfig sourceConfig, int currentParallelism, - SnapshotPendingSplitsState checkpoint) { + SnapshotPendingSplitsState checkpoint, + SplitEnumeratorContext enumeratorContext) { this( sourceConfig, currentParallelism, @@ -127,7 +132,8 @@ public MySqlSnapshotSplitAssigner( checkpoint.getRemainingTables(), checkpoint.isTableIdCaseSensitive(), checkpoint.isRemainingTablesCheckpointed(), - checkpoint.getChunkSplitterState()); + checkpoint.getChunkSplitterState(), + enumeratorContext); } private MySqlSnapshotSplitAssigner( @@ -142,7 +148,8 @@ private MySqlSnapshotSplitAssigner( List remainingTables, boolean isTableIdCaseSensitive, boolean isRemainingTablesCheckpointed, - ChunkSplitterState chunkSplitterState) { + ChunkSplitterState chunkSplitterState, + SplitEnumeratorContext enumeratorContext) { this.sourceConfig = sourceConfig; this.currentParallelism = currentParallelism; this.alreadyProcessedTables = alreadyProcessedTables; @@ -168,10 +175,12 @@ private MySqlSnapshotSplitAssigner( createChunkSplitter(sourceConfig, isTableIdCaseSensitive, chunkSplitterState); this.partition = new MySqlPartition(sourceConfig.getMySqlConnectorConfig().getLogicalName()); + this.enumeratorContext = enumeratorContext; } @Override public void open() { + shouldEnterProcessingBacklog(); chunkSplitter.open(); discoveryCaptureTables(); captureNewlyAddedTables(); @@ -397,17 +406,20 @@ public List getFinishedSplitInfos() { @Override public void onFinishedSplits(Map splitFinishedOffsets) { this.splitFinishedOffsets.putAll(splitFinishedOffsets); - if (allSnapshotSplitsFinished() - && AssignerStatus.isAssigningSnapshotSplits(assignerStatus)) { - // Skip the waiting checkpoint when current parallelism is 1 which means we do not need - // to care about the global output data order of snapshot splits and binlog split. - if (currentParallelism == 1) { - assignerStatus = assignerStatus.onFinish(); - LOG.info( - "Snapshot split assigner received all splits finished and the job parallelism is 1, snapshot split assigner is turn into finished status."); - } else { - LOG.info( - "Snapshot split assigner received all splits finished, waiting for a complete checkpoint to mark the assigner finished."); + if (allSnapshotSplitsFinished()) { + enumeratorContext.setIsProcessingBacklog(false); + if (AssignerStatus.isAssigningSnapshotSplits(assignerStatus)) { + // Skip the waiting checkpoint when current parallelism is 1 which means we do not + // need + // to care about the global output data order of snapshot splits and binlog split. + if (currentParallelism == 1) { + assignerStatus = assignerStatus.onFinish(); + LOG.info( + "Snapshot split assigner received all splits finished and the job parallelism is 1, snapshot split assigner is turn into finished status."); + } else { + LOG.info( + "Snapshot split assigner received all splits finished, waiting for a complete checkpoint to mark the assigner finished."); + } } } } @@ -607,4 +619,10 @@ private static MySqlChunkSplitter createChunkSplitter( } return new MySqlChunkSplitter(mySqlSchema, sourceConfig); } + + private void shouldEnterProcessingBacklog() { + if (assignerStatus == AssignerStatus.INITIAL_ASSIGNING) { + enumeratorContext.setIsProcessingBacklog(true); + } + } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java index f9c070d964b..17ade8f0f95 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java @@ -86,6 +86,7 @@ import static org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffsetUtils.initializeEffectiveOffset; import static org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.getSnapshotSplitInfo; import static org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.getStartingOffsetOfBinlogSplit; +import static org.apache.flink.cdc.connectors.mysql.testutils.MetricsUtils.getMySqlSplitEnumeratorContext; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -1182,7 +1183,11 @@ private List getMySqlSplits( final MySqlSnapshotSplitAssigner assigner = new MySqlSnapshotSplitAssigner( - sourceConfig, DEFAULT_PARALLELISM, remainingTables, false); + sourceConfig, + DEFAULT_PARALLELISM, + remainingTables, + false, + getMySqlSplitEnumeratorContext()); assigner.open(); List mySqlSplits = new ArrayList<>(); while (true) { diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java index 147fe441a1b..2eb7f3b342a 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReaderTest.java @@ -50,6 +50,7 @@ import java.util.Optional; import java.util.stream.Collectors; +import static org.apache.flink.cdc.connectors.mysql.testutils.MetricsUtils.getMySqlSplitEnumeratorContext; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -629,7 +630,11 @@ private List getMySqlSplits( MySqlSourceConfig sourceConfig, List remainingTables) { final MySqlSnapshotSplitAssigner assigner = new MySqlSnapshotSplitAssigner( - sourceConfig, DEFAULT_PARALLELISM, remainingTables, false); + sourceConfig, + DEFAULT_PARALLELISM, + remainingTables, + false, + getMySqlSplitEnumeratorContext()); assigner.open(); List mySqlSplitList = new ArrayList<>(); while (true) { diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssignerTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssignerTest.java index 134f426bec0..3521a3506ed 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssignerTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssignerTest.java @@ -29,6 +29,7 @@ import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSchemalessSnapshotSplit; import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSnapshotSplit; import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplit; +import org.apache.flink.cdc.connectors.mysql.source.utils.MockMySqlSplitEnumeratorEnumeratorContext; import org.apache.flink.cdc.connectors.mysql.table.StartupOptions; import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase; import org.apache.flink.table.api.DataTypes; @@ -50,6 +51,8 @@ import java.util.Optional; import java.util.stream.Collectors; +import static org.apache.flink.cdc.connectors.mysql.testutils.MetricsUtils.getMySqlSplitEnumeratorContext; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -109,8 +112,11 @@ public void testAssignMySqlBinlogSplitAfterAllSnapshotSplitsFinished() { ChunkSplitterState.NO_SPLITTING_TABLE_STATE); HybridPendingSplitsState checkpoint = new HybridPendingSplitsState(snapshotPendingSplitsState, false); + MockMySqlSplitEnumeratorEnumeratorContext enumeratorContext = + getMySqlSplitEnumeratorContext(); final MySqlHybridSplitAssigner assigner = - new MySqlHybridSplitAssigner(configuration, DEFAULT_PARALLELISM, checkpoint); + new MySqlHybridSplitAssigner( + configuration, DEFAULT_PARALLELISM, checkpoint, enumeratorContext); // step 2. Get the MySqlBinlogSplit after all snapshot splits finished Optional binlogSplit = assigner.getNext(); @@ -152,7 +158,12 @@ public void testAssigningInSnapshotOnlyMode() { // Create and initialize assigner MySqlHybridSplitAssigner assigner = - new MySqlHybridSplitAssigner(sourceConfig, 1, new ArrayList<>(), false); + new MySqlHybridSplitAssigner( + sourceConfig, + 1, + new ArrayList<>(), + false, + getMySqlSplitEnumeratorContext()); assigner.open(); // Get all snapshot splits @@ -201,6 +212,57 @@ private MySqlSourceConfig getConfig(String[] captureTables, StartupOptions start .createConfig(0); } + @Test + public void testSetProcessingBacklog() { + final String captureTable = "customers"; + MySqlSourceConfig configuration = getConfig(new String[] {captureTable}); + MockMySqlSplitEnumeratorEnumeratorContext enumeratorContext = + getMySqlSplitEnumeratorContext(); + final MySqlHybridSplitAssigner assigner = + new MySqlHybridSplitAssigner( + configuration, + DEFAULT_PARALLELISM, + new ArrayList<>(), + false, + enumeratorContext); + assertThat(enumeratorContext.isProcessingBacklog()).isFalse(); + assigner.open(); + assertThat(enumeratorContext.isProcessingBacklog()).isTrue(); + // Get all snapshot splits + List snapshotSplits = drainSnapshotSplits(assigner); + Map finishedOffsets = new HashMap<>(); + int i = 0; + for (MySqlSnapshotSplit snapshotSplit : snapshotSplits) { + BinlogOffset binlogOffset = + BinlogOffset.builder().setBinlogFilePosition("foo", i++).build(); + finishedOffsets.put(snapshotSplit.splitId(), binlogOffset); + } + assigner.onFinishedSplits(finishedOffsets); + assertThat(enumeratorContext.isProcessingBacklog()).isFalse(); + assigner.close(); + } + + private MySqlSourceConfigFactory getConfigFactory(String[] captureTables) { + String[] captureTableIds = + Arrays.stream(captureTables) + .map(tableName -> customerDatabase.getDatabaseName() + "." + tableName) + .toArray(String[]::new); + + return new MySqlSourceConfigFactory() + .startupOptions(StartupOptions.initial()) + .databaseList(customerDatabase.getDatabaseName()) + .tableList(captureTableIds) + .hostname(MYSQL_CONTAINER.getHost()) + .port(MYSQL_CONTAINER.getDatabasePort()) + .username(customerDatabase.getUsername()) + .password(customerDatabase.getPassword()) + .serverTimeZone(ZoneId.of("UTC").toString()); + } + + private MySqlSourceConfig getConfig(String[] captureTables) { + return getConfigFactory(captureTables).createConfig(0); + } + private List drainSnapshotSplits(MySqlHybridSplitAssigner assigner) { List snapshotSplits = new ArrayList<>(); while (true) { diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java index de875a0ed75..7f16cab3f0d 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java @@ -50,6 +50,7 @@ import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND; import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND; import static org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset.ofEarliest; +import static org.apache.flink.cdc.connectors.mysql.testutils.MetricsUtils.getMySqlSplitEnumeratorContext; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -475,7 +476,11 @@ public void testEnumerateTablesLazily() { final MySqlSnapshotSplitAssigner assigner = new MySqlSnapshotSplitAssigner( - configuration, DEFAULT_PARALLELISM, new ArrayList<>(), false); + configuration, + DEFAULT_PARALLELISM, + new ArrayList<>(), + false, + getMySqlSplitEnumeratorContext()); assertTrue(assigner.needToDiscoveryTables()); assigner.open(); @@ -549,7 +554,11 @@ private List getTestAssignSnapshotSplits( .collect(Collectors.toList()); final MySqlSnapshotSplitAssigner assigner = new MySqlSnapshotSplitAssigner( - configuration, DEFAULT_PARALLELISM, remainingTables, false); + configuration, + DEFAULT_PARALLELISM, + remainingTables, + false, + getMySqlSplitEnumeratorContext()); return getSplitsFromAssigner(assigner); } @@ -642,7 +651,11 @@ private List getTestAssignSnapshotSplitsFromCheckpoint(AssignerStatus as true, ChunkSplitterState.NO_SPLITTING_TABLE_STATE); final MySqlSnapshotSplitAssigner assigner = - new MySqlSnapshotSplitAssigner(configuration, DEFAULT_PARALLELISM, checkpoint); + new MySqlSnapshotSplitAssigner( + configuration, + DEFAULT_PARALLELISM, + checkpoint, + getMySqlSplitEnumeratorContext()); return getSplitsFromAssigner(assigner); } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java index 2f674b612d9..9dff0446c27 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java @@ -99,6 +99,7 @@ import static org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.isHighWatermarkEvent; import static org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.isSchemaChangeEvent; import static org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.isWatermarkEvent; +import static org.apache.flink.cdc.connectors.mysql.testutils.MetricsUtils.getMySqlSplitEnumeratorContext; import static org.apache.flink.core.io.InputStatus.MORE_AVAILABLE; import static org.apache.flink.util.Preconditions.checkState; import static org.junit.Assert.assertEquals; @@ -404,7 +405,8 @@ public void testRemoveSplitAccordingToNewFilter() throws Exception { sourceConfig, DEFAULT_PARALLELISM, tableNames.stream().map(TableId::parse).collect(Collectors.toList()), - false); + false, + getMySqlSplitEnumeratorContext()); assigner.open(); List splits = new ArrayList<>(); MySqlSnapshotSplit split = (MySqlSnapshotSplit) assigner.getNext().get(); @@ -459,7 +461,8 @@ public void testNoDuplicateRecordsWhenKeepUpdating() throws Exception { sourceConfig, DEFAULT_PARALLELISM, Collections.singletonList(TableId.parse(tableName)), - false); + false, + getMySqlSplitEnumeratorContext()); assigner.open(); MySqlSnapshotSplit snapshotSplit = (MySqlSnapshotSplit) assigner.getNext().get(); // should contain only one split diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/utils/MockMySqlSplitEnumeratorEnumeratorContext.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/utils/MockMySqlSplitEnumeratorEnumeratorContext.java new file mode 100644 index 00000000000..fe547dad0ae --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/utils/MockMySqlSplitEnumeratorEnumeratorContext.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.mysql.source.utils; + +import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext; +import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplit; + +/** A mock enumerator context to record isProcessingBacklog. */ +public class MockMySqlSplitEnumeratorEnumeratorContext + extends MockSplitEnumeratorContext { + private boolean isProcessingBacklog = false; + + public MockMySqlSplitEnumeratorEnumeratorContext(int parallelism) { + super(parallelism); + } + + @Override + public void setIsProcessingBacklog(boolean isProcessingBacklog) { + this.isProcessingBacklog = isProcessingBacklog; + } + + public boolean isProcessingBacklog() { + return isProcessingBacklog; + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/testutils/MetricsUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/testutils/MetricsUtils.java new file mode 100644 index 00000000000..f4d10b472ae --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/testutils/MetricsUtils.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.mysql.testutils; + +import org.apache.flink.cdc.connectors.mysql.source.utils.MockMySqlSplitEnumeratorEnumeratorContext; + +/** The test utils for metrics. */ +public class MetricsUtils { + public static MockMySqlSplitEnumeratorEnumeratorContext getMySqlSplitEnumeratorContext() { + return new MockMySqlSplitEnumeratorEnumeratorContext(1); + } +}