Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(pg-cdc): check ancestors and descendants for pg partitions #18648

Merged
merged 6 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions e2e_test/source/cdc/cdc.load.slt
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,66 @@ create table upper_orders (
slot.name = 'orders'
);

statement ok
CREATE TABLE IF NOT EXISTS partitioned_timestamp_table_2023(
c_int int,
c_boolean boolean,
c_timestamp timestamp,
PRIMARY KEY (c_int, c_timestamp)
) WITH (
connector = 'postgres-cdc',
hostname = '${PGHOST:localhost}',
port = '${PGPORT:5432}',
username = '${PGUSER:$USER}',
password = '${PGPASSWORD:}',
database.name = '${PGDATABASE:postgres}',
schema.name = 'public',
table.name = 'partitioned_timestamp_table_2023',
publication.name = 'rw_publication_partition_2023',
slot.name = 'my_slot_partition'
);

statement error The ancestor or descendant partition 'partitioned_timestamp_table_2023' of the table partition 'partitioned_timestamp_table' is already covered in the publication 'rw_publication_partition_2023'.
CREATE TABLE IF NOT EXISTS partitioned_timestamp_table(
c_int int,
c_boolean boolean,
c_timestamp timestamp,
PRIMARY KEY (c_int, c_timestamp)
) WITH (
connector = 'postgres-cdc',
hostname = '${PGHOST:localhost}',
port = '${PGPORT:5432}',
username = '${PGUSER:$USER}',
password = '${PGPASSWORD:}',
database.name = '${PGDATABASE:postgres}',
schema.name = 'public',
table.name = 'partitioned_timestamp_table',
publication.name = 'rw_publication_partition_2023',
slot.name = 'my_slot_partition'
);

statement error The ancestor or descendant partition 'partitioned_timestamp_table_2023' of the table partition 'partitioned_timestamp_table_2023_h1' is already covered in the publication 'rw_publication_partition_2023'.
CREATE TABLE IF NOT EXISTS partitioned_timestamp_table_2023_h1(
c_int int,
c_boolean boolean,
c_timestamp timestamp,
PRIMARY KEY (c_int, c_timestamp)
) WITH (
connector = 'postgres-cdc',
hostname = '${PGHOST:localhost}',
port = '${PGPORT:5432}',
username = '${PGUSER:$USER}',
password = '${PGPASSWORD:}',
database.name = '${PGDATABASE:postgres}',
schema.name = 'public',
table.name = 'partitioned_timestamp_table_2023_h1',
publication.name = 'rw_publication_partition_2023',
slot.name = 'my_slot_partition'
);

statement ok
DROP TABLE partitioned_timestamp_table_2023;

