Skip to content

Commit

Permalink
[cdc-e2e] Add e2e base api.
Browse files Browse the repository at this point in the history
  • Loading branch information
joyCurry30 committed Jan 15, 2024
1 parent 472ccf6 commit 163110d
Show file tree
Hide file tree
Showing 10 changed files with 308 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,90 @@

<dependencyManagement>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.testcontainers/testcontainers -->
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>${testcontainer.version}</version>
<scope>test</scope>
</dependency>

<!-- https://mvnrepository.com/artifact/org.testcontainers/testcontainers-bom -->
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers-bom</artifactId>
<version>${testcontainer.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>${slf4j.version}</version>
<scope>test</scope>
</dependency>

<!-- TestContainer dependencies -->
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>mysql</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<version>8.1.0</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>postgresql</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.6.0</version>
<scope>test</scope>
</dependency>

<!-- Flink CDC dependencies -->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-cdc-common</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-cdc-runtime</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-cdc-cli</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-cdc-composer</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<!--Flink CDC Connectors Dependencies-->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-cdc-pipeline-connector-values</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.ververtica.cdc.connectors.tests;

import org.testcontainers.containers.GenericContainer;

public interface ContainerStartUp {
void startUp(GenericContainer<?> container) throws Exception;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.ververtica.cdc.connectors.tests;

import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.MySQLContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.utility.DockerImageName;

/** {@link DockerContainerProvider} is responsible for providing docker containers for the tests. */
public class DockerContainerProvider {

@ClassRule public static final Network NETWORK = Network.newNetwork();

@Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder();

/**
* Provides a flink container based on the flink version specified by the flink summaryVersion.
*
* @param flinkSummaryVersion the flink summaryVersion.
* @return the flink container
*/
public static GenericContainer<?> provideFlinkContainer(String flinkSummaryVersion) {
switch (SupportedFlinkVersion.of(flinkSummaryVersion)) {
case FLINK_1_14:
return new GenericContainer<>("ververica/flink-cdc-e2e-tests:1.11.3_2.12");
case FLINK_1_15:
return new GenericContainer<>("ververica/flink-cdc-e2e-tests:1.12.2_2.12");
case FLINK_1_16:
return new GenericContainer<>("ververica/flink-cdc-e2e-tests:1.13.0_2.12");
case FLINK_1_17:
return new GenericContainer<>("ververica/flink-cdc-e2e-tests:1.17.0_2.12");
case FLINK_1_18:
return new GenericContainer<>("ververica/flink-cdc-e2e-tests:1.18.0_2.12");
default:
throw new IllegalArgumentException(
"Unsupported Flink version: " + System.getProperty("flink.version"));
}
}

/**
* Provides a datasource container based on the datasource type.
*
* @param datasourceType the datasource type.
* @return the datasource container
*/
public static GenericContainer<?> provideDatasourceContainer(String datasourceType) {
switch (datasourceType) {
case "mysql":
return new MySQLContainer<>(DockerImageName.parse("mysql:8.0.26"))
.withNetwork(NETWORK)
.withUsername("mysqluser")
.withPassword("mysqlpw");
case "postgres":
return new PostgreSQLContainer<>(DockerImageName.parse("postgres:15-alpine"))
.withNetwork(NETWORK)
.withUsername("postgres")
.withPassword("postgres");
case "sqlserver":
return new GenericContainer<>("ververica/flink-cdc-e2e-tests:sqlserver-2019");
case "oracle":
return new GenericContainer<>("ververica/flink-cdc-e2e-tests:oracle-11g");
case "doris":
return new GenericContainer<>(DockerImageName.parse("yagagagaga/doris-standalone"))
.withNetwork(NETWORK);
default:
throw new IllegalArgumentException(
"Unsupported datasource type: " + datasourceType);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package com.ververtica.cdc.connectors.tests;

public enum SupportedFlinkVersion {
FLINK_1_14("1.14", "1.14.6"),

FLINK_1_15("1.15", "1.15.4"),

FLINK_1_16("1.16", "1.16.3"),

FLINK_1_17("1.17", "1.17.2"),

FLINK_1_18("1.18", "1.18.0"),
;

private final String summaryVersion;

public final String flinkVersion;

SupportedFlinkVersion(String summaryVersion, String flinkVersion) {
this.summaryVersion = summaryVersion;
this.flinkVersion = flinkVersion;
}

public String flinkVersion() {
return flinkVersion;
}

public static String getFlinkVersion(String summaryVersion) {
for (SupportedFlinkVersion supportedFlinkVersion : SupportedFlinkVersion.values()) {
if (supportedFlinkVersion.summaryVersion.equals(summaryVersion)) {
return supportedFlinkVersion.flinkVersion;
}
}
throw new IllegalArgumentException("Unsupported Flink version: " + summaryVersion);
}

public static SupportedFlinkVersion of(String summaryVersion) {
for (SupportedFlinkVersion supportedFlinkVersion : SupportedFlinkVersion.values()) {
if (supportedFlinkVersion.summaryVersion.equals(summaryVersion)) {
return supportedFlinkVersion;
}
}
throw new IllegalArgumentException("Unsupported Flink version: " + summaryVersion);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.ververtica.cdc.connectors.tests;

public interface TestCase {

void run() throws Exception;

void check() throws Exception;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright 2023 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.ververtica.cdc.connectors.tests;

import com.ververica.cdc.cli.CliFrontend;

/** {@link TestExecutor} is responsible for executing the tests. */
public class TestExecutor {

void execute(String testConfigPath) throws Exception {
String[] extArgs = {testConfigPath, "--use-mini-cluster", "true"};
CliFrontend.main(extArgs);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package com.ververtica.cdc.connectors.tests;

public class TestLifeCycle {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
-- Copyright 2023 Ververica Inc.
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
-- http://www.apache.org/licenses/LICENSE-2.0
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.

-- In production you would almost certainly limit the replication user must be on the follower (slave) machine,
-- to prevent other clients accessing the log from other machines. For example, 'replicator'@'follower.acme.com'.
-- However, in this database we'll grant 2 users different privileges:
--
-- 1) 'flinkuser' - all privileges required by the snapshot reader AND binlog reader (used for testing)
-- 2) 'mysqluser' - all privileges
--
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, LOCK TABLES ON *.* TO 'flinkuser'@'%';
CREATE USER 'mysqluser' IDENTIFIED BY 'mysqlpw';
GRANT ALL PRIVILEGES ON *.* TO 'mysqluser'@'%';

-- ----------------------------------------------------------------------------------------------------------------
-- DATABASE: emptydb
-- ----------------------------------------------------------------------------------------------------------------
CREATE DATABASE emptydb;
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>flink-cdc-pipeline-e2e-tests</artifactId>
<groupId>com.ververica</groupId>
<version>${revision}</version>
</parent>

<artifactId>flink-cdc-pipeline-e2e-doris</artifactId>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

</project>
1 change: 1 addition & 0 deletions flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ under the License.

<modules>
<module>flink-cdc-pipeline-e2e-base</module>
<module>flink-cdc-pipeline-e2e-doris</module>
</modules>

</project>

0 comments on commit 163110d

Please sign in to comment.