Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(pg-cdc): fix support for partitioned table #18456

Merged
merged 13 commits into from
Sep 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions e2e_test/source/cdc/cdc.check_new_rows.slt
Original file line number Diff line number Diff line change
Expand Up @@ -321,3 +321,57 @@ NULL NULL NULL
{} {} {}
NULL NULL NULL
{NULL,"{\"key\": \"value\"}"} {NULL,123e4567-e89b-12d3-a456-426614174000} NULL

# The table backed by the partitioned root must contain rows from every
# partition: the c_boolean = f rows seeded in postgres_cdc.sql and the
# c_boolean = t rows added by postgres_cdc_insert.sql.
# Columns are listed explicitly so the expected output does not depend on the
# column order of the table definition.
query TTT
SELECT c_int, c_boolean, c_timestamp FROM partitioned_timestamp_table ORDER BY c_boolean, c_int;
----
1 f 2023-02-01 10:30:00
2 f 2023-05-15 11:45:00
3 f 2023-11-03 12:15:00
4 f 2024-01-04 13:00:00
5 f 2024-03-05 09:30:00
6 f 2024-06-06 14:20:00
7 f 2024-09-07 16:45:00
8 f 2025-01-08 18:30:00
9 f 2025-07-09 07:10:00
11 t 2023-02-01 10:30:00
12 t 2023-05-15 11:45:00
13 t 2023-11-03 12:15:00
14 t 2024-01-04 13:00:00
15 t 2024-03-05 09:30:00
16 t 2024-06-06 14:20:00
17 t 2024-09-07 16:45:00
18 t 2025-01-08 18:30:00
19 t 2025-07-09 07:10:00

# The table subscribed to the single 2023 partition must only contain the rows
# whose c_timestamp falls in 2023 (three c_boolean = f rows, three c_boolean = t rows).
# Columns are listed explicitly so the expected output does not depend on the
# column order of the table definition.
query TTT
SELECT c_int, c_boolean, c_timestamp FROM partitioned_timestamp_table_2023 ORDER BY c_boolean, c_int;
----
1 f 2023-02-01 10:30:00
2 f 2023-05-15 11:45:00
3 f 2023-11-03 12:15:00
11 t 2023-02-01 10:30:00
12 t 2023-05-15 11:45:00
13 t 2023-11-03 12:15:00

# The table created from the shared source (pg_source) on the partitioned root
# must contain the same 18 rows as the upstream root table.
# Columns are listed explicitly so the expected output does not depend on the
# column order of the table definition.
query TTT
SELECT c_int, c_boolean, c_timestamp FROM partitioned_timestamp_table_shared ORDER BY c_boolean, c_int;
----
1 f 2023-02-01 10:30:00
2 f 2023-05-15 11:45:00
3 f 2023-11-03 12:15:00
4 f 2024-01-04 13:00:00
5 f 2024-03-05 09:30:00
6 f 2024-06-06 14:20:00
7 f 2024-09-07 16:45:00
8 f 2025-01-08 18:30:00
9 f 2025-07-09 07:10:00
11 t 2023-02-01 10:30:00
12 t 2023-05-15 11:45:00
13 t 2023-11-03 12:15:00
14 t 2024-01-04 13:00:00
15 t 2024-03-05 09:30:00
16 t 2024-06-06 14:20:00
17 t 2024-09-07 16:45:00
18 t 2025-01-08 18:30:00
19 t 2025-07-09 07:10:00
40 changes: 40 additions & 0 deletions e2e_test/source/cdc/cdc.load.slt
Original file line number Diff line number Diff line change
Expand Up @@ -258,3 +258,43 @@ create table upper_orders (
table.name = 'Orders',
slot.name = 'orders'
);

