From 7ad375643ca0b6aea5a8110bb4d29e9ae16c091d Mon Sep 17 00:00:00 2001
From: gongzhongqiang <764629910@qq.com>
Date: Wed, 23 Mar 2022 00:23:39 +0800
Subject: [PATCH] [docs][tidb] Add tidb cdc connector document

This closes #959.

---
 README.md                           |   1 +
 docs/content/about.md               |   1 +
 docs/content/connectors/index.md    |   1 +
 docs/content/connectors/tidb-cdc.md | 473 ++++++++++++++++++++++++++++
 4 files changed, 476 insertions(+)
 create mode 100644 docs/content/connectors/tidb-cdc.md

diff --git a/README.md b/README.md
index a22e5c504ba..d0a5c35794b 100644
--- a/README.md
+++ b/README.md
@@ -14,6 +14,7 @@ This README is meant as a brief walkthrough on the core features with Flink CDC
| [mongodb-cdc](docs/content/connectors/mongodb-cdc.md) |
  • [MongoDB](https://www.mongodb.com): 3.6, 4.x, 5.0 | MongoDB Driver: 4.3.1 | | [oracle-cdc](docs/content/connectors/oracle-cdc.md) |
  • [Oracle](https://www.oracle.com/index.html): 11, 12, 19 | Oracle Driver: 19.3.0.0 | | [sqlserver-cdc](docs/content/connectors/sqlserver-cdc.md) |
  • [Sqlserver](https://www.microsoft.com/sql-server): 2017, 2019 | JDBC Driver: 7.2.2.jre8 | +| [tidb-cdc](docs/content/connectors/tidb-cdc.md) |
  • [TiDB](https://www.pingcap.com): 5.1.0+ | JDBC Driver: 8.0.16 | | [oceanbase-cdc](/docs/content/connectors/oceanbase-cdc.md) |
  • [OceanBase CE](https://open.oceanbase.com): 3.1.x | JDBC Driver: 5.7.4x | ## Features diff --git a/docs/content/about.md b/docs/content/about.md index a848f73ff4d..07a5702c9d6 100644 --- a/docs/content/about.md +++ b/docs/content/about.md @@ -14,6 +14,7 @@ The Flink CDC Connectors integrates Debezium as the engine to capture data chang | [mongodb-cdc](connectors/mongodb-cdc.md) |
  • [MongoDB](https://www.mongodb.com): 3.6, 4.x, 5.0 | MongoDB Driver: 4.3.1 | | [oracle-cdc](connectors/oracle-cdc.md) |
  • [Oracle](https://www.oracle.com/index.html): 11, 12, 19 | Oracle Driver: 19.3.0.0 | | [sqlserver-cdc](connectors/sqlserver-cdc.md) |
  • [Sqlserver](https://www.microsoft.com/sql-server): 2017, 2019 | JDBC Driver: 7.2.2.jre8 | +| [tidb-cdc](connectors/tidb-cdc.md) |
  • [TiDB](https://www.pingcap.com/): 5.1.0+ | JDBC Driver: 8.0.16 | | [oceanbase-cdc](connectors/oceanbase-cdc.md) |
  • [OceanBase CE](https://open.oceanbase.com): 3.1.x | JDBC Driver: 5.7.4x |

## Supported Flink Versions

diff --git a/docs/content/connectors/index.md b/docs/content/connectors/index.md
index 3c53c69fb0b..5083e713d73 100644
--- a/docs/content/connectors/index.md
+++ b/docs/content/connectors/index.md
@@ -9,4 +9,5 @@ mongodb-cdc
 oceanbase-cdc
 oracle-cdc
 sqlserver-cdc
+tidb-cdc
 ```
diff --git a/docs/content/connectors/tidb-cdc.md b/docs/content/connectors/tidb-cdc.md
new file mode 100644
index 00000000000..366a54414fc
--- /dev/null
+++ b/docs/content/connectors/tidb-cdc.md
@@ -0,0 +1,473 @@
# TiDB CDC Connector

The TiDB CDC connector allows for reading snapshot data and incremental data from the TiDB database. This document describes how to set up the TiDB CDC connector to run SQL queries against TiDB databases.

Dependencies
------------

In order to set up the TiDB CDC connector, the following table provides dependency information for both projects using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles.

### Maven dependency

```
<dependency>
  <groupId>com.ververica</groupId>
  <artifactId>flink-connector-tidb-cdc</artifactId>
  <version>2.2-SNAPSHOT</version>
</dependency>
```

### SQL Client JAR

```Download link is available only for stable releases.```

Download [flink-sql-connector-tidb-cdc-2.2-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-tidb-cdc/2.2-SNAPSHOT/flink-sql-connector-tidb-cdc-2.2-SNAPSHOT.jar) and put it under `<FLINK_HOME>/lib/`.

How to create a TiDB CDC table
----------------

The TiDB CDC table can be defined as follows:

```sql
-- checkpoint every 3000 milliseconds
Flink SQL> SET 'execution.checkpointing.interval' = '3s';

-- register a TiDB table 'orders' in Flink SQL
Flink SQL> CREATE TABLE orders (
    order_id INT,
    order_date TIMESTAMP(3),
    customer_name STRING,
    price DECIMAL(10, 5),
    product_id INT,
    order_status BOOLEAN,
    PRIMARY KEY(order_id) NOT ENFORCED
  ) WITH (
    'connector' = 'tidb-cdc',
    'tikv.grpc.timeout_in_ms' = '20000',
    'pd-addresses' = 'localhost:2379',
    'database-name' = 'mydb',
    'table-name' = 'orders'
);

-- read snapshot and binlogs from orders table
Flink SQL> SELECT * FROM orders;
```

Connector Options
----------------
| Option | Required | Default | Type | Description |
| ------ | -------- | ------- | ---- | ----------- |
| connector | required | (none) | String | Specify what connector to use, here should be `'tidb-cdc'`. |
| database-name | required | (none) | String | Database name of the TiDB server to monitor. |
| table-name | required | (none) | String | Table name of the TiDB database to monitor. |
| scan.startup.mode | optional | initial | String | Optional startup mode for TiDB CDC consumer, valid enumerations are `"initial"` and `"latest-offset"`. |
| pd-addresses | required | (none) | String | TiKV cluster's PD address. |
| tikv.grpc.timeout_in_ms | optional | (none) | Long | TiKV GRPC timeout in ms. |
| tikv.grpc.scan_timeout_in_ms | optional | (none) | Long | TiKV GRPC scan timeout in ms. |
| tikv.batch_get_concurrency | optional | 20 | Integer | TiKV GRPC batch get concurrency. |
| tikv.* | optional | (none) | String | Pass-through TiDB client's properties. |
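For the DataStream API, the pass-through `tikv.*` options above are supplied as a plain map. The sketch below only assembles such a map; the option keys shown are the ones documented in the table, and which client properties are actually honoured depends on the TiKV client version, so treat this as an illustrative sketch rather than a complete list.

```java
import java.util.HashMap;
import java.util.Map;

public class TiKVOptionsExample {
    public static void main(String[] args) {
        // Keys mirror the pass-through `tikv.*` options from the table above.
        Map<String, String> tikvOptions = new HashMap<>();
        tikvOptions.put("tikv.grpc.timeout_in_ms", "20000");
        tikvOptions.put("tikv.batch_get_concurrency", "20");

        // In the DataStream API this map is the second argument of
        // TDBSourceOptions.getTiConfiguration(pdAddresses, tikvOptions),
        // as used in the DataStream example later in this document.
        System.out.println(tikvOptions.size()); // 2
    }
}
```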

Available Metadata
----------------

The following format metadata can be exposed as read-only (VIRTUAL) columns in a table definition.
| Key | DataType | Description |
| --- | -------- | ----------- |
| table_name | STRING NOT NULL | Name of the table that contains the row. |
| database_name | STRING NOT NULL | Name of the database that contains the row. |
| op_ts | TIMESTAMP_LTZ(3) NOT NULL | It indicates the time that the change was made in the database. If the record is read from snapshot of the table instead of the binlog, the value is always 0. |

The extended CREATE TABLE example demonstrates the syntax for exposing these metadata fields:

```sql
CREATE TABLE products (
    db_name STRING METADATA FROM 'database_name' VIRTUAL,
    table_name STRING METADATA FROM 'table_name' VIRTUAL,
    operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
    order_id INT,
    order_date TIMESTAMP(0),
    customer_name STRING,
    price DECIMAL(10, 5),
    product_id INT,
    order_status BOOLEAN,
    PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
    'connector' = 'tidb-cdc',
    'tikv.grpc.timeout_in_ms' = '20000',
    'pd-addresses' = 'localhost:2379',
    'database-name' = 'mydb',
    'table-name' = 'orders'
);
```

Features
--------

### Exactly-Once Processing

The TiDB CDC connector is a Flink Source connector which will read the database snapshot first and then continue to read change events with **exactly-once processing**, even when failures happen.

### Startup Reading Position

The config option `scan.startup.mode` specifies the startup mode for the TiDB CDC consumer. The valid enumerations are:

- `initial` (default): Takes a snapshot of the structure and data of captured tables; useful if you want to fetch a complete representation of the data from the captured tables.
- `latest-offset`: Takes a snapshot of the structure of captured tables only; useful if only changes happening from now onwards should be fetched.

### Multi Thread Reading

The TiDB CDC source supports parallel reading, because multiple tasks can receive change events concurrently.

### DataStream Source

The TiDB CDC connector can also be a DataStream source.
You can create a SourceFunction as the following shows:

```java
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

import com.ververica.cdc.connectors.tidb.TDBSourceOptions;
import com.ververica.cdc.connectors.tidb.TiDBSource;
import com.ververica.cdc.connectors.tidb.TiKVChangeEventDeserializationSchema;
import com.ververica.cdc.connectors.tidb.TiKVSnapshotEventDeserializationSchema;
import org.tikv.kvproto.Cdcpb;
import org.tikv.kvproto.Kvrpcpb;

import java.util.HashMap;

public class TiDBSourceExample {

    public static void main(String[] args) throws Exception {

        SourceFunction<String> tidbSource =
                TiDBSource.<String>builder()
                        .database("mydb") // set captured database
                        .tableName("products") // set captured table
                        .tiConf(
                                TDBSourceOptions.getTiConfiguration(
                                        "localhost:2379", new HashMap<>()))
                        .snapshotEventDeserializer(
                                new TiKVSnapshotEventDeserializationSchema<String>() {
                                    @Override
                                    public void deserialize(
                                            Kvrpcpb.KvPair record, Collector<String> out)
                                            throws Exception {
                                        out.collect(record.toString());
                                    }

                                    @Override
                                    public TypeInformation<String> getProducedType() {
                                        return BasicTypeInfo.STRING_TYPE_INFO;
                                    }
                                })
                        .changeEventDeserializer(
                                new TiKVChangeEventDeserializationSchema<String>() {
                                    @Override
                                    public void deserialize(
                                            Cdcpb.Event.Row record, Collector<String> out)
                                            throws Exception {
                                        out.collect(record.toString());
                                    }

                                    @Override
                                    public TypeInformation<String> getProducedType() {
                                        return BasicTypeInfo.STRING_TYPE_INFO;
                                    }
                                })
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // enable checkpoint
        env.enableCheckpointing(3000);
        env.addSource(tidbSource).print().setParallelism(1);

        env.execute("Print TiDB Snapshot + Binlog");
    }
}
```

Data Type Mapping
----------------
| TiDB type | Flink SQL type | NOTE |
| --------- | -------------- | ---- |
| TINYINT | TINYINT | |
| SMALLINT<br>TINYINT UNSIGNED | SMALLINT | |
| INT<br>MEDIUMINT<br>SMALLINT UNSIGNED | INT | |
| BIGINT<br>INT UNSIGNED | BIGINT | |
| BIGINT UNSIGNED | DECIMAL(20, 0) | |
| FLOAT | FLOAT | |
| REAL<br>DOUBLE | DOUBLE | |
| NUMERIC(p, s)<br>DECIMAL(p, s)<br>where p <= 38 | DECIMAL(p, s) | |
| NUMERIC(p, s)<br>DECIMAL(p, s)<br>where 38 < p <= 65 | STRING | The precision for DECIMAL data type is up to 65 in TiDB, but the precision for DECIMAL is limited to 38 in Flink. So if you define a decimal column whose precision is greater than 38, you should map it to STRING to avoid precision loss. |
| BOOLEAN<br>TINYINT(1)<br>BIT(1) | BOOLEAN | |
| DATE | DATE | |
| TIME [(p)] | TIME [(p)] | |
| TIMESTAMP [(p)] | TIMESTAMP_LTZ [(p)] | |
| DATETIME [(p)] | TIMESTAMP [(p)] | |
| CHAR(n) | CHAR(n) | |
| VARCHAR(n) | VARCHAR(n) | |
| BIT(n) | BINARY(⌈n/8⌉) | |
| BINARY(n) | BINARY(n) | |
| TINYTEXT<br>TEXT<br>MEDIUMTEXT<br>LONGTEXT | STRING | |
| TINYBLOB<br>BLOB<br>MEDIUMBLOB<br>LONGBLOB | BYTES | Currently, for BLOB data type in TiDB, only the blob whose length isn't greater than 2,147,483,647 (2^31 - 1) is supported. |
| YEAR | INT | |
| ENUM | STRING | |
| JSON | STRING | The JSON data type will be converted into STRING with JSON format in Flink. |
| SET | ARRAY&lt;STRING&gt; | As the SET data type in TiDB is a string object that can have zero or more values, it should always be mapped to an array of string. |
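The DECIMAL note above can be made concrete with plain `java.math.BigDecimal`, which has no precision cap: a minimal sketch (the class name is ours; this is an illustration of the precision limit, not connector code) showing why a TiDB DECIMAL with precision greater than 38 cannot survive a round-trip through Flink's DECIMAL and is mapped to STRING instead.

```java
import java.math.BigDecimal;

public class DecimalPrecisionDemo {
    public static void main(String[] args) {
        // The widest value a TiDB DECIMAL(65, 0) column can hold: 65 nines.
        BigDecimal widest = new BigDecimal("9".repeat(65));
        System.out.println(widest.precision()); // 65

        // Flink SQL DECIMAL is capped at precision 38, so this value cannot
        // be represented losslessly as a Flink DECIMAL; rendering it as a
        // string keeps every digit.
        System.out.println(widest.toPlainString().length()); // 65
    }
}
```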