From 4969c91dc41b16331111a8b9cc49e27298e41295 Mon Sep 17 00:00:00 2001
From: dailai
Date: Wed, 22 May 2024 09:40:41 +0800
Subject: [PATCH] [Improve][Connector-V2] Support hive catalog for paimon sink
 (#6833)

---
 docs/en/connector-v2/sink/Paimon.md           | 105 +++++++++++++++++
 docs/zh/connector-v2/sink/Paimon.md           | 107 +++++++++++++++++-
 .../common/exception/CommonError.java         |   9 ++
 .../common/exception/CommonErrorCode.java     |   4 +-
 .../connector-paimon/pom.xml                  |  31 +++++
 .../paimon/catalog/PaimonCatalog.java         |  13 ++-
 .../paimon/catalog/PaimonCatalogEnum.java     |  33 ++++++
 .../paimon/catalog/PaimonCatalogFactory.java  |  19 ++--
 .../paimon/catalog/PaimonCatalogLoader.java   |  26 +++--
 .../seatunnel/paimon/config/PaimonConfig.java |  45 ++++++--
 .../paimon/sink/PaimonSinkFactory.java        |  10 +-
 .../connector/paimon/PaimonSinkHdfsIT.java    |  71 ++++++++++++
 ...nk_paimon_with_hdfs_with_hive_catalog.conf |  99 ++++++++++++++++
 13 files changed, 536 insertions(+), 36 deletions(-)
 create mode 100644 seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogEnum.java
 create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_with_hdfs_with_hive_catalog.conf

diff --git a/docs/en/connector-v2/sink/Paimon.md b/docs/en/connector-v2/sink/Paimon.md
index e0ce1350bb9..d79d7c9b004 100644
--- a/docs/en/connector-v2/sink/Paimon.md
+++ b/docs/en/connector-v2/sink/Paimon.md
@@ -6,6 +6,24 @@
 
 Sink connector for Apache Paimon. It supports CDC mode and automatic table creation.
 
+## Supported DataSource Info
+
+| Datasource | Dependent | Maven                                                                      |
+|------------|-----------|----------------------------------------------------------------------------|
+| Paimon     | hive-exec | [Download](https://mvnrepository.com/artifact/org.apache.hive/hive-exec)   |
+| Paimon     | libfb303  | [Download](https://mvnrepository.com/artifact/org.apache.thrift/libfb303)  |
+
+## Database Dependency
+
+> To stay compatible with different versions of Hadoop and Hive, the scope of hive-exec in the project pom file is set to provided. If you use the Flink engine, you may first need to add the following Jar packages to the /lib directory. If you use the Spark engine integrated with Hadoop, you do not need to add them.
+
+```
+hive-exec-xxx.jar
+libfb303-xxx.jar
+```
+
+> Some versions of the hive-exec package do not include libfb303-xxx.jar, in which case you also need to import that Jar package manually.
+
 ## Key features
 
 - [x] [exactly-once](../../concept/connector-v2-features.md)
 
@@ -15,6 +33,8 @@
 | name                        | type   | required | default value                | Description                                                   |
 |-----------------------------|--------|----------|------------------------------|---------------------------------------------------------------|
 | warehouse                   | String | Yes      | -                            | Paimon warehouse path                                         |
+| catalog_type                | String | No       | filesystem                   | Catalog type of Paimon, supports filesystem and hive          |
+| catalog_uri                 | String | No       | -                            | Catalog uri of Paimon, only needed when catalog_type is hive  |
 | database                    | String | Yes      | -                            | The database you want to access                               |
 | table                       | String | Yes      | -                            | The table you want to access                                  |
 | hdfs_site_path              | String | No       | -                            | The path of hdfs-site.xml                                     |
@@ -101,6 +121,91 @@ sink {
 }
 ```
 
+### Single table (Hive catalog)
+
+```hocon
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    schema = {
+      fields {
+        pk_id = bigint
+        name = string
+        score = int
+      }
+      primaryKey {
+        name = "pk_id"
+        columnNames = [pk_id]
+      }
+    }
+    rows = [
+      {
+        kind = INSERT
+        fields = [1, "A", 100]
+      },
+      {
+        kind = INSERT
+        fields = [2, "B", 100]
+      },
+      {
+        kind = INSERT
+        fields = [3, "C", 100]
+      },
+      {
+        kind = INSERT
+        fields = [3, "C", 100]
+      },
+      {
+        kind = INSERT
+        fields = [3, "C", 100]
+      },
+      {
+        kind = INSERT
+        fields = [3, "C", 100]
+      }
+      {
+        kind = UPDATE_BEFORE
+        fields = [1, "A", 100]
+      },
+      {
+        kind = UPDATE_AFTER
+        fields = [1, "A_1", 100]
+      },
+      {
+        kind = DELETE
+        fields = [2, "B", 100]
+      }
+    ]
+  }
+}
+
+sink {
+  Paimon {
+    schema_save_mode = "RECREATE_SCHEMA"
+    catalog_name="seatunnel_test"
+    catalog_type="hive"
+    catalog_uri="thrift://hadoop04:9083"
+    warehouse="hdfs:///tmp/seatunnel"
+    database="seatunnel_test"
+    table="st_test3"
+    paimon.hadoop.conf = {
+      fs.defaultFS = "hdfs://nameservice1"
+      dfs.nameservices = "nameservice1"
+      dfs.ha.namenodes.nameservice1 = "nn1,nn2"
+      dfs.namenode.rpc-address.nameservice1.nn1 = "hadoop03:8020"
+      dfs.namenode.rpc-address.nameservice1.nn2 = "hadoop04:8020"
+      dfs.client.failover.proxy.provider.nameservice1 = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
+      dfs.client.use.datanode.hostname = "true"
+    }
+  }
+}
+
+```
+
 ### Single table with write props of paimon
 
 ```hocon
diff --git a/docs/zh/connector-v2/sink/Paimon.md b/docs/zh/connector-v2/sink/Paimon.md
index ef22add620d..50f88731d3e 100644
--- a/docs/zh/connector-v2/sink/Paimon.md
+++ b/docs/zh/connector-v2/sink/Paimon.md
@@ -6,6 +6,24 @@
 
 Apache Paimon数据连接器。支持cdc写以及自动建表。
 
+## 支持的数据源信息
+
+| 数据源 | 依赖      | Maven                                                                      |
+|--------|-----------|----------------------------------------------------------------------------|
+| Paimon | hive-exec | [Download](https://mvnrepository.com/artifact/org.apache.hive/hive-exec)   |
+| Paimon | libfb303  | [Download](https://mvnrepository.com/artifact/org.apache.thrift/libfb303)  |
+
+## 数据源依赖
+
+> 为了兼容不同版本的Hadoop和Hive,在项目pom文件中hive-exec的作用域为provided,所以如果您使用Flink引擎,首先可能需要将以下Jar包添加到/lib目录下,如果您使用Spark引擎并与Hadoop集成,则不需要添加以下Jar包。
+
+```
+hive-exec-xxx.jar
+libfb303-xxx.jar
+```
+
+> 有些版本的hive-exec包没有libfb303-xxx.jar,所以您还需要手动导入Jar包。
+
 ## 主要特性
 
 - [x] [exactly-once](../../concept/connector-v2-features.md)
 
 ## 连接器选项
 
 | 名称                        | 类型 | 是否必须 | 默认值                        | 描述                                                                                                |
-|-----------------------------|-----|------|------------------------------|---------------------------------------------------------------------------------------------------|
+|-----------------------------|-----|------|------------------------------|---------------------------------------------------------------------------------------------------|
 | warehouse                   | 字符串 | 是   | -                            | Paimon warehouse路径                                                                                |
+| catalog_type                | 字符串 | 否   | filesystem                   | Paimon的catalog类型,目前支持filesystem和hive                                                        |
+| catalog_uri                 | 字符串 | 否   | -                            | Paimon catalog的uri,仅当catalog_type为hive时需要配置                                                |
 | database                    | 字符串 | 是   | -                            | 数据库名称                                                                                          |
 | table                       | 字符串 | 是   | -                            | 表名                                                                                                |
 | hdfs_site_path              | 字符串 | 否   | -                            | hdfs-site.xml文件路径                                                                               |
@@ -101,6 +121,91 @@ sink {
 }
 ```
 
+### 单表(使用Hive catalog)
+
+```hocon
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    schema = {
+      fields {
+        pk_id = bigint
+        name = string
+        score = int
+      }
+      primaryKey {
+        name = "pk_id"
+        columnNames = [pk_id]
+      }
+    }
+    rows = [
+      {
+        kind = INSERT
+        fields = [1, "A", 100]
+      },
+      {
+        kind = INSERT
+        fields = [2, "B", 100]
+      },
+      {
+        kind = INSERT
+        fields = [3, "C", 100]
+      },
+      {
+        kind = INSERT
+        fields = [3, "C", 100]
+      },
+      {
+        kind = INSERT
+        fields = [3, "C", 100]
+      },
+      {
+        kind = INSERT
+        fields = [3, "C", 100]
+      }
+      {
+        kind = UPDATE_BEFORE
+        fields = [1, "A", 100]
+      },
+      {
+        kind = UPDATE_AFTER
+        fields = [1, "A_1", 100]
+      },
+      {
+        kind = DELETE
+        fields = [2, "B", 100]
+      }
+    ]
+  }
+}
+
+sink {
+  Paimon {
+    schema_save_mode = "RECREATE_SCHEMA"
+    catalog_name="seatunnel_test"
+    catalog_type="hive"
+    catalog_uri="thrift://hadoop04:9083"
+    warehouse="hdfs:///tmp/seatunnel"
+    database="seatunnel_test"
+    table="st_test3"
+    paimon.hadoop.conf = {
+      fs.defaultFS = "hdfs://nameservice1"
+      dfs.nameservices = "nameservice1"
+      dfs.ha.namenodes.nameservice1 = "nn1,nn2"
+      dfs.namenode.rpc-address.nameservice1.nn1 = "hadoop03:8020"
+      dfs.namenode.rpc-address.nameservice1.nn2 = "hadoop04:8020"
+      dfs.client.failover.proxy.provider.nameservice1 = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
+      dfs.client.use.datanode.hostname = "true"
+    }
+  }
+}
+
+```
+
 ### 指定paimon的写属性的单表
 
 ```hocon
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java
index 918936b3402..9843774d0c6 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonError.java
@@ -29,6 +29,7 @@
 
 import static org.apache.seatunnel.common.exception.CommonErrorCode.CONVERT_TO_CONNECTOR_TYPE_ERROR;
 import static org.apache.seatunnel.common.exception.CommonErrorCode.CONVERT_TO_CONNECTOR_TYPE_ERROR_SIMPLE;
+import static org.apache.seatunnel.common.exception.CommonErrorCode.CONVERT_TO_SEATUNNEL_PROPS_BLANK_ERROR;
 import static org.apache.seatunnel.common.exception.CommonErrorCode.CONVERT_TO_SEATUNNEL_TYPE_ERROR;
 import static org.apache.seatunnel.common.exception.CommonErrorCode.CONVERT_TO_SEATUNNEL_TYPE_ERROR_SIMPLE;
 import static org.apache.seatunnel.common.exception.CommonErrorCode.FILE_NOT_EXISTED;
@@ -139,6 +140,14 @@ public static SeaTunnelRuntimeException convertToConnectorTypeError(
         return new SeaTunnelRuntimeException(CONVERT_TO_CONNECTOR_TYPE_ERROR, params);
     }
 
+    public static SeaTunnelRuntimeException convertToConnectorPropsBlankError(
+            String connector, String props) {
+        Map<String, String> params = new HashMap<>();
+        params.put("connector", connector);
+        params.put("props", props);
+        return new SeaTunnelRuntimeException(CONVERT_TO_SEATUNNEL_PROPS_BLANK_ERROR, params);
+    }
+
     public static SeaTunnelRuntimeException convertToConnectorTypeError(
             String identifier, String dataType, String field) {
         Map<String, String> params = new HashMap<>();
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java
index feacf4bd7cd..830e651f806 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/exception/CommonErrorCode.java
@@ -56,7 +56,9 @@ public enum CommonErrorCode implements SeaTunnelErrorCode {
     VERSION_NOT_SUPPORTED("COMMON-25", "<version> is unsupported."),
 
-    OPERATION_NOT_SUPPORTED("COMMON-26", "<operation> is unsupported.");
+    OPERATION_NOT_SUPPORTED("COMMON-26", "<operation> is unsupported."),
+    CONVERT_TO_SEATUNNEL_PROPS_BLANK_ERROR(
+            "COMMON-27", "The props named '<props>' of '<connector>' is blank.");
 
     private final String code;
     private final String description;
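A minimal usage sketch of the new error helper (illustrative only, not part of the patch): `CommonError.convertToConnectorPropsBlankError` builds a `SeaTunnelRuntimeException` whose COMMON-27 template is filled with the connector name and property key. The class name and values below are examples.

```java
import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;

public class PropsBlankErrorExample {
    public static void main(String[] args) {
        // Fills the '<props>' and '<connector>' placeholders of COMMON-27.
        SeaTunnelRuntimeException e =
                CommonError.convertToConnectorPropsBlankError("Paimon", "catalog_uri");
        // Expected to read roughly: "The props named 'catalog_uri' of 'Paimon' is blank."
        System.out.println(e.getMessage());
    }
}
```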
diff --git a/seatunnel-connectors-v2/connector-paimon/pom.xml b/seatunnel-connectors-v2/connector-paimon/pom.xml
index bc0756ecdc4..267e66dc0ad 100644
--- a/seatunnel-connectors-v2/connector-paimon/pom.xml
+++ b/seatunnel-connectors-v2/connector-paimon/pom.xml
@@ -31,6 +31,7 @@
         <paimon.version>0.7.0-incubating</paimon.version>
+        <hive.version>2.3.9</hive.version>
     </properties>
 
@@ -60,6 +61,36 @@
             <scope>provided</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-exec</artifactId>
+            <version>${hive.version}</version>
+            <classifier>core</classifier>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.pentaho</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.parquet</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.orc</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.avro</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
     </dependencies>
diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java
index 8d3395af3c4..fab64da52e3 100644
--- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java
+++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalog.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.paimon.catalog;
 
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.table.catalog.Catalog;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.Column;
@@ -27,6 +28,7 @@
 import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
 import org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
 import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
+import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig;
 import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig;
 import org.apache.seatunnel.connectors.seatunnel.paimon.utils.SchemaUtil;
 
@@ -47,14 +49,14 @@ public class PaimonCatalog implements Catalog, PaimonTable {
     private static final String DEFAULT_DATABASE = "default";
 
     private String catalogName;
-    private PaimonSinkConfig paimonSinkConfig;
+    private ReadonlyConfig readonlyConfig;
     private PaimonCatalogLoader paimonCatalogLoader;
     private org.apache.paimon.catalog.Catalog catalog;
 
-    public PaimonCatalog(String catalogName, PaimonSinkConfig paimonSinkConfig) {
-        this.paimonSinkConfig = paimonSinkConfig;
+    public PaimonCatalog(String catalogName, ReadonlyConfig readonlyConfig) {
+        this.readonlyConfig = readonlyConfig;
         this.catalogName = catalogName;
-        this.paimonCatalogLoader = new PaimonCatalogLoader(paimonSinkConfig);
+        this.paimonCatalogLoader = new PaimonCatalogLoader(new PaimonConfig(readonlyConfig));
     }
 
     @Override
@@ -135,7 +137,8 @@ public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreI
             throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
         try {
             Schema paimonSchema =
-                    SchemaUtil.toPaimonSchema(table.getTableSchema(), this.paimonSinkConfig);
+                    SchemaUtil.toPaimonSchema(
+                            table.getTableSchema(), new PaimonSinkConfig(readonlyConfig));
             catalog.createTable(toIdentifier(tablePath), paimonSchema, ignoreIfExists);
         } catch (org.apache.paimon.catalog.Catalog.TableAlreadyExistException e) {
             throw new TableAlreadyExistException(this.catalogName, tablePath);
diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogEnum.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogEnum.java
new file mode 100644
index 00000000000..a90bd729f46
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogEnum.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.paimon.catalog;
+
+public enum PaimonCatalogEnum {
+    FILESYSTEM("filesystem"),
+    HIVE("hive");
+
+    final String type;
+
+    PaimonCatalogEnum(String type) {
+        this.type = type;
+    }
+
+    public String getType() {
+        return type;
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogFactory.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogFactory.java
index 9994df1d86f..8858c211938 100644
--- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogFactory.java
+++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogFactory.java
@@ -22,6 +22,7 @@
 import org.apache.seatunnel.api.table.catalog.Catalog;
 import org.apache.seatunnel.api.table.factory.CatalogFactory;
 import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig;
 import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig;
 
 import com.google.auto.service.AutoService;
@@ -30,7 +31,7 @@ public class PaimonCatalogFactory implements CatalogFactory {
     @Override
     public Catalog createCatalog(String catalogName, ReadonlyConfig readonlyConfig) {
-        return new PaimonCatalog(catalogName, new PaimonSinkConfig(readonlyConfig));
+        return new PaimonCatalog(catalogName, readonlyConfig);
     }
 
     @Override
@@ -41,19 +42,19 @@ public String factoryIdentifier() {
     @Override
     public OptionRule optionRule() {
         return OptionRule.builder()
-                .required(
-                        PaimonSinkConfig.WAREHOUSE,
-                        PaimonSinkConfig.DATABASE,
-                        PaimonSinkConfig.TABLE)
+                .required(PaimonConfig.WAREHOUSE, PaimonConfig.DATABASE, PaimonConfig.TABLE)
                 .optional(
-                        PaimonSinkConfig.HDFS_SITE_PATH,
+                        PaimonConfig.HDFS_SITE_PATH,
+                        PaimonConfig.HADOOP_CONF,
+                        PaimonConfig.HADOOP_CONF_PATH,
+                        PaimonConfig.CATALOG_TYPE,
                         PaimonSinkConfig.SCHEMA_SAVE_MODE,
                         PaimonSinkConfig.DATA_SAVE_MODE,
                         PaimonSinkConfig.PRIMARY_KEYS,
                         PaimonSinkConfig.PARTITION_KEYS,
-                        PaimonSinkConfig.WRITE_PROPS,
-                        PaimonSinkConfig.HADOOP_CONF,
-                        PaimonSinkConfig.HADOOP_CONF_PATH)
+                        PaimonSinkConfig.WRITE_PROPS)
+                .conditional(
+                        PaimonConfig.CATALOG_TYPE, PaimonCatalogEnum.HIVE, PaimonConfig.CATALOG_URI)
                 .build();
     }
 }
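The `.conditional(...)` clause above is what makes `catalog_uri` mandatory only when `catalog_type = hive`. A hedged sketch of that effect, validating a config map against the factory's rule the way SeaTunnel's `FactoryUtil` does; `ConfigValidator` is assumed from seatunnel-api, and the property values are placeholders.

```java
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.ConfigValidator;
import org.apache.seatunnel.connectors.seatunnel.paimon.catalog.PaimonCatalogFactory;

import java.util.HashMap;
import java.util.Map;

public class ConditionalRuleSketch {
    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put("warehouse", "hdfs:///tmp/paimon"); // placeholder values
        props.put("database", "default");
        props.put("table", "st_test");
        props.put("catalog_type", "hive"); // hive selected, but catalog_uri omitted
        // Expected to fail fast: the conditional rule marks catalog_uri as required.
        ConfigValidator.of(ReadonlyConfig.fromMap(props))
                .validate(new PaimonCatalogFactory().optionRule());
    }
}
```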
diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogLoader.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogLoader.java
index 1c00e6dc3b8..774576c408f 100644
--- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogLoader.java
+++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogLoader.java
@@ -17,8 +17,8 @@
 
 package org.apache.seatunnel.connectors.seatunnel.paimon.catalog;
 
+import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig;
 import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonHadoopConfiguration;
-import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig;
 import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.paimon.exception.PaimonConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.paimon.security.PaimonSecurityContext;
 
@@ -28,6 +28,7 @@
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.options.Options;
 
 import lombok.extern.slf4j.Slf4j;
@@ -37,8 +38,6 @@
 import java.util.Iterator;
 import java.util.Map;
 
-import static org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig.WAREHOUSE;
-
 @Slf4j
 public class PaimonCatalogLoader implements Serializable {
     /** hdfs uri is required */
@@ -51,24 +50,35 @@ public class PaimonCatalogLoader implements Serializable {
     private static final String HDFS_IMPL_KEY = "fs.hdfs.impl";
 
     private String warehouse;
+    private PaimonCatalogEnum catalogType;
+    private String catalogUri;
     private PaimonHadoopConfiguration paimonHadoopConfiguration;
 
-    public PaimonCatalogLoader(PaimonSinkConfig paimonSinkConfig) {
-        this.warehouse = paimonSinkConfig.getWarehouse();
-        this.paimonHadoopConfiguration = PaimonSecurityContext.loadHadoopConfig(paimonSinkConfig);
+    public PaimonCatalogLoader(PaimonConfig paimonConfig) {
+        this.warehouse = paimonConfig.getWarehouse();
+        this.catalogType = paimonConfig.getCatalogType();
+        this.catalogUri = paimonConfig.getCatalogUri();
+        this.paimonHadoopConfiguration = PaimonSecurityContext.loadHadoopConfig(paimonConfig);
     }
 
     public Catalog loadCatalog() {
         // When using the seatunnel engine, set the current class loader to prevent loading failures
         Thread.currentThread().setContextClassLoader(PaimonCatalogLoader.class.getClassLoader());
         final Map<String, String> optionsMap = new HashMap<>(1);
-        optionsMap.put(WAREHOUSE.key(), warehouse);
-        final Options options = Options.fromMap(optionsMap);
+        optionsMap.put(CatalogOptions.WAREHOUSE.key(), warehouse);
+        optionsMap.put(CatalogOptions.METASTORE.key(), catalogType.getType());
         if (warehouse.startsWith(HDFS_PREFIX)) {
             checkConfiguration(paimonHadoopConfiguration, HDFS_DEF_FS_NAME);
             paimonHadoopConfiguration.set(HDFS_IMPL_KEY, HDFS_IMPL);
         }
+        if (PaimonCatalogEnum.HIVE.getType().equals(catalogType.getType())) {
+            optionsMap.put(CatalogOptions.URI.key(), catalogUri);
+            paimonHadoopConfiguration
+                    .getPropsWithPrefix(StringUtils.EMPTY)
+                    .forEach((k, v) -> optionsMap.put(k, v));
+        }
+        final Options options = Options.fromMap(optionsMap);
         PaimonSecurityContext.shouldEnableKerberos(paimonHadoopConfiguration);
         final CatalogContext catalogContext =
                 CatalogContext.create(options, paimonHadoopConfiguration);
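End to end, the loader's new hive branch amounts to: set Paimon's `metastore` option to `hive`, pass `uri` through, and merge the Hadoop properties into the catalog options. A sketch under those assumptions (the warehouse path and metastore host are placeholders):

```java
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.connectors.seatunnel.paimon.catalog.PaimonCatalogLoader;
import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig;

import org.apache.paimon.catalog.Catalog;

import java.util.HashMap;
import java.util.Map;

public class HiveCatalogLoadSketch {
    public static void main(String[] args) throws Exception {
        Map<String, Object> props = new HashMap<>();
        props.put("warehouse", "hdfs:///tmp/seatunnel");    // placeholder warehouse path
        props.put("database", "seatunnel_test");
        props.put("table", "st_test3");
        props.put("catalog_type", "hive");
        props.put("catalog_uri", "thrift://hadoop04:9083"); // placeholder metastore uri

        PaimonConfig paimonConfig = new PaimonConfig(ReadonlyConfig.fromMap(props));
        // loadCatalog() builds a CatalogContext with METASTORE=hive and URI set,
        // so tables created through it are also registered in the Hive metastore.
        Catalog catalog = new PaimonCatalogLoader(paimonConfig).loadCatalog();
        System.out.println(catalog.listDatabases());
    }
}
```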
diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonConfig.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonConfig.java
index 01207570837..0f41402d034 100644
--- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonConfig.java
+++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonConfig.java
@@ -25,6 +25,11 @@
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.common.exception.CommonError;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
+import org.apache.seatunnel.connectors.seatunnel.paimon.catalog.PaimonCatalogEnum;
+
+import org.apache.commons.lang3.StringUtils;
 
 import lombok.Getter;
 
@@ -35,7 +40,6 @@
 import java.util.Map;
 
 import static java.util.stream.Collectors.toList;
-import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;
 
 /**
  * Utility class to store configuration options, used by {@link SeaTunnelSource} and {@link
@@ -50,11 +54,23 @@ public class PaimonConfig implements Serializable {
                     .noDefaultValue()
                     .withDescription("The warehouse path of paimon");
 
+    public static final Option<PaimonCatalogEnum> CATALOG_TYPE =
+            Options.key("catalog_type")
+                    .enumType(PaimonCatalogEnum.class)
+                    .defaultValue(PaimonCatalogEnum.FILESYSTEM)
+                    .withDescription("The type of paimon catalog");
+
+    public static final Option<String> CATALOG_URI =
+            Options.key("catalog_uri")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("The uri of paimon with hive catalog");
+
     public static final Option<String> CATALOG_NAME =
             Options.key("catalog_name")
                     .stringType()
                     .defaultValue("paimon")
-                    .withDescription(" the iceberg catalog name");
+                    .withDescription(" the paimon catalog name");
 
     public static final Option<String> DATABASE =
             Options.key("database")
@@ -95,6 +111,8 @@ public class PaimonConfig implements Serializable {
                             "The specified loading path for the 'core-site.xml', 'hdfs-site.xml', 'hive-site.xml' files");
 
     protected String catalogName;
+    protected PaimonCatalogEnum catalogType;
+    protected String catalogUri;
     protected String warehouse;
     protected String namespace;
     protected String table;
@@ -103,18 +121,27 @@ public class PaimonConfig implements Serializable {
     protected String hadoopConfPath;
 
     public PaimonConfig(ReadonlyConfig readonlyConfig) {
-        this.catalogName = checkArgumentNotNull(readonlyConfig.get(CATALOG_NAME));
-        this.warehouse = checkArgumentNotNull(readonlyConfig.get(WAREHOUSE));
-        this.namespace = checkArgumentNotNull(readonlyConfig.get(DATABASE));
-        this.table = checkArgumentNotNull(readonlyConfig.get(TABLE));
+        this.catalogName =
+                checkArgumentNotBlank(readonlyConfig.get(CATALOG_NAME), CATALOG_NAME.key());
+        this.warehouse = checkArgumentNotBlank(readonlyConfig.get(WAREHOUSE), WAREHOUSE.key());
+        this.namespace = checkArgumentNotBlank(readonlyConfig.get(DATABASE), DATABASE.key());
+        this.table = checkArgumentNotBlank(readonlyConfig.get(TABLE), TABLE.key());
         this.hdfsSitePath = readonlyConfig.get(HDFS_SITE_PATH);
         this.hadoopConfProps = readonlyConfig.get(HADOOP_CONF);
         this.hadoopConfPath = readonlyConfig.get(HADOOP_CONF_PATH);
+        this.catalogType = readonlyConfig.get(CATALOG_TYPE);
+        if (PaimonCatalogEnum.HIVE.getType().equals(catalogType.getType())) {
+            this.catalogUri =
+                    checkArgumentNotBlank(readonlyConfig.get(CATALOG_URI), CATALOG_URI.key());
+        }
     }
 
-    protected <T> T checkArgumentNotNull(T argument) {
-        checkNotNull(argument);
-        return argument;
+    protected String checkArgumentNotBlank(String propValue, String propKey) {
+        if (StringUtils.isBlank(propValue)) {
+            throw new SeaTunnelException(
+                    CommonError.convertToConnectorPropsBlankError("Paimon", propKey));
+        }
+        return propValue;
     }
 
     @VisibleForTesting
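The constructor change means a hive catalog without a `catalog_uri` now fails at config-parsing time with COMMON-27, instead of deep inside catalog creation. A sketch of both paths (values are placeholders; the getter comes from the class-level lombok `@Getter`):

```java
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig;

import java.util.HashMap;
import java.util.Map;

public class BlankPropsCheckSketch {
    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put("warehouse", "hdfs:///tmp/seatunnel"); // placeholder values
        props.put("database", "seatunnel_test");
        props.put("table", "st_test3");
        props.put("catalog_type", "hive"); // hive requires catalog_uri

        try {
            new PaimonConfig(ReadonlyConfig.fromMap(props)); // catalog_uri is absent
        } catch (SeaTunnelException e) {
            // Wraps COMMON-27: "The props named 'catalog_uri' of 'Paimon' is blank."
            System.out.println(e.getMessage());
        }

        props.put("catalog_uri", "thrift://hadoop04:9083"); // placeholder metastore uri
        PaimonConfig ok = new PaimonConfig(ReadonlyConfig.fromMap(props)); // passes
        System.out.println(ok.getCatalogUri());
    }
}
```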
diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java
index dd656cd8ceb..46b92afb097 100644
--- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/sink/PaimonSinkFactory.java
@@ -25,6 +25,7 @@
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
+import org.apache.seatunnel.connectors.seatunnel.paimon.catalog.PaimonCatalogEnum;
 import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig;
 import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig;
 
@@ -52,13 +53,16 @@ public OptionRule optionRule() {
                 .required(PaimonConfig.WAREHOUSE, PaimonConfig.DATABASE, PaimonConfig.TABLE)
                 .optional(
                         PaimonConfig.HDFS_SITE_PATH,
+                        PaimonConfig.HADOOP_CONF,
+                        PaimonConfig.HADOOP_CONF_PATH,
+                        PaimonConfig.CATALOG_TYPE,
                         PaimonSinkConfig.SCHEMA_SAVE_MODE,
                         PaimonSinkConfig.DATA_SAVE_MODE,
                         PaimonSinkConfig.PRIMARY_KEYS,
                         PaimonSinkConfig.PARTITION_KEYS,
-                        PaimonSinkConfig.WRITE_PROPS,
-                        PaimonConfig.HADOOP_CONF,
-                        PaimonConfig.HADOOP_CONF_PATH)
+                        PaimonSinkConfig.WRITE_PROPS)
+                .conditional(
+                        PaimonConfig.CATALOG_TYPE, PaimonCatalogEnum.HIVE, PaimonConfig.CATALOG_URI)
                 .build();
     }
 
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkHdfsIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkHdfsIT.java
index 72301bcf9d7..5d2ce86c2b5 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkHdfsIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkHdfsIT.java
@@ -24,9 +24,11 @@
 import org.apache.seatunnel.connectors.seatunnel.paimon.catalog.PaimonCatalogLoader;
 import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig;
 import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
 import org.apache.seatunnel.e2e.common.container.EngineType;
 import org.apache.seatunnel.e2e.common.container.TestContainer;
 import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
 
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
@@ -57,6 +59,29 @@
 @Disabled(
         "HDFS is not available in CI, if you want to run this test, please set up your own HDFS environment in the test case file and the below setup")
 public class PaimonSinkHdfsIT extends TestSuiteBase {
+
+    private String hiveExecUrl() {
+        return "https://repo1.maven.org/maven2/org/apache/hive/hive-exec/3.1.3/hive-exec-3.1.3.jar";
+    }
+
+    private String libfb303Url() {
+        return "https://repo1.maven.org/maven2/org/apache/thrift/libfb303/0.9.0/libfb303-0.9.0.jar";
+    }
+
+    @TestContainerExtension
+    protected final ContainerExtendedFactory extendedFactory =
+            container -> {
+                Container.ExecResult extraCommands =
+                        container.execInContainer(
+                                "bash",
+                                "-c",
+                                "mkdir -p /tmp/seatunnel/plugins/Paimon/lib && cd /tmp/seatunnel/plugins/Paimon/lib && wget "
+                                        + hiveExecUrl()
+                                        + " && wget "
+                                        + libfb303Url());
+                Assertions.assertEquals(0, extraCommands.getExitCode(), extraCommands.getStderr());
+            };
+
     private Map<String, Object> PAIMON_SINK_PROPERTIES;
 
     @BeforeAll
@@ -124,4 +149,50 @@ public void testFakeCDCSinkPaimon(TestContainer container) throws Exception {
                         });
                     });
     }
+
+    @TestTemplate
+    public void testFakeCDCSinkPaimonWithHiveCatalog(TestContainer container) throws Exception {
+        Container.ExecResult execResult =
+                container.executeJob("/fake_cdc_sink_paimon_with_hdfs_with_hive_catalog.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+
+        given().ignoreExceptions()
+                .await()
+                .atLeast(200L, TimeUnit.MILLISECONDS)
+                .atMost(40L, TimeUnit.SECONDS)
+                .untilAsserted(
+                        () -> {
+                            PaimonSinkConfig paimonSinkConfig =
+                                    new PaimonSinkConfig(
+                                            ReadonlyConfig.fromMap(PAIMON_SINK_PROPERTIES));
+                            PaimonCatalogLoader paimonCatalogLoader =
+                                    new PaimonCatalogLoader(paimonSinkConfig);
+                            Catalog catalog = paimonCatalogLoader.loadCatalog();
+                            Table table =
+                                    catalog.getTable(
+                                            Identifier.create("seatunnel_namespace1", "st_test"));
+                            ReadBuilder readBuilder = table.newReadBuilder();
+                            TableScan.Plan plan = readBuilder.newScan().plan();
+                            TableRead tableRead = readBuilder.newRead();
+                            List<PaimonRecord> paimonRecords = new ArrayList<>();
+                            try (RecordReader<InternalRow> reader = tableRead.createReader(plan)) {
+                                reader.forEachRemaining(
+                                        row ->
+                                                paimonRecords.add(
+                                                        new PaimonRecord(
+                                                                row.getLong(0),
+                                                                row.getString(1).toString())));
+                            }
+                            Assertions.assertEquals(2, paimonRecords.size());
+                            paimonRecords.forEach(
+                                    paimonRecord -> {
+                                        if (paimonRecord.getPkId() == 1) {
+                                            Assertions.assertEquals("A_1", paimonRecord.getName());
+                                        }
+                                        if (paimonRecord.getPkId() == 3) {
+                                            Assertions.assertEquals("C", paimonRecord.getName());
+                                        }
+                                    });
+                        });
+    }
 }
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_with_hdfs_with_hive_catalog.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_with_hdfs_with_hive_catalog.conf
new file mode 100644
index 00000000000..3afdc59701b
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_cdc_sink_paimon_with_hdfs_with_hive_catalog.conf
@@ -0,0 +1,99 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in seatunnel config
+######
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    schema = {
+      fields {
+        pk_id = bigint
+        name = string
+        score = int
+      }
+      primaryKey {
+        name = "pk_id"
+        columnNames = [pk_id]
+      }
+    }
+    rows = [
+      {
+        kind = INSERT
+        fields = [1, "A", 100]
+      },
+      {
+        kind = INSERT
+        fields = [2, "B", 100]
+      },
+      {
+        kind = INSERT
+        fields = [3, "C", 100]
+      },
+      {
+        kind = INSERT
+        fields = [3, "C", 100]
+      },
+      {
+        kind = INSERT
+        fields = [3, "C", 100]
+      },
+      {
+        kind = INSERT
+        fields = [3, "C", 100]
+      }
+      {
+        kind = UPDATE_BEFORE
+        fields = [1, "A", 100]
+      },
+      {
+        kind = UPDATE_AFTER
+        fields = [1, "A_1", 100]
+      },
+      {
+        kind = DELETE
+        fields = [2, "B", 100]
+      }
+    ]
+  }
+}
+
+sink {
+  Paimon {
+    schema_save_mode = "RECREATE_SCHEMA"
+    catalog_name="seatunnel_test"
+    catalog_type="hive"
+    catalog_uri="thrift://hadoop04:9083"
+    warehouse="hdfs:///tmp/seatunnel"
+    database="seatunnel_test"
+    table="st_test3"
+    paimon.hadoop.conf = {
+      fs.defaultFS = "hdfs://nameservice1"
+      dfs.nameservices = "nameservice1"
+      dfs.ha.namenodes.nameservice1 = "nn1,nn2"
+      dfs.namenode.rpc-address.nameservice1.nn1 = "hadoop03:8020"
+      dfs.namenode.rpc-address.nameservice1.nn2 = "hadoop04:8020"
+      dfs.client.failover.proxy.provider.nameservice1 = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
+      dfs.client.use.datanode.hostname = "true"
+    }
+  }
+}