From 46600969bba88be9eb77a39e3b55e612c2a9a022 Mon Sep 17 00:00:00 2001 From: dailai Date: Wed, 8 May 2024 17:25:02 +0800 Subject: [PATCH] [Feature][Jdbc] Support the jdbc connector for InterSystems IRIS (#6797) --- docs/en/connector-v2/sink/Jdbc.md | 43 +- docs/en/connector-v2/source/Jdbc.md | 48 +- .../connector-jdbc/pom.xml | 12 + .../jdbc/catalog/iris/IrisCatalog.java | 276 ++++++++ .../jdbc/catalog/iris/IrisCatalogFactory.java | 61 ++ .../iris/IrisCreateTableSqlBuilder.java | 166 +++++ .../iris/savemode/IrisSaveModeHandler.java | 61 ++ .../internal/dialect/DatabaseIdentifier.java | 1 + .../internal/dialect/iris/IrisDialect.java | 199 ++++++ .../dialect/iris/IrisDialectFactory.java | 45 ++ .../dialect/iris/IrisJdbcRowConverter.java | 29 + .../dialect/iris/IrisTypeConverter.java | 435 ++++++++++++ .../internal/dialect/iris/IrisTypeMapper.java | 52 ++ .../seatunnel/jdbc/sink/JdbcSink.java | 12 + .../sql/IrisCreateTableSqlBuilderTest.java | 111 +++ .../dialect/iris/IrisTypeConverterTest.java | 645 ++++++++++++++++++ .../seatunnel/jdbc/AbstractJdbcIT.java | 4 + .../connector-jdbc-e2e-part-7/pom.xml | 10 + .../connectors/seatunnel/jdbc/JdbcIrisIT.java | 592 ++++++++++++++++ ...bc_iris_source_to_sink_with_full_type.conf | 50 ++ .../src/test/resources/jdbc_iris_upsert.conf | 91 +++ .../src/test/resources/password/password.txt | 1 + 22 files changed, 2900 insertions(+), 44 deletions(-) create mode 100644 seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/iris/IrisCatalog.java create mode 100644 seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/iris/IrisCatalogFactory.java create mode 100644 seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/iris/IrisCreateTableSqlBuilder.java create mode 100644 seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/iris/savemode/IrisSaveModeHandler.java create mode 100644 seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/iris/IrisDialect.java create mode 100644 seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/iris/IrisDialectFactory.java create mode 100644 seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/iris/IrisJdbcRowConverter.java create mode 100644 seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/iris/IrisTypeConverter.java create mode 100644 seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/iris/IrisTypeMapper.java create mode 100644 seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/IrisCreateTableSqlBuilderTest.java create mode 100644 seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/iris/IrisTypeConverterTest.java create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcIrisIT.java create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/resources/jdbc_iris_source_to_sink_with_full_type.conf create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/resources/jdbc_iris_upsert.conf create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/resources/password/password.txt diff --git a/docs/en/connector-v2/sink/Jdbc.md b/docs/en/connector-v2/sink/Jdbc.md index 0153b9f92ba..18a3d4c66d7 100644 --- a/docs/en/connector-v2/sink/Jdbc.md +++ b/docs/en/connector-v2/sink/Jdbc.md @@ -215,27 +215,28 @@ In the case of is_exactly_once = "true", Xa transactions are used. This requires there are some reference value for params above. -| datasource | driver | url | xa_data_source_class_name | maven | -|------------|----------------------------------------------|--------------------------------------------------------------------|----------------------------------------------------|-------------------------------------------------------------------------------------------------------------| -| MySQL | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | com.mysql.cj.jdbc.MysqlXADataSource | https://mvnrepository.com/artifact/mysql/mysql-connector-java | -| PostgreSQL | org.postgresql.Driver | jdbc:postgresql://localhost:5432/postgres | org.postgresql.xa.PGXADataSource | https://mvnrepository.com/artifact/org.postgresql/postgresql | -| DM | dm.jdbc.driver.DmDriver | jdbc:dm://localhost:5236 | dm.jdbc.driver.DmdbXADataSource | https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18 | -| Phoenix | org.apache.phoenix.queryserver.client.Driver | jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF | / | https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client | -| SQL Server | com.microsoft.sqlserver.jdbc.SQLServerDriver | jdbc:sqlserver://localhost:1433 | com.microsoft.sqlserver.jdbc.SQLServerXADataSource | https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc | -| Oracle | oracle.jdbc.OracleDriver | jdbc:oracle:thin:@localhost:1521/xepdb1 | oracle.jdbc.xa.OracleXADataSource | https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8 | -| sqlite | org.sqlite.JDBC | jdbc:sqlite:test.db | / | https://mvnrepository.com/artifact/org.xerial/sqlite-jdbc | -| GBase8a | com.gbase.jdbc.Driver | jdbc:gbase://e2e_gbase8aDb:5258/test | / | https://www.gbase8.cn/wp-content/uploads/2020/10/gbase-connector-java-8.3.81.53-build55.5.7-bin_min_mix.jar | -| StarRocks | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | / | https://mvnrepository.com/artifact/mysql/mysql-connector-java | -| db2 | com.ibm.db2.jcc.DB2Driver | jdbc:db2://localhost:50000/testdb | com.ibm.db2.jcc.DB2XADataSource | https://mvnrepository.com/artifact/com.ibm.db2.jcc/db2jcc/db2jcc4 | -| saphana | com.sap.db.jdbc.Driver | jdbc:sap://localhost:39015 | / | https://mvnrepository.com/artifact/com.sap.cloud.db.jdbc/ngdbc | -| Doris | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | / | https://mvnrepository.com/artifact/mysql/mysql-connector-java | -| teradata | com.teradata.jdbc.TeraDriver | jdbc:teradata://localhost/DBS_PORT=1025,DATABASE=test | / | https://mvnrepository.com/artifact/com.teradata.jdbc/terajdbc | -| Redshift | com.amazon.redshift.jdbc42.Driver | jdbc:redshift://localhost:5439/testdb | com.amazon.redshift.xa.RedshiftXADataSource | https://mvnrepository.com/artifact/com.amazon.redshift/redshift-jdbc42 | -| Snowflake | net.snowflake.client.jdbc.SnowflakeDriver | jdbc:snowflake://.snowflakecomputing.com | / | https://mvnrepository.com/artifact/net.snowflake/snowflake-jdbc | -| Vertica | com.vertica.jdbc.Driver | jdbc:vertica://localhost:5433 | / | https://repo1.maven.org/maven2/com/vertica/jdbc/vertica-jdbc/12.0.3-0/vertica-jdbc-12.0.3-0.jar | -| Kingbase | com.kingbase8.Driver | jdbc:kingbase8://localhost:54321/db_test | / | https://repo1.maven.org/maven2/cn/com/kingbase/kingbase8/8.6.0/kingbase8-8.6.0.jar | -| OceanBase | com.oceanbase.jdbc.Driver | jdbc:oceanbase://localhost:2881 | / | https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.3/oceanbase-client-2.4.3.jar | -| xugu | com.xugu.cloudjdbc.Driver | jdbc:xugu://localhost:5138 | / | https://repo1.maven.org/maven2/com/xugudb/xugu-jdbc/12.2.0/xugu-jdbc-12.2.0.jar | +| datasource | driver | url | xa_data_source_class_name | maven | +|-------------------|----------------------------------------------|--------------------------------------------------------------------|----------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------| +| MySQL | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | com.mysql.cj.jdbc.MysqlXADataSource | https://mvnrepository.com/artifact/mysql/mysql-connector-java | +| PostgreSQL | org.postgresql.Driver | jdbc:postgresql://localhost:5432/postgres | org.postgresql.xa.PGXADataSource | https://mvnrepository.com/artifact/org.postgresql/postgresql | +| DM | dm.jdbc.driver.DmDriver | jdbc:dm://localhost:5236 | dm.jdbc.driver.DmdbXADataSource | https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18 | +| Phoenix | org.apache.phoenix.queryserver.client.Driver | jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF | / | https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client | +| SQL Server | com.microsoft.sqlserver.jdbc.SQLServerDriver | jdbc:sqlserver://localhost:1433 | com.microsoft.sqlserver.jdbc.SQLServerXADataSource | https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc | +| Oracle | oracle.jdbc.OracleDriver | jdbc:oracle:thin:@localhost:1521/xepdb1 | oracle.jdbc.xa.OracleXADataSource | https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8 | +| sqlite | org.sqlite.JDBC | jdbc:sqlite:test.db | / | https://mvnrepository.com/artifact/org.xerial/sqlite-jdbc | +| GBase8a | com.gbase.jdbc.Driver | jdbc:gbase://e2e_gbase8aDb:5258/test | / | https://www.gbase8.cn/wp-content/uploads/2020/10/gbase-connector-java-8.3.81.53-build55.5.7-bin_min_mix.jar | +| StarRocks | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | / | https://mvnrepository.com/artifact/mysql/mysql-connector-java | +| db2 | com.ibm.db2.jcc.DB2Driver | jdbc:db2://localhost:50000/testdb | com.ibm.db2.jcc.DB2XADataSource | https://mvnrepository.com/artifact/com.ibm.db2.jcc/db2jcc/db2jcc4 | +| saphana | com.sap.db.jdbc.Driver | jdbc:sap://localhost:39015 | / | https://mvnrepository.com/artifact/com.sap.cloud.db.jdbc/ngdbc | +| Doris | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | / | https://mvnrepository.com/artifact/mysql/mysql-connector-java | +| teradata | com.teradata.jdbc.TeraDriver | jdbc:teradata://localhost/DBS_PORT=1025,DATABASE=test | / | https://mvnrepository.com/artifact/com.teradata.jdbc/terajdbc | +| Redshift | com.amazon.redshift.jdbc42.Driver | jdbc:redshift://localhost:5439/testdb | com.amazon.redshift.xa.RedshiftXADataSource | https://mvnrepository.com/artifact/com.amazon.redshift/redshift-jdbc42 | +| Snowflake | net.snowflake.client.jdbc.SnowflakeDriver | jdbc:snowflake://.snowflakecomputing.com | / | https://mvnrepository.com/artifact/net.snowflake/snowflake-jdbc | +| Vertica | com.vertica.jdbc.Driver | jdbc:vertica://localhost:5433 | / | https://repo1.maven.org/maven2/com/vertica/jdbc/vertica-jdbc/12.0.3-0/vertica-jdbc-12.0.3-0.jar | +| Kingbase | com.kingbase8.Driver | jdbc:kingbase8://localhost:54321/db_test | / | https://repo1.maven.org/maven2/cn/com/kingbase/kingbase8/8.6.0/kingbase8-8.6.0.jar | +| OceanBase | com.oceanbase.jdbc.Driver | jdbc:oceanbase://localhost:2881 | / | https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.3/oceanbase-client-2.4.3.jar | +| xugu | com.xugu.cloudjdbc.Driver | jdbc:xugu://localhost:5138 | / | https://repo1.maven.org/maven2/com/xugudb/xugu-jdbc/12.2.0/xugu-jdbc-12.2.0.jar | +| InterSystems IRIS | com.intersystems.jdbc.IRISDriver | jdbc:IRIS://localhost:1972/%SYS | / | https://raw.githubusercontent.com/intersystems-community/iris-driver-distribution/main/JDBC/JDK18/intersystems-jdbc-3.8.4.jar | ## Example diff --git a/docs/en/connector-v2/source/Jdbc.md b/docs/en/connector-v2/source/Jdbc.md index 225576001d7..2fcadca17f4 100644 --- a/docs/en/connector-v2/source/Jdbc.md +++ b/docs/en/connector-v2/source/Jdbc.md @@ -110,6 +110,7 @@ examples: - oracle: "test_schema.table1" - sqlserver: "testdb.test_schema.table1" - postgresql: "testdb.test_schema.table1" +- iris: "test_schema.table1" ### table_list @@ -205,29 +206,30 @@ How many splits do we need to split into, only support positive integer. default there are some reference value for params above. -| datasource | driver | url | maven | -|------------|-----------------------------------------------------|------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------| -| mysql | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | https://mvnrepository.com/artifact/mysql/mysql-connector-java | -| postgresql | org.postgresql.Driver | jdbc:postgresql://localhost:5432/postgres | https://mvnrepository.com/artifact/org.postgresql/postgresql | -| dm | dm.jdbc.driver.DmDriver | jdbc:dm://localhost:5236 | https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18 | -| phoenix | org.apache.phoenix.queryserver.client.Driver | jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF | https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client | -| sqlserver | com.microsoft.sqlserver.jdbc.SQLServerDriver | jdbc:sqlserver://localhost:1433 | https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc | -| oracle | oracle.jdbc.OracleDriver | jdbc:oracle:thin:@localhost:1521/xepdb1 | https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8 | -| sqlite | org.sqlite.JDBC | jdbc:sqlite:test.db | https://mvnrepository.com/artifact/org.xerial/sqlite-jdbc | -| gbase8a | com.gbase.jdbc.Driver | jdbc:gbase://e2e_gbase8aDb:5258/test | https://www.gbase8.cn/wp-content/uploads/2020/10/gbase-connector-java-8.3.81.53-build55.5.7-bin_min_mix.jar | -| starrocks | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | https://mvnrepository.com/artifact/mysql/mysql-connector-java | -| db2 | com.ibm.db2.jcc.DB2Driver | jdbc:db2://localhost:50000/testdb | https://mvnrepository.com/artifact/com.ibm.db2.jcc/db2jcc/db2jcc4 | -| tablestore | com.alicloud.openservices.tablestore.jdbc.OTSDriver | "jdbc:ots:http s://myinstance.cn-hangzhou.ots.aliyuncs.com/myinstance" | https://mvnrepository.com/artifact/com.aliyun.openservices/tablestore-jdbc | -| saphana | com.sap.db.jdbc.Driver | jdbc:sap://localhost:39015 | https://mvnrepository.com/artifact/com.sap.cloud.db.jdbc/ngdbc | -| doris | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | https://mvnrepository.com/artifact/mysql/mysql-connector-java | -| teradata | com.teradata.jdbc.TeraDriver | jdbc:teradata://localhost/DBS_PORT=1025,DATABASE=test | https://mvnrepository.com/artifact/com.teradata.jdbc/terajdbc | -| Snowflake | net.snowflake.client.jdbc.SnowflakeDriver | jdbc:snowflake://.snowflakecomputing.com | https://mvnrepository.com/artifact/net.snowflake/snowflake-jdbc | -| Redshift | com.amazon.redshift.jdbc42.Driver | jdbc:redshift://localhost:5439/testdb?defaultRowFetchSize=1000 | https://mvnrepository.com/artifact/com.amazon.redshift/redshift-jdbc42 | -| Vertica | com.vertica.jdbc.Driver | jdbc:vertica://localhost:5433 | https://repo1.maven.org/maven2/com/vertica/jdbc/vertica-jdbc/12.0.3-0/vertica-jdbc-12.0.3-0.jar | -| Kingbase | com.kingbase8.Driver | jdbc:kingbase8://localhost:54321/db_test | https://repo1.maven.org/maven2/cn/com/kingbase/kingbase8/8.6.0/kingbase8-8.6.0.jar | -| OceanBase | com.oceanbase.jdbc.Driver | jdbc:oceanbase://localhost:2881 | https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.3/oceanbase-client-2.4.3.jar | -| Hive | org.apache.hive.jdbc.HiveDriver | jdbc:hive2://localhost:10000 | https://repo1.maven.org/maven2/org/apache/hive/hive-jdbc/3.1.3/hive-jdbc-3.1.3-standalone.jar | -| xugu | com.xugu.cloudjdbc.Driver | jdbc:xugu://localhost:5138 | https://repo1.maven.org/maven2/com/xugudb/xugu-jdbc/12.2.0/xugu-jdbc-12.2.0.jar | +| datasource | driver | url | maven | +|-------------------|-----------------------------------------------------|------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------| +| mysql | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | https://mvnrepository.com/artifact/mysql/mysql-connector-java | +| postgresql | org.postgresql.Driver | jdbc:postgresql://localhost:5432/postgres | https://mvnrepository.com/artifact/org.postgresql/postgresql | +| dm | dm.jdbc.driver.DmDriver | jdbc:dm://localhost:5236 | https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18 | +| phoenix | org.apache.phoenix.queryserver.client.Driver | jdbc:phoenix:thin:url=http://localhost:8765;serialization=PROTOBUF | https://mvnrepository.com/artifact/com.aliyun.phoenix/ali-phoenix-shaded-thin-client | +| sqlserver | com.microsoft.sqlserver.jdbc.SQLServerDriver | jdbc:sqlserver://localhost:1433 | https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc | +| oracle | oracle.jdbc.OracleDriver | jdbc:oracle:thin:@localhost:1521/xepdb1 | https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8 | +| sqlite | org.sqlite.JDBC | jdbc:sqlite:test.db | https://mvnrepository.com/artifact/org.xerial/sqlite-jdbc | +| gbase8a | com.gbase.jdbc.Driver | jdbc:gbase://e2e_gbase8aDb:5258/test | https://www.gbase8.cn/wp-content/uploads/2020/10/gbase-connector-java-8.3.81.53-build55.5.7-bin_min_mix.jar | +| starrocks | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | https://mvnrepository.com/artifact/mysql/mysql-connector-java | +| db2 | com.ibm.db2.jcc.DB2Driver | jdbc:db2://localhost:50000/testdb | https://mvnrepository.com/artifact/com.ibm.db2.jcc/db2jcc/db2jcc4 | +| tablestore | com.alicloud.openservices.tablestore.jdbc.OTSDriver | "jdbc:ots:http s://myinstance.cn-hangzhou.ots.aliyuncs.com/myinstance" | https://mvnrepository.com/artifact/com.aliyun.openservices/tablestore-jdbc | +| saphana | com.sap.db.jdbc.Driver | jdbc:sap://localhost:39015 | https://mvnrepository.com/artifact/com.sap.cloud.db.jdbc/ngdbc | +| doris | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | https://mvnrepository.com/artifact/mysql/mysql-connector-java | +| teradata | com.teradata.jdbc.TeraDriver | jdbc:teradata://localhost/DBS_PORT=1025,DATABASE=test | https://mvnrepository.com/artifact/com.teradata.jdbc/terajdbc | +| Snowflake | net.snowflake.client.jdbc.SnowflakeDriver | jdbc:snowflake://.snowflakecomputing.com | https://mvnrepository.com/artifact/net.snowflake/snowflake-jdbc | +| Redshift | com.amazon.redshift.jdbc42.Driver | jdbc:redshift://localhost:5439/testdb?defaultRowFetchSize=1000 | https://mvnrepository.com/artifact/com.amazon.redshift/redshift-jdbc42 | +| Vertica | com.vertica.jdbc.Driver | jdbc:vertica://localhost:5433 | https://repo1.maven.org/maven2/com/vertica/jdbc/vertica-jdbc/12.0.3-0/vertica-jdbc-12.0.3-0.jar | +| Kingbase | com.kingbase8.Driver | jdbc:kingbase8://localhost:54321/db_test | https://repo1.maven.org/maven2/cn/com/kingbase/kingbase8/8.6.0/kingbase8-8.6.0.jar | +| OceanBase | com.oceanbase.jdbc.Driver | jdbc:oceanbase://localhost:2881 | https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.3/oceanbase-client-2.4.3.jar | +| Hive | org.apache.hive.jdbc.HiveDriver | jdbc:hive2://localhost:10000 | https://repo1.maven.org/maven2/org/apache/hive/hive-jdbc/3.1.3/hive-jdbc-3.1.3-standalone.jar | +| xugu | com.xugu.cloudjdbc.Driver | jdbc:xugu://localhost:5138 | https://repo1.maven.org/maven2/com/xugudb/xugu-jdbc/12.2.0/xugu-jdbc-12.2.0.jar | +| InterSystems IRIS | com.intersystems.jdbc.IRISDriver | jdbc:IRIS://localhost:1972/%SYS | https://raw.githubusercontent.com/intersystems-community/iris-driver-distribution/main/JDBC/JDK18/intersystems-jdbc-3.8.4.jar | ## Example diff --git a/seatunnel-connectors-v2/connector-jdbc/pom.xml b/seatunnel-connectors-v2/connector-jdbc/pom.xml index db8c95dd0fd..cfe0ed44ecd 100644 --- a/seatunnel-connectors-v2/connector-jdbc/pom.xml +++ b/seatunnel-connectors-v2/connector-jdbc/pom.xml @@ -51,6 +51,7 @@ 3.1.3 2.4.3 12.2.0 + 3.0.0 @@ -195,6 +196,12 @@ ${xugu.jdbc.version} provided + + com.intersystems + intersystems-jdbc + ${iris.jdbc.version} + provided + @@ -296,6 +303,11 @@ com.oceanbase oceanbase-client + + + com.intersystems + intersystems-jdbc + diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/iris/IrisCatalog.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/iris/IrisCatalog.java new file mode 100644 index 00000000000..40f08dc50b5 --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/iris/IrisCatalog.java @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.iris; + +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.Column; +import org.apache.seatunnel.api.table.catalog.ConstraintKey; +import org.apache.seatunnel.api.table.catalog.PrimaryKey; +import org.apache.seatunnel.api.table.catalog.TableIdentifier; +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.catalog.TableSchema; +import org.apache.seatunnel.api.table.catalog.exception.CatalogException; +import org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException; +import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException; +import org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException; +import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException; +import org.apache.seatunnel.api.table.converter.BasicTypeDefine; +import org.apache.seatunnel.common.utils.JdbcUrlUtil; +import org.apache.seatunnel.common.utils.SeaTunnelException; +import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.AbstractJdbcCatalog; +import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.iris.IrisTypeConverter; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.iris.IrisTypeMapper; + +import org.apache.commons.lang3.StringUtils; + +import com.google.common.annotations.VisibleForTesting; +import lombok.extern.slf4j.Slf4j; + +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.Collections; +import java.util.List; +import java.util.Optional; + +import static com.google.common.base.Preconditions.checkNotNull; + +@Slf4j +public class IrisCatalog extends AbstractJdbcCatalog { + + private static final String LIST_TABLES_SQL_TEMPLATE = + "SELECT TABLE_SCHEMA,TABLE_NAME FROM INFORMATION_SCHEMA.Tables WHERE TABLE_SCHEMA='%s' and TABLE_TYPE != 'SYSTEM TABLE' and TABLE_TYPE != 'SYSTEM VIEW';"; + + public IrisCatalog( + String catalogName, String username, String password, JdbcUrlUtil.UrlInfo urlInfo) { + super(catalogName, username, password, urlInfo, null); + SYS_DATABASES.add("%SYS"); + } + + @Override + protected String getCreateTableSql(TablePath tablePath, CatalogTable table) { + return new IrisCreateTableSqlBuilder(table).build(tablePath); + } + + @Override + public String getDropTableSql(TablePath tablePath) { + return String.format("DROP TABLE %s", tablePath.getSchemaAndTableName("\"")); + } + + @Override + protected String getCreateDatabaseSql(String databaseName) { + return String.format("CREATE DATABASE \"%s\"", databaseName); + } + + @Override + protected String getDropDatabaseSql(String databaseName) { + return String.format("DROP DATABASE \"%s\"", databaseName); + } + + @Override + protected String getListTableSql(String tableSchemaName) { + return String.format(LIST_TABLES_SQL_TEMPLATE, tableSchemaName); + } + + @Override + protected String getTableName(ResultSet rs) throws SQLException { + String schemaName = rs.getString(1); + String tableName = rs.getString(2); + // It's the system schema when schema name start with % + if (schemaName.startsWith("%")) { + return null; + } + return schemaName + "." + tableName; + } + + // @Override + // protected String getSelectColumnsSql(TablePath tablePath) { + // return String.format( + // SELECT_COLUMNS_SQL_TEMPLATE, tablePath.getSchemaName(), + // tablePath.getTableName()); + // } + + @Override + protected Column buildColumn(ResultSet resultSet) throws SQLException { + String columnName = resultSet.getString("COLUMN_NAME"); + String typeName = resultSet.getString("TYPE_NAME"); + Long columnLength = resultSet.getLong("COLUMN_SIZE"); + Long columnPrecision = columnLength; + Integer columnScale = resultSet.getObject("DECIMAL_DIGITS", Integer.class); + String columnComment = resultSet.getString("REMARKS"); + Object defaultValue = resultSet.getObject("COLUMN_DEF"); + boolean isNullable = (resultSet.getInt("NULLABLE") == DatabaseMetaData.columnNullable); + BasicTypeDefine typeDefine = + BasicTypeDefine.builder() + .name(columnName) + .dataType(typeName) + .length(columnLength) + .precision(columnPrecision) + .scale(columnScale) + .nullable(isNullable) + .defaultValue(defaultValue) + .comment(columnComment) + .build(); + return IrisTypeConverter.INSTANCE.convert(typeDefine); + } + + @Override + protected String getOptionTableName(TablePath tablePath) { + return tablePath.getSchemaAndTableName(); + } + + @Override + public boolean databaseExists(String databaseName) throws CatalogException { + throw new SeaTunnelException("Not supported for list databases for iris"); + } + + @Override + public boolean tableExists(TablePath tablePath) throws CatalogException { + try { + return listTables(tablePath.getSchemaName()) + .contains(tablePath.getSchemaAndTableName()); + } catch (DatabaseNotExistException e) { + return false; + } + } + + @Override + public List listTables(String schemaName) + throws CatalogException, DatabaseNotExistException { + try { + return queryString(defaultUrl, getListTableSql(schemaName), this::getTableName); + } catch (Exception e) { + throw new CatalogException( + String.format("Failed listing database in catalog %s", catalogName), e); + } + } + + @Override + public CatalogTable getTable(String sqlQuery) throws SQLException { + Connection defaultConnection = getConnection(defaultUrl); + return CatalogUtils.getCatalogTable(defaultConnection, sqlQuery, new IrisTypeMapper()); + } + + @Override + public CatalogTable getTable(TablePath tablePath) + throws CatalogException, TableNotExistException { + if (!tableExists(tablePath)) { + throw new TableNotExistException(catalogName, tablePath); + } + + String dbUrl; + if (StringUtils.isNotBlank(tablePath.getDatabaseName())) { + dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName()); + } else { + dbUrl = getUrlFromDatabaseName(defaultDatabase); + } + try { + Connection conn = getConnection(dbUrl); + DatabaseMetaData metaData = conn.getMetaData(); + try (ResultSet resultSet = + metaData.getColumns( + null, tablePath.getSchemaName(), tablePath.getTableName(), null)) { + Optional primaryKey = getPrimaryKey(metaData, tablePath); + List constraintKeys = getConstraintKeys(metaData, tablePath); + TableSchema.Builder builder = TableSchema.builder(); + buildColumnsWithErrorCheck(tablePath, resultSet, builder); + // add primary key + primaryKey.ifPresent(builder::primaryKey); + // add constraint key + constraintKeys.forEach(builder::constraintKey); + TableIdentifier tableIdentifier = getTableIdentifier(tablePath); + return CatalogTable.of( + tableIdentifier, + builder.build(), + buildConnectorOptions(tablePath), + Collections.emptyList(), + "", + catalogName); + } + } catch (Exception e) { + throw new CatalogException( + String.format("Failed getting table %s", tablePath.getFullName()), e); + } + } + + @Override + public void createDatabase(TablePath tablePath, boolean ignoreIfExists) + throws DatabaseAlreadyExistException, CatalogException { + checkNotNull(tablePath.getDatabaseName(), "Database name cannot be null"); + createDatabaseInternal(tablePath.getDatabaseName()); + } + + @Override + public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreIfExists) + throws TableAlreadyExistException, DatabaseNotExistException, CatalogException { + checkNotNull(tablePath, "Table path cannot be null"); + if (defaultSchema.isPresent()) { + tablePath = + new TablePath( + tablePath.getDatabaseName(), + defaultSchema.get(), + tablePath.getTableName()); + } + + if (tableExists(tablePath)) { + if (ignoreIfExists) { + return; + } + throw new TableAlreadyExistException(catalogName, tablePath); + } + + createTableInternal(tablePath, table); + } + + @Override + public void truncateTable(TablePath tablePath, boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + checkNotNull(tablePath, "Table path cannot be null"); + truncateTableInternal(tablePath); + } + + @Override + public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists) + throws DatabaseNotExistException, CatalogException { + checkNotNull(tablePath, "Table path cannot be null"); + checkNotNull(tablePath.getDatabaseName(), "Database name cannot be null"); + dropDatabaseInternal(tablePath.getDatabaseName()); + } + + @Override + protected String getTruncateTableSql(TablePath tablePath) { + return String.format( + "TRUNCATE TABLE \"%s\".\"%s\"", + tablePath.getSchemaName(), tablePath.getTableName()); + } + + @Override + protected String getExistDataSql(TablePath tablePath) { + return String.format( + "SELECT TOP 1 * FROM \"%s\".\"%s\"", + tablePath.getSchemaName(), tablePath.getTableName()); + } + + @VisibleForTesting + public void setConnection(String url, Connection connection) { + this.connectionMap.put(url, connection); + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/iris/IrisCatalogFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/iris/IrisCatalogFactory.java new file mode 100644 index 00000000000..3304575d2ac --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/iris/IrisCatalogFactory.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.iris; + +import org.apache.seatunnel.shade.com.google.common.base.Preconditions; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.table.catalog.Catalog; +import org.apache.seatunnel.api.table.factory.CatalogFactory; +import org.apache.seatunnel.api.table.factory.Factory; +import org.apache.seatunnel.common.utils.JdbcUrlUtil; +import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier; + +import org.apache.commons.lang3.StringUtils; + +import com.google.auto.service.AutoService; + +@AutoService(Factory.class) +public class IrisCatalogFactory implements CatalogFactory { + + @Override + public String factoryIdentifier() { + return DatabaseIdentifier.IRIS; + } + + @Override + public Catalog createCatalog(String catalogName, ReadonlyConfig options) { + String urlWithDatabase = options.get(JdbcCatalogOptions.BASE_URL); + Preconditions.checkArgument( + StringUtils.isNoneBlank(urlWithDatabase), + "Miss config ! Please check your config."); + JdbcUrlUtil.UrlInfo urlInfo = JdbcUrlUtil.getUrlInfo(urlWithDatabase); + return new IrisCatalog( + catalogName, + options.get(JdbcCatalogOptions.USERNAME), + options.get(JdbcCatalogOptions.PASSWORD), + urlInfo); + } + + @Override + public OptionRule optionRule() { + return JdbcCatalogOptions.BASE_RULE.build(); + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/iris/IrisCreateTableSqlBuilder.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/iris/IrisCreateTableSqlBuilder.java new file mode 100644 index 00000000000..b4a6b8f08d6 --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/iris/IrisCreateTableSqlBuilder.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.iris; + +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.Column; +import org.apache.seatunnel.api.table.catalog.ConstraintKey; +import org.apache.seatunnel.api.table.catalog.PrimaryKey; +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.iris.IrisTypeConverter; + +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.StringUtils; + +import java.util.List; +import java.util.stream.Collectors; + +public class IrisCreateTableSqlBuilder { + + private List columns; + private PrimaryKey primaryKey; + List constraintKeys; + private String sourceCatalogName; + private String fieldIde; + + private String comment; + + public IrisCreateTableSqlBuilder(CatalogTable catalogTable) { + this.columns = catalogTable.getTableSchema().getColumns(); + this.primaryKey = catalogTable.getTableSchema().getPrimaryKey(); + this.constraintKeys = catalogTable.getTableSchema().getConstraintKeys(); + this.sourceCatalogName = catalogTable.getCatalogName(); + this.fieldIde = catalogTable.getOptions().get("fieldIde"); + this.comment = catalogTable.getComment(); + } + + public String build(TablePath tablePath) { + String indexKeySql = ""; + StringBuilder createTableSql = new StringBuilder(); + createTableSql + .append("CREATE TABLE ") + .append(tablePath.getSchemaAndTableName("\"")) + .append(" (\n"); + + List columnSqls = + columns.stream() + .map(column -> CatalogUtils.getFieldIde(buildColumnSql(column), fieldIde)) + .collect(Collectors.toList()); + + // Add primary key directly in the create table statement + if (primaryKey != null + && primaryKey.getColumnNames() != null + && primaryKey.getColumnNames().size() > 0) { + columnSqls.add(buildPrimaryKeySql(primaryKey)); + } + if (CollectionUtils.isNotEmpty(constraintKeys)) { + for (ConstraintKey constraintKey : constraintKeys) { + if (StringUtils.isBlank(constraintKey.getConstraintName()) + || (primaryKey != null + && StringUtils.equals( + primaryKey.getPrimaryKey(), + constraintKey.getConstraintName()))) { + continue; + } + switch (constraintKey.getConstraintType()) { + case UNIQUE_KEY: + String uniqueKeySql = buildUniqueKeySql(constraintKey); + columnSqls.add(uniqueKeySql); + break; + case INDEX_KEY: + indexKeySql = buildIndexKeySql(tablePath, constraintKey); + break; + case FOREIGN_KEY: + // todo: add foreign key + break; + } + } + } + if (StringUtils.isNotBlank(comment)) { + createTableSql.append(" %Description '" + comment + "',\n"); + } + createTableSql.append(String.join(",\n", columnSqls)); + createTableSql.append("\n);"); + createTableSql.append("\n" + indexKeySql); + return createTableSql.toString(); + } + + private String buildColumnSql(Column column) { + StringBuilder columnSql = new StringBuilder(); + columnSql.append("\"").append(column.getName()).append("\" "); + + String columnType = IrisTypeConverter.INSTANCE.reconvert(column).getColumnType(); + columnSql.append(columnType); + + if (!column.isNullable()) { + columnSql.append(" NOT NULL"); + } + + if (StringUtils.isNotBlank(column.getComment())) { + columnSql.append(" %Description '" + column.getComment() + "'"); + } + + return columnSql.toString(); + } + + private String buildPrimaryKeySql(PrimaryKey primaryKey) { + String columnNamesString = + primaryKey.getColumnNames().stream() + .map(columnName -> "\"" + columnName + "\"") + .collect(Collectors.joining(", ")); + return CatalogUtils.getFieldIde(" PRIMARY KEY (" + columnNamesString + ")", fieldIde); + } + + private String buildUniqueKeySql(ConstraintKey constraintKey) { + String indexColumns = + constraintKey.getColumnNames().stream() + .map( + constraintKeyColumn -> + String.format( + "\"%s\"", + CatalogUtils.getFieldIde( + constraintKeyColumn.getColumnName(), + fieldIde))) + .collect(Collectors.joining(", ")); + return "UNIQUE (" + indexColumns + ")"; + } + + private String buildIndexKeySql(TablePath tablePath, ConstraintKey constraintKey) { + // We add table name to index name to avoid name conflict + String constraintName = tablePath.getTableName() + "_" + constraintKey.getConstraintName(); + String indexColumns = + constraintKey.getColumnNames().stream() + .map( + constraintKeyColumn -> + String.format( + "\"%s\"", + CatalogUtils.getFieldIde( + constraintKeyColumn.getColumnName(), + fieldIde))) + .collect(Collectors.joining(", ")); + + return "CREATE INDEX " + + constraintName + + " ON " + + tablePath.getSchemaAndTableName("\"") + + "(" + + indexColumns + + ");"; + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/iris/savemode/IrisSaveModeHandler.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/iris/savemode/IrisSaveModeHandler.java new file mode 100644 index 00000000000..b2a7c9851e6 --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/iris/savemode/IrisSaveModeHandler.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.iris.savemode; + +import org.apache.seatunnel.api.sink.DataSaveMode; +import org.apache.seatunnel.api.sink.DefaultSaveModeHandler; +import org.apache.seatunnel.api.sink.SchemaSaveMode; +import org.apache.seatunnel.api.table.catalog.Catalog; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.TablePath; + +import lombok.extern.slf4j.Slf4j; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.Optional; + +@Slf4j +public class IrisSaveModeHandler extends DefaultSaveModeHandler { + public IrisSaveModeHandler( + @Nonnull SchemaSaveMode schemaSaveMode, + @Nonnull DataSaveMode dataSaveMode, + @Nonnull Catalog catalog, + @Nonnull TablePath tablePath, + @Nullable CatalogTable catalogTable, + @Nullable String customSql) { + super(schemaSaveMode, dataSaveMode, catalog, tablePath, catalogTable, customSql); + } + + @Override + protected void createTable() { + try { + log.info( + "Creating table {} with action {}", + tablePath, + catalog.previewAction( + Catalog.ActionType.CREATE_TABLE, + tablePath, + Optional.ofNullable(catalogTable))); + catalog.createTable(tablePath, catalogTable, true); + } catch (UnsupportedOperationException ignore) { + log.info("Creating table {}", tablePath); + } + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java index 2f6aabc502c..17608392ff1 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/DatabaseIdentifier.java @@ -39,4 +39,5 @@ public class DatabaseIdentifier { public static final String OCENABASE = "OceanBase"; public static final String TIDB = "TiDB"; public static final String XUGU = "XUGU"; + public static final String IRIS = "IRIS"; } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/iris/IrisDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/iris/IrisDialect.java new file mode 100644 index 00000000000..c89768f430f --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/iris/IrisDialect.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.iris; + +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.common.utils.SeaTunnelException; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum; +import org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceTable; + +import org.apache.commons.lang3.StringUtils; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.util.Arrays; +import java.util.Optional; +import java.util.stream.Collectors; + +public class IrisDialect implements JdbcDialect { + private static final Integer DEFAULT_IRIS_FETCH_SIZE = 500; + private String fieldIde = FieldIdeEnum.ORIGINAL.getValue(); + + public IrisDialect() {} + + public IrisDialect(String fieldIde) { + this.fieldIde = fieldIde; + } + + @Override + public String dialectName() { + return DatabaseIdentifier.IRIS; + } + + @Override + public JdbcRowConverter getRowConverter() { + return new IrisJdbcRowConverter(); + } + + @Override + public String hashModForField(String fieldName, int mod) { + throw new SeaTunnelException( + "The iris database is not supported hash or md5 function. Please remove the partition_column property in config."); + } + + @Override + public JdbcDialectTypeMapper getJdbcDialectTypeMapper() { + return new IrisTypeMapper(); + } + + @Override + public String quoteIdentifier(String identifier) { + if (identifier.contains(".")) { + String[] parts = identifier.split("\\."); + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < parts.length - 1; i++) { + sb.append("\"").append(parts[i]).append("\"").append("."); + } + return sb.append("\"") + .append(getFieldIde(parts[parts.length - 1], fieldIde)) + .append("\"") + .toString(); + } + + return "\"" + getFieldIde(identifier, fieldIde) + "\""; + } + + @Override + public String tableIdentifier(String database, String tableName) { + return quoteIdentifier(tableName); + } + + @Override + public String extractTableName(TablePath tablePath) { + return tablePath.getSchemaAndTableName(); + } + + @Override + public TablePath parse(String tablePath) { + return TablePath.of(tablePath, true); + } + + @Override + public String tableIdentifier(TablePath tablePath) { + return tablePath.getSchemaAndTableName(); + } + + @Override + public Optional getUpsertStatement( + String database, String tableName, String[] fieldNames, String[] uniqueKeyFields) { + String insertIntoStatement = getInsertIntoStatement(database, tableName, fieldNames); + return Optional.of(insertIntoStatement); + } + + @Override + public String getInsertIntoStatement(String database, String tableName, String[] fieldNames) { + String columns = + Arrays.stream(fieldNames) + .map(this::quoteIdentifier) + .collect(Collectors.joining(", ")); + String placeholders = + Arrays.stream(fieldNames) + .map(fieldName -> ":" + fieldName) + .collect(Collectors.joining(", ")); + return String.format( + "INSERT OR UPDATE %s (%s) VALUES (%s)", + tableIdentifier(database, tableName), columns, placeholders); + } + + @Override + public PreparedStatement creatPreparedStatement( + Connection connection, String queryTemplate, int fetchSize) throws SQLException { + PreparedStatement statement = + connection.prepareStatement( + queryTemplate, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY); + if (fetchSize > 0) { + statement.setFetchSize(fetchSize); + } else { + statement.setFetchSize(DEFAULT_IRIS_FETCH_SIZE); + } + return statement; + } + + @Override + public Object queryNextChunkMax( + Connection connection, + JdbcSourceTable table, + String columnName, + int chunkSize, + Object includedLowerBound) + throws SQLException { + String quotedColumn = quoteIdentifier(columnName); + String sqlQuery; + if (StringUtils.isNotBlank(table.getQuery())) { + sqlQuery = + String.format( + "SELECT MAX(%s) FROM (" + + "SELECT TOP %s %s FROM (%s) WHERE %s >= ? ORDER BY %s ASC " + + ")", + quotedColumn, + chunkSize, + quotedColumn, + table.getQuery(), + quotedColumn, + quotedColumn); + } else { + sqlQuery = + String.format( + "SELECT MAX(%s) FROM (" + + "SELECT TOP %s %s FROM (%s) WHERE %s >= ? ORDER BY %s ASC " + + ")", + quotedColumn, + chunkSize, + quotedColumn, + table.getTablePath().getSchemaAndTableName(), + quotedColumn, + quotedColumn); + } + + try (PreparedStatement ps = connection.prepareStatement(sqlQuery)) { + ps.setObject(1, includedLowerBound); + try (ResultSet rs = ps.executeQuery()) { + if (!rs.next()) { + // this should never happen + throw new SQLException( + String.format("No result returned after running query [%s]", sqlQuery)); + } + return rs.getObject(1); + } + } + } + + @Override + public ResultSetMetaData getResultSetMetaData(Connection conn, String query) + throws SQLException { + PreparedStatement ps = conn.prepareStatement(query); + return ps.executeQuery().getMetaData(); + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/iris/IrisDialectFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/iris/IrisDialectFactory.java new file mode 100644 index 00000000000..c1e45c96665 --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/iris/IrisDialectFactory.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.iris; + +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory; + +import com.google.auto.service.AutoService; + +import javax.annotation.Nonnull; + +/** Factory for {@link IrisDialect}. */ +@AutoService(JdbcDialectFactory.class) +public class IrisDialectFactory implements JdbcDialectFactory { + + @Override + public boolean acceptsURL(String url) { + return url.startsWith("jdbc:IRIS:"); + } + + @Override + public JdbcDialect create() { + return new IrisDialect(); + } + + @Override + public JdbcDialect create(@Nonnull String compatibleMode, String fieldIde) { + return new IrisDialect(fieldIde); + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/iris/IrisJdbcRowConverter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/iris/IrisJdbcRowConverter.java new file mode 100644 index 00000000000..d4239049a8d --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/iris/IrisJdbcRowConverter.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.iris; + +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier; + +public class IrisJdbcRowConverter extends AbstractJdbcRowConverter { + + @Override + public String converterName() { + return DatabaseIdentifier.IRIS; + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/iris/IrisTypeConverter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/iris/IrisTypeConverter.java new file mode 100644 index 00000000000..1baee2ddad1 --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/iris/IrisTypeConverter.java @@ -0,0 +1,435 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.iris; + +import org.apache.seatunnel.api.table.catalog.Column; +import org.apache.seatunnel.api.table.catalog.PhysicalColumn; +import org.apache.seatunnel.api.table.converter.BasicTypeDefine; +import org.apache.seatunnel.api.table.converter.TypeConverter; +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.DecimalType; +import org.apache.seatunnel.api.table.type.LocalTimeType; +import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType; +import org.apache.seatunnel.common.exception.CommonError; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier; + +import com.google.auto.service.AutoService; +import lombok.extern.slf4j.Slf4j; + +import java.util.Objects; + +/** + * reference + * https://docs.intersystems.com/iris20241/csp/docbook/DocBook.UI.Page.cls?KEY=RSQL_datatype#RSQL_datatype_view_data_type_mappings_to_intersyst + */ +@Slf4j +@AutoService(TypeConverter.class) +public class IrisTypeConverter implements TypeConverter { + // ============================data types===================== + public static final String IRIS_NULL = "NULL"; + + // -------------------------number---------------------------- + public static final String IRIS_NUMERIC = "NUMERIC"; + public static final String IRIS_MONEY = "MONEY"; + public static final String IRIS_SMALLMONEY = "SMALLMONEY"; + public static final String IRIS_NUMBER = "NUMBER"; + public static final String IRIS_DEC = "DEC"; + public static final String IRIS_DECIMAL = "DECIMAL"; + public static final String IRIS_INTEGER = "INTEGER"; + public static final String IRIS_INT = "INT"; + public static final String IRIS_ROWVERSION = "ROWVERSION"; + public static final String IRIS_BIGINT = "BIGINT"; + public static final String IRIS_SERIAL = "SERIAL"; + + public static final String IRIS_TINYINT = "TINYINT"; + public static final String IRIS_SMALLINT = "SMALLINT"; + public static final String IRIS_MEDIUMINT = "MEDIUMINT"; + public static final String IRIS_FLOAT = "FLOAT"; + public static final String IRIS_DOUBLE = "DOUBLE"; + public static final String IRIS_REAL = "REAL"; + public static final String IRIS_DOUBLE_PRECISION = "DOUBLE PRECISION"; + + // ----------------------------string------------------------- + public static final String IRIS_CHAR = "CHAR"; + public static final String IRIS_CHAR_VARYING = "CHAR VARYING"; + public static final String IRIS_CHARACTER_VARYING = "CHARACTER VARYING"; + public static final String IRIS_NATIONAL_CHAR = "NATIONAL CHAR"; + public static final String IRIS_NATIONAL_CHAR_VARYING = "NATIONAL CHAR VARYING"; + public static final String IRIS_NATIONAL_CHARACTER = "NATIONAL CHARACTER"; + public static final String IRIS_NATIONAL_CHARACTER_VARYING = "NATIONAL CHARACTER VARYING"; + public static final String IRIS_NATIONAL_VARCHAR = "NATIONAL VARCHAR"; + public static final String IRIS_NCHAR = "NCHAR"; + public static final String IRIS_NVARCHAR = "NVARCHAR"; + public static final String IRIS_SYSNAME = "SYSNAME"; + public static final String IRIS_VARCHAR2 = "VARCHAR2"; + public static final String IRIS_VARCHAR = "VARCHAR"; + public static final String IRIS_UNIQUEIDENTIFIER = "UNIQUEIDENTIFIER"; + public static final String IRIS_GUID = "GUID"; + public static final String IRIS_CHARACTER = "CHARACTER"; + public static final String IRIS_NTEXT = "NTEXT"; + public static final String IRIS_CLOB = "CLOB"; + public static final String IRIS_LONG_VARCHAR = "LONG VARCHAR"; + public static final String IRIS_LONG = "LONG"; + public static final String IRIS_LONGTEXT = "LONGTEXT"; + public static final String IRIS_MEDIUMTEXT = "MEDIUMTEXT"; + public static final String IRIS_TEXT = "TEXT"; + public static final String IRIS_LONGVARCHAR = "LONGVARCHAR"; + + // ------------------------------time------------------------- + public static final String IRIS_DATE = "DATE"; + + public static final String IRIS_TIME = "TIME"; + + public static final String IRIS_TIMESTAMP = "TIMESTAMP"; + public static final String IRIS_POSIXTIME = "POSIXTIME"; + public static final String IRIS_TIMESTAMP2 = "TIMESTAMP2"; + + public static final String IRIS_DATETIME = "DATETIME"; + public static final String IRIS_SMALLDATETIME = "SMALLDATETIME"; + public static final String IRIS_DATETIME2 = "DATETIME2"; + + // ---------------------------binary--------------------------- + public static final String IRIS_BINARY = "BINARY"; + public static final String IRIS_VARBINARY = "VARBINARY"; + public static final String IRIS_RAW = "RAW"; + public static final String IRIS_LONGVARBINARY = "LONGVARBINARY"; + public static final String IRIS_BINARY_VARYING = "BINARY VARYING"; + public static final String IRIS_BLOB = "BLOB"; + public static final String IRIS_IMAGE = "IMAGE"; + public static final String IRIS_LONG_BINARY = "LONG BINARY"; + public static final String IRIS_LONG_RAW = "LONG RAW"; + + // ---------------------------other--------------------------- + public static final String IRIS_BIT = "BIT"; + + public static final int MAX_SCALE = 18; + public static final int DEFAULT_SCALE = 0; + public static final int MAX_PRECISION = 19 + MAX_SCALE; + public static final int DEFAULT_PRECISION = 15; + public static final int MAX_TIME_SCALE = 9; + public static final long GUID_LENGTH = 36; + public static final long MAX_VARCHAR_LENGTH = Integer.MAX_VALUE; + public static final long MAX_BINARY_LENGTH = Integer.MAX_VALUE; + public static final IrisTypeConverter INSTANCE = new IrisTypeConverter(); + + @Override + public String identifier() { + return DatabaseIdentifier.IRIS; + } + + @Override + public Column convert(BasicTypeDefine typeDefine) { + Long typeDefineLength = typeDefine.getLength(); + PhysicalColumn.PhysicalColumnBuilder builder = + PhysicalColumn.builder() + .name(typeDefine.getName()) + .sourceType(typeDefine.getColumnType()) + .columnLength(typeDefineLength) + .scale(typeDefine.getScale()) + .nullable(typeDefine.isNullable()) + .defaultValue(typeDefine.getDefaultValue()) + .comment(typeDefine.getComment()); + String irisDataType = typeDefine.getDataType().toUpperCase(); + long charOrBinaryLength = + Objects.nonNull(typeDefineLength) && typeDefineLength > 0 ? typeDefineLength : 1; + switch (irisDataType) { + case IRIS_NULL: + builder.dataType(BasicType.VOID_TYPE); + break; + case IRIS_BIT: + builder.dataType(BasicType.BOOLEAN_TYPE); + break; + case IRIS_NUMERIC: + case IRIS_MONEY: + case IRIS_SMALLMONEY: + case IRIS_NUMBER: + case IRIS_DEC: + case IRIS_DECIMAL: + DecimalType decimalType; + if (typeDefine.getPrecision() != null && typeDefine.getPrecision() > 0) { + decimalType = + new DecimalType( + typeDefine.getPrecision().intValue(), typeDefine.getScale()); + } else { + decimalType = new DecimalType(DEFAULT_PRECISION, DEFAULT_SCALE); + } + builder.dataType(decimalType); + builder.columnLength(Long.valueOf(decimalType.getPrecision())); + builder.scale(decimalType.getScale()); + break; + case IRIS_INT: + case IRIS_INTEGER: + case IRIS_MEDIUMINT: + builder.dataType(BasicType.INT_TYPE); + break; + case IRIS_ROWVERSION: + case IRIS_BIGINT: + case IRIS_SERIAL: + builder.dataType(BasicType.LONG_TYPE); + break; + case IRIS_TINYINT: + builder.dataType(BasicType.BYTE_TYPE); + break; + case IRIS_SMALLINT: + builder.dataType(BasicType.SHORT_TYPE); + break; + case IRIS_FLOAT: + builder.dataType(BasicType.FLOAT_TYPE); + break; + case IRIS_DOUBLE: + case IRIS_REAL: + case IRIS_DOUBLE_PRECISION: + builder.dataType(BasicType.DOUBLE_TYPE); + break; + case IRIS_CHAR: + case IRIS_CHAR_VARYING: + case IRIS_CHARACTER_VARYING: + case IRIS_NATIONAL_CHAR: + case IRIS_NATIONAL_CHAR_VARYING: + case IRIS_NATIONAL_CHARACTER: + case IRIS_NATIONAL_CHARACTER_VARYING: + case IRIS_NATIONAL_VARCHAR: + case IRIS_NCHAR: + case IRIS_SYSNAME: + case IRIS_VARCHAR2: + case IRIS_VARCHAR: + case IRIS_NVARCHAR: + case IRIS_UNIQUEIDENTIFIER: + case IRIS_GUID: + case IRIS_CHARACTER: + builder.dataType(BasicType.STRING_TYPE); + builder.columnLength(charOrBinaryLength); + break; + case IRIS_NTEXT: + case IRIS_CLOB: + case IRIS_LONG_VARCHAR: + case IRIS_LONG: + case IRIS_LONGTEXT: + case IRIS_MEDIUMTEXT: + case IRIS_TEXT: + case IRIS_LONGVARCHAR: + builder.dataType(BasicType.STRING_TYPE); + builder.columnLength(Long.valueOf(Integer.MAX_VALUE)); + break; + case IRIS_DATE: + builder.dataType(LocalTimeType.LOCAL_DATE_TYPE); + break; + case IRIS_TIME: + builder.dataType(LocalTimeType.LOCAL_TIME_TYPE); + break; + case IRIS_DATETIME: + case IRIS_DATETIME2: + case IRIS_SMALLDATETIME: + case IRIS_TIMESTAMP: + case IRIS_TIMESTAMP2: + case IRIS_POSIXTIME: + builder.dataType(LocalTimeType.LOCAL_DATE_TIME_TYPE); + break; + case IRIS_BINARY: + case IRIS_BINARY_VARYING: + case IRIS_RAW: + case IRIS_VARBINARY: + builder.dataType(PrimitiveByteArrayType.INSTANCE); + builder.columnLength(charOrBinaryLength); + break; + case IRIS_LONGVARBINARY: + case IRIS_BLOB: + case IRIS_IMAGE: + case IRIS_LONG_BINARY: + case IRIS_LONG_RAW: + builder.dataType(PrimitiveByteArrayType.INSTANCE); + builder.columnLength(Long.valueOf(Integer.MAX_VALUE)); + break; + default: + throw CommonError.convertToSeaTunnelTypeError( + DatabaseIdentifier.IRIS, irisDataType, typeDefine.getName()); + } + return builder.build(); + } + + @Override + public BasicTypeDefine reconvert(Column column) { + BasicTypeDefine.BasicTypeDefineBuilder builder = + BasicTypeDefine.builder() + .name(column.getName()) + .precision(column.getColumnLength()) + .length(column.getColumnLength()) + .nullable(column.isNullable()) + .comment(column.getComment()) + .scale(column.getScale()) + .defaultValue(column.getDefaultValue()); + switch (column.getDataType().getSqlType()) { + case NULL: + builder.columnType(IRIS_NULL); + builder.dataType(IRIS_NULL); + break; + case STRING: + if (column.getColumnLength() == null || column.getColumnLength() <= 0) { + builder.columnType(String.format("%s(%s)", IRIS_VARCHAR, MAX_VARCHAR_LENGTH)); + builder.dataType(IRIS_VARCHAR); + } else if (column.getColumnLength() < MAX_VARCHAR_LENGTH) { + builder.columnType( + String.format("%s(%s)", IRIS_VARCHAR, column.getColumnLength())); + builder.dataType(IRIS_VARCHAR); + } else { + builder.columnType(IRIS_LONG_VARCHAR); + builder.dataType(IRIS_LONG_VARCHAR); + } + break; + case BOOLEAN: + builder.columnType(IRIS_BIT); + builder.dataType(IRIS_BIT); + break; + case TINYINT: + builder.columnType(IRIS_TINYINT); + builder.dataType(IRIS_TINYINT); + break; + case SMALLINT: + builder.columnType(IRIS_SMALLINT); + builder.dataType(IRIS_SMALLINT); + break; + case INT: + builder.columnType(IRIS_INTEGER); + builder.dataType(IRIS_INTEGER); + break; + case BIGINT: + builder.columnType(IRIS_BIGINT); + builder.dataType(IRIS_BIGINT); + break; + case FLOAT: + builder.columnType(IRIS_FLOAT); + builder.dataType(IRIS_FLOAT); + break; + case DOUBLE: + builder.columnType(IRIS_DOUBLE); + builder.dataType(IRIS_DOUBLE); + break; + case DECIMAL: + DecimalType decimalType = (DecimalType) column.getDataType(); + long precision = decimalType.getPrecision(); + int scale = decimalType.getScale(); + if (scale < 0) { + scale = 0; + log.warn( + "The decimal column {} type decimal({},{}) is out of range, " + + "which is scale less than 0, " + + "it will be converted to decimal({},{})", + column.getName(), + decimalType.getPrecision(), + decimalType.getScale(), + precision, + scale); + } else if (scale > MAX_SCALE) { + scale = MAX_SCALE; + log.warn( + "The decimal column {} type decimal({},{}) is out of range, " + + "which exceeds the maximum scale of {}, " + + "it will be converted to decimal({},{})", + column.getName(), + decimalType.getPrecision(), + decimalType.getScale(), + MAX_SCALE, + precision, + scale); + } + if (precision < scale) { + precision = scale; + } + if (precision <= 0) { + precision = DEFAULT_PRECISION; + scale = DEFAULT_SCALE; + log.warn( + "The decimal column {} type decimal({},{}) is out of range, " + + "which is precision less than 0, " + + "it will be converted to decimal({},{})", + column.getName(), + decimalType.getPrecision(), + decimalType.getScale(), + precision, + scale); + } else if (precision > MAX_PRECISION) { + scale = MAX_SCALE; + precision = MAX_PRECISION; + log.warn( + "The decimal column {} type decimal({},{}) is out of range, " + + "which exceeds the maximum precision of {}, " + + "it will be converted to decimal({},{})", + column.getName(), + decimalType.getPrecision(), + decimalType.getScale(), + MAX_PRECISION, + precision, + scale); + } + builder.columnType(String.format("%s(%s,%s)", IRIS_DECIMAL, precision, scale)); + builder.dataType(IRIS_DECIMAL); + builder.precision(precision); + builder.scale(scale); + break; + case BYTES: + if (column.getColumnLength() == null || column.getColumnLength() <= 0) { + builder.columnType(IRIS_LONG_BINARY); + builder.dataType(IRIS_LONG_BINARY); + } else if (column.getColumnLength() < MAX_BINARY_LENGTH) { + builder.dataType(IRIS_BINARY); + builder.columnType( + String.format("%s(%s)", IRIS_BINARY, column.getColumnLength())); + } else { + builder.columnType(IRIS_LONG_BINARY); + builder.dataType(IRIS_LONG_BINARY); + } + break; + case DATE: + builder.columnType(IRIS_DATE); + builder.dataType(IRIS_DATE); + break; + case TIME: + builder.dataType(IRIS_TIME); + if (Objects.nonNull(column.getScale()) && column.getScale() > 0) { + Integer timeScale = column.getScale(); + if (timeScale > MAX_TIME_SCALE) { + timeScale = MAX_TIME_SCALE; + log.warn( + "The time column {} type time({}) is out of range, " + + "which exceeds the maximum scale of {}, " + + "it will be converted to time({})", + column.getName(), + column.getScale(), + MAX_TIME_SCALE, + timeScale); + } + builder.columnType(String.format("%s(%s)", IRIS_TIME, timeScale)); + builder.scale(timeScale); + } else { + builder.columnType(IRIS_TIME); + } + break; + case TIMESTAMP: + builder.columnType(IRIS_TIMESTAMP2); + builder.dataType(IRIS_TIMESTAMP2); + break; + + default: + throw CommonError.convertToConnectorTypeError( + DatabaseIdentifier.IRIS, + column.getDataType().getSqlType().name(), + column.getName()); + } + return builder.build(); + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/iris/IrisTypeMapper.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/iris/IrisTypeMapper.java new file mode 100644 index 00000000000..3f60e47ad67 --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/iris/IrisTypeMapper.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.iris; + +import org.apache.seatunnel.api.table.catalog.Column; +import org.apache.seatunnel.api.table.converter.BasicTypeDefine; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper; + +import java.sql.ResultSetMetaData; +import java.sql.SQLException; + +public class IrisTypeMapper implements JdbcDialectTypeMapper { + @Override + public Column mappingColumn(BasicTypeDefine typeDefine) { + return IrisTypeConverter.INSTANCE.convert(typeDefine); + } + + @Override + public Column mappingColumn(ResultSetMetaData metadata, int colIndex) throws SQLException { + String columnName = metadata.getColumnLabel(colIndex); + String nativeType = metadata.getColumnTypeName(colIndex); + int isNullable = metadata.isNullable(colIndex); + long precision = metadata.getPrecision(colIndex); + int scale = metadata.getScale(colIndex); + BasicTypeDefine typeDefine = + BasicTypeDefine.builder() + .name(columnName) + .columnType(nativeType) + .dataType(nativeType) + .nullable(isNullable == ResultSetMetaData.columnNullable) + .length(precision) + .precision(precision) + .scale(scale) + .build(); + return mappingColumn(typeDefine); + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java index 599c0f58203..384319d0223 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java @@ -35,6 +35,8 @@ import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.catalog.TableSchema; import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.iris.IrisCatalog; +import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.iris.savemode.IrisSaveModeHandler; import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils; import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions; import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkConfig; @@ -194,6 +196,16 @@ public Optional getSaveModeHandler() { CatalogUtils.quoteTableIdentifier( catalogTable.getTableId().getTableName(), fieldIde)); catalogTable.getOptions().put("fieldIde", fieldIde); + if (catalog instanceof IrisCatalog) { + return Optional.of( + new IrisSaveModeHandler( + schemaSaveMode, + dataSaveMode, + catalog, + tablePath, + catalogTable, + config.get(JdbcOptions.CUSTOM_SQL))); + } return Optional.of( new DefaultSaveModeHandler( schemaSaveMode, diff --git a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/IrisCreateTableSqlBuilderTest.java b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/IrisCreateTableSqlBuilderTest.java new file mode 100644 index 00000000000..20c65d06c25 --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/IrisCreateTableSqlBuilderTest.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.sql; + +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.ConstraintKey; +import org.apache.seatunnel.api.table.catalog.PhysicalColumn; +import org.apache.seatunnel.api.table.catalog.PrimaryKey; +import org.apache.seatunnel.api.table.catalog.TableIdentifier; +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.catalog.TableSchema; +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.LocalTimeType; +import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.iris.IrisCreateTableSqlBuilder; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import com.google.common.collect.Lists; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; + +public class IrisCreateTableSqlBuilderTest { + + @Test + public void TestCreateTableSqlBuilder() { + TablePath tablePath = TablePath.of("test_database", "test_schema", "test_table"); + TableSchema tableSchema = + TableSchema.builder() + .column(PhysicalColumn.of("id", BasicType.LONG_TYPE, 22, false, null, "id")) + .column( + PhysicalColumn.of( + "name", BasicType.STRING_TYPE, 128, false, null, "name")) + .column( + PhysicalColumn.of( + "age", BasicType.INT_TYPE, (Long) null, true, null, "age")) + .column( + PhysicalColumn.of( + "createTime", + LocalTimeType.LOCAL_DATE_TIME_TYPE, + 3, + true, + null, + "createTime")) + .column( + PhysicalColumn.of( + "lastUpdateTime", + LocalTimeType.LOCAL_DATE_TIME_TYPE, + 3, + true, + null, + "lastUpdateTime")) + .primaryKey(PrimaryKey.of("id", Lists.newArrayList("id"))) + .constraintKey( + Arrays.asList( + ConstraintKey.of( + ConstraintKey.ConstraintType.UNIQUE_KEY, + "name", + Lists.newArrayList( + ConstraintKey.ConstraintKeyColumn.of( + "name", null))), + ConstraintKey.of( + ConstraintKey.ConstraintType.INDEX_KEY, + "age", + Lists.newArrayList( + ConstraintKey.ConstraintKeyColumn.of( + "age", null))))) + .build(); + CatalogTable catalogTable = + CatalogTable.of( + TableIdentifier.of("test_catalog", tablePath), + tableSchema, + new HashMap<>(), + new ArrayList<>(), + "User table"); + + String createTableSql = new IrisCreateTableSqlBuilder(catalogTable).build(tablePath); + // create table sql is change; The old unit tests are no longer applicable + String expect = + "CREATE TABLE \"test_schema\".\"test_table\" (\n" + + " %Description 'User table',\n" + + "\"id\" BIGINT NOT NULL %Description 'id',\n" + + "\"name\" VARCHAR(128) NOT NULL %Description 'name',\n" + + "\"age\" INTEGER %Description 'age',\n" + + "\"createTime\" TIMESTAMP2 %Description 'createTime',\n" + + "\"lastUpdateTime\" TIMESTAMP2 %Description 'lastUpdateTime',\n" + + " PRIMARY KEY (\"id\"),\n" + + "UNIQUE (\"name\")\n" + + ");\n" + + "CREATE INDEX test_table_age ON \"test_schema\".\"test_table\"(\"age\");"; + System.out.println(createTableSql); + Assertions.assertEquals(expect, createTableSql); + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/iris/IrisTypeConverterTest.java b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/iris/IrisTypeConverterTest.java new file mode 100644 index 00000000000..03d2ad1918d --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/iris/IrisTypeConverterTest.java @@ -0,0 +1,645 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.iris; + +import org.apache.seatunnel.api.table.catalog.Column; +import org.apache.seatunnel.api.table.catalog.PhysicalColumn; +import org.apache.seatunnel.api.table.converter.BasicTypeDefine; +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.DecimalType; +import org.apache.seatunnel.api.table.type.LocalTimeType; +import org.apache.seatunnel.api.table.type.MapType; +import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType; +import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; + +import static org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.iris.IrisTypeConverter.MAX_BINARY_LENGTH; + +public class IrisTypeConverterTest { + + private static BasicTypeDefine.BasicTypeDefineBuilder basicTypeDefineBuilder; + + @BeforeAll + public static void setup() { + basicTypeDefineBuilder = + BasicTypeDefine.builder() + .name("test") + .nullable(true) + .defaultValue("1") + .comment("test"); + } + + @Test + public void testConvertUnsupported() { + BasicTypeDefine typeDefine = + BasicTypeDefine.builder().name("test").columnType("aaa").dataType("aaa").build(); + try { + IrisTypeConverter.INSTANCE.convert(typeDefine); + Assertions.fail(); + } catch (SeaTunnelRuntimeException e) { + // ignore + } catch (Throwable e) { + Assertions.fail(); + } + } + + @Test + public void testReconvertUnsupported() { + Column column = + PhysicalColumn.of( + "test", + new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE), + (Long) null, + true, + null, + null); + try { + IrisTypeConverter.INSTANCE.reconvert(column); + Assertions.fail(); + } catch (SeaTunnelRuntimeException e) { + // ignore + } catch (Throwable e) { + Assertions.fail(); + } + } + + @Test + public void testConvertBIT() { + BasicTypeDefine typeDefine = + basicTypeDefineBuilder + .columnType("BIT") + .dataType("BIT") + .nullable(true) + .defaultValue("1") + .comment("test") + .build(); + Column column = IrisTypeConverter.INSTANCE.convert(typeDefine); + Assertions.assertEquals(BasicType.BOOLEAN_TYPE, column.getDataType()); + Assertions.assertEquals(typeDefine.getName(), column.getName()); + Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType()); + Assertions.assertEquals(typeDefine.isNullable(), column.isNullable()); + Assertions.assertEquals(typeDefine.getDefaultValue(), column.getDefaultValue()); + Assertions.assertEquals(typeDefine.getComment(), column.getComment()); + } + + @Test + public void testConvertDecimal() { + BasicTypeDefine typeDefine = + BasicTypeDefine.builder() + .name("test") + .columnType("numeric(38,2)") + .dataType("numeric") + .precision(38L) + .scale(2) + .build(); + Column column = IrisTypeConverter.INSTANCE.convert(typeDefine); + Assertions.assertEquals(typeDefine.getName(), column.getName()); + Assertions.assertEquals(new DecimalType(38, 2), column.getDataType()); + Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType()); + + typeDefine = + BasicTypeDefine.builder() + .name("test") + .columnType("numeric") + .dataType("numeric") + .build(); + column = IrisTypeConverter.INSTANCE.convert(typeDefine); + Assertions.assertEquals(typeDefine.getName(), column.getName()); + Assertions.assertEquals(new DecimalType(15, 0), column.getDataType()); + Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType()); + } + + @Test + public void testConvertTinyint() { + BasicTypeDefine typeDefine = + BasicTypeDefine.builder() + .name("test") + .columnType("tinyint") + .dataType("tinyint") + .build(); + Column column = IrisTypeConverter.INSTANCE.convert(typeDefine); + Assertions.assertEquals(typeDefine.getName(), column.getName()); + Assertions.assertEquals(BasicType.BYTE_TYPE, column.getDataType()); + Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType()); + } + + @Test + public void testConvertSmallint() { + BasicTypeDefine typeDefine = + BasicTypeDefine.builder() + .name("test") + .columnType("smallint") + .dataType("smallint") + .build(); + Column column = IrisTypeConverter.INSTANCE.convert(typeDefine); + Assertions.assertEquals(typeDefine.getName(), column.getName()); + Assertions.assertEquals(BasicType.SHORT_TYPE, column.getDataType()); + Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType()); + } + + @Test + public void testConvertInt() { + BasicTypeDefine typeDefine = + BasicTypeDefine.builder().name("test").columnType("int").dataType("int").build(); + Column column = IrisTypeConverter.INSTANCE.convert(typeDefine); + Assertions.assertEquals(typeDefine.getName(), column.getName()); + Assertions.assertEquals(BasicType.INT_TYPE, column.getDataType()); + Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType()); + } + + @Test + public void testConvertBigint() { + BasicTypeDefine typeDefine = + BasicTypeDefine.builder() + .name("test") + .columnType("bigint") + .dataType("bigint") + .build(); + Column column = IrisTypeConverter.INSTANCE.convert(typeDefine); + Assertions.assertEquals(typeDefine.getName(), column.getName()); + Assertions.assertEquals(BasicType.LONG_TYPE, column.getDataType()); + Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType()); + } + + @Test + public void testConvertFloat() { + BasicTypeDefine typeDefine = + BasicTypeDefine.builder() + .name("test") + .columnType("float") + .dataType("float") + .build(); + Column column = IrisTypeConverter.INSTANCE.convert(typeDefine); + Assertions.assertEquals(typeDefine.getName(), column.getName()); + Assertions.assertEquals(BasicType.FLOAT_TYPE, column.getDataType()); + Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType()); + } + + @Test + public void testConvertDouble() { + BasicTypeDefine typeDefine = + BasicTypeDefine.builder() + .name("test") + .columnType("double") + .dataType("double") + .build(); + Column column = IrisTypeConverter.INSTANCE.convert(typeDefine); + Assertions.assertEquals(typeDefine.getName(), column.getName()); + Assertions.assertEquals(BasicType.DOUBLE_TYPE, column.getDataType()); + Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType()); + } + + @Test + public void testConvertChar() { + BasicTypeDefine typeDefine = + BasicTypeDefine.builder().name("test").columnType("char").dataType("char").build(); + Column column = IrisTypeConverter.INSTANCE.convert(typeDefine); + Assertions.assertEquals(typeDefine.getName(), column.getName()); + Assertions.assertEquals(BasicType.STRING_TYPE, column.getDataType()); + Assertions.assertEquals(1, column.getColumnLength()); + Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType()); + + typeDefine = + BasicTypeDefine.builder() + .name("test") + .columnType("char(10)") + .dataType("char") + .length(10L) + .build(); + column = IrisTypeConverter.INSTANCE.convert(typeDefine); + Assertions.assertEquals(typeDefine.getName(), column.getName()); + Assertions.assertEquals(BasicType.STRING_TYPE, column.getDataType()); + Assertions.assertEquals(10, column.getColumnLength()); + Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType()); + } + + @Test + public void testConvertVarchar() { + BasicTypeDefine typeDefine = + BasicTypeDefine.builder() + .name("test") + .columnType("varchar") + .dataType("varchar") + .build(); + Column column = IrisTypeConverter.INSTANCE.convert(typeDefine); + Assertions.assertEquals(typeDefine.getName(), column.getName()); + Assertions.assertEquals(BasicType.STRING_TYPE, column.getDataType()); + Assertions.assertEquals(1, column.getColumnLength()); + Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType()); + + typeDefine = + BasicTypeDefine.builder() + .name("test") + .columnType("varchar(10)") + .dataType("varchar") + .length(10L) + .build(); + column = IrisTypeConverter.INSTANCE.convert(typeDefine); + Assertions.assertEquals(typeDefine.getName(), column.getName()); + Assertions.assertEquals(BasicType.STRING_TYPE, column.getDataType()); + Assertions.assertEquals(10, column.getColumnLength()); + Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType()); + + typeDefine = + BasicTypeDefine.builder() + .name("test") + .columnType("varchar2(20)") + .dataType("varchar2") + .length(20L) + .build(); + column = IrisTypeConverter.INSTANCE.convert(typeDefine); + Assertions.assertEquals(typeDefine.getName(), column.getName()); + Assertions.assertEquals(BasicType.STRING_TYPE, column.getDataType()); + Assertions.assertEquals(20, column.getColumnLength()); + Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType()); + } + + @Test + public void testConvertOtherString() { + BasicTypeDefine typeDefine = + BasicTypeDefine.builder().name("test").columnType("clob").dataType("clob").build(); + Column column = IrisTypeConverter.INSTANCE.convert(typeDefine); + Assertions.assertEquals(typeDefine.getName(), column.getName()); + Assertions.assertEquals(BasicType.STRING_TYPE, column.getDataType()); + Assertions.assertEquals(Integer.MAX_VALUE, column.getColumnLength()); + Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType()); + } + + @Test + public void testConvertBinary() { + BasicTypeDefine typeDefine = + BasicTypeDefine.builder() + .name("test") + .columnType("binary") + .dataType("binary") + .build(); + Column column = IrisTypeConverter.INSTANCE.convert(typeDefine); + + Assertions.assertEquals(typeDefine.getName(), column.getName()); + Assertions.assertEquals(PrimitiveByteArrayType.INSTANCE, column.getDataType()); + Assertions.assertEquals(1, column.getColumnLength()); + Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType()); + } + + @Test + public void testConvertOtherBinary() { + BasicTypeDefine typeDefine = + BasicTypeDefine.builder().name("test").columnType("blob").dataType("blob").build(); + Column column = IrisTypeConverter.INSTANCE.convert(typeDefine); + + Assertions.assertEquals(typeDefine.getName(), column.getName()); + Assertions.assertEquals(PrimitiveByteArrayType.INSTANCE, column.getDataType()); + Assertions.assertEquals(Integer.MAX_VALUE, column.getColumnLength()); + Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType()); + } + + @Test + public void testConvertDate() { + BasicTypeDefine typeDefine = + BasicTypeDefine.builder().name("test").columnType("date").dataType("date").build(); + Column column = IrisTypeConverter.INSTANCE.convert(typeDefine); + Assertions.assertEquals(typeDefine.getName(), column.getName()); + Assertions.assertEquals(LocalTimeType.LOCAL_DATE_TYPE, column.getDataType()); + Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType()); + } + + @Test + public void testConvertTime() { + BasicTypeDefine typeDefine = + BasicTypeDefine.builder().name("test").columnType("time").dataType("time").build(); + Column column = IrisTypeConverter.INSTANCE.convert(typeDefine); + Assertions.assertEquals(typeDefine.getName(), column.getName()); + Assertions.assertEquals(LocalTimeType.LOCAL_TIME_TYPE, column.getDataType()); + Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType()); + } + + @Test + public void testConvertTimestamp() { + BasicTypeDefine typeDefine = + BasicTypeDefine.builder() + .name("test") + .columnType("datetime") + .dataType("datetime") + .build(); + Column column = IrisTypeConverter.INSTANCE.convert(typeDefine); + Assertions.assertEquals(typeDefine.getName(), column.getName()); + Assertions.assertEquals(LocalTimeType.LOCAL_DATE_TIME_TYPE, column.getDataType()); + Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType()); + + typeDefine = + BasicTypeDefine.builder() + .name("test") + .columnType("timestamp") + .dataType("timestamp") + .build(); + column = IrisTypeConverter.INSTANCE.convert(typeDefine); + Assertions.assertEquals(typeDefine.getName(), column.getName()); + Assertions.assertEquals(LocalTimeType.LOCAL_DATE_TIME_TYPE, column.getDataType()); + Assertions.assertEquals(typeDefine.getScale(), column.getScale()); + Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType()); + + typeDefine = + BasicTypeDefine.builder() + .name("test") + .columnType("timestamp(6)") + .dataType("timestamp") + .scale(6) + .build(); + column = IrisTypeConverter.INSTANCE.convert(typeDefine); + Assertions.assertEquals(typeDefine.getName(), column.getName()); + Assertions.assertEquals(LocalTimeType.LOCAL_DATE_TIME_TYPE, column.getDataType()); + Assertions.assertEquals(typeDefine.getScale(), column.getScale()); + Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType()); + } + + @Test + public void testReconvertBoolean() { + Column column = + PhysicalColumn.builder() + .name("test") + .dataType(BasicType.BOOLEAN_TYPE) + .nullable(true) + .defaultValue(true) + .comment("test") + .build(); + + BasicTypeDefine typeDefine = IrisTypeConverter.INSTANCE.reconvert(column); + Assertions.assertEquals(column.getName(), typeDefine.getName()); + Assertions.assertEquals(IrisTypeConverter.IRIS_BIT, typeDefine.getColumnType()); + Assertions.assertEquals(IrisTypeConverter.IRIS_BIT, typeDefine.getDataType()); + Assertions.assertEquals(column.isNullable(), typeDefine.isNullable()); + Assertions.assertEquals(column.getDefaultValue(), typeDefine.getDefaultValue()); + Assertions.assertEquals(column.getComment(), typeDefine.getComment()); + } + + @Test + public void testReconvertByte() { + Column column = PhysicalColumn.builder().name("test").dataType(BasicType.BYTE_TYPE).build(); + + BasicTypeDefine typeDefine = IrisTypeConverter.INSTANCE.reconvert(column); + Assertions.assertEquals(column.getName(), typeDefine.getName()); + Assertions.assertEquals(IrisTypeConverter.IRIS_TINYINT, typeDefine.getColumnType()); + Assertions.assertEquals(IrisTypeConverter.IRIS_TINYINT, typeDefine.getDataType()); + } + + @Test + public void testReconvertShort() { + Column column = + PhysicalColumn.builder().name("test").dataType(BasicType.SHORT_TYPE).build(); + + BasicTypeDefine typeDefine = IrisTypeConverter.INSTANCE.reconvert(column); + Assertions.assertEquals(column.getName(), typeDefine.getName()); + Assertions.assertEquals(IrisTypeConverter.IRIS_SMALLINT, typeDefine.getColumnType()); + Assertions.assertEquals(IrisTypeConverter.IRIS_SMALLINT, typeDefine.getDataType()); + } + + @Test + public void testReconvertInt() { + Column column = PhysicalColumn.builder().name("test").dataType(BasicType.INT_TYPE).build(); + + BasicTypeDefine typeDefine = IrisTypeConverter.INSTANCE.reconvert(column); + Assertions.assertEquals(column.getName(), typeDefine.getName()); + Assertions.assertEquals(IrisTypeConverter.IRIS_INTEGER, typeDefine.getColumnType()); + Assertions.assertEquals(IrisTypeConverter.IRIS_INTEGER, typeDefine.getDataType()); + } + + @Test + public void testReconvertLong() { + Column column = PhysicalColumn.builder().name("test").dataType(BasicType.LONG_TYPE).build(); + + BasicTypeDefine typeDefine = IrisTypeConverter.INSTANCE.reconvert(column); + Assertions.assertEquals(column.getName(), typeDefine.getName()); + Assertions.assertEquals(IrisTypeConverter.IRIS_BIGINT, typeDefine.getColumnType()); + Assertions.assertEquals(IrisTypeConverter.IRIS_BIGINT, typeDefine.getDataType()); + } + + @Test + public void testReconvertFloat() { + Column column = + PhysicalColumn.builder().name("test").dataType(BasicType.FLOAT_TYPE).build(); + + BasicTypeDefine typeDefine = IrisTypeConverter.INSTANCE.reconvert(column); + Assertions.assertEquals(column.getName(), typeDefine.getName()); + Assertions.assertEquals(IrisTypeConverter.IRIS_FLOAT, typeDefine.getColumnType()); + Assertions.assertEquals(IrisTypeConverter.IRIS_FLOAT, typeDefine.getDataType()); + } + + @Test + public void testReconvertDouble() { + Column column = + PhysicalColumn.builder().name("test").dataType(BasicType.DOUBLE_TYPE).build(); + + BasicTypeDefine typeDefine = IrisTypeConverter.INSTANCE.reconvert(column); + Assertions.assertEquals(column.getName(), typeDefine.getName()); + Assertions.assertEquals(IrisTypeConverter.IRIS_DOUBLE, typeDefine.getColumnType()); + Assertions.assertEquals(IrisTypeConverter.IRIS_DOUBLE, typeDefine.getDataType()); + } + + @Test + public void testReconvertDecimal() { + Column column = + PhysicalColumn.builder().name("test").dataType(new DecimalType(0, 0)).build(); + + BasicTypeDefine typeDefine = IrisTypeConverter.INSTANCE.reconvert(column); + Assertions.assertEquals(column.getName(), typeDefine.getName()); + Assertions.assertEquals( + String.format( + "%s(%s,%s)", + IrisTypeConverter.IRIS_DECIMAL, + IrisTypeConverter.DEFAULT_PRECISION, + IrisTypeConverter.DEFAULT_SCALE), + typeDefine.getColumnType()); + Assertions.assertEquals(IrisTypeConverter.IRIS_DECIMAL, typeDefine.getDataType()); + + column = PhysicalColumn.builder().name("test").dataType(new DecimalType(10, 2)).build(); + + typeDefine = IrisTypeConverter.INSTANCE.reconvert(column); + Assertions.assertEquals(column.getName(), typeDefine.getName()); + Assertions.assertEquals( + String.format("%s(%s,%s)", IrisTypeConverter.IRIS_DECIMAL, 10, 2), + typeDefine.getColumnType()); + Assertions.assertEquals(IrisTypeConverter.IRIS_DECIMAL, typeDefine.getDataType()); + } + + @Test + public void testReconvertBytes() { + Column column = + PhysicalColumn.builder() + .name("test") + .dataType(PrimitiveByteArrayType.INSTANCE) + .columnLength(null) + .build(); + + BasicTypeDefine typeDefine = IrisTypeConverter.INSTANCE.reconvert(column); + Assertions.assertEquals(column.getName(), typeDefine.getName()); + Assertions.assertEquals(IrisTypeConverter.IRIS_LONG_BINARY, typeDefine.getColumnType()); + Assertions.assertEquals(IrisTypeConverter.IRIS_LONG_BINARY, typeDefine.getDataType()); + + column = + PhysicalColumn.builder() + .name("test") + .dataType(PrimitiveByteArrayType.INSTANCE) + .columnLength(2L) + .build(); + typeDefine = IrisTypeConverter.INSTANCE.reconvert(column); + Assertions.assertEquals(column.getName(), typeDefine.getName()); + Assertions.assertEquals(column.getColumnLength(), typeDefine.getLength()); + Assertions.assertEquals( + String.format(IrisTypeConverter.IRIS_BINARY + "(%s)", typeDefine.getLength()), + typeDefine.getColumnType()); + Assertions.assertEquals(IrisTypeConverter.IRIS_BINARY, typeDefine.getDataType()); + + column = + PhysicalColumn.builder() + .name("test") + .dataType(PrimitiveByteArrayType.INSTANCE) + .columnLength(MAX_BINARY_LENGTH) + .build(); + + typeDefine = IrisTypeConverter.INSTANCE.reconvert(column); + Assertions.assertEquals(column.getName(), typeDefine.getName()); + Assertions.assertEquals(IrisTypeConverter.IRIS_LONG_BINARY, typeDefine.getColumnType()); + Assertions.assertEquals(IrisTypeConverter.IRIS_LONG_BINARY, typeDefine.getDataType()); + } + + @Test + public void testReconvertString() { + Column column = + PhysicalColumn.builder() + .name("test") + .dataType(BasicType.STRING_TYPE) + .columnLength(null) + .build(); + + BasicTypeDefine typeDefine = IrisTypeConverter.INSTANCE.reconvert(column); + Assertions.assertEquals(column.getName(), typeDefine.getName()); + Assertions.assertEquals("VARCHAR(" + Integer.MAX_VALUE + ")", typeDefine.getColumnType()); + Assertions.assertEquals(IrisTypeConverter.IRIS_VARCHAR, typeDefine.getDataType()); + + column = + PhysicalColumn.builder() + .name("test") + .dataType(BasicType.STRING_TYPE) + .columnLength(1L) + .build(); + + typeDefine = IrisTypeConverter.INSTANCE.reconvert(column); + Assertions.assertEquals(column.getName(), typeDefine.getName()); + Assertions.assertEquals( + String.format("%s(%s)", IrisTypeConverter.IRIS_VARCHAR, column.getColumnLength()), + typeDefine.getColumnType()); + Assertions.assertEquals(IrisTypeConverter.IRIS_VARCHAR, typeDefine.getDataType()); + + column = + PhysicalColumn.builder() + .name("test") + .dataType(BasicType.STRING_TYPE) + .columnLength(60000L) + .build(); + + typeDefine = IrisTypeConverter.INSTANCE.reconvert(column); + Assertions.assertEquals(column.getName(), typeDefine.getName()); + Assertions.assertEquals( + String.format("%s(%s)", IrisTypeConverter.IRIS_VARCHAR, column.getColumnLength()), + typeDefine.getColumnType()); + Assertions.assertEquals(IrisTypeConverter.IRIS_VARCHAR, typeDefine.getDataType()); + + column = + PhysicalColumn.builder() + .name("test") + .dataType(BasicType.STRING_TYPE) + .columnLength(60001L) + .build(); + + typeDefine = IrisTypeConverter.INSTANCE.reconvert(column); + Assertions.assertEquals(column.getName(), typeDefine.getName()); + Assertions.assertEquals( + String.format("%s(%s)", IrisTypeConverter.IRIS_VARCHAR, column.getColumnLength()), + typeDefine.getColumnType()); + Assertions.assertEquals(IrisTypeConverter.IRIS_VARCHAR, typeDefine.getDataType()); + } + + @Test + public void testReconvertDate() { + Column column = + PhysicalColumn.builder() + .name("test") + .dataType(LocalTimeType.LOCAL_DATE_TYPE) + .build(); + + BasicTypeDefine typeDefine = IrisTypeConverter.INSTANCE.reconvert(column); + Assertions.assertEquals(column.getName(), typeDefine.getName()); + Assertions.assertEquals(IrisTypeConverter.IRIS_DATE, typeDefine.getColumnType()); + Assertions.assertEquals(IrisTypeConverter.IRIS_DATE, typeDefine.getDataType()); + } + + @Test + public void testReconvertTime() { + Column column = + PhysicalColumn.builder() + .name("test") + .dataType(LocalTimeType.LOCAL_TIME_TYPE) + .build(); + + BasicTypeDefine typeDefine = IrisTypeConverter.INSTANCE.reconvert(column); + Assertions.assertEquals(column.getName(), typeDefine.getName()); + Assertions.assertEquals(IrisTypeConverter.IRIS_TIME, typeDefine.getColumnType()); + Assertions.assertEquals(IrisTypeConverter.IRIS_TIME, typeDefine.getDataType()); + } + + @Test + public void testReconvertDatetime() { + Column column = + PhysicalColumn.builder() + .name("test") + .dataType(LocalTimeType.LOCAL_DATE_TIME_TYPE) + .build(); + + BasicTypeDefine typeDefine = IrisTypeConverter.INSTANCE.reconvert(column); + Assertions.assertEquals(column.getName(), typeDefine.getName()); + Assertions.assertEquals(IrisTypeConverter.IRIS_TIMESTAMP2, typeDefine.getColumnType()); + Assertions.assertEquals(IrisTypeConverter.IRIS_TIMESTAMP2, typeDefine.getDataType()); + + column = + PhysicalColumn.builder() + .name("test") + .dataType(LocalTimeType.LOCAL_DATE_TIME_TYPE) + .scale(3) + .build(); + + typeDefine = IrisTypeConverter.INSTANCE.reconvert(column); + Assertions.assertEquals(column.getName(), typeDefine.getName()); + Assertions.assertEquals(IrisTypeConverter.IRIS_TIMESTAMP2, typeDefine.getColumnType()); + Assertions.assertEquals(IrisTypeConverter.IRIS_TIMESTAMP2, typeDefine.getDataType()); + Assertions.assertEquals(column.getScale(), typeDefine.getScale()); + + column = + PhysicalColumn.builder() + .name("test") + .dataType(LocalTimeType.LOCAL_DATE_TIME_TYPE) + .scale(6) + .build(); + + typeDefine = IrisTypeConverter.INSTANCE.reconvert(column); + Assertions.assertEquals(column.getName(), typeDefine.getName()); + Assertions.assertEquals(IrisTypeConverter.IRIS_TIMESTAMP2, typeDefine.getColumnType()); + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java index 08a9868ab6f..b66a06dec20 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java @@ -407,6 +407,10 @@ private Object checkData(Object data) throws SQLException, IOException { try (InputStream inputStream = ((Blob) data).getBinaryStream()) { return ByteStreams.toByteArray(inputStream); } + } else if (data instanceof InputStream) { + try (InputStream inputStream = (InputStream) data) { + return ByteStreams.toByteArray(inputStream); + } } else if (data instanceof Array) { Object[] jdbcArray = (Object[]) ((Array) data).getArray(); Object[] javaArray = new Object[jdbcArray.length]; diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/pom.xml index af4c61d5b65..54929edfa35 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/pom.xml +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/pom.xml @@ -95,6 +95,16 @@ com.xugudb xugu-jdbc test + + + ch.qos.logback + logback-classic + + + ch.qos.logback + logback-core + + diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcIrisIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcIrisIT.java new file mode 100644 index 00000000000..4efa5bf651a --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcIrisIT.java @@ -0,0 +1,592 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc; + +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; +import org.apache.seatunnel.common.utils.ExceptionUtils; +import org.apache.seatunnel.common.utils.JdbcUrlUtil; +import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.iris.IrisCatalog; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.iris.IrisDialect; +import org.apache.seatunnel.connectors.seatunnel.jdbc.source.JdbcSourceTable; +import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory; +import org.apache.seatunnel.e2e.common.container.EngineType; +import org.apache.seatunnel.e2e.common.container.TestContainer; +import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; +import org.apache.seatunnel.e2e.common.junit.TestContainerExtension; + +import org.apache.commons.lang3.tuple.Pair; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestTemplate; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.utility.DockerLoggerFactory; +import org.testcontainers.utility.MountableFile; + +import com.google.common.collect.Lists; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.math.BigDecimal; +import java.nio.charset.StandardCharsets; +import java.sql.Date; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Time; +import java.sql.Timestamp; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +@DisabledOnContainer( + value = {}, + type = {EngineType.SPARK}, + disabledReason = "Currently SPARK do not support cdc, temporarily disable") +@Slf4j +public class JdbcIrisIT extends AbstractJdbcIT { + private static final String IRIS_IMAGE = "intersystems/iris-community:2023.1"; + private static final String IRIS_NETWORK_ALIASES = "e2e_irisDb"; + private static final String DRIVER_CLASS = "com.intersystems.jdbc.IRISDriver"; + private static final int IRIS_PORT = 1972; + private static final String IRIS_URL = "jdbc:IRIS://" + HOST + ":%s/%s"; + private static final String USERNAME = "_SYSTEM"; + private static final String PASSWORD = "Seatunnel"; + private static final String DATABASE = "%SYS"; + private static final String SCHEMA = "test"; + private static final String SOURCE_TABLE = "e2e_table_source"; + private static final String SINK_TABLE = "e2e_table_sink"; + private static final String CATALOG_TABLE = "e2e_table_catalog"; + private static final Integer GEN_ROWS = 100; + private static final List CONFIG_FILE = + Lists.newArrayList("/jdbc_iris_source_to_sink_with_full_type.conf"); + + @TestContainerExtension + protected final ContainerExtendedFactory extendedFactory = + container -> { + Container.ExecResult extraCommands = + container.execInContainer( + "bash", + "-c", + "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd /tmp/seatunnel/plugins/Jdbc/lib && wget " + + driverUrl()); + Assertions.assertEquals(0, extraCommands.getExitCode(), extraCommands.getStderr()); + }; + + private static final String CREATE_SQL = + "create table %s\n" + + "(\n" + + " BIGINT_COL BIGINT primary key,\n" + + " BIGINT_10_COL BIGINT(10),\n" + + " BINARY_COL BINARY,\n" + + " BINARY_10_COL BINARY(10),\n" + + " BINARY_VARYING_COL BINARY VARYING,\n" + + " BINARY_VARYING_10_COL BINARY VARYING(10),\n" + + " BIT_COL BIT,\n" + + " BLOB_COL BLOB,\n" + + " CHAR_COL CHAR,\n" + + " CHAR_255_COL CHAR(255),\n" + + " CHAR_VARYING_COL CHAR VARYING,\n" + + " CHAR_VARYING_255_COL CHAR VARYING(255),\n" + + " CHARACTER_COL CHARACTER,\n" + + " CHARACTER_120_COL CHARACTER(120),\n" + + " CHARACTER_VARYING_COL CHARACTER VARYING,\n" + + " CHARACTER_VARYING_155_COL CHARACTER VARYING(155),\n" + + " CLOB_COL CLOB,\n" + + " DATE_COL DATE,\n" + + " DATETIME_COL DATETIME,\n" + + " DATETIME2_COL DATETIME2,\n" + + " DEC_COL DEC,\n" + + " DEC_3_COL DEC(3),\n" + + " DEC_3_2_COL DEC(3,2),\n" + + " DECIMAL_COL DECIMAL,\n" + + " DECIMAL_6_COL DECIMAL(6),\n" + + " DECIMAL_6_2_COL DECIMAL(6,2),\n" + + " DOUBLE_COL DOUBLE,\n" + + " DOUBLE_PRECISION_COL DOUBLE PRECISION,\n" + + " FLOAT_COL FLOAT,\n" + + " FLOAT_2_COL FLOAT(2),\n" + + " IMAGE_COL IMAGE,\n" + + " INT_COL INT,\n" + + " INT_10_COL INT(10),\n" + + " INTEGER_COL INTEGER,\n" + + " LONG_COL LONG,\n" + + " LONG_BINARY_COL LONG BINARY,\n" + + " LONG_RAW_COL LONG RAW,\n" + + " LONG_VARCHAR_COL LONG VARCHAR,\n" + + " LONG_VARCHAR_10_COL LONG VARCHAR(10),\n" + + " LONGTEXT_COL LONGTEXT,\n" + + " LONGVARBINARY_COL LONGVARBINARY,\n" + + " LONGVARBINARY_10_COL LONGVARBINARY(10),\n" + + " LONGVARCHAR_COL LONGVARCHAR,\n" + + " LONGVARCHAR_20_COL LONGVARCHAR(20),\n" + + " MEDIUMINT_COL MEDIUMINT,\n" + + " MEDIUMINT_10_COL MEDIUMINT(10),\n" + + " MEDIUMTEXT_COL MEDIUMTEXT,\n" + + " MONEY_COL MONEY,\n" + + " NATIONAL_CHAR_COL NATIONAL CHAR,\n" + + " NATIONAL_CHAR_200_COL NATIONAL CHAR(200),\n" + + " NATIONAL_CHAR_VARYING_COL NATIONAL CHAR VARYING,\n" + + " NATIONAL_CHAR_VARYING_100_COL NATIONAL CHAR VARYING(100),\n" + + " NATIONAL_CHARACTER_COL NATIONAL CHARACTER,\n" + + " NATIONAL_CHARACTER_233_COL NATIONAL CHARACTER(233),\n" + + " NCHAR_COL NCHAR,\n" + + " NCHAR_22_COL NCHAR(22),\n" + + " NTEXT_COL NTEXT,\n" + + " NUMBER_COL NUMBER,\n" + + " NUMBER_5_COL NUMBER(5),\n" + + " NUMBER_5_3_COL NUMBER(5,3),\n" + + " NUMERIC_COL NUMERIC,\n" + + " NUMERIC_6_COL NUMERIC(6),\n" + + " NUMERIC_6_3_COL NUMERIC(6,3),\n" + + " NVARCHAR_COL NVARCHAR,\n" + + " NVARCHAR_7_COL NVARCHAR(7),\n" + + " NVARCHAR_7_3_COL NVARCHAR(7,3),\n" + + " POSIXTIME_COL POSIXTIME,\n" + + " RAW_10_COL RAW(10),\n" + + " REAL_COL REAL,\n" + + " SERIAL_COL SERIAL,\n" + + " SMALLDATETIME_COL SMALLDATETIME,\n" + + " SMALLINT_COL SMALLINT,\n" + + " SMALLINT_3_COL SMALLINT(3),\n" + + " SMALLMONEY_COL SMALLMONEY,\n" + + " SYSNAME_COL SYSNAME,\n" + + " TEXT_COL TEXT,\n" + + " TIME_COL TIME,\n" + + " TIME_3_COL TIME(3),\n" + + " TIMESTAMP_COL TIMESTAMP,\n" + + " TIMESTAMP2_COL TIMESTAMP2,\n" + + " TINYINT_COL TINYINT,\n" + + " TINYINT_10_COL TINYINT(10),\n" + + " UNIQUEIDENTIFIER_COL UNIQUEIDENTIFIER,\n" + + " VARBINARY_COL VARBINARY,\n" + + " VARBINARY_10_COL VARBINARY(10),\n" + + " VARCHAR_COL VARCHAR,\n" + + " VARCHAR_254_COL VARCHAR(254),\n" + + " VARCHAR_254_10_COL VARCHAR(254,10),\n" + + " VARCHAR2_10_COL VARCHAR2(10)\n" + + ")"; + + private static final String[] fieldNames = + new String[] { + "BIGINT_COL", + "BIGINT_10_COL", + "BINARY_COL", + "BINARY_10_COL", + "BINARY_VARYING_COL", + "BINARY_VARYING_10_COL", + "BIT_COL", + "BLOB_COL", + "CHAR_COL", + "CHAR_255_COL", + "CHAR_VARYING_COL", + "CHAR_VARYING_255_COL", + "CHARACTER_COL", + "CHARACTER_120_COL", + "CHARACTER_VARYING_COL", + "CHARACTER_VARYING_155_COL", + "CLOB_COL", + "DATE_COL", + "DATETIME_COL", + "DATETIME2_COL", + "DEC_COL", + "DEC_3_COL", + "DEC_3_2_COL", + "DECIMAL_COL", + "DECIMAL_6_COL", + "DECIMAL_6_2_COL", + "DOUBLE_COL", + "DOUBLE_PRECISION_COL", + "FLOAT_COL", + "FLOAT_2_COL", + "IMAGE_COL", + "INT_COL", + "INT_10_COL", + "INTEGER_COL", + "LONG_COL", + "LONG_BINARY_COL", + "LONG_RAW_COL", + "LONG_VARCHAR_COL", + "LONG_VARCHAR_10_COL", + "LONGTEXT_COL", + "LONGVARBINARY_COL", + "LONGVARBINARY_10_COL", + "LONGVARCHAR_COL", + "LONGVARCHAR_20_COL", + "MEDIUMINT_COL", + "MEDIUMINT_10_COL", + "MEDIUMTEXT_COL", + "MONEY_COL", + "NATIONAL_CHAR_COL", + "NATIONAL_CHAR_200_COL", + "NATIONAL_CHAR_VARYING_COL", + "NATIONAL_CHAR_VARYING_100_COL", + "NATIONAL_CHARACTER_COL", + "NATIONAL_CHARACTER_233_COL", + "NCHAR_COL", + "NCHAR_22_COL", + "NTEXT_COL", + "NUMBER_COL", + "NUMBER_5_COL", + "NUMBER_5_3_COL", + "NUMERIC_COL", + "NUMERIC_6_COL", + "NUMERIC_6_3_COL", + "NVARCHAR_COL", + "NVARCHAR_7_COL", + "NVARCHAR_7_3_COL", + "POSIXTIME_COL", + "RAW_10_COL", + "REAL_COL", + "SERIAL_COL", + "SMALLDATETIME_COL", + "SMALLINT_COL", + "SMALLINT_3_COL", + "SMALLMONEY_COL", + "SYSNAME_COL", + "TEXT_COL", + "TIME_COL", + "TIME_3_COL", + "TIMESTAMP_COL", + "TIMESTAMP2_COL", + "TINYINT_COL", + "TINYINT_10_COL", + "UNIQUEIDENTIFIER_COL", + "VARBINARY_COL", + "VARBINARY_10_COL", + "VARCHAR_COL", + "VARCHAR_254_COL", + "VARCHAR_254_10_COL", + "VARCHAR2_10_COL" + }; + + @Test + public void testSampleDataFromColumnSuccess() throws SQLException { + JdbcDialect dialect = new IrisDialect(); + JdbcSourceTable table = + JdbcSourceTable.builder() + .tablePath(TablePath.of(DATABASE, SCHEMA, SOURCE_TABLE)) + .build(); + Object[] bigintCols = + dialect.sampleDataFromColumn(connection, table, "BIGINT_COL", 1, 1024); + Assertions.assertEquals(GEN_ROWS, bigintCols.length); + } + + @Test + @Override + public void testCatalog() { + if (catalog == null) { + return; + } + TablePath sourceTablePath = + new TablePath( + jdbcCase.getDatabase(), jdbcCase.getSchema(), jdbcCase.getSourceTable()); + TablePath targetTablePath = + new TablePath( + jdbcCase.getCatalogDatabase(), + jdbcCase.getCatalogSchema(), + jdbcCase.getCatalogTable()); + + CatalogTable catalogTable = catalog.getTable(sourceTablePath); + catalog.createTable(targetTablePath, catalogTable, false); + Assertions.assertTrue(catalog.tableExists(targetTablePath)); + + catalog.dropTable(targetTablePath, false); + Assertions.assertFalse(catalog.tableExists(targetTablePath)); + } + + @TestTemplate + public void testUpsert(TestContainer container) throws IOException, InterruptedException { + Container.ExecResult execResult = container.executeJob("/jdbc_iris_upsert.conf"); + Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr()); + try (Statement statement = connection.createStatement()) { + ResultSet sink = + statement.executeQuery( + "SELECT * FROM test.e2e_upsert_table_sink ORDER BY pk_id"); + String[] fieldNames = new String[] {"pk_id", "name", "score"}; + Object[] sinkResult = toArrayResult(sink, fieldNames); + Assertions.assertEquals(2, sinkResult.length); + Assertions.assertEquals(3, ((Object[]) sinkResult[0]).length); + Assertions.assertEquals("A_1", ((Object[]) sinkResult[0])[1]); + } catch (SQLException | IOException e) { + throw new SeaTunnelRuntimeException(JdbcITErrorCode.DATA_COMPARISON_FAILED, e); + } + } + + @Override + JdbcCase getJdbcCase() { + Map containerEnv = new HashMap<>(); + containerEnv.put("IRIS_PASSWORD", PASSWORD); + containerEnv.put("APP_USER", USERNAME); + containerEnv.put("APP_USER_PASSWORD", PASSWORD); + String jdbcUrl = String.format(IRIS_URL, IRIS_PORT, DATABASE); + Pair> testDataSet = initTestData(); + String[] fieldNames = testDataSet.getKey(); + + String insertSql = insertTable(SCHEMA, SOURCE_TABLE, fieldNames); + + return JdbcCase.builder() + .dockerImage(IRIS_IMAGE) + .networkAliases(IRIS_NETWORK_ALIASES) + .containerEnv(containerEnv) + .driverClass(DRIVER_CLASS) + .host(HOST) + .port(IRIS_PORT) + .localPort(IRIS_PORT) + .jdbcTemplate(IRIS_URL) + .jdbcUrl(jdbcUrl) + .userName(USERNAME) + .password(PASSWORD) + .database(DATABASE) + .schema(SCHEMA) + .sourceTable(SOURCE_TABLE) + .sinkTable(SINK_TABLE) + .catalogDatabase(DATABASE) + .catalogSchema(SCHEMA) + .catalogTable(CATALOG_TABLE) + .createSql(CREATE_SQL) + .configFile(CONFIG_FILE) + .insertSql(insertSql) + .testData(testDataSet) + .build(); + } + + @Override + protected void createNeededTables() { + try (Statement statement = connection.createStatement()) { + String createTemplate = jdbcCase.getCreateSql(); + + String createSource = + String.format( + createTemplate, + buildTableInfoWithSchema( + jdbcCase.getDatabase(), + jdbcCase.getSchema(), + jdbcCase.getSourceTable())); + statement.execute(createSource); + + String upsertSinkSql = + "CREATE TABLE test.e2e_upsert_table_sink (\n" + + "\"pk_id\" INT PRIMARY KEY,\n" + + "\"name\" VARCHAR(50),\n" + + "\"score\" INT\n" + + ");"; + statement.execute(upsertSinkSql); + + connection.commit(); + } catch (Exception exception) { + log.error(ExceptionUtils.getMessage(exception)); + throw new SeaTunnelRuntimeException(JdbcITErrorCode.CREATE_TABLE_FAILED, exception); + } + } + + @Override + public String insertTable(String schema, String table, String... fields) { + String columns = + Arrays.stream(fields).map(this::quoteIdentifier).collect(Collectors.joining(", ")); + String placeholders = Arrays.stream(fields).map(f -> "?").collect(Collectors.joining(", ")); + + return "INSERT OR UPDATE " + + buildTableInfoWithSchema(schema, table) + + " (" + + columns + + " )" + + " VALUES (" + + placeholders + + ")"; + } + + @Override + void compareResult(String executeKey) throws SQLException, IOException { + defaultCompare(executeKey, fieldNames, "BIGINT_COL"); + } + + @Override + String driverUrl() { + // reference: https://intersystems-community.github.io/iris-driver-distribution/ + return "https://raw.githubusercontent.com/intersystems-community/iris-driver-distribution/main/JDBC/JDK18/intersystems-jdbc-3.8.4.jar"; + } + + @Override + protected Class loadDriverClass() { + return super.loadDriverClassFromUrl(); + } + + @Override + Pair> initTestData() { + List rows = new ArrayList<>(); + for (int i = 1; i <= GEN_ROWS; i++) { + SeaTunnelRow row = + new SeaTunnelRow( + new Object[] { + Long.valueOf(i), + Long.valueOf(i), + "*".getBytes(StandardCharsets.UTF_8), + "123456".getBytes(StandardCharsets.UTF_8), + "*".getBytes(StandardCharsets.UTF_8), + "123456".getBytes(StandardCharsets.UTF_8), + i % 10 == 0 ? 1 : 0, + String.valueOf(i).getBytes(StandardCharsets.UTF_8), + "*", + String.valueOf(i), + "*", + String.valueOf(i), + "*", + String.valueOf(i), + "*", + String.valueOf(i), + String.valueOf(i), + Date.valueOf(LocalDate.now()), + Timestamp.valueOf(LocalDateTime.now()), + Timestamp.valueOf(LocalDateTime.now()), + BigDecimal.valueOf(i, 0), + BigDecimal.valueOf(i, 0), + BigDecimal.valueOf(i, 2), + BigDecimal.valueOf(i, 0), + BigDecimal.valueOf(i, 0), + BigDecimal.valueOf(i, 2), + Double.parseDouble("1.111"), + Double.parseDouble("1.111111"), + Float.parseFloat("1.1"), + Float.parseFloat("1.11"), + String.valueOf(i).getBytes(), + i, + i, + i, + Long.valueOf(i), + String.valueOf(i).getBytes(), + String.valueOf(i).getBytes(), + String.valueOf(i), + String.valueOf(i), + String.valueOf(i), + String.valueOf(i).getBytes(), + String.valueOf(i).getBytes(), + String.valueOf(i), + String.valueOf(i), + i, + i, + String.valueOf(i), + i, + "*", + String.valueOf(i), + "*", + String.valueOf(i), + "*", + String.valueOf(i), + "*", + String.valueOf(i), + String.valueOf(i), + BigDecimal.valueOf(i, 0), + BigDecimal.valueOf(i, 0), + BigDecimal.valueOf(i, 3), + BigDecimal.valueOf(i, 0), + BigDecimal.valueOf(i, 0), + BigDecimal.valueOf(i, 3), + "1", + "1", + "1.111", + Time.valueOf(LocalTime.now()), + "10".getBytes(), + Double.parseDouble("1.11"), + Long.valueOf(i), + Timestamp.valueOf(LocalDateTime.now()), + i, + i, + i, + "F4526E29-8B4A-4449-AA90-2A7DF971F221", + String.valueOf(i), + Time.valueOf(LocalTime.now()), + Time.valueOf(LocalTime.now()), + Timestamp.valueOf(LocalDateTime.now()), + Timestamp.valueOf(LocalDateTime.now()), + i, + i, + "3E8B5AC7-D63A-4202-83E1-A576EBE11557", + "*".getBytes(), + String.valueOf(i).getBytes(), + "*", + String.valueOf(i), + "1.11", + String.valueOf(i) + }); + rows.add(row); + } + + return Pair.of(fieldNames, rows); + } + + @Override + GenericContainer initContainer() { + GenericContainer container = + new GenericContainer(IRIS_IMAGE) + .withCopyFileToContainer( + MountableFile.forClasspathResource("password/password.txt"), + "/tmp/password.txt") + .withCommand("--password-file /tmp/password.txt") + .withNetwork(NETWORK) + .withNetworkAliases(IRIS_NETWORK_ALIASES) + .withExposedPorts(IRIS_PORT) + .withLogConsumer( + new Slf4jLogConsumer(DockerLoggerFactory.getLogger(IRIS_IMAGE))); + + container.setPortBindings(Lists.newArrayList(String.format("%s:%s", IRIS_PORT, IRIS_PORT))); + + return container; + } + + @Override + public String quoteIdentifier(String field) { + return "\"" + field + "\""; + } + + @Override + protected void clearTable(String database, String schema, String table) { + clearTable(schema, table); + } + + @Override + protected String buildTableInfoWithSchema(String database, String schema, String table) { + return buildTableInfoWithSchema(schema, table); + } + + @Override + protected void initCatalog() { + String jdbcUrl = jdbcCase.getJdbcUrl().replace(HOST, dbServer.getHost()); + catalog = + new IrisCatalog( + "iris", + jdbcCase.getUserName(), + jdbcCase.getPassword(), + JdbcUrlUtil.getUrlInfo(jdbcUrl)); + // set connection + ((IrisCatalog) catalog).setConnection(jdbcUrl, connection); + catalog.open(); + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/resources/jdbc_iris_source_to_sink_with_full_type.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/resources/jdbc_iris_source_to_sink_with_full_type.conf new file mode 100644 index 00000000000..3e13b2d2020 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/resources/jdbc_iris_source_to_sink_with_full_type.conf @@ -0,0 +1,50 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + jdbc { + url = "jdbc:IRIS://e2e_irisDb:1972/%SYS" + driver = "com.intersystems.jdbc.IRISDriver" + connection_check_timeout_sec = 100 + user = "_SYSTEM" + password = "Seatunnel" + table_path = "test.e2e_table_source" + query = "select * from test.e2e_table_source" + split.size = 10 + } +} + +transform { +} + +sink { + jdbc { + url = "jdbc:IRIS://e2e_irisDb:1972/%SYS" + driver = "com.intersystems.jdbc.IRISDriver" + user = "_SYSTEM" + password = "Seatunnel" + database = "%SYS" + schema = "${schema_name}" + table = "e2e_table_sink" + generate_sink_sql = true + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/resources/jdbc_iris_upsert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/resources/jdbc_iris_upsert.conf new file mode 100644 index 00000000000..5d3e69bcb6b --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/resources/jdbc_iris_upsert.conf @@ -0,0 +1,91 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + parallelism = 1 + job.mode = "BATCH" +} + +source { + FakeSource { + schema = { + fields { + pk_id = bigint + name = string + score = int + } + primaryKey { + name = "pk_id" + columnNames = [pk_id] + } + } + rows = [ + { + kind = INSERT + fields = [1, "A", 100] + }, + { + kind = INSERT + fields = [2, "B", 100] + }, + { + kind = INSERT + fields = [3, "C", 100] + }, + { + kind = INSERT + fields = [3, "C", 100] + }, + { + kind = INSERT + fields = [3, "C", 100] + }, + { + kind = INSERT + fields = [3, "C", 100] + } + { + kind = UPDATE_BEFORE + fields = [1, "A", 100] + }, + { + kind = UPDATE_AFTER + fields = [1, "A_1", 100] + }, + { + kind = DELETE + fields = [2, "B", 100] + } + ] + } +} + +transform { +} + +sink { + jdbc { + url = "jdbc:IRIS://e2e_irisDb:1972/%SYS" + driver = "com.intersystems.jdbc.IRISDriver" + user = "_SYSTEM" + password = "Seatunnel" + database = "%SYS" + schema = "test" + table = "e2e_upsert_table_sink" + generate_sink_sql = true + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/resources/password/password.txt b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/resources/password/password.txt new file mode 100644 index 00000000000..4d4ce72e431 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/resources/password/password.txt @@ -0,0 +1 @@ +Seatunnel