Skip to content

Commit

Permalink
Pass-through IcebergIO catalog properties (#31726)
Browse files Browse the repository at this point in the history
* Pass-through iceberg catalog properties

* add to CHANGES.md

* trigger integration test

* remove custom configuration; pass catalog name
  • Loading branch information
ahmedabu98 authored Jul 8, 2024
1 parent 02600f5 commit ac423af
Show file tree
Hide file tree
Showing 13 changed files with 152 additions and 438 deletions.
3 changes: 2 additions & 1 deletion .github/trigger_files/IO_Iceberg_Integration_Tests.json
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run"
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 1
}
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,14 @@
## New Features / Improvements

* Multiple RunInference instances can now share the same model instance by setting the model_identifier parameter (Python) ([#31665](https://github.com/apache/beam/issues/31665)).
* [IcebergIO] All specified catalog properties are passed through to the connector ([#31726](https://github.com/apache/beam/pull/31726))
* Removed a 3rd party LGPL dependency from the Go SDK ([#31765](https://github.com/apache/beam/issues/31765)).
* Support for MapState and SetState when using Dataflow Runner v1 with Streaming Engine (Java) ([[#18200](https://github.com/apache/beam/issues/18200)])

## Breaking Changes

* X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).
* [IcebergIO] IcebergCatalogConfig was changed to support specifying catalog properties in a key-store fashion ([#31726](https://github.com/apache/beam/pull/31726))

## Deprecations

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,214 +19,35 @@

import com.google.auto.value.AutoValue;
import java.io.Serializable;
import javax.annotation.Nullable;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import java.util.Properties;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.checkerframework.dataflow.qual.Pure;

@AutoValue
public abstract class IcebergCatalogConfig implements Serializable {

@Pure
public abstract String getName();

/* Core Properties */
@Pure
public abstract @Nullable String getIcebergCatalogType();

@Pure
public abstract @Nullable String getCatalogImplementation();

@Pure
public abstract @Nullable String getFileIOImplementation();

@Pure
public abstract @Nullable String getWarehouseLocation();

@Pure
public abstract @Nullable String getMetricsReporterImplementation();

/* Caching */
@Pure
public abstract boolean getCacheEnabled();

@Pure
public abstract boolean getCacheCaseSensitive();

@Pure
public abstract long getCacheExpirationIntervalMillis();

@Pure
public abstract boolean getIOManifestCacheEnabled();

@Pure
public abstract long getIOManifestCacheExpirationIntervalMillis();

@Pure
public abstract long getIOManifestCacheMaxTotalBytes();

@Pure
public abstract long getIOManifestCacheMaxContentLength();

@Pure
public abstract @Nullable String getUri();

@Pure
public abstract int getClientPoolSize();

@Pure
public abstract long getClientPoolEvictionIntervalMs();

@Pure
public abstract @Nullable String getClientPoolCacheKeys();

@Pure
public abstract @Nullable String getLockImplementation();

@Pure
public abstract long getLockHeartbeatIntervalMillis();

@Pure
public abstract long getLockHeartbeatTimeoutMillis();

@Pure
public abstract int getLockHeartbeatThreads();

@Pure
public abstract long getLockAcquireIntervalMillis();

@Pure
public abstract long getLockAcquireTimeoutMillis();

@Pure
public abstract @Nullable String getAppIdentifier();

@Pure
public abstract @Nullable String getUser();

@Pure
public abstract long getAuthSessionTimeoutMillis();
public abstract String getCatalogName();

@Pure
public abstract @Nullable Configuration getConfiguration();
public abstract Properties getProperties();

@Pure
public static Builder builder() {
return new AutoValue_IcebergCatalogConfig.Builder()
.setIcebergCatalogType(null)
.setCatalogImplementation(null)
.setFileIOImplementation(null)
.setWarehouseLocation(null)
.setMetricsReporterImplementation(null) // TODO: Set this to our implementation
.setCacheEnabled(CatalogProperties.CACHE_ENABLED_DEFAULT)
.setCacheCaseSensitive(CatalogProperties.CACHE_CASE_SENSITIVE_DEFAULT)
.setCacheExpirationIntervalMillis(CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS_DEFAULT)
.setIOManifestCacheEnabled(CatalogProperties.IO_MANIFEST_CACHE_ENABLED_DEFAULT)
.setIOManifestCacheExpirationIntervalMillis(
CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS_DEFAULT)
.setIOManifestCacheMaxTotalBytes(
CatalogProperties.IO_MANIFEST_CACHE_MAX_TOTAL_BYTES_DEFAULT)
.setIOManifestCacheMaxContentLength(
CatalogProperties.IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH_DEFAULT)
.setUri(null)
.setClientPoolSize(CatalogProperties.CLIENT_POOL_SIZE_DEFAULT)
.setClientPoolEvictionIntervalMs(
CatalogProperties.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS_DEFAULT)
.setClientPoolCacheKeys(null)
.setLockImplementation(null)
.setLockHeartbeatIntervalMillis(CatalogProperties.LOCK_HEARTBEAT_INTERVAL_MS_DEFAULT)
.setLockHeartbeatTimeoutMillis(CatalogProperties.LOCK_HEARTBEAT_TIMEOUT_MS_DEFAULT)
.setLockHeartbeatThreads(CatalogProperties.LOCK_HEARTBEAT_THREADS_DEFAULT)
.setLockAcquireIntervalMillis(CatalogProperties.LOCK_ACQUIRE_INTERVAL_MS_DEFAULT)
.setLockAcquireTimeoutMillis(CatalogProperties.LOCK_HEARTBEAT_TIMEOUT_MS_DEFAULT)
.setAppIdentifier(null)
.setUser(null)
.setAuthSessionTimeoutMillis(CatalogProperties.AUTH_SESSION_TIMEOUT_MS_DEFAULT)
.setConfiguration(null);
}

@Pure
public ImmutableMap<String, String> properties() {
return new PropertyBuilder()
.put(CatalogUtil.ICEBERG_CATALOG_TYPE, getIcebergCatalogType())
.put(CatalogProperties.CATALOG_IMPL, getCatalogImplementation())
.put(CatalogProperties.FILE_IO_IMPL, getFileIOImplementation())
.put(CatalogProperties.WAREHOUSE_LOCATION, getWarehouseLocation())
.put(CatalogProperties.METRICS_REPORTER_IMPL, getMetricsReporterImplementation())
.put(CatalogProperties.CACHE_ENABLED, getCacheEnabled())
.put(CatalogProperties.CACHE_CASE_SENSITIVE, getCacheCaseSensitive())
.put(CatalogProperties.CACHE_EXPIRATION_INTERVAL_MS, getCacheExpirationIntervalMillis())
.build();
return new AutoValue_IcebergCatalogConfig.Builder();
}

public org.apache.iceberg.catalog.Catalog catalog() {
Configuration conf = getConfiguration();
if (conf == null) {
conf = new Configuration();
}
return CatalogUtil.buildIcebergCatalog(getName(), properties(), conf);
return CatalogUtil.buildIcebergCatalog(
getCatalogName(), Maps.fromProperties(getProperties()), new Configuration());
}

@AutoValue.Builder
public abstract static class Builder {
public abstract Builder setCatalogName(String catalogName);

/* Core Properties */
public abstract Builder setName(String name);

public abstract Builder setIcebergCatalogType(@Nullable String icebergType);

public abstract Builder setCatalogImplementation(@Nullable String catalogImpl);

public abstract Builder setFileIOImplementation(@Nullable String fileIOImpl);

public abstract Builder setWarehouseLocation(@Nullable String warehouse);

public abstract Builder setMetricsReporterImplementation(@Nullable String metricsImpl);

/* Caching */
public abstract Builder setCacheEnabled(boolean cacheEnabled);

public abstract Builder setCacheCaseSensitive(boolean cacheCaseSensitive);

public abstract Builder setCacheExpirationIntervalMillis(long expiration);

public abstract Builder setIOManifestCacheEnabled(boolean enabled);

public abstract Builder setIOManifestCacheExpirationIntervalMillis(long expiration);

public abstract Builder setIOManifestCacheMaxTotalBytes(long bytes);

public abstract Builder setIOManifestCacheMaxContentLength(long length);

public abstract Builder setUri(@Nullable String uri);

public abstract Builder setClientPoolSize(int size);

public abstract Builder setClientPoolEvictionIntervalMs(long interval);

public abstract Builder setClientPoolCacheKeys(@Nullable String keys);

public abstract Builder setLockImplementation(@Nullable String lockImplementation);

public abstract Builder setLockHeartbeatIntervalMillis(long interval);

public abstract Builder setLockHeartbeatTimeoutMillis(long timeout);

public abstract Builder setLockHeartbeatThreads(int threads);

public abstract Builder setLockAcquireIntervalMillis(long interval);

public abstract Builder setLockAcquireTimeoutMillis(long timeout);

public abstract Builder setAppIdentifier(@Nullable String id);

public abstract Builder setUser(@Nullable String user);

public abstract Builder setAuthSessionTimeoutMillis(long timeout);

public abstract Builder setConfiguration(@Nullable Configuration conf);
public abstract Builder setProperties(Properties props);

public abstract IcebergCatalogConfig build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,21 @@
import com.google.auto.value.AutoValue;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.beam.sdk.io.iceberg.IcebergReadSchemaTransformProvider.Config;
import org.apache.beam.sdk.managed.ManagedTransformConstants;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.apache.iceberg.catalog.TableIdentifier;

/**
Expand All @@ -47,7 +49,6 @@ public class IcebergReadSchemaTransformProvider extends TypedSchemaTransformProv

@Override
protected SchemaTransform from(Config configuration) {
configuration.validate();
return new IcebergReadSchemaTransform(configuration);
}

Expand All @@ -68,21 +69,24 @@ public static Builder builder() {
return new AutoValue_IcebergReadSchemaTransformProvider_Config.Builder();
}

@SchemaFieldDescription("Identifier of the Iceberg table to write to.")
public abstract String getTable();

public abstract IcebergSchemaTransformCatalogConfig getCatalogConfig();
@SchemaFieldDescription("Name of the catalog containing the table.")
public abstract String getCatalogName();

@SchemaFieldDescription("Configuration properties used to set up the Iceberg catalog.")
public abstract Map<String, String> getCatalogProperties();

@AutoValue.Builder
public abstract static class Builder {
public abstract Builder setTable(String tables);
public abstract Builder setTable(String table);

public abstract Builder setCatalogConfig(IcebergSchemaTransformCatalogConfig catalogConfig);
public abstract Builder setCatalogName(String catalogName);

public abstract Config build();
}
public abstract Builder setCatalogProperties(Map<String, String> catalogProperties);

public void validate() {
getCatalogConfig().validate();
public abstract Config build();
}
}

Expand All @@ -109,17 +113,13 @@ Row getConfigurationRow() {

@Override
public PCollectionRowTuple expand(PCollectionRowTuple input) {
IcebergSchemaTransformCatalogConfig catalogConfig = configuration.getCatalogConfig();
Properties properties = new Properties();
properties.putAll(configuration.getCatalogProperties());

IcebergCatalogConfig.Builder catalogBuilder =
IcebergCatalogConfig.builder().setName(catalogConfig.getCatalogName());

if (!Strings.isNullOrEmpty(catalogConfig.getCatalogType())) {
catalogBuilder = catalogBuilder.setIcebergCatalogType(catalogConfig.getCatalogType());
}
if (!Strings.isNullOrEmpty(catalogConfig.getWarehouseLocation())) {
catalogBuilder = catalogBuilder.setWarehouseLocation(catalogConfig.getWarehouseLocation());
}
IcebergCatalogConfig.builder()
.setCatalogName(configuration.getCatalogName())
.setProperties(properties);

PCollection<Row> output =
input
Expand Down
Loading

0 comments on commit ac423af

Please sign in to comment.