From 667269aabe85d67256e07096d6be556c55b85f54 Mon Sep 17 00:00:00 2001 From: claudevdm <33973061+claudevdm@users.noreply.github.com> Date: Thu, 14 Nov 2024 12:02:54 -0500 Subject: [PATCH] Add temp location to batch file loads (#33115) --- .../io/gcp/bigquery/providers/BigQueryManagedIT.java | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryManagedIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryManagedIT.java index 6b685392809f..16ce2f049dcf 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryManagedIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryManagedIT.java @@ -36,6 +36,7 @@ import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.TestPipelineOptions; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.PeriodicImpulse; @@ -98,6 +99,10 @@ public void testBatchFileLoadsWriteRead() { String.format("%s:%s.%s", PROJECT, BIG_QUERY_DATASET_ID, testName.getMethodName()); Map config = ImmutableMap.of("table", table); + // file loads requires a GCS temp location + String tempLocation = writePipeline.getOptions().as(TestPipelineOptions.class).getTempRoot(); + writePipeline.getOptions().setTempLocation(tempLocation); + // batch write PCollectionRowTuple.of("input", getInput(writePipeline, false)) .apply(Managed.write(Managed.BIGQUERY).withConfig(config)); @@ -146,6 +151,12 @@ public void testDynamicDestinations(boolean streaming) throws IOException, Inter Map config = ImmutableMap.of("table", destinationTemplate, "drop", Collections.singletonList("dest")); + if (!streaming) { + // file loads requires a GCS temp location + String tempLocation = writePipeline.getOptions().as(TestPipelineOptions.class).getTempRoot(); + writePipeline.getOptions().setTempLocation(tempLocation); + } + // write PCollectionRowTuple.of("input", getInput(writePipeline, streaming)) .apply(Managed.write(Managed.BIGQUERY).withConfig(config));