diff --git a/e2e_test/source/cdc/cdc.load.slt b/e2e_test/source/cdc/cdc.load.slt index a50a0b7c9458..71b3ff05694e 100644 --- a/e2e_test/source/cdc/cdc.load.slt +++ b/e2e_test/source/cdc/cdc.load.slt @@ -259,6 +259,66 @@ create table upper_orders ( slot.name = 'orders' ); +statement ok +CREATE TABLE IF NOT EXISTS partitioned_timestamp_table_2023( + c_int int, + c_boolean boolean, + c_timestamp timestamp, + PRIMARY KEY (c_int, c_timestamp) +) WITH ( + connector = 'postgres-cdc', + hostname = '${PGHOST:localhost}', + port = '${PGPORT:5432}', + username = '${PGUSER:$USER}', + password = '${PGPASSWORD:}', + database.name = '${PGDATABASE:postgres}', + schema.name = 'public', + table.name = 'partitioned_timestamp_table_2023', + publication.name = 'rw_publication_partition_2023', + slot.name = 'my_slot_partition' +); + +statement error The ancestor or descendant partition 'partitioned_timestamp_table_2023' of the table partition 'partitioned_timestamp_table' is already covered in the publication 'rw_publication_partition_2023'. +CREATE TABLE IF NOT EXISTS partitioned_timestamp_table( + c_int int, + c_boolean boolean, + c_timestamp timestamp, + PRIMARY KEY (c_int, c_timestamp) +) WITH ( + connector = 'postgres-cdc', + hostname = '${PGHOST:localhost}', + port = '${PGPORT:5432}', + username = '${PGUSER:$USER}', + password = '${PGPASSWORD:}', + database.name = '${PGDATABASE:postgres}', + schema.name = 'public', + table.name = 'partitioned_timestamp_table', + publication.name = 'rw_publication_partition_2023', + slot.name = 'my_slot_partition' +); + +statement error The ancestor or descendant partition 'partitioned_timestamp_table_2023' of the table partition 'partitioned_timestamp_table_2023_h1' is already covered in the publication 'rw_publication_partition_2023'. +CREATE TABLE IF NOT EXISTS partitioned_timestamp_table_2023_h1( + c_int int, + c_boolean boolean, + c_timestamp timestamp, + PRIMARY KEY (c_int, c_timestamp) +) WITH ( + connector = 'postgres-cdc', + hostname = '${PGHOST:localhost}', + port = '${PGPORT:5432}', + username = '${PGUSER:$USER}', + password = '${PGPASSWORD:}', + database.name = '${PGDATABASE:postgres}', + schema.name = 'public', + table.name = 'partitioned_timestamp_table_2023_h1', + publication.name = 'rw_publication_partition_2023', + slot.name = 'my_slot_partition' +); + +statement ok +DROP TABLE partitioned_timestamp_table_2023; + # for the partitioned table statement ok CREATE TABLE IF NOT EXISTS partitioned_timestamp_table( diff --git a/e2e_test/source/cdc/cdc.share_stream.slt b/e2e_test/source/cdc/cdc.share_stream.slt index 83840239b623..c8fcfecb5a8b 100644 --- a/e2e_test/source/cdc/cdc.share_stream.slt +++ b/e2e_test/source/cdc/cdc.share_stream.slt @@ -296,6 +296,14 @@ CREATE TABLE partitioned_timestamp_table_shared( PRIMARY KEY (c_int, c_timestamp) ) from pg_source table 'public.partitioned_timestamp_table'; +statement error The ancestor or descendant partition 'partitioned_timestamp_table' of the table partition 'partitioned_timestamp_table_2023' is already covered in the publication 'rw_publication'. +CREATE TABLE partitioned_timestamp_table_2023_shared( + c_int int, + c_boolean boolean, + c_timestamp timestamp, + PRIMARY KEY (c_int, c_timestamp) +) from pg_source table 'public.partitioned_timestamp_table_2023'; + statement ok CREATE MATERIALIZED VIEW person_new_cnt AS SELECT COUNT(*) AS cnt FROM person_new; diff --git a/e2e_test/source/cdc/postgres_cdc.sql b/e2e_test/source/cdc/postgres_cdc.sql index f80dfa267432..e3e40ab6514c 100644 --- a/e2e_test/source/cdc/postgres_cdc.sql +++ b/e2e_test/source/cdc/postgres_cdc.sql @@ -122,7 +122,7 @@ CREATE TABLE IF NOT EXISTS partitioned_timestamp_table( ) PARTITION BY RANGE (c_timestamp); CREATE TABLE partitioned_timestamp_table_2023 PARTITION OF partitioned_timestamp_table - FOR VALUES FROM ('2023-01-01') TO ('2023-12-31'); + FOR VALUES FROM ('2023-01-01') TO ('2023-12-31') PARTITION BY RANGE (c_timestamp); CREATE TABLE partitioned_timestamp_table_2024 PARTITION OF partitioned_timestamp_table FOR VALUES FROM ('2024-01-01') TO ('2024-12-31'); @@ -130,6 +130,12 @@ CREATE TABLE partitioned_timestamp_table_2024 PARTITION OF partitioned_timestamp CREATE TABLE partitioned_timestamp_table_2025 PARTITION OF partitioned_timestamp_table FOR VALUES FROM ('2025-01-01') TO ('2025-12-31'); +CREATE TABLE partitioned_timestamp_table_2023_h1 PARTITION OF partitioned_timestamp_table_2023 + FOR VALUES FROM ('2023-01-01') TO ('2023-06-30'); + +CREATE TABLE partitioned_timestamp_table_2023_h2 PARTITION OF partitioned_timestamp_table_2023 + FOR VALUES FROM ('2023-07-01') TO ('2024-12-31'); + INSERT INTO partitioned_timestamp_table (c_int, c_boolean, c_timestamp) VALUES (1, false, '2023-02-01 10:30:00'), (2, false, '2023-05-15 11:45:00'), diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzSourceUtils.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzSourceUtils.java index e0670cb7cb8e..6ab10bf18eb0 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzSourceUtils.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/DbzSourceUtils.java @@ -62,10 +62,8 @@ public static void createPostgresPublicationIfNeeded( ValidatorUtils.getSql("postgres.publication_exist"))) { stmt.setString(1, pubName); var res = stmt.executeQuery(); - // Note: the value returned here is `pubviaroot`, If there's more than one row, the - // publication exists if (res.next()) { - isPubExist = true; + isPubExist = res.getBoolean(1); } } diff --git a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java index 9d5b65c1d73a..1e3b1a9bb8db 100644 --- a/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java +++ b/java/connector-node/risingwave-connector-service/src/main/java/com/risingwave/connector/source/common/PostgresValidator.java @@ -48,6 +48,7 @@ public class PostgresValidator extends DatabaseValidator implements AutoCloseabl // Whether the properties to validate is shared by multiple tables. // If true, we will skip validation check for table private final boolean isCdcSourceJob; + private final int pgVersion; public PostgresValidator( Map userProps, TableSchema tableSchema, boolean isCdcSourceJob) @@ -75,12 +76,17 @@ public PostgresValidator( this.pubAutoCreate = userProps.get(DbzConnectorConfig.PG_PUB_CREATE).equalsIgnoreCase("true"); this.isCdcSourceJob = isCdcSourceJob; + try { + this.pgVersion = jdbcConnection.getMetaData().getDatabaseMajorVersion(); + } catch (SQLException e) { + throw ValidatorUtils.internalError(e.getMessage()); + } } @Override public void validateDbConfig() { try { - if (jdbcConnection.getMetaData().getDatabaseMajorVersion() > 16) { + if (pgVersion > 16) { throw ValidatorUtils.failedPrecondition("Postgres version should be less than 16."); } @@ -331,7 +337,6 @@ private void validateTablePrivileges(boolean isSuperUser) throws SQLException { private void validatePublicationConfig(boolean isSuperUser) throws SQLException { boolean isPublicationCoversTable = false; boolean isPublicationExists = false; - boolean isPublicationViaRoot = false; boolean isPartialPublicationEnabled = false; // Check whether publication exists @@ -341,8 +346,7 @@ private void validatePublicationConfig(boolean isSuperUser) throws SQLException stmt.setString(1, pubName); var res = stmt.executeQuery(); while (res.next()) { - isPublicationViaRoot = res.getBoolean(1); - isPublicationExists = true; + isPublicationExists = res.getBoolean(1); } } @@ -384,17 +388,97 @@ private void validatePublicationConfig(boolean isSuperUser) throws SQLException partitions.add(res.getString(1)); } } - if (!partitions.isEmpty() && isPublicationExists && !isPublicationViaRoot) { - // make sure the publication are created with `publish_via_partition_root = true`, which - // is required by partitioned tables. - throw ValidatorUtils.invalidArgument( - "Table '" - + tableName - + "' has partitions, which requires publication '" - + pubName - + "' to be created with `publish_via_partition_root = true`. \nHint: you can run `SELECT pubviaroot from pg_publication WHERE pubname = '" - + pubName - + "'` in the upstream Postgres to check."); + + if (!partitions.isEmpty() && isPublicationExists) { + // `pubviaroot` in `pg_publication` is added after PG v13, before which PG does not + // allow adding partitioned table to a publication. So here, if partitions.isEmpty() is + // false, which means the PG version is >= v13, we can safely check the value of + // `pubviaroot` of the publication here. + boolean isPublicationViaRoot = false; + try (var stmt = + jdbcConnection.prepareStatement( + ValidatorUtils.getSql("postgres.publication_pubviaroot"))) { + stmt.setString(1, pubName); + var res = stmt.executeQuery(); + if (res.next()) { + isPublicationViaRoot = res.getBoolean(1); + } + } + if (!isPublicationViaRoot) { + // Make sure the publication are created with `publish_via_partition_root = true`, + // which is required by partitioned tables. + throw ValidatorUtils.invalidArgument( + "Table '" + + tableName + + "' has partitions, which requires publication '" + + pubName + + "' to be created with `publish_via_partition_root = true`. \nHint: you can run `SELECT pubviaroot from pg_publication WHERE pubname = '" + + pubName + + "'` in the upstream Postgres to check."); + } + } + // Only after v13, PG allows adding a partitioned table to a publication. So, if the + // version is before v13, the tables in a publication are always partition leaves, we don't + // check their ancestors and descendants anymore. + if (isPublicationExists && pgVersion >= 13) { + List family = new ArrayList<>(); + boolean findRoot = false; + String currentPartition = tableName; + while (!findRoot) { + try (var stmt = + jdbcConnection.prepareStatement( + ValidatorUtils.getSql("postgres.partition_parent"))) { + String schemaPartitionName = + String.format("\"%s\".\"%s\"", this.schemaName, currentPartition); + stmt.setString(1, schemaPartitionName); + stmt.setString(2, schemaPartitionName); + stmt.setString(3, schemaPartitionName); + var res = stmt.executeQuery(); + if (res.next()) { + String parent = res.getString(1); + family.add(parent); + currentPartition = parent; + } else { + findRoot = true; + } + } + } + try (var stmt = + jdbcConnection.prepareStatement( + ValidatorUtils.getSql("postgres.partition_descendants"))) { + String schemaTableName = + String.format("\"%s\".\"%s\"", this.schemaName, this.tableName); + stmt.setString(1, schemaTableName); + stmt.setString(2, schemaTableName); + var res = stmt.executeQuery(); + while (res.next()) { + String descendant = res.getString(1); + family.add(descendant); + } + } + // The check here was added based on experimental observations. We found that if a table + // is added to a publication where its ancestor or descendant is already included, the + // table cannot be read data from the slot correctly. Therefore, we must verify whether + // its ancestors or descendants are already in the publication. If yes, we deny the + // request. + for (String relative : family) { + try (var stmt = + jdbcConnection.prepareStatement( + ValidatorUtils.getSql("postgres.partition_in_publication.check"))) { + stmt.setString(1, schemaName); + stmt.setString(2, relative); + stmt.setString(3, pubName); + var res = stmt.executeQuery(); + while (res.next()) { + if (res.getBoolean(1)) { + throw ValidatorUtils.invalidArgument( + String.format( + "The ancestor or descendant partition '%s' of the table partition '%s' is already covered in the publication '%s'. Please use a new publication for '%s'", + relative, tableName, pubName, tableName)); + } + } + } + } } // PG 15 and up supports partial publication of table diff --git a/java/connector-node/risingwave-connector-service/src/main/resources/validate_sql.properties b/java/connector-node/risingwave-connector-service/src/main/resources/validate_sql.properties index 0288b7b37fca..04eaf227b65d 100644 --- a/java/connector-node/risingwave-connector-service/src/main/resources/validate_sql.properties +++ b/java/connector-node/risingwave-connector-service/src/main/resources/validate_sql.properties @@ -17,9 +17,13 @@ postgres.table_read_privilege.check=SELECT (COUNT(*) = 1) FROM information_schem postgres.table_owner=SELECT tableowner FROM pg_tables WHERE schemaname = ? and tablename = ? postgres.publication_att_exists=SELECT count(*) > 0 FROM information_schema.columns WHERE table_name = 'pg_publication_tables' AND column_name = 'attnames' postgres.publication_attnames=SELECT attnames FROM pg_publication_tables WHERE schemaname = ? AND tablename = ? AND pubname = ? -postgres.publication_exist=SELECT pubviaroot from pg_publication WHERE pubname = ? +postgres.publication_exist=SELECT count(*) > 0 from pg_publication WHERE pubname = ? +postgres.publication_pubviaroot=SELECT pubviaroot from pg_publication WHERE pubname = ? postgres.publication_has_table=SELECT COUNT(*) > 0 AS count FROM pg_publication_tables WHERE schemaname = ? AND tablename = ? AND pubname = ? postgres.partition_names=SELECT c.relname AS partition_name FROM pg_inherits AS i JOIN pg_class AS c ON i.inhrelid = c.oid JOIN pg_class AS p ON i.inhparent = p.oid JOIN pg_namespace AS n ON p.relnamespace = n.oid WHERE n.nspname = ? AND p.relname = ?; +postgres.partition_parent=SELECT parentrelid FROM pg_partition_tree(?) WHERE relid = ?::regclass AND parentrelid <> ?::regclass +postgres.partition_descendants=SELECT relid FROM pg_partition_tree(?) WHERE relid <> ?::regclass +postgres.partition_in_publication.check=SELECT count(*) > 0 FROM pg_publication_tables WHERE schemaname = ? AND tablename = ? AND pubname = ? postgres.users_of_group=WITH RECURSIVE base (g, m) AS (( \ SELECT r1.rolname as group, ARRAY_AGG(DISTINCT(r2.rolname)) as members FROM pg_auth_members am \ INNER JOIN pg_roles r1 ON r1.oid = am.roleid \