Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-36811][mysql] MySQL cdc setIsProcessingBacklog in snapshot phase and exit when snapshot phase finished #3793

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,8 @@ public SplitEnumerator<MySqlSplit, PendingSplitsState> createEnumerator(
sourceConfig,
enumContext.currentParallelism(),
new ArrayList<>(),
isTableIdCaseSensitive);
isTableIdCaseSensitive,
enumContext);
} catch (Exception e) {
throw new FlinkRuntimeException(
"Failed to discover captured tables for enumerator", e);
Expand All @@ -233,7 +234,8 @@ public SplitEnumerator<MySqlSplit, PendingSplitsState> restoreEnumerator(
new MySqlHybridSplitAssigner(
sourceConfig,
enumContext.currentParallelism(),
(HybridPendingSplitsState) checkpoint);
(HybridPendingSplitsState) checkpoint,
enumContext);
} else if (checkpoint instanceof BinlogPendingSplitsState) {
splitAssigner =
new MySqlBinlogSplitAssigner(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.flink.cdc.connectors.mysql.source.assigners;

import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.cdc.connectors.mysql.source.assigners.state.HybridPendingSplitsState;
import org.apache.flink.cdc.connectors.mysql.source.assigners.state.PendingSplitsState;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
Expand Down Expand Up @@ -59,23 +60,32 @@ public MySqlHybridSplitAssigner(
MySqlSourceConfig sourceConfig,
int currentParallelism,
List<TableId> remainingTables,
boolean isTableIdCaseSensitive) {
boolean isTableIdCaseSensitive,
SplitEnumeratorContext<MySqlSplit> enumeratorContext) {
this(
sourceConfig,
new MySqlSnapshotSplitAssigner(
sourceConfig, currentParallelism, remainingTables, isTableIdCaseSensitive),
sourceConfig,
currentParallelism,
remainingTables,
isTableIdCaseSensitive,
enumeratorContext),
false,
sourceConfig.getSplitMetaGroupSize());
}

/**
 * Creates a hybrid split assigner restored from a checkpoint.
 *
 * <p>Delegates to the private all-args constructor, rebuilding the snapshot-phase assigner
 * from the checkpointed pending snapshot splits and propagating the enumerator context so the
 * snapshot assigner can toggle the "processing backlog" flag (FLINK-36811).
 *
 * @param sourceConfig the MySQL source configuration
 * @param currentParallelism the current parallelism of the enumerator
 * @param checkpoint the restored hybrid pending-splits state
 * @param enumeratorContext context used to report backlog status to the runtime
 */
public MySqlHybridSplitAssigner(
        MySqlSourceConfig sourceConfig,
        int currentParallelism,
        HybridPendingSplitsState checkpoint,
        SplitEnumeratorContext<MySqlSplit> enumeratorContext) {
    this(
            sourceConfig,
            new MySqlSnapshotSplitAssigner(
                    sourceConfig,
                    currentParallelism,
                    checkpoint.getSnapshotPendingSplits(),
                    enumeratorContext),
            checkpoint.isBinlogSplitAssigned(),
            sourceConfig.getSplitMetaGroupSize());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.flink.cdc.connectors.mysql.source.assigners;

import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils;
import org.apache.flink.cdc.connectors.mysql.schema.MySqlSchema;
import org.apache.flink.cdc.connectors.mysql.source.assigners.state.ChunkSplitterState;
Expand Down Expand Up @@ -79,6 +80,7 @@ public class MySqlSnapshotSplitAssigner implements MySqlSplitAssigner {
private final int currentParallelism;
private final List<TableId> remainingTables;
private final boolean isRemainingTablesCheckpointed;
private final SplitEnumeratorContext<MySqlSplit> enumeratorContext;

private final MySqlPartition partition;
private final Object lock = new Object();
Expand All @@ -95,7 +97,8 @@ public MySqlSnapshotSplitAssigner(
MySqlSourceConfig sourceConfig,
int currentParallelism,
List<TableId> remainingTables,
boolean isTableIdCaseSensitive) {
boolean isTableIdCaseSensitive,
SplitEnumeratorContext<MySqlSplit> enumeratorContext) {
this(
sourceConfig,
currentParallelism,
Expand All @@ -108,13 +111,15 @@ public MySqlSnapshotSplitAssigner(
remainingTables,
isTableIdCaseSensitive,
true,
ChunkSplitterState.NO_SPLITTING_TABLE_STATE);
ChunkSplitterState.NO_SPLITTING_TABLE_STATE,
enumeratorContext);
}

public MySqlSnapshotSplitAssigner(
MySqlSourceConfig sourceConfig,
int currentParallelism,
SnapshotPendingSplitsState checkpoint) {
SnapshotPendingSplitsState checkpoint,
SplitEnumeratorContext<MySqlSplit> enumeratorContext) {
this(
sourceConfig,
currentParallelism,
Expand All @@ -127,7 +132,8 @@ public MySqlSnapshotSplitAssigner(
checkpoint.getRemainingTables(),
checkpoint.isTableIdCaseSensitive(),
checkpoint.isRemainingTablesCheckpointed(),
checkpoint.getChunkSplitterState());
checkpoint.getChunkSplitterState(),
enumeratorContext);
}

private MySqlSnapshotSplitAssigner(
Expand All @@ -142,7 +148,8 @@ private MySqlSnapshotSplitAssigner(
List<TableId> remainingTables,
boolean isTableIdCaseSensitive,
boolean isRemainingTablesCheckpointed,
ChunkSplitterState chunkSplitterState) {
ChunkSplitterState chunkSplitterState,
SplitEnumeratorContext<MySqlSplit> enumeratorContext) {
this.sourceConfig = sourceConfig;
this.currentParallelism = currentParallelism;
this.alreadyProcessedTables = alreadyProcessedTables;
Expand All @@ -168,10 +175,12 @@ private MySqlSnapshotSplitAssigner(
createChunkSplitter(sourceConfig, isTableIdCaseSensitive, chunkSplitterState);
this.partition =
new MySqlPartition(sourceConfig.getMySqlConnectorConfig().getLogicalName());
this.enumeratorContext = enumeratorContext;
}

@Override
public void open() {
shouldEnterProcessingBacklog();
chunkSplitter.open();
discoveryCaptureTables();
captureNewlyAddedTables();
Expand Down Expand Up @@ -397,17 +406,20 @@ public List<FinishedSnapshotSplitInfo> getFinishedSplitInfos() {
@Override
public void onFinishedSplits(Map<String, BinlogOffset> splitFinishedOffsets) {
this.splitFinishedOffsets.putAll(splitFinishedOffsets);
if (allSnapshotSplitsFinished()
&& AssignerStatus.isAssigningSnapshotSplits(assignerStatus)) {
// Skip the waiting checkpoint when current parallelism is 1 which means we do not need
// to care about the global output data order of snapshot splits and binlog split.
if (currentParallelism == 1) {
assignerStatus = assignerStatus.onFinish();
LOG.info(
"Snapshot split assigner received all splits finished and the job parallelism is 1, snapshot split assigner is turn into finished status.");
} else {
LOG.info(
"Snapshot split assigner received all splits finished, waiting for a complete checkpoint to mark the assigner finished.");
if (allSnapshotSplitsFinished()) {
enumeratorContext.setIsProcessingBacklog(false);
if (AssignerStatus.isAssigningSnapshotSplits(assignerStatus)) {
// Skip the waiting checkpoint when current parallelism is 1 which means we do not
// need
// to care about the global output data order of snapshot splits and binlog split.
if (currentParallelism == 1) {
assignerStatus = assignerStatus.onFinish();
LOG.info(
"Snapshot split assigner received all splits finished and the job parallelism is 1, snapshot split assigner is turn into finished status.");
} else {
LOG.info(
"Snapshot split assigner received all splits finished, waiting for a complete checkpoint to mark the assigner finished.");
}
}
}
}
Expand Down Expand Up @@ -607,4 +619,10 @@ private static MySqlChunkSplitter createChunkSplitter(
}
return new MySqlChunkSplitter(mySqlSchema, sourceConfig);
}

/**
 * Signals the runtime that the source is processing backlog if this assigner is starting its
 * very first snapshot assignment.
 *
 * <p>Called from {@code open()}; only the {@code INITIAL_ASSIGNING} status raises the flag, so
 * restores that are already past the initial snapshot phase do not re-enter backlog mode. The
 * flag is cleared again in {@code onFinishedSplits} once all snapshot splits have finished.
 */
private void shouldEnterProcessingBacklog() {
if (assignerStatus == AssignerStatus.INITIAL_ASSIGNING) {
enumeratorContext.setIsProcessingBacklog(true);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@
import static org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffsetUtils.initializeEffectiveOffset;
import static org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.getSnapshotSplitInfo;
import static org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.getStartingOffsetOfBinlogSplit;
import static org.apache.flink.cdc.connectors.mysql.testutils.MetricsUtils.getMySqlSplitEnumeratorContext;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
Expand Down Expand Up @@ -1182,7 +1183,11 @@ private List<MySqlSnapshotSplit> getMySqlSplits(

final MySqlSnapshotSplitAssigner assigner =
new MySqlSnapshotSplitAssigner(
sourceConfig, DEFAULT_PARALLELISM, remainingTables, false);
sourceConfig,
DEFAULT_PARALLELISM,
remainingTables,
false,
getMySqlSplitEnumeratorContext());
assigner.open();
List<MySqlSnapshotSplit> mySqlSplits = new ArrayList<>();
while (true) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import java.util.Optional;
import java.util.stream.Collectors;

import static org.apache.flink.cdc.connectors.mysql.testutils.MetricsUtils.getMySqlSplitEnumeratorContext;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
Expand Down Expand Up @@ -629,7 +630,11 @@ private List<MySqlSplit> getMySqlSplits(
MySqlSourceConfig sourceConfig, List<TableId> remainingTables) {
final MySqlSnapshotSplitAssigner assigner =
new MySqlSnapshotSplitAssigner(
sourceConfig, DEFAULT_PARALLELISM, remainingTables, false);
sourceConfig,
DEFAULT_PARALLELISM,
remainingTables,
false,
getMySqlSplitEnumeratorContext());
assigner.open();
List<MySqlSplit> mySqlSplitList = new ArrayList<>();
while (true) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSchemalessSnapshotSplit;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSnapshotSplit;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplit;
import org.apache.flink.cdc.connectors.mysql.source.utils.MockMySqlSplitEnumeratorEnumeratorContext;
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
import org.apache.flink.table.api.DataTypes;
Expand All @@ -50,6 +51,8 @@
import java.util.Optional;
import java.util.stream.Collectors;

import static org.apache.flink.cdc.connectors.mysql.testutils.MetricsUtils.getMySqlSplitEnumeratorContext;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

Expand Down Expand Up @@ -109,8 +112,11 @@ public void testAssignMySqlBinlogSplitAfterAllSnapshotSplitsFinished() {
ChunkSplitterState.NO_SPLITTING_TABLE_STATE);
HybridPendingSplitsState checkpoint =
new HybridPendingSplitsState(snapshotPendingSplitsState, false);
MockMySqlSplitEnumeratorEnumeratorContext enumeratorContext =
getMySqlSplitEnumeratorContext();
final MySqlHybridSplitAssigner assigner =
new MySqlHybridSplitAssigner(configuration, DEFAULT_PARALLELISM, checkpoint);
new MySqlHybridSplitAssigner(
configuration, DEFAULT_PARALLELISM, checkpoint, enumeratorContext);

// step 2. Get the MySqlBinlogSplit after all snapshot splits finished
Optional<MySqlSplit> binlogSplit = assigner.getNext();
Expand Down Expand Up @@ -152,7 +158,12 @@ public void testAssigningInSnapshotOnlyMode() {

// Create and initialize assigner
MySqlHybridSplitAssigner assigner =
new MySqlHybridSplitAssigner(sourceConfig, 1, new ArrayList<>(), false);
new MySqlHybridSplitAssigner(
sourceConfig,
1,
new ArrayList<>(),
false,
getMySqlSplitEnumeratorContext());
assigner.open();

// Get all snapshot splits
Expand Down Expand Up @@ -201,6 +212,57 @@ private MySqlSourceConfig getConfig(String[] captureTables, StartupOptions start
.createConfig(0);
}

/**
 * Verifies the processing-backlog lifecycle: the flag is off before the assigner opens, turns
 * on while snapshot splits are being assigned, and turns off once every snapshot split has
 * reported a finished offset.
 */
@Test
public void testSetProcessingBacklog() {
    final String captureTable = "customers";
    MySqlSourceConfig configuration = getConfig(new String[] {captureTable});
    MockMySqlSplitEnumeratorEnumeratorContext enumeratorContext =
            getMySqlSplitEnumeratorContext();
    final MySqlHybridSplitAssigner assigner =
            new MySqlHybridSplitAssigner(
                    configuration,
                    DEFAULT_PARALLELISM,
                    new ArrayList<>(),
                    false,
                    enumeratorContext);
    // Backlog flag must be untouched until the assigner starts the snapshot phase.
    assertThat(enumeratorContext.isProcessingBacklog()).isFalse();
    assigner.open();
    assertThat(enumeratorContext.isProcessingBacklog()).isTrue();
    // Drain every snapshot split and report a distinct finished binlog offset for each one.
    List<MySqlSnapshotSplit> snapshotSplits = drainSnapshotSplits(assigner);
    Map<String, BinlogOffset> splitFinishedOffsets = new HashMap<>();
    for (int position = 0; position < snapshotSplits.size(); position++) {
        MySqlSnapshotSplit split = snapshotSplits.get(position);
        BinlogOffset offset =
                BinlogOffset.builder().setBinlogFilePosition("foo", position).build();
        splitFinishedOffsets.put(split.splitId(), offset);
    }
    assigner.onFinishedSplits(splitFinishedOffsets);
    // Snapshot phase complete: backlog flag must have been cleared.
    assertThat(enumeratorContext.isProcessingBacklog()).isFalse();
    assigner.close();
}

/**
 * Builds a source-config factory for the given capture tables, qualifying each table name with
 * the test database and pointing the factory at the test MySQL container.
 *
 * @param captureTables unqualified table names within the test customer database
 * @return a factory pre-populated with initial startup options and container connectivity
 */
private MySqlSourceConfigFactory getConfigFactory(String[] captureTables) {
    String databaseName = customerDatabase.getDatabaseName();
    String[] qualifiedTableIds = new String[captureTables.length];
    for (int idx = 0; idx < captureTables.length; idx++) {
        qualifiedTableIds[idx] = databaseName + "." + captureTables[idx];
    }
    return new MySqlSourceConfigFactory()
            .startupOptions(StartupOptions.initial())
            .databaseList(databaseName)
            .tableList(qualifiedTableIds)
            .hostname(MYSQL_CONTAINER.getHost())
            .port(MYSQL_CONTAINER.getDatabasePort())
            .username(customerDatabase.getUsername())
            .password(customerDatabase.getPassword())
            .serverTimeZone(ZoneId.of("UTC").toString());
}

/**
 * Convenience wrapper: builds a concrete source config (subtask 0) for the given tables.
 *
 * @param captureTables unqualified table names to capture
 * @return the materialized source configuration
 */
private MySqlSourceConfig getConfig(String[] captureTables) {
    MySqlSourceConfigFactory factory = getConfigFactory(captureTables);
    return factory.createConfig(0);
}

private List<MySqlSnapshotSplit> drainSnapshotSplits(MySqlHybridSplitAssigner assigner) {
List<MySqlSnapshotSplit> snapshotSplits = new ArrayList<>();
while (true) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
import static org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset.ofEarliest;
import static org.apache.flink.cdc.connectors.mysql.testutils.MetricsUtils.getMySqlSplitEnumeratorContext;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
Expand Down Expand Up @@ -475,7 +476,11 @@ public void testEnumerateTablesLazily() {

final MySqlSnapshotSplitAssigner assigner =
new MySqlSnapshotSplitAssigner(
configuration, DEFAULT_PARALLELISM, new ArrayList<>(), false);
configuration,
DEFAULT_PARALLELISM,
new ArrayList<>(),
false,
getMySqlSplitEnumeratorContext());

assertTrue(assigner.needToDiscoveryTables());
assigner.open();
Expand Down Expand Up @@ -549,7 +554,11 @@ private List<String> getTestAssignSnapshotSplits(
.collect(Collectors.toList());
final MySqlSnapshotSplitAssigner assigner =
new MySqlSnapshotSplitAssigner(
configuration, DEFAULT_PARALLELISM, remainingTables, false);
configuration,
DEFAULT_PARALLELISM,
remainingTables,
false,
getMySqlSplitEnumeratorContext());
return getSplitsFromAssigner(assigner);
}

Expand Down Expand Up @@ -642,7 +651,11 @@ private List<String> getTestAssignSnapshotSplitsFromCheckpoint(AssignerStatus as
true,
ChunkSplitterState.NO_SPLITTING_TABLE_STATE);
final MySqlSnapshotSplitAssigner assigner =
new MySqlSnapshotSplitAssigner(configuration, DEFAULT_PARALLELISM, checkpoint);
new MySqlSnapshotSplitAssigner(
configuration,
DEFAULT_PARALLELISM,
checkpoint,
getMySqlSplitEnumeratorContext());
return getSplitsFromAssigner(assigner);
}

Expand Down
Loading
Loading