From 888f92d1a27f5301b9cc82df76e48a1390c94914 Mon Sep 17 00:00:00 2001 From: He Wang Date: Thu, 8 Jun 2023 13:50:24 +0800 Subject: [PATCH] [oceanbase] add jdbc options and support oracle mode (#1854) --- README.md | 2 +- docs/content/about.md | 2 +- docs/content/connectors/oceanbase-cdc(ZH).md | 164 +++++++++++++++++- docs/content/connectors/oceanbase-cdc.md | 164 +++++++++++++++++- .../cdc/debezium/utils}/JdbcUrlUtils.java | 2 +- .../mysql/table/MySqlTableSourceFactory.java | 1 + .../connectors/oceanbase/OceanBaseSource.java | 29 +++- .../oceanbase/source/OceanBaseConnection.java | 121 ++++++++++++- .../source/OceanBaseRichSourceFunction.java | 48 ++--- .../oceanbase/table/OceanBaseTableSource.java | 22 +++ .../table/OceanBaseTableSourceFactory.java | 49 +++++- .../table/OceanBaseConnectorITCase.java | 12 +- .../table/OceanBaseTableFactoryTest.java | 14 ++ 13 files changed, 588 insertions(+), 42 deletions(-) rename {flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table => flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/utils}/JdbcUrlUtils.java (97%) diff --git a/README.md b/README.md index b2c20d2c8d7..3624b3930a5 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,7 @@ This README is meant as a brief walkthrough on the core features of CDC Connecto |------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------------------------| | [mongodb-cdc](docs/content/connectors/mongodb-cdc.md) |
  • [MongoDB](https://www.mongodb.com): 3.6, 4.x, 5.0 | MongoDB Driver: 4.3.1 | | [mysql-cdc](docs/content/connectors/mysql-cdc.md) |
  • [MySQL](https://dev.mysql.com/doc): 5.6, 5.7, 8.0.x
  • [RDS MySQL](https://www.aliyun.com/product/rds/mysql): 5.6, 5.7, 8.0.x
  • [PolarDB MySQL](https://www.aliyun.com/product/polardb): 5.6, 5.7, 8.0.x
  • [Aurora MySQL](https://aws.amazon.com/cn/rds/aurora): 5.6, 5.7, 8.0.x
  • [MariaDB](https://mariadb.org): 10.x
  • [PolarDB X](https://github.com/ApsaraDB/galaxysql): 2.0.1 | JDBC Driver: 8.0.27 | -| [oceanbase-cdc](/docs/content/connectors/oceanbase-cdc.md) |
  • [OceanBase CE](https://open.oceanbase.com): 3.1.x
  • [OceanBase EE](https://www.oceanbase.com/product/oceanbase) (MySQL mode): 2.x, 3.x, 4.x | JDBC Driver: 5.1.4x | +| [oceanbase-cdc](/docs/content/connectors/oceanbase-cdc.md) |
  • [OceanBase CE](https://open.oceanbase.com): 3.1.x, 4.x
  • [OceanBase EE](https://www.oceanbase.com/product/oceanbase): 2.x, 3.x, 4.x | OceanBase Driver: 2.4.x | | [oracle-cdc](docs/content/connectors/oracle-cdc.md) |
  • [Oracle](https://www.oracle.com/index.html): 11, 12, 19 | Oracle Driver: 19.3.0.0 | | [postgres-cdc](docs/content/connectors/postgres-cdc.md) |
  • [PostgreSQL](https://www.postgresql.org): 9.6, 10, 11, 12 | JDBC Driver: 42.2.27 | | [sqlserver-cdc](docs/content/connectors/sqlserver-cdc.md) |
  • [Sqlserver](https://www.microsoft.com/sql-server): 2012, 2014, 2016, 2017, 2019 | JDBC Driver: 7.2.2.jre8 | diff --git a/docs/content/about.md b/docs/content/about.md index 2b96785c84b..609ce34bb73 100644 --- a/docs/content/about.md +++ b/docs/content/about.md @@ -11,7 +11,7 @@ The CDC Connectors for Apache Flink® integrate Debezium as the engin |----------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------------------------| | [mongodb-cdc](connectors/mongodb-cdc.md) |
  • [MongoDB](https://www.mongodb.com): 3.6, 4.x, 5.0 | MongoDB Driver: 4.3.1 | | [mysql-cdc](connectors/mysql-cdc.md) |
  • [MySQL](https://dev.mysql.com/doc): 5.6, 5.7, 8.0.x
  • [RDS MySQL](https://www.aliyun.com/product/rds/mysql): 5.6, 5.7, 8.0.x
  • [PolarDB MySQL](https://www.aliyun.com/product/polardb): 5.6, 5.7, 8.0.x
  • [Aurora MySQL](https://aws.amazon.com/cn/rds/aurora): 5.6, 5.7, 8.0.x
  • [MariaDB](https://mariadb.org): 10.x
  • [PolarDB X](https://github.com/ApsaraDB/galaxysql): 2.0.1 | JDBC Driver: 8.0.27 | -| [oceanbase-cdc](connectors/oceanbase-cdc.md) |
  • [OceanBase CE](https://open.oceanbase.com): 3.1.x
  • [OceanBase EE](https://www.oceanbase.com/product/oceanbase) (MySQL mode): 2.x, 3.x, 4.x | JDBC Driver: 5.1.4x | +| [oceanbase-cdc](connectors/oceanbase-cdc.md) |
  • [OceanBase CE](https://open.oceanbase.com): 3.1.x, 4.x
  • [OceanBase EE](https://www.oceanbase.com/product/oceanbase): 2.x, 3.x, 4.x | OceanBase Driver: 2.4.x | | [oracle-cdc](connectors/oracle-cdc.md) |
  • [Oracle](https://www.oracle.com/index.html): 11, 12, 19 | Oracle Driver: 19.3.0.0 | | [postgres-cdc](connectors/postgres-cdc.md) |
  • [PostgreSQL](https://www.postgresql.org): 9.6, 10, 11, 12 | JDBC Driver: 42.2.12 | | [sqlserver-cdc](connectors/sqlserver-cdc.md) |
  • [Sqlserver](https://www.microsoft.com/sql-server): 2012, 2014, 2016, 2017, 2019 | JDBC Driver: 7.2.2.jre8 | diff --git a/docs/content/connectors/oceanbase-cdc(ZH).md b/docs/content/connectors/oceanbase-cdc(ZH).md index 54e8d92fc5a..a12f62b7db9 100644 --- a/docs/content/connectors/oceanbase-cdc(ZH).md +++ b/docs/content/connectors/oceanbase-cdc(ZH).md @@ -15,6 +15,16 @@ OceanBase CDC 连接器允许从 OceanBase 读取快照数据和增量数据。 ``` +如果您是要连接企业版的 OceanBase,您可能需要使用 OceanBase 官方的 JDBC 驱动,这时需要引入如下依赖。 + +```xml + + com.oceanbase + oceanbase-client + 2.4.2 + +``` + ## 下载 SQL 客户端 JAR 包 ```下载链接仅在已发布版本可用,请在文档网站左下角选择浏览已发布的版本。``` @@ -23,6 +33,8 @@ OceanBase CDC 连接器允许从 OceanBase 读取快照数据和增量数据。 **注意:** flink-sql-connector-oceanbase-cdc-XXX-SNAPSHOT 版本是开发分支`release-XXX`对应的快照版本,快照版本用户需要下载源代码并编译相应的 jar。用户应使用已经发布的版本,例如 [flink-sql-connector-oceanbase-cdc-2.3.0.jar](https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-oceanbase-cdc) 当前已发布的所有版本都可以在 Maven 中央仓库获取。 +对于 JDBC 驱动,上述的 cdc jar 文件中已经包含了我们推荐的 MySQL 驱动版本 5.1.47。由于开源许可证的原因,我们不能在上述 cdc jar 文件中包含 OceanBase 的官方 JDBC 驱动,如果您需要使用它,可以从[这里](https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.2/oceanbase-client-2.4.2.jar)下载,然后放到 `/lib/` 目录下,同时需要将配置项 `jdbc.driver` 设为 `com.oceanbase.jdbc.Driver`。 + ### 配置 OceanBase 数据库和 oblogproxy 服务 1. 按照 [文档](https://github.com/oceanbase/oceanbase#quick-start) 配置 OceanBase 集群。 @@ -69,7 +81,7 @@ Flink SQL> CREATE TABLE orders ( ) WITH ( 'connector' = 'oceanbase-cdc', 'scan.startup.mode' = 'initial', - 'username' = 'user@test_tenant', + 'username' = 'user@test_tenant#cluster_name', 'password' = 'pswd', 'tenant-name' = 'test_tenant', 'database-name' = '^test_db$', @@ -86,6 +98,36 @@ Flink SQL> CREATE TABLE orders ( Flink SQL> SELECT * FROM orders; ``` +如果您使用的是企业版的 OceanBase Oracle 模式,您需要先添加 OceanBase 的官方 JDBC 驱动 jar 包到 Flink 环境,并且部署企业版的 oblogproxy 服务,然后通过以下命令创建 OceanBase CDC 表: + +```sql +Flink SQL> CREATE TABLE orders ( + order_id INT, + order_date TIMESTAMP(0), + customer_name STRING, + price DECIMAL(10, 5), + product_id INT, + order_status BOOLEAN, + PRIMARY KEY (order_id) NOT ENFORCED +) WITH ( + 'connector' = 'oceanbase-cdc', + 'scan.startup.mode' = 'initial', + 'username' = 'user@test_tenant#cluster_name', + 'password' = 'pswd', + 'tenant-name' = 'test_tenant', + 'database-name' = '^test_db$', + 'table-name' = '^orders$', + 'hostname' = '127.0.0.1', + 'port' = '2881', + 'compatible-mode' = 'oracle', + 'jdbc.driver' = 'com.oceanbase.jdbc.Driver', + 'config-url' = 'http://127.0.0.1:8080/services?Action=ObRootServiceInfo&User_ID=xxx&UID=xxx&ObRegion=xxx', + 'logproxy.host' = '127.0.0.1', + 'logproxy.port' = '2983', + 'working-mode' = 'memory' +); +``` + 您也可以访问 Flink CDC 官网文档,快速体验将数据从 OceanBase 导入到 Elasticsearch。更多信息,参考 [Flink CDC 官网文档](https://ververica.github.io/flink-cdc-connectors/release-2.2/content/%E5%BF%AB%E9%80%9F%E4%B8%8A%E6%89%8B/oceanbase-tutorial-zh.html)。 ## OceanBase CDC 连接器选项 @@ -248,6 +290,27 @@ OceanBase CDC 连接器包括用于 SQL 和 DataStream API 的选项,如下表 String 日志代理中 `libobcdc` 的工作模式 , 可以是 `storage` 或 `memory`。 + + compatible-mode + 否 + mysql + String + OceanBase 的兼容模式,可以是 `mysql` 或 `oracle`。 + + + jdbc.driver + 否 + com.mysql.jdbc.Driver + String + 全量读取时使用的 jdbc 驱动类名。 + + + jdbc.properties.* + 否 + 无 + String + 传递自定义 JDBC URL 属性的选项。用户可以传递自定义属性,如 'jdbc.properties.useSSL' = 'false'。 + @@ -396,6 +459,8 @@ public class OceanBaseSourceExample { .tableName("^test_table$") .hostname("127.0.0.1") .port(2881) + .compatibleMode("mysql") + .jdbcDriver("com.mysql.jdbc.Driver") .logProxyHost("127.0.0.1") .logProxyPort(2983) .serverTimeZone(serverTimeZone) @@ -597,3 +662,100 @@ public class OceanBaseSourceExample { + +### Oracle 模式 + +
    + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
    OceanBase typeFlink SQL typeNOTE
    NUMBER(1)BOOLEAN
    NUMBER(p, s <= 0), p - s < 3 TINYINT
    NUMBER(p, s <= 0), p - s < 5 SMALLINT
    NUMBER(p, s <= 0), p - s < 10 INT
    NUMBER(p, s <= 0), p - s < 19 BIGINT
    NUMBER(p, s <= 0), 19 <=p - s <=38DECIMAL(p - s, 0)
    NUMBER(p, s > 0)DECIMAL(p, s)
    NUMBER(p, s <= 0), p - s> 38 STRING
    + FLOAT
    + BINARY_FLOAT +
    FLOAT
    BINARY_DOUBLEDOUBLE
    + DATE
    + TIMESTAMP [(p)] +
    TIMESTAMP [(p)]
    + CHAR(n)
    + NCHAR(n)
    + VARCHAR(n)
    + VARCHAR2(n)
    + NVARCHAR2(n)
    + CLOB
    +
    STRING
    + RAW
    + BLOB
    + ROWID +
    BYTES
    +
    diff --git a/docs/content/connectors/oceanbase-cdc.md b/docs/content/connectors/oceanbase-cdc.md index 60405fd5bc1..ceafb4f4fb3 100644 --- a/docs/content/connectors/oceanbase-cdc.md +++ b/docs/content/connectors/oceanbase-cdc.md @@ -16,6 +16,16 @@ In order to set up the OceanBase CDC connector, the following table provides dep ``` +If you want to use OceanBase JDBC driver to connect to the enterprise edition database, you should also include the following dependency in your class path. + +```xml + + com.oceanbase + oceanbase-client + 2.4.2 + +``` + ### SQL Client JAR ```Download link is available only for stable releases.``` @@ -24,6 +34,8 @@ Download [flink-sql-connector-oceanbase-cdc-2.4-SNAPSHOT.jar](https://repo1.mave **Note:** flink-sql-connector-oceanbase-cdc-XXX-SNAPSHOT version is the code corresponding to the development branch. Users need to download the source code and compile the corresponding jar. Users should use the released version, such as [flink-sql-connector-oceanbase-cdc-2.2.1.jar](https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-oceanbase-cdc), the released version will be available in the Maven central warehouse. +For JDBC driver, the cdc jar above already contains MySQL JDBC driver 5.1.47, which is our recommended version. Due to the license issue, we can not include the OceanBase JDBC driver in the cdc jar. If you need to use it, you can download it from [here](https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.2/oceanbase-client-2.4.2.jar) and put it under `/lib/`, you also need to set the start option `jdbc.driver` to `com.oceanbase.jdbc.Driver`. + Setup OceanBase and LogProxy Server ---------------------- @@ -76,7 +88,7 @@ Flink SQL> CREATE TABLE orders ( ) WITH ( 'connector' = 'oceanbase-cdc', 'scan.startup.mode' = 'initial', - 'username' = 'user@test_tenant', + 'username' = 'user@test_tenant#cluster_name', 'password' = 'pswd', 'tenant-name' = 'test_tenant', 'database-name' = '^test_db$', @@ -93,6 +105,36 @@ Flink SQL> CREATE TABLE orders ( Flink SQL> SELECT * FROM orders; ``` +If you want to use OceanBase Oracle mode, you need to add the OceanBase jdbc jar file to Flink and set up the enterprise edition of oblogproxy, then you can create a table in Flink as following: + +```sql +Flink SQL> CREATE TABLE orders ( + order_id INT, + order_date TIMESTAMP(0), + customer_name STRING, + price DECIMAL(10, 5), + product_id INT, + order_status BOOLEAN, + PRIMARY KEY (order_id) NOT ENFORCED +) WITH ( + 'connector' = 'oceanbase-cdc', + 'scan.startup.mode' = 'initial', + 'username' = 'user@test_tenant#cluster_name', + 'password' = 'pswd', + 'tenant-name' = 'test_tenant', + 'database-name' = '^test_db$', + 'table-name' = '^orders$', + 'hostname' = '127.0.0.1', + 'port' = '2881', + 'compatible-mode' = 'oracle', + 'jdbc.driver' = 'com.oceanbase.jdbc.Driver', + 'config-url' = 'http://127.0.0.1:8080/services?Action=ObRootServiceInfo&User_ID=xxx&UID=xxx&ObRegion=xxx', + 'logproxy.host' = '127.0.0.1', + 'logproxy.port' = '2983', + 'working-mode' = 'memory' +); +``` + You can also try the quickstart tutorial that sync data from OceanBase to Elasticsearch, please refer [Flink CDC Tutorial](https://ververica.github.io/flink-cdc-connectors/release-2.3//content/quickstart/oceanbase-tutorial.html) for more information. Connector Options @@ -251,6 +293,27 @@ The OceanBase CDC Connector contains some options for both sql and stream api as String Working mode of `obcdc` in LogProxy, can be `storage` or `memory`. + + compatible-mode + optional + mysql + String + Compatible mode of OceanBase, can be `mysql` or `oracle`. + + + jdbc.driver + optional + com.mysql.jdbc.Driver + String + JDBC driver class for snapshot reading. + + + jdbc.properties.* + optional + (none) + String + Option to pass custom JDBC URL properties. User can pass custom properties like 'jdbc.properties.useSSL' = 'false'. + @@ -404,6 +467,8 @@ public class OceanBaseSourceExample { .tableName("^test_table$") .hostname("127.0.0.1") .port(2881) + .compatibleMode("mysql") + .jdbcDriver("com.mysql.jdbc.Driver") .logProxyHost("127.0.0.1") .logProxyPort(2983) .serverTimeZone(serverTimeZone) @@ -598,3 +663,100 @@ Data Type Mapping + +### Oracle Mode + +
    + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
    OceanBase typeFlink SQL typeNOTE
    NUMBER(1)BOOLEAN
    NUMBER(p, s <= 0), p - s < 3 TINYINT
    NUMBER(p, s <= 0), p - s < 5 SMALLINT
    NUMBER(p, s <= 0), p - s < 10 INT
    NUMBER(p, s <= 0), p - s < 19 BIGINT
    NUMBER(p, s <= 0), 19 <=p - s <=38DECIMAL(p - s, 0)
    NUMBER(p, s > 0)DECIMAL(p, s)
    NUMBER(p, s <= 0), p - s> 38 STRING
    + FLOAT
    + BINARY_FLOAT +
    FLOAT
    BINARY_DOUBLEDOUBLE
    + DATE
    + TIMESTAMP [(p)] +
    TIMESTAMP [(p)]
    + CHAR(n)
    + NCHAR(n)
    + VARCHAR(n)
    + VARCHAR2(n)
    + NVARCHAR2(n)
    + CLOB
    +
    STRING
    + RAW
    + BLOB
    + ROWID +
    BYTES
    +
    \ No newline at end of file diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/JdbcUrlUtils.java b/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/utils/JdbcUrlUtils.java similarity index 97% rename from flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/JdbcUrlUtils.java rename to flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/utils/JdbcUrlUtils.java index ef3c35d6c99..de71c023069 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/JdbcUrlUtils.java +++ b/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/utils/JdbcUrlUtils.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package com.ververica.cdc.connectors.mysql.table; +package com.ververica.cdc.debezium.utils; import java.util.Map; import java.util.Properties; diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactory.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactory.java index 43e13f81183..15c25e87c25 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactory.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactory.java @@ -32,6 +32,7 @@ import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffsetBuilder; import com.ververica.cdc.connectors.mysql.utils.OptionUtils; import com.ververica.cdc.debezium.table.DebeziumOptions; +import com.ververica.cdc.debezium.utils.JdbcUrlUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/OceanBaseSource.java b/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/OceanBaseSource.java index 3f985dbfe65..7f268604c27 100644 --- a/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/OceanBaseSource.java +++ b/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/OceanBaseSource.java @@ -28,6 +28,7 @@ import org.apache.commons.lang3.StringUtils; import java.time.Duration; +import java.util.Properties; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -56,6 +57,9 @@ public static class Builder { // snapshot reading config private String hostname; private Integer port; + private String compatibleMode; + private String jdbcDriver; + private Properties jdbcProperties; // incremental reading config private String logProxyHost; @@ -123,6 +127,21 @@ public Builder port(int port) { return this; } + public Builder compatibleMode(String compatibleMode) { + this.compatibleMode = compatibleMode; + return this; + } + + public Builder jdbcDriver(String jdbcDriver) { + this.jdbcDriver = jdbcDriver; + return this; + } + + public Builder jdbcProperties(Properties jdbcProperties) { + this.jdbcProperties = jdbcProperties; + return this; + } + public Builder logProxyHost(String logProxyHost) { this.logProxyHost = logProxyHost; return this; @@ -168,6 +187,11 @@ public SourceFunction build() { case INITIAL: checkNotNull(hostname, "hostname shouldn't be null on startup mode 'initial'"); checkNotNull(port, "port shouldn't be null on startup mode 'initial'"); + checkNotNull( + compatibleMode, + "compatibleMode shouldn't be null on startup mode 'initial'"); + checkNotNull( + jdbcDriver, "jdbcDriver shouldn't be null on startup mode 'initial'"); startupTimestamp = 0L; break; case LATEST_OFFSET: @@ -237,7 +261,7 @@ public SourceFunction build() { obReaderConfig.setStartTimestamp(startupTimestamp); obReaderConfig.setTimezone(serverTimeZone); - return new OceanBaseRichSourceFunction( + return new OceanBaseRichSourceFunction<>( StartupMode.INITIAL.equals(startupMode), username, password, @@ -248,6 +272,9 @@ public SourceFunction build() { connectTimeout, hostname, port, + compatibleMode, + jdbcDriver, + jdbcProperties, logProxyHost, logProxyPort, clientConf, diff --git a/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/source/OceanBaseConnection.java b/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/source/OceanBaseConnection.java index 79a5bf43402..c0c179656cf 100644 --- a/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/source/OceanBaseConnection.java +++ b/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/source/OceanBaseConnection.java @@ -16,17 +16,31 @@ package com.ververica.cdc.connectors.oceanbase.source; +import org.apache.flink.util.FlinkRuntimeException; + import io.debezium.config.Configuration; import io.debezium.jdbc.JdbcConnection; +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; +import java.sql.SQLException; import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; +import java.util.regex.Pattern; +import java.util.stream.Collectors; /** {@link JdbcConnection} extension to be used with OceanBase server. */ public class OceanBaseConnection extends JdbcConnection { - protected static final String URL_PATTERN = - "jdbc:mysql://${hostname}:${port}/?useInformationSchema=true&nullCatalogMeansCurrent=false&useSSL=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=convertToNull&connectTimeout=${connectTimeout}"; - protected static final String DRIVER_CLASS_NAME = "com.mysql.jdbc.Driver"; + private static final Properties DEFAULT_JDBC_PROPERTIES = initializeDefaultJdbcProperties(); + private static final String MYSQL_URL_PATTERN = + "jdbc:mysql://${hostname}:${port}/?connectTimeout=${connectTimeout}"; + private static final String OB_URL_PATTERN = + "jdbc:oceanbase://${hostname}:${port}/?connectTimeout=${connectTimeout}"; + + private final String compatibleMode; public OceanBaseConnection( String hostname, @@ -34,11 +48,17 @@ public OceanBaseConnection( String user, String password, Duration timeout, + String compatibleMode, + String jdbcDriver, + Properties jdbcProperties, ClassLoader classLoader) { - super(config(hostname, port, user, password, timeout), factory(classLoader)); + super( + config(hostname, port, user, password, timeout), + factory(jdbcDriver, jdbcProperties, classLoader)); + this.compatibleMode = compatibleMode; } - public static Configuration config( + private static Configuration config( String hostname, Integer port, String user, String password, Duration timeout) { return Configuration.create() .with("hostname", hostname) @@ -49,7 +69,94 @@ public static Configuration config( .build(); } - public static JdbcConnection.ConnectionFactory factory(ClassLoader classLoader) { - return JdbcConnection.patternBasedFactory(URL_PATTERN, DRIVER_CLASS_NAME, classLoader); + private static String formatJdbcUrl(String jdbcDriver, Properties jdbcProperties) { + Properties combinedProperties = new Properties(); + combinedProperties.putAll(DEFAULT_JDBC_PROPERTIES); + if (jdbcProperties != null) { + combinedProperties.putAll(jdbcProperties); + } + String urlPattern = + jdbcDriver.toLowerCase().contains("oceanbase") ? OB_URL_PATTERN : MYSQL_URL_PATTERN; + StringBuilder jdbcUrlStringBuilder = new StringBuilder(urlPattern); + combinedProperties.forEach( + (key, value) -> { + jdbcUrlStringBuilder.append("&").append(key).append("=").append(value); + }); + return jdbcUrlStringBuilder.toString(); + } + + private static Properties initializeDefaultJdbcProperties() { + Properties defaultJdbcProperties = new Properties(); + defaultJdbcProperties.setProperty("useInformationSchema", "true"); + defaultJdbcProperties.setProperty("nullCatalogMeansCurrent", "false"); + defaultJdbcProperties.setProperty("useUnicode", "true"); + defaultJdbcProperties.setProperty("zeroDateTimeBehavior", "convertToNull"); + defaultJdbcProperties.setProperty("characterEncoding", "UTF-8"); + defaultJdbcProperties.setProperty("characterSetResults", "UTF-8"); + return defaultJdbcProperties; + } + + private static JdbcConnection.ConnectionFactory factory( + String jdbcDriver, Properties jdbcProperties, ClassLoader classLoader) { + return JdbcConnection.patternBasedFactory( + formatJdbcUrl(jdbcDriver, jdbcProperties), jdbcDriver, classLoader); + } + + /** + * Get table list by database name pattern and table name pattern. + * + * @param dbPattern Database name pattern. + * @param tbPattern Table name pattern. + * @return Table list. + * @throws SQLException If a database access error occurs. + */ + public List getTables(String dbPattern, String tbPattern) throws SQLException { + List result = new ArrayList<>(); + DatabaseMetaData metaData = connection().getMetaData(); + switch (compatibleMode.toLowerCase()) { + case "mysql": + List dbNames = getResultList(metaData.getCatalogs(), "TABLE_CAT"); + dbNames = + dbNames.stream() + .filter(dbName -> Pattern.matches(dbPattern, dbName)) + .collect(Collectors.toList()); + for (String dbName : dbNames) { + List tableNames = + getResultList( + metaData.getTables(dbName, null, null, new String[] {"TABLE"}), + "TABLE_NAME"); + tableNames.stream() + .filter(tbName -> Pattern.matches(tbPattern, tbName)) + .forEach(tbName -> result.add(dbName + "." + tbName)); + } + break; + case "oracle": + dbNames = getResultList(metaData.getSchemas(), "TABLE_SCHEM"); + dbNames = + dbNames.stream() + .filter(dbName -> Pattern.matches(dbPattern, dbName)) + .collect(Collectors.toList()); + for (String dbName : dbNames) { + List tableNames = + getResultList( + metaData.getTables(null, dbName, null, new String[] {"TABLE"}), + "TABLE_NAME"); + tableNames.stream() + .filter(tbName -> Pattern.matches(tbPattern, tbName)) + .forEach(tbName -> result.add(dbName + "." + tbName)); + } + break; + default: + throw new FlinkRuntimeException("Unsupported compatible mode: " + compatibleMode); + } + return result; + } + + private List getResultList(ResultSet resultSet, String columnName) throws SQLException { + List result = new ArrayList<>(); + while (resultSet.next()) { + result.add(resultSet.getString(columnName)); + } + return result; } } diff --git a/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/source/OceanBaseRichSourceFunction.java b/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/source/OceanBaseRichSourceFunction.java index 64df710c4d9..0561ae5fdea 100644 --- a/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/source/OceanBaseRichSourceFunction.java +++ b/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/source/OceanBaseRichSourceFunction.java @@ -45,12 +45,12 @@ import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.time.Duration; -import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -82,6 +82,9 @@ public class OceanBaseRichSourceFunction extends RichSourceFunction private final Duration connectTimeout; private final String hostname; private final Integer port; + private final String compatibleMode; + private final String jdbcDriver; + private final Properties jdbcProperties; private final String logProxyHost; private final int logProxyPort; private final ClientConf logProxyClientConf; @@ -110,6 +113,9 @@ public OceanBaseRichSourceFunction( Duration connectTimeout, String hostname, Integer port, + String compatibleMode, + String jdbcDriver, + Properties jdbcProperties, String logProxyHost, int logProxyPort, ClientConf logProxyClientConf, @@ -125,6 +131,9 @@ public OceanBaseRichSourceFunction( this.connectTimeout = checkNotNull(connectTimeout); this.hostname = hostname; this.port = port; + this.compatibleMode = compatibleMode; + this.jdbcDriver = jdbcDriver; + this.jdbcProperties = jdbcProperties; this.logProxyHost = checkNotNull(logProxyHost); this.logProxyPort = checkNotNull(logProxyPort); this.logProxyClientConf = checkNotNull(logProxyClientConf); @@ -172,6 +181,9 @@ private OceanBaseConnection getSnapshotConnection() { username, password, connectTimeout, + compatibleMode, + jdbcDriver, + jdbcProperties, getClass().getClassLoader()); } return snapshotConnection; @@ -206,28 +218,13 @@ private void initTableWhiteList() { if (StringUtils.isNotBlank(databaseName) && StringUtils.isNotBlank(tableName)) { try { - String sql = - String.format( - "SELECT TABLE_SCHEMA, TABLE_NAME FROM INFORMATION_SCHEMA.TABLES " - + "WHERE TABLE_TYPE='BASE TABLE' and TABLE_SCHEMA REGEXP '%s' and TABLE_NAME REGEXP '%s'", - databaseName, tableName); - final List matchedTables = new ArrayList<>(); - getSnapshotConnection() - .query( - sql, - rs -> { - while (rs.next()) { - matchedTables.add( - String.format( - "%s.%s", rs.getString(1), rs.getString(2))); - } - }); - LOG.info("Pattern matched tables: {}", matchedTables); - localTableSet.addAll(matchedTables); + List tables = getSnapshotConnection().getTables(databaseName, tableName); + LOG.info("Pattern matched tables: {}", tables); + localTableSet.addAll(tables); } catch (SQLException e) { LOG.error( String.format( - "Query table list by 'database-name' %s and 'table-name' %s failed.", + "Query table list by 'databaseName' %s and 'tableName' %s failed.", databaseName, tableName), e); throw new FlinkRuntimeException(e); @@ -257,14 +254,17 @@ private void readSnapshotRecordsByTable(String databaseName, String tableName) { OceanBaseRecord.SourceInfo sourceInfo = new OceanBaseRecord.SourceInfo( tenantName, databaseName, tableName, resolvedTimestamp); - - String fullName = String.format("`%s`.`%s`", databaseName, tableName); - String selectSql = "SELECT * FROM " + fullName; + String fullName; + if ("mysql".equalsIgnoreCase(compatibleMode)) { + fullName = String.format("`%s`.`%s`", databaseName, tableName); + } else { + fullName = String.format("%s.%s", databaseName, tableName); + } try { LOG.info("Start to read snapshot from {}", fullName); getSnapshotConnection() .query( - selectSql, + "SELECT * FROM " + fullName, rs -> { ResultSetMetaData metaData = rs.getMetaData(); while (rs.next()) { diff --git a/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseTableSource.java b/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseTableSource.java index ac8a23c9f59..3cabe667722 100644 --- a/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseTableSource.java +++ b/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseTableSource.java @@ -36,6 +36,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Properties; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -58,6 +59,9 @@ public class OceanBaseTableSource implements ScanTableSource, SupportsReadingMet private final String hostname; private final Integer port; + private final String compatibleMode; + private final String jdbcDriver; + private final Properties jdbcProperties; private final String logProxyHost; private final Integer logProxyPort; @@ -90,6 +94,9 @@ public OceanBaseTableSource( Duration connectTimeout, String hostname, Integer port, + String compatibleMode, + String jdbcDriver, + Properties jdbcProperties, String logProxyHost, Integer logProxyPort, String logProxyClientId, @@ -109,6 +116,9 @@ public OceanBaseTableSource( this.connectTimeout = connectTimeout; this.hostname = hostname; this.port = port; + this.compatibleMode = compatibleMode; + this.jdbcDriver = jdbcDriver; + this.jdbcProperties = jdbcProperties; this.logProxyHost = checkNotNull(logProxyHost); this.logProxyPort = checkNotNull(logProxyPort); this.logProxyClientId = logProxyClientId; @@ -154,6 +164,9 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext context) { .connectTimeout(connectTimeout) .hostname(hostname) .port(port) + .compatibleMode(compatibleMode) + .jdbcDriver(jdbcDriver) + .jdbcProperties(jdbcProperties) .logProxyHost(logProxyHost) .logProxyPort(logProxyPort) .logProxyClientId(logProxyClientId) @@ -211,6 +224,9 @@ public DynamicTableSource copy() { connectTimeout, hostname, port, + compatibleMode, + jdbcDriver, + jdbcProperties, logProxyHost, logProxyPort, logProxyClientId, @@ -244,6 +260,9 @@ public boolean equals(Object o) { && Objects.equals(this.connectTimeout, that.connectTimeout) && Objects.equals(this.hostname, that.hostname) && Objects.equals(this.port, that.port) + && Objects.equals(this.compatibleMode, that.compatibleMode) + && Objects.equals(this.jdbcDriver, that.jdbcDriver) + && Objects.equals(this.jdbcProperties, that.jdbcProperties) && Objects.equals(this.logProxyHost, that.logProxyHost) && Objects.equals(this.logProxyPort, that.logProxyPort) && Objects.equals(this.logProxyClientId, that.logProxyClientId) @@ -270,6 +289,9 @@ public int hashCode() { connectTimeout, hostname, port, + compatibleMode, + jdbcDriver, + jdbcProperties, logProxyHost, logProxyPort, logProxyClientId, diff --git a/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseTableSourceFactory.java b/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseTableSourceFactory.java index d2d9f7884e3..eb7e6ddc0ba 100644 --- a/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseTableSourceFactory.java +++ b/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseTableSourceFactory.java @@ -26,9 +26,11 @@ import org.apache.flink.table.factories.FactoryUtil; import com.ververica.cdc.connectors.oceanbase.utils.OptionUtils; +import com.ververica.cdc.debezium.utils.JdbcUrlUtils; import java.time.Duration; import java.util.HashSet; +import java.util.Objects; import java.util.Set; /** Factory for creating configured instance of {@link OceanBaseTableSource}. */ @@ -110,6 +112,20 @@ public class OceanBaseTableSourceFactory implements DynamicTableSourceFactory { .withDescription( "Integer port number of OceanBase database server or OceanBase proxy server."); + public static final ConfigOption COMPATIBLE_MODE = + ConfigOptions.key("compatible-mode") + .stringType() + .defaultValue("mysql") + .withDescription( + "The compatible mode of OceanBase, can be 'mysql' or 'oracle'."); + + public static final ConfigOption JDBC_DRIVER = + ConfigOptions.key("jdbc.driver") + .stringType() + .defaultValue("com.mysql.jdbc.Driver") + .withDescription( + "JDBC driver class name, use 'com.mysql.jdbc.Driver' by default."); + public static final ConfigOption LOG_PROXY_HOST = ConfigOptions.key("logproxy.host") .stringType() @@ -161,11 +177,12 @@ public class OceanBaseTableSourceFactory implements DynamicTableSourceFactory { public DynamicTableSource createDynamicTableSource(Context context) { final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context); - helper.validate(); + helper.validateExcept(JdbcUrlUtils.PROPERTIES_PREFIX); ResolvedSchema physicalSchema = context.getCatalogTable().getResolvedSchema(); ReadableConfig config = helper.getOptions(); + validate(config); StartupMode startupMode = StartupMode.getStartupMode(config.get(SCAN_STARTUP_MODE)); @@ -181,6 +198,8 @@ public DynamicTableSource createDynamicTableSource(Context context) { String hostname = config.get(HOSTNAME); Integer port = config.get(PORT); + String compatibleMode = config.get(COMPATIBLE_MODE); + String jdbcDriver = config.get(JDBC_DRIVER); String logProxyHost = config.get(LOG_PROXY_HOST); Integer logProxyPort = config.get(LOG_PROXY_PORT); @@ -205,6 +224,9 @@ public DynamicTableSource createDynamicTableSource(Context context) { connectTimeout, hostname, port, + compatibleMode, + jdbcDriver, + JdbcUrlUtils.getJdbcProperties(context.getCatalogTable().getOptions()), logProxyHost, logProxyPort, logProxyClientId, @@ -240,6 +262,8 @@ public Set> optionalOptions() { options.add(TABLE_LIST); options.add(HOSTNAME); options.add(PORT); + options.add(COMPATIBLE_MODE); + options.add(JDBC_DRIVER); options.add(CONNECT_TIMEOUT); options.add(SERVER_TIME_ZONE); options.add(LOG_PROXY_CLIENT_ID); @@ -248,4 +272,27 @@ public Set> optionalOptions() { options.add(WORKING_MODE); return options; } + + private void validate(ReadableConfig config) { + String startupMode = config.get(SCAN_STARTUP_MODE); + if (StartupMode.getStartupMode(startupMode).equals(StartupMode.INITIAL)) { + String compatibleMode = + Objects.requireNonNull( + config.get(COMPATIBLE_MODE), + "'compatible-mode' is required for 'initial' startup mode."); + String jdbcDriver = + Objects.requireNonNull( + config.get(JDBC_DRIVER), + "'jdbc.driver' is required for 'initial' startup mode."); + if (compatibleMode.equalsIgnoreCase("oracle")) { + if (!jdbcDriver.toLowerCase().contains("oceanbase")) { + throw new IllegalArgumentException( + "OceanBase JDBC driver is required for OceanBase Enterprise Edition."); + } + Objects.requireNonNull( + config.get(CONFIG_URL), + "'config-url' is required for OceanBase Enterprise Edition."); + } + } + } } diff --git a/flink-connector-oceanbase-cdc/src/test/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseConnectorITCase.java b/flink-connector-oceanbase-cdc/src/test/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseConnectorITCase.java index 99fdd4c7d69..11dd2be6a4d 100644 --- a/flink-connector-oceanbase-cdc/src/test/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseConnectorITCase.java +++ b/flink-connector-oceanbase-cdc/src/test/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseConnectorITCase.java @@ -97,7 +97,8 @@ public void testTableList() throws Exception { + " 'logproxy.host' = '%s'," + " 'logproxy.port' = '%s'," + " 'rootserver-list' = '%s'," - + " 'working-mode' = 'memory'" + + " 'working-mode' = 'memory'," + + " 'jdbc.properties.useSSL' = 'false'" + ")", getUsername(), getPassword(), @@ -221,7 +222,8 @@ public void testMetadataColumns() throws Exception { + " 'logproxy.host' = '%s'," + " 'logproxy.port' = '%s'," + " 'rootserver-list' = '%s'," - + " 'working-mode' = 'memory'" + + " 'working-mode' = 'memory'," + + " 'jdbc.properties.useSSL' = 'false'" + ")", getUsername(), getPassword(), @@ -371,7 +373,8 @@ public void testAllDataTypes() throws Exception { + " 'logproxy.host' = '%s'," + " 'logproxy.port' = '%s'," + " 'rootserver-list' = '%s'," - + " 'working-mode' = 'memory'" + + " 'working-mode' = 'memory'," + + " 'jdbc.properties.useSSL' = 'false'" + ")", getUsername(), getPassword(), @@ -500,7 +503,8 @@ public void testTimeDataTypes(String serverTimeZone) throws Exception { + " 'logproxy.host' = '%s'," + " 'logproxy.port' = '%s'," + " 'rootserver-list' = '%s'," - + " 'working-mode' = 'memory'" + + " 'working-mode' = 'memory'," + + " 'jdbc.properties.useSSL' = 'false'" + ")", getUsername(), getPassword(), diff --git a/flink-connector-oceanbase-cdc/src/test/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseTableFactoryTest.java b/flink-connector-oceanbase-cdc/src/test/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseTableFactoryTest.java index 7ffc3e0867a..931c74f8e74 100644 --- a/flink-connector-oceanbase-cdc/src/test/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseTableFactoryTest.java +++ b/flink-connector-oceanbase-cdc/src/test/java/com/ververica/cdc/connectors/oceanbase/table/OceanBaseTableFactoryTest.java @@ -37,6 +37,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Properties; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -82,6 +83,8 @@ public class OceanBaseTableFactoryTest { private static final String CONNECT_TIMEOUT = "30s"; private static final String HOSTNAME = "127.0.0.1"; private static final Integer PORT = 2881; + private static final String COMPATIBLE_MODE = "mysql"; + private static final String DRIVER_CLASS = "com.mysql.jdbc.Driver"; private static final String LOG_PROXY_HOST = "127.0.0.1"; private static final Integer LOG_PROXY_PORT = 2983; private static final String LOG_PROXY_CLIENT_ID = "clientId"; @@ -111,6 +114,9 @@ public void testCommonProperties() { Duration.parse("PT" + CONNECT_TIMEOUT), null, null, + COMPATIBLE_MODE, + DRIVER_CLASS, + new Properties(), LOG_PROXY_HOST, LOG_PROXY_PORT, null, @@ -130,6 +136,8 @@ public void testOptionalProperties() { options.put("table-list", TABLE_LIST); options.put("hostname", HOSTNAME); options.put("port", String.valueOf(PORT)); + options.put("compatible-mode", COMPATIBLE_MODE); + options.put("jdbc.driver", DRIVER_CLASS); options.put("logproxy.client.id", LOG_PROXY_CLIENT_ID); options.put("rootserver-list", RS_LIST); DynamicTableSource actualSource = createTableSource(SCHEMA, options); @@ -148,6 +156,9 @@ public void testOptionalProperties() { Duration.parse("PT" + CONNECT_TIMEOUT), "127.0.0.1", 2881, + COMPATIBLE_MODE, + DRIVER_CLASS, + new Properties(), LOG_PROXY_HOST, LOG_PROXY_PORT, LOG_PROXY_CLIENT_ID, @@ -187,6 +198,9 @@ public void testMetadataColumns() { Duration.parse("PT" + CONNECT_TIMEOUT), null, null, + COMPATIBLE_MODE, + DRIVER_CLASS, + new Properties(), LOG_PROXY_HOST, LOG_PROXY_PORT, null,