Enable BigQuery CDC configuration for Python BigQuery sink (#32529)
* include CDC configuration on the storage write transform provider

* adding the primary key configuration for CDC and tests

* fixing List.of references to use ImmutableList

* fixing test, missing calling the cdc info row builder() method

* fix test, add config validations

* added the xlang params to storage write python wrapper

* adding missing comma

* shortening property name

* changing xlang config property

* set use cdc schema property as nullable, added safe retrieval method

* fixes property name reference and argument type definition

* python format fix

* adding xlang IT with BQ

* adding missing primary key column to test

* python format fix

* format xlang test

* more format xlang test fixes

* and more format xlang test fixes

* adding missing import

* missing self reference

* enabled CREATE_IF_NEEDED functionality for the CDC Python integration, implemented table constraint support in the BigQuery fake dataset services

* Update bigquery.py

* triggering the xlang tests

* fixing lint

* addressing a few comments

* cdc info is added after row transformation now

* remove not used param

* removed typing information for callable

* adding test for cdc using dicts as input and cdc write callable

* simplifying the xlang configuration from the Python perspective; will add the callable in a future PR

* spotless apply

* wrong property passed to xlang builder

* missing self

* fixing xlang it

* fixes wrong property reference

* change cdc xlang test to use beam.io.WriteToBigQuery

* force another build

* modifying comment to trigger build.

* addressing PR comments: included a new dict-based test for the xlang Python tests, included the CDC configurations in the existing RowDynamicDestinations object, improved the error message for the mutation information schema checks.
prodriguezdefino authored Oct 15, 2024
1 parent 06692ca commit e52868c
Showing 8 changed files with 414 additions and 20 deletions.
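
For orientation, here is a minimal sketch of what this change enables from the Python side: writing CDC upserts and deletes through the Storage Write API sink. The `use_cdc_writes` and `primary_key` keyword names are inferred from the Java configuration fields below (getUseCdcWrites / getPrimaryKey), and the element shape mirrors what the provider validates; the exact Python spelling lives in the bigquery.py changes of this commit, so treat the kwargs here as assumptions rather than the committed API.

    # Hypothetical sketch of CDC writes from Python (kwarg names are assumptions,
    # inferred from the Java xlang configuration in this commit).
    import apache_beam as beam

    cdc_rows = [
        # mutation_type must name a RowMutationInformation.MutationType (UPSERT or DELETE);
        # change_sequence_number orders changes applied to the same primary key.
        {"row_mutation_info": {"mutation_type": "UPSERT", "change_sequence_number": "AAA/2"},
         "record": {"id": 1, "name": "alice"}},
        {"row_mutation_info": {"mutation_type": "DELETE", "change_sequence_number": "AAA/3"},
         "record": {"id": 2, "name": "bob"}},
    ]

    with beam.Pipeline() as p:
        _ = (
            p
            | beam.Create(cdc_rows)
            | beam.io.WriteToBigQuery(
                table="my-project:my_dataset.my_table",  # placeholder table spec
                schema="id:INTEGER,name:STRING",  # schema of the "record" payload
                method=beam.io.WriteToBigQuery.Method.STORAGE_WRITE_API,
                use_at_least_once=True,  # CDC requires at-least-once Storage Write mode
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
                use_cdc_writes=True,  # assumed kwarg mapping to getUseCdcWrites
                primary_key=["id"],  # assumed kwarg mapping to getPrimaryKey;
                                     # required with CREATE_IF_NEEDED
            )
        )
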
----------------------------------------
@@ -1,4 +1,5 @@

{
"comment": "Modify this file in a trivial way to cause this test suite to run"
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 1
}
----------------------------------------
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 1
"modification": 2
}
----------------------------------------
@@ -2259,6 +2259,7 @@ public static Write<RowMutation> applyRowMutations() {
.withFormatFunction(RowMutation::getTableRow)
.withRowMutationInformationFn(RowMutation::getMutationInformation);
}

/**
* A {@link PTransform} that writes a {@link PCollection} containing {@link GenericRecord
* GenericRecords} to a BigQuery table.
@@ -2367,8 +2368,10 @@ public enum Method {
abstract WriteDisposition getWriteDisposition();

abstract Set<SchemaUpdateOption> getSchemaUpdateOptions();

/** Table description. Default is empty. */
abstract @Nullable String getTableDescription();

/** An option to indicate if table validation is desired. Default is true. */
abstract boolean getValidate();

@@ -3455,7 +3458,10 @@ && getStorageApiTriggeringFrequency(bqOptions) != null) {
LOG.error("The Storage API sink does not support the WRITE_TRUNCATE write disposition.");
}
if (getRowMutationInformationFn() != null) {
checkArgument(getMethod() == Method.STORAGE_API_AT_LEAST_ONCE);
checkArgument(
getMethod() == Method.STORAGE_API_AT_LEAST_ONCE,
"When using row updates on BigQuery, StorageWrite API should execute using"
+ " \"at least once\" mode.");
checkArgument(
getCreateDisposition() == CreateDisposition.CREATE_NEVER || getPrimaryKey() != null,
"If specifying CREATE_IF_NEEDED along with row updates, a primary key needs to be specified");
----------------------------------------
@@ -20,13 +20,15 @@
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;

import com.google.api.services.bigquery.model.TableConstraints;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.auto.service.AutoService;
import com.google.auto.value.AutoValue;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
@@ -37,6 +39,7 @@
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryStorageApiInsertError;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.RowMutationInformation;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryStorageWriteApiSchemaTransformProvider.BigQueryStorageWriteApiSchemaTransformConfiguration;
@@ -87,6 +90,14 @@ public class BigQueryStorageWriteApiSchemaTransformProvider
private static final String FAILED_ROWS_WITH_ERRORS_TAG = "FailedRowsWithErrors";
// magic string that tells us to write to dynamic destinations
protected static final String DYNAMIC_DESTINATIONS = "DYNAMIC_DESTINATIONS";
protected static final String ROW_PROPERTY_MUTATION_INFO = "row_mutation_info";
protected static final String ROW_PROPERTY_MUTATION_TYPE = "mutation_type";
protected static final String ROW_PROPERTY_MUTATION_SQN = "change_sequence_number";
protected static final Schema ROW_SCHEMA_MUTATION_INFO =
Schema.builder()
.addStringField("mutation_type")
.addStringField("change_sequence_number")
.build();

@Override
protected SchemaTransform from(
@@ -257,6 +268,20 @@ public static Builder builder() {
@Nullable
public abstract ErrorHandling getErrorHandling();

@SchemaFieldDescription(
"This option enables the use of BigQuery CDC functionality. The expected PCollection"
+ " should contain Beam Rows with a schema wrapping the record to be inserted and"
+ " adding the CDC info similar to: {row_mutation_info: {mutation_type:\"...\", "
+ "change_sequence_number:\"...\"}, record: {...}}")
@Nullable
public abstract Boolean getUseCdcWrites();

@SchemaFieldDescription(
"If CREATE_IF_NEEDED disposition is set, BigQuery table(s) will be created with this"
+ " columns as primary key. Required when CDC writes are enabled with CREATE_IF_NEEDED.")
@Nullable
public abstract List<String> getPrimaryKey();

/** Builder for {@link BigQueryStorageWriteApiSchemaTransformConfiguration}. */
@AutoValue.Builder
public abstract static class Builder {
@@ -277,6 +302,10 @@ public abstract static class Builder {

public abstract Builder setErrorHandling(ErrorHandling errorHandling);

public abstract Builder setUseCdcWrites(Boolean cdcWrites);

public abstract Builder setPrimaryKey(List<String> pkColumns);

/** Builds a {@link BigQueryStorageWriteApiSchemaTransformConfiguration} instance. */
public abstract BigQueryStorageWriteApiSchemaTransformProvider
.BigQueryStorageWriteApiSchemaTransformConfiguration
@@ -343,15 +372,27 @@ public void process(ProcessContext c) {}
}

private static class RowDynamicDestinations extends DynamicDestinations<Row, String> {
Schema schema;
final Schema schema;
final String fixedDestination;
final List<String> primaryKey;

RowDynamicDestinations(Schema schema) {
this.schema = schema;
this.fixedDestination = null;
this.primaryKey = null;
}

public RowDynamicDestinations(
Schema schema, String fixedDestination, List<String> primaryKey) {
this.schema = schema;
this.fixedDestination = fixedDestination;
this.primaryKey = primaryKey;
}

@Override
public String getDestination(ValueInSingleWindow<Row> element) {
return element.getValue().getString("destination");
return Optional.ofNullable(fixedDestination)
.orElseGet(() -> element.getValue().getString("destination"));
}

@Override
@@ -363,6 +404,17 @@ public TableDestination getTable(String destination) {
public TableSchema getSchema(String destination) {
return BigQueryUtils.toTableSchema(schema);
}

@Override
public TableConstraints getTableConstraints(String destination) {
return Optional.ofNullable(this.primaryKey)
.filter(pk -> !pk.isEmpty())
.map(
pk ->
new TableConstraints()
.setPrimaryKey(new TableConstraints.PrimaryKey().setColumns(pk)))
.orElse(null);
}
}

@Override
@@ -453,6 +505,13 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
}
}

void validateDynamicDestinationsExpectedSchema(Schema schema) {
checkArgument(
schema.getFieldNames().containsAll(Arrays.asList("destination", "record")),
"When writing to dynamic destinations, we expect Row Schema with a "
+ "\"destination\" string field and a \"record\" Row field.");
}

BigQueryIO.Write<Row> createStorageWriteApiTransform(Schema schema) {
Method writeMethod =
configuration.getUseAtLeastOnceSemantics() != null
@@ -466,11 +525,11 @@ BigQueryIO.Write<Row> createStorageWriteApiTransform(Schema schema) {
.withFormatFunction(BigQueryUtils.toTableRow())
.withWriteDisposition(WriteDisposition.WRITE_APPEND);

if (configuration.getTable().equals(DYNAMIC_DESTINATIONS)) {
checkArgument(
schema.getFieldNames().equals(Arrays.asList("destination", "record")),
"When writing to dynamic destinations, we expect Row Schema with a "
+ "\"destination\" string field and a \"record\" Row field.");
// in case CDC writes are configured we validate and include them in the configuration
if (Optional.ofNullable(configuration.getUseCdcWrites()).orElse(false)) {
write = validateAndIncludeCDCInformation(write, schema);
} else if (configuration.getTable().equals(DYNAMIC_DESTINATIONS)) {
validateDynamicDestinationsExpectedSchema(schema);
write =
write
.to(new RowDynamicDestinations(schema.getField("record").getType().getRowSchema()))
@@ -485,6 +544,7 @@ BigQueryIO.Write<Row> createStorageWriteApiTransform(Schema schema) {
configuration.getCreateDisposition().toUpperCase());
write = write.withCreateDisposition(createDisposition);
}

if (!Strings.isNullOrEmpty(configuration.getWriteDisposition())) {
WriteDisposition writeDisposition =
BigQueryStorageWriteApiSchemaTransformConfiguration.WRITE_DISPOSITIONS.get(
@@ -498,5 +558,53 @@ BigQueryIO.Write<Row> createStorageWriteApiTransform(Schema schema) {

return write;
}

BigQueryIO.Write<Row> validateAndIncludeCDCInformation(
BigQueryIO.Write<Row> write, Schema schema) {
checkArgument(
schema.getFieldNames().containsAll(Arrays.asList(ROW_PROPERTY_MUTATION_INFO, "record")),
"When writing using CDC functionality, we expect Row Schema with a "
+ "\""
+ ROW_PROPERTY_MUTATION_INFO
+ "\" Row field and a \"record\" Row field.");

Schema rowSchema = schema.getField(ROW_PROPERTY_MUTATION_INFO).getType().getRowSchema();

checkArgument(
rowSchema.equals(ROW_SCHEMA_MUTATION_INFO),
"When writing using CDC functionality, we expect a \""
+ ROW_PROPERTY_MUTATION_INFO
+ "\" field of Row type with schema:\n"
+ ROW_SCHEMA_MUTATION_INFO.toString()
+ "\n"
+ "Received \""
+ ROW_PROPERTY_MUTATION_INFO
+ "\" field with schema:\n"
+ rowSchema.toString());

String tableDestination = null;

if (configuration.getTable().equals(DYNAMIC_DESTINATIONS)) {
validateDynamicDestinationsExpectedSchema(schema);
} else {
tableDestination = configuration.getTable();
}

return write
.to(
new RowDynamicDestinations(
schema.getField("record").getType().getRowSchema(),
tableDestination,
configuration.getPrimaryKey()))
.withFormatFunction(row -> BigQueryUtils.toTableRow(row.getRow("record")))
.withPrimaryKey(configuration.getPrimaryKey())
.withRowMutationInformationFn(
row ->
RowMutationInformation.of(
RowMutationInformation.MutationType.valueOf(
row.getRow(ROW_PROPERTY_MUTATION_INFO)
.getString(ROW_PROPERTY_MUTATION_TYPE)),
row.getRow(ROW_PROPERTY_MUTATION_INFO).getString(ROW_PROPERTY_MUTATION_SQN)));
}
}
}
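
Taken together, the CDC configuration and validation in this provider expect each element to be a Beam Row carrying a `row_mutation_info` sub-row matching ROW_SCHEMA_MUTATION_INFO, a `record` sub-row with the actual payload, and additionally a `destination` string when the table is set to DYNAMIC_DESTINATIONS. A hedged Python-side illustration of that shape follows; the class names are invented for the example, only the field names come from the provider above.

    # Illustrative model of the element shape validated by validateAndIncludeCDCInformation;
    # class names are invented, field names come from the Java provider above.
    import typing
    import apache_beam as beam

    class MutationInfo(typing.NamedTuple):
        mutation_type: str             # "UPSERT" or "DELETE"
        change_sequence_number: str    # resolves ordering for the same primary key

    class Record(typing.NamedTuple):   # example payload; match your table schema
        id: int
        name: str

    class CdcElement(typing.NamedTuple):
        row_mutation_info: MutationInfo
        record: Record
        # destination: str             # add this field only when using DYNAMIC_DESTINATIONS

    # Registering RowCoder gives these NamedTuples Beam Row schemas, which is what the
    # Java provider receives across the expansion service.
    beam.coders.registry.register_coder(MutationInfo, beam.coders.RowCoder)
    beam.coders.registry.register_coder(Record, beam.coders.RowCoder)
    beam.coders.registry.register_coder(CdcElement, beam.coders.RowCoder)

Note that in this commit the Python wrapper only forwards the boolean flag and the primary key; a per-record callable for deriving the mutation info is left to a future PR, per the commit notes above.
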
----------------------------------------
@@ -18,11 +18,13 @@
package org.apache.beam.sdk.io.gcp.testing;

import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableConstraints;
import com.google.api.services.bigquery.model.TableRow;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
@@ -51,12 +53,24 @@ class TableContainer {
this.keyedRows = Maps.newHashMap();
this.ids = new ArrayList<>();
this.sizeBytes = 0L;
// extract primary key information from Table if present
List<String> pkColumns = primaryKeyColumns(table);
this.primaryKeyColumns = pkColumns;
this.primaryKeyColumnIndices = primaryColumnFieldIndices(pkColumns, table);
}

// Only top-level columns supported.
void setPrimaryKeyColumns(List<String> primaryKeyColumns) {
this.primaryKeyColumns = primaryKeyColumns;
static @Nullable List<String> primaryKeyColumns(Table table) {
return Optional.ofNullable(table.getTableConstraints())
.flatMap(constraints -> Optional.ofNullable(constraints.getPrimaryKey()))
.map(TableConstraints.PrimaryKey::getColumns)
.orElse(null);
}

static @Nullable List<Integer> primaryColumnFieldIndices(
@Nullable List<String> primaryKeyColumns, Table table) {
if (primaryKeyColumns == null) {
return null;
}
Map<String, Integer> indices =
IntStream.range(0, table.getSchema().getFields().size())
.boxed()
@@ -65,7 +79,13 @@ void setPrimaryKeyColumns(List<String> primaryKeyColumns) {
for (String columnName : primaryKeyColumns) {
primaryKeyColumnIndices.add(Preconditions.checkStateNotNull(indices.get(columnName)));
}
this.primaryKeyColumnIndices = primaryKeyColumnIndices;
return primaryKeyColumnIndices;
}

// Only top-level columns supported.
void setPrimaryKeyColumns(List<String> primaryKeyColumns) {
this.primaryKeyColumns = primaryKeyColumns;
this.primaryKeyColumnIndices = primaryColumnFieldIndices(primaryKeyColumns, table);
}

@Nullable
@@ -80,7 +100,7 @@ List<Object> getPrimaryKey(TableRow tableRow) {
.stream()
.map(cell -> Preconditions.checkStateNotNull(cell.get("v")))
.collect(Collectors.toList());
;

return Preconditions.checkStateNotNull(primaryKeyColumnIndices).stream()
.map(cellValues::get)
.collect(Collectors.toList());
@@ -91,7 +111,7 @@ List<Object> getPrimaryKey(TableRow tableRow) {

long addRow(TableRow row, String id) {
List<Object> primaryKey = getPrimaryKey(row);
if (primaryKey != null) {
if (primaryKey != null && !primaryKey.isEmpty()) {
if (keyedRows.putIfAbsent(primaryKey, row) != null) {
throw new RuntimeException(
"Primary key validation error! Multiple inserts with the same primary key.");
(remaining changed files not rendered)
