Skip to content

Commit

Permalink
Add Iceberg read SchemaTransform and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
ahmedabu98 committed Apr 15, 2024
1 parent f1576e3 commit 27e5fb0
Show file tree
Hide file tree
Showing 5 changed files with 303 additions and 67 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.io.iceberg;

import com.google.auto.service.AutoService;
import com.google.auto.value.AutoValue;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
import org.apache.beam.io.iceberg.IcebergReadSchemaTransformProvider.Config;

import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.apache.iceberg.catalog.TableIdentifier;

import java.util.Collections;
import java.util.List;

/**
 * SchemaTransform implementation for {@link IcebergIO#readTable}. Reads records from Iceberg and
 * outputs a {@link org.apache.beam.sdk.values.PCollection} of Beam
 * {@link org.apache.beam.sdk.values.Row}s.
 */
@AutoService(SchemaTransformProvider.class)
public class IcebergReadSchemaTransformProvider extends TypedSchemaTransformProvider<Config> {
  /** Name of the single output PCollection produced by this transform. */
  static final String OUTPUT_TAG = "output";

  @Override
  protected SchemaTransform from(Config configuration) {
    // Fail fast on invalid catalog configuration before pipeline construction.
    configuration.validate();
    return new IcebergReadSchemaTransform(configuration);
  }

  @Override
  public List<String> outputCollectionNames() {
    return Collections.singletonList(OUTPUT_TAG);
  }

  @Override
  public String identifier() {
    return "beam:schematransform:org.apache.beam:iceberg_read:v1";
  }

  /** Configuration for the Iceberg read transform: the table to read and its catalog. */
  @DefaultSchema(AutoValueSchema.class)
  @AutoValue
  public abstract static class Config {
    public static Builder builder() {
      return new AutoValue_IcebergReadSchemaTransformProvider_Config.Builder();
    }

    /** Identifier of the Iceberg table to read, parseable by {@link TableIdentifier#parse}. */
    public abstract String getTable();

    /** Catalog settings (name, type, warehouse location) used to resolve the table. */
    public abstract SchemaTransformCatalogConfig getCatalogConfig();

    @AutoValue.Builder
    public abstract static class Builder {
      // Parameter renamed from "tables": this transform reads exactly one table.
      public abstract Builder setTable(String table);

      public abstract Builder setCatalogConfig(SchemaTransformCatalogConfig catalogConfig);

      public abstract Config build();
    }

    /** Validates the nested catalog configuration. */
    public void validate() {
      getCatalogConfig().validate();
    }
  }

  /** Expands to an {@link IcebergIO#readTable} rooted at the pipeline (takes no input rows). */
  private static class IcebergReadSchemaTransform extends SchemaTransform {
    private final Config configuration;

    IcebergReadSchemaTransform(Config configuration) {
      this.configuration = configuration;
    }

