From a34d25916dfcf5bcf4a9a9aadbb1856896ae7c19 Mon Sep 17 00:00:00 2001 From: useheart Date: Thu, 9 May 2024 10:13:43 +0800 Subject: [PATCH 01/10] [HotFix][zeta]: Fix loss job (#6759) (#6807) --- .../engine/server/master/JobHistoryService.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java index f3905a9e92e..0406bf07c65 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java @@ -104,16 +104,17 @@ public JobHistoryService( this.finishedJobExpireTime = finishedJobExpireTime; } - // Gets the status of a running and completed job + // Gets the status of a running and completed job. public String listAllJob() { List status = new ArrayList<>(); - Set runningJonIds = + final List runningJobStateList = runningJobMasterMap.values().stream() - .map(master -> master.getJobImmutableInformation().getJobId()) - .collect(Collectors.toSet()); + .map(master -> toJobStateMapper(master, true)) + .collect(Collectors.toList()); + Set runningJonIds = + runningJobStateList.stream().map(JobState::getJobId).collect(Collectors.toSet()); Stream.concat( - runningJobMasterMap.values().stream() - .map(master -> toJobStateMapper(master, true)), + runningJobStateList.stream(), finishedJobStateImap.values().stream() .filter(jobState -> !runningJonIds.contains(jobState.getJobId()))) .forEach( From 42572f03c91937ea67477230cd9bbc3083b2c75f Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Thu, 9 May 2024 16:38:40 +0800 Subject: [PATCH 02/10] [Improve] Update doris sink template default value in doc (#6810) --- docs/en/connector-v2/sink/Doris.md | 1 + docs/zh/connector-v2/sink/Doris.md | 1 + 2 files changed, 2 insertions(+) diff --git a/docs/en/connector-v2/sink/Doris.md b/docs/en/connector-v2/sink/Doris.md index e3463d99e54..44f65a8fa74 100644 --- a/docs/en/connector-v2/sink/Doris.md +++ b/docs/en/connector-v2/sink/Doris.md @@ -77,6 +77,7 @@ Default template: ```sql CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}` ( +${rowtype_primary_key}, ${rowtype_fields} ) ENGINE=OLAP UNIQUE KEY (${rowtype_primary_key}) diff --git a/docs/zh/connector-v2/sink/Doris.md b/docs/zh/connector-v2/sink/Doris.md index 754455544ff..afc470326f5 100644 --- a/docs/zh/connector-v2/sink/Doris.md +++ b/docs/zh/connector-v2/sink/Doris.md @@ -77,6 +77,7 @@ Doris Sink连接器的内部实现是通过stream load批量缓存和导入的 ```sql CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}` ( +${rowtype_primary_key}, ${rowtype_fields} ) ENGINE=OLAP UNIQUE KEY (${rowtype_primary_key}) From fe1ec4a499e5d4c7c87157a59c71ec13f0d91f9d Mon Sep 17 00:00:00 2001 From: dailai Date: Thu, 9 May 2024 16:42:27 +0800 Subject: [PATCH 03/10] [Hotfix][Dist] Add oracle cdc (#6818) --- seatunnel-dist/pom.xml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml index b27c99e47c7..6447ff43acb 100644 --- a/seatunnel-dist/pom.xml +++ b/seatunnel-dist/pom.xml @@ -479,6 +479,12 @@ ${project.version} provided + + org.apache.seatunnel + connector-cdc-oracle + ${project.version} + provided + org.apache.seatunnel connector-cdc-mongodb From d826cf9ece03380fc7728d0f11f2572ca38da351 Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Thu, 9 May 2024 18:16:07 +0800 Subject: [PATCH 04/10] [Fix] Fix MultiTableWriterRunnable can not catch Throwable error (#6734) --- .../MultiTableWriterRunnable.java | 2 +- .../seatunnel/cdc/mysql/MysqlCDCIT.java | 3 +- .../seatunnel/cdc/oracle/OracleCDCIT.java | 3 +- .../seatunnel/cdc/postgres/PostgresCDCIT.java | 4 +- .../container/AbstractTestContainer.java | 2 + .../sink/inmemory/InMemorySinkFactory.java | 7 ++- .../e2e/sink/inmemory/InMemorySinkWriter.java | 6 ++- .../engine/e2e/CheckpointEnableIT.java | 4 +- .../engine/e2e/JobClientJobProxyIT.java | 22 ++++++-- ...fake_to_inmemory_with_throwable_error.conf | 51 +++++++++++++++++++ 10 files changed, 92 insertions(+), 12 deletions(-) create mode 100644 seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/stream_fake_to_inmemory_with_throwable_error.conf diff --git a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableWriterRunnable.java b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableWriterRunnable.java index 1fa681f0fac..ce22e0e2e20 100644 --- a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableWriterRunnable.java +++ b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableWriterRunnable.java @@ -66,7 +66,7 @@ public void run() { // exception. throwable = e; break; - } catch (Exception e) { + } catch (Throwable e) { log.error("MultiTableWriterRunnable error", e); throwable = e; break; diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java index f2d5669c37f..62dc3f077d0 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java @@ -360,7 +360,8 @@ public void testMultiTableWithRestore(TestContainer container) Pattern jobIdPattern = Pattern.compile( - ".*Init JobMaster for Job SeaTunnel_Job \\(([0-9]*)\\).*", Pattern.DOTALL); + ".*Init JobMaster for Job mysqlcdc_to_mysql_with_multi_table_mode_one_table.conf \\(([0-9]*)\\).*", + Pattern.DOTALL); Matcher matcher = jobIdPattern.matcher(container.getServerLogs()); String jobId; if (matcher.matches()) { diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java index ece56800cf6..125d57915c2 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-oracle-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/OracleCDCIT.java @@ -405,7 +405,8 @@ public void testMultiTableWithRestore(TestContainer container) Pattern jobIdPattern = Pattern.compile( - ".*Init JobMaster for Job SeaTunnel_Job \\(([0-9]*)\\).*", Pattern.DOTALL); + ".*Init JobMaster for Job oraclecdc_to_oracle_with_multi_table_mode_one_table.conf \\(([0-9]*)\\).*", + Pattern.DOTALL); Matcher matcher = jobIdPattern.matcher(container.getServerLogs()); String jobId; if (matcher.matches()) { diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java index 0a4be385dad..0ea8f593ee0 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java @@ -307,7 +307,7 @@ public void testMultiTableWithRestore(TestContainer container) Pattern jobIdPattern = Pattern.compile( - ".*Init JobMaster for Job SeaTunnel_Job \\(([0-9]*)\\).*", + ".*Init JobMaster for Job pgcdc_to_pg_with_multi_table_mode_one_table.conf \\(([0-9]*)\\).*", Pattern.DOTALL); Matcher matcher = jobIdPattern.matcher(container.getServerLogs()); String jobId; @@ -412,7 +412,7 @@ public void testAddFiledWithRestore(TestContainer container) Pattern jobIdPattern = Pattern.compile( - ".*Init JobMaster for Job SeaTunnel_Job \\(([0-9]*)\\).*", + ".*Init JobMaster for Job postgrescdc_to_postgres_test_add_Filed.conf \\(([0-9]*)\\).*", Pattern.DOTALL); Matcher matcher = jobIdPattern.matcher(container.getServerLogs()); String jobId; diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/AbstractTestContainer.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/AbstractTestContainer.java index b033144d584..d7bd0f4d747 100644 --- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/AbstractTestContainer.java +++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/AbstractTestContainer.java @@ -116,6 +116,8 @@ protected Container.ExecResult executeJob( command.add(adaptPathForWin(binPath)); command.add("--config"); command.add(adaptPathForWin(confInContainerPath)); + command.add("--name"); + command.add(new File(confInContainerPath).getName()); List extraStartShellCommands = new ArrayList<>(getExtraStartShellCommands()); if (variables != null && !variables.isEmpty()) { variables.forEach( diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkFactory.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkFactory.java index 95778b0f767..16f1c0dc448 100644 --- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkFactory.java +++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkFactory.java @@ -35,6 +35,9 @@ public class InMemorySinkFactory public static final Option THROW_EXCEPTION = Options.key("throw_exception").booleanType().defaultValue(false); + + public static final Option THROW_OUT_OF_MEMORY = + Options.key("throw_out_of_memory").booleanType().defaultValue(false); public static final Option CHECKPOINT_SLEEP = Options.key("checkpoint_sleep").booleanType().defaultValue(false); @@ -45,7 +48,9 @@ public String factoryIdentifier() { @Override public OptionRule optionRule() { - return OptionRule.builder().optional(THROW_EXCEPTION, CHECKPOINT_SLEEP).build(); + return OptionRule.builder() + .optional(THROW_EXCEPTION, THROW_OUT_OF_MEMORY, CHECKPOINT_SLEEP) + .build(); } @Override diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkWriter.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkWriter.java index b25c2ff095d..a12b2ca5b99 100644 --- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkWriter.java +++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkWriter.java @@ -76,7 +76,11 @@ public InMemorySinkWriter(ReadonlyConfig config) { private InMemoryMultiTableResourceManager resourceManager; @Override - public void write(SeaTunnelRow element) throws IOException {} + public void write(SeaTunnelRow element) throws IOException { + if (config.get(InMemorySinkFactory.THROW_OUT_OF_MEMORY)) { + throw new OutOfMemoryError(); + } + } @Override public Optional prepareCommit() throws IOException { diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointEnableIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointEnableIT.java index 013cfd33de4..66785d5b649 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointEnableIT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointEnableIT.java @@ -114,7 +114,7 @@ public void testZetaStreamingCheckpointInterval(TestContainer container) () -> { Pattern jobIdPattern = Pattern.compile( - ".*Init JobMaster for Job SeaTunnel_Job \\(([0-9]*)\\).*", + ".*Init JobMaster for Job stream_fakesource_to_localfile_interval.conf \\(([0-9]*)\\).*", Pattern.DOTALL); Matcher matcher = jobIdPattern.matcher(container.getServerLogs()); if (matcher.matches()) { @@ -181,7 +181,7 @@ public void testZetaStreamingCheckpointNoInterval(TestContainer container) () -> { Pattern jobIdPattern = Pattern.compile( - ".*Init JobMaster for Job SeaTunnel_Job \\(([0-9]*)\\).*", + ".*Init JobMaster for Job stream_fakesource_to_localfile.conf \\(([0-9]*)\\).*", Pattern.DOTALL); Matcher matcher = jobIdPattern.matcher(container.getServerLogs()); if (matcher.matches()) { diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java index 0bdf18c68e4..bcbf40b9f79 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java @@ -45,13 +45,29 @@ public void testJobRetryTimes() throws IOException, InterruptedException { Container.ExecResult execResult = executeJob(server, "/retry-times/stream_fake_to_inmemory_with_error_retry_1.conf"); Assertions.assertNotEquals(0, execResult.getExitCode()); - Assertions.assertTrue(server.getLogs().contains("Restore time 1, pipeline")); - Assertions.assertFalse(server.getLogs().contains("Restore time 3, pipeline")); + Assertions.assertTrue( + server.getLogs() + .contains( + "Restore time 1, pipeline Job stream_fake_to_inmemory_with_error_retry_1.conf")); + Assertions.assertFalse( + server.getLogs() + .contains( + "Restore time 3, pipeline Job stream_fake_to_inmemory_with_error_retry_1.conf")); Container.ExecResult execResult2 = executeJob(server, "/retry-times/stream_fake_to_inmemory_with_error.conf"); Assertions.assertNotEquals(0, execResult2.getExitCode()); - Assertions.assertTrue(server.getLogs().contains("Restore time 3, pipeline")); + Assertions.assertTrue( + server.getLogs() + .contains( + "Restore time 3, pipeline Job stream_fake_to_inmemory_with_error.conf")); + } + + @Test + public void testMultiTableSinkFailedWithThrowable() throws IOException, InterruptedException { + Container.ExecResult execResult = + executeJob(server, "/stream_fake_to_inmemory_with_throwable_error.conf"); + Assertions.assertNotEquals(0, execResult.getExitCode()); } @Test diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/stream_fake_to_inmemory_with_throwable_error.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/stream_fake_to_inmemory_with_throwable_error.conf new file mode 100644 index 00000000000..fd0605b82c8 --- /dev/null +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/stream_fake_to_inmemory_with_throwable_error.conf @@ -0,0 +1,51 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + parallelism = 1 + job.mode = "STREAMING" + checkpoint.interval = 5000 +} + +source { + # This is a example source plugin **only for test and demonstrate the feature source plugin** + FakeSource { + result_table_name = "fake" + row.num = 100 + split.num = 5 + schema = { + fields { + name = "string" + age = "int" + } + } + parallelism = 1 + } +} + +transform { +} + +sink { + InMemory { + source_table_name="fake" + throw_out_of_memory=true + } +} \ No newline at end of file From 27dcd80be5e520bd5674698d7e4294a0d28bc0f6 Mon Sep 17 00:00:00 2001 From: hawk9821 <39961809+hawk9821@users.noreply.github.com> Date: Thu, 9 May 2024 19:21:14 +0800 Subject: [PATCH 05/10] [Feature][Engine] Change the name of the rest-api interface for returning job info (#6813) --- docs/en/seatunnel-engine/rest-api.md | 59 ++++++++++++++++++- docs/zh/seatunnel-engine/rest-api.md | 57 ++++++++++++++++++ .../seatunnel/engine/e2e/RestApiIT.java | 21 +++++++ .../engine/server/rest/RestConstant.java | 4 +- .../rest/RestHttpGetCommandProcessor.java | 35 +++++------ 5 files changed, 154 insertions(+), 22 deletions(-) diff --git a/docs/en/seatunnel-engine/rest-api.md b/docs/en/seatunnel-engine/rest-api.md index 3f1069b566b..0a0c1605876 100644 --- a/docs/en/seatunnel-engine/rest-api.md +++ b/docs/en/seatunnel-engine/rest-api.md @@ -77,7 +77,64 @@ network: ### Return details of a job.
- GET /hazelcast/rest/maps/running-job/:jobId (Return details of a job.) + GET /hazelcast/rest/maps/job-info/:jobId (Return details of a job. ) + +#### Parameters + +> | name | type | data type | description | +> |-------|----------|-----------|-------------| +> | jobId | required | long | job id | + +#### Responses + +```json +{ + "jobId": "", + "jobName": "", + "jobStatus": "", + "createTime": "", + "jobDag": { + "vertices": [ + ], + "edges": [ + ] + }, + "metrics": { + "sourceReceivedCount": "", + "sinkWriteCount": "" + }, + "finishedTime": "", + "errorMsg": null, + "envOptions": { + }, + "pluginJarsUrls": [ + ], + "isStartWithSavePoint": false +} +``` + +`jobId`, `jobName`, `jobStatus`, `createTime`, `jobDag`, `metrics` always be returned. +`envOptions`, `pluginJarsUrls`, `isStartWithSavePoint` will return when job is running. +`finishedTime`, `errorMsg` will return when job is finished. + +When we can't get the job info, the response will be: + +```json +{ + "jobId" : "" +} +``` + +
+ +------------------------------------------------------------------------------------------ + +### Return details of a job. + +This API has been deprecated, please use /hazelcast/rest/maps/job-info/:jobId instead + +
+ GET /hazelcast/rest/maps/running-job/:jobId (Return details of a job. ) #### Parameters diff --git a/docs/zh/seatunnel-engine/rest-api.md b/docs/zh/seatunnel-engine/rest-api.md index 28a81c548d2..ee9a1511a95 100644 --- a/docs/zh/seatunnel-engine/rest-api.md +++ b/docs/zh/seatunnel-engine/rest-api.md @@ -75,6 +75,63 @@ network: ### 返回作业的详细信息。 +
+ GET /hazelcast/rest/maps/job-info/:jobId (返回作业的详细信息。) + +#### 参数 + +> | name | type | data type | description | +> |-------|----------|-----------|-------------| +> | jobId | required | long | job id | + +#### 响应 + +```json +{ + "jobId": "", + "jobName": "", + "jobStatus": "", + "createTime": "", + "jobDag": { + "vertices": [ + ], + "edges": [ + ] + }, + "metrics": { + "sourceReceivedCount": "", + "sinkWriteCount": "" + }, + "finishedTime": "", + "errorMsg": null, + "envOptions": { + }, + "pluginJarsUrls": [ + ], + "isStartWithSavePoint": false +} +``` + +`jobId`, `jobName`, `jobStatus`, `createTime`, `jobDag`, `metrics` 字段总会返回. +`envOptions`, `pluginJarsUrls`, `isStartWithSavePoint` 字段在Job在RUNNING状态时会返回 +`finishedTime`, `errorMsg` 字段在Job结束时会返回,结束状态为不为RUNNING,可能为FINISHED,可能为CANCEL + +当我们查询不到这个Job时,返回结果为: + +```json +{ + "jobId" : "" +} +``` + +
+ +------------------------------------------------------------------------------------------ + +### 返回作业的详细信息 + +此API已经弃用,请使用/hazelcast/rest/maps/job-info/:jobId替代。 +
GET /hazelcast/rest/maps/running-job/:jobId (返回作业的详细信息。) diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java index c7be274ad2a..3569fb4b11f 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java @@ -197,6 +197,27 @@ public void testGetRunningJobs() { }); } + @Test + public void testGetJobInfoByJobId() { + Arrays.asList(node2, node1) + .forEach( + instance -> { + given().get( + HOST + + instance.getCluster() + .getLocalMember() + .getAddress() + .getPort() + + RestConstant.JOB_INFO_URL + + "/" + + batchJobProxy.getJobId()) + .then() + .statusCode(200) + .body("jobName", equalTo("fake_to_console")) + .body("jobStatus", equalTo("FINISHED")); + }); + } + @Test public void testGetRunningThreads() { Arrays.asList(node2, node1) diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java index a7e93f25519..6daa817a48c 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java @@ -45,8 +45,8 @@ public class RestConstant { public static final String METRICS = "metrics"; public static final String RUNNING_JOBS_URL = "/hazelcast/rest/maps/running-jobs"; - public static final String RUNNING_JOB_URL = "/hazelcast/rest/maps/running-job"; - + @Deprecated public static final String RUNNING_JOB_URL = "/hazelcast/rest/maps/running-job"; + public static final String JOB_INFO_URL = "/hazelcast/rest/maps/job-info"; public static final String FINISHED_JOBS_INFO = "/hazelcast/rest/maps/finished-jobs"; public static final String SUBMIT_JOB_URL = "/hazelcast/rest/maps/submit-job"; public static final String ENCRYPT_CONFIG = "/hazelcast/rest/maps/encrypt-config"; diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java index 25cd51474f1..b4110f46fff 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java @@ -63,6 +63,7 @@ import static com.hazelcast.internal.ascii.rest.HttpStatusCode.SC_500; import static org.apache.seatunnel.engine.server.rest.RestConstant.FINISHED_JOBS_INFO; +import static org.apache.seatunnel.engine.server.rest.RestConstant.JOB_INFO_URL; import static org.apache.seatunnel.engine.server.rest.RestConstant.RUNNING_JOBS_URL; import static org.apache.seatunnel.engine.server.rest.RestConstant.RUNNING_JOB_URL; import static org.apache.seatunnel.engine.server.rest.RestConstant.RUNNING_THREADS; @@ -99,7 +100,7 @@ public void handle(HttpGetCommand httpGetCommand) { handleRunningJobsInfo(httpGetCommand); } else if (uri.startsWith(FINISHED_JOBS_INFO)) { handleFinishedJobsInfo(httpGetCommand, uri); - } else if (uri.startsWith(RUNNING_JOB_URL)) { + } else if (uri.startsWith(RUNNING_JOB_URL) || uri.startsWith(JOB_INFO_URL)) { handleJobInfoById(httpGetCommand, uri); } else if (uri.startsWith(SYSTEM_MONITORING_INFORMATION)) { getSystemMonitoringInformation(httpGetCommand); @@ -252,24 +253,20 @@ private void handleJobInfoById(HttpGetCommand command, String uri) { uri = StringUtil.stripTrailingSlash(uri); int indexEnd = uri.indexOf('/', URI_MAPS.length()); String jobId = uri.substring(indexEnd + 1); - - JobInfo jobInfo = - (JobInfo) - this.textCommandService - .getNode() - .getNodeEngine() - .getHazelcastInstance() - .getMap(Constant.IMAP_RUNNING_JOB_INFO) - .get(Long.valueOf(jobId)); - - JobState finishedJobState = - (JobState) - this.textCommandService - .getNode() - .getNodeEngine() - .getHazelcastInstance() - .getMap(Constant.IMAP_FINISHED_JOB_STATE) - .get(Long.valueOf(jobId)); + IMap jobInfoMap = + this.textCommandService + .getNode() + .getNodeEngine() + .getHazelcastInstance() + .getMap(Constant.IMAP_RUNNING_JOB_INFO); + JobInfo jobInfo = (JobInfo) jobInfoMap.get(Long.valueOf(jobId)); + IMap finishedJobStateMap = + this.textCommandService + .getNode() + .getNodeEngine() + .getHazelcastInstance() + .getMap(Constant.IMAP_FINISHED_JOB_STATE); + JobState finishedJobState = (JobState) finishedJobStateMap.get(Long.valueOf(jobId)); if (!jobId.isEmpty() && jobInfo != null) { this.prepareResponse(command, convertToJson(jobInfo, Long.parseLong(jobId))); } else if (!jobId.isEmpty() && finishedJobState != null) { From 9a49c881d69394ad5ed8a306d671cd754a5f2b31 Mon Sep 17 00:00:00 2001 From: Eric Date: Thu, 9 May 2024 21:01:47 +0800 Subject: [PATCH 06/10] [CI] Fix FixSlotResourceTest testNotEnoughResource test error (#6820) --- .../resourcemanager/AbstractResourceManager.java | 7 +++++++ .../server/resourcemanager/ResourceManager.java | 2 ++ .../resourcemanager/FixSlotResourceTest.java | 14 ++++++++++++++ 3 files changed, 23 insertions(+) diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java index 8b7e0b18643..2caa6e68166 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/AbstractResourceManager.java @@ -216,4 +216,11 @@ public void heartbeat(WorkerProfile workerProfile) { } registerWorker.put(workerProfile.getAddress(), workerProfile); } + + @Override + public List getUnassignedSlots() { + return registerWorker.values().stream() + .flatMap(workerProfile -> Arrays.stream(workerProfile.getUnassignedSlots())) + .collect(Collectors.toList()); + } } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java index 4a47517bfda..ca668482aac 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/resourcemanager/ResourceManager.java @@ -58,4 +58,6 @@ CompletableFuture> applyResources( void memberRemoved(MembershipServiceEvent event); void close(); + + List getUnassignedSlots(); } diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FixSlotResourceTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FixSlotResourceTest.java index 6c26f2398c1..cf67ec8de0e 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FixSlotResourceTest.java +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/resourcemanager/FixSlotResourceTest.java @@ -29,6 +29,9 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import static org.awaitility.Awaitility.await; public class FixSlotResourceTest extends AbstractSeaTunnelServerTest { @@ -75,6 +78,17 @@ public void testNotEnoughResource() throws ExecutionException, InterruptedExcept } catch (ExecutionException e) { Assertions.assertTrue(e.getMessage().contains("NoEnoughResourceException")); } + // wait for release resource complete + await().atMost(20000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> { + Assertions.assertEquals( + 3, + server.getCoordinatorService() + .getResourceManager() + .getUnassignedSlots() + .size()); + }); resourceProfiles.remove(0); List slotProfiles = server.getCoordinatorService() From f698396555382eaf79e699cd07662de80f820ac0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=80=81=E7=8E=8B?= <58297137+chl-wxp@users.noreply.github.com> Date: Thu, 9 May 2024 21:05:30 +0800 Subject: [PATCH 07/10] [Improve][Connector] Add some sqlserver IDENTITY type for catalog (#6822) --- .../sqlserver/SqlServerTypeConverter.java | 8 ++++ .../sqlserver/SqlServerCatalogTest.java | 2 +- .../sqlserver/SqlServerTypeConverterTest.java | 42 +++++++++++++++++++ 3 files changed, 51 insertions(+), 1 deletion(-) diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerTypeConverter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerTypeConverter.java index c1bd3afcbd0..1ed6a2da084 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerTypeConverter.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerTypeConverter.java @@ -39,11 +39,15 @@ public class SqlServerTypeConverter implements TypeConverter { // -------------------------number---------------------------- public static final String SQLSERVER_BIT = "BIT"; public static final String SQLSERVER_TINYINT = "TINYINT"; + public static final String SQLSERVER_TINYINT_IDENTITY = "TINYINT IDENTITY"; public static final String SQLSERVER_SMALLINT = "SMALLINT"; + public static final String SQLSERVER_SMALLINT_IDENTITY = "SMALLINT IDENTITY"; public static final String SQLSERVER_INTEGER = "INTEGER"; + public static final String SQLSERVER_INTEGER_IDENTITY = "INTEGER IDENTITY"; public static final String SQLSERVER_INT = "INT"; private static final String SQLSERVER_INT_IDENTITY = "INT IDENTITY"; public static final String SQLSERVER_BIGINT = "BIGINT"; + public static final String SQLSERVER_BIGINT_IDENTITY = "BIGINT IDENTITY"; public static final String SQLSERVER_DECIMAL = "DECIMAL"; public static final String SQLSERVER_FLOAT = "FLOAT"; public static final String SQLSERVER_REAL = "REAL"; @@ -111,20 +115,24 @@ public Column convert(BasicTypeDefine typeDefine) { builder.dataType(BasicType.BOOLEAN_TYPE); break; case SQLSERVER_TINYINT: + case SQLSERVER_TINYINT_IDENTITY: builder.sourceType(SQLSERVER_TINYINT); builder.dataType(BasicType.SHORT_TYPE); break; case SQLSERVER_SMALLINT: + case SQLSERVER_SMALLINT_IDENTITY: builder.sourceType(SQLSERVER_SMALLINT); builder.dataType(BasicType.SHORT_TYPE); break; case SQLSERVER_INTEGER: + case SQLSERVER_INTEGER_IDENTITY: case SQLSERVER_INT: case SQLSERVER_INT_IDENTITY: builder.sourceType(SQLSERVER_INT); builder.dataType(BasicType.INT_TYPE); break; case SQLSERVER_BIGINT: + case SQLSERVER_BIGINT_IDENTITY: builder.sourceType(SQLSERVER_BIGINT); builder.dataType(BasicType.LONG_TYPE); break; diff --git a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalogTest.java b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalogTest.java index 5e457910f03..ea305ca0c1f 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalogTest.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCatalogTest.java @@ -75,7 +75,7 @@ static void before() { @Test void listDatabases() { - List list = sqlServerCatalog.listDatabases(); + sqlServerCatalog.listDatabases(); } @Test diff --git a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerTypeConverterTest.java b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerTypeConverterTest.java index 0756d7c3b32..ac75bda4054 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerTypeConverterTest.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerTypeConverterTest.java @@ -81,6 +81,20 @@ public void testConvertTinyint() { Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType().toLowerCase()); } + @Test + public void testConvertTinyintIdentity() { + BasicTypeDefine typeDefine = + BasicTypeDefine.builder() + .name("test") + .columnType("tinyint identity") + .dataType("tinyint") + .build(); + Column column = SqlServerTypeConverter.INSTANCE.convert(typeDefine); + Assertions.assertEquals(typeDefine.getName(), column.getName()); + Assertions.assertEquals(BasicType.SHORT_TYPE, column.getDataType()); + Assertions.assertEquals(SqlServerTypeConverter.SQLSERVER_TINYINT, column.getSourceType()); + } + @Test public void testConvertSmallint() { BasicTypeDefine typeDefine = @@ -95,6 +109,20 @@ public void testConvertSmallint() { Assertions.assertEquals(typeDefine.getColumnType(), column.getSourceType().toLowerCase()); } + @Test + public void testConvertSmallintIdentity() { + BasicTypeDefine typeDefine = + BasicTypeDefine.builder() + .name("test") + .columnType("smallint identity") + .dataType("smallint") + .build(); + Column column = SqlServerTypeConverter.INSTANCE.convert(typeDefine); + Assertions.assertEquals(typeDefine.getName(), column.getName()); + Assertions.assertEquals(BasicType.SHORT_TYPE, column.getDataType()); + Assertions.assertEquals(SqlServerTypeConverter.SQLSERVER_SMALLINT, column.getSourceType()); + } + @Test public void testConvertInt() { BasicTypeDefine typeDefine = @@ -116,6 +144,20 @@ public void testConvertInt() { Assertions.assertEquals("int", column.getSourceType().toLowerCase()); } + @Test + public void testConvertBigintIdentity() { + BasicTypeDefine typeDefine = + BasicTypeDefine.builder() + .name("test") + .columnType("bigint identity") + .dataType("bigint") + .build(); + Column column = SqlServerTypeConverter.INSTANCE.convert(typeDefine); + Assertions.assertEquals(typeDefine.getName(), column.getName()); + Assertions.assertEquals(BasicType.LONG_TYPE, column.getDataType()); + Assertions.assertEquals(SqlServerTypeConverter.SQLSERVER_BIGINT, column.getSourceType()); + } + @Test public void testConvertBigint() { BasicTypeDefine typeDefine = From 868ba4d7c76eab8619798b4f90afc4c188c3997d Mon Sep 17 00:00:00 2001 From: hailin0 Date: Thu, 9 May 2024 22:16:36 +0800 Subject: [PATCH 08/10] [Hotfix][Jdbc/CDC] Fix postgresql uuid type in jdbc read (#6684) --- .../AbstractJdbcSourceChunkSplitter.java | 22 +++--- .../splitter/JdbcSourceChunkSplitter.java | 69 +++++++++++++---- .../source/JdbcSourceChunkSplitterTest.java | 2 +- .../AbstractJdbcSourceChunkSplitterTest.java | 3 +- .../source/eumerator/MySqlChunkSplitter.java | 8 +- .../source/eumerator/OracleChunkSplitter.java | 8 +- .../enumerator/PostgresChunkSplitter.java | 49 +++++++++--- .../PostgresSnapshotSplitReadTask.java | 2 +- .../cdc/postgres/utils/PostgresUtils.java | 77 +++++++++++++------ .../cdc/postgres/utils/PostgresUtilsTest.java | 25 ++++-- .../eumerator/SqlServerChunkSplitter.java | 9 +-- .../jdbc/internal/JdbcInputFormat.java | 2 +- .../jdbc/internal/dialect/JdbcDialect.java | 15 ++++ .../dialect/psql/PostgresDialect.java | 75 +++++++++++++++++- .../seatunnel/jdbc/source/ChunkSplitter.java | 21 ++++- .../jdbc/source/DynamicChunkSplitter.java | 44 ++++++----- .../jdbc/source/FixedChunkSplitter.java | 18 ++++- .../jdbc/source/DynamicChunkSplitterTest.java | 43 ++++++++++- 18 files changed, 376 insertions(+), 116 deletions(-) diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java index f124e5fc715..e10c70795b1 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitter.java @@ -116,7 +116,7 @@ public Collection generateSplits(TableId tableId) { private List splitTableIntoChunks( JdbcConnection jdbc, TableId tableId, Column splitColumn) throws SQLException { final String splitColumnName = splitColumn.name(); - final Object[] minMax = queryMinMax(jdbc, tableId, splitColumnName); + final Object[] minMax = queryMinMax(jdbc, tableId, splitColumn); final Object min = minMax[0]; final Object max = minMax[1]; if (min == null || max == null || min.equals(max)) { @@ -177,8 +177,7 @@ private List splitTableIntoChunks( tableId, inverseSamplingRate); Object[] sample = - sampleDataFromColumn( - jdbc, tableId, splitColumnName, inverseSamplingRate); + sampleDataFromColumn(jdbc, tableId, splitColumn, inverseSamplingRate); log.info( "Sample data from table {} end, the sample size is {}", tableId, @@ -186,11 +185,10 @@ private List splitTableIntoChunks( return efficientShardingThroughSampling( tableId, sample, approximateRowCnt, shardCount); } - return splitUnevenlySizedChunks( - jdbc, tableId, splitColumnName, min, max, chunkSize); + return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize); } } else { - return splitUnevenlySizedChunks(jdbc, tableId, splitColumnName, min, max, chunkSize); + return splitUnevenlySizedChunks(jdbc, tableId, splitColumn, min, max, chunkSize); } } @@ -198,7 +196,7 @@ private List splitTableIntoChunks( protected List splitUnevenlySizedChunks( JdbcConnection jdbc, TableId tableId, - String splitColumnName, + Column splitColumn, Object min, Object max, int chunkSize) @@ -207,7 +205,7 @@ protected List splitUnevenlySizedChunks( "Use unevenly-sized chunks for table {}, the chunk size is {}", tableId, chunkSize); final List splits = new ArrayList<>(); Object chunkStart = null; - Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumnName, max, chunkSize); + Object chunkEnd = nextChunkEnd(jdbc, min, tableId, splitColumn, max, chunkSize); int count = 0; while (chunkEnd != null && ObjectCompare(chunkEnd, max) <= 0) { // we start from [null, min + chunk_size) and avoid [null, min) @@ -215,7 +213,7 @@ protected List splitUnevenlySizedChunks( // may sleep a while to avoid DDOS on MySQL server maySleep(count++, tableId); chunkStart = chunkEnd; - chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumnName, max, chunkSize); + chunkEnd = nextChunkEnd(jdbc, chunkEnd, tableId, splitColumn, max, chunkSize); } // add the ending split splits.add(ChunkRange.of(chunkStart, null)); @@ -226,17 +224,17 @@ protected Object nextChunkEnd( JdbcConnection jdbc, Object previousChunkEnd, TableId tableId, - String splitColumnName, + Column splitColumn, Object max, int chunkSize) throws SQLException { // chunk end might be null when max values are removed Object chunkEnd = - queryNextChunkMax(jdbc, tableId, splitColumnName, chunkSize, previousChunkEnd); + queryNextChunkMax(jdbc, tableId, splitColumn, chunkSize, previousChunkEnd); if (Objects.equals(previousChunkEnd, chunkEnd)) { // we don't allow equal chunk start and end, // should query the next one larger than chunkEnd - chunkEnd = queryMin(jdbc, tableId, splitColumnName, chunkEnd); + chunkEnd = queryMin(jdbc, tableId, splitColumn, chunkEnd); } if (ObjectCompare(chunkEnd, max) >= 0) { return null; diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/JdbcSourceChunkSplitter.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/JdbcSourceChunkSplitter.java index b271be0d765..3981ddfa7c5 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/JdbcSourceChunkSplitter.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/JdbcSourceChunkSplitter.java @@ -23,6 +23,7 @@ import io.debezium.jdbc.JdbcConnection; import io.debezium.relational.Column; +import io.debezium.relational.Table; import io.debezium.relational.TableId; import java.sql.SQLException; @@ -35,16 +36,29 @@ public interface JdbcSourceChunkSplitter extends ChunkSplitter { @Override Collection generateSplits(TableId tableId); + /** @deprecated instead by {@link this#queryMinMax(JdbcConnection, TableId, Column)} */ + @Deprecated + Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String columnName) + throws SQLException; + /** * Query the maximum and minimum value of the column in the table. e.g. query string * SELECT MIN(%s) FROM %s WHERE %s > ? * * @param jdbc JDBC connection. * @param tableId table identity. - * @param columnName column name. + * @param column column. * @return maximum and minimum value. */ - Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String columnName) + default Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, Column column) + throws SQLException { + return queryMinMax(jdbc, tableId, column.name()); + } + + /** @deprecated instead by {@link this#queryMin(JdbcConnection, TableId, Column, Object)} */ + @Deprecated + Object queryMin( + JdbcConnection jdbc, TableId tableId, String columnName, Object excludedLowerBound) throws SQLException; /** @@ -54,12 +68,19 @@ Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String columnName) * * @param jdbc JDBC connection. * @param tableId table identity. - * @param columnName column name. + * @param column column. * @param excludedLowerBound the minimum value should be greater than this value. * @return minimum value. */ - Object queryMin( - JdbcConnection jdbc, TableId tableId, String columnName, Object excludedLowerBound) + default Object queryMin( + JdbcConnection jdbc, TableId tableId, Column column, Object excludedLowerBound) + throws SQLException { + return queryMin(jdbc, tableId, column.name(), excludedLowerBound); + } + + @Deprecated + Object[] sampleDataFromColumn( + JdbcConnection jdbc, TableId tableId, String columnName, int samplingRate) throws SQLException; /** @@ -68,14 +89,29 @@ Object queryMin( * * @param jdbc The JDBC connection object used to connect to the database. * @param tableId The ID of the table in which the column resides. - * @param columnName The name of the column to be sampled. + * @param column The column to be sampled. * @param samplingRate samplingRate The inverse of the fraction of the data to be sampled from * the column. For example, a value of 1000 would mean 1/1000 of the data will be sampled. * @return Returns a List of sampled data from the specified column. * @throws SQLException If an SQL error occurs during the sampling operation. */ - Object[] sampleDataFromColumn( - JdbcConnection jdbc, TableId tableId, String columnName, int samplingRate) + default Object[] sampleDataFromColumn( + JdbcConnection jdbc, TableId tableId, Column column, int samplingRate) + throws SQLException { + return sampleDataFromColumn(jdbc, tableId, column.name(), samplingRate); + } + + /** + * @deprecated instead by {@link this#queryNextChunkMax(JdbcConnection, TableId, Column, int, + * Object)} + */ + @Deprecated + Object queryNextChunkMax( + JdbcConnection jdbc, + TableId tableId, + String columnName, + int chunkSize, + Object includedLowerBound) throws SQLException; /** @@ -85,18 +121,20 @@ Object[] sampleDataFromColumn( * * @param jdbc JDBC connection. * @param tableId table identity. - * @param columnName column name. + * @param column column. * @param chunkSize chunk size. * @param includedLowerBound the previous chunk end value. * @return next chunk end value. */ - Object queryNextChunkMax( + default Object queryNextChunkMax( JdbcConnection jdbc, TableId tableId, - String columnName, + Column column, int chunkSize, Object includedLowerBound) - throws SQLException; + throws SQLException { + return queryNextChunkMax(jdbc, tableId, column.name(), chunkSize, includedLowerBound); + } /** * Approximate total number of entries in the lookup table. @@ -110,17 +148,14 @@ Object queryNextChunkMax( /** * Build the scan query sql of the {@link SnapshotSplit}. * - * @param tableId table identity. + * @param table table. * @param splitKeyType primary key type. * @param isFirstSplit whether the first split. * @param isLastSplit whether the last split. * @return query sql. */ String buildSplitScanQuery( - TableId tableId, - SeaTunnelRowType splitKeyType, - boolean isFirstSplit, - boolean isLastSplit); + Table table, SeaTunnelRowType splitKeyType, boolean isFirstSplit, boolean isLastSplit); /** * Checks whether split column is evenly distributed across its range. diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/jdbc/source/JdbcSourceChunkSplitterTest.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/jdbc/source/JdbcSourceChunkSplitterTest.java index 86500f248f3..32617fe18c0 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/jdbc/source/JdbcSourceChunkSplitterTest.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/jdbc/source/JdbcSourceChunkSplitterTest.java @@ -106,7 +106,7 @@ public Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId) @Override public String buildSplitScanQuery( - TableId tableId, + Table table, SeaTunnelRowType splitKeyType, boolean isFirstSplit, boolean isLastSplit) { diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitterTest.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitterTest.java index 076bafae159..6f646eb6bed 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitterTest.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/test/java/org/apache/seatunnel/connectors/cdc/base/source/enumerator/splitter/AbstractJdbcSourceChunkSplitterTest.java @@ -24,6 +24,7 @@ import io.debezium.jdbc.JdbcConnection; import io.debezium.relational.Column; +import io.debezium.relational.Table; import io.debezium.relational.TableId; import java.sql.SQLException; @@ -217,7 +218,7 @@ public Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId) @Override public String buildSplitScanQuery( - TableId tableId, + Table table, SeaTunnelRowType splitKeyType, boolean isFirstSplit, boolean isLastSplit) { diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java index c078f7cf28c..b4982f2cbeb 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/eumerator/MySqlChunkSplitter.java @@ -27,6 +27,7 @@ import io.debezium.jdbc.JdbcConnection; import io.debezium.relational.Column; +import io.debezium.relational.Table; import io.debezium.relational.TableId; import lombok.extern.slf4j.Slf4j; @@ -79,11 +80,8 @@ public Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId) throws @Override public String buildSplitScanQuery( - TableId tableId, - SeaTunnelRowType splitKeyType, - boolean isFirstSplit, - boolean isLastSplit) { - return MySqlUtils.buildSplitScanQuery(tableId, splitKeyType, isFirstSplit, isLastSplit); + Table table, SeaTunnelRowType splitKeyType, boolean isFirstSplit, boolean isLastSplit) { + return MySqlUtils.buildSplitScanQuery(table.id(), splitKeyType, isFirstSplit, isLastSplit); } @Override diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/eumerator/OracleChunkSplitter.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/eumerator/OracleChunkSplitter.java index 8500c0c055b..52df70cbc89 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/eumerator/OracleChunkSplitter.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/eumerator/OracleChunkSplitter.java @@ -28,6 +28,7 @@ import io.debezium.jdbc.JdbcConnection; import io.debezium.relational.Column; +import io.debezium.relational.Table; import io.debezium.relational.TableId; import lombok.extern.slf4j.Slf4j; import oracle.sql.ROWID; @@ -84,11 +85,8 @@ public Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId) throws @Override public String buildSplitScanQuery( - TableId tableId, - SeaTunnelRowType splitKeyType, - boolean isFirstSplit, - boolean isLastSplit) { - return OracleUtils.buildSplitScanQuery(tableId, splitKeyType, isFirstSplit, isLastSplit); + Table table, SeaTunnelRowType splitKeyType, boolean isFirstSplit, boolean isLastSplit) { + return OracleUtils.buildSplitScanQuery(table.id(), splitKeyType, isFirstSplit, isLastSplit); } @Override diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/enumerator/PostgresChunkSplitter.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/enumerator/PostgresChunkSplitter.java index db1109a453c..2aab573d2e8 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/enumerator/PostgresChunkSplitter.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/enumerator/PostgresChunkSplitter.java @@ -27,6 +27,7 @@ import io.debezium.jdbc.JdbcConnection; import io.debezium.relational.Column; +import io.debezium.relational.Table; import io.debezium.relational.TableId; import lombok.extern.slf4j.Slf4j; @@ -43,14 +44,27 @@ public PostgresChunkSplitter(JdbcSourceConfig sourceConfig, JdbcDataSourceDialec @Override public Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String columnName) throws SQLException { - return PostgresUtils.queryMinMax(jdbc, tableId, columnName); + return PostgresUtils.queryMinMax(jdbc, tableId, columnName, null); + } + + @Override + public Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, Column column) + throws SQLException { + return PostgresUtils.queryMinMax(jdbc, tableId, column.name(), column); } @Override public Object queryMin( JdbcConnection jdbc, TableId tableId, String columnName, Object excludedLowerBound) throws SQLException { - return PostgresUtils.queryMin(jdbc, tableId, columnName, excludedLowerBound); + return PostgresUtils.queryMin(jdbc, tableId, columnName, null, excludedLowerBound); + } + + @Override + public Object queryMin( + JdbcConnection jdbc, TableId tableId, Column column, Object excludedLowerBound) + throws SQLException { + return PostgresUtils.queryMin(jdbc, tableId, column.name(), column, excludedLowerBound); } @Override @@ -58,7 +72,15 @@ public Object[] sampleDataFromColumn( JdbcConnection jdbc, TableId tableId, String columnName, int inverseSamplingRate) throws SQLException { return PostgresUtils.skipReadAndSortSampleData( - jdbc, tableId, columnName, inverseSamplingRate); + jdbc, tableId, columnName, null, inverseSamplingRate); + } + + @Override + public Object[] sampleDataFromColumn( + JdbcConnection jdbc, TableId tableId, Column column, int inverseSamplingRate) + throws SQLException { + return PostgresUtils.skipReadAndSortSampleData( + jdbc, tableId, column.name(), column, inverseSamplingRate); } @Override @@ -70,7 +92,19 @@ public Object queryNextChunkMax( Object includedLowerBound) throws SQLException { return PostgresUtils.queryNextChunkMax( - jdbc, tableId, columnName, chunkSize, includedLowerBound); + jdbc, tableId, columnName, null, chunkSize, includedLowerBound); + } + + @Override + public Object queryNextChunkMax( + JdbcConnection jdbc, + TableId tableId, + Column column, + int chunkSize, + Object includedLowerBound) + throws SQLException { + return PostgresUtils.queryNextChunkMax( + jdbc, tableId, column.name(), column, chunkSize, includedLowerBound); } @Override @@ -80,11 +114,8 @@ public Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId) throws @Override public String buildSplitScanQuery( - TableId tableId, - SeaTunnelRowType splitKeyType, - boolean isFirstSplit, - boolean isLastSplit) { - return PostgresUtils.buildSplitScanQuery(tableId, splitKeyType, isFirstSplit, isLastSplit); + Table table, SeaTunnelRowType splitKeyType, boolean isFirstSplit, boolean isLastSplit) { + return PostgresUtils.buildSplitScanQuery(table, splitKeyType, isFirstSplit, isLastSplit); } @Override diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/snapshot/PostgresSnapshotSplitReadTask.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/snapshot/PostgresSnapshotSplitReadTask.java index abf9c6fafec..dc2c52ccca6 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/snapshot/PostgresSnapshotSplitReadTask.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/reader/snapshot/PostgresSnapshotSplitReadTask.java @@ -179,7 +179,7 @@ private void createDataEventsForTable( final String selectSql = PostgresUtils.buildSplitScanQuery( - snapshotSplit.getTableId(), + table, snapshotSplit.getSplitKeyType(), snapshotSplit.getSplitStart() == null, snapshotSplit.getSplitEnd() == null); diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtils.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtils.java index 576c7fb5363..b5cd0904532 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtils.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtils.java @@ -23,6 +23,8 @@ import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset; import org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils; import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.offset.LsnOffset; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresDialect; import org.apache.kafka.connect.source.SourceRecord; @@ -51,15 +53,20 @@ @Slf4j public class PostgresUtils { private static final int DEFAULT_FETCH_SIZE = 1024; + private static final JdbcDialect JDBC_DIALECT = new PostgresDialect(); private PostgresUtils() {} - public static Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String columnName) + public static Object[] queryMinMax( + JdbcConnection jdbc, TableId tableId, String columnName, Column column) throws SQLException { + columnName = quote(columnName); + if (column != null) { + columnName = JDBC_DIALECT.convertType(columnName, column.typeName()); + } final String minMaxQuery = String.format( - "SELECT MIN(%s), MAX(%s) FROM %s", - quote(columnName), quote(columnName), quote(tableId)); + "SELECT MIN(%s), MAX(%s) FROM %s", columnName, columnName, quote(tableId)); return jdbc.queryAndMap( minMaxQuery, rs -> { @@ -96,12 +103,20 @@ public static long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId) } public static Object queryMin( - JdbcConnection jdbc, TableId tableId, String columnName, Object excludedLowerBound) + JdbcConnection jdbc, + TableId tableId, + String columnName, + Column column, + Object excludedLowerBound) throws SQLException { + columnName = quote(columnName); + if (column != null) { + columnName = JDBC_DIALECT.convertType(columnName, column.typeName()); + } final String minQuery = String.format( "SELECT MIN(%s) FROM %s WHERE %s > ?", - quote(columnName), quote(tableId), quote(columnName)); + columnName, quote(tableId), columnName); return jdbc.prepareQueryAndMap( minQuery, ps -> ps.setObject(1, excludedLowerBound), @@ -141,10 +156,17 @@ public static Object[] sampleDataFromColumn( } public static Object[] skipReadAndSortSampleData( - JdbcConnection jdbc, TableId tableId, String columnName, int inverseSamplingRate) + JdbcConnection jdbc, + TableId tableId, + String columnName, + Column column, + int inverseSamplingRate) throws SQLException { - final String sampleQuery = - String.format("SELECT %s FROM %s", quote(columnName), quote(tableId)); + columnName = quote(columnName); + if (column != null) { + columnName = JDBC_DIALECT.convertType(columnName, column.typeName()); + } + final String sampleQuery = String.format("SELECT %s FROM %s", columnName, quote(tableId)); Statement stmt = null; ResultSet rs = null; @@ -198,10 +220,14 @@ public static Object queryNextChunkMax( JdbcConnection jdbc, TableId tableId, String splitColumnName, + Column splitColumn, int chunkSize, Object includedLowerBound) throws SQLException { String quotedColumn = quote(splitColumnName); + if (splitColumn != null) { + quotedColumn = JDBC_DIALECT.convertType(quotedColumn, splitColumn.typeName()); + } String query = String.format( "SELECT MAX(%s) FROM (" @@ -273,8 +299,8 @@ public static LsnOffset currentLsn(PostgresConnection connection) { /** Get split scan query for the given table. */ public static String buildSplitScanQuery( - TableId tableId, SeaTunnelRowType rowType, boolean isFirstSplit, boolean isLastSplit) { - return buildSplitQuery(tableId, rowType, isFirstSplit, isLastSplit, -1, true); + Table table, SeaTunnelRowType rowType, boolean isFirstSplit, boolean isLastSplit) { + return buildSplitQuery(table, rowType, isFirstSplit, isLastSplit, -1, true); } /** Get table split data PreparedStatement. */ @@ -328,7 +354,7 @@ private static String getPrimaryKeyColumnsProjection(SeaTunnelRowType rowType) { } private static String buildSplitQuery( - TableId tableId, + Table table, SeaTunnelRowType rowType, boolean isFirstSplit, boolean isLastSplit, @@ -340,37 +366,37 @@ private static String buildSplitQuery( condition = null; } else if (isFirstSplit) { final StringBuilder sql = new StringBuilder(); - addPrimaryKeyColumnsToCondition(rowType, sql, " <= ?"); + addPrimaryKeyColumnsToCondition(table, rowType, sql, " <= ?"); if (isScanningData) { sql.append(" AND NOT ("); - addPrimaryKeyColumnsToCondition(rowType, sql, " = ?"); + addPrimaryKeyColumnsToCondition(table, rowType, sql, " = ?"); sql.append(")"); } condition = sql.toString(); } else if (isLastSplit) { final StringBuilder sql = new StringBuilder(); - addPrimaryKeyColumnsToCondition(rowType, sql, " >= ?"); + addPrimaryKeyColumnsToCondition(table, rowType, sql, " >= ?"); condition = sql.toString(); } else { final StringBuilder sql = new StringBuilder(); - addPrimaryKeyColumnsToCondition(rowType, sql, " >= ?"); + addPrimaryKeyColumnsToCondition(table, rowType, sql, " >= ?"); if (isScanningData) { sql.append(" AND NOT ("); - addPrimaryKeyColumnsToCondition(rowType, sql, " = ?"); + addPrimaryKeyColumnsToCondition(table, rowType, sql, " = ?"); sql.append(")"); } sql.append(" AND "); - addPrimaryKeyColumnsToCondition(rowType, sql, " <= ?"); + addPrimaryKeyColumnsToCondition(table, rowType, sql, " <= ?"); condition = sql.toString(); } if (isScanningData) { return buildSelectWithRowLimits( - tableId, limitSize, "*", Optional.ofNullable(condition), Optional.empty()); + table.id(), limitSize, "*", Optional.ofNullable(condition), Optional.empty()); } else { final String orderBy = String.join(", ", rowType.getFieldNames()); return buildSelectWithBoundaryRowLimits( - tableId, + table.id(), limitSize, getPrimaryKeyColumnsProjection(rowType), getMaxPrimaryKeyColumnsProjection(rowType), @@ -441,11 +467,14 @@ public static String quote(TableId tableId) { } private static void addPrimaryKeyColumnsToCondition( - SeaTunnelRowType rowType, StringBuilder sql, String predicate) { - for (Iterator fieldNamesIt = Arrays.stream(rowType.getFieldNames()).iterator(); - fieldNamesIt.hasNext(); ) { - sql.append(quote(fieldNamesIt.next())).append(predicate); - if (fieldNamesIt.hasNext()) { + Table table, SeaTunnelRowType rowType, StringBuilder sql, String predicate) { + for (int i = 0; i < rowType.getTotalFields(); i++) { + String fieldName = quote(rowType.getFieldName(i)); + fieldName = + JDBC_DIALECT.convertType( + fieldName, table.columnWithName(rowType.getFieldName(i)).typeName()); + sql.append(fieldName).append(predicate); + if (i < rowType.getTotalFields() - 1) { sql.append(" AND "); } } diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtilsTest.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtilsTest.java index e8e5bb22d2c..6ce08f953c6 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtilsTest.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresUtilsTest.java @@ -24,14 +24,21 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import io.debezium.relational.Column; +import io.debezium.relational.Table; import io.debezium.relational.TableId; public class PostgresUtilsTest { @Test public void testSplitScanQuery() { + Table table = + Table.editor() + .tableId(TableId.parse("db1.schema1.table1")) + .addColumn(Column.editor().name("id").type("int8").create()) + .create(); String splitScanSQL = PostgresUtils.buildSplitScanQuery( - TableId.parse("db1.schema1.table1"), + table, new SeaTunnelRowType( new String[] {"id"}, new SeaTunnelDataType[] {BasicType.LONG_TYPE}), false, @@ -42,7 +49,7 @@ public void testSplitScanQuery() { splitScanSQL = PostgresUtils.buildSplitScanQuery( - TableId.parse("db1.schema1.table1"), + table, new SeaTunnelRowType( new String[] {"id"}, new SeaTunnelDataType[] {BasicType.LONG_TYPE}), true, @@ -51,7 +58,7 @@ public void testSplitScanQuery() { splitScanSQL = PostgresUtils.buildSplitScanQuery( - TableId.parse("db1.schema1.table1"), + table, new SeaTunnelRowType( new String[] {"id"}, new SeaTunnelDataType[] {BasicType.LONG_TYPE}), true, @@ -60,14 +67,20 @@ public void testSplitScanQuery() { "SELECT * FROM \"schema1\".\"table1\" WHERE \"id\" <= ? AND NOT (\"id\" = ?)", splitScanSQL); + table = + Table.editor() + .tableId(TableId.parse("db1.schema1.table1")) + .addColumn(Column.editor().name("id").type("uuid").create()) + .create(); splitScanSQL = PostgresUtils.buildSplitScanQuery( - TableId.parse("db1.schema1.table1"), + table, new SeaTunnelRowType( - new String[] {"id"}, new SeaTunnelDataType[] {BasicType.LONG_TYPE}), + new String[] {"id"}, + new SeaTunnelDataType[] {BasicType.STRING_TYPE}), false, true); Assertions.assertEquals( - "SELECT * FROM \"schema1\".\"table1\" WHERE \"id\" >= ?", splitScanSQL); + "SELECT * FROM \"schema1\".\"table1\" WHERE \"id\"::text >= ?", splitScanSQL); } } diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java index 1dc97020be5..b6698f53190 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/eumerator/SqlServerChunkSplitter.java @@ -27,6 +27,7 @@ import io.debezium.jdbc.JdbcConnection; import io.debezium.relational.Column; +import io.debezium.relational.Table; import io.debezium.relational.TableId; import lombok.extern.slf4j.Slf4j; @@ -80,11 +81,9 @@ public Long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId) throws @Override public String buildSplitScanQuery( - TableId tableId, - SeaTunnelRowType splitKeyType, - boolean isFirstSplit, - boolean isLastSplit) { - return SqlServerUtils.buildSplitScanQuery(tableId, splitKeyType, isFirstSplit, isLastSplit); + Table table, SeaTunnelRowType splitKeyType, boolean isFirstSplit, boolean isLastSplit) { + return SqlServerUtils.buildSplitScanQuery( + table.id(), splitKeyType, isFirstSplit, isLastSplit); } @Override diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcInputFormat.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcInputFormat.java index c2fec613414..8588ef16df1 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcInputFormat.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/JdbcInputFormat.java @@ -95,7 +95,7 @@ public void open(JdbcSourceSplit inputSplit) throws IOException { splitTableSchema = tables.get(inputSplit.getTablePath()).getTableSchema(); splitTableId = inputSplit.getTablePath().toString(); - statement = chunkSplitter.generateSplitStatement(inputSplit); + statement = chunkSplitter.generateSplitStatement(inputSplit, splitTableSchema); resultSet = statement.executeQuery(); hasNext = resultSet.next(); } catch (SQLException se) { diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java index ea284e3fd9b..db9b90daded 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/JdbcDialect.java @@ -75,6 +75,10 @@ public interface JdbcDialect extends Serializable { */ JdbcDialectTypeMapper getJdbcDialectTypeMapper(); + default String hashModForField(String nativeType, String fieldName, int mod) { + return hashModForField(fieldName, mod); + } + default String hashModForField(String fieldName, int mod) { return "ABS(MD5(" + quoteIdentifier(fieldName) + ") % " + mod + ")"; } @@ -405,4 +409,15 @@ default JdbcConnectionProvider getJdbcConnectionProvider( JdbcConnectionConfig jdbcConnectionConfig) { return new SimpleJdbcConnectionProvider(jdbcConnectionConfig); } + + /** + * Cast column type e.g. CAST(column AS type) + * + * @param columnName + * @param columnType + * @return the text of converted column type. + */ + default String convertType(String columnName, String columnType) { + return columnName; + } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java index 82d02119d1a..a2cd997e006 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/psql/PostgresDialect.java @@ -17,6 +17,7 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql; +import org.apache.seatunnel.api.table.catalog.Column; import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier; @@ -36,6 +37,7 @@ import java.sql.SQLException; import java.sql.Statement; import java.util.Arrays; +import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; @@ -68,9 +70,73 @@ public JdbcDialectTypeMapper getJdbcDialectTypeMapper() { return new PostgresTypeMapper(); } + @Override + public String hashModForField(String nativeType, String fieldName, int mod) { + String quoteFieldName = quoteIdentifier(fieldName); + if (StringUtils.isNotBlank(nativeType)) { + quoteFieldName = convertType(quoteFieldName, nativeType); + } + return "(ABS(HASHTEXT(" + quoteFieldName + ")) % " + mod + ")"; + } + @Override public String hashModForField(String fieldName, int mod) { - return "(ABS(HASHTEXT(" + quoteIdentifier(fieldName) + ")) % " + mod + ")"; + return hashModForField(null, fieldName, mod); + } + + @Override + public Object queryNextChunkMax( + Connection connection, + JdbcSourceTable table, + String columnName, + int chunkSize, + Object includedLowerBound) + throws SQLException { + Map columns = + table.getCatalogTable().getTableSchema().getColumns().stream() + .collect(Collectors.toMap(c -> c.getName(), c -> c)); + Column column = columns.get(columnName); + + String quotedColumn = quoteIdentifier(columnName); + quotedColumn = convertType(quotedColumn, column.getSourceType()); + String sqlQuery; + if (StringUtils.isNotBlank(table.getQuery())) { + sqlQuery = + String.format( + "SELECT MAX(%s) FROM (" + + "SELECT %s FROM (%s) AS T1 WHERE %s >= ? ORDER BY %s ASC LIMIT %s" + + ") AS T2", + quotedColumn, + quotedColumn, + table.getQuery(), + quotedColumn, + quotedColumn, + chunkSize); + } else { + sqlQuery = + String.format( + "SELECT MAX(%s) FROM (" + + "SELECT %s FROM %s WHERE %s >= ? ORDER BY %s ASC LIMIT %s" + + ") AS T", + quotedColumn, + quotedColumn, + tableIdentifier(table.getTablePath()), + quotedColumn, + quotedColumn, + chunkSize); + } + try (PreparedStatement ps = connection.prepareStatement(sqlQuery)) { + ps.setObject(1, includedLowerBound); + try (ResultSet rs = ps.executeQuery()) { + if (rs.next()) { + return rs.getObject(1); + } else { + // this should never happen + throw new SQLException( + String.format("No result returned after running query [%s]", sqlQuery)); + } + } + } } @Override @@ -189,4 +255,11 @@ public Long approximateRowCntStatement(Connection connection, JdbcSourceTable ta } return SQLUtils.countForSubquery(connection, table.getQuery()); } + + public String convertType(String columnName, String columnType) { + if (PostgresTypeConverter.PG_UUID.equals(columnType)) { + return columnName + "::text"; + } + return columnName; + } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/ChunkSplitter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/ChunkSplitter.java index 89ec64b817f..198dfe47cbc 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/ChunkSplitter.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/ChunkSplitter.java @@ -113,15 +113,16 @@ public Collection generateSplits(JdbcSourceTable table) throws protected abstract Collection createSplits( JdbcSourceTable table, SeaTunnelRowType splitKeyType) throws SQLException; - public PreparedStatement generateSplitStatement(JdbcSourceSplit split) throws SQLException { + public PreparedStatement generateSplitStatement(JdbcSourceSplit split, TableSchema schema) + throws SQLException { if (split.getSplitKeyName() == null) { return createSingleSplitStatement(split); } - return createSplitStatement(split); + return createSplitStatement(split, schema); } - protected abstract PreparedStatement createSplitStatement(JdbcSourceSplit split) - throws SQLException; + protected abstract PreparedStatement createSplitStatement( + JdbcSourceSplit split, TableSchema schema) throws SQLException; protected PreparedStatement createPreparedStatement(String sql) throws SQLException { Connection connection = getOrEstablishConnection(); @@ -174,7 +175,13 @@ protected PreparedStatement createSingleSplitStatement(JdbcSourceSplit split) protected Object queryMin(JdbcSourceTable table, String columnName, Object excludedLowerBound) throws SQLException { String minQuery; + Map columns = + table.getCatalogTable().getTableSchema().getColumns().stream() + .collect(Collectors.toMap(c -> c.getName(), c -> c)); + Column column = columns.get(columnName); + columnName = jdbcDialect.quoteIdentifier(columnName); + columnName = jdbcDialect.convertType(columnName, column.getSourceType()); if (StringUtils.isNotBlank(table.getQuery())) { minQuery = String.format( @@ -206,7 +213,13 @@ protected Object queryMin(JdbcSourceTable table, String columnName, Object exclu protected Pair queryMinMax(JdbcSourceTable table, String columnName) throws SQLException { String sqlQuery; + Map columns = + table.getCatalogTable().getTableSchema().getColumns().stream() + .collect(Collectors.toMap(c -> c.getName(), c -> c)); + Column column = columns.get(columnName); + columnName = jdbcDialect.quoteIdentifier(columnName); + columnName = jdbcDialect.convertType(columnName, column.getSourceType()); if (StringUtils.isNotBlank(table.getQuery())) { sqlQuery = String.format( diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitter.java index 2993f749c66..9dc26d1ef22 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitter.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitter.java @@ -19,7 +19,9 @@ import org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting; +import org.apache.seatunnel.api.table.catalog.Column; import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.catalog.TableSchema; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.common.exception.CommonError; @@ -41,12 +43,12 @@ import java.sql.Timestamp; import java.time.LocalDate; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Objects; +import java.util.stream.Collectors; import static java.math.BigDecimal.ROUND_CEILING; import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument; @@ -65,8 +67,9 @@ protected Collection createSplits( } @Override - protected PreparedStatement createSplitStatement(JdbcSourceSplit split) throws SQLException { - return createDynamicSplitStatement(split); + protected PreparedStatement createSplitStatement(JdbcSourceSplit split, TableSchema schema) + throws SQLException { + return createDynamicSplitStatement(split, schema); } private Collection createDynamicSplits( @@ -92,9 +95,9 @@ private Collection createDynamicSplits( return splits; } - private PreparedStatement createDynamicSplitStatement(JdbcSourceSplit split) + private PreparedStatement createDynamicSplitStatement(JdbcSourceSplit split, TableSchema schema) throws SQLException { - String splitQuery = createDynamicSplitQuerySQL(split); + String splitQuery = createDynamicSplitQuerySQL(split, schema); PreparedStatement statement = createPreparedStatement(splitQuery); prepareDynamicSplitStatement(statement, split); return statement; @@ -452,7 +455,7 @@ private int objectCompare(Object obj1, Object obj2) { } @VisibleForTesting - String createDynamicSplitQuerySQL(JdbcSourceSplit split) { + String createDynamicSplitQuerySQL(JdbcSourceSplit split, TableSchema schema) { SeaTunnelRowType rowType = new SeaTunnelRowType( new String[] {split.getSplitKeyName()}, @@ -465,23 +468,23 @@ String createDynamicSplitQuerySQL(JdbcSourceSplit split) { condition = null; } else if (isFirstSplit) { StringBuilder sql = new StringBuilder(); - addKeyColumnsToCondition(rowType, sql, " <= ?"); + addKeyColumnsToCondition(schema, rowType, sql, " <= ?"); sql.append(" AND NOT ("); - addKeyColumnsToCondition(rowType, sql, " = ?"); + addKeyColumnsToCondition(schema, rowType, sql, " = ?"); sql.append(")"); condition = sql.toString(); } else if (isLastSplit) { StringBuilder sql = new StringBuilder(); - addKeyColumnsToCondition(rowType, sql, " >= ?"); + addKeyColumnsToCondition(schema, rowType, sql, " >= ?"); condition = sql.toString(); } else { StringBuilder sql = new StringBuilder(); - addKeyColumnsToCondition(rowType, sql, " >= ?"); + addKeyColumnsToCondition(schema, rowType, sql, " >= ?"); sql.append(" AND NOT ("); - addKeyColumnsToCondition(rowType, sql, " = ?"); + addKeyColumnsToCondition(schema, rowType, sql, " = ?"); sql.append(")"); sql.append(" AND "); - addKeyColumnsToCondition(rowType, sql, " <= ?"); + addKeyColumnsToCondition(schema, rowType, sql, " <= ?"); condition = sql.toString(); } @@ -503,11 +506,16 @@ String createDynamicSplitQuerySQL(JdbcSourceSplit split) { } private void addKeyColumnsToCondition( - SeaTunnelRowType rowType, StringBuilder sql, String predicate) { - for (Iterator fieldNamesIt = Arrays.stream(rowType.getFieldNames()).iterator(); - fieldNamesIt.hasNext(); ) { - sql.append(jdbcDialect.quoteIdentifier(fieldNamesIt.next())).append(predicate); - if (fieldNamesIt.hasNext()) { + TableSchema schema, SeaTunnelRowType rowType, StringBuilder sql, String predicate) { + Map columns = + schema.getColumns().stream().collect(Collectors.toMap(c -> c.getName(), c -> c)); + for (int i = 0; i < rowType.getTotalFields(); i++) { + String fieldName = jdbcDialect.quoteIdentifier(rowType.getFieldName(i)); + fieldName = + jdbcDialect.convertType( + fieldName, columns.get(rowType.getFieldName(i)).getSourceType()); + sql.append(fieldName).append(predicate); + if (i < rowType.getTotalFields() - 1) { sql.append(" AND "); } } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/FixedChunkSplitter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/FixedChunkSplitter.java index aa93d2d8a5c..edeef96f0a2 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/FixedChunkSplitter.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/FixedChunkSplitter.java @@ -17,6 +17,8 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.source; +import org.apache.seatunnel.api.table.catalog.Column; +import org.apache.seatunnel.api.table.catalog.TableSchema; import org.apache.seatunnel.api.table.type.DecimalType; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; @@ -89,7 +91,8 @@ protected Collection createSplits( } @Override - protected PreparedStatement createSplitStatement(JdbcSourceSplit split) throws SQLException { + protected PreparedStatement createSplitStatement(JdbcSourceSplit split, TableSchema schema) + throws SQLException { if (SqlType.STRING.equals(split.getSplitKeyType().getSqlType())) { return createStringColumnSplitStatement(split); } @@ -103,6 +106,11 @@ protected PreparedStatement createSplitStatement(JdbcSourceSplit split) throws S private Collection createStringColumnSplits( JdbcSourceTable table, String splitKeyName, SeaTunnelDataType splitKeyType) { List splits = new ArrayList<>(table.getPartitionNumber()); + Column column = + table.getCatalogTable().getTableSchema().getColumns().stream() + .filter(c -> c.getName().equals(splitKeyName)) + .findAny() + .get(); for (int i = 0; i < table.getPartitionNumber(); i++) { String splitQuery; if (StringUtils.isNotBlank(table.getQuery())) { @@ -111,14 +119,18 @@ private Collection createStringColumnSplits( "SELECT * FROM (%s) st_jdbc_splitter WHERE %s = ?", table.getQuery(), jdbcDialect.hashModForField( - splitKeyName, table.getPartitionNumber())); + column.getSourceType(), + splitKeyName, + table.getPartitionNumber())); } else { splitQuery = String.format( "SELECT * FROM %s WHERE %s = ?", jdbcDialect.tableIdentifier(table.getTablePath()), jdbcDialect.hashModForField( - splitKeyName, table.getPartitionNumber())); + column.getSourceType(), + splitKeyName, + table.getPartitionNumber())); } JdbcSourceSplit split = diff --git a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitterTest.java b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitterTest.java index c71ae7b43db..70963a9f725 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitterTest.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/DynamicChunkSplitterTest.java @@ -17,7 +17,9 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.source; +import org.apache.seatunnel.api.table.catalog.PhysicalColumn; import org.apache.seatunnel.api.table.catalog.TablePath; +import org.apache.seatunnel.api.table.catalog.TableSchema; import org.apache.seatunnel.api.table.type.BasicType; import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcConnectionConfig; import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceConfig; @@ -35,7 +37,7 @@ public class DynamicChunkSplitterTest { @Test - public void testGenerateSplitQuerySQL() { + public void testPostgresGenerateSplitQuerySQL() { JdbcSourceConfig config = JdbcSourceConfig.builder() .jdbcConnectionConfig( @@ -44,6 +46,17 @@ public void testGenerateSplitQuerySQL() { .driverName("org.postgresql.Driver") .build()) .build(); + TableSchema tableSchema = + TableSchema.builder() + .columns( + Arrays.asList( + PhysicalColumn.builder() + .name("id") + .sourceType("int4") + .dataType(BasicType.INT_TYPE) + .build())) + .build(); + DynamicChunkSplitter splitter = new DynamicChunkSplitter(config); JdbcSourceSplit split = @@ -55,7 +68,7 @@ public void testGenerateSplitQuerySQL() { BasicType.INT_TYPE, 1, 10); - String splitQuerySQL = splitter.createDynamicSplitQuerySQL(split); + String splitQuerySQL = splitter.createDynamicSplitQuerySQL(split, tableSchema); Assertions.assertEquals( "SELECT * FROM \"db1\".\"schema1\".\"table1\" WHERE \"id\" >= ? AND NOT (\"id\" = ?) AND \"id\" <= ?", splitQuerySQL); @@ -69,10 +82,34 @@ public void testGenerateSplitQuerySQL() { BasicType.INT_TYPE, 1, 10); - splitQuerySQL = splitter.createDynamicSplitQuerySQL(split); + splitQuerySQL = splitter.createDynamicSplitQuerySQL(split, tableSchema); Assertions.assertEquals( "SELECT * FROM (select * from table1) tmp WHERE \"id\" >= ? AND NOT (\"id\" = ?) AND \"id\" <= ?", splitQuerySQL); + + tableSchema = + TableSchema.builder() + .columns( + Arrays.asList( + PhysicalColumn.builder() + .name("id") + .sourceType("uuid") + .dataType(BasicType.INT_TYPE) + .build())) + .build(); + split = + new JdbcSourceSplit( + TablePath.of("db1", "schema1", "table1"), + "split1", + "select * from table1", + "id", + BasicType.INT_TYPE, + 1, + 10); + splitQuerySQL = splitter.createDynamicSplitQuerySQL(split, tableSchema); + Assertions.assertEquals( + "SELECT * FROM (select * from table1) tmp WHERE \"id\"::text >= ? AND NOT (\"id\"::text = ?) AND \"id\"::text <= ?", + splitQuerySQL); } @Test From c3ab84caa4be6fb6dd62c147761df39bb3ca68c3 Mon Sep 17 00:00:00 2001 From: Eric Date: Fri, 10 May 2024 12:51:16 +0800 Subject: [PATCH 09/10] Fix HttpSource bug (#6824) --- .../connectors/seatunnel/http/source/HttpSourceReader.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java index db0d36d9b63..c2c3da69681 100644 --- a/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java +++ b/seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java @@ -116,7 +116,7 @@ public void pollAndCollectData(Collector output) throws Exception this.httpParameter.getHeaders(), this.httpParameter.getParams(), this.httpParameter.getBody()); - if (HttpResponse.STATUS_OK == response.getCode()) { + if (response.getCode() >= 200 && response.getCode() <= 207) { String content = response.getContent(); if (!Strings.isNullOrEmpty(content)) { if (this.httpParameter.isEnableMultilines()) { From 52d1020eb7dcde618430a20c5922bf4ee93eac97 Mon Sep 17 00:00:00 2001 From: Jia Fan Date: Fri, 10 May 2024 15:24:57 +0800 Subject: [PATCH 10/10] [Fix] Fix ConnectorSpecificationCheckTest failed (#6828) --- .../seatunnel/cdc/oracle/source/OracleIncrementalSource.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSource.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSource.java index 9bcd8f88459..53a82b2e2f9 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSource.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/source/OracleIncrementalSource.java @@ -19,7 +19,6 @@ import org.apache.seatunnel.api.configuration.Option; import org.apache.seatunnel.api.configuration.ReadonlyConfig; -import org.apache.seatunnel.api.source.SeaTunnelSource; import org.apache.seatunnel.api.source.SupportParallelism; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; @@ -43,12 +42,10 @@ import org.apache.kafka.connect.data.Struct; -import com.google.auto.service.AutoService; import io.debezium.jdbc.JdbcConnection; import io.debezium.relational.TableId; import io.debezium.relational.history.ConnectTableChangeSerializer; import io.debezium.relational.history.TableChanges; -import lombok.NoArgsConstructor; import java.time.ZoneId; import java.util.List; @@ -56,8 +53,6 @@ import java.util.function.Function; import java.util.stream.Collectors; -@NoArgsConstructor -@AutoService(SeaTunnelSource.class) public class OracleIncrementalSource extends IncrementalSource implements SupportParallelism {