From e6f9aec4a12ef3fbc9df8c67bbf5b7abb236f9a9 Mon Sep 17 00:00:00 2001 From: Kexiang Wang Date: Sun, 22 Sep 2024 23:20:32 -0400 Subject: [PATCH 1/5] fix(pg-cdc): check ancestors and descendants for pg partitions --- e2e_test/source/cdc/cdc.load.slt | 60 ++++++++++++++++++ e2e_test/source/cdc/cdc.share_stream.slt | 8 +++ e2e_test/source/cdc/postgres_cdc.sql | 8 ++- .../source/common/DbzSourceUtils.java | 4 +- .../source/common/PostgresValidator.java | 62 +++++++++++++++++++ .../main/resources/validate_sql.properties | 3 + 6 files changed, 143 insertions(+), 2 deletions(-) diff --git a/e2e_test/source/cdc/cdc.load.slt b/e2e_test/source/cdc/cdc.load.slt index a50a0b7c9458..71b3ff05694e 100644 --- a/e2e_test/source/cdc/cdc.load.slt +++ b/e2e_test/source/cdc/cdc.load.slt @@ -259,6 +259,66 @@ create table upper_orders ( slot.name = 'orders' ); +statement ok +CREATE TABLE IF NOT EXISTS partitioned_timestamp_table_2023( + c_int int, + c_boolean boolean, + c_timestamp timestamp, + PRIMARY KEY (c_int, c_timestamp) +) WITH ( + connector = 'postgres-cdc', + hostname = '${PGHOST:localhost}', + port = '${PGPORT:5432}', + username = '${PGUSER:$USER}', + password = '${PGPASSWORD:}', + database.name = '${PGDATABASE:postgres}', + schema.name = 'public', + table.name = 'partitioned_timestamp_table_2023', + publication.name = 'rw_publication_partition_2023', + slot.name = 'my_slot_partition' +); + +statement error The ancestor or descendant partition 'partitioned_timestamp_table_2023' of the table partition 'partitioned_timestamp_table' is already covered in the publication 'rw_publication_partition_2023'. +CREATE TABLE IF NOT EXISTS partitioned_timestamp_table( + c_int int, + c_boolean boolean, + c_timestamp timestamp, + PRIMARY KEY (c_int, c_timestamp) +) WITH ( + connector = 'postgres-cdc', + hostname = '${PGHOST:localhost}', + port = '${PGPORT:5432}', + username = '${PGUSER:$USER}', + password = '${PGPASSWORD:}', + database.name = '${PGDATABASE:postgres}', + schema.name = 'public', + table.name = 'partitioned_timestamp_table', + publication.name = 'rw_publication_partition_2023', + slot.name = 'my_slot_partition' +); + +statement error The ancestor or descendant partition 'partitioned_timestamp_table_2023' of the table partition 'partitioned_timestamp_table_2023_h1' is already covered in the publication 'rw_publication_partition_2023'. +CREATE TABLE IF NOT EXISTS partitioned_timestamp_table_2023_h1( + c_int int, + c_boolean boolean, + c_timestamp timestamp, + PRIMARY KEY (c_int, c_timestamp) +) WITH ( + connector = 'postgres-cdc', + hostname = '${PGHOST:localhost}', + port = '${PGPORT:5432}', + username = '${PGUSER:$USER}', + password = '${PGPASSWORD:}', + database.name = '${PGDATABASE:postgres}', + schema.name = 'public', + table.name = 'partitioned_timestamp_table_2023_h1', + publication.name = 'rw_publication_partition_2023', + slot.name = 'my_slot_partition' +); + +statement ok +DROP TABLE partitioned_timestamp_table_2023; + # for the partitioned table statement ok CREATE TABLE IF NOT EXISTS partitioned_timestamp_table( diff --git a/e2e_test/source/cdc/cdc.share_stream.slt b/e2e_test/source/cdc/cdc.share_stream.slt index 83840239b623..c8fcfecb5a8b 100644 --- a/e2e_test/source/cdc/cdc.share_stream.slt +++ b/e2e_test/source/cdc/cdc.share_stream.slt @@ -296,6 +296,14 @@ CREATE TABLE partitioned_timestamp_table_shared( PRIMARY KEY (c_int, c_timestamp) ) from pg_source table 'public.partitioned_timestamp_table'; +statement error The ancestor or descendant partition 'partitioned_timestamp_table' of the table partition 'partitioned_timestamp_table_2023' is already covered in the publication 'rw_publication'. +CREATE TABLE partitioned_timestamp_table_2023_shared( + c_int int, + c_boolean boolean, + c_timestamp timestamp, + PRIMARY KEY (c_int, c_timestamp) +) from pg_source table 'public.partitioned_timestamp_table_2023'; + statement ok CREATE MATERIALIZED VIEW person_new_cnt AS SELECT COUNT(*) AS cnt FROM person_new; diff --git a/e2e_test/source/cdc/postgres_cdc.sql b/e2e_test/source/cdc/postgres_cdc.sql index f80dfa267432..e3e40ab6514c 100644 --- a/e2e_test/source/cdc/postgres_cdc.sql +++ b/e2e_test/source/cdc/postgres_cdc.sql @@ -122,7 +122,7 @@ CREATE TABLE IF NOT EXISTS partitioned_timestamp_table( ) PARTITION BY RANGE (c_timestamp); CREATE TABLE partitioned_timestamp_table_2023 PARTITION OF partitioned_timestamp_table - FOR VALUES FROM ('2023-01-01') TO ('2023-12-31'); + FOR VALUES FROM ('2023-01-01') TO ('2023-12-31') PARTITION BY RANGE (c_timestamp); CREATE TABLE partitioned_timestamp_table_2024 PARTITION OF partitioned_timestamp_table FOR VALUES FROM ('2024-01-01') TO ('2024-12-31'); @@ -130,6 +130,12 @@ CREATE TABLE partitioned_timestamp_table_2024 PARTITION OF partitioned_timestamp CREATE TABLE partitioned_timestamp_table_2025 PARTITION OF partitioned_timestamp_table FOR VALUES FROM ('2025-01-01') TO ('2025-12-31'); +CREATE TABLE partitioned_timestamp_table_2023_h1 PARTITION OF partitioned_timestamp_table_2023 + FOR VALUES FROM ('2023-01-01') TO ('2023-06-30'); + +CREATE TABLE partitioned_timestamp_table_2023_h2 PARTITION OF partitioned_timestamp_table_2023 + FOR VALUES FROM ('2023-07-01') TO ('2024-12-31'); + INSERT INTO partitioned_timestamp_table (c_int, c_boolean, c_timestamp) VALUES (1, false, '2023-02-01 10:30:00'), (2, false, '2023-05-15 11:45:00'), diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzSourceUtils.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzSourceUtils.java index 6ab10bf18eb0..e0670cb7cb8e 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzSourceUtils.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzSourceUtils.java @@ -62,8 +62,10 @@ public static void createPostgresPublicationIfNeeded( ValidatorUtils.getSql("postgres.publication_exist"))) { stmt.setString(1, pubName); var res = stmt.executeQuery(); + // Note: the value returned here is `pubviaroot`, If there's more than one row, the + // publication exists if (res.next()) { - isPubExist = res.getBoolean(1); + isPubExist = true; } } diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java index 9d5b65c1d73a..5404ec6804d6 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java @@ -397,6 +397,68 @@ private void validatePublicationConfig(boolean isSuperUser) throws SQLException + "'` in the upstream Postgres to check."); } + if (isPublicationExists) { + List family = new ArrayList<>(); + boolean findRoot = false; + String currentPartition = tableName; + System.out.println("WKXLOG before find root: " + family); + + while (!findRoot) { + try (var stmt = + jdbcConnection.prepareStatement( + ValidatorUtils.getSql("postgres.partition_parent"))) { + stmt.setString( + 1, String.format("\"%s\".\"%s\"", this.schemaName, currentPartition)); + stmt.setString( + 2, String.format("\"%s\".\"%s\"", this.schemaName, currentPartition)); + stmt.setString( + 3, String.format("\"%s\".\"%s\"", this.schemaName, currentPartition)); + var res = stmt.executeQuery(); + if (res.next()) { + String parent = res.getString(1); + family.add(parent); + currentPartition = parent; + } else { + findRoot = true; + } + } + } + System.out.println("WKXLOG after find root: " + family); + try (var stmt = + jdbcConnection.prepareStatement( + ValidatorUtils.getSql("postgres.partition_descendants"))) { + stmt.setString(1, String.format("\"%s\".\"%s\"", this.schemaName, this.tableName)); + stmt.setString(2, String.format("\"%s\".\"%s\"", this.schemaName, this.tableName)); + var res = stmt.executeQuery(); + while (res.next()) { + String descendant = res.getString(1); + family.add(descendant); + } + } + + System.out.println("WKXLOG after find descendants: " + family); + + for (String relative : family) { + try (var stmt = + jdbcConnection.prepareStatement( + ValidatorUtils.getSql("postgres.partition_in_publication.check"))) { + stmt.setString(1, schemaName); + stmt.setString(2, relative); + stmt.setString(3, pubName); + var res = stmt.executeQuery(); + while (res.next()) { + if (res.getBoolean(1)) { + throw ValidatorUtils.invalidArgument( + String.format( + "The ancestor or descendant partition '%s' of the table partition '%s' is already covered in the publication '%s'. Please use a new publication for '%s'", + relative, tableName, pubName, tableName)); + } + } + } + } + System.out.println("WKXLOG after publication.check: " + family); + } + // PG 15 and up supports partial publication of table // check whether publication covers all columns of the table schema if (isPartialPublicationEnabled) { diff --git a/java/connector-node/risingwave-connector-service/src/main/resources/validate_sql.properties b/java/connector-node/risingwave-connector-service/src/main/resources/validate_sql.properties index 0288b7b37fca..209da1f0ed56 100644 --- a/java/connector-node/risingwave-connector-service/src/main/resources/validate_sql.properties +++ b/java/connector-node/risingwave-connector-service/src/main/resources/validate_sql.properties @@ -20,6 +20,9 @@ postgres.publication_attnames=SELECT attnames FROM pg_publication_tables WHERE s postgres.publication_exist=SELECT pubviaroot from pg_publication WHERE pubname = ? postgres.publication_has_table=SELECT COUNT(*) > 0 AS count FROM pg_publication_tables WHERE schemaname = ? AND tablename = ? AND pubname = ? postgres.partition_names=SELECT c.relname AS partition_name FROM pg_inherits AS i JOIN pg_class AS c ON i.inhrelid = c.oid JOIN pg_class AS p ON i.inhparent = p.oid JOIN pg_namespace AS n ON p.relnamespace = n.oid WHERE n.nspname = ? AND p.relname = ?; +postgres.partition_parent=SELECT parentrelid FROM pg_partition_tree(?) WHERE relid = ?::regclass AND parentrelid <> ?::regclass +postgres.partition_descendants=SELECT relid FROM pg_partition_tree(?) WHERE relid <> ?::regclass +postgres.partition_in_publication.check=SELECT count(*) > 0 FROM pg_publication_tables WHERE schemaname = ? AND tablename = ? AND pubname = ? postgres.users_of_group=WITH RECURSIVE base (g, m) AS (( \ SELECT r1.rolname as group, ARRAY_AGG(DISTINCT(r2.rolname)) as members FROM pg_auth_members am \ INNER JOIN pg_roles r1 ON r1.oid = am.roleid \ From cd623701517a8a1a48a31132d9db9182e8bd9fa6 Mon Sep 17 00:00:00 2001 From: Kexiang Wang Date: Mon, 23 Sep 2024 20:53:55 -0400 Subject: [PATCH 2/5] fix version issue --- .../source/common/DbzSourceUtils.java | 4 +- .../source/common/PostgresValidator.java | 57 +++++++++++++------ .../main/resources/validate_sql.properties | 3 +- 3 files changed, 43 insertions(+), 21 deletions(-) diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzSourceUtils.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzSourceUtils.java index e0670cb7cb8e..6ab10bf18eb0 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzSourceUtils.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzSourceUtils.java @@ -62,10 +62,8 @@ public static void createPostgresPublicationIfNeeded( ValidatorUtils.getSql("postgres.publication_exist"))) { stmt.setString(1, pubName); var res = stmt.executeQuery(); - // Note: the value returned here is `pubviaroot`, If there's more than one row, the - // publication exists if (res.next()) { - isPubExist = true; + isPubExist = res.getBoolean(1); } } diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java index 5404ec6804d6..3469dbd6fc1b 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java @@ -48,6 +48,7 @@ public class PostgresValidator extends DatabaseValidator implements AutoCloseabl // Whether the properties to validate is shared by multiple tables. // If true, we will skip validation check for table private final boolean isCdcSourceJob; + private final int pgVersion; public PostgresValidator( Map userProps, TableSchema tableSchema, boolean isCdcSourceJob) @@ -75,12 +76,17 @@ public PostgresValidator( this.pubAutoCreate = userProps.get(DbzConnectorConfig.PG_PUB_CREATE).equalsIgnoreCase("true"); this.isCdcSourceJob = isCdcSourceJob; + try { + this.pgVersion = jdbcConnection.getMetaData().getDatabaseMajorVersion(); + } catch (SQLException e) { + throw ValidatorUtils.internalError(e.getMessage()); + } } @Override public void validateDbConfig() { try { - if (jdbcConnection.getMetaData().getDatabaseMajorVersion() > 16) { + if (pgVersion > 16) { throw ValidatorUtils.failedPrecondition("Postgres version should be less than 16."); } @@ -331,7 +337,6 @@ private void validateTablePrivileges(boolean isSuperUser) throws SQLException { private void validatePublicationConfig(boolean isSuperUser) throws SQLException { boolean isPublicationCoversTable = false; boolean isPublicationExists = false; - boolean isPublicationViaRoot = false; boolean isPartialPublicationEnabled = false; // Check whether publication exists @@ -341,8 +346,7 @@ private void validatePublicationConfig(boolean isSuperUser) throws SQLException stmt.setString(1, pubName); var res = stmt.executeQuery(); while (res.next()) { - isPublicationViaRoot = res.getBoolean(1); - isPublicationExists = true; + isPublicationExists = res.getBoolean(1); } } @@ -384,20 +388,39 @@ private void validatePublicationConfig(boolean isSuperUser) throws SQLException partitions.add(res.getString(1)); } } - if (!partitions.isEmpty() && isPublicationExists && !isPublicationViaRoot) { - // make sure the publication are created with `publish_via_partition_root = true`, which - // is required by partitioned tables. - throw ValidatorUtils.invalidArgument( - "Table '" - + tableName - + "' has partitions, which requires publication '" - + pubName - + "' to be created with `publish_via_partition_root = true`. \nHint: you can run `SELECT pubviaroot from pg_publication WHERE pubname = '" - + pubName - + "'` in the upstream Postgres to check."); - } - if (isPublicationExists) { + if (!partitions.isEmpty() && isPublicationExists) { + // `pubviaroot` in `pg_publication` is added after PG v13, before which PG does not + // allow adding partitioned table to a publication. So here, if partitions.isEmpty() is + // false, we can safely check the value of `pubviaroot` of the publication here. + boolean isPublicationViaRoot = false; + try (var stmt = + jdbcConnection.prepareStatement( + ValidatorUtils.getSql("postgres.publication_pubviaroot"))) { + stmt.setString(1, pubName); + var res = stmt.executeQuery(); + if (res.next()) { + isPublicationViaRoot = res.getBoolean(1); + } + } + if (!isPublicationViaRoot) { + // Make sure the publication are created with `publish_via_partition_root = true`, + // which + // is required by partitioned tables. + throw ValidatorUtils.invalidArgument( + "Table '" + + tableName + + "' has partitions, which requires publication '" + + pubName + + "' to be created with `publish_via_partition_root = true`. \nHint: you can run `SELECT pubviaroot from pg_publication WHERE pubname = '" + + pubName + + "'` in the upstream Postgres to check."); + } + } + // Only after v13, PG allows adding a partitioned table to a publication. So, if the + // version is before v13, the tables in a publication are always partition leaves, we don't + // check their ancestors and descendants anymore. + if (isPublicationExists && pgVersion >= 13) { List family = new ArrayList<>(); boolean findRoot = false; String currentPartition = tableName; diff --git a/java/connector-node/risingwave-connector-service/src/main/resources/validate_sql.properties b/java/connector-node/risingwave-connector-service/src/main/resources/validate_sql.properties index 209da1f0ed56..04eaf227b65d 100644 --- a/java/connector-node/risingwave-connector-service/src/main/resources/validate_sql.properties +++ b/java/connector-node/risingwave-connector-service/src/main/resources/validate_sql.properties @@ -17,7 +17,8 @@ postgres.table_read_privilege.check=SELECT (COUNT(*) = 1) FROM information_schem postgres.table_owner=SELECT tableowner FROM pg_tables WHERE schemaname = ? and tablename = ? postgres.publication_att_exists=SELECT count(*) > 0 FROM information_schema.columns WHERE table_name = 'pg_publication_tables' AND column_name = 'attnames' postgres.publication_attnames=SELECT attnames FROM pg_publication_tables WHERE schemaname = ? AND tablename = ? AND pubname = ? -postgres.publication_exist=SELECT pubviaroot from pg_publication WHERE pubname = ? +postgres.publication_exist=SELECT count(*) > 0 from pg_publication WHERE pubname = ? +postgres.publication_pubviaroot=SELECT pubviaroot from pg_publication WHERE pubname = ? postgres.publication_has_table=SELECT COUNT(*) > 0 AS count FROM pg_publication_tables WHERE schemaname = ? AND tablename = ? AND pubname = ? postgres.partition_names=SELECT c.relname AS partition_name FROM pg_inherits AS i JOIN pg_class AS c ON i.inhrelid = c.oid JOIN pg_class AS p ON i.inhparent = p.oid JOIN pg_namespace AS n ON p.relnamespace = n.oid WHERE n.nspname = ? AND p.relname = ?; postgres.partition_parent=SELECT parentrelid FROM pg_partition_tree(?) WHERE relid = ?::regclass AND parentrelid <> ?::regclass From e1c2fe691cdf5620f73af453703eb8b78c945271 Mon Sep 17 00:00:00 2001 From: Kexiang Wang Date: Mon, 23 Sep 2024 21:39:51 -0400 Subject: [PATCH 3/5] remove log --- .../connector/source/common/PostgresValidator.java | 6 ------ 1 file changed, 6 deletions(-) diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java index 3469dbd6fc1b..682ccf3632ce 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java @@ -424,8 +424,6 @@ private void validatePublicationConfig(boolean isSuperUser) throws SQLException List family = new ArrayList<>(); boolean findRoot = false; String currentPartition = tableName; - System.out.println("WKXLOG before find root: " + family); - while (!findRoot) { try (var stmt = jdbcConnection.prepareStatement( @@ -446,7 +444,6 @@ private void validatePublicationConfig(boolean isSuperUser) throws SQLException } } } - System.out.println("WKXLOG after find root: " + family); try (var stmt = jdbcConnection.prepareStatement( ValidatorUtils.getSql("postgres.partition_descendants"))) { @@ -459,8 +456,6 @@ private void validatePublicationConfig(boolean isSuperUser) throws SQLException } } - System.out.println("WKXLOG after find descendants: " + family); - for (String relative : family) { try (var stmt = jdbcConnection.prepareStatement( @@ -479,7 +474,6 @@ private void validatePublicationConfig(boolean isSuperUser) throws SQLException } } } - System.out.println("WKXLOG after publication.check: " + family); } // PG 15 and up supports partial publication of table From 0d100a900e9fc9d4f595059e6d786ff98fe5b97e Mon Sep 17 00:00:00 2001 From: Kexiang Wang Date: Mon, 23 Sep 2024 21:48:53 -0400 Subject: [PATCH 4/5] fix --- .../risingwave/connector/source/common/DbzSourceUtils.java | 4 +--- .../risingwave/connector/source/common/PostgresValidator.java | 3 +-- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzSourceUtils.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzSourceUtils.java index e0670cb7cb8e..6ab10bf18eb0 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzSourceUtils.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzSourceUtils.java @@ -62,10 +62,8 @@ public static void createPostgresPublicationIfNeeded( ValidatorUtils.getSql("postgres.publication_exist"))) { stmt.setString(1, pubName); var res = stmt.executeQuery(); - // Note: the value returned here is `pubviaroot`, If there's more than one row, the - // publication exists if (res.next()) { - isPubExist = true; + isPubExist = res.getBoolean(1); } } diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java index 682ccf3632ce..a0a1bde9e5a6 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java @@ -405,8 +405,7 @@ private void validatePublicationConfig(boolean isSuperUser) throws SQLException } if (!isPublicationViaRoot) { // Make sure the publication are created with `publish_via_partition_root = true`, - // which - // is required by partitioned tables. + // which is required by partitioned tables. throw ValidatorUtils.invalidArgument( "Table '" + tableName From 0994102d816498606c31dcdcce2c9365f10278f4 Mon Sep 17 00:00:00 2001 From: Kexiang Wang Date: Tue, 24 Sep 2024 13:58:19 -0400 Subject: [PATCH 5/5] add more comments --- .../source/common/PostgresValidator.java | 28 +++++++++++-------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java index a0a1bde9e5a6..1e3b1a9bb8db 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java @@ -392,7 +392,8 @@ private void validatePublicationConfig(boolean isSuperUser) throws SQLException if (!partitions.isEmpty() && isPublicationExists) { // `pubviaroot` in `pg_publication` is added after PG v13, before which PG does not // allow adding partitioned table to a publication. So here, if partitions.isEmpty() is - // false, we can safely check the value of `pubviaroot` of the publication here. + // false, which means the PG version is >= v13, we can safely check the value of + // `pubviaroot` of the publication here. boolean isPublicationViaRoot = false; try (var stmt = jdbcConnection.prepareStatement( @@ -416,7 +417,7 @@ private void validatePublicationConfig(boolean isSuperUser) throws SQLException + "'` in the upstream Postgres to check."); } } - // Only after v13, PG allows adding a partitioned table to a publication. So, if the + // Only after v13, PG allows adding a partitioned table to a publication. So, if the // version is before v13, the tables in a publication are always partition leaves, we don't // check their ancestors and descendants anymore. if (isPublicationExists && pgVersion >= 13) { @@ -427,12 +428,11 @@ private void validatePublicationConfig(boolean isSuperUser) throws SQLException try (var stmt = jdbcConnection.prepareStatement( ValidatorUtils.getSql("postgres.partition_parent"))) { - stmt.setString( - 1, String.format("\"%s\".\"%s\"", this.schemaName, currentPartition)); - stmt.setString( - 2, String.format("\"%s\".\"%s\"", this.schemaName, currentPartition)); - stmt.setString( - 3, String.format("\"%s\".\"%s\"", this.schemaName, currentPartition)); + String schemaPartitionName = + String.format("\"%s\".\"%s\"", this.schemaName, currentPartition); + stmt.setString(1, schemaPartitionName); + stmt.setString(2, schemaPartitionName); + stmt.setString(3, schemaPartitionName); var res = stmt.executeQuery(); if (res.next()) { String parent = res.getString(1); @@ -446,15 +446,21 @@ private void validatePublicationConfig(boolean isSuperUser) throws SQLException try (var stmt = jdbcConnection.prepareStatement( ValidatorUtils.getSql("postgres.partition_descendants"))) { - stmt.setString(1, String.format("\"%s\".\"%s\"", this.schemaName, this.tableName)); - stmt.setString(2, String.format("\"%s\".\"%s\"", this.schemaName, this.tableName)); + String schemaTableName = + String.format("\"%s\".\"%s\"", this.schemaName, this.tableName); + stmt.setString(1, schemaTableName); + stmt.setString(2, schemaTableName); var res = stmt.executeQuery(); while (res.next()) { String descendant = res.getString(1); family.add(descendant); } } - + // The check here was added based on experimental observations. We found that if a table + // is added to a publication where its ancestor or descendant is already included, the + // table cannot be read data from the slot correctly. Therefore, we must verify whether + // its ancestors or descendants are already in the publication. If yes, we deny the + // request. for (String relative : family) { try (var stmt = jdbcConnection.prepareStatement(