# for the partitioned table
statement ok
CREATE TABLE IF NOT EXISTS partitioned_timestamp_table(
Expand Down
8 changes: 8 additions & 0 deletions e2e_test/source/cdc/cdc.share_stream.slt
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,14 @@ CREATE TABLE partitioned_timestamp_table_shared(
PRIMARY KEY (c_int, c_timestamp)
) from pg_source table 'public.partitioned_timestamp_table';

statement error The ancestor or descendant partition 'partitioned_timestamp_table' of the table partition 'partitioned_timestamp_table_2023' is already covered in the publication 'rw_publication'.
CREATE TABLE partitioned_timestamp_table_2023_shared(
c_int int,
c_boolean boolean,
c_timestamp timestamp,
PRIMARY KEY (c_int, c_timestamp)
) from pg_source table 'public.partitioned_timestamp_table_2023';

statement ok
CREATE MATERIALIZED VIEW person_new_cnt AS SELECT COUNT(*) AS cnt FROM person_new;

Expand Down
8 changes: 7 additions & 1 deletion e2e_test/source/cdc/postgres_cdc.sql
Original file line number Diff line number Diff line change
Expand Up @@ -122,14 +122,20 @@ CREATE TABLE IF NOT EXISTS partitioned_timestamp_table(
) PARTITION BY RANGE (c_timestamp);

CREATE TABLE partitioned_timestamp_table_2023 PARTITION OF partitioned_timestamp_table
FOR VALUES FROM ('2023-01-01') TO ('2023-12-31');
FOR VALUES FROM ('2023-01-01') TO ('2023-12-31') PARTITION BY RANGE (c_timestamp);

CREATE TABLE partitioned_timestamp_table_2024 PARTITION OF partitioned_timestamp_table
FOR VALUES FROM ('2024-01-01') TO ('2024-12-31');

CREATE TABLE partitioned_timestamp_table_2025 PARTITION OF partitioned_timestamp_table
FOR VALUES FROM ('2025-01-01') TO ('2025-12-31');

CREATE TABLE partitioned_timestamp_table_2023_h1 PARTITION OF partitioned_timestamp_table_2023
FOR VALUES FROM ('2023-01-01') TO ('2023-06-30');

CREATE TABLE partitioned_timestamp_table_2023_h2 PARTITION OF partitioned_timestamp_table_2023
FOR VALUES FROM ('2023-07-01') TO ('2024-12-31');

INSERT INTO partitioned_timestamp_table (c_int, c_boolean, c_timestamp) VALUES
(1, false, '2023-02-01 10:30:00'),
(2, false, '2023-05-15 11:45:00'),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,8 @@ public static void createPostgresPublicationIfNeeded(
ValidatorUtils.getSql("postgres.publication_exist"))) {
stmt.setString(1, pubName);
var res = stmt.executeQuery();
// Note: the value returned here is `pubviaroot`, If there's more than one row, the
// publication exists
if (res.next()) {
isPubExist = true;
isPubExist = res.getBoolean(1);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class PostgresValidator extends DatabaseValidator implements AutoCloseabl
// Whether the properties to validate is shared by multiple tables.
// If true, we will skip validation check for table
private final boolean isCdcSourceJob;
private final int pgVersion;

public PostgresValidator(
Map<String, String> userProps, TableSchema tableSchema, boolean isCdcSourceJob)
Expand Down Expand Up @@ -75,12 +76,17 @@ public PostgresValidator(
this.pubAutoCreate =
userProps.get(DbzConnectorConfig.PG_PUB_CREATE).equalsIgnoreCase("true");
this.isCdcSourceJob = isCdcSourceJob;
try {
this.pgVersion = jdbcConnection.getMetaData().getDatabaseMajorVersion();
} catch (SQLException e) {
throw ValidatorUtils.internalError(e.getMessage());
}
}

@Override
public void validateDbConfig() {
try {
if (jdbcConnection.getMetaData().getDatabaseMajorVersion() > 16) {
if (pgVersion > 16) {
throw ValidatorUtils.failedPrecondition("Postgres version should be less than 16.");
}

Expand Down Expand Up @@ -331,7 +337,6 @@ private void validateTablePrivileges(boolean isSuperUser) throws SQLException {
private void validatePublicationConfig(boolean isSuperUser) throws SQLException {
boolean isPublicationCoversTable = false;
boolean isPublicationExists = false;
boolean isPublicationViaRoot = false;
boolean isPartialPublicationEnabled = false;

// Check whether publication exists
Expand All @@ -341,8 +346,7 @@ private void validatePublicationConfig(boolean isSuperUser) throws SQLException
stmt.setString(1, pubName);
var res = stmt.executeQuery();
while (res.next()) {
isPublicationViaRoot = res.getBoolean(1);
isPublicationExists = true;
isPublicationExists = res.getBoolean(1);
}
}

Expand Down Expand Up @@ -384,17 +388,97 @@ private void validatePublicationConfig(boolean isSuperUser) throws SQLException
partitions.add(res.getString(1));
}
}
if (!partitions.isEmpty() && isPublicationExists && !isPublicationViaRoot) {
// make sure the publication are created with `publish_via_partition_root = true`, which
// is required by partitioned tables.
throw ValidatorUtils.invalidArgument(
"Table '"
+ tableName
+ "' has partitions, which requires publication '"
+ pubName
+ "' to be created with `publish_via_partition_root = true`. \nHint: you can run `SELECT pubviaroot from pg_publication WHERE pubname = '"
+ pubName
+ "'` in the upstream Postgres to check.");

if (!partitions.isEmpty() && isPublicationExists) {
// `pubviaroot` in `pg_publication` is added after PG v13, before which PG does not
// allow adding partitioned table to a publication. So here, if partitions.isEmpty() is
// false, which means the PG version is >= v13, we can safely check the value of
// `pubviaroot` of the publication here.
boolean isPublicationViaRoot = false;
try (var stmt =
jdbcConnection.prepareStatement(
ValidatorUtils.getSql("postgres.publication_pubviaroot"))) {
KeXiangWang marked this conversation as resolved.
Show resolved Hide resolved
stmt.setString(1, pubName);
var res = stmt.executeQuery();
if (res.next()) {
isPublicationViaRoot = res.getBoolean(1);
}
}
if (!isPublicationViaRoot) {
// Make sure the publication are created with `publish_via_partition_root = true`,
// which is required by partitioned tables.
throw ValidatorUtils.invalidArgument(
"Table '"
+ tableName
+ "' has partitions, which requires publication '"
+ pubName
+ "' to be created with `publish_via_partition_root = true`. \nHint: you can run `SELECT pubviaroot from pg_publication WHERE pubname = '"
+ pubName
+ "'` in the upstream Postgres to check.");
}
}
// Only after v13, PG allows adding a partitioned table to a publication. So, if the
// version is before v13, the tables in a publication are always partition leaves, we don't
// check their ancestors and descendants anymore.
if (isPublicationExists && pgVersion >= 13) {
List<String> family = new ArrayList<>();
boolean findRoot = false;
String currentPartition = tableName;
while (!findRoot) {
try (var stmt =
jdbcConnection.prepareStatement(
ValidatorUtils.getSql("postgres.partition_parent"))) {
KeXiangWang marked this conversation as resolved.
Show resolved Hide resolved
String schemaPartitionName =
String.format("\"%s\".\"%s\"", this.schemaName, currentPartition);
stmt.setString(1, schemaPartitionName);
stmt.setString(2, schemaPartitionName);
stmt.setString(3, schemaPartitionName);
var res = stmt.executeQuery();
if (res.next()) {
String parent = res.getString(1);
family.add(parent);
currentPartition = parent;
} else {
findRoot = true;
}
}
}
try (var stmt =
jdbcConnection.prepareStatement(
ValidatorUtils.getSql("postgres.partition_descendants"))) {
String schemaTableName =
String.format("\"%s\".\"%s\"", this.schemaName, this.tableName);
stmt.setString(1, schemaTableName);
stmt.setString(2, schemaTableName);
var res = stmt.executeQuery();
while (res.next()) {
String descendant = res.getString(1);
family.add(descendant);
}
}
// The check here was added based on experimental observations. We found that if a table
// is added to a publication where its ancestor or descendant is already included, the
// table cannot be read data from the slot correctly. Therefore, we must verify whether
// its ancestors or descendants are already in the publication. If yes, we deny the
// request.
for (String relative : family) {
try (var stmt =
jdbcConnection.prepareStatement(
ValidatorUtils.getSql("postgres.partition_in_publication.check"))) {
stmt.setString(1, schemaName);
stmt.setString(2, relative);
stmt.setString(3, pubName);
var res = stmt.executeQuery();
while (res.next()) {
if (res.getBoolean(1)) {
throw ValidatorUtils.invalidArgument(
String.format(
"The ancestor or descendant partition '%s' of the table partition '%s' is already covered in the publication '%s'. Please use a new publication for '%s'",
KeXiangWang marked this conversation as resolved.
Show resolved Hide resolved
relative, tableName, pubName, tableName));
}
}
}
}
}

// PG 15 and up supports partial publication of table
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@ postgres.table_read_privilege.check=SELECT (COUNT(*) = 1) FROM information_schem
postgres.table_owner=SELECT tableowner FROM pg_tables WHERE schemaname = ? and tablename = ?
postgres.publication_att_exists=SELECT count(*) > 0 FROM information_schema.columns WHERE table_name = 'pg_publication_tables' AND column_name = 'attnames'
postgres.publication_attnames=SELECT attnames FROM pg_publication_tables WHERE schemaname = ? AND tablename = ? AND pubname = ?
postgres.publication_exist=SELECT pubviaroot from pg_publication WHERE pubname = ?
postgres.publication_exist=SELECT count(*) > 0 from pg_publication WHERE pubname = ?
postgres.publication_pubviaroot=SELECT pubviaroot from pg_publication WHERE pubname = ?
postgres.publication_has_table=SELECT COUNT(*) > 0 AS count FROM pg_publication_tables WHERE schemaname = ? AND tablename = ? AND pubname = ?
postgres.partition_names=SELECT c.relname AS partition_name FROM pg_inherits AS i JOIN pg_class AS c ON i.inhrelid = c.oid JOIN pg_class AS p ON i.inhparent = p.oid JOIN pg_namespace AS n ON p.relnamespace = n.oid WHERE n.nspname = ? AND p.relname = ?;
postgres.partition_parent=SELECT parentrelid FROM pg_partition_tree(?) WHERE relid = ?::regclass AND parentrelid <> ?::regclass
postgres.partition_descendants=SELECT relid FROM pg_partition_tree(?) WHERE relid <> ?::regclass
postgres.partition_in_publication.check=SELECT count(*) > 0 FROM pg_publication_tables WHERE schemaname = ? AND tablename = ? AND pubname = ?
postgres.users_of_group=WITH RECURSIVE base (g, m) AS (( \
SELECT r1.rolname as group, ARRAY_AGG(DISTINCT(r2.rolname)) as members FROM pg_auth_members am \
INNER JOIN pg_roles r1 ON r1.oid = am.roleid \
Expand Down
Loading