[Managed Iceberg] Create tables if needed #32686

Merged · 6 commits · Oct 8, 2024
Changes from 3 commits
2 changes: 1 addition & 1 deletion .github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 2
"modification": 3
}
1 change: 1 addition & 0 deletions CHANGES.md
@@ -63,6 +63,7 @@
## I/Os

* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* [Managed Iceberg] Support creating tables if needed ([#32686](https://github.com/apache/beam/pull/32686))

## New Features / Improvements

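In practice, the change above means a Managed Iceberg write no longer requires the destination table to be created ahead of time. Below is a minimal usage sketch; the config key names ("table", "catalog_properties") and the local Hadoop catalog values are assumptions inferred from this PR's tests rather than copied from its managedIcebergConfig helper, which isn't shown in this diff.

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

public class IcebergAutoCreateExample {
  public static void main(String[] args) {
    Schema schema = Schema.builder().addStringField("name").addInt64Field("id").build();
    Row row = Row.withSchema(schema).addValues("alice", 1L).build();

    // Illustrative config: key names and catalog properties are assumptions,
    // not a verbatim copy of the test helper used in this PR.
    Map<String, Object> catalogProps = new HashMap<>();
    catalogProps.put("type", "hadoop");
    catalogProps.put("warehouse", "file:///tmp/warehouse");

    Map<String, Object> config = new HashMap<>();
    config.put("table", "default.people"); // no longer needs to exist up front
    config.put("catalog_properties", catalogProps);

    Pipeline pipeline = Pipeline.create();
    PCollection<Row> rows = pipeline.apply(Create.of(row)).setRowSchema(schema);
    rows.apply(Managed.write(Managed.ICEBERG).withConfig(config));
    pipeline.run().waitUntilFinish();
  }
}
```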
RecordWriterManager.java
@@ -24,9 +24,11 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.util.WindowedValue;
@@ -46,9 +48,15 @@
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFile;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A writer that manages multiple {@link RecordWriter}s to write to multiple tables and partitions.
@@ -74,6 +82,7 @@
* #getManifestFiles()}.
*/
class RecordWriterManager implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(RecordWriterManager.class);
private final Counter dataFilesWritten =
Metrics.counter(RecordWriterManager.class, "dataFilesWritten");
private final Counter manifestFilesWritten =
@@ -95,7 +104,7 @@ class DestinationState {
private final Table table;
private final String stateToken = UUID.randomUUID().toString();
private final List<DataFile> dataFiles = Lists.newArrayList();
@VisibleForTesting final Cache<PartitionKey, RecordWriter> writers;
final Cache<PartitionKey, RecordWriter> writers;
@VisibleForTesting final Map<PartitionKey, Integer> writerCounts = Maps.newHashMap();

DestinationState(IcebergDestination icebergDestination, Table table) {
@@ -211,6 +220,8 @@ private String getManifestFileLocation(PaneInfo paneInfo) {

private final Map<WindowedValue<IcebergDestination>, List<ManifestFile>> totalManifestFiles =
Maps.newHashMap();
private static final Cache<TableIdentifier, Table> TABLE_CACHE =
CacheBuilder.newBuilder().expireAfterAccess(10, TimeUnit.MINUTES).build();

private boolean isClosed = false;

@@ -221,6 +232,41 @@ private String getManifestFileLocation(PaneInfo paneInfo) {
this.maxNumWriters = maxNumWriters;
}

/**
* Returns an Iceberg {@link Table}.
*
* <p>First attempts to fetch the table from the {@link #TABLE_CACHE}. If it's not there, we
* attempt to load it using the Iceberg API. If the table doesn't exist at all, we attempt to
* create it, inferring the table schema from the record schema.
*
* <p>Note that this is a best-effort operation that depends on the {@link Catalog}
* implementation. Most implementations are expected to support it, but some may not allow
* creating a table through the Iceberg API.
*/
private Table getOrCreateTable(TableIdentifier identifier, Schema dataSchema) {
@Nullable Table table = TABLE_CACHE.getIfPresent(identifier);
if (table == null) {
try {
table = catalog.loadTable(identifier);
} catch (NoSuchTableException e) {
try {
org.apache.iceberg.Schema tableSchema =
IcebergUtils.beamSchemaToIcebergSchema(dataSchema);
// TODO(ahmedabu98): support creating a table with a specified partition spec
table = catalog.createTable(identifier, tableSchema);
LOG.info(
"Successfully created Iceberg table '{}' with schema: {}", identifier, tableSchema);
} catch (AlreadyExistsException alreadyExistsException) {
// Handle the race condition where multiple workers create the same table concurrently:
// if we hit an AlreadyExistsException, perform one final load.
table = catalog.loadTable(identifier);
}
}
TABLE_CACHE.put(identifier, table);
}
return table;
}

/**
* Fetches the appropriate {@link RecordWriter} for this destination and partition and writes the
* record.
@@ -233,7 +279,16 @@ public boolean write(WindowedValue<IcebergDestination> icebergDestination, Row r
destinations.computeIfAbsent(
icebergDestination,
destination -> {
Table table = catalog.loadTable(destination.getValue().getTableIdentifier());
TableIdentifier identifier = destination.getValue().getTableIdentifier();
Table table;
try {
table =
TABLE_CACHE.get(
identifier, () -> getOrCreateTable(identifier, row.getSchema()));
} catch (ExecutionException e) {
throw new RuntimeException(
"Error while fetching or creating table: " + identifier, e);
}
return new DestinationState(destination.getValue(), table);
});

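Taken together, the RecordWriterManager changes above make the sink resolve a destination table once per JVM: check a static Guava cache, load from the catalog on a miss, and create the table (with a schema converted from the record's Beam schema) if it doesn't exist, falling back to one last load when another worker wins the create race. The following is a condensed standalone sketch of that pattern using plain Guava and the Iceberg catalog API; the real code uses Beam's vendored Guava and IcebergUtils.beamSchemaToIcebergSchema for the schema conversion.

```java
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.NoSuchTableException;

class LoadOrCreateSketch {
  // Process-wide cache keyed by table identifier; entries expire after
  // 10 minutes without access, mirroring the TABLE_CACHE declaration above.
  private static final Cache<TableIdentifier, Table> TABLE_CACHE =
      CacheBuilder.newBuilder().expireAfterAccess(10, TimeUnit.MINUTES).build();

  static Table getTable(Catalog catalog, TableIdentifier id, Schema schema) {
    try {
      // The loader runs only on a cache miss; Guava wraps loader failures in
      // ExecutionException, which is why the write() path above rethrows it.
      return TABLE_CACHE.get(id, () -> loadOrCreate(catalog, id, schema));
    } catch (ExecutionException e) {
      throw new RuntimeException("Error while fetching or creating table: " + id, e);
    }
  }

  private static Table loadOrCreate(Catalog catalog, TableIdentifier id, Schema schema) {
    try {
      return catalog.loadTable(id);
    } catch (NoSuchTableException e) {
      try {
        return catalog.createTable(id, schema);
      } catch (AlreadyExistsException raced) {
        // Another worker created the table between loadTable and createTable;
        // one final load resolves the race.
        return catalog.loadTable(id);
      }
    }
  }
}
```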
@@ -21,6 +21,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.io.Serializable;
@@ -290,14 +291,16 @@ public void testRead() throws Exception {
*/
@Test
public void testWrite() {
Table table = catalog.createTable(TableIdentifier.parse(tableId), ICEBERG_SCHEMA);

// Write with Beam
// Expect the sink to create the table
Map<String, Object> config = managedIcebergConfig(tableId);
PCollection<Row> input = pipeline.apply(Create.of(INPUT_ROWS)).setRowSchema(BEAM_SCHEMA);
input.apply(Managed.write(Managed.ICEBERG).withConfig(config));
pipeline.run().waitUntilFinish();

Table table = catalog.loadTable(TableIdentifier.parse(tableId));
assertTrue(table.schema().sameSchema(ICEBERG_SCHEMA));

// Read back and check records are correct
List<Record> returnedRecords = readRecords(table);
assertThat(
@@ -434,22 +437,23 @@ private void writeToDynamicDestinations(

Schema tableSchema = IcebergUtils.beamSchemaToIcebergSchema(rowFilter.outputSchema());

PartitionSpec partitionSpec = null;
TableIdentifier tableIdentifier0 = TableIdentifier.parse(tableId + "_0_a");
TableIdentifier tableIdentifier1 = TableIdentifier.parse(tableId + "_1_b");
TableIdentifier tableIdentifier2 = TableIdentifier.parse(tableId + "_2_c");
TableIdentifier tableIdentifier3 = TableIdentifier.parse(tableId + "_3_d");
TableIdentifier tableIdentifier4 = TableIdentifier.parse(tableId + "_4_e");
// The sink doesn't support creating partitioned tables yet,
// so we create them manually for this test case.
if (partitioning) {
Preconditions.checkState(filterOp == null || !filterOp.equals("only"));
partitionSpec =
PartitionSpec partitionSpec =
PartitionSpec.builderFor(tableSchema).identity("bool").identity("modulo_5").build();
catalog.createTable(tableIdentifier0, tableSchema, partitionSpec);
catalog.createTable(tableIdentifier1, tableSchema, partitionSpec);
catalog.createTable(tableIdentifier2, tableSchema, partitionSpec);
catalog.createTable(tableIdentifier3, tableSchema, partitionSpec);
catalog.createTable(tableIdentifier4, tableSchema, partitionSpec);
}
Table table0 =
catalog.createTable(TableIdentifier.parse(tableId + "_0_a"), tableSchema, partitionSpec);
Table table1 =
catalog.createTable(TableIdentifier.parse(tableId + "_1_b"), tableSchema, partitionSpec);
Table table2 =
catalog.createTable(TableIdentifier.parse(tableId + "_2_c"), tableSchema, partitionSpec);
Table table3 =
catalog.createTable(TableIdentifier.parse(tableId + "_3_d"), tableSchema, partitionSpec);
Table table4 =
catalog.createTable(TableIdentifier.parse(tableId + "_4_e"), tableSchema, partitionSpec);

// Write with Beam
PCollection<Row> input;
@@ -467,6 +471,16 @@
input.setRowSchema(BEAM_SCHEMA).apply(Managed.write(Managed.ICEBERG).withConfig(writeConfig));
pipeline.run().waitUntilFinish();

Table table0 = catalog.loadTable(tableIdentifier0);
Table table1 = catalog.loadTable(tableIdentifier1);
Table table2 = catalog.loadTable(tableIdentifier2);
Table table3 = catalog.loadTable(tableIdentifier3);
Table table4 = catalog.loadTable(tableIdentifier4);

for (Table t : Arrays.asList(table0, table1, table2, table3, table4)) {
assertTrue(t.schema().sameSchema(tableSchema));
}

// Read back and check records are correct
List<List<Record>> returnedRecords =
Arrays.asList(
@@ -80,9 +80,6 @@ public void testSimpleAppend() throws Exception {
TableIdentifier tableId =
TableIdentifier.of("default", "table" + Long.toString(UUID.randomUUID().hashCode(), 16));

// Create a table and add records to it.
Table table = warehouse.createTable(tableId, TestFixtures.SCHEMA);

Map<String, String> catalogProps =
ImmutableMap.<String, String>builder()
.put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
@@ -104,6 +101,7 @@
testPipeline.run().waitUntilFinish();
LOG.info("Done running pipeline");

Table table = warehouse.loadTable(tableId);
List<Record> writtenRecords = ImmutableList.copyOf(IcebergGenerics.read(table).build());

assertThat(writtenRecords, Matchers.containsInAnyOrder(TestFixtures.FILE1SNAPSHOT1.toArray()));
@@ -117,11 +115,6 @@
final TableIdentifier table2Id = TableIdentifier.of("default", "table2-" + salt);
final TableIdentifier table3Id = TableIdentifier.of("default", "table3-" + salt);

// Create a table and add records to it.
Table table1 = warehouse.createTable(table1Id, TestFixtures.SCHEMA);
Table table2 = warehouse.createTable(table2Id, TestFixtures.SCHEMA);
Table table3 = warehouse.createTable(table3Id, TestFixtures.SCHEMA);

Map<String, String> catalogProps =
ImmutableMap.<String, String>builder()
.put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
@@ -177,6 +170,10 @@ public IcebergDestination instantiateDestination(String dest) {
testPipeline.run().waitUntilFinish();
LOG.info("Done running pipeline");

Table table1 = warehouse.loadTable(table1Id);
Table table2 = warehouse.loadTable(table2Id);
Table table3 = warehouse.loadTable(table3Id);

List<Record> writtenRecords1 = ImmutableList.copyOf(IcebergGenerics.read(table1).build());
List<Record> writtenRecords2 = ImmutableList.copyOf(IcebergGenerics.read(table2).build());
List<Record> writtenRecords3 = ImmutableList.copyOf(IcebergGenerics.read(table3).build());
@@ -320,9 +317,6 @@ public void testStreamingWrite() {
TableIdentifier.of(
"default", "streaming_" + Long.toString(UUID.randomUUID().hashCode(), 16));

// Create a table and add records to it.
Table table = warehouse.createTable(tableId, TestFixtures.SCHEMA);

Map<String, String> catalogProps =
ImmutableMap.<String, String>builder()
.put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
@@ -365,6 +359,8 @@ public void testStreamingWrite() {
PAssert.that(snapshots).containsInAnyOrder(2L);
testPipeline.run().waitUntilFinish();

Table table = warehouse.loadTable(tableId);

List<Record> writtenRecords = ImmutableList.copyOf(IcebergGenerics.read(table).build());
assertThat(writtenRecords, Matchers.containsInAnyOrder(TestFixtures.FILE1SNAPSHOT1.toArray()));
}
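The test changes in this PR all follow the same shape: stop pre-creating the table, run the pipeline, then load the table the sink created and verify its schema. Below is a small sketch of that post-run assertion, assuming catalog, tableId, and a Beam schema fixture like the ones the surrounding tests already have, and assuming IcebergUtils is accessible from the test's package as in the diffs above.

```java
import static org.junit.Assert.assertTrue;

import org.apache.beam.sdk.io.iceberg.IcebergUtils;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;

class CreatedTableAssertions {
  static void assertTableCreatedWithSchema(
      Catalog catalog, TableIdentifier tableId, org.apache.beam.sdk.schemas.Schema beamSchema) {
    // The sink should have created the table during pipeline execution.
    Table table = catalog.loadTable(tableId);
    org.apache.iceberg.Schema expected = IcebergUtils.beamSchemaToIcebergSchema(beamSchema);
    // Same assertion as in testWrite above.
    assertTrue(table.schema().sameSchema(expected));
  }
}
```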
@@ -95,11 +95,6 @@ public void testBuildTransformWithRow() {
public void testSimpleAppend() {
String identifier = "default.table_" + Long.toString(UUID.randomUUID().hashCode(), 16);

TableIdentifier tableId = TableIdentifier.parse(identifier);

// Create a table and add records to it.
Table table = warehouse.createTable(tableId, TestFixtures.SCHEMA);

Map<String, String> properties = new HashMap<>();
properties.put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
properties.put("warehouse", warehouse.location);
@@ -129,6 +124,9 @@

testPipeline.run().waitUntilFinish();

TableIdentifier tableId = TableIdentifier.parse(identifier);
Table table = warehouse.loadTable(tableId);

List<Record> writtenRecords = ImmutableList.copyOf(IcebergGenerics.read(table).build());

assertThat(writtenRecords, Matchers.containsInAnyOrder(TestFixtures.FILE1SNAPSHOT1.toArray()));
@@ -137,7 +135,6 @@
@Test
public void testWriteUsingManagedTransform() {
String identifier = "default.table_" + Long.toString(UUID.randomUUID().hashCode(), 16);
Table table = warehouse.createTable(TableIdentifier.parse(identifier), TestFixtures.SCHEMA);

String yamlConfig =
String.format(
@@ -161,6 +158,7 @@

testPipeline.run().waitUntilFinish();

Table table = warehouse.loadTable(TableIdentifier.parse(identifier));
List<Record> writtenRecords = ImmutableList.copyOf(IcebergGenerics.read(table).build());
assertThat(writtenRecords, Matchers.containsInAnyOrder(TestFixtures.FILE1SNAPSHOT1.toArray()));
}
@@ -261,9 +259,6 @@ private void writeToDynamicDestinationsAndFilter(@Nullable String operation, boo

org.apache.iceberg.Schema icebergSchema =
IcebergUtils.beamSchemaToIcebergSchema(filter.outputSchema());
Table table0 = warehouse.createTable(TableIdentifier.parse(identifier0), icebergSchema);
Table table1 = warehouse.createTable(TableIdentifier.parse(identifier1), icebergSchema);
Table table2 = warehouse.createTable(TableIdentifier.parse(identifier2), icebergSchema);

TestStream<Row> stream =
TestStream.create(beamSchema)
@@ -301,6 +296,9 @@ private void writeToDynamicDestinationsAndFilter(@Nullable String operation, boo

testPipeline.run().waitUntilFinish();

Table table0 = warehouse.loadTable(TableIdentifier.parse(identifier0));
Table table1 = warehouse.loadTable(TableIdentifier.parse(identifier1));
Table table2 = warehouse.loadTable(TableIdentifier.parse(identifier2));
List<Record> table0Records = ImmutableList.copyOf(IcebergGenerics.read(table0).build());
List<Record> table1Records = ImmutableList.copyOf(IcebergGenerics.read(table1).build());
List<Record> table2Records = ImmutableList.copyOf(IcebergGenerics.read(table2).build());
@@ -149,4 +149,8 @@ public Table createTable(
someTableHasBeenCreated = true;
return catalog.createTable(tableId, schema, partitionSpec);
}

public Table loadTable(TableIdentifier tableId) {
return catalog.loadTable(tableId);
}
}