Skip to content

Commit

Permalink
Support Iceberg partition identity transform
Browse files Browse the repository at this point in the history
  • Loading branch information
regadas committed Dec 9, 2024
1 parent 0dae9c9 commit 35e5255
Show file tree
Hide file tree
Showing 4 changed files with 186 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,22 @@

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.Set;
import java.util.function.BiFunction;
import javax.annotation.Nullable;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.values.Row;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.data.IdentityPartitionConverters;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.avro.DataReader;
import org.apache.iceberg.data.orc.GenericOrcReader;
Expand All @@ -42,6 +49,9 @@
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.PartitionUtil;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -112,6 +122,8 @@ public boolean advance() throws IOException {
FileScanTask fileTask = fileScanTasks.remove();
DataFile file = fileTask.file();
InputFile input = decryptor.getInputFile(fileTask);
Map<Integer, ?> idToConstants =
constantsMap(fileTask, IdentityPartitionConverters::convertConstant, project);

CloseableIterable<Record> iterable;
switch (file.format()) {
Expand All @@ -121,7 +133,9 @@ public boolean advance() throws IOException {
ORC.read(input)
.split(fileTask.start(), fileTask.length())
.project(project)
.createReaderFunc(fileSchema -> GenericOrcReader.buildReader(project, fileSchema))
.createReaderFunc(
fileSchema ->
GenericOrcReader.buildReader(project, fileSchema, idToConstants))
.filter(fileTask.residual())
.build();
break;
Expand All @@ -132,7 +146,8 @@ public boolean advance() throws IOException {
.split(fileTask.start(), fileTask.length())
.project(project)
.createReaderFunc(
fileSchema -> GenericParquetReaders.buildReader(project, fileSchema))
fileSchema ->
GenericParquetReaders.buildReader(project, fileSchema, idToConstants))
.filter(fileTask.residual())
.build();
break;
Expand All @@ -142,7 +157,8 @@ public boolean advance() throws IOException {
Avro.read(input)
.split(fileTask.start(), fileTask.length())
.project(project)
.createReaderFunc(DataReader::create)
.createReaderFunc(
fileSchema -> DataReader.create(project, fileSchema, idToConstants))
.build();
break;
default:
Expand All @@ -155,6 +171,20 @@ public boolean advance() throws IOException {
return false;
}

private Map<Integer, ?> constantsMap(
FileScanTask task, BiFunction<Type, Object, Object> converter, Schema schema) {
PartitionSpec spec = task.spec();
Set<Integer> idColumns = spec.identitySourceIds();
Schema partitionSchema = TypeUtil.select(schema, idColumns);
boolean projectsIdentityPartitionColumns = !partitionSchema.columns().isEmpty();

if (projectsIdentityPartitionColumns) {
return PartitionUtil.constantsMap(task, converter);
} else {
return Collections.emptyMap();
}
}

@Override
public Row getCurrent() throws NoSuchElementException {
if (current == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.io.iceberg;

import static org.apache.beam.sdk.io.iceberg.TestFixtures.createRecord;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;

Expand All @@ -35,8 +36,11 @@
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.PartitionKey;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.types.Types;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
Expand Down Expand Up @@ -122,4 +126,72 @@ public void testSimpleScan() throws Exception {

testPipeline.run();
}

@Test
public void testIdentityColumnScan() throws Exception {
TableIdentifier tableId =
TableIdentifier.of("default", "table" + Long.toString(UUID.randomUUID().hashCode(), 16));
Table simpleTable = warehouse.createTable(tableId, TestFixtures.SCHEMA);

String identityColumnName = "identity";
String identityColumnValue = "some-value";
simpleTable.updateSchema().addColumn(identityColumnName, Types.StringType.get()).commit();
simpleTable.updateSpec().addField(identityColumnName).commit();

PartitionSpec spec = simpleTable.spec();
PartitionKey partitionKey = new PartitionKey(simpleTable.spec(), simpleTable.schema());
partitionKey.set(0, identityColumnValue);

simpleTable
.newFastAppend()
.appendFile(
warehouse.writeRecords(
"file1s1.parquet",
TestFixtures.SCHEMA,
spec,
partitionKey,
TestFixtures.FILE1SNAPSHOT1))
.commit();

final Schema schema = IcebergUtils.icebergSchemaToBeamSchema(simpleTable.schema());
final List<Row> expectedRows =
Stream.of(TestFixtures.FILE1SNAPSHOT1_DATA)
.flatMap(List::stream)
.map(
d ->
ImmutableMap.<String, Object>builder()
.putAll(d)
.put(identityColumnName, identityColumnValue)
.build())
.map(r -> createRecord(simpleTable.schema(), r))
.map(record -> IcebergUtils.icebergRecordToBeamRow(schema, record))
.collect(Collectors.toList());

Map<String, String> catalogProps =
ImmutableMap.<String, String>builder()
.put("type", CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP)
.put("warehouse", warehouse.location)
.build();

IcebergCatalogConfig catalogConfig =
IcebergCatalogConfig.builder()
.setCatalogName("name")
.setCatalogProperties(catalogProps)
.build();

PCollection<Row> output =
testPipeline
.apply(IcebergIO.readRows(catalogConfig).from(tableId))
.apply(ParDo.of(new PrintRow()))
.setCoder(RowCoder.of(IcebergUtils.icebergSchemaToBeamSchema(simpleTable.schema())));

PAssert.that(output)
.satisfies(
(Iterable<Row> rows) -> {
assertThat(rows, containsInAnyOrder(expectedRows.toArray()));
return null;
});

testPipeline.run();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
Expand Down Expand Up @@ -108,6 +109,16 @@ protected void after() {

public DataFile writeRecords(String filename, Schema schema, List<Record> records)
throws IOException {
return writeRecords(filename, schema, PartitionSpec.unpartitioned(), null, records);
}

public DataFile writeRecords(
String filename,
Schema schema,
PartitionSpec spec,
StructLike partition,
List<Record> records)
throws IOException {
Path path = new Path(location, filename);
FileFormat format = FileFormat.fromFileName(filename);

Expand All @@ -134,9 +145,11 @@ public DataFile writeRecords(String filename, Schema schema, List<Record> record
}
appender.addAll(records);
appender.close();
return DataFiles.builder(PartitionSpec.unpartitioned())

return DataFiles.builder(spec)
.withInputFile(HadoopInputFile.fromPath(path, hadoopConf))
.withMetrics(appender.metrics())
.withPartition(partition)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@
import static org.apache.iceberg.types.Types.NestedField.required;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
Expand All @@ -34,58 +37,75 @@ public class TestFixtures {
new Schema(
required(1, "id", Types.LongType.get()), optional(2, "data", Types.StringType.get()));

private static final Record genericRecord = GenericRecord.create(SCHEMA);

/* First file in test table */
public static final ImmutableList<Record> FILE1SNAPSHOT1 =
public static final List<Map<String, Object>> FILE1SNAPSHOT1_DATA =
ImmutableList.of(
genericRecord.copy(ImmutableMap.of("id", 0L, "data", "clarification")),
genericRecord.copy(ImmutableMap.of("id", 1L, "data", "risky")),
genericRecord.copy(ImmutableMap.of("id", 2L, "data", "falafel")));
public static final ImmutableList<Record> FILE1SNAPSHOT2 =
ImmutableMap.of("id", 0L, "data", "clarification"),
ImmutableMap.of("id", 1L, "data", "risky"),
ImmutableMap.of("id", 2L, "data", "falafel"));
public static final List<Map<String, Object>> FILE1SNAPSHOT2_DATA =
ImmutableList.of(
genericRecord.copy(ImmutableMap.of("id", 3L, "data", "obscure")),
genericRecord.copy(ImmutableMap.of("id", 4L, "data", "secure")),
genericRecord.copy(ImmutableMap.of("id", 5L, "data", "feta")));
public static final ImmutableList<Record> FILE1SNAPSHOT3 =
ImmutableMap.of("id", 3L, "data", "obscure"),
ImmutableMap.of("id", 4L, "data", "secure"),
ImmutableMap.of("id", 5L, "data", "feta"));
public static final List<Map<String, Object>> FILE1SNAPSHOT3_DATA =
ImmutableList.of(
genericRecord.copy(ImmutableMap.of("id", 6L, "data", "brainy")),
genericRecord.copy(ImmutableMap.of("id", 7L, "data", "film")),
genericRecord.copy(ImmutableMap.of("id", 8L, "data", "feta")));

/* Second file in test table */
public static final ImmutableList<Record> FILE2SNAPSHOT1 =
ImmutableMap.of("id", 6L, "data", "brainy"),
ImmutableMap.of("id", 7L, "data", "film"),
ImmutableMap.of("id", 8L, "data", "feta"));
public static final List<Map<String, Object>> FILE2SNAPSHOT1_DATA =
ImmutableList.of(
genericRecord.copy(ImmutableMap.of("id", 10L, "data", "clammy")),
genericRecord.copy(ImmutableMap.of("id", 11L, "data", "evacuate")),
genericRecord.copy(ImmutableMap.of("id", 12L, "data", "tissue")));
public static final ImmutableList<Record> FILE2SNAPSHOT2 =
ImmutableMap.of("id", 10L, "data", "clammy"),
ImmutableMap.of("id", 11L, "data", "evacuate"),
ImmutableMap.of("id", 12L, "data", "tissue"));
public static final List<Map<String, Object>> FILE2SNAPSHOT2_DATA =
ImmutableList.of(
genericRecord.copy(ImmutableMap.of("id", 14L, "data", "radical")),
genericRecord.copy(ImmutableMap.of("id", 15L, "data", "collocation")),
genericRecord.copy(ImmutableMap.of("id", 16L, "data", "book")));
public static final ImmutableList<Record> FILE2SNAPSHOT3 =
ImmutableMap.of("id", 14L, "data", "radical"),
ImmutableMap.of("id", 15L, "data", "collocation"),
ImmutableMap.of("id", 16L, "data", "book"));
public static final List<Map<String, Object>> FILE2SNAPSHOT3_DATA =
ImmutableList.of(
genericRecord.copy(ImmutableMap.of("id", 16L, "data", "cake")),
genericRecord.copy(ImmutableMap.of("id", 17L, "data", "intrinsic")),
genericRecord.copy(ImmutableMap.of("id", 18L, "data", "paper")));

/* Third file in test table */
public static final ImmutableList<Record> FILE3SNAPSHOT1 =
ImmutableMap.of("id", 16L, "data", "cake"),
ImmutableMap.of("id", 17L, "data", "intrinsic"),
ImmutableMap.of("id", 18L, "data", "paper"));
public static final List<Map<String, Object>> FILE3SNAPSHOT1_DATA =
ImmutableList.of(
genericRecord.copy(ImmutableMap.of("id", 20L, "data", "ocean")),
genericRecord.copy(ImmutableMap.of("id", 21L, "data", "holistic")),
genericRecord.copy(ImmutableMap.of("id", 22L, "data", "preventative")));
public static final ImmutableList<Record> FILE3SNAPSHOT2 =
ImmutableMap.of("id", 20L, "data", "ocean"),
ImmutableMap.of("id", 21L, "data", "holistic"),
ImmutableMap.of("id", 22L, "data", "preventative"));
public static final List<Map<String, Object>> FILE3SNAPSHOT2_DATA =
ImmutableList.of(
genericRecord.copy(ImmutableMap.of("id", 24L, "data", "cloud")),
genericRecord.copy(ImmutableMap.of("id", 25L, "data", "zen")),
genericRecord.copy(ImmutableMap.of("id", 26L, "data", "sky")));
public static final ImmutableList<Record> FILE3SNAPSHOT3 =
ImmutableMap.of("id", 24L, "data", "cloud"),
ImmutableMap.of("id", 25L, "data", "zen"),
ImmutableMap.of("id", 26L, "data", "sky"));
public static final List<Map<String, Object>> FILE3SNAPSHOT3_DATA =
ImmutableList.of(
genericRecord.copy(ImmutableMap.of("id", 26L, "data", "belleview")),
genericRecord.copy(ImmutableMap.of("id", 27L, "data", "overview")),
genericRecord.copy(ImmutableMap.of("id", 28L, "data", "tender")));
ImmutableMap.of("id", 26L, "data", "belleview"),
ImmutableMap.of("id", 27L, "data", "overview"),
ImmutableMap.of("id", 28L, "data", "tender"));

/* First file in test table */
public static final List<Record> FILE1SNAPSHOT1 =
Lists.transform(FILE1SNAPSHOT1_DATA, d -> createRecord(SCHEMA, d));
public static final List<Record> FILE1SNAPSHOT2 =
Lists.transform(FILE1SNAPSHOT2_DATA, d -> createRecord(SCHEMA, d));
public static final List<Record> FILE1SNAPSHOT3 =
Lists.transform(FILE1SNAPSHOT3_DATA, d -> createRecord(SCHEMA, d));

/* Second file in test table */
public static final List<Record> FILE2SNAPSHOT1 =
Lists.transform(FILE2SNAPSHOT1_DATA, d -> createRecord(SCHEMA, d));
public static final List<Record> FILE2SNAPSHOT2 =
Lists.transform(FILE2SNAPSHOT2_DATA, d -> createRecord(SCHEMA, d));
public static final List<Record> FILE2SNAPSHOT3 =
Lists.transform(FILE2SNAPSHOT3_DATA, d -> createRecord(SCHEMA, d));

/* Third file in test table */
public static final List<Record> FILE3SNAPSHOT1 =
Lists.transform(FILE3SNAPSHOT1_DATA, d -> createRecord(SCHEMA, d));
public static final List<Record> FILE3SNAPSHOT2 =
Lists.transform(FILE3SNAPSHOT2_DATA, d -> createRecord(SCHEMA, d));
public static final List<Record> FILE3SNAPSHOT3 =
Lists.transform(FILE3SNAPSHOT3_DATA, d -> createRecord(SCHEMA, d));

public static final ImmutableList<Row> asRows(Iterable<Record> records) {
ArrayList<Row> rows = new ArrayList<>();
Expand All @@ -98,4 +118,8 @@ public static final ImmutableList<Row> asRows(Iterable<Record> records) {
}
return ImmutableList.copyOf(rows);
}

public static Record createRecord(org.apache.iceberg.Schema schema, Map<String, Object> values) {
return org.apache.iceberg.data.GenericRecord.create(schema).copy(values);
}
}

0 comments on commit 35e5255

Please sign in to comment.