Skip to content

Commit

Permalink
[cdc-base] Improve the Incremental Snapshot Interfaces
Browse files Browse the repository at this point in the history
  • Loading branch information
leonardBang committed Mar 27, 2022
1 parent 31c6659 commit 3ef4f2f
Show file tree
Hide file tree
Showing 110 changed files with 2,049 additions and 5,550 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ import com.ververica.cdc.connectors.mysql.source.MySqlSource;

public class MySqlSourceExample {
public static void main(String[] args) throws Exception {
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
MySqlSource<String> mySqlChangeEventSource = MySqlSource.<String>builder()
.hostname("yourHostname")
.port(yourPort)
.databaseList("yourDatabaseName") // set captured database
Expand All @@ -93,7 +93,7 @@ public class MySqlSourceExample {
env.enableCheckpointing(3000);

env
.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
.fromSource(mySqlChangeEventSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
// set 4 parallel source tasks
.setParallelism(4)
.print().setParallelism(1); // use parallelism 1 for sink to keep message ordering
Expand Down
4 changes: 2 additions & 2 deletions docs/content/about.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ import com.ververica.cdc.connectors.mysql.source.MySqlSource;

public class MySqlBinlogSourceExample {
public static void main(String[] args) throws Exception {
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
MySqlSource<String> mySqlChangeEventSource = MySqlSource.<String>builder()
.hostname("yourHostname")
.port(yourPort)
.databaseList("yourDatabaseName") // set captured database
Expand All @@ -107,7 +107,7 @@ public class MySqlBinlogSourceExample {
env.enableCheckpointing(3000);

env
.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
.fromSource(mySqlChangeEventSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
// set 4 parallel source tasks
.setParallelism(4)
.print().setParallelism(1); // use parallelism 1 for sink to keep message ordering
Expand Down
4 changes: 2 additions & 2 deletions docs/content/connectors/mysql-cdc.md
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ import com.ververica.cdc.connectors.mysql.source.MySqlSource;

public class MySqlSourceExample {
public static void main(String[] args) throws Exception {
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
MySqlSource<String> mySqlChangeEventSource = MySqlSource.<String>builder()
.hostname("yourHostname")
.port(yourPort)
.databaseList("yourDatabaseName") // set captured database
Expand All @@ -497,7 +497,7 @@ public class MySqlSourceExample {
env.enableCheckpointing(3000);

env
.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
.fromSource(mySqlChangeEventSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
// set 4 parallel source tasks
.setParallelism(4)
.print().setParallelism(1); // use parallelism 1 for sink to keep message ordering
Expand Down
23 changes: 21 additions & 2 deletions flink-connector-base/pom.xml → flink-cdc-base/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,9 @@ under the License.
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>flink-connector-base</artifactId>
<artifactId>flink-cdc-base</artifactId>

<dependencies>
<!-- Debezium dependencies -->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-debezium</artifactId>
Expand Down Expand Up @@ -114,5 +113,25 @@ under the License.
<type>test-jar</type>
<scope>test</scope>
</dependency>


<!-- Debezium test dependencies -->

<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mysql</artifactId>
<version>${debezium.version}</version>
<scope>test</scope>
</dependency>


<!-- test dependencies on TestContainers -->

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>mysql</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.ververica.cdc.connectors.base.config;

import com.ververica.cdc.connectors.base.options.StartupOptions;
import com.ververica.cdc.connectors.base.source.JdbcIncrementalSource;
import io.debezium.config.Configuration;

import java.util.Properties;

/** A basic Source configuration which is used by {@link JdbcIncrementalSource}. */
public abstract class BaseSourceConfig implements SourceConfig {

private static final long serialVersionUID = 1L;

protected final StartupOptions startupOptions;
protected final int splitSize;
protected final int splitMetaGroupSize;
protected final double distributionFactorUpper;
protected final double distributionFactorLower;
protected final boolean includeSchemaChanges;

// --------------------------------------------------------------------------------------------
// Debezium Configurations
// --------------------------------------------------------------------------------------------
protected final Properties dbzProperties;
protected transient Configuration dbzConfiguration;

public BaseSourceConfig(
StartupOptions startupOptions,
int splitSize,
int splitMetaGroupSize,
double distributionFactorUpper,
double distributionFactorLower,
boolean includeSchemaChanges,
Properties dbzProperties,
Configuration dbzConfiguration) {
this.startupOptions = startupOptions;
this.splitSize = splitSize;
this.splitMetaGroupSize = splitMetaGroupSize;
this.distributionFactorUpper = distributionFactorUpper;
this.distributionFactorLower = distributionFactorLower;
this.includeSchemaChanges = includeSchemaChanges;
this.dbzProperties = dbzProperties;
this.dbzConfiguration = dbzConfiguration;
}

public StartupOptions getStartupOptions() {
return startupOptions;
}

public int getSplitSize() {
return splitSize;
}

public int getSplitMetaGroupSize() {
return splitMetaGroupSize;
}

public double getDistributionFactorUpper() {
return distributionFactorUpper;
}

public double getDistributionFactorLower() {
return distributionFactorLower;
}

public boolean isIncludeSchemaChanges() {
return includeSchemaChanges;
}

public Properties getDbzProperties() {
return dbzProperties;
}

public Configuration getDbzConfiguration() {
return Configuration.from(dbzProperties);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,22 @@
* limitations under the License.
*/

package com.ververica.cdc.connectors.base.source.config;
package com.ververica.cdc.connectors.base.config;

import com.ververica.cdc.connectors.base.source.ChangeEventHybridSource;
import com.ververica.cdc.connectors.base.options.StartupOptions;
import com.ververica.cdc.connectors.base.source.JdbcIncrementalSource;
import io.debezium.config.Configuration;
import io.debezium.relational.RelationalDatabaseConnectorConfig;

import java.io.Serializable;
import java.time.Duration;
import java.util.List;
import java.util.Properties;

/** A basic Source configuration which is used by {@link ChangeEventHybridSource}. */
public class SourceConfig implements Serializable {
private static final long serialVersionUID = 1L;
/**
* A Source configuration which is used by {@link JdbcIncrementalSource} which used JDBC data
* source.
*/
public abstract class JdbcSourceConfig extends BaseSourceConfig {

protected final String driverClassName;
protected final String hostname;
Expand All @@ -37,67 +40,58 @@ public class SourceConfig implements Serializable {
protected final String password;
protected final List<String> databaseList;
protected final List<String> tableList;

protected final StartupOptions startupOptions;
protected final int splitSize;
protected final int splitMetaGroupSize;
protected final int fetchSize;
protected final String serverTimeZone;
protected final Duration connectTimeout;
protected final int connectMaxRetries;
protected final int connectionPoolSize;
protected final double distributionFactorUpper;
protected final double distributionFactorLower;
protected final boolean includeSchemaChanges;
// --------------------------------------------------------------------------------------------
// Debezium Configurations
// --------------------------------------------------------------------------------------------
protected final Properties dbzProperties;
protected final Configuration dbzConfiguration;

public SourceConfig(

public JdbcSourceConfig(
StartupOptions startupOptions,
List<String> databaseList,
List<String> tableList,
int splitSize,
int splitMetaGroupSize,
double distributionFactorUpper,
double distributionFactorLower,
boolean includeSchemaChanges,
Properties dbzProperties,
Configuration dbzConfiguration,
String driverClassName,
String hostname,
int port,
String username,
String password,
List<String> databaseList,
List<String> tableList,
StartupOptions startupOptions,
int splitSize,
int splitMetaGroupSize,
int fetchSize,
String serverTimeZone,
Duration connectTimeout,
int connectMaxRetries,
int connectionPoolSize,
double distributionFactorUpper,
double distributionFactorLower,
boolean includeSchemaChanges,
Properties dbzProperties,
Configuration dbzConfiguration) {
int connectionPoolSize) {
super(
startupOptions,
splitSize,
splitMetaGroupSize,
distributionFactorUpper,
distributionFactorLower,
includeSchemaChanges,
dbzProperties,
dbzConfiguration);
this.driverClassName = driverClassName;
this.hostname = hostname;
this.port = port;
this.username = username;
this.password = password;
this.databaseList = databaseList;
this.tableList = tableList;
this.startupOptions = startupOptions;
this.splitSize = splitSize;
this.splitMetaGroupSize = splitMetaGroupSize;
this.fetchSize = fetchSize;
this.serverTimeZone = serverTimeZone;
this.connectTimeout = connectTimeout;
this.connectMaxRetries = connectMaxRetries;
this.connectionPoolSize = connectionPoolSize;
this.distributionFactorUpper = distributionFactorUpper;
this.distributionFactorLower = distributionFactorLower;
this.includeSchemaChanges = includeSchemaChanges;
this.dbzProperties = dbzProperties;
this.dbzConfiguration = dbzConfiguration;
}

public abstract RelationalDatabaseConnectorConfig getDbzConnectorConfig();

public String getDriverClassName() {
return driverClassName;
}
Expand Down Expand Up @@ -126,18 +120,6 @@ public List<String> getTableList() {
return tableList;
}

public StartupOptions getStartupOptions() {
return startupOptions;
}

public int getSplitSize() {
return splitSize;
}

public int getSplitMetaGroupSize() {
return splitMetaGroupSize;
}

public int getFetchSize() {
return fetchSize;
}
Expand All @@ -157,24 +139,4 @@ public int getConnectMaxRetries() {
public int getConnectionPoolSize() {
return connectionPoolSize;
}

public double getDistributionFactorUpper() {
return distributionFactorUpper;
}

public double getDistributionFactorLower() {
return distributionFactorLower;
}

public boolean isIncludeSchemaChanges() {
return includeSchemaChanges;
}

public Properties getDbzProperties() {
return dbzProperties;
}

public Configuration getDbzConfiguration() {
return dbzConfiguration;
}
}
Loading

0 comments on commit 3ef4f2f

Please sign in to comment.