diff --git a/.github/trigger_files/IO_Iceberg_Integration_Tests.json b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
index bbdc3a3910ef..3f63c0c9975f 100644
--- a/.github/trigger_files/IO_Iceberg_Integration_Tests.json
+++ b/.github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
- "modification": 3
+ "modification": 2
}
diff --git a/CHANGES.md b/CHANGES.md
index 364b1a5fbdef..fcb02d1d996a 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -63,6 +63,7 @@
## I/Os
* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
+* [Managed Iceberg] Support creating tables if needed ([#32686](https://github.com/apache/beam/pull/32686)).
## New Features / Improvements
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java
index 5b63803a52d0..d12f8914a338 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergIO.java
@@ -34,6 +34,7 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.types.Type;
@@ -106,6 +107,14 @@
* <p>Additional configuration options are provided in the {@code Pre-filtering Options} section below,
* for Iceberg writes.
*
+ * <h3>Creating Tables</h3>
+ *
+ * <p>If an Iceberg table does not exist at the time of writing, this connector will automatically
+ * create one with the data's schema.
+ *
+ * <p>Note that this is a best-effort operation that depends on the {@link Catalog} implementation.
+ * Some implementations may not support creating a table using the Iceberg API.
+ *
* <h3>Beam Rows</h3>
*
* <p>Being a Managed transform, this IO exclusively writes and reads using Beam {@link Row}s.
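The new `Creating Tables` section above describes behavior that needs no extra configuration: pointing a Managed Iceberg write at a table that does not exist yet is enough. Below is a minimal sketch of such a pipeline, assuming a Hadoop catalog; the class name, table name, and warehouse path are placeholders, and the config keys follow the shape used by `managedIcebergConfig(...)` in the integration test further down.

```java
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;

public class CreateTableOnWriteSketch {
  public static void main(String[] args) {
    // Beam schema of the input rows; the sink infers the Iceberg table schema from it.
    Schema schema = Schema.builder().addInt64Field("id").addStringField("name").build();

    // Placeholder catalog settings: a local Hadoop catalog and warehouse path.
    Map<String, Object> config =
        ImmutableMap.of(
            "table", "db.new_table", // the table does not need to exist up front
            "catalog_properties",
            ImmutableMap.of(
                "type", "hadoop",
                "warehouse", "file:///tmp/iceberg-warehouse"));

    Pipeline p = Pipeline.create();
    p.apply(Create.of(Row.withSchema(schema).addValues(1L, "a").build()))
        .setRowSchema(schema)
        .apply(Managed.write(Managed.ICEBERG).withConfig(config));
    p.run().waitUntilFinish();
  }
}
```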
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java
index 055c8882b72c..396db7c20f36 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/RecordWriterManager.java
@@ -25,7 +25,9 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.Row;
@@ -41,7 +43,13 @@
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.Record;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A writer that manages multiple {@link RecordWriter}s to write to multiple tables and partitions.
@@ -66,6 +74,7 @@
* #getSerializableDataFiles()}.
*/
class RecordWriterManager implements AutoCloseable {
+ private static final Logger LOG = LoggerFactory.getLogger(RecordWriterManager.class);
/**
* Represents the state of one Iceberg table destination. Creates one {@link RecordWriter} per
* partition and manages them in a {@link Cache}.
@@ -79,8 +88,8 @@ class DestinationState {
private final PartitionKey partitionKey;
private final Table table;
private final String stateToken = UUID.randomUUID().toString();
+ final Cache<PartitionKey, RecordWriter> writers;
private final List<SerializableDataFile> dataFiles = Lists.newArrayList();
- @VisibleForTesting final Cache<PartitionKey, RecordWriter> writers;
@VisibleForTesting final Map<PartitionKey, Integer> writerCounts = Maps.newHashMap();
DestinationState(IcebergDestination icebergDestination, Table table) {
@@ -186,6 +195,8 @@ private RecordWriter createWriter(PartitionKey partitionKey) {
private final Map<WindowedValue<IcebergDestination>, List<SerializableDataFile>>
totalSerializableDataFiles = Maps.newHashMap();
+ private static final Cache<TableIdentifier, Table> TABLE_CACHE =
+ CacheBuilder.newBuilder().expireAfterAccess(10, TimeUnit.MINUTES).build();
private boolean isClosed = false;
@@ -196,6 +207,40 @@ private RecordWriter createWriter(PartitionKey partitionKey) {
this.maxNumWriters = maxNumWriters;
}
+ /**
+ * Returns an Iceberg {@link Table}.
+ *
+ * <p>First attempts to fetch the table from the {@link #TABLE_CACHE}. If it's not there, we
+ * attempt to load it using the Iceberg API. If the table doesn't exist at all, we attempt to
+ * create it, inferring the table schema from the record schema.
+ *
+ * <p>Note that this is a best-effort operation that depends on the {@link Catalog}
+ * implementation. Although it is expected to work in most cases, some implementations may not
+ * support creating a table using the Iceberg API.
+ */
+ private Table getOrCreateTable(TableIdentifier identifier, Schema dataSchema) {
+ @Nullable Table table = TABLE_CACHE.getIfPresent(identifier);
+ if (table == null) {
+ try {
+ table = catalog.loadTable(identifier);
+ } catch (NoSuchTableException e) {
+ try {
+ org.apache.iceberg.Schema tableSchema =
+ IcebergUtils.beamSchemaToIcebergSchema(dataSchema);
+ // TODO(ahmedabu98): support creating a table with a specified partition spec
+ table = catalog.createTable(identifier, tableSchema);
+ LOG.info("Created Iceberg table '{}' with schema: {}", identifier, tableSchema);
+ } catch (AlreadyExistsException alreadyExistsException) {
+ // handle race condition where workers are concurrently creating the same table.
+ // if running into already exists exception, we perform one last load
+ table = catalog.loadTable(identifier);
+ }
+ }
+ TABLE_CACHE.put(identifier, table);
+ }
+ return table;
+ }
+
/**
* Fetches the appropriate {@link RecordWriter} for this destination and partition and writes the
* record.
@@ -208,7 +253,16 @@ public boolean write(WindowedValue<IcebergDestination> icebergDestination, Row row) {
destinations.computeIfAbsent(
icebergDestination,
destination -> {
- Table table = catalog.loadTable(destination.getValue().getTableIdentifier());
+ TableIdentifier identifier = destination.getValue().getTableIdentifier();
+ Table table;
+ try {
+ table =
+ TABLE_CACHE.get(
+ identifier, () -> getOrCreateTable(identifier, row.getSchema()));
+ } catch (ExecutionException e) {
+ throw new RuntimeException(
+ "Error while fetching or creating table: " + identifier, e);
+ }
return new DestinationState(destination.getValue(), table);
});
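For context on the caching pattern used in `write()` above: Guava's `Cache.get(key, Callable)` runs the loader at most once per key at a time and wraps any loader failure in an `ExecutionException`, which is why the new code rethrows it as a `RuntimeException`. A self-contained sketch of the same pattern, with illustrative names only (it is not part of this change):

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;

public class TableCacheSketch {
  // Entries expire after 10 minutes of inactivity, mirroring TABLE_CACHE above.
  private static final Cache<String, String> CACHE =
      CacheBuilder.newBuilder().expireAfterAccess(10, TimeUnit.MINUTES).build();

  static String getOrLoad(String key) {
    try {
      // The Callable is invoked only on a cache miss; concurrent callers for the
      // same key wait for the single in-flight load instead of loading again.
      return CACHE.get(key, () -> expensiveLoad(key));
    } catch (ExecutionException e) {
      // The loader's exception is wrapped; surface it as an unchecked failure,
      // as write() does with "Error while fetching or creating table".
      throw new RuntimeException("Error loading value for: " + key, e);
    }
  }

  private static String expensiveLoad(String key) {
    // Stand-in for catalog.loadTable(...) / catalog.createTable(...).
    return "value-for-" + key;
  }
}
```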
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java
index 84f2146275f0..a5c034ac901a 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java
@@ -21,6 +21,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.io.Serializable;
@@ -290,14 +291,16 @@ public void testRead() throws Exception {
*/
@Test
public void testWrite() {
- Table table = catalog.createTable(TableIdentifier.parse(tableId), ICEBERG_SCHEMA);
-
// Write with Beam
+ // Expect the sink to create the table
Map<String, Object> config = managedIcebergConfig(tableId);
PCollection<Row> input = pipeline.apply(Create.of(INPUT_ROWS)).setRowSchema(BEAM_SCHEMA);
input.apply(Managed.write(Managed.ICEBERG).withConfig(config));
pipeline.run().waitUntilFinish();
+ Table table = catalog.loadTable(TableIdentifier.parse(tableId));
+ assertTrue(table.schema().sameSchema(ICEBERG_SCHEMA));
+
// Read back and check records are correct
List<Record> returnedRecords = readRecords(table);
assertThat(
@@ -434,22 +437,23 @@ private void writeToDynamicDestinations(
Schema tableSchema = IcebergUtils.beamSchemaToIcebergSchema(rowFilter.outputSchema());
- PartitionSpec partitionSpec = null;
+ TableIdentifier tableIdentifier0 = TableIdentifier.parse(tableId + "_0_a");
+ TableIdentifier tableIdentifier1 = TableIdentifier.parse(tableId + "_1_b");
+ TableIdentifier tableIdentifier2 = TableIdentifier.parse(tableId + "_2_c");
+ TableIdentifier tableIdentifier3 = TableIdentifier.parse(tableId + "_3_d");
+ TableIdentifier tableIdentifier4 = TableIdentifier.parse(tableId + "_4_e");
+ // the sink doesn't support creating partitioned tables yet,
+ // so we need to create them manually for this test case
if (partitioning) {
Preconditions.checkState(filterOp == null || !filterOp.equals("only"));
- partitionSpec =
+ PartitionSpec partitionSpec =
PartitionSpec.builderFor(tableSchema).identity("bool").identity("modulo_5").build();
+ catalog.createTable(tableIdentifier0, tableSchema, partitionSpec);
+ catalog.createTable(tableIdentifier1, tableSchema, partitionSpec);
+ catalog.createTable(tableIdentifier2, tableSchema, partitionSpec);
+ catalog.createTable(tableIdentifier3, tableSchema, partitionSpec);
+ catalog.createTable(tableIdentifier4, tableSchema, partitionSpec);
}
- Table table0 =
- catalog.createTable(TableIdentifier.parse(tableId + "_0_a"), tableSchema, partitionSpec);
- Table table1 =
- catalog.createTable(TableIdentifier.parse(tableId + "_1_b"), tableSchema, partitionSpec);
- Table table2 =
- catalog.createTable(TableIdentifier.parse(tableId + "_2_c"), tableSchema, partitionSpec);
- Table table3 =
- catalog.createTable(TableIdentifier.parse(tableId + "_3_d"), tableSchema, partitionSpec);
- Table table4 =
- catalog.createTable(TableIdentifier.parse(tableId + "_4_e"), tableSchema, partitionSpec);
// Write with Beam
PCollection<Row> input;
@@ -467,6 +471,16 @@ private void writeToDynamicDestinations(
input.setRowSchema(BEAM_SCHEMA).apply(Managed.write(Managed.ICEBERG).withConfig(writeConfig));
pipeline.run().waitUntilFinish();
+ Table table0 = catalog.loadTable(tableIdentifier0);
+ Table table1 = catalog.loadTable(tableIdentifier1);
+ Table table2 = catalog.loadTable(tableIdentifier2);
+ Table table3 = catalog.loadTable(tableIdentifier3);
+ Table table4 = catalog.loadTable(tableIdentifier4);
+
+ for (Table t : Arrays.asList(table0, table1, table2, table3, table4)) {
+ assertTrue(t.schema().sameSchema(tableSchema));
+ }
+
// Read back and check records are correct
List<List<Record>> returnedRecords =
Arrays.asList(
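The assertions above compare each sink-created table's schema against the one the test derives with `IcebergUtils.beamSchemaToIcebergSchema(...)`. As a hedged illustration of that conversion step (the same call `getOrCreateTable()` uses before `catalog.createTable(...)`), with invented field names and assuming same-package access as in the test:

```java
import org.apache.beam.sdk.schemas.Schema;

class SchemaInferenceSketch {
  // Sketch: the Iceberg schema the sink would create a table with,
  // inferred from a simple Beam Row schema.
  static org.apache.iceberg.Schema inferredSchema() {
    Schema beamSchema =
        Schema.builder()
            .addInt64Field("id") // field names are illustrative only
            .addStringField("name")
            .addBooleanField("bool")
            .build();
    return IcebergUtils.beamSchemaToIcebergSchema(beamSchema);
  }
}
```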
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java
index e62c22be7968..87a543a439ec 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOWriteTest.java
@@ -80,9 +80,6 @@ public void testSimpleAppend() throws Exception {
TableIdentifier tableId =
TableIdentifier.of("default", "table" + Long.toString(UUID.randomUUID().hashCode(), 16));
- // Create a table and add records to it.
- Table table = warehouse.createTable(tableId, TestFixtures.SCHEMA);
-
Map<String, String> catalogProps =
ImmutableMap.<String, String>builder()
.put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
@@ -104,6 +101,7 @@ public void testSimpleAppend() throws Exception {
testPipeline.run().waitUntilFinish();
LOG.info("Done running pipeline");
+ Table table = warehouse.loadTable(tableId);
List<Record> writtenRecords = ImmutableList.copyOf(IcebergGenerics.read(table).build());
assertThat(writtenRecords, Matchers.containsInAnyOrder(TestFixtures.FILE1SNAPSHOT1.toArray()));
@@ -117,11 +115,6 @@ public void testDynamicDestinationsWithoutSpillover() throws Exception {
final TableIdentifier table2Id = TableIdentifier.of("default", "table2-" + salt);
final TableIdentifier table3Id = TableIdentifier.of("default", "table3-" + salt);
- // Create a table and add records to it.
- Table table1 = warehouse.createTable(table1Id, TestFixtures.SCHEMA);
- Table table2 = warehouse.createTable(table2Id, TestFixtures.SCHEMA);
- Table table3 = warehouse.createTable(table3Id, TestFixtures.SCHEMA);
-
Map<String, String> catalogProps =
ImmutableMap.<String, String>builder()
.put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
@@ -177,6 +170,10 @@ public IcebergDestination instantiateDestination(String dest) {
testPipeline.run().waitUntilFinish();
LOG.info("Done running pipeline");
+ Table table1 = warehouse.loadTable(table1Id);
+ Table table2 = warehouse.loadTable(table2Id);
+ Table table3 = warehouse.loadTable(table3Id);
+
List<Record> writtenRecords1 = ImmutableList.copyOf(IcebergGenerics.read(table1).build());
List<Record> writtenRecords2 = ImmutableList.copyOf(IcebergGenerics.read(table2).build());
List<Record> writtenRecords3 = ImmutableList.copyOf(IcebergGenerics.read(table3).build());
@@ -320,9 +317,6 @@ public void testStreamingWrite() {
TableIdentifier.of(
"default", "streaming_" + Long.toString(UUID.randomUUID().hashCode(), 16));
- // Create a table and add records to it.
- Table table = warehouse.createTable(tableId, TestFixtures.SCHEMA);
-
Map<String, String> catalogProps =
ImmutableMap.<String, String>builder()
.put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
@@ -365,6 +359,8 @@ public void testStreamingWrite() {
PAssert.that(snapshots).containsInAnyOrder(2L);
testPipeline.run().waitUntilFinish();
+ Table table = warehouse.loadTable(tableId);
+
List<Record> writtenRecords = ImmutableList.copyOf(IcebergGenerics.read(table).build());
assertThat(writtenRecords, Matchers.containsInAnyOrder(TestFixtures.FILE1SNAPSHOT1.toArray()));
}
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java
index 3196d303239f..47dc9aa425dd 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergWriteSchemaTransformProviderTest.java
@@ -95,11 +95,6 @@ public void testBuildTransformWithRow() {
public void testSimpleAppend() {
String identifier = "default.table_" + Long.toString(UUID.randomUUID().hashCode(), 16);
- TableIdentifier tableId = TableIdentifier.parse(identifier);
-
- // Create a table and add records to it.
- Table table = warehouse.createTable(tableId, TestFixtures.SCHEMA);
-
Map<String, String> properties = new HashMap<>();
properties.put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP);
properties.put("warehouse", warehouse.location);
@@ -129,6 +124,9 @@ public void testSimpleAppend() {
testPipeline.run().waitUntilFinish();
+ TableIdentifier tableId = TableIdentifier.parse(identifier);
+ Table table = warehouse.loadTable(tableId);
+
List<Record> writtenRecords = ImmutableList.copyOf(IcebergGenerics.read(table).build());
assertThat(writtenRecords, Matchers.containsInAnyOrder(TestFixtures.FILE1SNAPSHOT1.toArray()));
@@ -137,7 +135,6 @@ public void testSimpleAppend() {
@Test
public void testWriteUsingManagedTransform() {
String identifier = "default.table_" + Long.toString(UUID.randomUUID().hashCode(), 16);
- Table table = warehouse.createTable(TableIdentifier.parse(identifier), TestFixtures.SCHEMA);
String yamlConfig =
String.format(
@@ -161,6 +158,7 @@ public void testWriteUsingManagedTransform() {
testPipeline.run().waitUntilFinish();
+ Table table = warehouse.loadTable(TableIdentifier.parse(identifier));
List<Record> writtenRecords = ImmutableList.copyOf(IcebergGenerics.read(table).build());
assertThat(writtenRecords, Matchers.containsInAnyOrder(TestFixtures.FILE1SNAPSHOT1.toArray()));
}
@@ -261,9 +259,6 @@ private void writeToDynamicDestinationsAndFilter(@Nullable String operation, boo
org.apache.iceberg.Schema icebergSchema =
IcebergUtils.beamSchemaToIcebergSchema(filter.outputSchema());
- Table table0 = warehouse.createTable(TableIdentifier.parse(identifier0), icebergSchema);
- Table table1 = warehouse.createTable(TableIdentifier.parse(identifier1), icebergSchema);
- Table table2 = warehouse.createTable(TableIdentifier.parse(identifier2), icebergSchema);
TestStream<Row> stream =
TestStream.create(beamSchema)
@@ -301,6 +296,9 @@ private void writeToDynamicDestinationsAndFilter(@Nullable String operation, boo
testPipeline.run().waitUntilFinish();
+ Table table0 = warehouse.loadTable(TableIdentifier.parse(identifier0));
+ Table table1 = warehouse.loadTable(TableIdentifier.parse(identifier1));
+ Table table2 = warehouse.loadTable(TableIdentifier.parse(identifier2));
List<Record> table0Records = ImmutableList.copyOf(IcebergGenerics.read(table0).build());
List<Record> table1Records = ImmutableList.copyOf(IcebergGenerics.read(table1).build());
List<Record> table2Records = ImmutableList.copyOf(IcebergGenerics.read(table2).build());
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/TestDataWarehouse.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/TestDataWarehouse.java
index ad4fc6b382d4..1e1c84d31de9 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/TestDataWarehouse.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/TestDataWarehouse.java
@@ -149,4 +149,8 @@ public Table createTable(
someTableHasBeenCreated = true;
return catalog.createTable(tableId, schema, partitionSpec);
}
+
+ public Table loadTable(TableIdentifier tableId) {
+ return catalog.loadTable(tableId);
+ }
}