Skip to content

Commit

Permalink
fix(pg-cdc): fix support for partitioned table
Browse files Browse the repository at this point in the history
  • Loading branch information
KeXiangWang committed Sep 8, 2024
1 parent becb896 commit 52ce057
Show file tree
Hide file tree
Showing 9 changed files with 183 additions and 4 deletions.
44 changes: 44 additions & 0 deletions e2e_test/source/cdc/cdc.check_new_rows.slt
Original file line number Diff line number Diff line change
Expand Up @@ -321,3 +321,47 @@ NULL NULL NULL
{} {} {}
NULL NULL NULL
{NULL,"{\"key\": \"value\"}"} {NULL,123e4567-e89b-12d3-a456-426614174000} NULL

# The dedicated CDC table over the partitioned upstream table must expose the
# rows seeded by postgres_cdc.sql (1-9) and those added by
# postgres_cdc_insert.sql (11-19).
query TTT
SELECT c_int, c_boolean, c_timestamp
FROM partitioned_timestamp_table
ORDER BY c_boolean, c_int;
----
1 f 2023-02-01 10:30:00
2 f 2023-05-15 11:45:00
3 f 2023-11-03 12:15:00
4 f 2024-01-04 13:00:00
5 f 2024-03-05 09:30:00
6 f 2024-06-06 14:20:00
7 f 2024-09-07 16:45:00
8 f 2025-01-08 18:30:00
9 f 2025-07-09 07:10:00
11 t 2023-02-01 10:30:00
12 t 2023-05-15 11:45:00
13 t 2023-11-03 12:15:00
14 t 2024-01-04 13:00:00
15 t 2024-03-05 09:30:00
16 t 2024-06-06 14:20:00
17 t 2024-09-07 16:45:00
18 t 2025-01-08 18:30:00
19 t 2025-07-09 07:10:00

# Same expectation as for the dedicated CDC table, but read through the table
# that is attached to the shared source.
query TTT
SELECT
    c_int,
    c_boolean,
    c_timestamp
FROM partitioned_timestamp_table_shared
ORDER BY c_boolean, c_int;
----
1 f 2023-02-01 10:30:00
2 f 2023-05-15 11:45:00
3 f 2023-11-03 12:15:00
4 f 2024-01-04 13:00:00
5 f 2024-03-05 09:30:00
6 f 2024-06-06 14:20:00
7 f 2024-09-07 16:45:00
8 f 2025-01-08 18:30:00
9 f 2025-07-09 07:10:00
11 t 2023-02-01 10:30:00
12 t 2023-05-15 11:45:00
13 t 2023-11-03 12:15:00
14 t 2024-01-04 13:00:00
15 t 2024-03-05 09:30:00
16 t 2024-06-06 14:20:00
17 t 2024-09-07 16:45:00
18 t 2025-01-08 18:30:00
19 t 2025-07-09 07:10:00
18 changes: 18 additions & 0 deletions e2e_test/source/cdc/cdc.load.slt
Original file line number Diff line number Diff line change
Expand Up @@ -258,3 +258,21 @@ create table upper_orders (
table.name = 'Orders',
slot.name = 'orders'
);

# Ingest the range-partitioned upstream table `partitioned_timestamp_table`
# through a dedicated postgres-cdc table with its own replication slot.
statement ok
CREATE TABLE IF NOT EXISTS partitioned_timestamp_table(
  c_int int,
  c_boolean boolean,
  c_timestamp timestamp,
  PRIMARY KEY (c_int, c_timestamp)
) WITH (
  connector = 'postgres-cdc',
  hostname = '${PGHOST:localhost}',
  port = '${PGPORT:5432}',
  username = '${PGUSER:$USER}',
  password = '${PGPASSWORD:}',
  database.name = '${PGDATABASE:postgres}',
  schema.name = 'public',
  table.name = 'partitioned_timestamp_table',
  publication.name = 'rw_publication',
  slot.name = 'my_slot_partition'
);
8 changes: 8 additions & 0 deletions e2e_test/source/cdc/cdc.share_stream.slt
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,14 @@ create table orders_test (
PRIMARY KEY (order_id)
) from mysql_mytest table 'mytest.orders';

