From e349cb1a836ec610a8f6beeb580f804eb4ac466c Mon Sep 17 00:00:00 2001 From: Simonas Gelazevicius Date: Fri, 27 Aug 2021 10:10:32 +0300 Subject: [PATCH] [vitess] Add Vitess CDC connector (#456) Co-authored-by: gintarasm --- README.md | 1 + docs/content/about.md | 5 +- docs/content/connectors/index.md | 1 + docs/content/connectors/vitess-cdc.md | 309 +++++++++++++++++ flink-connector-vitess-cdc/pom.xml | 166 +++++++++ .../cdc/connectors/vitess/VitessSource.java | 246 ++++++++++++++ .../connectors/vitess/VitessValidator.java | 46 +++ .../connectors/vitess/config/TabletType.java | 41 +++ .../vitess/config/VtctldConfig.java | 134 ++++++++ .../vitess/table/VitessTableFactory.java | 195 +++++++++++ .../vitess/table/VitessTableSource.java | 230 +++++++++++++ .../org.apache.flink.table.factories.Factory | 16 + .../connectors/vitess/VitessSourceTest.java | 319 ++++++++++++++++++ .../cdc/connectors/vitess/VitessTestBase.java | 106 ++++++ .../vitess/container/VitessContainer.java | 121 +++++++ .../vitess/table/VitessConnectorITCase.java | 264 +++++++++++++++ .../vitess/table/VitessTableFactoryTest.java | 202 +++++++++++ .../test/resources/ddl/column_type_test.sql | 44 +++ .../src/test/resources/ddl/inventory.sql | 24 ++ .../src/test/resources/log4j2-test.properties | 28 ++ flink-sql-connector-vitess-cdc/pom.xml | 95 ++++++ .../cdc/connectors/vitess/DummyDocs.java | 22 ++ pom.xml | 2 + 23 files changed, 2615 insertions(+), 2 deletions(-) create mode 100644 docs/content/connectors/vitess-cdc.md create mode 100644 flink-connector-vitess-cdc/pom.xml create mode 100644 flink-connector-vitess-cdc/src/main/java/com/ververica/cdc/connectors/vitess/VitessSource.java create mode 100644 flink-connector-vitess-cdc/src/main/java/com/ververica/cdc/connectors/vitess/VitessValidator.java create mode 100644 flink-connector-vitess-cdc/src/main/java/com/ververica/cdc/connectors/vitess/config/TabletType.java create mode 100644 
flink-connector-vitess-cdc/src/main/java/com/ververica/cdc/connectors/vitess/config/VtctldConfig.java create mode 100644 flink-connector-vitess-cdc/src/main/java/com/ververica/cdc/connectors/vitess/table/VitessTableFactory.java create mode 100644 flink-connector-vitess-cdc/src/main/java/com/ververica/cdc/connectors/vitess/table/VitessTableSource.java create mode 100644 flink-connector-vitess-cdc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory create mode 100644 flink-connector-vitess-cdc/src/test/java/com/vervetica/cdc/connectors/vitess/VitessSourceTest.java create mode 100644 flink-connector-vitess-cdc/src/test/java/com/vervetica/cdc/connectors/vitess/VitessTestBase.java create mode 100644 flink-connector-vitess-cdc/src/test/java/com/vervetica/cdc/connectors/vitess/container/VitessContainer.java create mode 100644 flink-connector-vitess-cdc/src/test/java/com/vervetica/cdc/connectors/vitess/table/VitessConnectorITCase.java create mode 100644 flink-connector-vitess-cdc/src/test/java/com/vervetica/cdc/connectors/vitess/table/VitessTableFactoryTest.java create mode 100644 flink-connector-vitess-cdc/src/test/resources/ddl/column_type_test.sql create mode 100644 flink-connector-vitess-cdc/src/test/resources/ddl/inventory.sql create mode 100644 flink-connector-vitess-cdc/src/test/resources/log4j2-test.properties create mode 100644 flink-sql-connector-vitess-cdc/pom.xml create mode 100644 flink-sql-connector-vitess-cdc/src/main/java/com/ververica/cdc/connectors/vitess/DummyDocs.java diff --git a/README.md b/README.md index 9a7dae8057e..1be8961afe1 100644 --- a/README.md +++ b/README.md @@ -17,6 +17,7 @@ This README is meant as a brief walkthrough on the core features of CDC Connecto | [sqlserver-cdc](docs/content/connectors/sqlserver-cdc.md) |
  • [Sqlserver](https://www.microsoft.com/sql-server): 2012, 2014, 2016, 2017, 2019 | JDBC Driver: 7.2.2.jre8 | | [tidb-cdc](docs/content/connectors/tidb-cdc.md) |
  • [TiDB](https://www.pingcap.com): 5.1.x, 5.2.x, 5.3.x, 5.4.x, 6.0.0 | JDBC Driver: 8.0.27 | | [Db2-cdc](docs/content/connectors/db2-cdc.md) |
  • [Db2](https://www.ibm.com/products/db2): 11.5 | Db2 Driver: 11.5.0.0 | +| [Vitess-cdc](docs/content/connectors/vitess-cdc.md) |
  • [Vitess](https://vitess.io/): 8.0.x, 9.0.x | MySql JDBC Driver: 8.0.16 | ## Features diff --git a/docs/content/about.md b/docs/content/about.md index 458c15eff27..f26bddcc3e8 100644 --- a/docs/content/about.md +++ b/docs/content/about.md @@ -16,9 +16,10 @@ The CDC Connectors for Apache Flink® integrate Debezium as the engin | [postgres-cdc](connectors/postgres-cdc.md) |
  • [PostgreSQL](https://www.postgresql.org): 9.6, 10, 11, 12 | JDBC Driver: 42.5.1 | | [sqlserver-cdc](connectors/sqlserver-cdc.md) |
  • [Sqlserver](https://www.microsoft.com/sql-server): 2012, 2014, 2016, 2017, 2019 | JDBC Driver: 7.2.2.jre8 | | [tidb-cdc](connectors/tidb-cdc.md) |
  • [TiDB](https://www.pingcap.com/): 5.1.x, 5.2.x, 5.3.x, 5.4.x, 6.0.0 | JDBC Driver: 8.0.27 | -| [db2-cdc](connectors/db2-cdc.md) |
  • [Db2](https://www.ibm.com/products/db2): 11.5 | DB2 Driver: 11.5.0.0 | +| [db2-cdc](connectors/db2-cdc.md) |
  • [Db2](https://www.ibm.com/products/db2): 11.5 | DB2 Driver: 11.5.0.0 | +| [vitess-cdc](connectors/vitess-cdc.md) |
  • [Vitess](https://vitess.io/): 8.0.x, 9.0.x | MySql JDBC Driver: 8.0.16 | -## Supported Flink Versions +## Supported Flink Versions The following table shows the version mapping between Flink® CDC Connectors and Flink®: | Flink® CDC Version | Flink® Version | diff --git a/docs/content/connectors/index.md b/docs/content/connectors/index.md index bf2d604f57c..cc81316313e 100644 --- a/docs/content/connectors/index.md +++ b/docs/content/connectors/index.md @@ -14,4 +14,5 @@ postgres-cdc sqlserver-cdc tidb-cdc db2-cdc +vitess-cdc ``` diff --git a/docs/content/connectors/vitess-cdc.md b/docs/content/connectors/vitess-cdc.md new file mode 100644 index 00000000000..dee7032544b --- /dev/null +++ b/docs/content/connectors/vitess-cdc.md @@ -0,0 +1,309 @@ +# Vitess CDC Connector + +The Vitess CDC connector allows for reading of incremental data from Vitess cluster. The connector does not support snapshot feature at the moment. This document describes how to setup the Vitess CDC connector to run SQL queries against Vitess databases. +[Vitess debezium documentation](https://debezium.io/documentation/reference/connectors/vitess.html) + +Dependencies +------------ + +In order to setup the Vitess CDC connector, the following table provides dependency information for both projects using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles. + +### Maven dependency + +``` + + com.ververica + flink-connector-vitess-cdc + 2.4-SNAPSHOT + +``` + +### SQL Client JAR + +Download [flink-sql-connector-vitess-cdc-2.0.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-vitess-cdc/2.0.0/flink-sql-connector-vitess-cdc-2.0.0.jar) and put it under `/lib/`. + +Setup Vitess server +---------------- + +You can follow the Local Install via [Docker guide](https://vitess.io/docs/get-started/local-docker/), or the Vitess Operator for [Kubernetes guide](https://vitess.io/docs/get-started/operator/) to install Vitess. 
No special setup is needed to support Vitess connector. + +### Checklist +* Make sure that the VTGate host and its gRPC port (default is 15991) is accessible from the machine where the Vitess connector is installed +* Make sure that the VTCtld host and its gRPC port (default is 15999) is accessible from the machine where the Vitess connector is installed + +### gRPC authentication +Because Vitess connector reads change events from the VTGate VStream gRPC server, it does not need to connect directly to MySQL instances. +Therefore, no special database user and permissions are needed. At the moment, Vitess connector only supports unauthenticated access to the VTGate gRPC server. + +How to create a Vitess CDC table +---------------- + +The Vitess CDC table can be defined as following: + +```sql +-- checkpoint every 3000 milliseconds +Flink SQL> SET 'execution.checkpointing.interval' = '3s'; + +-- register a Vitess table 'orders' in Flink SQL +Flink SQL> CREATE TABLE orders ( + order_id INT, + order_date TIMESTAMP(0), + customer_name STRING, + price DECIMAL(10, 5), + product_id INT, + order_status BOOLEAN, + PRIMARY KEY(order_id) NOT ENFORCED + ) WITH ( + 'connector' = 'vitess-cdc', + 'hostname' = 'localhost', + 'port' = '3306', + 'keyspace' = 'mydb', + 'vtctl.hostname' = 'localhost', + 'table-name' = 'orders'); + +-- read snapshot and binlogs from orders table +Flink SQL> SELECT * FROM orders; +``` + +Connector Options +---------------- + + +
    + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
    OptionRequiredDefaultTypeDescription
    connectorrequired(none)StringSpecify what connector to use, here should be ‘vitess-cdc’.
    hostnamerequired(none)StringIP address or hostname of the Vitess database server (VTGate).
    keyspacerequired(none)  StringThe name of the keyspace from which to stream the changes.
    usernameoptional(none)StringAn optional username of the Vitess database server (VTGate). If not configured, unauthenticated VTGate gRPC is used.
    passwordoptional(none)StringAn optional password of the Vitess database server (VTGate). If not configured, unauthenticated VTGate gRPC is used.
    table-namerequired(none)StringTable name of the MySQL database to monitor.
    portoptional15991IntegerInteger port number of the VTGate’s VStream server.
    vtctl.hostnamerequired(none)String IP address or hostname of the VTCtld server.
    vtctl.portoptional15999Integer Integer port number of the VTCtld server.
    vtctl.usernameoptional(none) String An optional username of the VTCtld server. If not configured, unauthenticated VTCtld gRPC is used.
    vtctl.passwordoptional(none) String An optional password of the VTCtld server. If not configured, unauthenticated VTCtld gRPC is used.
    tablet.typeoptionalRDONLY String The type of Tablet (hence MySQL) from which to stream the changes: MASTER represents streaming from the master MySQL instance REPLICA represents streaming from the replica slave MySQL instance RDONLY represents streaming from the read-only slave MySQL instance.
    +
    + +Features +-------- + +### Incremental Reading + +The Vitess connector spends all its time streaming changes from the VTGate’s VStream gRPC service to which it is subscribed. The client receives changes from VStream as they are committed in the underlying MySQL server’s binlog at certain positions, which are referred to as VGTID. + +The VGTID in Vitess is the equivalent of GTID in MySQL, it describes the position in the VStream in which a change event happens. Typically, A VGTID has multiple shard GTIDs, each shard GTID is a tuple of (Keyspace, Shard, GTID), which describes the GTID position of a given shard. + +When subscribing to a VStream service, the connector needs to provide a VGTID and a Tablet Type (e.g. MASTER, REPLICA). The VGTID describes the position from which VStream should starts sending change events; the Tablet type describes which underlying MySQL instance (master or replica) in each shard do we read change events from. + +The first time the connector connects to a Vitess cluster, it gets the current VGTID from a Vitess component called VTCtld and provides the current VGTID to VStream. + +The Debezium Vitess connector acts as a gRPC client of VStream. When the connector receives changes it transforms the events into Debezium create, update, or delete events that include the VGTID of the event. The Vitess connector forwards these change events in records to the Kafka Connect framework, which is running in the same process. The Kafka Connect process asynchronously writes the change event records in the same order in which they were generated to the appropriate Kafka topic. + +#### Checkpoint + +Incremental snapshot reading provides the ability to perform checkpoint in chunk level. It resolves the checkpoint timeout problem in previous version with old snapshot reading mechanism. 
+ +### Exactly-Once Processing + +The Vitess CDC connector is a Flink Source connector which will read table snapshot chunks first and then continues to read binlog, +both snapshot phase and binlog phase, Vitess CDC connector read with **exactly-once processing** even failures happen. + +### DataStream Source + +The Incremental Reading feature of Vitess CDC Source only exposes in SQL currently, if you're using DataStream, please use Vitess Source: + +```java +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema; +import com.ververica.cdc.connectors.vitess.VitessSource; + +public class VitessSourceExample { + public static void main(String[] args) throws Exception { + SourceFunction sourceFunction = VitessSource.builder() + .hostname("localhost") + .port(15991) + .keyspace("inventory") + .username("flinkuser") + .password("flinkpw") + .vtctldConfig(VtctldConfig + .builder() + .hostname("localhost") + .port(15999) + .build()) + .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String + .build(); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + env + .addSource(sourceFunction) + .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering + + env.execute(); + } +} +``` + +**Note:** Please refer [Deserialization](../about.html#deserialization) for more details about the JSON deserialization. + +Data Type Mapping +---------------- + +
    + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
    MySQL typeFlink SQL type
    TINYINTTINYINT
    + SMALLINT
    + TINYINT UNSIGNED
    SMALLINT
    + INT
    + MEDIUMINT
    + SMALLINT UNSIGNED
    INT
    + BIGINT
    + INT UNSIGNED
    BIGINT
    BIGINT UNSIGNEDDECIMAL(20, 0)
    BIGINTBIGINT
    FLOATFLOAT
    + DOUBLE
    + DOUBLE PRECISION
    DOUBLE
    + NUMERIC(p, s)
    + DECIMAL(p, s)
    DECIMAL(p, s)
    + BOOLEAN
    + TINYINT(1)
    BOOLEAN
    + CHAR(n)
    + VARCHAR(n)
    + TEXT
    STRING
    +
    diff --git a/flink-connector-vitess-cdc/pom.xml b/flink-connector-vitess-cdc/pom.xml new file mode 100644 index 00000000000..446c9dd5d21 --- /dev/null +++ b/flink-connector-vitess-cdc/pom.xml @@ -0,0 +1,166 @@ + + + + + flink-cdc-connectors + com.ververica + 2.4-SNAPSHOT + + 4.0.0 + + flink-connector-vitess-cdc + flink-connector-vitess-cdc + jar + + + + + + com.ververica + flink-connector-debezium + ${project.version} + + + kafka-log4j-appender + org.apache.kafka + + + + + + io.debezium + debezium-connector-vitess + ${debezium.version} + + + + + + com.ververica + flink-connector-test-util + ${project.version} + test + + + + io.debezium + debezium-core + ${debezium.version} + test-jar + test + + + + mysql + mysql-connector-java + 8.0.26 + test + + + + + + + org.apache.flink + flink-table-planner_${scala.binary.version} + ${flink.version} + test + + + + org.apache.flink + flink-table-runtime + ${flink.version} + test + + + + org.apache.flink + flink-test-utils + ${flink.version} + test + + + + org.apache.flink + flink-core + ${flink.version} + test-jar + test + + + + org.apache.flink + flink-streaming-java + ${flink.version} + test-jar + test + + + + org.apache.flink + flink-table-common + ${flink.version} + test-jar + test + + + + org.apache.flink + flink-tests + ${flink.version} + test-jar + test + + + + org.apache.flink + flink-table-planner_${scala.binary.version} + ${flink.version} + test-jar + test + + + + + + org.testcontainers + testcontainers + ${testcontainers.version} + test + + + + org.testcontainers + jdbc + ${testcontainers.version} + test + + + + com.jayway.jsonpath + json-path + 2.4.0 + test + + + + + diff --git a/flink-connector-vitess-cdc/src/main/java/com/ververica/cdc/connectors/vitess/VitessSource.java b/flink-connector-vitess-cdc/src/main/java/com/ververica/cdc/connectors/vitess/VitessSource.java new file mode 100644 index 00000000000..cf52e2ed882 --- /dev/null +++ 
b/flink-connector-vitess-cdc/src/main/java/com/ververica/cdc/connectors/vitess/VitessSource.java @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.connectors.vitess; + +import com.ververica.cdc.connectors.vitess.config.TabletType; +import com.ververica.cdc.connectors.vitess.config.VtctldConfig; +import com.ververica.cdc.debezium.DebeziumDeserializationSchema; +import com.ververica.cdc.debezium.DebeziumSourceFunction; +import io.debezium.connector.vitess.VitessConnector; + +import java.util.Properties; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A builder to build a SourceFunction which can read and process vitess database changes. The + * Vitess connector subscribes to VTGate's VStream gRPC service. VTGate is a lightweight, stateless + * gRPC server, which is part of the Vitess cluster setup. + */ +public class VitessSource { + + public static Builder builder() { + return new Builder<>(); + } + + /** Builder class of {@link VitessSource}. 
*/ + public static class Builder { + + private String pluginName = "decoderbufs"; + private String name = "flink"; + private int port = 15991; // default 15991 port + private String hostname; + private String keyspace; + private String username; + private String password; + private VtctldConfig vtctldConfig; + private TabletType tabletType = TabletType.RDONLY; + private String[] tableIncludeList; + private String[] tableExcludeList; + private String[] columnIncludeList; + private String[] columnExcludeList; + private Properties dbzProperties; + private DebeziumDeserializationSchema deserializer; + + /** + * The name of the Vitess logical decoding plug-in installed on the server. Supported values + * are decoderbufs + */ + public Builder decodingPluginName(String name) { + this.pluginName = name; + return this; + } + + /** Hostname of the VTGate’s VStream server. */ + public Builder hostname(String hostname) { + this.hostname = hostname; + return this; + } + + /** Integer port number of the VTGate’s VStream server. */ + public Builder port(int port) { + this.port = port; + return this; + } + + /** + * The name of the keyspace (a.k.a database). If no shard is specified, it reads change + * events from all shards in the keyspace. + */ + public Builder keyspace(String keyspace) { + this.keyspace = keyspace; + return this; + } + + /** VTCtld server config. */ + public Builder vtctldConfig(VtctldConfig vtctldConfig) { + this.vtctldConfig = vtctldConfig; + return this; + } + + /** + * The type of Tablet (hence MySQL) from which to stream the changes: MASTER represents + * streaming from the master MySQL instance REPLICA represents streaming from the replica + * slave MySQL instance RDONLY represents streaming from the read-only slave MySQL instance. + */ + public Builder tabletType(TabletType tabletType) { + this.tabletType = tabletType; + return this; + } + + /** The username of the Vitess database server (VTGate gRPC). 
*/ + public Builder username(String username) { + this.username = username; + return this; + } + + /** The password of the Vitess database server (VTGate gRPC). */ + public Builder password(String password) { + this.password = password; + return this; + } + + /** + * Unique name for the connector. Attempting to register again with the same name will fail. + * This property is required by all Kafka Connect connectors. Default is "flink". + */ + public Builder name(String name) { + this.name = name; + return this; + } + + /** + * An optional, comma-separated list of regular expressions that match fully-qualified table + * identifiers for tables whose changes you want to capture. Any table not included in + * table.include.list does not have its changes captured. Each identifier is of the form + * keyspace.tableName. By default, the connector captures changes in every non-system table + * in each schema whose changes are being captured. Do not also set the table.exclude.list + * property. + */ + public Builder tableIncludeList(String... tableIncludeList) { + this.tableIncludeList = tableIncludeList; + return this; + } + + /** + * An optional, comma-separated list of regular expressions that match fully-qualified table + * identifiers for tables whose changes you do not want to capture. Any table not included + * in table.exclude.list has it changes captured. Each identifier is of the form + * keyspace.tableName. Do not also set the table.include.list property. + */ + public Builder tableExcludeList(String... tableExcludeList) { + this.tableExcludeList = tableExcludeList; + return this; + } + + /** + * An optional, comma-separated list of regular expressions that match the fully-qualified + * names of columns that should be included in change event record values. Fully-qualified + * names for columns are of the form keyspace.tableName.columnName. Do not also set the + * column.exclude.list property. + */ + public Builder columnIncludeList(String... 
columnIncludeList) { + this.columnIncludeList = columnIncludeList; + return this; + } + + /** + * An optional, comma-separated list of regular expressions that match the fully-qualified + * names of columns that should be excluded from change event record values. Fully-qualified + * names for columns are of the form keyspace.tableName.columnName. Do not also set the + * column.include.list property. + */ + public Builder columnExcludeList(String... columnExcludeList) { + this.columnExcludeList = columnExcludeList; + return this; + } + + /** The Debezium Vitess connector properties. */ + public Builder debeziumProperties(Properties properties) { + this.dbzProperties = properties; + return this; + } + + /** + * The deserializer used to convert from consumed {@link + * org.apache.kafka.connect.source.SourceRecord}. + */ + public Builder deserializer(DebeziumDeserializationSchema deserializer) { + this.deserializer = deserializer; + return this; + } + + public DebeziumSourceFunction build() { + Properties props = new Properties(); + props.setProperty("connector.class", VitessConnector.class.getCanonicalName()); + props.setProperty("plugin.name", pluginName); + props.setProperty("name", name); + // hard code server name, because we don't need to distinguish it, docs: + // Logical name that identifies and provides a namespace for the particular Vitess + // Vtgate server/cluster being monitored. The logical name should be unique across + // all other connectors, since it is used as a prefix for all Kafka topic names coming + // from this connector. Only alphanumeric characters and underscores should be used. 
+ props.setProperty("database.server.name", "vitess_cdc_source"); + props.setProperty("database.hostname", checkNotNull(hostname)); + props.setProperty("database.port", String.valueOf(port)); + props.setProperty("vitess.keyspace", checkNotNull(keyspace)); + props.setProperty("vitess.tablet.type", tabletType.name()); + props.setProperty("vitess.vtctld.host", checkNotNull(vtctldConfig.getHostname())); + props.setProperty("vitess.vtctld.port", String.valueOf(vtctldConfig.getPort())); + + if (username != null) { + props.setProperty("user", username); + } + // Guard on the VTGate password itself (previously checked vtctldConfig.getPassword(), + // which could NPE on `password` or silently skip a configured VTGate password). + if (password != null) { + props.setProperty("password", password); + } + + if (vtctldConfig.getUsername() != null) { + props.setProperty("vitess.vtctld.user", vtctldConfig.getUsername()); + } + if (vtctldConfig.getPassword() != null) { + props.setProperty("vitess.vtctld.password", vtctldConfig.getPassword()); + } + + // The maximum number of tasks that should be created for this connector. + // The Vitess connector always uses a single task and therefore does not use this value, + // so the default is always acceptable. 
+ props.setProperty("tasks.max", "1"); + + if (tableIncludeList != null) { + props.setProperty("table.include.list", String.join(",", tableIncludeList)); + } + if (tableExcludeList != null) { + props.setProperty("table.exclude.list", String.join(",", tableExcludeList)); + } + if (columnIncludeList != null) { + props.setProperty("column.include.list", String.join(",", columnIncludeList)); + } + if (columnExcludeList != null) { + props.setProperty("column.exclude.list", String.join(",", columnExcludeList)); + } + if (dbzProperties != null) { + dbzProperties.forEach(props::put); + } + + return new DebeziumSourceFunction<>( + deserializer, props, null, new VitessValidator(props)); + } + } +} diff --git a/flink-connector-vitess-cdc/src/main/java/com/ververica/cdc/connectors/vitess/VitessValidator.java b/flink-connector-vitess-cdc/src/main/java/com/ververica/cdc/connectors/vitess/VitessValidator.java new file mode 100644 index 00000000000..99a429c3063 --- /dev/null +++ b/flink-connector-vitess-cdc/src/main/java/com/ververica/cdc/connectors/vitess/VitessValidator.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.ververica.cdc.connectors.vitess; + +import org.apache.flink.shaded.guava30.com.google.common.collect.Maps; + +import com.ververica.cdc.debezium.Validator; +import io.debezium.connector.vitess.VitessConnector; + +import java.io.Serializable; +import java.util.Map; +import java.util.Properties; + +/** The validator for Vitess. */ +public class VitessValidator implements Validator, Serializable { + + private static final long serialVersionUID = 1L; + + private final Map configuration; + + public VitessValidator(Properties properties) { + this.configuration = Maps.fromProperties(properties); + } + + @Override + public void validate() { + VitessConnector c = new VitessConnector(); + c.validate(configuration); + } +} diff --git a/flink-connector-vitess-cdc/src/main/java/com/ververica/cdc/connectors/vitess/config/TabletType.java b/flink-connector-vitess-cdc/src/main/java/com/ververica/cdc/connectors/vitess/config/TabletType.java new file mode 100644 index 00000000000..a4bd79ebd51 --- /dev/null +++ b/flink-connector-vitess-cdc/src/main/java/com/ververica/cdc/connectors/vitess/config/TabletType.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.ververica.cdc.connectors.vitess.config; + +/** The type of Tablet (hence MySQL) from which to stream the changes. */ +public enum TabletType { + /** Streaming from the master MySQL instance. */ + MASTER, + /** Streaming from the replica slave MySQL instance. */ + REPLICA, + /** Streaming from the read-only slave MySQL instance. */ + RDONLY; + + public static TabletType master() { + return TabletType.MASTER; + } + + public static TabletType replica() { + return TabletType.REPLICA; + } + + public static TabletType rdonly() { + return TabletType.RDONLY; + } +} diff --git a/flink-connector-vitess-cdc/src/main/java/com/ververica/cdc/connectors/vitess/config/VtctldConfig.java b/flink-connector-vitess-cdc/src/main/java/com/ververica/cdc/connectors/vitess/config/VtctldConfig.java new file mode 100644 index 00000000000..feab2fb9b77 --- /dev/null +++ b/flink-connector-vitess-cdc/src/main/java/com/ververica/cdc/connectors/vitess/config/VtctldConfig.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.connectors.vitess.config; + +import java.util.Objects; + +/** VTCtld server configuration options. 
*/ +public class VtctldConfig { + + private String hostname; + private int port = 15999; // default 15999 port + private String username; + private String password; + + public String getHostname() { + return hostname; + } + + public int getPort() { + return port; + } + + public String getUsername() { + return username; + } + + public String getPassword() { + return password; + } + + public static VtctldConfig.Builder builder() { + return new VtctldConfig.Builder(); + } + + /** Builder class of {@link VtctldConfig}. */ + public static final class Builder { + private String hostname; + private int port = 15999; // default 15999 port + private String username; + private String password; + + /** IP address or hostname of the VTCtld server. */ + public Builder hostname(String hostname) { + this.hostname = hostname; + return this; + } + + /** Integer port number of the VTCtld server. */ + public Builder port(int port) { + this.port = port; + return this; + } + + /** + * An optional username of the VTCtld server. If not configured, unauthenticated VTCtld gRPC + * is used. + */ + public Builder username(String username) { + this.username = username; + return this; + } + + /** + * An optional password of the VTCtld server. If not configured, unauthenticated VTCtld gRPC + * is used. 
+ */ + public Builder password(String password) { + this.password = password; + return this; + } + + public VtctldConfig build() { + VtctldConfig vtctldConfig = new VtctldConfig(); + vtctldConfig.password = this.password; + vtctldConfig.username = this.username; + vtctldConfig.hostname = this.hostname; + vtctldConfig.port = this.port; + return vtctldConfig; + } + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + VtctldConfig that = (VtctldConfig) o; + return port == that.port + && Objects.equals(hostname, that.hostname) + && Objects.equals(username, that.username) + && Objects.equals(password, that.password); + } + + @Override + public int hashCode() { + return Objects.hash(hostname, port, username, password); + } + + @Override + public String toString() { + return "VtctldConfig{" + + "hostname='" + + hostname + + '\'' + + ", port=" + + port + + ", username='" + + username + + '\'' + + ", password='" + + password + + '\'' + + '}'; + } +} diff --git a/flink-connector-vitess-cdc/src/main/java/com/ververica/cdc/connectors/vitess/table/VitessTableFactory.java b/flink-connector-vitess-cdc/src/main/java/com/ververica/cdc/connectors/vitess/table/VitessTableFactory.java new file mode 100644 index 00000000000..f6c5ec66458 --- /dev/null +++ b/flink-connector-vitess-cdc/src/main/java/com/ververica/cdc/connectors/vitess/table/VitessTableFactory.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.connectors.vitess.table; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.factories.DynamicTableFactory; +import org.apache.flink.table.factories.DynamicTableSourceFactory; +import org.apache.flink.table.factories.FactoryUtil; + +import com.ververica.cdc.connectors.vitess.config.TabletType; +import com.ververica.cdc.connectors.vitess.config.VtctldConfig; + +import java.util.HashSet; +import java.util.Set; + +import static com.ververica.cdc.debezium.table.DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX; +import static com.ververica.cdc.debezium.table.DebeziumOptions.getDebeziumProperties; + +/** Factory for creating configured instance of {@link VitessTableSource}. 
*/ +public class VitessTableFactory implements DynamicTableSourceFactory { + + private static final String IDENTIFIER = "vitess-cdc"; + + private static final ConfigOption HOSTNAME = + ConfigOptions.key("hostname") + .stringType() + .noDefaultValue() + .withDescription("Hostname of the VTGate’s VStream server."); + + private static final ConfigOption PORT = + ConfigOptions.key("port") + .intType() + .defaultValue(15991) + .withDescription("Integer port number of the VTGate’s VStream server."); + + private static final ConfigOption KEYSPACE = + ConfigOptions.key("keyspace") + .stringType() + .noDefaultValue() + .withDescription( + "The name of the keyspace (a.k.a database). If no shard is specified, it reads change events from all shards in the keyspace."); + + private static final ConfigOption USERNAME = + ConfigOptions.key("username") + .stringType() + .noDefaultValue() + .withDescription("The username of the Vitess database server (VTGate gRPC)."); + + private static final ConfigOption PASSWORD = + ConfigOptions.key("password") + .stringType() + .noDefaultValue() + .withDescription("The password of the Vitess database server (VTGate gRPC)."); + + private static final ConfigOption VTCTL_HOSTNAME = + ConfigOptions.key("vtctl.hostname") + .stringType() + .noDefaultValue() + .withDescription("IP address or hostname of the VTCtld server."); + + private static final ConfigOption VTCTL_PORT = + ConfigOptions.key("vtctl.port") + .intType() + .defaultValue(15999) + .withDescription("Integer port number of the VTCtld server."); + + private static final ConfigOption VTCTL_USERNAME = + ConfigOptions.key("vtctl.username") + .stringType() + .noDefaultValue() + .withDescription("The username of the Vitess VTCtld server."); + + private static final ConfigOption VTCTL_PASSWORD = + ConfigOptions.key("vtctl.password") + .stringType() + .noDefaultValue() + .withDescription("The password of the Vitess VTCtld server."); + + private static final ConfigOption TABLET_TYPE = + 
ConfigOptions.key("tablet-type") + .stringType() + .defaultValue(TabletType.RDONLY.name()) + .withDescription( + "The type of Tablet (hence MySQL) from which to stream the changes:"); + + private static final ConfigOption TABLE_NAME = + ConfigOptions.key("table-name") + .stringType() + .noDefaultValue() + .withDescription("Table name of the MYSQL database to monitor."); + + private static final ConfigOption DECODING_PLUGIN_NAME = + ConfigOptions.key("decoding.plugin.name") + .stringType() + .defaultValue("decoderbufs") + .withDescription( + "The name of the Vitess logical decoding plug-in installed on the server."); + + private static final ConfigOption NAME = + ConfigOptions.key("name") + .stringType() + .defaultValue("flink") + .withDescription( + "Unique name for the connector." + + " Attempting to register again with the same name will fail. " + + "This property is required by all Kafka Connect connectors. Default is flink."); + + @Override + public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context context) { + final FactoryUtil.TableFactoryHelper helper = + FactoryUtil.createTableFactoryHelper(this, context); + helper.validateExcept(DEBEZIUM_OPTIONS_PREFIX); + + final ReadableConfig config = helper.getOptions(); + String hostname = config.get(HOSTNAME); + int port = config.get(PORT); + String keyspace = config.get(KEYSPACE); + String tableName = config.get(TABLE_NAME); + String username = config.get(USERNAME); + String password = config.get(PASSWORD); + VtctldConfig vtctldConfig = + new VtctldConfig.Builder() + .hostname(config.get(VTCTL_HOSTNAME)) + .port(config.get(VTCTL_PORT)) + .username(config.get(VTCTL_USERNAME)) + .password(config.get(VTCTL_PASSWORD)) + .build(); + TabletType tabletType = TabletType.valueOf(config.get(TABLET_TYPE)); + String pluginName = config.get(DECODING_PLUGIN_NAME); + String name = config.get(NAME); + ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema(); + + return new 
VitessTableSource( + physicalSchema, + port, + hostname, + keyspace, + tableName, + username, + password, + vtctldConfig, + tabletType, + pluginName, + name, + getDebeziumProperties(context.getCatalogTable().getOptions())); + } + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + public Set> requiredOptions() { + Set> options = new HashSet<>(); + options.add(HOSTNAME); + options.add(KEYSPACE); + options.add(VTCTL_HOSTNAME); + options.add(TABLE_NAME); + return options; + } + + @Override + public Set> optionalOptions() { + Set> options = new HashSet<>(); + options.add(PORT); + options.add(VTCTL_PORT); + options.add(USERNAME); + options.add(PASSWORD); + options.add(TABLET_TYPE); + options.add(DECODING_PLUGIN_NAME); + options.add(NAME); + return options; + } +} diff --git a/flink-connector-vitess-cdc/src/main/java/com/ververica/cdc/connectors/vitess/table/VitessTableSource.java b/flink-connector-vitess-cdc/src/main/java/com/ververica/cdc/connectors/vitess/table/VitessTableSource.java new file mode 100644 index 00000000000..3cdd673b3a4 --- /dev/null +++ b/flink-connector-vitess-cdc/src/main/java/com/ververica/cdc/connectors/vitess/table/VitessTableSource.java @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.ververica.cdc.connectors.vitess.table; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.connector.ChangelogMode; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.ScanTableSource; +import org.apache.flink.table.connector.source.SourceFunctionProvider; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.types.RowKind; + +import com.ververica.cdc.connectors.vitess.VitessSource; +import com.ververica.cdc.connectors.vitess.config.TabletType; +import com.ververica.cdc.connectors.vitess.config.VtctldConfig; +import com.ververica.cdc.debezium.DebeziumDeserializationSchema; +import com.ververica.cdc.debezium.DebeziumSourceFunction; +import com.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema; + +import java.time.ZoneId; +import java.util.Objects; +import java.util.Properties; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A {@link DynamicTableSource} that describes how to create a Vitess source from a logical + * description. 
+ */ +public class VitessTableSource implements ScanTableSource { + + private final ResolvedSchema physicalSchema; + private final String pluginName; + private final String name; + private final int port; + private final String hostname; + private final String keyspace; + private final String username; + private final String password; + private final String tableName; + private final VtctldConfig vtctldConfig; + private final TabletType tabletType; + private final Properties dbzProperties; + + public VitessTableSource( + ResolvedSchema physicalSchema, + int port, + String hostname, + String keyspace, + String tableName, + String username, + String password, + VtctldConfig vtctldConfig, + TabletType tabletType, + String pluginName, + String name, + Properties dbzProperties) { + this.physicalSchema = physicalSchema; + this.port = port; + this.hostname = checkNotNull(hostname); + this.keyspace = checkNotNull(keyspace); + this.tableName = checkNotNull(tableName); + this.username = username; + this.password = password; + this.vtctldConfig = checkNotNull(vtctldConfig); + this.tabletType = checkNotNull(tabletType); + this.pluginName = checkNotNull(pluginName); + this.name = name; + this.dbzProperties = dbzProperties; + } + + @Override + public ChangelogMode getChangelogMode() { + return ChangelogMode.newBuilder() + .addContainedKind(RowKind.INSERT) + .addContainedKind(RowKind.UPDATE_BEFORE) + .addContainedKind(RowKind.UPDATE_AFTER) + .addContainedKind(RowKind.DELETE) + .build(); + } + + @Override + public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { + RowType physicalDataType = + (RowType) physicalSchema.toPhysicalRowDataType().getLogicalType(); + TypeInformation typeInfo = + scanContext.createTypeInformation(physicalSchema.toPhysicalRowDataType()); + + DebeziumDeserializationSchema deserializer = + RowDataDebeziumDeserializeSchema.newBuilder() + .setPhysicalRowType(physicalDataType) + .setResultTypeInfo(typeInfo) + 
.setServerTimeZone(ZoneId.of("UTC")) + .build(); + + DebeziumSourceFunction sourceFunction = + VitessSource.builder() + .hostname(hostname) + .port(port) + .keyspace(keyspace) + .tableIncludeList(tableName) + .username(username) + .password(password) + .tabletType(tabletType) + .decodingPluginName(pluginName) + .vtctldConfig(vtctldConfig) + .name(name) + .debeziumProperties(dbzProperties) + .deserializer(deserializer) + .build(); + return SourceFunctionProvider.of(sourceFunction, false); + } + + @Override + public DynamicTableSource copy() { + return new VitessTableSource( + physicalSchema, + port, + hostname, + keyspace, + tableName, + username, + password, + vtctldConfig, + tabletType, + pluginName, + name, + dbzProperties); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + VitessTableSource that = (VitessTableSource) o; + return port == that.port + && Objects.equals(physicalSchema, that.physicalSchema) + && Objects.equals(pluginName, that.pluginName) + && Objects.equals(name, that.name) + && Objects.equals(hostname, that.hostname) + && Objects.equals(keyspace, that.keyspace) + && Objects.equals(username, that.username) + && Objects.equals(password, that.password) + && Objects.equals(tableName, that.tableName) + && Objects.equals(vtctldConfig, that.vtctldConfig) + && tabletType == that.tabletType + && Objects.equals(dbzProperties, that.dbzProperties); + } + + @Override + public int hashCode() { + return Objects.hash( + physicalSchema, + pluginName, + name, + port, + hostname, + keyspace, + username, + password, + tableName, + vtctldConfig, + tabletType, + dbzProperties); + } + + @Override + public String toString() { + return "VitessTableSource{" + + "physicalSchema=" + + physicalSchema + + ", pluginName='" + + pluginName + + '\'' + + ", name='" + + name + + '\'' + + ", port=" + + port + + ", hostname='" + + hostname + + '\'' + + ", keyspace='" + + 
keyspace + + '\'' + + ", username='" + + username + + '\'' + + ", password='" + + password + + '\'' + + ", tableName='" + + tableName + + '\'' + + ", vtctldConfig=" + + vtctldConfig + + ", tabletType=" + + tabletType + + ", dbzProperties=" + + dbzProperties + + '}'; + } + + @Override + public String asSummaryString() { + return "Vitess-CDC"; + } +} diff --git a/flink-connector-vitess-cdc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-connector-vitess-cdc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory new file mode 100644 index 00000000000..5cb7d68c2fc --- /dev/null +++ b/flink-connector-vitess-cdc/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +com.ververica.cdc.connectors.vitess.table.VitessTableFactory \ No newline at end of file diff --git a/flink-connector-vitess-cdc/src/test/java/com/vervetica/cdc/connectors/vitess/VitessSourceTest.java b/flink-connector-vitess-cdc/src/test/java/com/vervetica/cdc/connectors/vitess/VitessSourceTest.java new file mode 100644 index 00000000000..fcd878403f0 --- /dev/null +++ b/flink-connector-vitess-cdc/src/test/java/com/vervetica/cdc/connectors/vitess/VitessSourceTest.java @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.vervetica.cdc.connectors.vitess; + +import org.apache.flink.api.common.state.BroadcastState; +import org.apache.flink.api.common.state.KeyedStateStore; +import org.apache.flink.api.common.state.ListState; +import org.apache.flink.api.common.state.ListStateDescriptor; +import org.apache.flink.api.common.state.MapStateDescriptor; +import org.apache.flink.api.common.state.OperatorStateStore; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.testutils.CheckedThread; +import org.apache.flink.runtime.state.FunctionInitializationContext; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.util.MockStreamingRuntimeContext; +import org.apache.flink.util.Collector; + +import com.ververica.cdc.connectors.utils.TestSourceContext; +import com.ververica.cdc.connectors.vitess.VitessSource; +import com.ververica.cdc.connectors.vitess.config.TabletType; +import com.ververica.cdc.connectors.vitess.config.VtctldConfig; +import com.ververica.cdc.debezium.DebeziumDeserializationSchema; +import com.ververica.cdc.debezium.DebeziumSourceFunction; +import org.apache.kafka.connect.source.SourceRecord; +import org.junit.Before; +import org.junit.Test; + +import java.sql.Connection; +import java.sql.Statement; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.OptionalLong; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import static com.ververica.cdc.connectors.utils.AssertUtils.assertDelete; +import static com.ververica.cdc.connectors.utils.AssertUtils.assertInsert; +import static com.ververica.cdc.connectors.utils.AssertUtils.assertUpdate; + +/** Tests for {@link VitessSource} which also heavily tests {@link DebeziumSourceFunction}. 
*/ +public class VitessSourceTest extends VitessTestBase { + + @Before + public void before() { + initializeTable("inventory"); + } + + @Test + public void testConsumingAllEvents() throws Exception { + DebeziumSourceFunction source = createVitessSqlSource(0); + TestSourceContext sourceContext = new TestSourceContext<>(); + + setupSource(source); + + try (Connection connection = getJdbcConnection(); + Statement statement = connection.createStatement()) { + + // start the source + final CheckedThread runThread = + new CheckedThread() { + @Override + public void go() throws Exception { + source.run(sourceContext); + } + }; + runThread.start(); + + waitForSourceToStart(Duration.ofSeconds(60), source); + List records; + + statement.execute( + "INSERT INTO test.products VALUES (default,'robot','Toy robot',1.304)"); // 110 + records = drain(sourceContext, 1); + assertInsert(records.get(0), "id", 101); + + statement.execute( + "INSERT INTO test.products VALUES (1001,'roy','old robot',1234.56)"); // 1001 + records = drain(sourceContext, 1); + assertInsert(records.get(0), "id", 1001); + + // --------------------------------------------------------------------------------------------------------------- + // Changing the primary key of a row should result in 2 events: INSERT, DELETE + // (TOMBSTONE is dropped) + // --------------------------------------------------------------------------------------------------------------- + statement.execute( + "UPDATE test.products SET id=2001, description='really old robot' WHERE id=1001"); + records = drain(sourceContext, 2); + assertDelete(records.get(0), "id", 1001); + assertInsert(records.get(1), "id", 2001); + + // --------------------------------------------------------------------------------------------------------------- + // Simple UPDATE (with no schema changes) + // --------------------------------------------------------------------------------------------------------------- + statement.execute("UPDATE test.products SET 
weight=1345.67 WHERE id=2001"); + records = drain(sourceContext, 1); + assertUpdate(records.get(0), "id", 2001); + + // --------------------------------------------------------------------------------------------------------------- + // Change our schema with a fully-qualified name; we should still see this event + // --------------------------------------------------------------------------------------------------------------- + // Add a column with default to the 'products' table and explicitly update one record + // ... + statement.execute( + "ALTER TABLE test.products ADD COLUMN volume FLOAT, ADD COLUMN alias VARCHAR(30) NULL"); + + // Vitess schema change has eventual consistency, wait few seconds. + Thread.sleep(5000); + statement.execute("UPDATE test.products SET volume=13.5 WHERE id=2001"); + records = drain(sourceContext, 1); + assertUpdate(records.get(0), "id", 2001); + + // cleanup + source.cancel(); + source.close(); + runThread.sync(); + } + } + + // ------------------------------------------------------------------------------------------ + // Utilities + // ------------------------------------------------------------------------------------------ + + private DebeziumSourceFunction createVitessSqlSource(int heartbeatInterval) { + Properties properties = new Properties(); + properties.setProperty("heartbeat.interval.ms", String.valueOf(heartbeatInterval)); + return VitessSource.builder() + .hostname(VITESS_CONTAINER.getHost()) + .port(VITESS_CONTAINER.getGrpcPort()) + .keyspace(VITESS_CONTAINER.getKeyspace()) + .tabletType(TabletType.MASTER) + .tableIncludeList("test.products") + .vtctldConfig( + VtctldConfig.builder() + .hostname(VITESS_CONTAINER.getHost()) + .port(VITESS_CONTAINER.getVtctldGrpcPort()) + .build()) + .deserializer(new ForwardDeserializeSchema()) + .debeziumProperties(properties) + .build(); + } + + private List drain(TestSourceContext sourceContext, int expectedRecordCount) + throws Exception { + List allRecords = new ArrayList<>(); + 
LinkedBlockingQueue> queue = sourceContext.getCollectedOutputs(); + while (allRecords.size() < expectedRecordCount) { + StreamRecord record = queue.poll(1000, TimeUnit.SECONDS); + if (record != null) { + allRecords.add(record.getValue()); + } else { + throw new RuntimeException( + "Can't receive " + expectedRecordCount + " elements before timeout."); + } + } + + return allRecords; + } + + private boolean waitForSourceToStart( + Duration timeout, DebeziumSourceFunction source) + throws InterruptedException { + long now = System.currentTimeMillis(); + long stop = now + timeout.toMillis(); + while (System.currentTimeMillis() < stop) { + if (source.getDebeziumStarted()) { + break; + } + Thread.sleep(10); // save CPU + } + Thread.sleep(10000); // Wait for full start + return source.getDebeziumStarted(); + } + + private static void setupSource(DebeziumSourceFunction source) throws Exception { + setupSource( + source, false, null, null, + true, // enable checkpointing; auto commit should be ignored + 0, 1); + } + + private static void setupSource( + DebeziumSourceFunction source, + boolean isRestored, + ListState restoredOffsetState, + ListState restoredHistoryState, + boolean isCheckpointingEnabled, + int subtaskIndex, + int totalNumSubtasks) + throws Exception { + + // run setup procedure in operator life cycle + source.setRuntimeContext( + new MockStreamingRuntimeContext( + isCheckpointingEnabled, totalNumSubtasks, subtaskIndex)); + source.initializeState( + new MockFunctionInitializationContext( + isRestored, + new MockOperatorStateStore(restoredOffsetState, restoredHistoryState))); + source.open(new Configuration()); + } + + private static class ForwardDeserializeSchema + implements DebeziumDeserializationSchema { + + private static final long serialVersionUID = 2975058057832211228L; + + @Override + public void deserialize(SourceRecord record, Collector out) throws Exception { + out.collect(record); + } + + @Override + public TypeInformation getProducedType() { + 
return TypeInformation.of(SourceRecord.class); + } + } + + private static class MockOperatorStateStore implements OperatorStateStore { + + private final ListState restoredOffsetListState; + private final ListState restoredHistoryListState; + + private MockOperatorStateStore( + ListState restoredOffsetListState, ListState restoredHistoryListState) { + this.restoredOffsetListState = restoredOffsetListState; + this.restoredHistoryListState = restoredHistoryListState; + } + + @Override + @SuppressWarnings("unchecked") + public ListState getUnionListState(ListStateDescriptor stateDescriptor) + throws Exception { + if (stateDescriptor.getName().equals(DebeziumSourceFunction.OFFSETS_STATE_NAME)) { + return (ListState) restoredOffsetListState; + } else if (stateDescriptor + .getName() + .equals(DebeziumSourceFunction.HISTORY_RECORDS_STATE_NAME)) { + return (ListState) restoredHistoryListState; + } else { + throw new IllegalStateException("Unknown state."); + } + } + + @Override + public BroadcastState getBroadcastState( + MapStateDescriptor stateDescriptor) throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + public ListState getListState(ListStateDescriptor stateDescriptor) + throws Exception { + throw new UnsupportedOperationException(); + } + + @Override + public Set getRegisteredStateNames() { + throw new UnsupportedOperationException(); + } + + @Override + public Set getRegisteredBroadcastStateNames() { + throw new UnsupportedOperationException(); + } + } + + private static class MockFunctionInitializationContext + implements FunctionInitializationContext { + + private final boolean isRestored; + private final OperatorStateStore operatorStateStore; + + private MockFunctionInitializationContext( + boolean isRestored, OperatorStateStore operatorStateStore) { + this.isRestored = isRestored; + this.operatorStateStore = operatorStateStore; + } + + @Override + public boolean isRestored() { + return isRestored; + } + + @Override + public 
OptionalLong getRestoredCheckpointId() { + throw new UnsupportedOperationException(); + } + + @Override + public OperatorStateStore getOperatorStateStore() { + return operatorStateStore; + } + + @Override + public KeyedStateStore getKeyedStateStore() { + throw new UnsupportedOperationException(); + } + } +} diff --git a/flink-connector-vitess-cdc/src/test/java/com/vervetica/cdc/connectors/vitess/VitessTestBase.java b/flink-connector-vitess-cdc/src/test/java/com/vervetica/cdc/connectors/vitess/VitessTestBase.java new file mode 100644 index 00000000000..c6530515a76 --- /dev/null +++ b/flink-connector-vitess-cdc/src/test/java/com/vervetica/cdc/connectors/vitess/VitessTestBase.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.vervetica.cdc.connectors.vitess; + +import org.apache.flink.test.util.AbstractTestBase; + +import com.vervetica.cdc.connectors.vitess.container.VitessContainer; +import org.junit.BeforeClass; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; + +import java.net.URL; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Arrays; +import java.util.List; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static com.vervetica.cdc.connectors.vitess.container.VitessContainer.GRPC_PORT; +import static com.vervetica.cdc.connectors.vitess.container.VitessContainer.MYSQL_PORT; +import static com.vervetica.cdc.connectors.vitess.container.VitessContainer.VTCTLD_GRPC_PORT; +import static org.junit.Assert.assertNotNull; + +/** Basic class for testing Vitess source, this contains a Vitess container. 
*/ +public abstract class VitessTestBase extends AbstractTestBase { + + private static final Logger LOG = LoggerFactory.getLogger(VitessTestBase.class); + private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$"); + + protected static final VitessContainer VITESS_CONTAINER = + (VitessContainer) + new VitessContainer() + .withKeyspace("test") + .withUsername("flinkuser") + .withPassword("flinkpwd") + .withExposedPorts(MYSQL_PORT, GRPC_PORT, VTCTLD_GRPC_PORT) + .withLogConsumer(new Slf4jLogConsumer(LOG)); + + @BeforeClass + public static void startContainers() { + LOG.info("Starting containers..."); + Startables.deepStart(Stream.of(VITESS_CONTAINER)).join(); + LOG.info("Containers are started."); + } + + public Connection getJdbcConnection() throws SQLException { + return DriverManager.getConnection(VITESS_CONTAINER.getJdbcUrl()); + } + + /** + * Executes a JDBC statement using the default jdbc config without autocommitting the + * connection. + */ + protected void initializeTable(String sqlFile) { + final String ddlFile = String.format("ddl/%s.sql", sqlFile); + final URL ddlTestFile = VitessTestBase.class.getClassLoader().getResource(ddlFile); + assertNotNull("Cannot locate " + ddlFile, ddlTestFile); + try (Connection connection = getJdbcConnection(); + Statement statement = connection.createStatement()) { + final List statements = + Arrays.stream( + Files.readAllLines(Paths.get(ddlTestFile.toURI())).stream() + .map(String::trim) + .filter(x -> !x.startsWith("--") && !x.isEmpty()) + .map( + x -> { + final Matcher m = + COMMENT_PATTERN.matcher(x); + return m.matches() ? 
m.group(1) : x; + }) + .collect(Collectors.joining("\n")) + .split(";")) + .collect(Collectors.toList()); + for (String stmt : statements) { + statement.execute(stmt); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } +} diff --git a/flink-connector-vitess-cdc/src/test/java/com/vervetica/cdc/connectors/vitess/container/VitessContainer.java b/flink-connector-vitess-cdc/src/test/java/com/vervetica/cdc/connectors/vitess/container/VitessContainer.java new file mode 100644 index 00000000000..ef6990a250b --- /dev/null +++ b/flink-connector-vitess-cdc/src/test/java/com/vervetica/cdc/connectors/vitess/container/VitessContainer.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.vervetica.cdc.connectors.vitess.container; + +import org.testcontainers.containers.JdbcDatabaseContainer; + +/** Vitess container. 
*/ +public class VitessContainer extends JdbcDatabaseContainer { + + public static final String IMAGE = "vitess/vttestserver"; + public static final String DEFAULT_TAG = "mysql80"; + private static final Integer VITESS_PORT = 15991; + public static final Integer GRPC_PORT = VITESS_PORT + 1; + public static final Integer VTCTLD_GRPC_PORT = VITESS_PORT + 8; + public static final Integer MYSQL_PORT = VITESS_PORT + 3; + + private String keyspaces = "test"; + private String username = "flinkuser"; + private String password = "flinkpwd"; + + public VitessContainer() { + this(DEFAULT_TAG); + } + + public VitessContainer(String tag) { + super(IMAGE + ":" + tag); + } + + @Override + protected void configure() { + addEnv("PORT", VITESS_PORT.toString()); + addEnv("KEYSPACES", getKeyspace()); + addEnv("NUM_SHARDS", "1"); + addEnv("MYSQL_BIND_HOST", "0.0.0.0"); + } + + @Override + public String getDriverClassName() { + try { + Class.forName("com.mysql.cj.jdbc.Driver"); + return "com.mysql.cj.jdbc.Driver"; + } catch (ClassNotFoundException e) { + return "com.mysql.jdbc.Driver"; + } + } + + @Override + public String getJdbcUrl() { + return "jdbc:mysql://" + getHost() + ":" + getMysqlPort() + "/" + getKeyspace(); + } + + @Override + public String getUsername() { + return username; + } + + @Override + public String getPassword() { + return password; + } + + public String getKeyspace() { + return keyspaces; + } + + public Integer getMysqlPort() { + return this.getMappedPort(MYSQL_PORT); + } + + public Integer getGrpcPort() { + return this.getMappedPort(GRPC_PORT); + } + + public Integer getVtctldGrpcPort() { + return this.getMappedPort(VTCTLD_GRPC_PORT); + } + + @Override + protected String getTestQueryString() { + return "SELECT 1"; + } + + @Override + public VitessContainer withDatabaseName(final String keyspace) { + this.keyspaces = keyspace; + return this; + } + + public VitessContainer withKeyspace(String keyspace) { + this.keyspaces = keyspace; + return this; + } + + @Override 
+ public VitessContainer withUsername(final String username) { + this.username = username; + return this; + } + + @Override + public VitessContainer withPassword(final String password) { + this.password = password; + return this; + } +} diff --git a/flink-connector-vitess-cdc/src/test/java/com/vervetica/cdc/connectors/vitess/table/VitessConnectorITCase.java b/flink-connector-vitess-cdc/src/test/java/com/vervetica/cdc/connectors/vitess/table/VitessConnectorITCase.java new file mode 100644 index 00000000000..8c59b4c39c7 --- /dev/null +++ b/flink-connector-vitess-cdc/src/test/java/com/vervetica/cdc/connectors/vitess/table/VitessConnectorITCase.java @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.vervetica.cdc.connectors.vitess.table; + +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; +import org.apache.flink.table.planner.factories.TestValuesTableFactory; +import org.apache.flink.types.Row; +import org.apache.flink.util.CloseableIterator; + +import com.vervetica.cdc.connectors.vitess.VitessTestBase; +import org.junit.Before; +import org.junit.Test; + +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** Integration tests for MySQL binlog SQL source. 
*/ +public class VitessConnectorITCase extends VitessTestBase { + + private final StreamExecutionEnvironment env = + StreamExecutionEnvironment.getExecutionEnvironment(); + private final StreamTableEnvironment tEnv = + StreamTableEnvironment.create( + env, EnvironmentSettings.newInstance().inStreamingMode().build()); + + @Before + public void before() { + TestValuesTableFactory.clearAllData(); + env.setParallelism(1); + } + + @Test + public void testConsumingAllEvents() + throws SQLException, ExecutionException, InterruptedException { + initializeTable("inventory"); + String sourceDDL = + String.format( + "CREATE TABLE debezium_source (" + + " `id` INT NOT NULL," + + " name STRING," + + " description STRING," + + " weight DECIMAL(10,3)," + + " primary key (`id`) not enforced" + + ") WITH (" + + " 'connector' = 'vitess-cdc'," + + " 'tablet-type' = 'MASTER'," + + " 'hostname' = '%s'," + + " 'port' = '%s'," + + " 'vtctl.hostname' = '%s'," + + " 'vtctl.port' = '%s'," + + " 'keyspace' = '%s'," + + " 'table-name' = '%s'" + + ")", + VITESS_CONTAINER.getHost(), + VITESS_CONTAINER.getGrpcPort(), + VITESS_CONTAINER.getHost(), + VITESS_CONTAINER.getVtctldGrpcPort(), + VITESS_CONTAINER.getKeyspace(), + "test.products"); + String sinkDDL = + "CREATE TABLE sink (" + + " name STRING," + + " weightSum DECIMAL(10,3)," + + " PRIMARY KEY (name) NOT ENFORCED" + + ") WITH (" + + " 'connector' = 'values'," + + " 'sink-insert-only' = 'false'," + + " 'sink-expected-messages-num' = '20'" + + ")"; + tEnv.executeSql(sourceDDL); + tEnv.executeSql(sinkDDL); + + // async submit job + TableResult result = + tEnv.executeSql( + "INSERT INTO sink SELECT name, SUM(weight) FROM debezium_source GROUP BY name"); + + // Vitess source doesn't read snapshot data. Source will be empty at first. + // There's no way knowing if it's started, using sleep here. 
+ Thread.sleep(10000); + + try (Connection connection = getJdbcConnection(); + Statement statement = connection.createStatement()) { + statement.execute( + "INSERT INTO test.products \n" + + "VALUES (default,'scooter','Small 2-wheel scooter',3.14),\n" + + " (default,'car battery','12V car battery',8.1),\n" + + " (default,'12-pack drill bits','12-pack of drill bits with sizes ranging from #40 to #3',0.8),\n" + + " (default,'hammer','12oz carpenters hammer',0.75),\n" + + " (default,'hammer','14oz carpenters hammer',0.875),\n" + + " (default,'hammer','16oz carpenters hammer',1.0),\n" + + " (default,'rocks','box of assorted rocks',5.3),\n" + + " (default,'jacket','water resistent black wind breaker',0.1),\n" + + " (default,'spare tire','24 inch spare tire',22.2);"); + statement.execute( + "UPDATE test.products SET description='18oz carpenter hammer' WHERE id=106;"); + statement.execute("UPDATE test.products SET weight='5.1' WHERE id=107;"); + statement.execute( + "INSERT INTO test.products VALUES (default,'jacket','water resistent white wind breaker',0.2);"); // 110 + statement.execute( + "INSERT INTO test.products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);"); + statement.execute( + "UPDATE test.products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;"); + statement.execute("UPDATE test.products SET weight='5.17' WHERE id=111;"); + statement.execute("DELETE FROM test.products WHERE id=111;"); + } + + waitForSinkSize("sink", 20); + + List expected = + Arrays.asList( + "+I[scooter, 3.140]", + "+I[car battery, 8.100]", + "+I[12-pack drill bits, 0.800]", + "+I[hammer, 2.625]", + "+I[rocks, 5.100]", + "+I[jacket, 0.600]", + "+I[spare tire, 22.200]"); + + List actual = TestValuesTableFactory.getResults("sink"); + assertEqualsInAnyOrder(expected, actual); + result.getJobClient().get().cancel().get(); + } + + @Test + public void testAllTypes() throws Throwable { + initializeTable("column_type_test"); + String sourceDDL = + 
String.format( + "CREATE TABLE full_types (\n" + + " `id` INT NOT NULL,\n" + + " tiny_c TINYINT,\n" + + " tiny_un_c SMALLINT ,\n" + + " small_c SMALLINT,\n" + + " small_un_c INT,\n" + + " int_c INT ,\n" + + " int_un_c BIGINT,\n" + + " int11_c BIGINT,\n" + + " big_c BIGINT,\n" + + " varchar_c STRING,\n" + + " char_c STRING,\n" + + " float_c FLOAT,\n" + + " double_c DOUBLE,\n" + + " decimal_c DECIMAL(8, 4),\n" + + " numeric_c DECIMAL(6, 0),\n" + + " boolean_c BOOLEAN,\n" + + " primary key (`id`) not enforced" + + ") WITH (" + + " 'connector' = 'vitess-cdc'," + + " 'tablet-type' = 'MASTER'," + + " 'hostname' = '%s'," + + " 'port' = '%s'," + + " 'vtctl.hostname' = '%s'," + + " 'vtctl.port' = '%s'," + + " 'keyspace' = '%s'," + + " 'table-name' = '%s'" + + ")", + VITESS_CONTAINER.getHost(), + VITESS_CONTAINER.getGrpcPort(), + VITESS_CONTAINER.getHost(), + VITESS_CONTAINER.getVtctldGrpcPort(), + VITESS_CONTAINER.getKeyspace(), + "test.full_types"); + tEnv.executeSql(sourceDDL); + + // async submit job + TableResult result = tEnv.executeSql("SELECT * FROM full_types"); + + // Vitess source doesn't read snapshot data. Source will be empty at first. + // There's no way knowing if it's started, using sleep here. 
+ Thread.sleep(10000); + + try (Connection connection = getJdbcConnection(); + Statement statement = connection.createStatement()) { + statement.execute( + "INSERT INTO test.full_types VALUES (\n" + + " DEFAULT, 127, 255, 32767, 65535, 2147483647, 4294967295, 2147483647, 9223372036854775807,\n" + + " 'Hello World', 'abc', 123.102, 404.4443, 123.4567, 345.6, true);"); + statement.execute("UPDATE test.full_types SET varchar_c = 'Bye World' WHERE id=1;"); + } + + waitForSnapshotStarted(result.collect()); + + List expected = + Arrays.asList( + "+I[1, 127, 255, 32767, 65535, 2147483647, 4294967295, 2147483647, 9223372036854775807, Hello World, abc, 123.102, 404.4443, 123.4567, 346, true]", + "-U[1, 127, 255, 32767, 65535, 2147483647, 4294967295, 2147483647, 9223372036854775807, Hello World, abc, 123.102, 404.4443, 123.4567, 346, true]", + "+U[1, 127, 255, 32767, 65535, 2147483647, 4294967295, 2147483647, 9223372036854775807, Bye World, abc, 123.102, 404.4443, 123.4567, 346, true]"); + + List actual = fetchRows(result.collect(), expected.size()); + assertEquals(expected, actual); + result.getJobClient().get().cancel().get(); + } + + private static List fetchRows(Iterator iter, int size) { + List rows = new ArrayList<>(size); + while (size > 0 && iter.hasNext()) { + Row row = iter.next(); + rows.add(row.toString()); + size--; + } + return rows; + } + + public static void assertEqualsInAnyOrder(List actual, List expected) { + assertTrue(actual != null && expected != null); + assertEquals( + actual.stream().sorted().collect(Collectors.toList()), + expected.stream().sorted().collect(Collectors.toList())); + } + + private static void waitForSnapshotStarted(CloseableIterator iterator) throws Exception { + while (!iterator.hasNext()) { + Thread.sleep(100); + } + } + + private static void waitForSinkSize(String sinkName, int expectedSize) + throws InterruptedException { + while (sinkSize(sinkName) < expectedSize) { + Thread.sleep(100); + } + } + + private static int 
sinkSize(String sinkName) { + synchronized (TestValuesTableFactory.class) { + try { + return TestValuesTableFactory.getRawResults(sinkName).size(); + } catch (IllegalArgumentException e) { + // job is not started yet + return 0; + } + } + } +} diff --git a/flink-connector-vitess-cdc/src/test/java/com/vervetica/cdc/connectors/vitess/table/VitessTableFactoryTest.java b/flink-connector-vitess-cdc/src/test/java/com/vervetica/cdc/connectors/vitess/table/VitessTableFactoryTest.java new file mode 100644 index 00000000000..c751fee0207 --- /dev/null +++ b/flink-connector-vitess-cdc/src/test/java/com/vervetica/cdc/connectors/vitess/table/VitessTableFactoryTest.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.vervetica.cdc.connectors.vitess.table; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.catalog.CatalogTable; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.ObjectIdentifier; +import org.apache.flink.table.catalog.ResolvedCatalogTable; +import org.apache.flink.table.catalog.ResolvedSchema; +import org.apache.flink.table.catalog.UniqueConstraint; +import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.factories.Factory; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.util.ExceptionUtils; + +import com.ververica.cdc.connectors.vitess.config.TabletType; +import com.ververica.cdc.connectors.vitess.config.VtctldConfig; +import com.ververica.cdc.connectors.vitess.table.VitessTableFactory; +import com.ververica.cdc.connectors.vitess.table.VitessTableSource; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +import static org.apache.flink.table.api.TableSchema.fromResolvedSchema; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** Test for {@link VitessTableSource} created by {@link VitessTableFactory}. 
*/ +public class VitessTableFactoryTest { + + private static final ResolvedSchema SCHEMA = + new ResolvedSchema( + Arrays.asList( + Column.physical("aaa", DataTypes.INT().notNull()), + Column.physical("bbb", DataTypes.STRING().notNull()), + Column.physical("ccc", DataTypes.DOUBLE()), + Column.physical("ddd", DataTypes.DECIMAL(31, 18)), + Column.physical("eee", DataTypes.TIMESTAMP(3))), + new ArrayList<>(), + UniqueConstraint.primaryKey("pk", Arrays.asList("bbb", "aaa"))); + + private static final String MY_SCHEMA = "public"; + private static final String MY_LOCALHOST = "localhost"; + private static final String MY_USERNAME = "flinkuser"; + private static final String MY_PASSWORD = "flinkpw"; + private static final String MY_KEYSPACE = "myDB"; + private static final String MY_TABLE = "myTable"; + private static final Properties PROPERTIES = new Properties(); + + @Test + public void testCommonProperties() { + Map properties = getAllOptions(); + + // validation for source + DynamicTableSource actualSource = createTableSource(properties); + VitessTableSource expectedSource = + new VitessTableSource( + SCHEMA, + 15991, + MY_LOCALHOST, + MY_KEYSPACE, + MY_TABLE, + null, + null, + VtctldConfig.builder().hostname(MY_LOCALHOST).port(15999).build(), + TabletType.RDONLY, + "decoderbufs", + "flink", + PROPERTIES); + assertEquals(expectedSource, actualSource); + } + + @Test + public void testOptionalProperties() { + Map options = getAllOptions(); + options.put("port", "5444"); + options.put("vtctl.port", "5445"); + options.put("decoding.plugin.name", "wal2json"); + options.put("debezium.snapshot.mode", "never"); + options.put("name", "flink"); + options.put("tablet-type", "MASTER"); + options.put("username", MY_USERNAME); + options.put("password", MY_PASSWORD); + + DynamicTableSource actualSource = createTableSource(options); + Properties dbzProperties = new Properties(); + dbzProperties.put("snapshot.mode", "never"); + VitessTableSource expectedSource = + new 
VitessTableSource( + SCHEMA, + 5444, + MY_LOCALHOST, + MY_KEYSPACE, + MY_TABLE, + MY_USERNAME, + MY_PASSWORD, + VtctldConfig.builder().hostname(MY_LOCALHOST).port(5445).build(), + TabletType.MASTER, + "wal2json", + "flink", + dbzProperties); + assertEquals(expectedSource, actualSource); + } + + @Test + public void testValidation() { + // validate illegal port + try { + Map properties = getAllOptions(); + properties.put("port", "123b"); + + createTableSource(properties); + fail("exception expected"); + } catch (Throwable t) { + assertTrue( + ExceptionUtils.findThrowableWithMessage( + t, "Could not parse value '123b' for key 'port'.") + .isPresent()); + } + + // validate missing required + Factory factory = new VitessTableFactory(); + for (ConfigOption requiredOption : factory.requiredOptions()) { + Map properties = getAllOptions(); + properties.remove(requiredOption.key()); + + try { + createTableSource(properties); + fail("exception expected"); + } catch (Throwable t) { + assertTrue( + ExceptionUtils.findThrowableWithMessage( + t, + "Missing required options are:\n\n" + requiredOption.key()) + .isPresent()); + } + } + + // validate unsupported option + try { + Map properties = getAllOptions(); + properties.put("unknown", "abc"); + + createTableSource(properties); + fail("exception expected"); + } catch (Throwable t) { + assertTrue( + ExceptionUtils.findThrowableWithMessage(t, "Unsupported options:\n\nunknown") + .isPresent()); + } + } + + private Map getAllOptions() { + Map options = new HashMap<>(); + options.put("connector", "vitess-cdc"); + options.put("hostname", MY_LOCALHOST); + options.put("keyspace", MY_KEYSPACE); + options.put("vtctl.hostname", MY_LOCALHOST); + options.put("table-name", MY_TABLE); + return options; + } + + private static DynamicTableSource createTableSource(Map options) { + return FactoryUtil.createTableSource( + null, + ObjectIdentifier.of("default", "default", "t1"), + new ResolvedCatalogTable( + CatalogTable.of( + 
fromResolvedSchema(SCHEMA).toSchema(), + "mock source", + new ArrayList<>(), + options), + SCHEMA), + new Configuration(), + VitessTableFactoryTest.class.getClassLoader(), + false); + } +} diff --git a/flink-connector-vitess-cdc/src/test/resources/ddl/column_type_test.sql b/flink-connector-vitess-cdc/src/test/resources/ddl/column_type_test.sql new file mode 100644 index 00000000000..7bf4fe191ee --- /dev/null +++ b/flink-connector-vitess-cdc/src/test/resources/ddl/column_type_test.sql @@ -0,0 +1,44 @@ +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- http://www.apache.org/licenses/LICENSE-2.0 +-- Unless required by applicable law or agreed to in writing, +-- software distributed under the License is distributed on an +-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +-- KIND, either express or implied. See the License for the +-- specific language governing permissions and limitations +-- under the License. 
USE test;

DROP TABLE IF EXISTS full_types;

-- Fixture covering the numeric/string/boolean column types the connector maps.
-- TODO add DATE, DATETIME, TIMESTAMP, TIME type mapping
CREATE TABLE full_types (
    id        INT AUTO_INCREMENT NOT NULL,
    tiny_c    TINYINT,
    tiny_un_c TINYINT UNSIGNED,
    small_c   SMALLINT,
    small_un_c SMALLINT UNSIGNED,
    int_c     INTEGER,
    int_un_c  INTEGER UNSIGNED,
    int11_c   INT(11),
    big_c     BIGINT,
    varchar_c VARCHAR(255),
    char_c    CHAR(3),
    float_c   FLOAT,
    double_c  DOUBLE,
    decimal_c DECIMAL(8, 4),
    numeric_c NUMERIC(6, 0),
    boolean_c BOOLEAN,
--  date_c DATE,
--  time_c TIME(0),
--  datetime3_c DATETIME(3),
--  datetime6_c DATETIME(6),
--  timestamp_c TIMESTAMP,
--  file_uuid BINARY(16),
    PRIMARY KEY (id)
) DEFAULT CHARSET=utf8;
USE test;

DROP TABLE IF EXISTS products;

-- Minimal inventory fixture used by the consuming-all-events IT case.
CREATE TABLE products (
    id          INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
    name        VARCHAR(255) NOT NULL DEFAULT 'flink',
    description VARCHAR(512),
    weight      FLOAT
);

-- Seed the id sequence at 101 so rows inserted with DEFAULT get predictable ids.
ALTER TABLE products AUTO_INCREMENT = 101;
+################################################################################ + +# Set root logger level to OFF to not flood build logs +# set manually to INFO for debugging purposes +rootLogger.level=INFO +rootLogger.appenderRef.test.ref = TestLogger + +appender.testlogger.name = TestLogger +appender.testlogger.type = CONSOLE +appender.testlogger.target = SYSTEM_ERR +appender.testlogger.layout.type = PatternLayout +appender.testlogger.layout.pattern = %-4r [%t] %-5p %c - %m%n diff --git a/flink-sql-connector-vitess-cdc/pom.xml b/flink-sql-connector-vitess-cdc/pom.xml new file mode 100644 index 00000000000..8cb3befb0f0 --- /dev/null +++ b/flink-sql-connector-vitess-cdc/pom.xml @@ -0,0 +1,95 @@ + + + + + flink-cdc-connectors + com.ververica + 2.4-SNAPSHOT + + 4.0.0 + + flink-sql-connector-vitess-cdc + + + + com.ververica + flink-connector-vitess-cdc + ${project.version} + + + + + + + org.apache.maven.plugins + maven-shade-plugin + 3.2.4 + + + shade-flink + package + + shade + + + false + + + + + + *:* + + + + + io.grpc + com.ververica.cdc.connectors.vitess.shaded.io.grpc + + + io.netty + com.ververica.cdc.connectors.vitess.shaded.io.netty + + + com.google + com.ververica.cdc.connectors.vitess.shaded.com.google + + + + + org.apache.kafka:* + + kafka/kafka-version.properties + LICENSE + + NOTICE + common/** + + + + + + + + + + diff --git a/flink-sql-connector-vitess-cdc/src/main/java/com/ververica/cdc/connectors/vitess/DummyDocs.java b/flink-sql-connector-vitess-cdc/src/main/java/com/ververica/cdc/connectors/vitess/DummyDocs.java new file mode 100644 index 00000000000..546ee208eaf --- /dev/null +++ b/flink-sql-connector-vitess-cdc/src/main/java/com/ververica/cdc/connectors/vitess/DummyDocs.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
/**
 * Placeholder class whose only purpose is to make this module produce a non-empty javadoc jar,
 * which the OSS repository publishing rules require.
 */
public class DummyDocs {
}