# Ingest the partitioned root table, using publication
# 'rw_publication_partition_root' and replication slot 'my_slot_partition'.
statement ok
CREATE TABLE IF NOT EXISTS partitioned_timestamp_table (
    c_int INT,
    c_boolean BOOLEAN,
    c_timestamp TIMESTAMP,
    PRIMARY KEY (c_int, c_timestamp)
) WITH (
    connector = 'postgres-cdc',
    hostname = '${PGHOST:localhost}',
    port = '${PGPORT:5432}',
    username = '${PGUSER:$USER}',
    password = '${PGPASSWORD:}',
    database.name = '${PGDATABASE:postgres}',
    schema.name = 'public',
    table.name = 'partitioned_timestamp_table',
    publication.name = 'rw_publication_partition_root',
    slot.name = 'my_slot_partition'
);

# for only one partition: Postgres does not support adding both a partitioned table and its individual partitions to the same publication, so we use a different publication for the partition
statement ok
CREATE TABLE IF NOT EXISTS partitioned_timestamp_table_2023 (
    c_int INT,
    c_boolean BOOLEAN,
    c_timestamp TIMESTAMP,
    PRIMARY KEY (c_int, c_timestamp)
) WITH (
    connector = 'postgres-cdc',
    hostname = '${PGHOST:localhost}',
    port = '${PGPORT:5432}',
    username = '${PGUSER:$USER}',
    password = '${PGPASSWORD:}',
    database.name = '${PGDATABASE:postgres}',
    schema.name = 'public',
    table.name = 'partitioned_timestamp_table_2023',
    publication.name = 'rw_publication_partition',
    slot.name = 'my_slot_partition_2'
);
8 changes: 8 additions & 0 deletions e2e_test/source/cdc/cdc.share_stream.slt
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,14 @@ INCLUDE SCHEMA_NAME as schema_name
INCLUDE TABLE_NAME as table_name
FROM pg_source TABLE 'public.person';

statement ok
CREATE TABLE partitioned_timestamp_table_shared (
    c_int INT,
    c_boolean BOOLEAN,
    c_timestamp TIMESTAMP,
    PRIMARY KEY (c_int, c_timestamp)
) FROM pg_source TABLE 'public.partitioned_timestamp_table';

statement ok
CREATE MATERIALIZED VIEW person_new_cnt AS SELECT COUNT(*) AS cnt FROM person_new;

