From 20c6ad5ba1bac68634d2347197f5e2e8c4fa064a Mon Sep 17 00:00:00 2001
From: Ahmed Abualsaud <65791736+ahmedabu98@users.noreply.github.com>
Date: Fri, 27 Dec 2024 14:09:15 +0000
Subject: [PATCH] [Managed BigQuery] use file loads with Avro format for better performance (#33392)

* use avro file format

* add comment

* add unit test
---
 ...QueryFileLoadsSchemaTransformProvider.java |  9 ++++----
 .../PortableBigQueryDestinations.java         | 15 ++++++++++++
 .../io/gcp/bigquery/BigQueryIOWriteTest.java  | 23 +++++++++++++++++++
 3 files changed, 43 insertions(+), 4 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryFileLoadsSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryFileLoadsSchemaTransformProvider.java
index 7872c91d1f72..8899ac82eb06 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryFileLoadsSchemaTransformProvider.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryFileLoadsSchemaTransformProvider.java
@@ -25,7 +25,6 @@
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices;
-import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.schemas.Schema;
@@ -97,20 +96,22 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
       return PCollectionRowTuple.empty(input.getPipeline());
     }
 
-    BigQueryIO.Write<Row> toWrite(Schema schema, PipelineOptions options) {
+    @VisibleForTesting
+    public BigQueryIO.Write<Row> toWrite(Schema schema, PipelineOptions options) {
       PortableBigQueryDestinations dynamicDestinations =
           new PortableBigQueryDestinations(schema, configuration);
 
       BigQueryIO.Write<Row> write =
           BigQueryIO.<Row>write()
               .to(dynamicDestinations)
               .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
-              .withFormatFunction(BigQueryUtils.toTableRow())
               // TODO(https://github.com/apache/beam/issues/33074) BatchLoad's
               // createTempFilePrefixView() doesn't pick up the pipeline option
               .withCustomGcsTempLocation(
                   ValueProvider.StaticValueProvider.of(options.getTempLocation()))
               .withWriteDisposition(WriteDisposition.WRITE_APPEND)
-              .withFormatFunction(dynamicDestinations.getFilterFormatFunction(false));
+              // Use Avro format for better performance. Don't change this unless it's for a good
+              // reason.
+              .withAvroFormatFunction(dynamicDestinations.getAvroFilterFormatFunction(false));
 
       if (!Strings.isNullOrEmpty(configuration.getCreateDisposition())) {
         CreateDisposition createDisposition =
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/PortableBigQueryDestinations.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/PortableBigQueryDestinations.java
index 54d125012eac..0cd2b65b0858 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/PortableBigQueryDestinations.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/PortableBigQueryDestinations.java
@@ -25,7 +25,10 @@
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
 import java.util.List;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
+import org.apache.beam.sdk.io.gcp.bigquery.AvroWriteRequest;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
 import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
 import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
@@ -102,4 +105,16 @@ public SerializableFunction<Row, TableRow> getFilterFormatFunction(boolean fetch
       return BigQueryUtils.toTableRow(filtered);
     };
   }
+
+  public SerializableFunction<AvroWriteRequest<Row>, GenericRecord> getAvroFilterFormatFunction(
+      boolean fetchNestedRecord) {
+    return request -> {
+      Row row = request.getElement();
+      if (fetchNestedRecord) {
+        row = checkStateNotNull(row.getRow(RECORD));
+      }
+      Row filtered = rowFilter.filter(row);
+      return AvroUtils.toGenericRecord(filtered);
+    };
+  }
 }
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
index 69994c019509..57c71c023fcb 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
@@ -19,6 +19,7 @@
 import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.toJsonString;
 import static org.apache.beam.sdk.io.gcp.bigquery.WriteTables.ResultCoder.INSTANCE;
+import static org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryFileLoadsSchemaTransformProvider.BigQueryFileLoadsSchemaTransform;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
 import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
 import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
@@ -32,6 +33,7 @@
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -117,11 +119,13 @@
 import org.apache.beam.sdk.io.gcp.bigquery.WritePartition.ResultCoder;
 import org.apache.beam.sdk.io.gcp.bigquery.WriteRename.TempTableCleanupFn;
 import org.apache.beam.sdk.io.gcp.bigquery.WriteTables.Result;
+import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryFileLoadsSchemaTransformProvider;
 import org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices;
 import org.apache.beam.sdk.io.gcp.testing.FakeDatasetService;
 import org.apache.beam.sdk.io.gcp.testing.FakeJobService;
 import org.apache.beam.sdk.metrics.Lineage;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.schemas.JavaFieldSchema;
 import org.apache.beam.sdk.schemas.Schema;
@@ -818,6 +822,25 @@ public void testStreamingFileLoadsWithAutoSharding() throws Exception {
     assertEquals(2 * numTables, fakeDatasetService.getInsertCount());
   }
 
+  @Test
+  public void testFileLoadSchemaTransformUsesAvroFormat() {
+    // ensure we are writing with the more performant avro format
+    assumeTrue(!useStreaming);
+    assumeTrue(!useStorageApi);
+    BigQueryFileLoadsSchemaTransformProvider provider =
+        new BigQueryFileLoadsSchemaTransformProvider();
+    Row configuration =
+        Row.withSchema(provider.configurationSchema())
+            .withFieldValue("table", "some-table")
+            .build();
+    BigQueryFileLoadsSchemaTransform schemaTransform =
+        (BigQueryFileLoadsSchemaTransform) provider.from(configuration);
+    BigQueryIO.Write<Row> write =
+        schemaTransform.toWrite(Schema.of(), PipelineOptionsFactory.create());
+    assertNull(write.getFormatFunction());
+    assertNotNull(write.getAvroRowWriterFactory());
+  }
+
   @Test
   public void testBatchFileLoads() throws Exception {
     assumeTrue(!useStreaming);
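
Note (not part of the patch): for readers unfamiliar with the API this change adopts, below is a minimal, untested sketch of a standalone batch pipeline writing Beam Rows to BigQuery via FILE_LOADS with an Avro format function, mirroring what BigQueryFileLoadsSchemaTransformProvider now builds internally. The class name, field names, and project/dataset/table are illustrative placeholders. Avro load files are a compact binary format that is generally cheaper to encode and ingest than newline-delimited JSON TableRows, which is the rationale behind the "better performance" comment above.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.Row;

public class AvroFileLoadsSketch {
  public static void main(String[] args) {
    // Placeholder Beam schema for the rows being written.
    Schema schema =
        Schema.builder().addStringField("name").addInt64Field("count").build();

    Pipeline pipeline = Pipeline.create();
    pipeline
        .apply(
            Create.of(Row.withSchema(schema).addValues("a", 1L).build())
                .withRowSchema(schema))
        .apply(
            BigQueryIO.<Row>write()
                .to("my-project:my_dataset.my_table") // placeholder destination
                .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
                .withSchema(BigQueryUtils.toTableSchema(schema))
                // Encode each Row as an Avro GenericRecord before staging the
                // load files, instead of serializing every element to a JSON
                // TableRow -- the cheaper path this patch switches to.
                .withAvroFormatFunction(
                    request -> AvroUtils.toGenericRecord(request.getElement())));

    // pipeline.run() is omitted: executing this requires GCP credentials, a
    // writable dataset, and --tempLocation for the staged load files.
  }
}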