Skip to content

Commit

Permalink
add unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
ahmedabu98 committed Dec 19, 2024
1 parent 984673d commit e04a3e5
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
return PCollectionRowTuple.empty(input.getPipeline());
}

BigQueryIO.Write<Row> toWrite(Schema schema, PipelineOptions options) {
@VisibleForTesting
public BigQueryIO.Write<Row> toWrite(Schema schema, PipelineOptions options) {
PortableBigQueryDestinations dynamicDestinations =
new PortableBigQueryDestinations(schema, configuration);
BigQueryIO.Write<Row> write =
Expand All @@ -108,7 +109,8 @@ BigQueryIO.Write<Row> toWrite(Schema schema, PipelineOptions options) {
.withCustomGcsTempLocation(
ValueProvider.StaticValueProvider.of(options.getTempLocation()))
.withWriteDisposition(WriteDisposition.WRITE_APPEND)
// Use Avro format for better performance. Don't change this unless it's for a good reason.
// Use Avro format for better performance. Don't change this unless it's for a good
// reason.
.withAvroFormatFunction(dynamicDestinations.getAvroFilterFormatFunction(false));

if (!Strings.isNullOrEmpty(configuration.getCreateDisposition())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.toJsonString;
import static org.apache.beam.sdk.io.gcp.bigquery.WriteTables.ResultCoder.INSTANCE;
import static org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryFileLoadsSchemaTransformProvider.BigQueryFileLoadsSchemaTransform;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
Expand All @@ -32,6 +33,7 @@
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
Expand Down Expand Up @@ -117,11 +119,13 @@
import org.apache.beam.sdk.io.gcp.bigquery.WritePartition.ResultCoder;
import org.apache.beam.sdk.io.gcp.bigquery.WriteRename.TempTableCleanupFn;
import org.apache.beam.sdk.io.gcp.bigquery.WriteTables.Result;
import org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryFileLoadsSchemaTransformProvider;
import org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices;
import org.apache.beam.sdk.io.gcp.testing.FakeDatasetService;
import org.apache.beam.sdk.io.gcp.testing.FakeJobService;
import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.Schema;
Expand Down Expand Up @@ -818,6 +822,25 @@ public void testStreamingFileLoadsWithAutoSharding() throws Exception {
assertEquals(2 * numTables, fakeDatasetService.getInsertCount());
}

/**
 * Checks that the {@link BigQueryIO.Write} built by the file-loads schema transform is set up for
 * Avro output: no format function is set and an Avro row writer factory is present.
 *
 * <p>The test is skipped when the suite is parameterized for streaming or for the Storage API.
 */
@Test
public void testFileLoadSchemaTransformUsesAvroFormat() {
  // Skip unless neither streaming nor the Storage API is in use.
  assumeTrue(!useStreaming && !useStorageApi);

  BigQueryFileLoadsSchemaTransformProvider transformProvider =
      new BigQueryFileLoadsSchemaTransformProvider();

  // Only the "table" field is populated on the configuration row.
  Row configRow =
      Row.withSchema(transformProvider.configurationSchema())
          .withFieldValue("table", "some-table")
          .build();

  BigQueryFileLoadsSchemaTransform fileLoadsTransform =
      (BigQueryFileLoadsSchemaTransform) transformProvider.from(configRow);

  Schema emptySchema = Schema.of();
  PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
  BigQueryIO.Write<Row> builtWrite = fileLoadsTransform.toWrite(emptySchema, pipelineOptions);

  // We want the more performant Avro format: no format function, and an Avro writer factory set.
  assertNull(builtWrite.getFormatFunction());
  assertNotNull(builtWrite.getAvroRowWriterFactory());
}

@Test
public void testBatchFileLoads() throws Exception {
assumeTrue(!useStreaming);
Expand Down

0 comments on commit e04a3e5

Please sign in to comment.