From ac65f4dca1c638d23c64816811cd846bd572b87b Mon Sep 17 00:00:00 2001
From: wuzhiping
Date: Tue, 26 Nov 2024 09:35:05 +0800
Subject: [PATCH] [FLINK-36790][cdc-connector][paimon] set waitCompaction to
 true during prepareCommit of PaimonWriter to avoid CME problem

---
 .../paimon/sink/v2/PaimonWriter.java     |  2 +-
 .../paimon/sink/v2/PaimonSinkITCase.java | 56 +++++++++++--------
 2 files changed, 33 insertions(+), 25 deletions(-)

diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java
index aa4dd2cb28..fcf5224683 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java
@@ -96,7 +96,7 @@ public Collection<MultiTableCommittable> prepareCommit() throws IOException {
         for (Map.Entry<Identifier, StoreSinkWrite> entry : writes.entrySet()) {
             Identifier key = entry.getKey();
             StoreSinkWrite write = entry.getValue();
-            boolean waitCompaction = false;
+            boolean waitCompaction = true;
             committables.addAll(
                     // here we set it to lastCheckpointId+1 to
                     // avoid prepareCommit the same checkpointId with the first round.
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java
index 5635dcfd88..3a554ef2f8 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java
@@ -58,7 +58,7 @@
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.jupiter.params.provider.CsvSource;
 
 import java.io.File;
 import java.io.IOException;
@@ -140,7 +140,7 @@ private void initialize(String metastore)
                 .dropDatabase(TEST_DATABASE, true, true);
     }
 
-    private List<Event> createTestEvents() throws SchemaEvolveException {
+    private List<Event> createTestEvents(boolean enableDeleteVectors) throws SchemaEvolveException {
         List<Event> testEvents = new ArrayList<>();
         // create table
         Schema schema =
@@ -149,6 +149,7 @@
                         .physicalColumn("col2", DataTypes.STRING())
                         .primaryKey("col1")
                         .option("bucket", "1")
+                        .option("deletion-vectors.enabled", String.valueOf(enableDeleteVectors))
                         .build();
         CreateTableEvent createTableEvent = new CreateTableEvent(table1, schema);
         testEvents.add(createTableEvent);
@@ -180,8 +181,8 @@
     }
 
     @ParameterizedTest
-    @ValueSource(strings = {"filesystem", "hive"})
-    public void testSinkWithDataChange(String metastore)
+    @CsvSource({"filesystem, true", "filesystem, false", "hive, true", "hive, false"})
+    public void testSinkWithDataChange(String metastore, boolean enableDeleteVector)
             throws IOException, InterruptedException, Catalog.DatabaseNotEmptyException,
                     Catalog.DatabaseNotExistException, SchemaEvolveException {
         initialize(metastore);
@@ -192,7 +193,7 @@ public void testSinkWithDataChange(String metastore)
         Committer<MultiTableCommittable> committer = paimonSink.createCommitter();
 
         // insert
-        for (Event event : createTestEvents()) {
+        for (Event event : createTestEvents(enableDeleteVector)) {
             writer.write(event, null);
         }
         writer.flush(false);
@@ -215,7 +216,7 @@
         // delete
         Event event =
                 DataChangeEvent.deleteEvent(
-                        TableId.tableId("test", "table1"),
+                        table1,
                         generator.generate(
                                 new Object[] {
                                     BinaryStringData.fromString("1"),
@@ -240,7 +241,7 @@
         // update
         event =
                 DataChangeEvent.updateEvent(
-                        TableId.tableId("test", "table1"),
+                        table1,
                         generator.generate(
                                 new Object[] {
                                     BinaryStringData.fromString("2"),
@@ -273,17 +274,19 @@
                 .collect()
                 .forEachRemaining(result::add);
         // Each commit will generate one sequence number(equal to checkpointId).
-        Assertions.assertEquals(
-                Arrays.asList(
-                        Row.ofKind(RowKind.INSERT, 1L),
-                        Row.ofKind(RowKind.INSERT, 2L),
-                        Row.ofKind(RowKind.INSERT, 3L)),
-                result);
+        List<Row> expected =
+                enableDeleteVector
+                        ? Collections.singletonList(Row.ofKind(RowKind.INSERT, 3L))
+                        : Arrays.asList(
+                                Row.ofKind(RowKind.INSERT, 1L),
+                                Row.ofKind(RowKind.INSERT, 2L),
+                                Row.ofKind(RowKind.INSERT, 3L));
+        Assertions.assertEquals(expected, result);
     }
 
     @ParameterizedTest
-    @ValueSource(strings = {"filesystem", "hive"})
-    public void testSinkWithSchemaChange(String metastore)
+    @CsvSource({"filesystem, true", "filesystem, false", "hive, true", "hive, false"})
+    public void testSinkWithSchemaChange(String metastore, boolean enableDeleteVector)
             throws IOException, InterruptedException, Catalog.DatabaseNotEmptyException,
                     Catalog.DatabaseNotExistException, SchemaEvolveException {
         initialize(metastore);
@@ -294,7 +297,7 @@ public void testSinkWithSchemaChange(String metastore)
         Committer<MultiTableCommittable> committer = paimonSink.createCommitter();
 
         // 1. receive only DataChangeEvents during one checkpoint
-        for (Event event : createTestEvents()) {
+        for (Event event : createTestEvents(enableDeleteVector)) {
             writer.write(event, null);
         }
         writer.flush(false);
@@ -427,8 +430,8 @@
     }
 
     @ParameterizedTest
-    @ValueSource(strings = {"filesystem", "hive"})
-    public void testSinkWithMultiTables(String metastore)
+    @CsvSource({"filesystem, true", "filesystem, false", "hive, true", "hive, false"})
+    public void testSinkWithMultiTables(String metastore, boolean enableDeleteVector)
             throws IOException, InterruptedException, Catalog.DatabaseNotEmptyException,
                     Catalog.DatabaseNotExistException, SchemaEvolveException {
         initialize(metastore);
@@ -437,7 +440,7 @@ public void testSinkWithMultiTables(String metastore)
                         catalogOptions, new PaimonRecordEventSerializer(ZoneId.systemDefault()));
         PaimonWriter<Event> writer = paimonSink.createWriter(new MockInitContext());
         Committer<MultiTableCommittable> committer = paimonSink.createCommitter();
-        List<Event> testEvents = createTestEvents();
+        List<Event> testEvents = createTestEvents(enableDeleteVector);
         // create table
         TableId table2 = TableId.tableId("test", "table2");
         Schema schema =
@@ -492,8 +495,8 @@
     }
 
     @ParameterizedTest
-    @ValueSource(strings = {"filesystem", "hive"})
-    public void testDuplicateCommitAfterRestore(String metastore)
+    @CsvSource({"filesystem, true", "filesystem, false", "hive, true", "hive, false"})
+    public void testDuplicateCommitAfterRestore(String metastore, boolean enableDeleteVector)
             throws IOException, InterruptedException, Catalog.DatabaseNotEmptyException,
                     Catalog.DatabaseNotExistException, SchemaEvolveException {
         initialize(metastore);
@@ -504,7 +507,7 @@ public void testDuplicateCommitAfterRestore(String metastore)
         Committer<MultiTableCommittable> committer = paimonSink.createCommitter();
 
         // insert
-        for (Event event : createTestEvents()) {
+        for (Event event : createTestEvents(enableDeleteVector)) {
            writer.write(event, null);
         }
         writer.flush(false);
@@ -553,8 +556,13 @@
                 .execute()
                 .collect()
                 .forEachRemaining(result::add);
-        // 8 APPEND and 1 COMPACT
-        Assertions.assertEquals(result.size(), 9);
+        if (enableDeleteVector) {
+            // Each APPEND will trigger a COMPACT when deletion-vectors is enabled.
+            Assertions.assertEquals(16, result.size());
+        } else {
+            // 8 APPEND and 1 COMPACT
+            Assertions.assertEquals(9, result.size());
+        }
         result.clear();
 
         tEnv.sqlQuery("select * from paimon_catalog.test.`table1`")