Skip to content

Commit

Permalink
add test
Browse files Browse the repository at this point in the history
  • Loading branch information
ahmedabu98 committed Oct 21, 2024
1 parent baba789 commit 40cdde1
Showing 1 changed file with 34 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public void setUp() {
windowedDestination =
getWindowedDestination("table_" + testName.getMethodName(), PARTITION_SPEC);
catalog = new HadoopCatalog(new Configuration(), warehouse.location);
RecordWriterManager.TABLE_CACHE.invalidateAll();
}

private WindowedValue<IcebergDestination> getWindowedDestination(
Expand Down Expand Up @@ -329,6 +330,39 @@ public void testSerializableDataFileRoundTripEquality() throws IOException {
*/
@Test
public void testRecreateSerializableDataAfterUpdatingPartitionSpec() throws IOException {
PartitionKey partitionKey = new PartitionKey(PARTITION_SPEC, ICEBERG_SCHEMA);

Row row = Row.withSchema(BEAM_SCHEMA).addValues(1, "abcdef", true).build();
Row row2 = Row.withSchema(BEAM_SCHEMA).addValues(2, "abcxyz", true).build();
// same partition for both records (name_trunc=abc, bool=true)
partitionKey.partition(IcebergUtils.beamRowToIcebergRecord(ICEBERG_SCHEMA, row));

// write some rows
RecordWriter writer =
new RecordWriter(catalog, windowedDestination.getValue(), "test_file_name", partitionKey);
writer.write(IcebergUtils.beamRowToIcebergRecord(ICEBERG_SCHEMA, row));
writer.write(IcebergUtils.beamRowToIcebergRecord(ICEBERG_SCHEMA, row2));
writer.close();

// fetch data file and its serializable version
DataFile datafile = writer.getDataFile();
SerializableDataFile serializableDataFile = SerializableDataFile.from(datafile, partitionKey);

assertEquals(2L, datafile.recordCount());
assertEquals(serializableDataFile.getPartitionSpecId(), datafile.specId());

// update spec
Table table = catalog.loadTable(windowedDestination.getValue().getTableIdentifier());
table.updateSpec().addField("id").removeField("bool").commit();

Map<Integer, PartitionSpec> updatedSpecs = table.specs();
DataFile roundTripDataFile = serializableDataFile.createDataFile(updatedSpecs);

checkDataFileEquality(datafile, roundTripDataFile);
}

@Test
public void testWriterKeepsUpWithUpdatingPartitionSpec() throws IOException {
Table table = catalog.loadTable(windowedDestination.getValue().getTableIdentifier());
Row row = Row.withSchema(BEAM_SCHEMA).addValues(1, "abcdef", true).build();
Row row2 = Row.withSchema(BEAM_SCHEMA).addValues(2, "abcxyz", true).build();
Expand Down

0 comments on commit 40cdde1

Please sign in to comment.