Skip to content

Commit

Permalink
add more comments
Browse files Browse the repository at this point in the history
  • Loading branch information
KeXiangWang committed Sep 24, 2024
1 parent 0d100a9 commit 0994102
Showing 1 changed file with 17 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,8 @@ private void validatePublicationConfig(boolean isSuperUser) throws SQLException
if (!partitions.isEmpty() && isPublicationExists) {
// `pubviaroot` in `pg_publication` is added after PG v13, before which PG does not
// allow adding partitioned table to a publication. So here, if partitions.isEmpty() is
// false, we can safely check the value of `pubviaroot` of the publication here.
// false, which means the PG version is >= v13, we can safely check the value of
// `pubviaroot` of the publication here.
boolean isPublicationViaRoot = false;
try (var stmt =
jdbcConnection.prepareStatement(
Expand All @@ -416,7 +417,7 @@ private void validatePublicationConfig(boolean isSuperUser) throws SQLException
+ "'` in the upstream Postgres to check.");
}
}
// Only after v13, PG allows adding a partitioned table to a publication. So, if the
// Only after v13, PG allows adding a partitioned table to a publication. So, if the
// version is before v13, the tables in a publication are always partition leaves, we don't
// check their ancestors and descendants anymore.
if (isPublicationExists && pgVersion >= 13) {
Expand All @@ -427,12 +428,11 @@ private void validatePublicationConfig(boolean isSuperUser) throws SQLException
try (var stmt =
jdbcConnection.prepareStatement(
ValidatorUtils.getSql("postgres.partition_parent"))) {
stmt.setString(
1, String.format("\"%s\".\"%s\"", this.schemaName, currentPartition));
stmt.setString(
2, String.format("\"%s\".\"%s\"", this.schemaName, currentPartition));
stmt.setString(
3, String.format("\"%s\".\"%s\"", this.schemaName, currentPartition));
String schemaPartitionName =
String.format("\"%s\".\"%s\"", this.schemaName, currentPartition);
stmt.setString(1, schemaPartitionName);
stmt.setString(2, schemaPartitionName);
stmt.setString(3, schemaPartitionName);
var res = stmt.executeQuery();
if (res.next()) {
String parent = res.getString(1);
Expand All @@ -446,15 +446,21 @@ private void validatePublicationConfig(boolean isSuperUser) throws SQLException
try (var stmt =
jdbcConnection.prepareStatement(
ValidatorUtils.getSql("postgres.partition_descendants"))) {
stmt.setString(1, String.format("\"%s\".\"%s\"", this.schemaName, this.tableName));
stmt.setString(2, String.format("\"%s\".\"%s\"", this.schemaName, this.tableName));
String schemaTableName =
String.format("\"%s\".\"%s\"", this.schemaName, this.tableName);
stmt.setString(1, schemaTableName);
stmt.setString(2, schemaTableName);
var res = stmt.executeQuery();
while (res.next()) {
String descendant = res.getString(1);
family.add(descendant);
}
}

// The check here was added based on experimental observations. We found that if a table
// is added to a publication where its ancestor or descendant is already included, the
// table cannot be read data from the slot correctly. Therefore, we must verify whether
// its ancestors or descendants are already in the publication. If yes, we deny the
// request.
for (String relative : family) {
try (var stmt =
jdbcConnection.prepareStatement(
Expand Down

0 comments on commit 0994102

Please sign in to comment.