# Attach the upstream partitioned table to the existing `pg_source` source
# instead of opening a dedicated connection for it.
statement ok
CREATE TABLE partitioned_timestamp_table_shared (
    c_int INT,
    c_boolean BOOLEAN,
    c_timestamp TIMESTAMP,
    PRIMARY KEY (c_int, c_timestamp)
) FROM pg_source TABLE 'public.partitioned_timestamp_table';

# Row-count view over rw.products_test.
statement ok
create materialized view products_test_cnt as
select count(*) as cnt
from rw.products_test;

Expand Down
39 changes: 39 additions & 0 deletions e2e_test/source/cdc/cdc.validate.postgres.slt
Original file line number Diff line number Diff line change
Expand Up @@ -243,3 +243,42 @@ create table shipments (

# Remove the `shipments` table again.
statement ok
DROP TABLE shipments;

# A FORMAT ... ENCODE ... clause is given, but it is not DEBEZIUM/JSON; CDC
# connectors must reject any other combination.
statement error Row format for CDC connectors should be either omitted or set to `FORMAT DEBEZIUM ENCODE JSON`
CREATE TABLE shipments (
    shipment_id INTEGER,
    order_id REAL,
    origin STRING,
    destination STRING,
    is_arrived BOOLEAN,
    PRIMARY KEY (shipment_id)
) WITH (
    connector = 'postgres-cdc',
    hostname = 'db',
    port = '5432',
    username = 'postgres',
    password = 'postgres',
    database.name = 'cdc_test',
    schema.name = 'public',
    table.name = 'shipments',
    slot.name = 'shipments'
) FORMAT CANAL ENCODE CSV;

# The publication named below is created upstream without
# `publish_via_partition_root = true`, so a CDC table over the partitioned
# table must be rejected by validation with the message asserted here.
statement error Table 'partitioned_timestamp_table' has partitions, which requires publication 'rw_publication_pubviaroot_false' to be created with `publish_via_partition_root = true`.
CREATE TABLE IF NOT EXISTS partitioned_timestamp_table(
  c_int int,
  c_boolean boolean,
  c_timestamp timestamp,
  PRIMARY KEY (c_int, c_timestamp)
) WITH (
  connector = 'postgres-cdc',
  hostname = '${PGHOST:localhost}',
  port = '${PGPORT:5432}',
  username = '${PGUSER:$USER}',
  password = '${PGPASSWORD:}',
  database.name = '${PGDATABASE:postgres}',
  schema.name = 'public',
  table.name = 'partitioned_timestamp_table',
  publication.name = 'rw_publication_pubviaroot_false',
  slot.name = 'my_slot_partition'
);
30 changes: 30 additions & 0 deletions e2e_test/source/cdc/postgres_cdc.sql
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,33 @@ CREATE TABLE "Orders" (
name varchar
);
INSERT INTO "Orders" VALUES (1, 'happy');


-- Parent table, range-partitioned on c_timestamp. The partition key is part
-- of the primary key.
CREATE TABLE IF NOT EXISTS partitioned_timestamp_table (
    c_int INT,
    c_boolean BOOLEAN,
    c_timestamp TIMESTAMP,
    PRIMARY KEY (c_int, c_timestamp)
) PARTITION BY RANGE (c_timestamp);

-- One partition per calendar year. A range partition's upper bound is
-- exclusive, so each partition ends exactly where the next one starts and no
-- timestamp from 2023-01-01 up to (not including) 2026-01-01 is left without
-- a partition.
CREATE TABLE partitioned_timestamp_table_2023 PARTITION OF partitioned_timestamp_table
    FOR VALUES FROM ('2023-01-01') TO ('2024-01-01');

CREATE TABLE partitioned_timestamp_table_2024 PARTITION OF partitioned_timestamp_table
    FOR VALUES FROM ('2024-01-01') TO ('2025-01-01');

CREATE TABLE partitioned_timestamp_table_2025 PARTITION OF partitioned_timestamp_table
    FOR VALUES FROM ('2025-01-01') TO ('2026-01-01');

-- Seed rows spread over the 2023, 2024 and 2025 partitions, all with
-- c_boolean = false.
INSERT INTO partitioned_timestamp_table (c_int, c_boolean, c_timestamp)
VALUES
    (1, FALSE, '2023-02-01 10:30:00'),
    (2, FALSE, '2023-05-15 11:45:00'),
    (3, FALSE, '2023-11-03 12:15:00'),
    (4, FALSE, '2024-01-04 13:00:00'),
    (5, FALSE, '2024-03-05 09:30:00'),
    (6, FALSE, '2024-06-06 14:20:00'),
    (7, FALSE, '2024-09-07 16:45:00'),
    (8, FALSE, '2025-01-08 18:30:00'),
    (9, FALSE, '2025-07-09 07:10:00');

-- Publication over the partitioned parent table, created without a WITH clause,
-- i.e. without setting `publish_via_partition_root`.
-- NOTE(review): presumably the negative fixture for the CDC validation test that
-- expects a "requires publish_via_partition_root = true" error; confirm.
create publication rw_publication_pubviaroot_false for TABLE partitioned_timestamp_table;
10 changes: 10 additions & 0 deletions e2e_test/source/cdc/postgres_cdc_insert.sql
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,13 @@ INSERT INTO list_with_null VALUES (3, '{NULL,-3,-4}', '{NULL,nan,-inf}', '{NULL,
-- Row 4: array literals, most of which start with a NULL element.
-- NOTE(review): column names and types are defined outside this excerpt.
INSERT INTO list_with_null VALUES (4, '{-4,-5,-6}', '{NULL,-99999999999999999.9999}', '{NULL,-99999999999999999.9999}', '{NULL,-99999999999999999.9999}', '{NULL,sad,ok}', '{b2e4636d-fa03-4ad4-bf16-029a79dca3e2}', '{\\x88,\\x99,\\xAA}');
-- Row 6: every value after the first is NULL.
INSERT INTO list_with_null VALUES (6, NULL, NULL, NULL, NULL, NULL, NULL, NULL);

-- Rows 11-19: the same timestamps as the seed rows 1-9, but with
-- c_boolean = true, again touching the 2023, 2024 and 2025 partitions.
INSERT INTO partitioned_timestamp_table (c_int, c_boolean, c_timestamp)
VALUES
    (11, TRUE, '2023-02-01 10:30:00'),
    (12, TRUE, '2023-05-15 11:45:00'),
    (13, TRUE, '2023-11-03 12:15:00'),
    (14, TRUE, '2024-01-04 13:00:00'),
    (15, TRUE, '2024-03-05 09:30:00'),
    (16, TRUE, '2024-06-06 14:20:00'),
    (17, TRUE, '2024-09-07 16:45:00'),
    (18, TRUE, '2025-01-08 18:30:00'),
    (19, TRUE, '2025-07-09 07:10:00');
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,13 @@ public static void createPostgresPublicationIfNeeded(
if (schemaTableName.isPresent()) {
createPublicationSql =
String.format(
"CREATE PUBLICATION %s FOR TABLE %s;",
"CREATE PUBLICATION %s FOR TABLE %s WITH ( publish_via_partition_root = true );",
quotePostgres(pubName), schemaTableName.get());
} else {
createPublicationSql =
String.format("CREATE PUBLICATION %s", quotePostgres(pubName));
// No table is attached to the publication in this branch. Publication
// parameters are only accepted after the WITH keyword, same as in the
// FOR TABLE branch.
String.format(
        "CREATE PUBLICATION %s WITH ( publish_via_partition_root = true );",
        quotePostgres(pubName));
}
try (var stmt = jdbcConnection.createStatement()) {
LOG.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,7 @@ private void validateTablePrivileges(boolean isSuperUser) throws SQLException {
private void validatePublicationConfig(boolean isSuperUser) throws SQLException {
boolean isPublicationCoversTable = false;
boolean isPublicationExists = false;
boolean isPublicationViaRoot = false;
boolean isPartialPublicationEnabled = false;

// Check whether publication exists
Expand All @@ -340,7 +341,8 @@ private void validatePublicationConfig(boolean isSuperUser) throws SQLException
stmt.setString(1, pubName);
var res = stmt.executeQuery();
while (res.next()) {
isPublicationExists = res.getBoolean(1);
isPublicationViaRoot = res.getBoolean(1);
isPublicationExists = true;
}
}

Expand Down Expand Up @@ -370,6 +372,31 @@ private void validatePublicationConfig(boolean isSuperUser) throws SQLException
isPartialPublicationEnabled = res.getBoolean(1);
}
}

List<String> partitions = new ArrayList<>();
try (var stmt =
jdbcConnection.prepareStatement(
ValidatorUtils.getSql("postgres.partition_names"))) {
stmt.setString(1, schemaName);
stmt.setString(2, tableName);
var res = stmt.executeQuery();
while (res.next()) {
partitions.add(res.getString(1));
}
}
if (!partitions.isEmpty() && isPublicationExists && !isPublicationViaRoot) {
// make sure the publication are created with `publish_via_partition_root = true`, which
// is required by partitioned tables.
throw ValidatorUtils.invalidArgument(
"Table '"
+ tableName
+ "' has partitions, which requires publication '"
+ pubName
+ "' to be created with `publish_via_partition_root = true`. \nHint: you can run `SELECT pubviaroot from pg_publication WHERE pubname = '"
+ pubName
+ "'` to check.");
}

// PG 15 and up supports partial publication of table
// check whether publication covers all columns of the table schema
if (isPartialPublicationEnabled) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ postgres.table_read_privilege.check=SELECT (COUNT(*) = 1) FROM information_schem
postgres.table_owner=SELECT tableowner FROM pg_tables WHERE schemaname = ? and tablename = ?
postgres.publication_att_exists=SELECT count(*) > 0 FROM information_schema.columns WHERE table_name = 'pg_publication_tables' AND column_name = 'attnames'
postgres.publication_attnames=SELECT attnames FROM pg_publication_tables WHERE schemaname = ? AND tablename = ? AND pubname = ?
postgres.publication_exist=SELECT COUNT(*) > 0 from pg_publication WHERE pubname = ?
postgres.publication_exist=SELECT pubviaroot from pg_publication WHERE pubname = ?
postgres.publication_has_table=SELECT COUNT(*) > 0 AS count FROM pg_publication_tables WHERE schemaname = ? AND tablename = ? AND pubname = ?
# Children of the given table, limited to declaratively partitioned parents (relkind 'p'): pg_inherits also records plain inheritance children, which are not partitions.
postgres.partition_names=SELECT c.relname AS partition_name FROM pg_inherits AS i JOIN pg_class AS c ON i.inhrelid = c.oid JOIN pg_class AS p ON i.inhparent = p.oid JOIN pg_namespace AS n ON p.relnamespace = n.oid WHERE n.nspname = ? AND p.relname = ? AND p.relkind = 'p';
postgres.users_of_group=WITH RECURSIVE base (g, m) AS (( \
SELECT r1.rolname as group, ARRAY_AGG(DISTINCT(r2.rolname)) as members FROM pg_auth_members am \
INNER JOIN pg_roles r1 ON r1.oid = am.roleid \
Expand Down

0 comments on commit 52ce057

Please sign in to comment.