Skip to content

Commit

Permalink
pulling in IcebergIO changes; spotless
Browse files Browse the repository at this point in the history
  • Loading branch information
ahmedabu98 committed Apr 15, 2024
1 parent e2cb93b commit aa8b1ed
Show file tree
Hide file tree
Showing 5 changed files with 229 additions and 211 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,98 +19,99 @@

import com.google.auto.service.AutoService;
import com.google.auto.value.AutoValue;
import java.util.Collections;
import java.util.List;
import org.apache.beam.io.iceberg.IcebergReadSchemaTransformProvider.Config;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
import org.apache.beam.io.iceberg.IcebergReadSchemaTransformProvider.Config;

import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.apache.iceberg.catalog.TableIdentifier;

import java.util.Collections;
import java.util.List;

/**
* SchemaTransform implementation for {@link IcebergIO#readTable}. Reads records from Iceberg and outputs a
* {@link org.apache.beam.sdk.values.PCollection} of Beam {@link org.apache.beam.sdk.values.Row}s.
* SchemaTransform implementation for {@link IcebergIO#readRows}. Reads records from Iceberg and
* outputs a {@link org.apache.beam.sdk.values.PCollection} of Beam {@link
* org.apache.beam.sdk.values.Row}s.
*/
@AutoService(SchemaTransformProvider.class)
public class IcebergReadSchemaTransformProvider extends TypedSchemaTransformProvider<Config> {
static final String OUTPUT_TAG = "output";

@Override
protected SchemaTransform from(Config configuration) {
configuration.validate();
return new IcebergReadSchemaTransform(configuration);
}

@Override
public List<String> outputCollectionNames() {
return Collections.singletonList(OUTPUT_TAG);
static final String OUTPUT_TAG = "output";

@Override
protected SchemaTransform from(Config configuration) {
configuration.validate();
return new IcebergReadSchemaTransform(configuration);
}

@Override
public List<String> outputCollectionNames() {
return Collections.singletonList(OUTPUT_TAG);
}

@Override
public String identifier() {
return "beam:schematransform:org.apache.beam:iceberg_read:v1";
}

@DefaultSchema(AutoValueSchema.class)
@AutoValue
public abstract static class Config {
public static Builder builder() {
return new AutoValue_IcebergReadSchemaTransformProvider_Config.Builder();
}

@Override
public String identifier() {
return "beam:schematransform:org.apache.beam:iceberg_read:v1";
}

@DefaultSchema(AutoValueSchema.class)
@AutoValue
public abstract static class Config {
public static Builder builder() {
return new AutoValue_IcebergReadSchemaTransformProvider_Config.Builder();
}

public abstract String getTable();
public abstract String getTable();

public abstract SchemaTransformCatalogConfig getCatalogConfig();
public abstract SchemaTransformCatalogConfig getCatalogConfig();

@AutoValue.Builder
public abstract static class Builder {
public abstract Builder setTable(String tables);
@AutoValue.Builder
public abstract static class Builder {
public abstract Builder setTable(String tables);

public abstract Builder setCatalogConfig(SchemaTransformCatalogConfig catalogConfig);
public abstract Builder setCatalogConfig(SchemaTransformCatalogConfig catalogConfig);

public abstract Config build();
}

public void validate() {
getCatalogConfig().validate();
}
public abstract Config build();
}

public void validate() {
getCatalogConfig().validate();
}
}

private static class IcebergReadSchemaTransform extends SchemaTransform {
private final Config configuration;

private static class IcebergReadSchemaTransform extends SchemaTransform {
private final Config configuration;

IcebergReadSchemaTransform(Config configuration) {
this.configuration = configuration;
}

@Override
public PCollectionRowTuple expand(PCollectionRowTuple input) {
SchemaTransformCatalogConfig catalogConfig = configuration.getCatalogConfig();

IcebergCatalogConfig.Builder catalogBuilder =
IcebergCatalogConfig.builder()
.setName(catalogConfig.getCatalogName());

if (!Strings.isNullOrEmpty(catalogConfig.getCatalogType())) {
catalogBuilder = catalogBuilder.setIcebergCatalogType(catalogConfig.getCatalogType());
}
if (!Strings.isNullOrEmpty(catalogConfig.getWarehouseLocation())) {
catalogBuilder = catalogBuilder.setWarehouseLocation(catalogConfig.getWarehouseLocation());
}

PCollection<Row> output = input.getPipeline().apply(IcebergIO.readTable(catalogBuilder.build(), TableIdentifier.parse(configuration.getTable())));
IcebergReadSchemaTransform(Config configuration) {
this.configuration = configuration;
}

return PCollectionRowTuple.of(OUTPUT_TAG, output);
}
@Override
public PCollectionRowTuple expand(PCollectionRowTuple input) {
SchemaTransformCatalogConfig catalogConfig = configuration.getCatalogConfig();

IcebergCatalogConfig.Builder catalogBuilder =
IcebergCatalogConfig.builder().setName(catalogConfig.getCatalogName());

if (!Strings.isNullOrEmpty(catalogConfig.getCatalogType())) {
catalogBuilder = catalogBuilder.setIcebergCatalogType(catalogConfig.getCatalogType());
}
if (!Strings.isNullOrEmpty(catalogConfig.getWarehouseLocation())) {
catalogBuilder = catalogBuilder.setWarehouseLocation(catalogConfig.getWarehouseLocation());
}

PCollection<Row> output =
input
.getPipeline()
.apply(
IcebergIO.readRows(catalogBuilder.build())
.from(TableIdentifier.parse(configuration.getTable())));

return PCollectionRowTuple.of(OUTPUT_TAG, output);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,10 @@
*/
package org.apache.beam.io.iceberg;

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;

import com.google.auto.service.AutoService;
import com.google.auto.value.AutoValue;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import org.apache.beam.io.iceberg.IcebergWriteSchemaTransformProvider.Config;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.Schema;
Expand All @@ -33,22 +30,18 @@
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.catalog.TableIdentifier;
import org.checkerframework.checker.nullness.qual.Nullable;

/**
* SchemaTransform implementation for {@link IcebergIO#writeToDynamicDestinations}. Writes Beam Rows
* to Iceberg and outputs a {@code PCollection<Row>} representing snapshots created in the process.
* SchemaTransform implementation for {@link IcebergIO#writeRows}. Writes Beam Rows to Iceberg and
* outputs a {@code PCollection<Row>} representing snapshots created in the process.
*/
@AutoService(SchemaTransformProvider.class)
public class IcebergWriteSchemaTransformProvider extends TypedSchemaTransformProvider<Config> {
Expand Down Expand Up @@ -117,7 +110,6 @@ public void validate() {
}
}


@VisibleForTesting
private static class IcebergWriteSchemaTransform extends SchemaTransform {
private final Config configuration;
Expand All @@ -134,8 +126,7 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
SchemaTransformCatalogConfig catalogConfig = configuration.getCatalogConfig();

IcebergCatalogConfig.Builder catalogBuilder =
IcebergCatalogConfig.builder()
.setName(catalogConfig.getCatalogName());
IcebergCatalogConfig.builder().setName(catalogConfig.getCatalogName());

if (!Strings.isNullOrEmpty(catalogConfig.getCatalogType())) {
catalogBuilder = catalogBuilder.setIcebergCatalogType(catalogConfig.getCatalogType());
Expand All @@ -149,8 +140,7 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
DynamicDestinations.singleTable(TableIdentifier.parse(configuration.getTable()));

IcebergWriteResult result =
rows.apply(
IcebergIO.writeToDynamicDestinations(catalogBuilder.build(), dynamicDestinations));
rows.apply(IcebergIO.writeRows(catalogBuilder.build()).to(dynamicDestinations));

PCollection<Row> snapshots =
result
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.io.iceberg;

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;

import com.google.auto.value.AutoValue;
import java.util.Set;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.util.Preconditions;
Expand All @@ -9,51 +29,47 @@
import org.apache.iceberg.CatalogUtil;
import org.checkerframework.checker.nullness.qual.Nullable;

import java.util.Set;

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;

@DefaultSchema(AutoValueSchema.class)
@AutoValue
public abstract class SchemaTransformCatalogConfig {
public static Builder builder() {
return new AutoValue_SchemaTransformCatalogConfig.Builder();
}
public static Builder builder() {
return new AutoValue_SchemaTransformCatalogConfig.Builder();
}

public abstract String getCatalogName();
public abstract String getCatalogName();

public abstract @Nullable String getCatalogType();
public abstract @Nullable String getCatalogType();

public abstract @Nullable String getCatalogImplementation();
public abstract @Nullable String getCatalogImplementation();

public abstract @Nullable String getWarehouseLocation();
public abstract @Nullable String getWarehouseLocation();

@AutoValue.Builder
public abstract static class Builder {
@AutoValue.Builder
public abstract static class Builder {

public abstract Builder setCatalogName(String catalogName);
public abstract Builder setCatalogName(String catalogName);

public abstract Builder setCatalogType(String catalogType);
public abstract Builder setCatalogType(String catalogType);

public abstract Builder setCatalogImplementation(String catalogImplementation);
public abstract Builder setCatalogImplementation(String catalogImplementation);

public abstract Builder setWarehouseLocation(String warehouseLocation);
public abstract Builder setWarehouseLocation(String warehouseLocation);

public abstract SchemaTransformCatalogConfig build();
}
public abstract SchemaTransformCatalogConfig build();
}

Set<String> validTypes =
Sets.newHashSet(
CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP,
CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE,
CatalogUtil.ICEBERG_CATALOG_TYPE_REST);

Set<String> validTypes =
Sets.newHashSet(
CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP,
CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE,
CatalogUtil.ICEBERG_CATALOG_TYPE_REST);

public void validate() {
if (Strings.isNullOrEmpty(getCatalogType())) {
checkArgument(
validTypes.contains(Preconditions.checkArgumentNotNull(getCatalogType())),
"Invalid catalog type. Please pick one of %s",
validTypes);
}
public void validate() {
if (Strings.isNullOrEmpty(getCatalogType())) {
checkArgument(
validTypes.contains(Preconditions.checkArgumentNotNull(getCatalogType())),
"Invalid catalog type. Please pick one of %s",
validTypes);
}
}
}
Loading

0 comments on commit aa8b1ed

Please sign in to comment.