From 163110d978b7c9c28f09fe7f5f2c1aa9bcedfc7b Mon Sep 17 00:00:00 2001 From: joyCurry30 Date: Thu, 11 Jan 2024 14:47:25 +0800 Subject: [PATCH] [cdc-e2e] Add e2e base api. --- .../flink-cdc-pipeline-e2e-base/pom.xml | 85 ++++++++++++++++-- .../connectors/tests/ContainerStartUp.java | 7 ++ .../tests/DockerContainerProvider.java | 89 +++++++++++++++++++ .../tests/SupportedFlinkVersion.java | 45 ++++++++++ .../cdc/connectors/tests/TestCase.java | 8 ++ .../cdc/connectors/tests/TestExecutor.java | 28 ++++++ .../cdc/connectors/tests/TestLifeCycle.java | 5 ++ .../test/resources/setup/mysql/mysql_init.sql | 28 ++++++ .../flink-cdc-pipeline-e2e-doris/pom.xml | 20 +++++ .../flink-cdc-pipeline-e2e-tests/pom.xml | 1 + 10 files changed, 308 insertions(+), 8 deletions(-) create mode 100644 flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/flink-cdc-pipeline-e2e-base/src/test/java/com/ververtica/cdc/connectors/tests/ContainerStartUp.java create mode 100644 flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/flink-cdc-pipeline-e2e-base/src/test/java/com/ververtica/cdc/connectors/tests/DockerContainerProvider.java create mode 100644 flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/flink-cdc-pipeline-e2e-base/src/test/java/com/ververtica/cdc/connectors/tests/SupportedFlinkVersion.java create mode 100644 flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/flink-cdc-pipeline-e2e-base/src/test/java/com/ververtica/cdc/connectors/tests/TestCase.java create mode 100644 flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/flink-cdc-pipeline-e2e-base/src/test/java/com/ververtica/cdc/connectors/tests/TestExecutor.java create mode 100644 flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/flink-cdc-pipeline-e2e-base/src/test/java/com/ververtica/cdc/connectors/tests/TestLifeCycle.java create mode 100644 flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/flink-cdc-pipeline-e2e-base/src/test/resources/setup/mysql/mysql_init.sql create mode 100644 flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/flink-cdc-pipeline-e2e-doris/pom.xml diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/flink-cdc-pipeline-e2e-base/pom.xml b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/flink-cdc-pipeline-e2e-base/pom.xml index 21f3ae72d20..259d106a435 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/flink-cdc-pipeline-e2e-base/pom.xml +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/flink-cdc-pipeline-e2e-base/pom.xml @@ -20,21 +20,90 @@ - - - org.testcontainers - testcontainers - ${testcontainer.version} - test - - org.testcontainers testcontainers-bom ${testcontainer.version} pom + import + + + + org.slf4j + slf4j-simple + ${slf4j.version} + test + + + + + org.testcontainers + testcontainers + test + + + + org.testcontainers + mysql + test + + + + com.mysql + mysql-connector-j + 8.1.0 + test + + + + org.testcontainers + postgresql + test + + + + org.postgresql + postgresql + 42.6.0 + test + + + + + com.ververica + flink-cdc-common + ${project.version} + test + + + com.ververica + flink-cdc-runtime + ${project.version} + test + + + com.ververica + flink-cdc-cli + ${project.version} + test + + + com.ververica + flink-cdc-composer + ${project.version} + test + + + + + com.ververica + flink-cdc-pipeline-connector-values + ${project.version} + test + + \ No newline at end of file diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/flink-cdc-pipeline-e2e-base/src/test/java/com/ververtica/cdc/connectors/tests/ContainerStartUp.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/flink-cdc-pipeline-e2e-base/src/test/java/com/ververtica/cdc/connectors/tests/ContainerStartUp.java new file mode 100644 index 00000000000..90d332ce3c2 --- /dev/null +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/flink-cdc-pipeline-e2e-base/src/test/java/com/ververtica/cdc/connectors/tests/ContainerStartUp.java @@ -0,0 +1,7 @@ +package com.ververtica.cdc.connectors.tests; + +import org.testcontainers.containers.GenericContainer; + +public interface ContainerStartUp { + void startUp(GenericContainer container) throws Exception; +} diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/flink-cdc-pipeline-e2e-base/src/test/java/com/ververtica/cdc/connectors/tests/DockerContainerProvider.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/flink-cdc-pipeline-e2e-base/src/test/java/com/ververtica/cdc/connectors/tests/DockerContainerProvider.java new file mode 100644 index 00000000000..d4df07f0766 --- /dev/null +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/flink-cdc-pipeline-e2e-base/src/test/java/com/ververtica/cdc/connectors/tests/DockerContainerProvider.java @@ -0,0 +1,89 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververtica.cdc.connectors.tests; + +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.rules.TemporaryFolder; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.MySQLContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.PostgreSQLContainer; +import org.testcontainers.utility.DockerImageName; + +/** {@link DockerContainerProvider} is responsible for providing docker containers for the tests. */ +public class DockerContainerProvider { + + @ClassRule public static final Network NETWORK = Network.newNetwork(); + + @Rule public final TemporaryFolder temporaryFolder = new TemporaryFolder(); + + /** + * Provides a flink container based on the flink version specified by the flink summaryVersion. + * + * @param flinkSummaryVersion the flink summaryVersion. + * @return the flink container + */ + public static GenericContainer provideFlinkContainer(String flinkSummaryVersion) { + switch (SupportedFlinkVersion.of(flinkSummaryVersion)) { + case FLINK_1_14: + return new GenericContainer<>("ververica/flink-cdc-e2e-tests:1.11.3_2.12"); + case FLINK_1_15: + return new GenericContainer<>("ververica/flink-cdc-e2e-tests:1.12.2_2.12"); + case FLINK_1_16: + return new GenericContainer<>("ververica/flink-cdc-e2e-tests:1.13.0_2.12"); + case FLINK_1_17: + return new GenericContainer<>("ververica/flink-cdc-e2e-tests:1.17.0_2.12"); + case FLINK_1_18: + return new GenericContainer<>("ververica/flink-cdc-e2e-tests:1.18.0_2.12"); + default: + throw new IllegalArgumentException( + "Unsupported Flink version: " + System.getProperty("flink.version")); + } + } + + /** + * Provides a datasource container based on the datasource type. + * + * @param datasourceType the datasource type. + * @return the datasource container + */ + public static GenericContainer provideDatasourceContainer(String datasourceType) { + switch (datasourceType) { + case "mysql": + return new MySQLContainer<>(DockerImageName.parse("mysql:8.0.26")) + .withNetwork(NETWORK) + .withUsername("mysqluser") + .withPassword("mysqlpw"); + case "postgres": + return new PostgreSQLContainer<>(DockerImageName.parse("postgres:15-alpine")) + .withNetwork(NETWORK) + .withUsername("postgres") + .withPassword("postgres"); + case "sqlserver": + return new GenericContainer<>("ververica/flink-cdc-e2e-tests:sqlserver-2019"); + case "oracle": + return new GenericContainer<>("ververica/flink-cdc-e2e-tests:oracle-11g"); + case "doris": + return new GenericContainer<>(DockerImageName.parse("yagagagaga/doris-standalone")) + .withNetwork(NETWORK); + default: + throw new IllegalArgumentException( + "Unsupported datasource type: " + datasourceType); + } + } +} diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/flink-cdc-pipeline-e2e-base/src/test/java/com/ververtica/cdc/connectors/tests/SupportedFlinkVersion.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/flink-cdc-pipeline-e2e-base/src/test/java/com/ververtica/cdc/connectors/tests/SupportedFlinkVersion.java new file mode 100644 index 00000000000..4de28d02e75 --- /dev/null +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/flink-cdc-pipeline-e2e-base/src/test/java/com/ververtica/cdc/connectors/tests/SupportedFlinkVersion.java @@ -0,0 +1,45 @@ +package com.ververtica.cdc.connectors.tests; + +public enum SupportedFlinkVersion { + FLINK_1_14("1.14", "1.14.6"), + + FLINK_1_15("1.15", "1.15.4"), + + FLINK_1_16("1.16", "1.16.3"), + + FLINK_1_17("1.17", "1.17.2"), + + FLINK_1_18("1.18", "1.18.0"), + ; + + private final String summaryVersion; + + public final String flinkVersion; + + SupportedFlinkVersion(String summaryVersion, String flinkVersion) { + this.summaryVersion = summaryVersion; + this.flinkVersion = flinkVersion; + } + + public String flinkVersion() { + return flinkVersion; + } + + public static String getFlinkVersion(String summaryVersion) { + for (SupportedFlinkVersion supportedFlinkVersion : SupportedFlinkVersion.values()) { + if (supportedFlinkVersion.summaryVersion.equals(summaryVersion)) { + return supportedFlinkVersion.flinkVersion; + } + } + throw new IllegalArgumentException("Unsupported Flink version: " + summaryVersion); + } + + public static SupportedFlinkVersion of(String summaryVersion) { + for (SupportedFlinkVersion supportedFlinkVersion : SupportedFlinkVersion.values()) { + if (supportedFlinkVersion.summaryVersion.equals(summaryVersion)) { + return supportedFlinkVersion; + } + } + throw new IllegalArgumentException("Unsupported Flink version: " + summaryVersion); + } +} diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/flink-cdc-pipeline-e2e-base/src/test/java/com/ververtica/cdc/connectors/tests/TestCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/flink-cdc-pipeline-e2e-base/src/test/java/com/ververtica/cdc/connectors/tests/TestCase.java new file mode 100644 index 00000000000..6f7db573ce6 --- /dev/null +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/flink-cdc-pipeline-e2e-base/src/test/java/com/ververtica/cdc/connectors/tests/TestCase.java @@ -0,0 +1,8 @@ +package com.ververtica.cdc.connectors.tests; + +public interface TestCase { + + void run() throws Exception; + + void check() throws Exception; +} diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/flink-cdc-pipeline-e2e-base/src/test/java/com/ververtica/cdc/connectors/tests/TestExecutor.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/flink-cdc-pipeline-e2e-base/src/test/java/com/ververtica/cdc/connectors/tests/TestExecutor.java new file mode 100644 index 00000000000..c5c5c15c16e --- /dev/null +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/flink-cdc-pipeline-e2e-base/src/test/java/com/ververtica/cdc/connectors/tests/TestExecutor.java @@ -0,0 +1,28 @@ +/* + * Copyright 2023 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververtica.cdc.connectors.tests; + +import com.ververica.cdc.cli.CliFrontend; + +/** {@link TestExecutor} is responsible for executing the tests. */ +public class TestExecutor { + + void execute(String testConfigPath) throws Exception { + String[] extArgs = {testConfigPath, "--use-mini-cluster", "true"}; + CliFrontend.main(extArgs); + } +} diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/flink-cdc-pipeline-e2e-base/src/test/java/com/ververtica/cdc/connectors/tests/TestLifeCycle.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/flink-cdc-pipeline-e2e-base/src/test/java/com/ververtica/cdc/connectors/tests/TestLifeCycle.java new file mode 100644 index 00000000000..e90afeb17f1 --- /dev/null +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/flink-cdc-pipeline-e2e-base/src/test/java/com/ververtica/cdc/connectors/tests/TestLifeCycle.java @@ -0,0 +1,5 @@ +package com.ververtica.cdc.connectors.tests; + +public class TestLifeCycle { + +} diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/flink-cdc-pipeline-e2e-base/src/test/resources/setup/mysql/mysql_init.sql b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/flink-cdc-pipeline-e2e-base/src/test/resources/setup/mysql/mysql_init.sql new file mode 100644 index 00000000000..8586a848947 --- /dev/null +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/flink-cdc-pipeline-e2e-base/src/test/resources/setup/mysql/mysql_init.sql @@ -0,0 +1,28 @@ +-- Copyright 2023 Ververica Inc. +-- +-- Licensed under the Apache License, Version 2.0 (the "License"); +-- you may not use this file except in compliance with the License. +-- You may obtain a copy of the License at +-- http://www.apache.org/licenses/LICENSE-2.0 +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. + +-- In production you would almost certainly limit the replication user must be on the follower (slave) machine, +-- to prevent other clients accessing the log from other machines. For example, 'replicator'@'follower.acme.com'. +-- However, in this database we'll grant 2 users different privileges: +-- +-- 1) 'flinkuser' - all privileges required by the snapshot reader AND binlog reader (used for testing) +-- 2) 'mysqluser' - all privileges +-- +GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT, LOCK TABLES ON *.* TO 'flinkuser'@'%'; +CREATE USER 'mysqluser' IDENTIFIED BY 'mysqlpw'; +GRANT ALL PRIVILEGES ON *.* TO 'mysqluser'@'%'; + +-- ---------------------------------------------------------------------------------------------------------------- +-- DATABASE: emptydb +-- ---------------------------------------------------------------------------------------------------------------- +CREATE DATABASE emptydb; diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/flink-cdc-pipeline-e2e-doris/pom.xml b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/flink-cdc-pipeline-e2e-doris/pom.xml new file mode 100644 index 00000000000..6e5a9218af1 --- /dev/null +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/flink-cdc-pipeline-e2e-doris/pom.xml @@ -0,0 +1,20 @@ + + + 4.0.0 + + flink-cdc-pipeline-e2e-tests + com.ververica + ${revision} + + + flink-cdc-pipeline-e2e-doris + + + 8 + 8 + UTF-8 + + + \ No newline at end of file diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml index a963d5e4200..36b2a1d2714 100644 --- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml +++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml @@ -28,6 +28,7 @@ under the License. flink-cdc-pipeline-e2e-base + flink-cdc-pipeline-e2e-doris \ No newline at end of file