[FLINK-36811][mysql] MySQL cdc setIsProcessingBacklog in snapshot phase and exit when snapshot phase finished
Shawn-Hx committed Dec 12, 2024
1 parent 9f8268c commit 9ceb326
Showing 10 changed files with 216 additions and 31 deletions.
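At a glance, the change threads the enumerator's SplitEnumeratorContext into the split assigners so the MySQL source reports backlog processing while the initial snapshot phase is running and clears it once every snapshot split has finished. The sketch below is a minimal, self-contained illustration of that lifecycle; the BacklogSignal interface and SnapshotBacklogTracker class are illustrative stand-ins, not types from this commit.

    // Illustrative sketch only; mirrors the behaviour wired in below without
    // depending on Flink classes.
    import java.util.HashSet;
    import java.util.Set;

    interface BacklogSignal {
        void setIsProcessingBacklog(boolean isBacklog);
    }

    final class SnapshotBacklogTracker {
        private final BacklogSignal signal;
        private final Set<String> unfinishedSplits = new HashSet<>();
        private final boolean initialAssigning;

        SnapshotBacklogTracker(BacklogSignal signal, boolean initialAssigning) {
            this.signal = signal;
            this.initialAssigning = initialAssigning;
        }

        void open() {
            // Mirrors shouldEnterProcessingBacklog(): only a fresh initial snapshot enters backlog.
            if (initialAssigning) {
                signal.setIsProcessingBacklog(true);
            }
        }

        void splitAssigned(String splitId) {
            unfinishedSplits.add(splitId);
        }

        void splitFinished(String splitId) {
            unfinishedSplits.remove(splitId);
            // Mirrors onFinishedSplits(): leave backlog as soon as the last snapshot split is done.
            if (unfinishedSplits.isEmpty()) {
                signal.setIsProcessingBacklog(false);
            }
        }
    }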
@@ -208,7 +208,8 @@ public SplitEnumerator<MySqlSplit, PendingSplitsState> createEnumerator(
sourceConfig,
enumContext.currentParallelism(),
new ArrayList<>(),
isTableIdCaseSensitive);
isTableIdCaseSensitive,
enumContext);
} catch (Exception e) {
throw new FlinkRuntimeException(
"Failed to discover captured tables for enumerator", e);
@@ -233,7 +234,8 @@ public SplitEnumerator<MySqlSplit, PendingSplitsState> restoreEnumerator(
new MySqlHybridSplitAssigner(
sourceConfig,
enumContext.currentParallelism(),
(HybridPendingSplitsState) checkpoint);
(HybridPendingSplitsState) checkpoint,
enumContext);
} else if (checkpoint instanceof BinlogPendingSplitsState) {
splitAssigner =
new MySqlBinlogSplitAssigner(
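The hunks above are the only source-level change: the enumerator context is now forwarded when the assigners are built. A hedged sketch of the initial-snapshot path, assuming the classes from this diff are on the classpath; the helper method name and the `false` case-sensitivity flag are illustrative.

    import java.util.ArrayList;

    import org.apache.flink.api.connector.source.SplitEnumeratorContext;
    import org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlHybridSplitAssigner;
    import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
    import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplit;

    class InitialAssignerWiringSketch {
        // Mirrors MySqlSource#createEnumerator after this change (illustrative helper).
        static MySqlHybridSplitAssigner newInitialAssigner(
                MySqlSourceConfig sourceConfig,
                SplitEnumeratorContext<MySqlSplit> enumContext) {
            return new MySqlHybridSplitAssigner(
                    sourceConfig,
                    enumContext.currentParallelism(),
                    new ArrayList<>(),  // captured tables are discovered later, in open()
                    false,              // isTableIdCaseSensitive, illustrative value
                    enumContext);       // new parameter: lets the assigner toggle backlog
        }
    }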
@@ -17,6 +17,7 @@

package org.apache.flink.cdc.connectors.mysql.source.assigners;

import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.cdc.connectors.mysql.source.assigners.state.HybridPendingSplitsState;
import org.apache.flink.cdc.connectors.mysql.source.assigners.state.PendingSplitsState;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
@@ -59,23 +60,32 @@ public MySqlHybridSplitAssigner(
MySqlSourceConfig sourceConfig,
int currentParallelism,
List<TableId> remainingTables,
boolean isTableIdCaseSensitive) {
boolean isTableIdCaseSensitive,
SplitEnumeratorContext<MySqlSplit> enumeratorContext) {
this(
sourceConfig,
new MySqlSnapshotSplitAssigner(
sourceConfig, currentParallelism, remainingTables, isTableIdCaseSensitive),
sourceConfig,
currentParallelism,
remainingTables,
isTableIdCaseSensitive,
enumeratorContext),
false,
sourceConfig.getSplitMetaGroupSize());
}

public MySqlHybridSplitAssigner(
MySqlSourceConfig sourceConfig,
int currentParallelism,
HybridPendingSplitsState checkpoint) {
HybridPendingSplitsState checkpoint,
SplitEnumeratorContext<MySqlSplit> enumeratorContext) {
this(
sourceConfig,
new MySqlSnapshotSplitAssigner(
sourceConfig, currentParallelism, checkpoint.getSnapshotPendingSplits()),
sourceConfig,
currentParallelism,
checkpoint.getSnapshotPendingSplits(),
enumeratorContext),
checkpoint.isBinlogSplitAssigned(),
sourceConfig.getSplitMetaGroupSize());
}
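Both hybrid-assigner constructors now accept the enumerator context and pass it down to the snapshot assigner. For the restore path, a hedged sketch (helper name illustrative); whether backlog is re-entered after a restore is decided later by the assigner's checkpointed status, see shouldEnterProcessingBacklog() below.

    import org.apache.flink.api.connector.source.SplitEnumeratorContext;
    import org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlHybridSplitAssigner;
    import org.apache.flink.cdc.connectors.mysql.source.assigners.state.HybridPendingSplitsState;
    import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
    import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplit;

    class RestoredAssignerWiringSketch {
        // Mirrors MySqlSource#restoreEnumerator after this change (illustrative helper).
        static MySqlHybridSplitAssigner restoreAssigner(
                MySqlSourceConfig sourceConfig,
                HybridPendingSplitsState checkpoint,
                SplitEnumeratorContext<MySqlSplit> enumContext) {
            return new MySqlHybridSplitAssigner(
                    sourceConfig,
                    enumContext.currentParallelism(),
                    checkpoint,    // snapshot progress and assigner status from state
                    enumContext);  // forwarded to MySqlSnapshotSplitAssigner
        }
    }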
@@ -17,6 +17,7 @@

package org.apache.flink.cdc.connectors.mysql.source.assigners;

import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils;
import org.apache.flink.cdc.connectors.mysql.schema.MySqlSchema;
import org.apache.flink.cdc.connectors.mysql.source.assigners.state.ChunkSplitterState;
@@ -79,6 +80,7 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
private final int currentParallelism;
private final List<TableId> remainingTables;
private final boolean isRemainingTablesCheckpointed;
private final SplitEnumeratorContext<MySqlSplit> enumeratorContext;

private final MySqlPartition partition;
private final Object lock = new Object();
@@ -95,7 +97,8 @@ public MySqlSnapshotSplitAssigner(
MySqlSourceConfig sourceConfig,
int currentParallelism,
List<TableId> remainingTables,
boolean isTableIdCaseSensitive) {
boolean isTableIdCaseSensitive,
SplitEnumeratorContext<MySqlSplit> enumeratorContext) {
this(
sourceConfig,
currentParallelism,
@@ -108,13 +111,15 @@ public MySqlSnapshotSplitAssigner(
remainingTables,
isTableIdCaseSensitive,
true,
ChunkSplitterState.NO_SPLITTING_TABLE_STATE);
ChunkSplitterState.NO_SPLITTING_TABLE_STATE,
enumeratorContext);
}

public MySqlSnapshotSplitAssigner(
MySqlSourceConfig sourceConfig,
int currentParallelism,
SnapshotPendingSplitsState checkpoint) {
SnapshotPendingSplitsState checkpoint,
SplitEnumeratorContext<MySqlSplit> enumeratorContext) {
this(
sourceConfig,
currentParallelism,
@@ -127,7 +132,8 @@ public MySqlSnapshotSplitAssigner(
checkpoint.getRemainingTables(),
checkpoint.isTableIdCaseSensitive(),
checkpoint.isRemainingTablesCheckpointed(),
checkpoint.getChunkSplitterState());
checkpoint.getChunkSplitterState(),
enumeratorContext);
}

private MySqlSnapshotSplitAssigner(
@@ -142,7 +148,8 @@ private MySqlSnapshotSplitAssigner(
List<TableId> remainingTables,
boolean isTableIdCaseSensitive,
boolean isRemainingTablesCheckpointed,
ChunkSplitterState chunkSplitterState) {
ChunkSplitterState chunkSplitterState,
SplitEnumeratorContext<MySqlSplit> enumeratorContext) {
this.sourceConfig = sourceConfig;
this.currentParallelism = currentParallelism;
this.alreadyProcessedTables = alreadyProcessedTables;
@@ -168,10 +175,12 @@ private MySqlSnapshotSplitAssigner(
createChunkSplitter(sourceConfig, isTableIdCaseSensitive, chunkSplitterState);
this.partition =
new MySqlPartition(sourceConfig.getMySqlConnectorConfig().getLogicalName());
this.enumeratorContext = enumeratorContext;
}

@Override
public void open() {
shouldEnterProcessingBacklog();
chunkSplitter.open();
discoveryCaptureTables();
captureNewlyAddedTables();
@@ -397,17 +406,20 @@ public List<FinishedSnapshotSplitInfo> getFinishedSplitInfos() {
@Override
public void onFinishedSplits(Map<String, BinlogOffset> splitFinishedOffsets) {
this.splitFinishedOffsets.putAll(splitFinishedOffsets);
if (allSnapshotSplitsFinished()
&& AssignerStatus.isAssigningSnapshotSplits(assignerStatus)) {
// Skip the waiting checkpoint when current parallelism is 1 which means we do not need
// to care about the global output data order of snapshot splits and binlog split.
if (currentParallelism == 1) {
assignerStatus = assignerStatus.onFinish();
LOG.info(
"Snapshot split assigner received all splits finished and the job parallelism is 1, snapshot split assigner is turn into finished status.");
} else {
LOG.info(
"Snapshot split assigner received all splits finished, waiting for a complete checkpoint to mark the assigner finished.");
if (allSnapshotSplitsFinished()) {
enumeratorContext.setIsProcessingBacklog(false);
if (AssignerStatus.isAssigningSnapshotSplits(assignerStatus)) {
// Skip the waiting checkpoint when current parallelism is 1 which means we do not
// need
// to care about the global output data order of snapshot splits and binlog split.
if (currentParallelism == 1) {
assignerStatus = assignerStatus.onFinish();
LOG.info(
"Snapshot split assigner received all splits finished and the job parallelism is 1, snapshot split assigner is turn into finished status.");
} else {
LOG.info(
"Snapshot split assigner received all splits finished, waiting for a complete checkpoint to mark the assigner finished.");
}
}
}
}
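With this restructuring, setIsProcessingBacklog(false) is called as soon as every snapshot split has reported finished, independently of whether the assigner status transition still waits for a checkpoint. One practical consequence, assuming a Flink runtime with FLIP-309's backlog-aware checkpointing (the option key below is an assumption about that feature, not part of this commit; verify it against your Flink version): jobs can keep a short checkpoint interval for the binlog phase while allowing a much larger one during the snapshot backlog.

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    class BacklogCheckpointingSketch {
        static StreamExecutionEnvironment envWithBacklogAwareCheckpointing() {
            Configuration conf = new Configuration();
            // Regular interval, used once the source has left backlog (binlog phase).
            conf.setString("execution.checkpointing.interval", "30 s");
            // Larger interval while sources report isProcessingBacklog == true
            // (assumed FLIP-309 option key; check your Flink version).
            conf.setString("execution.checkpointing.interval-during-backlog", "10 min");
            return StreamExecutionEnvironment.getExecutionEnvironment(conf);
        }
    }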
@@ -607,4 +619,10 @@ private static MySqlChunkSplitter createChunkSplitter(
}
return new MySqlChunkSplitter(mySqlSchema, sourceConfig);
}

private void shouldEnterProcessingBacklog() {
if (assignerStatus == AssignerStatus.INITIAL_ASSIGNING) {
enumeratorContext.setIsProcessingBacklog(true);
}
}
}
@@ -86,6 +86,7 @@
import static org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffsetUtils.initializeEffectiveOffset;
import static org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.getSnapshotSplitInfo;
import static org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.getStartingOffsetOfBinlogSplit;
import static org.apache.flink.cdc.connectors.mysql.testutils.MetricsUtils.getMySqlSplitEnumeratorContext;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -1182,7 +1183,11 @@ private List<MySqlSnapshotSplit> getMySqlSplits(

final MySqlSnapshotSplitAssigner assigner =
new MySqlSnapshotSplitAssigner(
sourceConfig, DEFAULT_PARALLELISM, remainingTables, false);
sourceConfig,
DEFAULT_PARALLELISM,
remainingTables,
false,
getMySqlSplitEnumeratorContext());
assigner.open();
List<MySqlSnapshotSplit> mySqlSplits = new ArrayList<>();
while (true) {
@@ -50,6 +50,7 @@
import java.util.Optional;
import java.util.stream.Collectors;

import static org.apache.flink.cdc.connectors.mysql.testutils.MetricsUtils.getMySqlSplitEnumeratorContext;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -629,7 +630,11 @@ private List<MySqlSplit> getMySqlSplits(
MySqlSourceConfig sourceConfig, List<TableId> remainingTables) {
final MySqlSnapshotSplitAssigner assigner =
new MySqlSnapshotSplitAssigner(
sourceConfig, DEFAULT_PARALLELISM, remainingTables, false);
sourceConfig,
DEFAULT_PARALLELISM,
remainingTables,
false,
getMySqlSplitEnumeratorContext());
assigner.open();
List<MySqlSplit> mySqlSplitList = new ArrayList<>();
while (true) {
@@ -29,6 +29,7 @@
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSchemalessSnapshotSplit;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSnapshotSplit;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplit;
import org.apache.flink.cdc.connectors.mysql.source.utils.MockMySqlSplitEnumeratorEnumeratorContext;
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
import org.apache.flink.table.api.DataTypes;
@@ -50,6 +51,8 @@
import java.util.Optional;
import java.util.stream.Collectors;

import static org.apache.flink.cdc.connectors.mysql.testutils.MetricsUtils.getMySqlSplitEnumeratorContext;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

@@ -109,8 +112,11 @@ public void testAssignMySqlBinlogSplitAfterAllSnapshotSplitsFinished() {
ChunkSplitterState.NO_SPLITTING_TABLE_STATE);
HybridPendingSplitsState checkpoint =
new HybridPendingSplitsState(snapshotPendingSplitsState, false);
MockMySqlSplitEnumeratorEnumeratorContext enumeratorContext =
getMySqlSplitEnumeratorContext();
final MySqlHybridSplitAssigner assigner =
new MySqlHybridSplitAssigner(configuration, DEFAULT_PARALLELISM, checkpoint);
new MySqlHybridSplitAssigner(
configuration, DEFAULT_PARALLELISM, checkpoint, enumeratorContext);

// step 2. Get the MySqlBinlogSplit after all snapshot splits finished
Optional<MySqlSplit> binlogSplit = assigner.getNext();
@@ -152,7 +158,12 @@ public void testAssigningInSnapshotOnlyMode() {

// Create and initialize assigner
MySqlHybridSplitAssigner assigner =
new MySqlHybridSplitAssigner(sourceConfig, 1, new ArrayList<>(), false);
new MySqlHybridSplitAssigner(
sourceConfig,
1,
new ArrayList<>(),
false,
getMySqlSplitEnumeratorContext());
assigner.open();

// Get all snapshot splits
@@ -201,6 +212,57 @@ private MySqlSourceConfig getConfig(String[] captureTables, StartupOptions start
.createConfig(0);
}

@Test
public void testSetProcessingBacklog() {
final String captureTable = "customers";
MySqlSourceConfig configuration = getConfig(new String[] {captureTable});
MockMySqlSplitEnumeratorEnumeratorContext enumeratorContext =
getMySqlSplitEnumeratorContext();
final MySqlHybridSplitAssigner assigner =
new MySqlHybridSplitAssigner(
configuration,
DEFAULT_PARALLELISM,
new ArrayList<>(),
false,
enumeratorContext);
assertThat(enumeratorContext.isProcessingBacklog()).isFalse();
assigner.open();
assertThat(enumeratorContext.isProcessingBacklog()).isTrue();
// Get all snapshot splits
List<MySqlSnapshotSplit> snapshotSplits = drainSnapshotSplits(assigner);
Map<String, BinlogOffset> finishedOffsets = new HashMap<>();
int i = 0;
for (MySqlSnapshotSplit snapshotSplit : snapshotSplits) {
BinlogOffset binlogOffset =
BinlogOffset.builder().setBinlogFilePosition("foo", i++).build();
finishedOffsets.put(snapshotSplit.splitId(), binlogOffset);
}
assigner.onFinishedSplits(finishedOffsets);
assertThat(enumeratorContext.isProcessingBacklog()).isFalse();
assigner.close();
}
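The new test relies on MockMySqlSplitEnumeratorEnumeratorContext (obtained via getMySqlSplitEnumeratorContext()) to observe the flag. Its implementation is not shown in this commit; as an assumption, a minimal mock capturing the relevant behaviour could look like the sketch below, whereas the real class would implement the full SplitEnumeratorContext contract under the connector's test utilities.

    // Illustrative stand-in only; not the actual MockMySqlSplitEnumeratorEnumeratorContext.
    class BacklogRecordingContextSketch {
        private boolean processingBacklog = false;

        public void setIsProcessingBacklog(boolean isProcessingBacklog) {
            // Record the latest value so the test can assert on it.
            this.processingBacklog = isProcessingBacklog;
        }

        public boolean isProcessingBacklog() {
            return processingBacklog;
        }
    }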

private MySqlSourceConfigFactory getConfigFactory(String[] captureTables) {
String[] captureTableIds =
Arrays.stream(captureTables)
.map(tableName -> customerDatabase.getDatabaseName() + "." + tableName)
.toArray(String[]::new);

return new MySqlSourceConfigFactory()
.startupOptions(StartupOptions.initial())
.databaseList(customerDatabase.getDatabaseName())
.tableList(captureTableIds)
.hostname(MYSQL_CONTAINER.getHost())
.port(MYSQL_CONTAINER.getDatabasePort())
.username(customerDatabase.getUsername())
.password(customerDatabase.getPassword())
.serverTimeZone(ZoneId.of("UTC").toString());
}

private MySqlSourceConfig getConfig(String[] captureTables) {
return getConfigFactory(captureTables).createConfig(0);
}

private List<MySqlSnapshotSplit> drainSnapshotSplits(MySqlHybridSplitAssigner assigner) {
List<MySqlSnapshotSplit> snapshotSplits = new ArrayList<>();
while (true) {
@@ -50,6 +50,7 @@
import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
import static org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset.ofEarliest;
import static org.apache.flink.cdc.connectors.mysql.testutils.MetricsUtils.getMySqlSplitEnumeratorContext;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -475,7 +476,11 @@ public void testEnumerateTablesLazily() {

final MySqlSnapshotSplitAssigner assigner =
new MySqlSnapshotSplitAssigner(
configuration, DEFAULT_PARALLELISM, new ArrayList<>(), false);
configuration,
DEFAULT_PARALLELISM,
new ArrayList<>(),
false,
getMySqlSplitEnumeratorContext());

assertTrue(assigner.needToDiscoveryTables());
assigner.open();
@@ -549,7 +554,11 @@ private List<String> getTestAssignSnapshotSplits(
.collect(Collectors.toList());
final MySqlSnapshotSplitAssigner assigner =
new MySqlSnapshotSplitAssigner(
configuration, DEFAULT_PARALLELISM, remainingTables, false);
configuration,
DEFAULT_PARALLELISM,
remainingTables,
false,
getMySqlSplitEnumeratorContext());
return getSplitsFromAssigner(assigner);
}

@@ -642,7 +651,11 @@ private List<String> getTestAssignSnapshotSplitsFromCheckpoint(AssignerStatus as
true,
ChunkSplitterState.NO_SPLITTING_TABLE_STATE);
final MySqlSnapshotSplitAssigner assigner =
new MySqlSnapshotSplitAssigner(configuration, DEFAULT_PARALLELISM, checkpoint);
new MySqlSnapshotSplitAssigner(
configuration,
DEFAULT_PARALLELISM,
checkpoint,
getMySqlSplitEnumeratorContext());
return getSplitsFromAssigner(assigner);
}
