Support creating BigLake managed tables (#33125)
* Create managed BigLake tables

* Add to translation

* Add to CHANGES.md

* Adjust changes description
ahmedabu98 authored Dec 17, 2024
1 parent f7a7bdd commit e50a136
Showing 12 changed files with 226 additions and 29 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
@@ -66,6 +66,7 @@
* gcs-connector config options can be set via GcsOptions (Java) ([#32769](https://github.com/apache/beam/pull/32769)).
* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* Upgraded the default version of Hadoop dependencies to 3.4.1. Hadoop 2.10.2 is still supported (Java) ([#33011](https://github.com/apache/beam/issues/33011)).
* [BigQueryIO] Create managed BigLake tables dynamically ([#33125](https://github.com/apache/beam/pull/33125)).

## New Features / Improvements

@@ -54,6 +54,7 @@
import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -629,6 +630,9 @@ public class BigQueryIO {
private static final SerializableFunction<TableSchema, org.apache.avro.Schema>
DEFAULT_AVRO_SCHEMA_FACTORY = BigQueryAvroUtils::toGenericAvroSchema;

static final String CONNECTION_ID = "connectionId";
static final String STORAGE_URI = "storageUri";

/**
* @deprecated Use {@link #read(SerializableFunction)} or {@link #readTableRows} instead. {@link
* #readTableRows()} does exactly the same as {@link #read}, however {@link
@@ -2372,6 +2376,8 @@ public enum Method {
/** Table description. Default is empty. */
abstract @Nullable String getTableDescription();

abstract @Nullable Map<String, String> getBigLakeConfiguration();

/** An option to indicate if table validation is desired. Default is true. */
abstract boolean getValidate();

@@ -2484,6 +2490,8 @@ abstract Builder<T> setAvroSchemaFactory(

abstract Builder<T> setTableDescription(String tableDescription);

abstract Builder<T> setBigLakeConfiguration(Map<String, String> bigLakeConfiguration);

abstract Builder<T> setValidate(boolean validate);

abstract Builder<T> setBigQueryServices(BigQueryServices bigQueryServices);
@@ -2909,6 +2917,30 @@ public Write<T> withTableDescription(String tableDescription) {
return toBuilder().setTableDescription(tableDescription).build();
}

/**
* Specifies a configuration to create BigLake tables. The following options are available:
*
* <ul>
* <li>connectionId (REQUIRED): the name of your cloud resource connection.
* <li>storageUri (REQUIRED): the path to the GCS folder where data will be written. This
* sink creates sub-folders for each project, dataset, and table destination. For example,
* if you specify a storageUri of {@code "gs://foo/bar"} and write to table {@code
* "my_project.my_dataset.my_table"}, your data will be written under {@code
* "gs://foo/bar/my_project/my_dataset/my_table/"}.
* <li>fileFormat (OPTIONAL): defaults to {@code "parquet"}
* <li>tableFormat (OPTIONAL): defaults to {@code "iceberg"}
* </ul>
*
* <p><b>NOTE:</b> This is only supported with the Storage Write API methods.
*
* @see <a href="https://cloud.google.com/bigquery/docs/iceberg-tables#api">BigQuery Tables for
* Apache Iceberg documentation</a>
*/
public Write<T> withBigLakeConfiguration(Map<String, String> bigLakeConfiguration) {
checkArgument(bigLakeConfiguration != null, "bigLakeConfiguration can not be null");
return toBuilder().setBigLakeConfiguration(bigLakeConfiguration).build();
}
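
Below is a minimal usage sketch for the new option (not part of the commit itself); the project, dataset, table, connection, and bucket names are placeholders. Both required keys must be present, and the write must use one of the Storage Write API methods, otherwise pipeline validation fails.

```java
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;

public class BigLakeWriteSketch {
  static BigQueryIO.Write<TableRow> bigLakeWrite() {
    // Both keys are REQUIRED; "fileFormat" and "tableFormat" fall back to "parquet" and "iceberg".
    Map<String, String> bigLakeConfig = new HashMap<>();
    bigLakeConfig.put("connectionId", "my-project.us.my-connection"); // cloud resource connection
    bigLakeConfig.put("storageUri", "gs://foo/bar"); // data lands under gs://foo/bar/<project>/<dataset>/<table>/

    TableSchema schema =
        new TableSchema()
            .setFields(Arrays.asList(new TableFieldSchema().setName("name").setType("STRING")));

    return BigQueryIO.writeTableRows()
        .to("my_project.my_dataset.my_table")
        .withSchema(schema)
        // BigLake configuration is only honored by the Storage Write API sinks.
        .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withBigLakeConfiguration(bigLakeConfig);
  }
}
```

The configuration only takes effect when the sink actually creates the table, so CREATE_IF_NEEDED is the create disposition that exercises it.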

/**
* Specifies a policy for handling failed inserts.
*
@@ -3454,8 +3486,21 @@ && getStorageApiTriggeringFrequency(bqOptions) != null) {
checkArgument(
!getAutoSchemaUpdate(),
"withAutoSchemaUpdate only supported when using STORAGE_WRITE_API or STORAGE_API_AT_LEAST_ONCE.");
} else if (getWriteDisposition() == WriteDisposition.WRITE_TRUNCATE) {
LOG.error("The Storage API sink does not support the WRITE_TRUNCATE write disposition.");
checkArgument(
getBigLakeConfiguration() == null,
"bigLakeConfiguration is only supported when using STORAGE_WRITE_API or STORAGE_API_AT_LEAST_ONCE.");
} else {
if (getWriteDisposition() == WriteDisposition.WRITE_TRUNCATE) {
LOG.error("The Storage API sink does not support the WRITE_TRUNCATE write disposition.");
}
if (getBigLakeConfiguration() != null) {
checkArgument(
Arrays.stream(new String[] {CONNECTION_ID, STORAGE_URI})
.allMatch(getBigLakeConfiguration()::containsKey),
String.format(
"bigLakeConfiguration must contain keys '%s' and '%s'",
CONNECTION_ID, STORAGE_URI));
}
}
if (getRowMutationInformationFn() != null) {
checkArgument(
@@ -3905,6 +3950,7 @@ private <DestinationT> WriteResult continueExpandTyped(
getPropagateSuccessfulStorageApiWritesPredicate(),
getRowMutationInformationFn() != null,
getDefaultMissingValueInterpretation(),
getBigLakeConfiguration(),
getBadRecordRouter(),
getBadRecordErrorHandler());
return input.apply("StorageApiLoads", storageApiLoads);
@@ -393,6 +393,7 @@ static class BigQueryIOWriteTranslator implements TransformPayloadTranslator<Wri
.addNullableByteArrayField("write_disposition")
.addNullableArrayField("schema_update_options", FieldType.BYTES)
.addNullableStringField("table_description")
.addNullableMapField("biglake_configuration", FieldType.STRING, FieldType.STRING)
.addNullableBooleanField("validate")
.addNullableByteArrayField("bigquery_services")
.addNullableInt32Field("max_files_per_bundle")
@@ -510,6 +511,9 @@ public Row toConfigRow(Write<?> transform) {
if (transform.getTableDescription() != null) {
fieldValues.put("table_description", transform.getTableDescription());
}
if (transform.getBigLakeConfiguration() != null) {
fieldValues.put("biglake_configuration", transform.getBigLakeConfiguration());
}
fieldValues.put("validate", transform.getValidate());
if (transform.getBigQueryServices() != null) {
fieldValues.put("bigquery_services", toByteArray(transform.getBigQueryServices()));
@@ -719,6 +723,10 @@ public Write<?> fromConfigRow(Row configRow, PipelineOptions options) {
if (tableDescription != null) {
builder = builder.setTableDescription(tableDescription);
}
Map<String, String> biglakeConfiguration = configRow.getMap("biglake_configuration");
if (biglakeConfiguration != null) {
builder = builder.setBigLakeConfiguration(biglakeConfiguration);
}
Boolean validate = configRow.getBoolean("validate");
if (validate != null) {
builder = builder.setValidate(validate);
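For context on the translation change above, here is a standalone sketch (class name and values are illustrative, not from the commit) of how a nullable map field such as "biglake_configuration" round-trips through a Beam Row. This is the mechanism toConfigRow and fromConfigRow rely on for upgrade-compatible configs.

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.values.Row;

public class BigLakeConfigRowSketch {
  public static void main(String[] args) {
    Schema schema =
        Schema.builder()
            .addNullableMapField("biglake_configuration", FieldType.STRING, FieldType.STRING)
            .build();

    Map<String, String> bigLakeConfig = new HashMap<>();
    bigLakeConfig.put("connectionId", "my-connection");
    bigLakeConfig.put("storageUri", "gs://foo/bar");

    // toConfigRow direction: store the transform's map under the nullable field.
    Row configRow =
        Row.withSchema(schema).withFieldValue("biglake_configuration", bigLakeConfig).build();

    // fromConfigRow direction: a null value simply leaves the Write builder untouched.
    Map<String, String> restored = configRow.getMap("biglake_configuration");
    System.out.println(restored);
  }
}
```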
@@ -17,11 +17,14 @@
*/
package org.apache.beam.sdk.io.gcp.bigquery;

import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.CONNECTION_ID;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.STORAGE_URI;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;

import com.google.api.client.util.BackOff;
import com.google.api.client.util.BackOffUtils;
import com.google.api.gax.rpc.ApiException;
import com.google.api.services.bigquery.model.BigLakeConfiguration;
import com.google.api.services.bigquery.model.Clustering;
import com.google.api.services.bigquery.model.EncryptionConfiguration;
import com.google.api.services.bigquery.model.Table;
@@ -31,6 +34,7 @@
import com.google.api.services.bigquery.model.TimePartitioning;
import io.grpc.StatusRuntimeException;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
@@ -41,6 +45,7 @@
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Supplier;
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -91,7 +96,8 @@ static TableDestination possiblyCreateTable(
CreateDisposition createDisposition,
@Nullable Coder<?> tableDestinationCoder,
@Nullable String kmsKey,
BigQueryServices bqServices) {
BigQueryServices bqServices,
@Nullable Map<String, String> bigLakeConfiguration) {
checkArgument(
tableDestination.getTableSpec() != null,
"DynamicDestinations.getTable() must return a TableDestination "
@@ -132,7 +138,8 @@ static TableDestination possiblyCreateTable(
createDisposition,
tableSpec,
kmsKey,
bqServices);
bqServices,
bigLakeConfiguration);
}
}
}
@@ -147,7 +154,8 @@ private static void tryCreateTable(
CreateDisposition createDisposition,
String tableSpec,
@Nullable String kmsKey,
BigQueryServices bqServices) {
BigQueryServices bqServices,
@Nullable Map<String, String> bigLakeConfiguration) {
TableReference tableReference = tableDestination.getTableReference().clone();
tableReference.setTableId(BigQueryHelpers.stripPartitionDecorator(tableReference.getTableId()));
try (DatasetService datasetService = bqServices.getDatasetService(options)) {
@@ -189,6 +197,24 @@ private static void tryCreateTable(
if (kmsKey != null) {
table.setEncryptionConfiguration(new EncryptionConfiguration().setKmsKeyName(kmsKey));
}
if (bigLakeConfiguration != null) {
TableReference ref = table.getTableReference();
table.setBiglakeConfiguration(
new BigLakeConfiguration()
.setTableFormat(
MoreObjects.firstNonNull(bigLakeConfiguration.get("tableFormat"), "iceberg"))
.setFileFormat(
MoreObjects.firstNonNull(bigLakeConfiguration.get("fileFormat"), "parquet"))
.setConnectionId(
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(CONNECTION_ID)))
.setStorageUri(
String.format(
"%s/%s/%s/%s",
Preconditions.checkArgumentNotNull(bigLakeConfiguration.get(STORAGE_URI)),
ref.getProjectId(),
ref.getDatasetId(),
ref.getTableId())));
}
datasetService.createTable(table);
}
} catch (Exception e) {
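To make the storage URI composition above concrete, the following is an illustrative sketch (placeholder IDs, not code from the commit) of the table resource tryCreateTable ends up creating for a bigLakeConfiguration of {connectionId: "my-connection", storageUri: "gs://foo/bar"} and destination my_project.my_dataset.my_table. The storage URI is the configured prefix plus the project, dataset, and table IDs, and the file and table formats default to parquet and iceberg.

```java
import com.google.api.services.bigquery.model.BigLakeConfiguration;
import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableReference;

public class BigLakeTableSketch {
  static Table managedBigLakeTable() {
    TableReference ref =
        new TableReference()
            .setProjectId("my_project")
            .setDatasetId("my_dataset")
            .setTableId("my_table");
    return new Table()
        .setTableReference(ref)
        .setBiglakeConfiguration(
            new BigLakeConfiguration()
                .setConnectionId("my-connection") // bigLakeConfiguration.get("connectionId")
                .setStorageUri("gs://foo/bar/my_project/my_dataset/my_table") // storageUri + /project/dataset/table
                .setFileFormat("parquet") // default when "fileFormat" is absent
                .setTableFormat("iceberg")); // default when "tableFormat" is absent
  }
}
```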
@@ -132,7 +132,8 @@ public void processElement(ProcessContext context) {
createDisposition,
dynamicDestinations.getDestinationCoder(),
kmsKey,
bqServices);
bqServices,
null);
});

context.output(KV.of(tableDestination, context.element().getValue()));
@@ -23,6 +23,7 @@
import com.google.cloud.bigquery.storage.v1.AppendRowsRequest;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Predicate;
import javax.annotation.Nullable;
@@ -76,6 +77,7 @@ public class StorageApiLoads<DestinationT, ElementT>
private final boolean usesCdc;

private final AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation;
private final Map<String, String> bigLakeConfiguration;

private final BadRecordRouter badRecordRouter;

@@ -98,6 +100,7 @@ public StorageApiLoads(
Predicate<String> propagateSuccessfulStorageApiWritesPredicate,
boolean usesCdc,
AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation,
Map<String, String> bigLakeConfiguration,
BadRecordRouter badRecordRouter,
ErrorHandler<BadRecord, ?> badRecordErrorHandler) {
this.destinationCoder = destinationCoder;
@@ -118,6 +121,7 @@ public StorageApiLoads(
this.successfulRowsPredicate = propagateSuccessfulStorageApiWritesPredicate;
this.usesCdc = usesCdc;
this.defaultMissingValueInterpretation = defaultMissingValueInterpretation;
this.bigLakeConfiguration = bigLakeConfiguration;
this.badRecordRouter = badRecordRouter;
this.badRecordErrorHandler = badRecordErrorHandler;
}
@@ -186,7 +190,8 @@ public WriteResult expandInconsistent(
createDisposition,
kmsKey,
usesCdc,
defaultMissingValueInterpretation));
defaultMissingValueInterpretation,
bigLakeConfiguration));

PCollection<BigQueryStorageApiInsertError> insertErrors =
PCollectionList.of(convertMessagesResult.get(failedRowsTag))
@@ -279,7 +284,8 @@ public WriteResult expandTriggered(
successfulRowsPredicate,
autoUpdateSchema,
ignoreUnknownValues,
defaultMissingValueInterpretation));
defaultMissingValueInterpretation,
bigLakeConfiguration));

PCollection<BigQueryStorageApiInsertError> insertErrors =
PCollectionList.of(convertMessagesResult.get(failedRowsTag))
@@ -372,7 +378,8 @@ public WriteResult expandUntriggered(
createDisposition,
kmsKey,
usesCdc,
defaultMissingValueInterpretation));
defaultMissingValueInterpretation,
bigLakeConfiguration));

PCollection<BigQueryStorageApiInsertError> insertErrors =
PCollectionList.of(convertMessagesResult.get(failedRowsTag))
@@ -19,6 +19,7 @@

import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.bigquery.storage.v1.AppendRowsRequest;
import java.util.Map;
import java.util.function.Predicate;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
@@ -55,6 +56,7 @@ public class StorageApiWriteRecordsInconsistent<DestinationT, ElementT>
private final @Nullable String kmsKey;
private final boolean usesCdc;
private final AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation;
private final @Nullable Map<String, String> bigLakeConfiguration;

public StorageApiWriteRecordsInconsistent(
StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations,
@@ -69,7 +71,8 @@ public StorageApiWriteRecordsInconsistent(
BigQueryIO.Write.CreateDisposition createDisposition,
@Nullable String kmsKey,
boolean usesCdc,
AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation) {
AppendRowsRequest.MissingValueInterpretation defaultMissingValueInterpretation,
@Nullable Map<String, String> bigLakeConfiguration) {
this.dynamicDestinations = dynamicDestinations;
this.bqServices = bqServices;
this.failedRowsTag = failedRowsTag;
@@ -83,6 +86,7 @@ public StorageApiWriteRecordsInconsistent(
this.kmsKey = kmsKey;
this.usesCdc = usesCdc;
this.defaultMissingValueInterpretation = defaultMissingValueInterpretation;
this.bigLakeConfiguration = bigLakeConfiguration;
}

@Override
@@ -116,7 +120,8 @@ public PCollectionTuple expand(PCollection<KV<DestinationT, StorageApiWritePaylo
kmsKey,
usesCdc,
defaultMissingValueInterpretation,
bigQueryOptions.getStorageWriteApiMaxRetries()))
bigQueryOptions.getStorageWriteApiMaxRetries(),
bigLakeConfiguration))
.withOutputTags(finalizeTag, tupleTagList)
.withSideInputs(dynamicDestinations.getSideInputs()));
result.get(failedRowsTag).setCoder(failedRowsCoder);