diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryFileLoadsSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryFileLoadsSchemaTransformProvider.java index 1c5f2ef1c7d7..8899ac82eb06 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryFileLoadsSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryFileLoadsSchemaTransformProvider.java @@ -96,7 +96,8 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { return PCollectionRowTuple.empty(input.getPipeline()); } - BigQueryIO.Write toWrite(Schema schema, PipelineOptions options) { + @VisibleForTesting + public BigQueryIO.Write toWrite(Schema schema, PipelineOptions options) { PortableBigQueryDestinations dynamicDestinations = new PortableBigQueryDestinations(schema, configuration); BigQueryIO.Write write = @@ -108,7 +109,8 @@ BigQueryIO.Write toWrite(Schema schema, PipelineOptions options) { .withCustomGcsTempLocation( ValueProvider.StaticValueProvider.of(options.getTempLocation())) .withWriteDisposition(WriteDisposition.WRITE_APPEND) - // Use Avro format for better performance. Don't change this unless it's for a good reason. + // Use Avro format for better performance. Don't change this unless it's for a good + // reason. .withAvroFormatFunction(dynamicDestinations.getAvroFilterFormatFunction(false)); if (!Strings.isNullOrEmpty(configuration.getCreateDisposition())) { diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java index d96e22f84907..dd218b2bd44b 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java @@ -19,6 +19,7 @@ import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.toJsonString; import static org.apache.beam.sdk.io.gcp.bigquery.WriteTables.ResultCoder.INSTANCE; +import static org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryFileLoadsSchemaTransformProvider.BigQueryFileLoadsSchemaTransform; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; @@ -32,6 +33,7 @@ import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -117,11 +119,13 @@ import org.apache.beam.sdk.io.gcp.bigquery.WritePartition.ResultCoder; import org.apache.beam.sdk.io.gcp.bigquery.WriteRename.TempTableCleanupFn; import org.apache.beam.sdk.io.gcp.bigquery.WriteTables.Result; +import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryFileLoadsSchemaTransformProvider; import org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices; import org.apache.beam.sdk.io.gcp.testing.FakeDatasetService; import org.apache.beam.sdk.io.gcp.testing.FakeJobService; import org.apache.beam.sdk.metrics.Lineage; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.schemas.JavaFieldSchema; import org.apache.beam.sdk.schemas.Schema; @@ -818,6 +822,25 @@ public void testStreamingFileLoadsWithAutoSharding() throws Exception { assertEquals(2 * numTables, fakeDatasetService.getInsertCount()); } + @Test + public void testFileLoadSchemaTransformUsesAvroFormat() { + // ensure we are writing with the more performant avro format + assumeTrue(!useStreaming); + assumeTrue(!useStorageApi); + BigQueryFileLoadsSchemaTransformProvider provider = + new BigQueryFileLoadsSchemaTransformProvider(); + Row configuration = + Row.withSchema(provider.configurationSchema()) + .withFieldValue("table", "some-table") + .build(); + BigQueryFileLoadsSchemaTransform schemaTransform = + (BigQueryFileLoadsSchemaTransform) provider.from(configuration); + BigQueryIO.Write write = + schemaTransform.toWrite(Schema.of(), PipelineOptionsFactory.create()); + assertNull(write.getFormatFunction()); + assertNotNull(write.getAvroRowWriterFactory()); + } + @Test public void testBatchFileLoads() throws Exception { assumeTrue(!useStreaming);