Commit

[Improve][Connector-V2] Support hive catalog for paimon sink (apache#…
dailai authored May 22, 2024
1 parent afdcb32 commit 4969c91
Showing 13 changed files with 536 additions and 36 deletions.
105 changes: 105 additions & 0 deletions docs/en/connector-v2/sink/Paimon.md
@@ -6,6 +6,24 @@

Sink connector for Apache Paimon. It supports CDC mode and automatic table creation.

## Supported DataSource Info

| Datasource | Dependency | Maven |
|------------|-----------|---------------------------------------------------------------------------|
| Paimon | hive-exec | [Download](https://mvnrepository.com/artifact/org.apache.hive/hive-exec) |
| Paimon | libfb303 | [Download](https://mvnrepository.com/artifact/org.apache.thrift/libfb303) |

## Database Dependency

> In order to be compatible with different versions of Hadoop and Hive, the scope of hive-exec in the project pom file is provided. So if you use the Flink engine, you may first need to add the following Jar packages to the <FLINK_HOME>/lib directory; if you use the Spark engine and it is already integrated with Hadoop, you do not need to add them.
```
hive-exec-xxx.jar
libfb303-xxx.jar
```

> Some versions of the hive-exec package do not include libfb303-xxx.jar, so you may also need to import that Jar package manually.

## Key features

- [x] [exactly-once](../../concept/connector-v2-features.md)
@@ -15,6 +33,8 @@
| name | type | required | default value | Description |
|-----------------------------|--------|----------|------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| warehouse | String | Yes | - | Paimon warehouse path |
| catalog_type | String | No | filesystem | Catalog type of Paimon, supports filesystem and hive |
| catalog_uri | String | No | - | Catalog URI of Paimon; required only when catalog_type is hive |
| database | String | Yes | - | The database you want to access |
| table | String | Yes | - | The table you want to access |
| hdfs_site_path | String | No | - | The path of hdfs-site.xml |
@@ -101,6 +121,91 @@ sink {
}
```

### Single table (Hive catalog)

```hocon
env {
parallelism = 1
job.mode = "BATCH"
}
source {
FakeSource {
schema = {
fields {
pk_id = bigint
name = string
score = int
}
primaryKey {
name = "pk_id"
columnNames = [pk_id]
}
}
rows = [
{
kind = INSERT
fields = [1, "A", 100]
},
{
kind = INSERT
fields = [2, "B", 100]
},
{
kind = INSERT
fields = [3, "C", 100]
},
{
kind = INSERT
fields = [3, "C", 100]
},
{
kind = INSERT
fields = [3, "C", 100]
},
{
kind = INSERT
fields = [3, "C", 100]
},
{
kind = UPDATE_BEFORE
fields = [1, "A", 100]
},
{
kind = UPDATE_AFTER
fields = [1, "A_1", 100]
},
{
kind = DELETE
fields = [2, "B", 100]
}
]
}
}
sink {
Paimon {
schema_save_mode = "RECREATE_SCHEMA"
catalog_name="seatunnel_test"
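# hive catalog keeps table metadata in the Hive Metastore; catalog_uri must point to its thrift endpoint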
catalog_type="hive"
catalog_uri="thrift://hadoop04:9083"
warehouse="hdfs:///tmp/seatunnel"
database="seatunnel_test"
table="st_test3"
paimon.hadoop.conf = {
fs.defaultFS = "hdfs://nameservice1"
dfs.nameservices = "nameservice1"
dfs.ha.namenodes.nameservice1 = "nn1,nn2"
dfs.namenode.rpc-address.nameservice1.nn1 = "hadoop03:8020"
dfs.namenode.rpc-address.nameservice1.nn2 = "hadoop04:8020"
dfs.client.failover.proxy.provider.nameservice1 = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
dfs.client.use.datanode.hostname = "true"
}
}
}
```

### Single table with write props of paimon

```hocon
107 changes: 106 additions & 1 deletion docs/zh/connector-v2/sink/Paimon.md
@@ -6,15 +6,35 @@

Sink connector for Apache Paimon. It supports CDC writes and automatic table creation.

## Supported DataSource Info

| Datasource | Dependency | Maven |
|--------|-----------|---------------------------------------------------------------------------|
| Paimon | hive-exec | [Download](https://mvnrepository.com/artifact/org.apache.hive/hive-exec) |
| Paimon | libfb303 | [Download](https://mvnrepository.com/artifact/org.apache.thrift/libfb303) |

## Datasource Dependency

> In order to be compatible with different versions of Hadoop and Hive, the scope of hive-exec in the project pom file is provided. So if you use the Flink engine, you may first need to add the following Jar packages to the <FLINK_HOME>/lib directory; if you use the Spark engine and it is already integrated with Hadoop, you do not need to add them.
```
hive-exec-xxx.jar
libfb303-xxx.jar
```

> Some versions of the hive-exec package do not include libfb303-xxx.jar, so you may also need to import that Jar package manually.

## Key features

- [x] [exactly-once](../../concept/connector-v2-features.md)

## Connector Options

| name | type | required | default value | Description |
|------|------|----------|---------------|-------------|
| warehouse | String | Yes | - | Paimon warehouse path |
| catalog_type | String | No | filesystem | Catalog type of Paimon, currently supports filesystem and hive |
| catalog_uri | String | No | - | URI of the Paimon catalog, required only when catalog_type is hive |
| database | String | Yes | - | The database name |
| table | String | Yes | - | The table name |
| hdfs_site_path | String | No | - | The path of the hdfs-site.xml file |
@@ -101,6 +121,91 @@ sink {
}
```

### Single table (with Hive catalog)

```hocon
env {
parallelism = 1
job.mode = "BATCH"
}
source {
FakeSource {
schema = {
fields {
pk_id = bigint
name = string
score = int
}
primaryKey {
name = "pk_id"
columnNames = [pk_id]
}
}
rows = [
{
kind = INSERT
fields = [1, "A", 100]
},
{
kind = INSERT
fields = [2, "B", 100]
},
{
kind = INSERT
fields = [3, "C", 100]
},
{
kind = INSERT
fields = [3, "C", 100]
},
{
kind = INSERT
fields = [3, "C", 100]
},
{
kind = INSERT
fields = [3, "C", 100]
},
{
kind = UPDATE_BEFORE
fields = [1, "A", 100]
},
{
kind = UPDATE_AFTER
fields = [1, "A_1", 100]
},
{
kind = DELETE
fields = [2, "B", 100]
}
]
}
}
sink {
Paimon {
schema_save_mode = "RECREATE_SCHEMA"
catalog_name="seatunnel_test"
catalog_type="hive"
catalog_uri="thrift://hadoop04:9083"
warehouse="hdfs:///tmp/seatunnel"
database="seatunnel_test"
table="st_test3"
paimon.hadoop.conf = {
fs.defaultFS = "hdfs://nameservice1"
dfs.nameservices = "nameservice1"
dfs.ha.namenodes.nameservice1 = "nn1,nn2"
dfs.namenode.rpc-address.nameservice1.nn1 = "hadoop03:8020"
dfs.namenode.rpc-address.nameservice1.nn2 = "hadoop04:8020"
dfs.client.failover.proxy.provider.nameservice1 = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
dfs.client.use.datanode.hostname = "true"
}
}
}
```

### Single table with write props of paimon

```hocon
@@ -29,6 +29,7 @@

import static org.apache.seatunnel.common.exception.CommonErrorCode.CONVERT_TO_CONNECTOR_TYPE_ERROR;
import static org.apache.seatunnel.common.exception.CommonErrorCode.CONVERT_TO_CONNECTOR_TYPE_ERROR_SIMPLE;
import static org.apache.seatunnel.common.exception.CommonErrorCode.CONVERT_TO_SEATUNNEL_PROPS_BLANK_ERROR;
import static org.apache.seatunnel.common.exception.CommonErrorCode.CONVERT_TO_SEATUNNEL_TYPE_ERROR;
import static org.apache.seatunnel.common.exception.CommonErrorCode.CONVERT_TO_SEATUNNEL_TYPE_ERROR_SIMPLE;
import static org.apache.seatunnel.common.exception.CommonErrorCode.FILE_NOT_EXISTED;
@@ -139,6 +140,14 @@ public static SeaTunnelRuntimeException convertToConnectorTypeError(
return new SeaTunnelRuntimeException(CONVERT_TO_CONNECTOR_TYPE_ERROR, params);
}

public static SeaTunnelRuntimeException convertToConnectorPropsBlankError(
String connector, String props) {
Map<String, String> params = new HashMap<>();
params.put("connector", connector);
params.put("props", props);
return new SeaTunnelRuntimeException(CONVERT_TO_SEATUNNEL_PROPS_BLANK_ERROR, params);
}

public static SeaTunnelRuntimeException convertToConnectorTypeError(
String identifier, String dataType, String field) {
Map<String, String> params = new HashMap<>();
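The `COMMON-27` code introduced in the next hunk is what this new factory raises. As a rough sketch of the intended usage, assuming the factory methods live in SeaTunnel's `CommonError` class and that the Paimon connector validates `catalog_uri` roughly this way (the validation below is illustrative, not copied from this commit):

```java
import org.apache.commons.lang3.StringUtils;

import org.apache.seatunnel.common.exception.CommonError;

public class HiveCatalogOptionCheck {

    /** Fail fast when the hive catalog is selected but no metastore URI was configured. */
    static void validateCatalogUri(String catalogType, String catalogUri) {
        if ("hive".equalsIgnoreCase(catalogType) && StringUtils.isBlank(catalogUri)) {
            // Raises COMMON-27: "The props named 'catalog_uri' of 'Paimon' is blank."
            throw CommonError.convertToConnectorPropsBlankError("Paimon", "catalog_uri");
        }
    }
}
```

With `catalog_type = hive` and a blank `catalog_uri`, the job then fails fast with a descriptive COMMON-27 error rather than an opaque metastore connection failure.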
@@ -56,7 +56,9 @@ public enum CommonErrorCode implements SeaTunnelErrorCode {

VERSION_NOT_SUPPORTED("COMMON-25", "<identifier> <version> is unsupported."),

OPERATION_NOT_SUPPORTED("COMMON-26", "<identifier> <operation> is unsupported.");
OPERATION_NOT_SUPPORTED("COMMON-26", "<identifier> <operation> is unsupported."),
CONVERT_TO_SEATUNNEL_PROPS_BLANK_ERROR(
"COMMON-27", "The props named '<props>' of '<connector>' is blank.");

private final String code;
private final String description;
31 changes: 31 additions & 0 deletions seatunnel-connectors-v2/connector-paimon/pom.xml
@@ -31,6 +31,7 @@

<properties>
<paimon.version>0.7.0-incubating</paimon.version>
<hive.version>2.3.9</hive.version>
</properties>

<dependencies>
@@ -60,6 +61,36 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>${hive.version}</version>
<classifier>core</classifier>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.pentaho</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.parquet</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.orc</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

</dependencies>

</project>
@@ -17,6 +17,7 @@

package org.apache.seatunnel.connectors.seatunnel.paimon.catalog;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.Column;
@@ -27,6 +28,7 @@
import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
import org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig;
import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.paimon.utils.SchemaUtil;

@@ -47,14 +49,14 @@ public class PaimonCatalog implements Catalog, PaimonTable {
private static final String DEFAULT_DATABASE = "default";

private String catalogName;
private PaimonSinkConfig paimonSinkConfig;
private ReadonlyConfig readonlyConfig;
private PaimonCatalogLoader paimonCatalogLoader;
private org.apache.paimon.catalog.Catalog catalog;

public PaimonCatalog(String catalogName, PaimonSinkConfig paimonSinkConfig) {
this.paimonSinkConfig = paimonSinkConfig;
public PaimonCatalog(String catalogName, ReadonlyConfig readonlyConfig) {
this.readonlyConfig = readonlyConfig;
this.catalogName = catalogName;
this.paimonCatalogLoader = new PaimonCatalogLoader(paimonSinkConfig);
this.paimonCatalogLoader = new PaimonCatalogLoader(new PaimonConfig(readonlyConfig));
}

@Override
@@ -135,7 +137,8 @@ public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreIfExists)
throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
try {
Schema paimonSchema =
SchemaUtil.toPaimonSchema(table.getTableSchema(), this.paimonSinkConfig);
SchemaUtil.toPaimonSchema(
table.getTableSchema(), new PaimonSinkConfig(readonlyConfig));
catalog.createTable(toIdentifier(tablePath), paimonSchema, ignoreIfExists);
} catch (org.apache.paimon.catalog.Catalog.TableAlreadyExistException e) {
throw new TableAlreadyExistException(this.catalogName, tablePath);
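For orientation, here is a minimal sketch of constructing the refactored catalog from a raw `ReadonlyConfig` instead of a `PaimonSinkConfig`. The option keys mirror the sink documentation above; `ReadonlyConfig.fromMap`, the catalog name `"paimon_hive"`, and the `open()`/`close()` lifecycle calls are assumptions made for illustration, not part of this commit:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.connectors.seatunnel.paimon.catalog.PaimonCatalog;

public class PaimonHiveCatalogExample {
    public static void main(String[] args) {
        // Raw options, matching the Hive-catalog example in docs/en/connector-v2/sink/Paimon.md
        Map<String, Object> options = new HashMap<>();
        options.put("warehouse", "hdfs:///tmp/seatunnel");
        options.put("catalog_type", "hive");
        options.put("catalog_uri", "thrift://hadoop04:9083");
        options.put("database", "seatunnel_test");
        options.put("table", "st_test3");

        ReadonlyConfig readonlyConfig = ReadonlyConfig.fromMap(options);

        // The constructor now takes the generic ReadonlyConfig; PaimonConfig and
        // PaimonSinkConfig wrappers are created internally where they are needed.
        PaimonCatalog catalog = new PaimonCatalog("paimon_hive", readonlyConfig);
        catalog.open(); // loads the underlying Paimon catalog via PaimonCatalogLoader
        try {
            catalog.listDatabases().forEach(System.out::println);
        } finally {
            catalog.close();
        }
    }
}
```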