Expand Down
19 changes: 19 additions & 0 deletions e2e_test/source/cdc/cdc.validate.postgres.slt
Original file line number Diff line number Diff line change
Expand Up @@ -243,3 +243,22 @@ create table shipments (

statement ok
drop table shipments;

# Negative case: publication 'rw_publication_pubviaroot_false' is created in
# postgres_cdc.sql without `publish_via_partition_root = true`, so creating a
# CDC table on the partitioned root table with it must be rejected.
statement error Table 'partitioned_timestamp_table' has partitions, which requires publication 'rw_publication_pubviaroot_false' to be created with `publish_via_partition_root = true`.
CREATE TABLE IF NOT EXISTS partitioned_timestamp_table (
    c_int INT,
    c_boolean BOOLEAN,
    c_timestamp TIMESTAMP,
    PRIMARY KEY (c_int, c_timestamp)
) WITH (
    connector = 'postgres-cdc',
    hostname = '${PGHOST:localhost}',
    port = '${PGPORT:5432}',
    username = '${PGUSER:$USER}',
    password = '${PGPASSWORD:}',
    database.name = '${PGDATABASE:postgres}',
    schema.name = 'public',
    table.name = 'partitioned_timestamp_table',
    publication.name = 'rw_publication_pubviaroot_false',
    slot.name = 'my_slot_partition'
);
31 changes: 31 additions & 0 deletions e2e_test/source/cdc/postgres_cdc.sql
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,34 @@ CREATE TABLE "Orders" (
name varchar
);
INSERT INTO "Orders" VALUES (1, 'happy');


-- Upstream root table, range-partitioned on c_timestamp. The partition key is
-- part of the primary key.
CREATE TABLE IF NOT EXISTS partitioned_timestamp_table (
    c_int INT,
    c_boolean BOOLEAN,
    c_timestamp TIMESTAMP,
    PRIMARY KEY (c_int, c_timestamp)
)
PARTITION BY RANGE (c_timestamp);

-- One partition per calendar year. Range bounds are inclusive at FROM and
-- exclusive at TO, so each partition ends at January 1st of the next year;
-- ending at December 31st would leave that whole day without a partition.
CREATE TABLE partitioned_timestamp_table_2023 PARTITION OF partitioned_timestamp_table
    FOR VALUES FROM ('2023-01-01') TO ('2024-01-01');

CREATE TABLE partitioned_timestamp_table_2024 PARTITION OF partitioned_timestamp_table
    FOR VALUES FROM ('2024-01-01') TO ('2025-01-01');

CREATE TABLE partitioned_timestamp_table_2025 PARTITION OF partitioned_timestamp_table
    FOR VALUES FROM ('2025-01-01') TO ('2026-01-01');

-- Seed rows (all c_boolean = false): three timestamps in 2023, four in 2024
-- and two in 2025.
INSERT INTO partitioned_timestamp_table
    (c_int, c_boolean, c_timestamp)
VALUES
    (1, FALSE, '2023-02-01 10:30:00'),
    (2, FALSE, '2023-05-15 11:45:00'),
    (3, FALSE, '2023-11-03 12:15:00'),
    (4, FALSE, '2024-01-04 13:00:00'),
    (5, FALSE, '2024-03-05 09:30:00'),
    (6, FALSE, '2024-06-06 14:20:00'),
    (7, FALSE, '2024-09-07 16:45:00'),
    (8, FALSE, '2025-01-08 18:30:00'),
    (9, FALSE, '2025-07-09 07:10:00');

-- Test-only publication: `WITH ( publish_via_partition_root = true )` is left
-- out on purpose. Normally the option should be added.
CREATE PUBLICATION rw_publication_pubviaroot_false
    FOR TABLE partitioned_timestamp_table;
10 changes: 10 additions & 0 deletions e2e_test/source/cdc/postgres_cdc_insert.sql
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,13 @@ INSERT INTO list_with_null VALUES (3, '{NULL,-3,-4}', '{NULL,nan,-inf}', '{NULL,
INSERT INTO list_with_null VALUES (4, '{-4,-5,-6}', '{NULL,-99999999999999999.9999}', '{NULL,-99999999999999999.9999}', '{NULL,-99999999999999999.9999}', '{NULL,sad,ok}', '{b2e4636d-fa03-4ad4-bf16-029a79dca3e2}', '{\\x88,\\x99,\\xAA}');
INSERT INTO list_with_null VALUES (6, NULL, NULL, NULL, NULL, NULL, NULL, NULL);

-- Additional rows (all c_boolean = true) using the same timestamps as the
-- seed rows, with c_int values 11 through 19.
INSERT INTO partitioned_timestamp_table
    (c_int, c_boolean, c_timestamp)
VALUES
    (11, TRUE, '2023-02-01 10:30:00'),
    (12, TRUE, '2023-05-15 11:45:00'),
    (13, TRUE, '2023-11-03 12:15:00'),
    (14, TRUE, '2024-01-04 13:00:00'),
    (15, TRUE, '2024-03-05 09:30:00'),
    (16, TRUE, '2024-06-06 14:20:00'),
    (17, TRUE, '2024-09-07 16:45:00'),
    (18, TRUE, '2025-01-08 18:30:00'),
    (19, TRUE, '2025-07-09 07:10:00');
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,13 @@ public static void createPostgresPublicationIfNeeded(
if (schemaTableName.isPresent()) {
createPublicationSql =
String.format(
"CREATE PUBLICATION %s FOR TABLE %s;",
"CREATE PUBLICATION %s FOR TABLE %s WITH ( publish_via_partition_root = true );",
quotePostgres(pubName), schemaTableName.get());
} else {
createPublicationSql =
String.format("CREATE PUBLICATION %s", quotePostgres(pubName));
String.format(
"CREATE PUBLICATION %s WITH ( publish_via_partition_root = true );",
quotePostgres(pubName));
}
try (var stmt = jdbcConnection.createStatement()) {
LOG.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,7 @@ private void validateTablePrivileges(boolean isSuperUser) throws SQLException {
private void validatePublicationConfig(boolean isSuperUser) throws SQLException {
boolean isPublicationCoversTable = false;
boolean isPublicationExists = false;
boolean isPublicationViaRoot = false;
boolean isPartialPublicationEnabled = false;

// Check whether publication exists
Expand All @@ -340,7 +341,8 @@ private void validatePublicationConfig(boolean isSuperUser) throws SQLException
stmt.setString(1, pubName);
var res = stmt.executeQuery();
while (res.next()) {
isPublicationExists = res.getBoolean(1);
isPublicationViaRoot = res.getBoolean(1);
isPublicationExists = true;
}
}

Expand Down Expand Up @@ -370,6 +372,31 @@ private void validatePublicationConfig(boolean isSuperUser) throws SQLException
isPartialPublicationEnabled = res.getBoolean(1);
}
}

List<String> partitions = new ArrayList<>();
try (var stmt =
jdbcConnection.prepareStatement(
ValidatorUtils.getSql("postgres.partition_names"))) {
stmt.setString(1, schemaName);
stmt.setString(2, tableName);
var res = stmt.executeQuery();
while (res.next()) {
partitions.add(res.getString(1));
}
}
if (!partitions.isEmpty() && isPublicationExists && !isPublicationViaRoot) {
// make sure the publication are created with `publish_via_partition_root = true`, which
// is required by partitioned tables.
throw ValidatorUtils.invalidArgument(
"Table '"
+ tableName
+ "' has partitions, which requires publication '"
+ pubName
+ "' to be created with `publish_via_partition_root = true`. \nHint: you can run `SELECT pubviaroot from pg_publication WHERE pubname = '"
+ pubName
+ "'` in the upstream Postgres to check.");
}

// PG 15 and up supports partial publication of table
// check whether publication covers all columns of the table schema
if (isPartialPublicationEnabled) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ postgres.table_read_privilege.check=SELECT (COUNT(*) = 1) FROM information_schem
postgres.table_owner=SELECT tableowner FROM pg_tables WHERE schemaname = ? and tablename = ?
postgres.publication_att_exists=SELECT count(*) > 0 FROM information_schema.columns WHERE table_name = 'pg_publication_tables' AND column_name = 'attnames'
postgres.publication_attnames=SELECT attnames FROM pg_publication_tables WHERE schemaname = ? AND tablename = ? AND pubname = ?
postgres.publication_exist=SELECT COUNT(*) > 0 from pg_publication WHERE pubname = ?
postgres.publication_exist=SELECT pubviaroot from pg_publication WHERE pubname = ?
postgres.publication_has_table=SELECT COUNT(*) > 0 AS count FROM pg_publication_tables WHERE schemaname = ? AND tablename = ? AND pubname = ?
postgres.partition_names=SELECT c.relname AS partition_name FROM pg_inherits AS i JOIN pg_class AS c ON i.inhrelid = c.oid JOIN pg_class AS p ON i.inhparent = p.oid JOIN pg_namespace AS n ON p.relnamespace = n.oid WHERE n.nspname = ? AND p.relname = ?;
postgres.users_of_group=WITH RECURSIVE base (g, m) AS (( \
SELECT r1.rolname as group, ARRAY_AGG(DISTINCT(r2.rolname)) as members FROM pg_auth_members am \
INNER JOIN pg_roles r1 ON r1.oid = am.roleid \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,8 @@ public void testPermissionCheck() throws SQLException {
"CREATE TABLE IF NOT EXISTS orders (o_key BIGINT NOT NULL, o_val INT, PRIMARY KEY (o_key))";
SourceTestClient.performQuery(connDbz, query);
// create a partial publication, check whether error is reported
query = "CREATE PUBLICATION rw_publication FOR TABLE orders (o_key)";
query =
"CREATE PUBLICATION rw_publication FOR TABLE orders (o_key) WITH ( publish_via_partition_root = true );";
SourceTestClient.performQuery(connDbz, query);
ConnectorServiceProto.TableSchema tableSchema =
ConnectorServiceProto.TableSchema.newBuilder()
Expand Down
Loading