Skip to content

Commit

Permalink
Merge branch 'iceberg_translation' of https://github.com/ahmedabu98/beam
Browse files Browse the repository at this point in the history
 into iceberg_write_schematransform
  • Loading branch information
ahmedabu98 committed Apr 15, 2024
2 parents 27e5fb0 + 04cc2db commit e2cb93b
Show file tree
Hide file tree
Showing 6 changed files with 491 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
public class BigQueryIOTranslationTest {

// A mapping from Read transform builder methods to the corresponding schema fields in
// KafkaIOTranslation.
// BigQueryIOTranslation.
static final Map<String, String> READ_TRANSFORM_SCHEMA_MAPPING = new HashMap<>();

static {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,73 +19,116 @@

import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;

import com.google.auto.value.AutoValue;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.checkerframework.checker.nullness.qual.Nullable;

public class IcebergIO {

public static WriteRows writeToDynamicDestinations(
IcebergCatalogConfig catalog, DynamicDestinations dynamicDestinations) {
return new WriteRows(catalog, dynamicDestinations);
public static WriteRows writeRows(IcebergCatalogConfig catalog) {
return new AutoValue_IcebergIO_WriteRows.Builder().setCatalogConfig(catalog).build();
}

public static ReadTable readTable(IcebergCatalogConfig catalogConfig, TableIdentifier tableId) {
return new ReadTable(catalogConfig, tableId);
}
@AutoValue
public abstract static class WriteRows extends PTransform<PCollection<Row>, IcebergWriteResult> {

abstract IcebergCatalogConfig getCatalogConfig();

abstract @Nullable TableIdentifier getTableIdentifier();

abstract @Nullable DynamicDestinations getDynamicDestinations();

abstract Builder toBuilder();

static class WriteRows extends PTransform<PCollection<Row>, IcebergWriteResult> {
@AutoValue.Builder
abstract static class Builder {
abstract Builder setCatalogConfig(IcebergCatalogConfig config);

private final IcebergCatalogConfig catalog;
private final DynamicDestinations dynamicDestinations;
abstract Builder setTableIdentifier(TableIdentifier identifier);

private WriteRows(IcebergCatalogConfig catalog, DynamicDestinations dynamicDestinations) {
this.catalog = catalog;
this.dynamicDestinations = dynamicDestinations;
abstract Builder setDynamicDestinations(DynamicDestinations destinations);

abstract WriteRows build();
}

public WriteRows to(TableIdentifier identifier) {
return toBuilder().setTableIdentifier(identifier).build();
}

public WriteRows to(DynamicDestinations destinations) {
return toBuilder().setDynamicDestinations(destinations).build();
}

@Override
public IcebergWriteResult expand(PCollection<Row> input) {

List<?> allToArgs = Arrays.asList(getTableIdentifier(), getDynamicDestinations());
Preconditions.checkArgument(
1 == allToArgs.stream().filter(Predicates.notNull()).count(),
"Must set exactly one of table identifier or dynamic destinations object.");

DynamicDestinations destinations = getDynamicDestinations();
if (destinations == null) {
destinations =
DynamicDestinations.singleTable(Preconditions.checkNotNull(getTableIdentifier()));
}
return input
.apply("Set Destination Metadata", new AssignDestinations(dynamicDestinations))
.apply("Set Destination Metadata", new AssignDestinations(destinations))
.apply(
"Write Rows to Destinations", new WriteToDestinations(catalog, dynamicDestinations));
"Write Rows to Destinations",
new WriteToDestinations(getCatalogConfig(), destinations));
}
}

public static class ReadTable extends PTransform<PBegin, PCollection<Row>> {
public static ReadRows readRows(IcebergCatalogConfig catalogConfig) {
return new AutoValue_IcebergIO_ReadRows.Builder().setCatalogConfig(catalogConfig).build();
}

@AutoValue
public abstract static class ReadRows extends PTransform<PBegin, PCollection<Row>> {

abstract IcebergCatalogConfig getCatalogConfig();

abstract @Nullable TableIdentifier getTableIdentifier();

abstract Builder toBuilder();

@AutoValue.Builder
abstract static class Builder {
abstract Builder setCatalogConfig(IcebergCatalogConfig config);

private final IcebergCatalogConfig catalogConfig;
private final transient @Nullable TableIdentifier tableId;
abstract Builder setTableIdentifier(TableIdentifier identifier);

private TableIdentifier getTableId() {
return checkStateNotNull(
tableId, "Transient field tableId null; it should not be accessed after serialization");
abstract ReadRows build();
}

private ReadTable(IcebergCatalogConfig catalogConfig, TableIdentifier tableId) {
this.catalogConfig = catalogConfig;
this.tableId = tableId;
public ReadRows from(TableIdentifier tableIdentifier) {
return toBuilder().setTableIdentifier(tableIdentifier).build();
}

@Override
public PCollection<Row> expand(PBegin input) {
TableIdentifier tableId =
checkStateNotNull(getTableIdentifier(), "Must set a table to read from.");

Table table = catalogConfig.catalog().loadTable(getTableId());
Table table = getCatalogConfig().catalog().loadTable(tableId);

return input.apply(
Read.from(
new ScanSource(
IcebergScanConfig.builder()
.setCatalogConfig(catalogConfig)
.setCatalogConfig(getCatalogConfig())
.setScanType(IcebergScanConfig.ScanType.TABLE)
.setTableIdentifier(getTableId())
.setTableIdentifier(tableId)
.setSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(table.schema()))
.build())));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.io.iceberg;

import static org.apache.beam.io.iceberg.IcebergIO.ReadRows;
import static org.apache.beam.io.iceberg.IcebergIO.WriteRows;
import static org.apache.beam.sdk.util.construction.TransformUpgrader.fromByteArray;
import static org.apache.beam.sdk.util.construction.TransformUpgrader.toByteArray;

import com.google.auto.service.AutoService;
import java.io.IOException;
import java.io.InvalidClassException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.construction.PTransformTranslation.TransformPayloadTranslator;
import org.apache.beam.sdk.util.construction.SdkComponents;
import org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.catalog.TableIdentifier;
import org.checkerframework.checker.nullness.qual.Nullable;

@SuppressWarnings({"rawtypes", "nullness"})
public class IcebergIOTranslation {
static class IcebergIOReadTranslator implements TransformPayloadTranslator<ReadRows> {

static Schema schema =
Schema.builder()
.addByteArrayField("catalog_config")
.addNullableArrayField("table_identifier", FieldType.STRING)
.build();

public static final String ICEBERG_READ_TRANSFORM_URN =
"beam:transform:org.apache.beam:iceberg_read:v1";

@Override
public String getUrn() {
return ICEBERG_READ_TRANSFORM_URN;
}

@Override
public @Nullable FunctionSpec translate(
AppliedPTransform<?, ?, ReadRows> application, SdkComponents components)
throws IOException {
// Setting an empty payload since Iceberg transform payload is not actually used by runners
// currently.
return FunctionSpec.newBuilder().setUrn(getUrn()).setPayload(ByteString.empty()).build();
}

@Override
public Row toConfigRow(ReadRows transform) {

Map<String, Object> fieldValues = new HashMap<>();

if (transform.getCatalogConfig() != null) {
fieldValues.put("catalog_config", toByteArray(transform.getCatalogConfig()));
}
if (transform.getTableIdentifier() != null) {
TableIdentifier identifier = transform.getTableIdentifier();
List<String> identifierParts =
Arrays.stream(identifier.namespace().levels()).collect(Collectors.toList());
identifierParts.add(identifier.name());
fieldValues.put("table_identifier", identifierParts);
}

return Row.withSchema(schema).withFieldValues(fieldValues).build();
}

@Override
public ReadRows fromConfigRow(Row configRow, PipelineOptions options) {
try {
ReadRows.Builder builder = new AutoValue_IcebergIO_ReadRows.Builder();

byte[] catalogBytes = configRow.getBytes("catalog_config");
if (catalogBytes != null) {
builder = builder.setCatalogConfig((IcebergCatalogConfig) fromByteArray(catalogBytes));
}
Collection<String> tableIdentifierParts = configRow.getArray("table_identifier");
if (tableIdentifierParts != null) {
builder =
builder.setTableIdentifier(
TableIdentifier.parse(String.join(".", tableIdentifierParts)));
}
return builder.build();
} catch (InvalidClassException e) {
throw new RuntimeException(e);
}
}
}

@AutoService(TransformPayloadTranslatorRegistrar.class)
public static class ReadRegistrar implements TransformPayloadTranslatorRegistrar {
@Override
@SuppressWarnings({
"rawtypes",
})
public Map<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator>
getTransformPayloadTranslators() {
return ImmutableMap.<Class<? extends PTransform>, TransformPayloadTranslator>builder()
.put(AutoValue_IcebergIO_ReadRows.class, new IcebergIOReadTranslator())
.build();
}
}

static class IcebergIOWriteTranslator implements TransformPayloadTranslator<WriteRows> {

static Schema schema =
Schema.builder()
.addByteArrayField("catalog_config")
.addNullableArrayField("table_identifier", FieldType.STRING)
.addNullableByteArrayField("dynamic_destinations")
.build();

public static final String ICEBERG_WRITE_TRANSFORM_URN =
"beam:transform:org.apache.beam:iceberg_write:v1";

@Override
public String getUrn() {
return ICEBERG_WRITE_TRANSFORM_URN;
}

@Override
public @Nullable FunctionSpec translate(
AppliedPTransform<?, ?, WriteRows> application, SdkComponents components)
throws IOException {
// Setting an empty payload since Iceberg transform payload is not actually used by runners
// currently.
return FunctionSpec.newBuilder().setUrn(getUrn()).setPayload(ByteString.empty()).build();
}

@Override
public Row toConfigRow(WriteRows transform) {

Map<String, Object> fieldValues = new HashMap<>();

if (transform.getCatalogConfig() != null) {
fieldValues.put("catalog_config", toByteArray(transform.getCatalogConfig()));
}
if (transform.getTableIdentifier() != null) {
TableIdentifier identifier = transform.getTableIdentifier();
List<String> identifierParts =
Arrays.stream(identifier.namespace().levels()).collect(Collectors.toList());
identifierParts.add(identifier.name());
fieldValues.put("table_identifier", identifierParts);
}
if (transform.getDynamicDestinations() != null) {
fieldValues.put("dynamic_destinations", toByteArray(transform.getDynamicDestinations()));
}

return Row.withSchema(schema).withFieldValues(fieldValues).build();
}

@Override
public WriteRows fromConfigRow(Row configRow, PipelineOptions options) {
try {
IcebergIO.WriteRows.Builder builder = new AutoValue_IcebergIO_WriteRows.Builder();

byte[] catalogBytes = configRow.getBytes("catalog_config");
if (catalogBytes != null) {
builder = builder.setCatalogConfig((IcebergCatalogConfig) fromByteArray(catalogBytes));
}
Collection<String> tableIdentifierParts = configRow.getArray("table_identifier");
if (tableIdentifierParts != null) {
builder =
builder.setTableIdentifier(
TableIdentifier.parse(String.join(".", tableIdentifierParts)));
}
byte[] dynamicDestinationsBytes = configRow.getBytes("dynamic_destinations");
if (dynamicDestinationsBytes != null) {
builder =
builder.setDynamicDestinations(
(DynamicDestinations) fromByteArray(dynamicDestinationsBytes));
}
return builder.build();
} catch (InvalidClassException e) {
throw new RuntimeException(e);
}
}
}

@AutoService(TransformPayloadTranslatorRegistrar.class)
public static class WriteRegistrar implements TransformPayloadTranslatorRegistrar {
@Override
@SuppressWarnings({
"rawtypes",
})
public Map<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator>
getTransformPayloadTranslators() {
return ImmutableMap.<Class<? extends PTransform>, TransformPayloadTranslator>builder()
.put(AutoValue_IcebergIO_WriteRows.class, new IcebergIOWriteTranslator())
.build();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public void testSimpleScan() throws Exception {

PCollection<Row> output =
testPipeline
.apply(IcebergIO.readTable(catalogConfig, tableId))
.apply(IcebergIO.readRows(catalogConfig).from(tableId))
.apply(ParDo.of(new PrintRow()))
.setCoder(
RowCoder.of(
Expand Down
Loading

0 comments on commit e2cb93b

Please sign in to comment.