IcebergIO translation #30933

Closed · wants to merge 4 commits
BigQueryIOTranslationTest.java:

```diff
@@ -42,7 +42,7 @@
 public class BigQueryIOTranslationTest {
 
   // A mapping from Read transform builder methods to the corresponding schema fields in
-  // KafkaIOTranslation.
+  // BigQueryIOTranslation.
   static final Map<String, String> READ_TRANSFORM_SCHEMA_MAPPING = new HashMap<>();
 
   static {
```
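For context, that mapping pairs `Read` transform builder method names with the config-row field names used by `BigQueryIOTranslation`. Entries look roughly like the following (hypothetical values for illustration; the real pairs live in the static initializer this hunk truncates):

```java
// Hypothetical entries for illustration only: builder method name -> config-row field name.
READ_TRANSFORM_SCHEMA_MAPPING.put("getQuery", "query");
READ_TRANSFORM_SCHEMA_MAPPING.put("getUseLegacySql", "use_legacy_sql");
```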
IcebergIO.java:

```diff
@@ -19,73 +19,116 @@
 
 import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
 
+import com.google.auto.value.AutoValue;
+import java.util.Arrays;
+import java.util.List;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.checkerframework.checker.nullness.qual.Nullable;
 
 public class IcebergIO {
 
-  public static WriteRows writeToDynamicDestinations(
-      IcebergCatalogConfig catalog, DynamicDestinations dynamicDestinations) {
-    return new WriteRows(catalog, dynamicDestinations);
+  public static WriteRows writeRows(IcebergCatalogConfig catalog) {
+    return new AutoValue_IcebergIO_WriteRows.Builder().setCatalogConfig(catalog).build();
   }
 
-  public static ReadTable readTable(IcebergCatalogConfig catalogConfig, TableIdentifier tableId) {
-    return new ReadTable(catalogConfig, tableId);
-  }
+  @AutoValue
+  public abstract static class WriteRows extends PTransform<PCollection<Row>, IcebergWriteResult> {
+
+    abstract IcebergCatalogConfig getCatalogConfig();
+
+    abstract @Nullable TableIdentifier getTableIdentifier();
+
+    abstract @Nullable DynamicDestinations getDynamicDestinations();
+
+    abstract Builder toBuilder();
 
-  static class WriteRows extends PTransform<PCollection<Row>, IcebergWriteResult> {
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setCatalogConfig(IcebergCatalogConfig config);
 
-    private final IcebergCatalogConfig catalog;
-    private final DynamicDestinations dynamicDestinations;
+      abstract Builder setTableIdentifier(TableIdentifier identifier);
 
-    private WriteRows(IcebergCatalogConfig catalog, DynamicDestinations dynamicDestinations) {
-      this.catalog = catalog;
-      this.dynamicDestinations = dynamicDestinations;
+      abstract Builder setDynamicDestinations(DynamicDestinations destinations);
+
+      abstract WriteRows build();
+    }
+
+    public WriteRows to(TableIdentifier identifier) {
+      return toBuilder().setTableIdentifier(identifier).build();
+    }
+
+    public WriteRows to(DynamicDestinations destinations) {
+      return toBuilder().setDynamicDestinations(destinations).build();
     }
 
     @Override
     public IcebergWriteResult expand(PCollection<Row> input) {
+
+      List<?> allToArgs = Arrays.asList(getTableIdentifier(), getDynamicDestinations());
+      Preconditions.checkArgument(
+          1 == allToArgs.stream().filter(Predicates.notNull()).count(),
+          "Must set exactly one of table identifier or dynamic destinations object.");
+
+      DynamicDestinations destinations = getDynamicDestinations();
+      if (destinations == null) {
+        destinations =
+            DynamicDestinations.singleTable(Preconditions.checkNotNull(getTableIdentifier()));
+      }
       return input
-          .apply("Set Destination Metadata", new AssignDestinations(dynamicDestinations))
+          .apply("Set Destination Metadata", new AssignDestinations(destinations))
           .apply(
-              "Write Rows to Destinations", new WriteToDestinations(catalog, dynamicDestinations));
+              "Write Rows to Destinations",
+              new WriteToDestinations(getCatalogConfig(), destinations));
     }
   }
 
-  public static class ReadTable extends PTransform<PBegin, PCollection<Row>> {
+  public static ReadRows readRows(IcebergCatalogConfig catalogConfig) {
+    return new AutoValue_IcebergIO_ReadRows.Builder().setCatalogConfig(catalogConfig).build();
+  }
+
+  @AutoValue
+  public abstract static class ReadRows extends PTransform<PBegin, PCollection<Row>> {
+
+    abstract IcebergCatalogConfig getCatalogConfig();
+
+    abstract @Nullable TableIdentifier getTableIdentifier();
+
+    abstract Builder toBuilder();
+
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setCatalogConfig(IcebergCatalogConfig config);
 
-    private final IcebergCatalogConfig catalogConfig;
-    private final transient @Nullable TableIdentifier tableId;
+      abstract Builder setTableIdentifier(TableIdentifier identifier);
 
-    private TableIdentifier getTableId() {
-      return checkStateNotNull(
-          tableId, "Transient field tableId null; it should not be accessed after serialization");
+      abstract ReadRows build();
     }
 
-    private ReadTable(IcebergCatalogConfig catalogConfig, TableIdentifier tableId) {
-      this.catalogConfig = catalogConfig;
-      this.tableId = tableId;
+    public ReadRows from(TableIdentifier tableIdentifier) {
+      return toBuilder().setTableIdentifier(tableIdentifier).build();
     }
 
     @Override
     public PCollection<Row> expand(PBegin input) {
+      TableIdentifier tableId =
+          checkStateNotNull(getTableIdentifier(), "Must set a table to read from.");
 
-      Table table = catalogConfig.catalog().loadTable(getTableId());
+      Table table = getCatalogConfig().catalog().loadTable(tableId);
 
       return input.apply(
           Read.from(
               new ScanSource(
                   IcebergScanConfig.builder()
-                      .setCatalogConfig(catalogConfig)
+                      .setCatalogConfig(getCatalogConfig())
                       .setScanType(IcebergScanConfig.ScanType.TABLE)
-                      .setTableIdentifier(getTableId())
+                      .setTableIdentifier(tableId)
                       .setSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(table.schema()))
                       .build())));
     }
```
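To make the API change concrete, here is a minimal usage sketch of the new entry points. The `pipeline` and `catalogConfig` bindings are assumed to exist; everything else comes from the diff above:

```java
// Assumed for this sketch: a Pipeline `pipeline` and a configured
// IcebergCatalogConfig `catalogConfig`.
TableIdentifier tableId = TableIdentifier.parse("db.table");

// Reading: IcebergIO.readTable(config, tableId) becomes readRows(config).from(tableId).
PCollection<Row> rows = pipeline.apply(IcebergIO.readRows(catalogConfig).from(tableId));

// Writing to a single table; WriteRows.expand() wraps this identifier in
// DynamicDestinations.singleTable(...) internally.
IcebergWriteResult single = rows.apply(IcebergIO.writeRows(catalogConfig).to(tableId));

// Writing with dynamic destinations; exactly one of the two to(...) variants
// may be set, enforced by the Preconditions check in expand().
DynamicDestinations destinations = DynamicDestinations.singleTable(tableId);
IcebergWriteResult dynamic = rows.apply(IcebergIO.writeRows(catalogConfig).to(destinations));
```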
IcebergIOTranslation.java (new file, 220 lines):

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.io.iceberg;

import static org.apache.beam.io.iceberg.IcebergIO.ReadRows;
import static org.apache.beam.io.iceberg.IcebergIO.WriteRows;
import static org.apache.beam.sdk.util.construction.TransformUpgrader.fromByteArray;
import static org.apache.beam.sdk.util.construction.TransformUpgrader.toByteArray;

import com.google.auto.service.AutoService;
import java.io.IOException;
import java.io.InvalidClassException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.construction.PTransformTranslation.TransformPayloadTranslator;
import org.apache.beam.sdk.util.construction.SdkComponents;
import org.apache.beam.sdk.util.construction.TransformPayloadTranslatorRegistrar;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.catalog.TableIdentifier;
import org.checkerframework.checker.nullness.qual.Nullable;

@SuppressWarnings({"rawtypes", "nullness"})
public class IcebergIOTranslation {
  static class IcebergIOReadTranslator implements TransformPayloadTranslator<ReadRows> {

    static Schema schema =
        Schema.builder()
            .addByteArrayField("catalog_config")
            .addNullableArrayField("table_identifier", FieldType.STRING)
            .build();

    public static final String ICEBERG_READ_TRANSFORM_URN =
        "beam:transform:org.apache.beam:iceberg_read:v1";

    @Override
    public String getUrn() {
      return ICEBERG_READ_TRANSFORM_URN;
    }

    @Override
    public @Nullable FunctionSpec translate(
        AppliedPTransform<?, ?, ReadRows> application, SdkComponents components)
        throws IOException {
      // Setting an empty payload since Iceberg transform payload is not actually used by runners
      // currently.
      return FunctionSpec.newBuilder().setUrn(getUrn()).setPayload(ByteString.empty()).build();
    }

    @Override
    public Row toConfigRow(ReadRows transform) {

      Map<String, Object> fieldValues = new HashMap<>();

      if (transform.getCatalogConfig() != null) {
        fieldValues.put("catalog_config", toByteArray(transform.getCatalogConfig()));
      }
      if (transform.getTableIdentifier() != null) {
        TableIdentifier identifier = transform.getTableIdentifier();
        List<String> identifierParts =
            Arrays.stream(identifier.namespace().levels()).collect(Collectors.toList());
        identifierParts.add(identifier.name());
        fieldValues.put("table_identifier", identifierParts);
      }

      return Row.withSchema(schema).withFieldValues(fieldValues).build();
    }

    @Override
    public ReadRows fromConfigRow(Row configRow, PipelineOptions options) {
      try {
        ReadRows.Builder builder = new AutoValue_IcebergIO_ReadRows.Builder();

        byte[] catalogBytes = configRow.getBytes("catalog_config");
        if (catalogBytes != null) {
          builder = builder.setCatalogConfig((IcebergCatalogConfig) fromByteArray(catalogBytes));
        }
        Collection<String> tableIdentifierParts = configRow.getArray("table_identifier");
        if (tableIdentifierParts != null) {
          builder =
              builder.setTableIdentifier(
                  TableIdentifier.parse(String.join(".", tableIdentifierParts)));
        }
        return builder.build();
      } catch (InvalidClassException e) {
        throw new RuntimeException(e);
      }
    }
  }

  @AutoService(TransformPayloadTranslatorRegistrar.class)
  public static class ReadRegistrar implements TransformPayloadTranslatorRegistrar {
    @Override
    @SuppressWarnings({
      "rawtypes",
    })
    public Map<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator>
        getTransformPayloadTranslators() {
      return ImmutableMap.<Class<? extends PTransform>, TransformPayloadTranslator>builder()
          .put(AutoValue_IcebergIO_ReadRows.class, new IcebergIOReadTranslator())
          .build();
    }
  }

  static class IcebergIOWriteTranslator implements TransformPayloadTranslator<WriteRows> {

    static Schema schema =
        Schema.builder()
            .addByteArrayField("catalog_config")
            .addNullableArrayField("table_identifier", FieldType.STRING)
            .addNullableByteArrayField("dynamic_destinations")
            .build();

    public static final String ICEBERG_WRITE_TRANSFORM_URN =
        "beam:transform:org.apache.beam:iceberg_write:v1";

    @Override
    public String getUrn() {
      return ICEBERG_WRITE_TRANSFORM_URN;
    }

    @Override
    public @Nullable FunctionSpec translate(
        AppliedPTransform<?, ?, WriteRows> application, SdkComponents components)
        throws IOException {
      // Setting an empty payload since Iceberg transform payload is not actually used by runners
      // currently.
      return FunctionSpec.newBuilder().setUrn(getUrn()).setPayload(ByteString.empty()).build();
    }

    @Override
    public Row toConfigRow(WriteRows transform) {

      Map<String, Object> fieldValues = new HashMap<>();

      if (transform.getCatalogConfig() != null) {
        fieldValues.put("catalog_config", toByteArray(transform.getCatalogConfig()));
      }
      if (transform.getTableIdentifier() != null) {
        TableIdentifier identifier = transform.getTableIdentifier();
        List<String> identifierParts =
            Arrays.stream(identifier.namespace().levels()).collect(Collectors.toList());
        identifierParts.add(identifier.name());
        fieldValues.put("table_identifier", identifierParts);
      }
      if (transform.getDynamicDestinations() != null) {
        fieldValues.put("dynamic_destinations", toByteArray(transform.getDynamicDestinations()));
      }

      return Row.withSchema(schema).withFieldValues(fieldValues).build();
    }

    @Override
    public WriteRows fromConfigRow(Row configRow, PipelineOptions options) {
      try {
        IcebergIO.WriteRows.Builder builder = new AutoValue_IcebergIO_WriteRows.Builder();

        byte[] catalogBytes = configRow.getBytes("catalog_config");
        if (catalogBytes != null) {
          builder = builder.setCatalogConfig((IcebergCatalogConfig) fromByteArray(catalogBytes));
        }
        Collection<String> tableIdentifierParts = configRow.getArray("table_identifier");
        if (tableIdentifierParts != null) {
          builder =
              builder.setTableIdentifier(
                  TableIdentifier.parse(String.join(".", tableIdentifierParts)));
        }
        byte[] dynamicDestinationsBytes = configRow.getBytes("dynamic_destinations");
        if (dynamicDestinationsBytes != null) {
          builder =
              builder.setDynamicDestinations(
                  (DynamicDestinations) fromByteArray(dynamicDestinationsBytes));
        }
        return builder.build();
      } catch (InvalidClassException e) {
        throw new RuntimeException(e);
      }
    }
  }

  @AutoService(TransformPayloadTranslatorRegistrar.class)
  public static class WriteRegistrar implements TransformPayloadTranslatorRegistrar {
    @Override
    @SuppressWarnings({
      "rawtypes",
    })
    public Map<? extends Class<? extends PTransform>, ? extends TransformPayloadTranslator>
        getTransformPayloadTranslators() {
      return ImmutableMap.<Class<? extends PTransform>, TransformPayloadTranslator>builder()
          .put(AutoValue_IcebergIO_WriteRows.class, new IcebergIOWriteTranslator())
          .build();
    }
  }
}
```
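As a quick illustration of how these translators behave, the sketch below round-trips a `ReadRows` transform through its config row, which is what the transform-upgrade machinery does across SDK versions. It assumes in-package access (the translator classes are package-private) and an existing `catalogConfig`:

```java
// In-package sketch; `catalogConfig` is assumed to exist.
IcebergIOTranslation.IcebergIOReadTranslator translator =
    new IcebergIOTranslation.IcebergIOReadTranslator();

IcebergIO.ReadRows original =
    IcebergIO.readRows(catalogConfig).from(TableIdentifier.parse("db.ns.table"));

// toConfigRow(): catalog config -> serialized bytes; table identifier ->
// its namespace levels plus name, i.e. ["db", "ns", "table"].
Row configRow = translator.toConfigRow(original);

// fromConfigRow(): rebuilds the transform, re-joining the identifier parts
// with '.' before TableIdentifier.parse(...).
IcebergIO.ReadRows rebuilt =
    translator.fromConfigRow(configRow, PipelineOptionsFactory.create());
```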
And the corresponding test update:

```diff
@@ -102,7 +102,7 @@ public void testSimpleScan() throws Exception {
 
     PCollection<Row> output =
         testPipeline
-            .apply(IcebergIO.readTable(catalogConfig, tableId))
+            .apply(IcebergIO.readRows(catalogConfig).from(tableId))
             .apply(ParDo.of(new PrintRow()))
             .setCoder(
                 RowCoder.of(
```