    @Override
    public PCollectionRowTuple expand(PCollectionRowTuple input) {
      SchemaTransformCatalogConfig catalogConfig = configuration.getCatalogConfig();

      IcebergCatalogConfig.Builder catalogBuilder =
          IcebergCatalogConfig.builder().setName(catalogConfig.getCatalogName());

      // Catalog type and warehouse location are optional; only set them when provided.
      if (!Strings.isNullOrEmpty(catalogConfig.getCatalogType())) {
        catalogBuilder = catalogBuilder.setIcebergCatalogType(catalogConfig.getCatalogType());
      }
      if (!Strings.isNullOrEmpty(catalogConfig.getWarehouseLocation())) {
        catalogBuilder = catalogBuilder.setWarehouseLocation(catalogConfig.getWarehouseLocation());
      }

      PCollection<Row> output =
          input
              .getPipeline()
              .apply(
                  IcebergIO.readTable(
                      catalogBuilder.build(), TableIdentifier.parse(configuration.getTable())));

      return PCollectionRowTuple.of(OUTPUT_TAG, output);
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,13 @@ public static Builder builder() {

public abstract String getTable();

public abstract CatalogConfig getCatalogConfig();
public abstract SchemaTransformCatalogConfig getCatalogConfig();

@AutoValue.Builder
public abstract static class Builder {
public abstract Builder setTable(String tables);

public abstract Builder setCatalogConfig(CatalogConfig catalogConfig);
public abstract Builder setCatalogConfig(SchemaTransformCatalogConfig catalogConfig);

public abstract Config build();
}
Expand All @@ -117,53 +117,9 @@ public void validate() {
}
}

@DefaultSchema(AutoValueSchema.class)
@AutoValue
public abstract static class CatalogConfig {
public static Builder builder() {
return new AutoValue_IcebergWriteSchemaTransformProvider_CatalogConfig.Builder();
}

public abstract String getCatalogName();

public abstract @Nullable String getCatalogType();

public abstract @Nullable String getCatalogImplementation();

public abstract @Nullable String getWarehouseLocation();

@AutoValue.Builder
public abstract static class Builder {

public abstract Builder setCatalogName(String catalogName);

public abstract Builder setCatalogType(String catalogType);

public abstract Builder setCatalogImplementation(String catalogImplementation);

public abstract Builder setWarehouseLocation(String warehouseLocation);

public abstract CatalogConfig build();
}

Set<String> validTypes =
Sets.newHashSet(
CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP,
CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE,
CatalogUtil.ICEBERG_CATALOG_TYPE_REST);

public void validate() {
if (Strings.isNullOrEmpty(getCatalogType())) {
checkArgument(
validTypes.contains(Preconditions.checkArgumentNotNull(getCatalogType())),
"Invalid catalog type. Please pick one of %s",
validTypes);
}
}
}

@VisibleForTesting
static class IcebergWriteSchemaTransform extends SchemaTransform {
private static class IcebergWriteSchemaTransform extends SchemaTransform {
private final Config configuration;

IcebergWriteSchemaTransform(Config configuration) {
Expand All @@ -175,13 +131,11 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {

PCollection<Row> rows = input.get(INPUT_TAG);

CatalogConfig catalogConfig = configuration.getCatalogConfig();
SchemaTransformCatalogConfig catalogConfig = configuration.getCatalogConfig();

IcebergCatalogConfig.Builder catalogBuilder =
IcebergCatalogConfig.builder()
.setName(catalogConfig.getCatalogName())
.setIcebergCatalogType(catalogConfig.getCatalogType())
.setWarehouseLocation(catalogConfig.getWarehouseLocation());
.setName(catalogConfig.getCatalogName());

if (!Strings.isNullOrEmpty(catalogConfig.getCatalogType())) {
catalogBuilder = catalogBuilder.setIcebergCatalogType(catalogConfig.getCatalogType());
Expand Down Expand Up @@ -212,17 +166,13 @@ static class SnapshotToRow extends SimpleFunction<KV<String, Snapshot>, Row> {
@Override
public Row apply(KV<String, Snapshot> input) {
Snapshot snapshot = input.getValue();
Row row =
Row.withSchema(OUTPUT_SCHEMA)
.addValues(
input.getKey(),
snapshot.operation(),
snapshot.summary(),
snapshot.manifestListLocation())
.build();
System.out.println("SNAPSHOT: " + snapshot);
System.out.println("ROW: " + row);
return row;
return Row.withSchema(OUTPUT_SCHEMA)
.addValues(
input.getKey(),
snapshot.operation(),
snapshot.summary(),
snapshot.manifestListLocation())
.build();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package org.apache.beam.io.iceberg;

import com.google.auto.value.AutoValue;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
import org.apache.iceberg.CatalogUtil;
import org.checkerframework.checker.nullness.qual.Nullable;

import java.util.Set;

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;

/**
 * Catalog configuration shared by the Iceberg read and write SchemaTransforms: the catalog name
 * plus optional type, implementation class, and warehouse location.
 */
@DefaultSchema(AutoValueSchema.class)
@AutoValue
public abstract class SchemaTransformCatalogConfig {
  public static Builder builder() {
    return new AutoValue_SchemaTransformCatalogConfig.Builder();
  }

  /** Name used to register this catalog. */
  public abstract String getCatalogName();

  /** Optional catalog type; when set, must be one of the Iceberg built-in types. */
  public abstract @Nullable String getCatalogType();

  /** Optional fully-qualified catalog implementation class. */
  public abstract @Nullable String getCatalogImplementation();

  /** Optional warehouse location (e.g. a filesystem or object-store path). */
  public abstract @Nullable String getWarehouseLocation();

  @AutoValue.Builder
  public abstract static class Builder {

    public abstract Builder setCatalogName(String catalogName);

    public abstract Builder setCatalogType(String catalogType);

    public abstract Builder setCatalogImplementation(String catalogImplementation);

    public abstract Builder setWarehouseLocation(String warehouseLocation);

    public abstract SchemaTransformCatalogConfig build();
  }

  // Iceberg's built-in catalog types; any user-supplied type must be one of these.
  Set<String> validTypes =
      Sets.newHashSet(
          CatalogUtil.ICEBERG_CATALOG_TYPE_HADOOP,
          CatalogUtil.ICEBERG_CATALOG_TYPE_HIVE,
          CatalogUtil.ICEBERG_CATALOG_TYPE_REST);

  /**
   * Validates that, when a catalog type is supplied, it is one of the recognized built-in types.
   *
   * @throws IllegalArgumentException if a non-empty catalog type is not recognized
   */
  public void validate() {
    // BUG FIX: the original condition was inverted (isNullOrEmpty without '!'), so validation
    // only ran when the type was absent (and then always failed on the null check), while an
    // invalid non-empty type passed silently. Validate only when a type is actually provided.
    if (!Strings.isNullOrEmpty(getCatalogType())) {
      checkArgument(
          validTypes.contains(getCatalogType()),
          "Invalid catalog type. Please pick one of %s",
          validTypes);
    }
  }
}
Loading

0 comments on commit 27e5fb0

Please sign in to comment.