Skip to content

Commit

Permalink
[Managed BigQuery] use file loads with Avro format for better perform…
Browse files Browse the repository at this point in the history
…ance (#33392)

* use avro file format

* add comment

* add unit test
  • Loading branch information
ahmedabu98 authored Dec 27, 2024
1 parent ab73855 commit bf7e317
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.schemas.Schema;
Expand Down Expand Up @@ -97,20 +96,22 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
return PCollectionRowTuple.empty(input.getPipeline());
}

BigQueryIO.Write<Row> toWrite(Schema schema, PipelineOptions options) {
@VisibleForTesting
public BigQueryIO.Write<Row> toWrite(Schema schema, PipelineOptions options) {
PortableBigQueryDestinations dynamicDestinations =
new PortableBigQueryDestinations(schema, configuration);
BigQueryIO.Write<Row> write =
BigQueryIO.<Row>write()
.to(dynamicDestinations)
.withMethod(BigQueryIO.Write.Method.FILE_LOADS)
.withFormatFunction(BigQueryUtils.toTableRow())
// TODO(https://github.com/apache/beam/issues/33074) BatchLoad's
// createTempFilePrefixView() doesn't pick up the pipeline option
.withCustomGcsTempLocation(
ValueProvider.StaticValueProvider.of(options.getTempLocation()))
.withWriteDisposition(WriteDisposition.WRITE_APPEND)
.withFormatFunction(dynamicDestinations.getFilterFormatFunction(false));
// Use Avro format for better performance. Don't change this unless it's for a good
// reason.
.withAvroFormatFunction(dynamicDestinations.getAvroFilterFormatFunction(false));

if (!Strings.isNullOrEmpty(configuration.getCreateDisposition())) {
CreateDisposition createDisposition =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.List;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
import org.apache.beam.sdk.io.gcp.bigquery.AvroWriteRequest;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
Expand Down Expand Up @@ -102,4 +105,16 @@ public SerializableFunction<Row, TableRow> getFilterFormatFunction(boolean fetch
return BigQueryUtils.toTableRow(filtered);
};
}

public SerializableFunction<AvroWriteRequest<Row>, GenericRecord> getAvroFilterFormatFunction(
boolean fetchNestedRecord) {
return request -> {
Row row = request.getElement();
if (fetchNestedRecord) {
row = checkStateNotNull(row.getRow(RECORD));
}
Row filtered = rowFilter.filter(row);
return AvroUtils.toGenericRecord(filtered);
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.toJsonString;
import static org.apache.beam.sdk.io.gcp.bigquery.WriteTables.ResultCoder.INSTANCE;
import static org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryFileLoadsSchemaTransformProvider.BigQueryFileLoadsSchemaTransform;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
Expand All @@ -32,6 +33,7 @@
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
Expand Down Expand Up @@ -117,11 +119,13 @@
import org.apache.beam.sdk.io.gcp.bigquery.WritePartition.ResultCoder;
import org.apache.beam.sdk.io.gcp.bigquery.WriteRename.TempTableCleanupFn;
import org.apache.beam.sdk.io.gcp.bigquery.WriteTables.Result;
import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryFileLoadsSchemaTransformProvider;
import org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices;
import org.apache.beam.sdk.io.gcp.testing.FakeDatasetService;
import org.apache.beam.sdk.io.gcp.testing.FakeJobService;
import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.Schema;
Expand Down Expand Up @@ -818,6 +822,25 @@ public void testStreamingFileLoadsWithAutoSharding() throws Exception {
assertEquals(2 * numTables, fakeDatasetService.getInsertCount());
}

@Test
public void testFileLoadSchemaTransformUsesAvroFormat() {
// ensure we are writing with the more performant avro format
assumeTrue(!useStreaming);
assumeTrue(!useStorageApi);
BigQueryFileLoadsSchemaTransformProvider provider =
new BigQueryFileLoadsSchemaTransformProvider();
Row configuration =
Row.withSchema(provider.configurationSchema())
.withFieldValue("table", "some-table")
.build();
BigQueryFileLoadsSchemaTransform schemaTransform =
(BigQueryFileLoadsSchemaTransform) provider.from(configuration);
BigQueryIO.Write<Row> write =
schemaTransform.toWrite(Schema.of(), PipelineOptionsFactory.create());
assertNull(write.getFormatFunction());
assertNotNull(write.getAvroRowWriterFactory());
}

@Test
public void testBatchFileLoads() throws Exception {
assumeTrue(!useStreaming);
Expand Down

0 comments on commit bf7e317

Please sign in to comment.