diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryFileLoadsSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryFileLoadsSchemaTransformProvider.java index 6532c5319657..092cf42a29a4 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryFileLoadsSchemaTransformProvider.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryFileLoadsSchemaTransformProvider.java @@ -26,6 +26,8 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.schemas.transforms.SchemaTransform; import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; @@ -87,18 +89,23 @@ public static class BigQueryFileLoadsSchemaTransform extends SchemaTransform { @Override public PCollectionRowTuple expand(PCollectionRowTuple input) { PCollection rowPCollection = input.getSinglePCollection(); - BigQueryIO.Write write = toWrite(); + BigQueryIO.Write write = toWrite(input.getPipeline().getOptions()); rowPCollection.apply(write); return PCollectionRowTuple.empty(input.getPipeline()); } - BigQueryIO.Write toWrite() { + BigQueryIO.Write toWrite(PipelineOptions options) { BigQueryIO.Write write = BigQueryIO.write() .to(configuration.getTable()) .withMethod(BigQueryIO.Write.Method.FILE_LOADS) .withFormatFunction(BigQueryUtils.toTableRow()) + // TODO(https://github.com/apache/beam/issues/33074) BatchLoad's + // createTempFilePrefixView() doesn't pick up the pipeline option + .withCustomGcsTempLocation( + ValueProvider.StaticValueProvider.of(options.getTempLocation())) + .withWriteDisposition(WriteDisposition.WRITE_APPEND) .useBeamSchema(); if (!Strings.isNullOrEmpty(configuration.getCreateDisposition())) { diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryFileLoadsWriteSchemaTransformProviderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryFileLoadsSchemaTransformProviderTest.java similarity index 99% rename from sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryFileLoadsWriteSchemaTransformProviderTest.java rename to sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryFileLoadsSchemaTransformProviderTest.java index 5c2b764ef2ec..897d95da3b13 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryFileLoadsWriteSchemaTransformProviderTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryFileLoadsSchemaTransformProviderTest.java @@ -57,7 +57,7 @@ /** Test for {@link BigQueryFileLoadsSchemaTransformProvider}. */ @RunWith(JUnit4.class) -public class BigQueryFileLoadsWriteSchemaTransformProviderTest { +public class BigQueryFileLoadsSchemaTransformProviderTest { private static final String PROJECT = "fakeproject"; private static final String DATASET = "fakedataset"; diff --git a/sdks/python/apache_beam/transforms/managed.py b/sdks/python/apache_beam/transforms/managed.py index 22ee15b1de1c..cbcb6de56ed7 100644 --- a/sdks/python/apache_beam/transforms/managed.py +++ b/sdks/python/apache_beam/transforms/managed.py @@ -77,12 +77,16 @@ ICEBERG = "iceberg" KAFKA = "kafka" +BIGQUERY = "bigquery" _MANAGED_IDENTIFIER = "beam:transform:managed:v1" _EXPANSION_SERVICE_JAR_TARGETS = { "sdks:java:io:expansion-service:shadowJar": [KAFKA, ICEBERG], + "sdks:java:io:google-cloud-platform:expansion-service:shadowJar": [ + BIGQUERY + ] } -__all__ = ["ICEBERG", "KAFKA", "Read", "Write"] +__all__ = ["ICEBERG", "KAFKA", "BIGQUERY", "Read", "Write"] class Read(PTransform): @@ -90,6 +94,7 @@ class Read(PTransform): _READ_TRANSFORMS = { ICEBERG: ManagedTransforms.Urns.ICEBERG_READ.urn, KAFKA: ManagedTransforms.Urns.KAFKA_READ.urn, + BIGQUERY: ManagedTransforms.Urns.BIGQUERY_READ.urn } def __init__( @@ -130,6 +135,7 @@ class Write(PTransform): _WRITE_TRANSFORMS = { ICEBERG: ManagedTransforms.Urns.ICEBERG_WRITE.urn, KAFKA: ManagedTransforms.Urns.KAFKA_WRITE.urn, + BIGQUERY: ManagedTransforms.Urns.BIGQUERY_WRITE.urn } def __init__(