Skip to content

Commit

Permalink
[FLINK-36790][cdc-connector][paimon] set waitCompaction with true dur…
Browse files Browse the repository at this point in the history
…ing prepare common of PaimonWriter to avoid CME problem
  • Loading branch information
wuzhiping committed Nov 26, 2024
1 parent c969957 commit ac65f4d
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public Collection<MultiTableCommittable> prepareCommit() throws IOException {
for (Map.Entry<Identifier, StoreSinkWrite> entry : writes.entrySet()) {
Identifier key = entry.getKey();
StoreSinkWrite write = entry.getValue();
boolean waitCompaction = false;
boolean waitCompaction = true;
committables.addAll(
// here we set it to lastCheckpointId+1 to
// avoid prepareCommit the same checkpointId with the first round.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.junit.jupiter.params.provider.CsvSource;

import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -140,7 +140,7 @@ private void initialize(String metastore)
.dropDatabase(TEST_DATABASE, true, true);
}

private List<Event> createTestEvents() throws SchemaEvolveException {
private List<Event> createTestEvents(boolean enableDeleteVectors) throws SchemaEvolveException {
List<Event> testEvents = new ArrayList<>();
// create table
Schema schema =
Expand All @@ -149,6 +149,7 @@ private List<Event> createTestEvents() throws SchemaEvolveException {
.physicalColumn("col2", DataTypes.STRING())
.primaryKey("col1")
.option("bucket", "1")
.option("deletion-vectors.enabled", String.valueOf(enableDeleteVectors))
.build();
CreateTableEvent createTableEvent = new CreateTableEvent(table1, schema);
testEvents.add(createTableEvent);
Expand Down Expand Up @@ -180,8 +181,8 @@ private List<Event> createTestEvents() throws SchemaEvolveException {
}

@ParameterizedTest
@ValueSource(strings = {"filesystem", "hive"})
public void testSinkWithDataChange(String metastore)
@CsvSource({"filesystem, true", "filesystem, false", "hive, true", "hive, false"})
public void testSinkWithDataChange(String metastore, boolean enableDeleteVector)
throws IOException, InterruptedException, Catalog.DatabaseNotEmptyException,
Catalog.DatabaseNotExistException, SchemaEvolveException {
initialize(metastore);
Expand All @@ -192,7 +193,7 @@ public void testSinkWithDataChange(String metastore)
Committer<MultiTableCommittable> committer = paimonSink.createCommitter();

// insert
for (Event event : createTestEvents()) {
for (Event event : createTestEvents(enableDeleteVector)) {
writer.write(event, null);
}
writer.flush(false);
Expand All @@ -215,7 +216,7 @@ public void testSinkWithDataChange(String metastore)
// delete
Event event =
DataChangeEvent.deleteEvent(
TableId.tableId("test", "table1"),
table1,
generator.generate(
new Object[] {
BinaryStringData.fromString("1"),
Expand All @@ -240,7 +241,7 @@ public void testSinkWithDataChange(String metastore)
// update
event =
DataChangeEvent.updateEvent(
TableId.tableId("test", "table1"),
table1,
generator.generate(
new Object[] {
BinaryStringData.fromString("2"),
Expand Down Expand Up @@ -273,17 +274,19 @@ public void testSinkWithDataChange(String metastore)
.collect()
.forEachRemaining(result::add);
// Each commit will generate one sequence number(equal to checkpointId).
Assertions.assertEquals(
Arrays.asList(
Row.ofKind(RowKind.INSERT, 1L),
Row.ofKind(RowKind.INSERT, 2L),
Row.ofKind(RowKind.INSERT, 3L)),
result);
List<Row> expected =
enableDeleteVector
? Collections.singletonList(Row.ofKind(RowKind.INSERT, 3L))
: Arrays.asList(
Row.ofKind(RowKind.INSERT, 1L),
Row.ofKind(RowKind.INSERT, 2L),
Row.ofKind(RowKind.INSERT, 3L));
Assertions.assertEquals(expected, result);
}

@ParameterizedTest
@ValueSource(strings = {"filesystem", "hive"})
public void testSinkWithSchemaChange(String metastore)
@CsvSource({"filesystem, true", "filesystem, false", "hive, true", "hive, false"})
public void testSinkWithSchemaChange(String metastore, boolean enableDeleteVector)
throws IOException, InterruptedException, Catalog.DatabaseNotEmptyException,
Catalog.DatabaseNotExistException, SchemaEvolveException {
initialize(metastore);
Expand All @@ -294,7 +297,7 @@ public void testSinkWithSchemaChange(String metastore)
Committer<MultiTableCommittable> committer = paimonSink.createCommitter();

// 1. receive only DataChangeEvents during one checkpoint
for (Event event : createTestEvents()) {
for (Event event : createTestEvents(enableDeleteVector)) {
writer.write(event, null);
}
writer.flush(false);
Expand Down Expand Up @@ -427,8 +430,8 @@ public void testSinkWithSchemaChange(String metastore)
}

@ParameterizedTest
@ValueSource(strings = {"filesystem", "hive"})
public void testSinkWithMultiTables(String metastore)
@CsvSource({"filesystem, true", "filesystem, false", "hive, true", "hive, false"})
public void testSinkWithMultiTables(String metastore, boolean enableDeleteVector)
throws IOException, InterruptedException, Catalog.DatabaseNotEmptyException,
Catalog.DatabaseNotExistException, SchemaEvolveException {
initialize(metastore);
Expand All @@ -437,7 +440,7 @@ public void testSinkWithMultiTables(String metastore)
catalogOptions, new PaimonRecordEventSerializer(ZoneId.systemDefault()));
PaimonWriter<Event> writer = paimonSink.createWriter(new MockInitContext());
Committer<MultiTableCommittable> committer = paimonSink.createCommitter();
List<Event> testEvents = createTestEvents();
List<Event> testEvents = createTestEvents(enableDeleteVector);
// create table
TableId table2 = TableId.tableId("test", "table2");
Schema schema =
Expand Down Expand Up @@ -492,8 +495,8 @@ public void testSinkWithMultiTables(String metastore)
}

@ParameterizedTest
@ValueSource(strings = {"filesystem", "hive"})
public void testDuplicateCommitAfterRestore(String metastore)
@CsvSource({"filesystem, true", "filesystem, false", "hive, true", "hive, false"})
public void testDuplicateCommitAfterRestore(String metastore, boolean enableDeleteVector)
throws IOException, InterruptedException, Catalog.DatabaseNotEmptyException,
Catalog.DatabaseNotExistException, SchemaEvolveException {
initialize(metastore);
Expand All @@ -504,7 +507,7 @@ public void testDuplicateCommitAfterRestore(String metastore)
Committer<MultiTableCommittable> committer = paimonSink.createCommitter();

// insert
for (Event event : createTestEvents()) {
for (Event event : createTestEvents(enableDeleteVector)) {
writer.write(event, null);
}
writer.flush(false);
Expand Down Expand Up @@ -553,8 +556,13 @@ public void testDuplicateCommitAfterRestore(String metastore)
.execute()
.collect()
.forEachRemaining(result::add);
// 8 APPEND and 1 COMPACT
Assertions.assertEquals(result.size(), 9);
if (enableDeleteVector) {
// Each APPEND will trigger COMPACT once enable deletion-vectors.
Assertions.assertEquals(16, result.size());
} else {
// 8 APPEND and 1 COMPACT
Assertions.assertEquals(9, result.size());
}
result.clear();

tEnv.sqlQuery("select * from paimon_catalog.test.`table1`")
Expand Down

0 comments on commit ac65f4d

Please sign in to comment.