Skip to content

Commit

Permalink
set use cdc schema property as nullable, added safe retrieval method
Browse files Browse the repository at this point in the history
  • Loading branch information
prodriguezdefino committed Sep 21, 2024
1 parent de9e948 commit ab40dd9
Showing 1 changed file with 9 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
Expand Down Expand Up @@ -264,6 +265,7 @@ public static Builder builder() {
"This option enables the use of BigQuery CDC functionality. It expects a Row schema"
+ " wrapping the record to be inserted and adding the CDC info similar to:"
+ " {cdc_info: {mutation_type:\"...\", change_sequence_number:\"...\"}, record: {...}}")
@Nullable
public abstract List<String> getUseCdcWritesWithPrimaryKey();

/** Builder for {@link BigQueryStorageWriteApiSchemaTransformConfiguration}. */
Expand Down Expand Up @@ -487,11 +489,11 @@ BigQueryIO.Write<Row> createStorageWriteApiTransform(Schema schema) {
.to(new RowDynamicDestinations(schema.getField("record").getType().getRowSchema()))
.withFormatFunction(row -> BigQueryUtils.toTableRow(row.getRow("record")));

if (!configuration.getUseCdcWritesWithPrimaryKey().isEmpty()) {
if (!safeUseCdcWritesWithPrimaryKeys().isEmpty()) {
write = validateAndIncludeCDCInformation(write, schema);
}
} else {
if (!configuration.getUseCdcWritesWithPrimaryKey().isEmpty()) {
if (!safeUseCdcWritesWithPrimaryKeys().isEmpty()) {
write =
validateAndIncludeCDCInformation(write, schema)
.to(configuration.getTable())
Expand Down Expand Up @@ -521,6 +523,11 @@ BigQueryIO.Write<Row> createStorageWriteApiTransform(Schema schema) {
return write;
}

List<String> safeUseCdcWritesWithPrimaryKeys() {
return Optional.ofNullable(configuration.getUseCdcWritesWithPrimaryKey())
.orElse(ImmutableList.of());
}

BigQueryIO.Write<Row> validateAndIncludeCDCInformation(
BigQueryIO.Write<Row> write, Schema schema) {
checkArgument(
Expand Down

0 comments on commit ab40dd9

Please sign in to comment.