Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BigQueryIO] fetch updated schema for newly created Storage API stream writers #33231

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@
## Bugfixes

* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* Fixed EventTimeTimer ordering in Prism. ([#32222](https://github.com/apache/beam/issues/32222)).
* [BigQueryIO] Fixed an issue where Storage Write API sometimes doesn't pick up auto-schema updates ([#33231](https://github.com/apache/beam/pull/33231))

## Security Fixes
* Fixed (CVE-YYYY-NNNN)[https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN] (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import com.google.cloud.bigquery.storage.v1.FinalizeWriteStreamResponse;
import com.google.cloud.bigquery.storage.v1.FlushRowsRequest;
import com.google.cloud.bigquery.storage.v1.FlushRowsResponse;
import com.google.cloud.bigquery.storage.v1.GetWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1.ProtoRows;
import com.google.cloud.bigquery.storage.v1.ProtoSchema;
import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
Expand All @@ -86,6 +87,7 @@
import com.google.cloud.bigquery.storage.v1.StreamWriter;
import com.google.cloud.bigquery.storage.v1.TableSchema;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.cloud.bigquery.storage.v1.WriteStreamView;
import com.google.cloud.hadoop.util.ApiErrorExtractor;
import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer;
import com.google.protobuf.DescriptorProtos;
Expand Down Expand Up @@ -1419,7 +1421,11 @@ public WriteStream createWriteStream(String tableUrn, WriteStream.Type type)

@Override
public @Nullable WriteStream getWriteStream(String writeStream) {
return newWriteClient.getWriteStream(writeStream);
return newWriteClient.getWriteStream(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add a boolean parameter to the method, so we only return schema if requested

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI in our code base we always perform this call for the sole purpose of fetching the schema

GetWriteStreamRequest.newBuilder()
.setView(WriteStreamView.FULL)
.setName(writeStream)
.build());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -479,11 +479,15 @@ SchemaAndDescriptor getCurrentTableSchema(String stream, @Nullable TableSchema u
Preconditions.checkStateNotNull(maybeWriteStreamService)
.getWriteStream(streamName);
if (writeStream != null && writeStream.hasTableSchema()) {
TableSchema updatedFromStream = writeStream.getTableSchema();
currentSchema.set(updatedFromStream);
updated.set(true);
LOG.debug(
"Fetched updated schema for table {}:\n\t{}", tableUrn, updatedFromStream);
Optional<TableSchema> newSchema =
TableSchemaUpdateUtils.getUpdatedSchema(
initialTableSchema, writeStream.getTableSchema());
if (newSchema.isPresent()) {
currentSchema.set(newSchema.get());
updated.set(true);
LOG.debug(
"Fetched updated schema for table {}:\n\t{}", tableUrn, newSchema.get());
}
}
}
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.google.cloud.bigquery.storage.v1.Exceptions.StreamFinalizedException;
import com.google.cloud.bigquery.storage.v1.ProtoRows;
import com.google.cloud.bigquery.storage.v1.TableSchema;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.cloud.bigquery.storage.v1.WriteStream.Type;
import com.google.protobuf.ByteString;
import com.google.protobuf.DescriptorProtos;
Expand Down Expand Up @@ -531,6 +532,30 @@ public void process(
element.getKey().getKey(), dynamicDestinations, datasetService);
tableSchema = converter.getTableSchema();
descriptor = converter.getDescriptor(false);

if (autoUpdateSchema) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure this is the ideal place to put this. getAppendClientInfo is called whenever the static cache is populated, meaning that on any worker restart, range move, etc. we'll be forced to call this API again. However we have persistent state in this DoFn, so we know if it's a "new" key or not. Can we use that to gate calling this method instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should always perform this check before creating a new StreamWriter , regardless of the reason for its creation. The only exception is if we already have an updated schema stored in state (see first if block above). If I'm following correctly, this method (getAppendClientInfo) will always create a new stream writer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also note that the updated schema is ignored when the StreamWriter object's creation time is later than the updated schema's.
i.e. it doesn't matter when the WriteStream itself was created

// A StreamWriter ignores table schema updates that happen prior to its creation.
// So before creating a StreamWriter below, we fetch the table schema to check if we
// missed an update.
// If so, use the new schema instead of the base schema
@Nullable
WriteStream writeStream =
writeStreamService.getWriteStream(getOrCreateStream.get());
TableSchema streamSchema =
writeStream == null
? TableSchema.getDefaultInstance()
: writeStream.getTableSchema();
Optional<TableSchema> newSchema =
TableSchemaUpdateUtils.getUpdatedSchema(tableSchema, streamSchema);

if (newSchema.isPresent()) {
tableSchema = newSchema.get();
descriptor =
TableRowToStorageApiProto.descriptorSchemaFromTableSchema(
tableSchema, true, false);
updatedSchema.write(tableSchema);
}
}
}
AppendClientInfo info =
AppendClientInfo.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects.firstNonNull;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeTrue;

Expand All @@ -33,7 +35,9 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.function.Function;
Expand All @@ -60,6 +64,7 @@
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.joda.time.Duration;
Expand Down Expand Up @@ -124,6 +129,9 @@ public static Iterable<Object[]> data() {
private static final int TOTAL_N = 70;
// Number of rows with the original schema
private static final int ORIGINAL_N = 60;
// for dynamic destination test
private static final int NUM_DESTINATIONS = 3;
private static final int TOTAL_NUM_STREAMS = 9;

private final Random randomGenerator = new Random();

Expand All @@ -145,13 +153,20 @@ public static void cleanUp() {
}

private String createTable(TableSchema tableSchema) throws IOException, InterruptedException {
return createTable(tableSchema, "");
}

private String createTable(TableSchema tableSchema, String suffix)
throws IOException, InterruptedException {
String tableId = Iterables.get(Splitter.on('[').split(testName.getMethodName()), 0);
if (useInputSchema) {
tableId += "WithInputSchema";
}
if (changeTableSchema) {
tableId += "OnSchemaChange";
}
tableId += suffix;

BQ_CLIENT.deleteTable(PROJECT, BIG_QUERY_DATASET_ID, tableId);
BQ_CLIENT.createNewTable(
PROJECT,
Expand All @@ -170,9 +185,8 @@ static class UpdateSchemaDoFn extends DoFn<KV<Integer, TableRow>, TableRow> {

private final String projectId;
private final String datasetId;
private final String tableId;
// represent as String because TableSchema is not serializable
private final String newSchema;
private final Map<String, String> newSchemas;

private transient BigqueryClient bqClient;

Expand All @@ -183,11 +197,14 @@ static class UpdateSchemaDoFn extends DoFn<KV<Integer, TableRow>, TableRow> {
private final StateSpec<ValueState<Integer>> counter;

public UpdateSchemaDoFn(
String projectId, String datasetId, String tableId, TableSchema newSchema) {
String projectId, String datasetId, Map<String, TableSchema> newSchemas) {
this.projectId = projectId;
this.datasetId = datasetId;
this.tableId = tableId;
this.newSchema = BigQueryHelpers.toJsonString(newSchema);
Map<String, String> serializableSchemas = new HashMap<>();
for (Map.Entry<String, TableSchema> entry : newSchemas.entrySet()) {
serializableSchemas.put(entry.getKey(), BigQueryHelpers.toJsonString(entry.getValue()));
}
this.newSchemas = serializableSchemas;
this.bqClient = null;
this.counter = StateSpecs.value();
}
Expand All @@ -201,14 +218,17 @@ public void setup() {
public void processElement(ProcessContext c, @StateId(ROW_COUNTER) ValueState<Integer> counter)
throws Exception {
int current = firstNonNull(counter.read(), 0);
// We update schema early on to leave a healthy amount of time for
// StreamWriter to recognize it.
if (current == 10) {
bqClient.updateTableSchema(
projectId,
datasetId,
tableId,
BigQueryHelpers.fromJsonString(newSchema, TableSchema.class));
// We update schema early on to leave a healthy amount of time for StreamWriter to recognize
// it.
// We also update halfway through so that some writers are created *after* the schema update
if (current == TOTAL_NUM_STREAMS / 2) {
for (Map.Entry<String, String> entry : newSchemas.entrySet()) {
bqClient.updateTableSchema(
projectId,
datasetId,
entry.getKey(),
BigQueryHelpers.fromJsonString(entry.getValue(), TableSchema.class));
}
}

counter.write(++current);
Expand Down Expand Up @@ -304,7 +324,7 @@ private void runStreamingPipelineWithSchemaChange(
p.getOptions().as(BigQueryOptions.class).setStorageApiAppendThresholdBytes(0);
// Limit parallelism so that all streams recognize the new schema in an expected short amount
// of time (before we start writing rows with updated schema)
p.getOptions().as(BigQueryOptions.class).setNumStorageWriteApiStreams(3);
p.getOptions().as(BigQueryOptions.class).setNumStorageWriteApiStreams(TOTAL_NUM_STREAMS);
// Need to manually enable streaming engine for legacy dataflow runner
ExperimentalOptions.addExperiment(
p.getOptions().as(ExperimentalOptions.class), GcpOptions.STREAMING_ENGINE_EXPERIMENT);
Expand Down Expand Up @@ -394,7 +414,8 @@ private void runStreamingPipelineWithSchemaChange(
.apply(
"Update Schema",
ParDo.of(
new UpdateSchemaDoFn(PROJECT, BIG_QUERY_DATASET_ID, tableId, updatedSchema)));
new UpdateSchemaDoFn(
PROJECT, BIG_QUERY_DATASET_ID, ImmutableMap.of(tableId, updatedSchema))));
}
WriteResult result = rows.apply("Stream to BigQuery", write);
if (useIgnoreUnknownValues) {
Expand Down Expand Up @@ -494,13 +515,13 @@ public void checkRowsWithUpdatedSchema(
if (Integer.parseInt((String) row.get("id")) < ORIGINAL_N
|| !useAutoSchemaUpdate
|| !changeTableSchema) {
assertTrue(
assertNull(
String.format("Expected row to NOT have field %s:\n%s", extraField, row),
row.get(extraField) == null);
row.get(extraField));
} else {
assertTrue(
assertNotNull(
String.format("Expected row to have field %s:\n%s", extraField, row),
row.get(extraField) != null);
row.get(extraField));
}
}
}
Expand Down Expand Up @@ -539,4 +560,151 @@ public void testAtLeastOnceWithIgnoreUnknownValues() throws Exception {
public void testAtLeastOnceWithAutoSchemaUpdate() throws Exception {
runStreamingPipelineWithSchemaChange(Write.Method.STORAGE_API_AT_LEAST_ONCE, true, true);
}

public void runDynamicDestinationsWithAutoSchemaUpdate(boolean useAtLeastOnce) throws Exception {
Pipeline p = Pipeline.create(TestPipeline.testingPipelineOptions());
// 0 threshold so that the stream tries fetching an updated schema after each append
p.getOptions().as(BigQueryOptions.class).setStorageApiAppendThresholdBytes(0);
// Total streams per destination
p.getOptions()
.as(BigQueryOptions.class)
.setNumStorageWriteApiStreams(TOTAL_NUM_STREAMS / NUM_DESTINATIONS);
// Need to manually enable streaming engine for legacy dataflow runner
ExperimentalOptions.addExperiment(
p.getOptions().as(ExperimentalOptions.class), GcpOptions.STREAMING_ENGINE_EXPERIMENT);
// Only run the most relevant test case on Dataflow
if (p.getOptions().getRunner().getName().contains("DataflowRunner")) {
assumeTrue(
"Skipping in favor of more relevant test case", changeTableSchema && useInputSchema);
}

List<String> fieldNamesOrigin = new ArrayList<String>(Arrays.asList(FIELDS));

// Shuffle the fields in the write schema to do fuzz testing on field order
List<String> fieldNamesShuffled = new ArrayList<String>(fieldNamesOrigin);
Collections.shuffle(fieldNamesShuffled, randomGenerator);
TableSchema bqTableSchema = makeTableSchemaFromTypes(fieldNamesOrigin, null);
TableSchema inputSchema = makeTableSchemaFromTypes(fieldNamesShuffled, null);

Map<Long, String> destinations = new HashMap<>(NUM_DESTINATIONS);
Map<String, TableSchema> updatedSchemas = new HashMap<>(NUM_DESTINATIONS);
Map<String, String> extraFields = new HashMap<>(NUM_DESTINATIONS);
Map<Long, GenerateRowFunc> rowFuncs = new HashMap<>(NUM_DESTINATIONS);
for (int i = 0; i < NUM_DESTINATIONS; i++) {
// The updated schema includes all fields in the original schema plus a random new field
List<String> fieldNamesWithExtra = new ArrayList<String>(fieldNamesOrigin);
String extraField =
fieldNamesOrigin.get(randomGenerator.nextInt(fieldNamesOrigin.size())) + "_EXTRA";
fieldNamesWithExtra.add(extraField);
TableSchema updatedSchema =
makeTableSchemaFromTypes(fieldNamesWithExtra, ImmutableSet.of(extraField));
GenerateRowFunc generateRowFunc = new GenerateRowFunc(fieldNamesOrigin, fieldNamesWithExtra);

String tableId = createTable(bqTableSchema, "_dynamic_" + i);
String tableSpec = PROJECT + ":" + BIG_QUERY_DATASET_ID + "." + tableId;

rowFuncs.put((long) i, generateRowFunc);
destinations.put((long) i, tableSpec);
updatedSchemas.put(tableId, updatedSchema);
extraFields.put(tableSpec, extraField);
}

// build write transform
Write<TableRow> write =
BigQueryIO.writeTableRows()
.to(
row -> {
long l = (int) row.getValue().get("id") % NUM_DESTINATIONS;
String destination = destinations.get(l);
return new TableDestination(destination, null);
})
.withAutoSchemaUpdate(true)
.ignoreUnknownValues()
.withMethod(Write.Method.STORAGE_API_AT_LEAST_ONCE)
.withCreateDisposition(CreateDisposition.CREATE_NEVER)
.withWriteDisposition(WriteDisposition.WRITE_APPEND);
if (useInputSchema) {
write = write.withSchema(inputSchema);
}
if (!useAtLeastOnce) {
write =
write
.withMethod(Write.Method.STORAGE_WRITE_API)
.withTriggeringFrequency(Duration.standardSeconds(1));
}

int numRows = TOTAL_N;
// set up and build pipeline
Instant start = new Instant(0);
// We give a healthy waiting period between each element to give Storage API streams a chance to
// recognize the new schema. Apply on relevant tests.
Duration interval = changeTableSchema ? Duration.standardSeconds(1) : Duration.millis(1);
Duration stop =
changeTableSchema ? Duration.standardSeconds(numRows - 1) : Duration.millis(numRows - 1);
Function<Instant, Long> getIdFromInstant =
changeTableSchema
? (Function<Instant, Long> & Serializable)
(Instant instant) -> instant.getMillis() / 1000
: (Function<Instant, Long> & Serializable) Instant::getMillis;

// Generates rows with original schema up for row IDs under ORIGINAL_N
// Then generates rows with updated schema for the rest
// Rows with updated schema should only reach the table if ignoreUnknownValues is set,
// and the extra field should be present only when autoSchemaUpdate is set
PCollection<Instant> instants =
p.apply(
"Generate Instants",
PeriodicImpulse.create()
.startAt(start)
.stopAt(start.plus(stop))
.withInterval(interval)
.catchUpToNow(false));
PCollection<TableRow> rows =
instants.apply(
"Create TableRows",
MapElements.into(TypeDescriptor.of(TableRow.class))
.via(
instant -> {
long rowId = getIdFromInstant.apply(instant);
long dest = rowId % NUM_DESTINATIONS;
return rowFuncs.get(dest).apply(rowId);
}));
if (changeTableSchema) {
rows =
rows
// UpdateSchemaDoFn uses state, so need to have a KV input
.apply("Add a dummy key", WithKeys.of(1))
.apply(
"Update Schema",
ParDo.of(new UpdateSchemaDoFn(PROJECT, BIG_QUERY_DATASET_ID, updatedSchemas)));
}

WriteResult result = rows.apply("Stream to BigQuery", write);
// We ignore the extra fields, so no rows should have been sent to DLQ
PAssert.that("Check DLQ is empty", result.getFailedStorageApiInserts()).empty();
p.run().waitUntilFinish();

Map<String, Integer> expectedCounts = new HashMap<>(NUM_DESTINATIONS);
for (int i = 0; i < numRows; i++) {
long mod = i % NUM_DESTINATIONS;
String destination = destinations.get(mod);
expectedCounts.merge(destination, 1, Integer::sum);
}

for (Map.Entry<String, Integer> expectedCount : expectedCounts.entrySet()) {
String dest = expectedCount.getKey();
checkRowCompleteness(dest, expectedCount.getValue(), true);
checkRowsWithUpdatedSchema(dest, extraFields.get(dest), true);
}
}

@Test
public void testExactlyOnceDynamicDestinationsWithAutoSchemaUpdate() throws Exception {
runDynamicDestinationsWithAutoSchemaUpdate(false);
}

@Test
public void testAtLeastOnceDynamicDestinationsWithAutoSchemaUpdate() throws Exception {
runDynamicDestinationsWithAutoSchemaUpdate(true);
}
}
Loading