Skip to content

Commit

Permalink
[oceanbase] add jdbc options and support oracle mode (apache#1854)
Browse files Browse the repository at this point in the history
  • Loading branch information
whhe authored Jun 8, 2023
1 parent 7eceafa commit 888f92d
Show file tree
Hide file tree
Showing 13 changed files with 588 additions and 42 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ This README is meant as a brief walkthrough on the core features of CDC Connecto
|------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------------------------|
| [mongodb-cdc](docs/content/connectors/mongodb-cdc.md) | <li> [MongoDB](https://www.mongodb.com): 3.6, 4.x, 5.0 | MongoDB Driver: 4.3.1 |
| [mysql-cdc](docs/content/connectors/mysql-cdc.md) | <li> [MySQL](https://dev.mysql.com/doc): 5.6, 5.7, 8.0.x <li> [RDS MySQL](https://www.aliyun.com/product/rds/mysql): 5.6, 5.7, 8.0.x <li> [PolarDB MySQL](https://www.aliyun.com/product/polardb): 5.6, 5.7, 8.0.x <li> [Aurora MySQL](https://aws.amazon.com/cn/rds/aurora): 5.6, 5.7, 8.0.x <li> [MariaDB](https://mariadb.org): 10.x <li> [PolarDB X](https://github.com/ApsaraDB/galaxysql): 2.0.1 | JDBC Driver: 8.0.27 |
| [oceanbase-cdc](/docs/content/connectors/oceanbase-cdc.md) | <li> [OceanBase CE](https://open.oceanbase.com): 3.1.x <li> [OceanBase EE](https://www.oceanbase.com/product/oceanbase) (MySQL mode): 2.x, 3.x, 4.x | JDBC Driver: 5.1.4x |
| [oceanbase-cdc](/docs/content/connectors/oceanbase-cdc.md) | <li> [OceanBase CE](https://open.oceanbase.com): 3.1.x, 4.x <li> [OceanBase EE](https://www.oceanbase.com/product/oceanbase): 2.x, 3.x, 4.x | OceanBase Driver: 2.4.x |
| [oracle-cdc](docs/content/connectors/oracle-cdc.md) | <li> [Oracle](https://www.oracle.com/index.html): 11, 12, 19 | Oracle Driver: 19.3.0.0 |
| [postgres-cdc](docs/content/connectors/postgres-cdc.md) | <li> [PostgreSQL](https://www.postgresql.org): 9.6, 10, 11, 12 | JDBC Driver: 42.2.27 |
| [sqlserver-cdc](docs/content/connectors/sqlserver-cdc.md) | <li> [Sqlserver](https://www.microsoft.com/sql-server): 2012, 2014, 2016, 2017, 2019 | JDBC Driver: 7.2.2.jre8 |
Expand Down
2 changes: 1 addition & 1 deletion docs/content/about.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ The CDC Connectors for Apache Flink<sup>®</sup> integrate Debezium as the engin
|----------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------------------------|
| [mongodb-cdc](connectors/mongodb-cdc.md) | <li> [MongoDB](https://www.mongodb.com): 3.6, 4.x, 5.0 | MongoDB Driver: 4.3.1 |
| [mysql-cdc](connectors/mysql-cdc.md) | <li> [MySQL](https://dev.mysql.com/doc): 5.6, 5.7, 8.0.x <li> [RDS MySQL](https://www.aliyun.com/product/rds/mysql): 5.6, 5.7, 8.0.x <li> [PolarDB MySQL](https://www.aliyun.com/product/polardb): 5.6, 5.7, 8.0.x <li> [Aurora MySQL](https://aws.amazon.com/cn/rds/aurora): 5.6, 5.7, 8.0.x <li> [MariaDB](https://mariadb.org): 10.x <li> [PolarDB X](https://github.com/ApsaraDB/galaxysql): 2.0.1 | JDBC Driver: 8.0.27 |
| [oceanbase-cdc](connectors/oceanbase-cdc.md) | <li> [OceanBase CE](https://open.oceanbase.com): 3.1.x <li> [OceanBase EE](https://www.oceanbase.com/product/oceanbase) (MySQL mode): 2.x, 3.x, 4.x | JDBC Driver: 5.1.4x |
| [oceanbase-cdc](connectors/oceanbase-cdc.md) | <li> [OceanBase CE](https://open.oceanbase.com): 3.1.x, 4.x <li> [OceanBase EE](https://www.oceanbase.com/product/oceanbase): 2.x, 3.x, 4.x | OceanBase Driver: 2.4.x |
| [oracle-cdc](connectors/oracle-cdc.md) | <li> [Oracle](https://www.oracle.com/index.html): 11, 12, 19 | Oracle Driver: 19.3.0.0 |
| [postgres-cdc](connectors/postgres-cdc.md) | <li> [PostgreSQL](https://www.postgresql.org): 9.6, 10, 11, 12 | JDBC Driver: 42.2.12 |
| [sqlserver-cdc](connectors/sqlserver-cdc.md) | <li> [Sqlserver](https://www.microsoft.com/sql-server): 2012, 2014, 2016, 2017, 2019 | JDBC Driver: 7.2.2.jre8 |
Expand Down
164 changes: 163 additions & 1 deletion docs/content/connectors/oceanbase-cdc(ZH).md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,16 @@ OceanBase CDC 连接器允许从 OceanBase 读取快照数据和增量数据。
</dependency>
```

如果您是要连接企业版的 OceanBase,您可能需要使用 OceanBase 官方的 JDBC 驱动,这时需要引入如下依赖。

```xml
<dependency>
<groupId>com.oceanbase</groupId>
<artifactId>oceanbase-client</artifactId>
<version>2.4.2</version>
</dependency>
```

## 下载 SQL 客户端 JAR 包

```下载链接仅在已发布版本可用,请在文档网站左下角选择浏览已发布的版本。```
Expand All @@ -23,6 +33,8 @@ OceanBase CDC 连接器允许从 OceanBase 读取快照数据和增量数据。

**注意:** flink-sql-connector-oceanbase-cdc-XXX-SNAPSHOT 版本是开发分支`release-XXX`对应的快照版本,快照版本用户需要下载源代码并编译相应的 jar。用户应使用已经发布的版本,例如 [flink-sql-connector-oceanbase-cdc-2.3.0.jar](https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-oceanbase-cdc) 当前已发布的所有版本都可以在 Maven 中央仓库获取。

对于 JDBC 驱动,上述的 cdc jar 文件中已经包含了我们推荐的 MySQL 驱动版本 5.1.47。由于开源许可证的原因,我们不能在上述 cdc jar 文件中包含 OceanBase 的官方 JDBC 驱动,如果您需要使用它,可以从[这里](https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.2/oceanbase-client-2.4.2.jar)下载,然后放到 `<FLINK_HOME>/lib/` 目录下,同时需要将配置项 `jdbc.driver` 设为 `com.oceanbase.jdbc.Driver`

### 配置 OceanBase 数据库和 oblogproxy 服务

1. 按照 [文档](https://github.com/oceanbase/oceanbase#quick-start) 配置 OceanBase 集群。
Expand Down Expand Up @@ -69,7 +81,7 @@ Flink SQL> CREATE TABLE orders (
) WITH (
'connector' = 'oceanbase-cdc',
'scan.startup.mode' = 'initial',
'username' = 'user@test_tenant',
'username' = 'user@test_tenant#cluster_name',
'password' = 'pswd',
'tenant-name' = 'test_tenant',
'database-name' = '^test_db$',
Expand All @@ -86,6 +98,36 @@ Flink SQL> CREATE TABLE orders (
Flink SQL> SELECT * FROM orders;
```

如果您使用的是企业版的 OceanBase Oracle 模式,您需要先添加 OceanBase 的官方 JDBC 驱动 jar 包到 Flink 环境,并且部署企业版的 oblogproxy 服务,然后通过以下命令创建 OceanBase CDC 表:

```sql
Flink SQL> CREATE TABLE orders (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'oceanbase-cdc',
'scan.startup.mode' = 'initial',
'username' = 'user@test_tenant#cluster_name',
'password' = 'pswd',
'tenant-name' = 'test_tenant',
'database-name' = '^test_db$',
'table-name' = '^orders$',
'hostname' = '127.0.0.1',
'port' = '2881',
'compatible-mode' = 'oracle',
'jdbc.driver' = 'com.oceanbase.jdbc.Driver',
'config-url' = 'http://127.0.0.1:8080/services?Action=ObRootServiceInfo&User_ID=xxx&UID=xxx&ObRegion=xxx',
'logproxy.host' = '127.0.0.1',
'logproxy.port' = '2983',
'working-mode' = 'memory'
);
```

您也可以访问 Flink CDC 官网文档,快速体验将数据从 OceanBase 导入到 Elasticsearch。更多信息,参考 [Flink CDC 官网文档](https://ververica.github.io/flink-cdc-connectors/release-2.2/content/%E5%BF%AB%E9%80%9F%E4%B8%8A%E6%89%8B/oceanbase-tutorial-zh.html)。

## OceanBase CDC 连接器选项
Expand Down Expand Up @@ -248,6 +290,27 @@ OceanBase CDC 连接器包括用于 SQL 和 DataStream API 的选项,如下表
<td>String</td>
<td>日志代理中 `libobcdc` 的工作模式 , 可以是 `storage``memory`</td>
</tr>
<tr>
<td>compatible-mode</td>
<td></td>
<td style="word-wrap: break-word;">mysql</td>
<td>String</td>
<td>OceanBase 的兼容模式,可以是 `mysql``oracle`</td>
</tr>
<tr>
<td>jdbc.driver</td>
<td></td>
<td style="word-wrap: break-word;">com.mysql.jdbc.Driver</td>
<td>String</td>
<td>全量读取时使用的 jdbc 驱动类名。</td>
</tr>
<tr>
<td>jdbc.properties.*</td>
<td></td>
<td style="word-wrap: break-word;"></td>
<td>String</td>
<td>传递自定义 JDBC URL 属性的选项。用户可以传递自定义属性,如 'jdbc.properties.useSSL' = 'false'</td>
</tr>
</tbody>
</table>
</div>
Expand Down Expand Up @@ -396,6 +459,8 @@ public class OceanBaseSourceExample {
.tableName("^test_table$")
.hostname("127.0.0.1")
.port(2881)
.compatibleMode("mysql")
.jdbcDriver("com.mysql.jdbc.Driver")
.logProxyHost("127.0.0.1")
.logProxyPort(2983)
.serverTimeZone(serverTimeZone)
Expand Down Expand Up @@ -597,3 +662,100 @@ public class OceanBaseSourceExample {
</tbody>
</table>
</div>
### Oracle 模式
<div class="wy-table-responsive">
<table class="colwidths-auto docutils">
<thead>
<tr>
<th class="text-left">OceanBase type</th>
<th class="text-left">Flink SQL type</th>
<th class="text-left">NOTE</th>
</tr>
</thead>
<tbody>
<tr>
<td>NUMBER(1)</td>
<td>BOOLEAN</td>
<td></td>
</tr>
<tr>
<td>NUMBER(p, s <= 0), p - s < 3 </td>
<td>TINYINT</td>
<td></td>
</tr>
<tr>
<td>NUMBER(p, s <= 0), p - s < 5 </td>
<td>SMALLINT</td>
<td></td>
</tr>
<tr>
<td>NUMBER(p, s <= 0), p - s < 10 </td>
<td>INT</td>
<td></td>
</tr>
<tr>
<td>NUMBER(p, s <= 0), p - s < 19 </td>
<td>BIGINT</td>
<td></td>
</tr>
<tr>
<td>NUMBER(p, s <= 0), 19 <=p - s <=38</td>
<td>DECIMAL(p - s, 0)</td>
<td></td>
</tr>
<tr>
<td>NUMBER(p, s > 0)</td>
<td>DECIMAL(p, s)</td>
</tr>
<tr>
<td>NUMBER(p, s <= 0), p - s> 38 </td>
<td>STRING</td>
<td></td>
</tr>
<tr>
<td>
FLOAT<br>
BINARY_FLOAT
</td>
<td>FLOAT</td>
<td></td>
</tr>
<tr>
<td>BINARY_DOUBLE</td>
<td>DOUBLE</td>
<td></td>
</tr>
<tr>
<td>
DATE<br>
TIMESTAMP [(p)]
</td>
<td>TIMESTAMP [(p)]</td>
<td></td>
</tr>
<tr>
<td>
CHAR(n)<br>
NCHAR(n)<br>
VARCHAR(n)<br>
VARCHAR2(n)<br>
NVARCHAR2(n)<br>
CLOB<br>
</td>
<td>STRING</td>
<td></td>
</tr>
<tr>
<td>
RAW<br>
BLOB<br>
ROWID
</td>
<td>BYTES</td>
<td></td>
</tr>
</tbody>
</table>
</div>
Loading

0 comments on commit 888f92d

Please sign in to comment.