From b7534ee48d835457ab6bb2f9b53b0d54c8d9afa7 Mon Sep 17 00:00:00 2001 From: He Wang Date: Wed, 23 Mar 2022 01:40:47 +0800 Subject: [PATCH] [docs][oceanbase] Add docs for OceanBase CDC connector --- README.md | 1 + docs/content/about.md | 1 + docs/content/connectors/index.md | 1 + docs/content/connectors/oceanbase-cdc.md | 509 +++++++++++++++++++++++ 4 files changed, 512 insertions(+) create mode 100644 docs/content/connectors/oceanbase-cdc.md diff --git a/README.md b/README.md index 51457b911cc..a22e5c504ba 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,7 @@ This README is meant as a brief walkthrough on the core features with Flink CDC | [mongodb-cdc](docs/content/connectors/mongodb-cdc.md) |
  • [MongoDB](https://www.mongodb.com): 3.6, 4.x, 5.0 | MongoDB Driver: 4.3.1 | | [oracle-cdc](docs/content/connectors/oracle-cdc.md) |
  • [Oracle](https://www.oracle.com/index.html): 11, 12, 19 | Oracle Driver: 19.3.0.0 | | [sqlserver-cdc](docs/content/connectors/sqlserver-cdc.md) |
  • [Sqlserver](https://www.microsoft.com/sql-server): 2017, 2019 | JDBC Driver: 7.2.2.jre8 | +| [oceanbase-cdc](docs/content/connectors/oceanbase-cdc.md) |
  • [OceanBase CE](https://open.oceanbase.com): 3.1.x | JDBC Driver: 5.7.4x | ## Features diff --git a/docs/content/about.md b/docs/content/about.md index 10cf00521c9..a848f73ff4d 100644 --- a/docs/content/about.md +++ b/docs/content/about.md @@ -14,6 +14,7 @@ The Flink CDC Connectors integrates Debezium as the engine to capture data chang | [mongodb-cdc](connectors/mongodb-cdc.md) |
  • [MongoDB](https://www.mongodb.com): 3.6, 4.x, 5.0 | MongoDB Driver: 4.3.1 | | [oracle-cdc](connectors/oracle-cdc.md) |
  • [Oracle](https://www.oracle.com/index.html): 11, 12, 19 | Oracle Driver: 19.3.0.0 | | [sqlserver-cdc](connectors/sqlserver-cdc.md) |
  • [Sqlserver](https://www.microsoft.com/sql-server): 2017, 2019 | JDBC Driver: 7.2.2.jre8 | +| [oceanbase-cdc](connectors/oceanbase-cdc.md) |
  • [OceanBase CE](https://open.oceanbase.com): 3.1.x | JDBC Driver: 5.7.4x | ## Supported Flink Versions The following table shows the version mapping between Flink CDC Connectors and Flink: diff --git a/docs/content/connectors/index.md b/docs/content/connectors/index.md index 12748d90dbc..3c53c69fb0b 100644 --- a/docs/content/connectors/index.md +++ b/docs/content/connectors/index.md @@ -6,6 +6,7 @@ mysql-cdc postgres-cdc mongodb-cdc +oceanbase-cdc oracle-cdc sqlserver-cdc ``` diff --git a/docs/content/connectors/oceanbase-cdc.md b/docs/content/connectors/oceanbase-cdc.md new file mode 100644 index 00000000000..32e86b0472f --- /dev/null +++ b/docs/content/connectors/oceanbase-cdc.md @@ -0,0 +1,509 @@ +# OceanBase CDC Connector + +The OceanBase CDC connector allows for reading snapshot data and incremental data from OceanBase. This document describes how to setup the OceanBase CDC connector to run SQL queries against OceanBase. + +Dependencies +------------ + +In order to setup the OceanBase CDC connector, the following table provides dependency information for both projects using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles. + +```xml + + com.ververica + flink-connector-oceanbase-cdc + + 2.2-SNAPSHOT + +``` + +### SQL Client JAR + +```Download link is available only for stable releases.``` + +Download [flink-sql-connector-oceanbase-cdc-2.2-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-oceanbase-cdc/2.2-SNAPSHOT/flink-sql-connector-oceanbase-cdc-2.2-SNAPSHOT.jar) and put it under `/lib/`. + +Setup OceanBase and LogProxy Server +---------------------- + +1. Setup the OceanBase cluster following the [deployment doc](https://open.oceanbase.com/docs/community/oceanbase-database/V3.1.1/deploy-the-distributed-oceanbase-cluster). + +2. Create a user with password in `sys` tenant, this user is used in OceanBase LogProxy. 
See [user management doc](https://open.oceanbase.com/docs/community/oceanbase-database/V3.1.1/create-user-3). + + ```shell + mysql -h${host} -P${port} -uroot + + mysql> SHOW TENANT; + mysql> CREATE USER ${sys_username} IDENTIFIED BY '${sys_password}'; + mysql> GRANT ALL PRIVILEGES ON *.* TO ${sys_username} WITH GRANT OPTION; + ``` + +3. Create a user in the tenant you want to monitor; this user is used to read data for snapshot and change events. + +4. Get the `rootservice_list`. You can use the following command to get the value: + + ```shell + mysql> show parameters like 'rootservice_list'; + ``` + +5. Set up OceanBase LogProxy following the [quick start](https://github.com/oceanbase/oblogproxy#quick-start). + +How to create an OceanBase CDC table +---------------- + +The OceanBase CDC table can be defined as follows: + +```sql +-- checkpoint every 3000 milliseconds +Flink SQL> SET 'execution.checkpointing.interval' = '3s'; + +-- register an OceanBase table 'orders' in Flink SQL +Flink SQL> CREATE TABLE orders ( + order_id INT, + order_date TIMESTAMP(0), + customer_name STRING, + price DECIMAL(10, 5), + product_id INT, + order_status BOOLEAN, + PRIMARY KEY (order_id) NOT ENFORCED +) WITH ( + 'connector' = 'oceanbase-cdc', + 'scan.startup.mode' = 'initial', + 'username' = 'user@test_tenant', + 'password' = 'pswd', + 'tenant-name' = 'test_tenant', + 'database-name' = 'test_db', + 'table-name' = 'orders', + 'hostname' = '127.0.0.1', + 'port' = '2881', + 'rootserver-list' = '127.0.0.1:2882:2881', + 'logproxy.host' = '127.0.0.1', + 'logproxy.port' = '2983'); + +-- read snapshot and commit logs from orders table +Flink SQL> SELECT * FROM orders; +``` + +Connector Options +---------------- + +
    + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
    OptionRequiredDefaultTypeDescription
    connectorrequired(none)StringSpecify what connector to use, here should be 'oceanbase-cdc'.
    scan.startup.moderequired(none)StringSpecify the startup mode for the OceanBase CDC consumer, valid enumerations are + 'initial', 'latest-offset' or 'timestamp'. +
    scan.startup.timestampoptional(none)LongTimestamp in seconds of the start point, only used for 'timestamp' startup mode.
    usernamerequired(none)StringUsername to be used when connecting to OceanBase.
    passwordrequired(none)StringPassword to be used when connecting to OceanBase.
    tenant-namerequired(none)StringTenant name of OceanBase to monitor.
    database-namerequired(none)StringDatabase name of OceanBase to monitor.
    table-namerequired(none)StringTable name of OceanBase to monitor.
    hostnameoptional(none)StringIP address or hostname of the OceanBase database server or OceanBase Proxy server.
    portoptional(none)IntegerInteger port number to connect to OceanBase. It can be the SQL port of OceanBase server, which is 2881 by default, or the port of OceanBase proxy service, which is 2883 by default.
    connect.timeoutoptional30sDurationThe maximum time that the connector should wait after trying to connect to the OceanBase database server before timing out.
    server-time-zoneoptionalUTCStringThe session time zone in database server, e.g. "Asia/Shanghai". It controls how the TIMESTAMP type in OceanBase is converted to STRING in snapshot reading. Please make sure to set it the same as the timezone of the `oblogproxy` deployment.
    rootserver-listrequired(none)StringThe semicolon-separated list of OceanBase root servers in format `ip:rpc_port:sql_port`.
    logproxy.hostrequired(none)StringHostname or IP address of OceanBase log proxy service.
    logproxy.portrequired(none)IntegerPort number of OceanBase log proxy service.
    +
    + +Available Metadata +---------------- + +The following format metadata can be exposed as read-only (VIRTUAL) columns in a table definition. + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
    KeyDataTypeDescription
    tenant_nameSTRING NOT NULLName of the tenant that contains the row.
    database_nameSTRING NOT NULLName of the database that contains the row.
    table_nameSTRING NOT NULLName of the table that contains the row.
    op_tsTIMESTAMP_LTZ(3) NOT NULLIt indicates the time that the change was made in the database.
    + If the record is read from the snapshot of the table instead of the change stream, the value is always 0.
    + +The extended CREATE TABLE example demonstrates the syntax for exposing these metadata fields: + +```sql +CREATE TABLE products ( + tenant_name STRING METADATA FROM 'tenant_name' VIRTUAL, + db_name STRING METADATA FROM 'database_name' VIRTUAL, + table_name STRING METADATA FROM 'table_name' VIRTUAL, + operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL, + order_id INT, + order_date TIMESTAMP(0), + customer_name STRING, + price DECIMAL(10, 5), + product_id INT, + order_status BOOLEAN, + PRIMARY KEY(order_id) NOT ENFORCED +) WITH ( + 'connector' = 'oceanbase-cdc', + 'scan.startup.mode' = 'initial', + 'username' = 'user@test_tenant', + 'password' = 'pswd', + 'tenant-name' = 'test_tenant', + 'database-name' = 'test_db', + 'table-name' = 'orders', + 'hostname' = '127.0.0.1', + 'port' = '2881', + 'rootserver-list' = '127.0.0.1:2882:2881', + 'logproxy.host' = '127.0.0.1', + 'logproxy.port' = '2983'); +``` + +Features +-------- + +### At-Least-Once Processing + +The OceanBase CDC connector is a Flink Source connector which will read database snapshot first and then continues to read change events with **at-least-once processing**. + +OceanBase is a kind of distributed database whose log files are distributed on different servers. As there is no position information like MySQL binlog offset, we can only use timestamp as the position mark. In order to ensure the completeness of reading data, `liboblog` (a C++ library to read OceanBase log record) might read some log data before the given timestamp. So in this way we may read duplicate data whose timestamp is around the start point, and only 'at-least-once' can be guaranteed. + +### Startup Reading Position + +The config option `scan.startup.mode` specifies the startup mode for OceanBase CDC consumer. The valid enumerations are: + +- `initial`: Performs an initial snapshot on the monitored table upon first startup, and continue to read the latest commit log. 
+- `latest-offset`: Never performs a snapshot on the monitored table upon first startup and just reads the latest commit log since the connector is started. +- `timestamp`: Never performs a snapshot on the monitored table upon first startup and just reads the commit log from the given `scan.startup.timestamp`. + +### Consume Commit Log + +The OceanBase CDC Connector uses [oblogclient](https://github.com/oceanbase/oblogclient) to consume the commit log from OceanBase LogProxy. + +### DataStream Source + +The OceanBase CDC connector can also be a DataStream source. You can create a SourceFunction as the following shows: + +```java +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import com.ververica.cdc.connectors.oceanbase.OceanBaseSource; +import com.ververica.cdc.connectors.oceanbase.table.OceanBaseTableSourceFactory; +import com.ververica.cdc.connectors.oceanbase.table.StartupMode; +import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema; + +public class OceanBaseSourceExample { + + public static void main(String[] args) throws Exception { + SourceFunction oceanBaseSource = + OceanBaseSource.builder() + .rsList("127.0.0.1:2882:2881") // set root server list + .startupMode(StartupMode.INITIAL) // set startup mode + .username("user@test_tenant") // set cluster username + .password("pswd") // set cluster password + .tenantName("test_tenant") // set captured tenant name, do not support regex + .databaseName("test_db") // set captured database, support regex + .tableName("test_table") // set captured table, support regex + .hostname("127.0.0.1") // set hostname of OceanBase server or proxy + .port(2881) // set the sql port for OceanBase server or proxy + .logProxyHost("127.0.0.1") // set the hostname of log proxy + .logProxyPort(2983) // set the port of log proxy + .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String +
.build(); + + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + // enable checkpoint + env.enableCheckpointing(3000); + + env.addSource(oceanBaseSource).print().setParallelism(1); + + env.execute("Print OceanBase Snapshot + Commit Log"); + } +} +``` +Data Type Mapping +---------------- + +When the startup mode is not `INITIAL`, we will not be able to get the precision and scale of a column. In order to be compatible with different startup modes, we will not map one OceanBase type with different precisions to different Flink types. + +For example, you can get a boolean from a column with type BOOLEAN, TINYINT(1) or BIT(1). BOOLEAN is equivalent to TINYINT(1) in OceanBase, so columns of BOOLEAN and TINYINT types will be mapped to TINYINT in Flink, and BIT(1) will be mapped to BINARY(1) in Flink. + +
    + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
    OceanBase typeFlink SQL typeNOTE
    BOOLEAN
    + TINYINT
    TINYINT
    + SMALLINT
    + TINYINT UNSIGNED
    SMALLINT
    + INT
    + MEDIUMINT
    + SMALLINT UNSIGNED
    INT
    + BIGINT
    + INT UNSIGNED
    BIGINT
    BIGINT UNSIGNEDDECIMAL(20, 0)
    + REAL
    + FLOAT
    +
    FLOAT
    + DOUBLE + DOUBLE
    + NUMERIC(p, s)
    + DECIMAL(p, s)
    + where p <= 38
    +
    DECIMAL(p, s)
    + NUMERIC(p, s)
    + DECIMAL(p, s)
    + where 38 < p <= 65
    +
    STRINGDECIMAL is equivalent to NUMERIC. The precision for DECIMAL data type is up to 65 in OceanBase, but + the precision for DECIMAL is limited to 38 in Flink. + So if you define a decimal column whose precision is greater than 38, you should map it to STRING to + avoid precision loss.
    DATEDATE
    TIME [(p)]TIME [(p)]
    TIMESTAMP [(p)]
    + DATETIME [(p)] +
    TIMESTAMP [(p)] +
    CHAR(n)CHAR(n)
    VARCHAR(n)VARCHAR(n)
    BIT(n)BINARY(⌈n/8⌉)
    BINARY(n)BINARY(n)
    VARBINARY(N)VARBINARY(N)
    + TINYTEXT
    + TEXT
    + MEDIUMTEXT
    + LONGTEXT
    +
    STRING
    + TINYBLOB
    + BLOB
    + MEDIUMBLOB
    + LONGBLOB
    +
    BYTES
    YEARINT
    ENUMSTRING
    SETSTRING
    +