From c57553cb9e420107c5c9334162f8ab115b2eafd3 Mon Sep 17 00:00:00 2001 From: liferoad Date: Mon, 18 Nov 2024 19:11:49 -0500 Subject: [PATCH] Added replaceGcsFilesWithLocalFiles (#33006) * Added replaceGcsFilesWithLocalFiles * use static * added unit tests * make it public * Added logs * spotless * Added a simple IT to cover downloading the GCS file and staging it later. --- .../org/apache/beam/it/gcp/WordCountIT.java | 27 ++++++ .../beam/runners/dataflow/DataflowRunner.java | 92 +++++++++++++++++++ .../runners/dataflow/DataflowRunnerTest.java | 14 +++ 3 files changed, 133 insertions(+) diff --git a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/WordCountIT.java b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/WordCountIT.java index 511c322b8c4f..b561c0c71fb6 100644 --- a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/WordCountIT.java +++ b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/WordCountIT.java @@ -19,15 +19,19 @@ import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatPipeline; import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatResult; +import static org.apache.beam.sdk.util.construction.resources.PipelineResources.detectClassPathResourcesToStage; import java.io.IOException; import java.time.Duration; import java.util.Arrays; +import java.util.List; import org.apache.beam.it.common.PipelineLauncher.LaunchConfig; import org.apache.beam.it.common.PipelineLauncher.LaunchInfo; import org.apache.beam.it.common.PipelineLauncher.Sdk; import org.apache.beam.it.common.PipelineOperator.Result; +import org.apache.beam.runners.dataflow.DataflowRunner; import org.apache.beam.sdk.io.TextIO; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Filter; @@ -67,6 +71,29 @@ public void testWordCountDataflow() throws IOException { assertThatResult(result).isLaunchFinished(); } + @Test + public void testWordCountDataflowWithGCSFilesToStage() throws IOException { + + PipelineOptions pipelineOptions = wcPipeline.getOptions(); + List filesToStage = + detectClassPathResourcesToStage(DataflowRunner.class.getClassLoader(), pipelineOptions); + filesToStage.add("gs://apache-beam-samples/shakespeare/kinglear.txt"); + + LaunchConfig options = + LaunchConfig.builder("test-wordcount") + .setSdk(Sdk.JAVA) + .setPipeline(wcPipeline) + .addParameter("runner", "DataflowRunner") + .addParameter("filesToStage", String.join(",", filesToStage)) + .build(); + + LaunchInfo launchInfo = pipelineLauncher.launch(project, region, options); + assertThatPipeline(launchInfo).isRunning(); + Result result = + pipelineOperator.waitUntilDone(createConfig(launchInfo, Duration.ofMinutes(20))); + assertThatResult(result).isLaunchFinished(); + } + /** Build WordCount pipeline. */ private void buildPipeline() { wcPipeline diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index abe7d0d364d3..ce99958c57fd 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -41,10 +41,12 @@ import com.google.auto.value.AutoValue; import java.io.BufferedWriter; import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStreamWriter; import java.io.PrintWriter; import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -179,6 +181,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.hash.HashCode; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.hash.Hashing; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.ByteStreams; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.Files; import org.joda.time.DateTimeUtils; import org.joda.time.DateTimeZone; @@ -257,6 +260,92 @@ public class DataflowRunner extends PipelineRunner { /** Dataflow service endpoints are expected to match this pattern. */ static final String ENDPOINT_REGEXP = "https://[\\S]*googleapis\\.com[/]?"; + /** + * Replaces GCS file paths with local file paths by downloading the GCS files locally. This is + * useful when files need to be accessed locally before being staged to Dataflow. + * + * @param filesToStage List of file paths that may contain GCS paths (gs://) and local paths + * @return List of local file paths where any GCS paths have been downloaded locally + * @throws RuntimeException if there are errors copying GCS files locally + */ + public static List replaceGcsFilesWithLocalFiles(List filesToStage) { + List processedFiles = new ArrayList<>(); + + for (String fileToStage : filesToStage) { + String localPath; + if (fileToStage.contains("=")) { + // Handle files with staging name specified + String[] components = fileToStage.split("=", 2); + String stagingName = components[0]; + String filePath = components[1]; + + if (filePath.startsWith("gs://")) { + try { + // Create temp file with exact same name as GCS file + String gcsFileName = filePath.substring(filePath.lastIndexOf('/') + 1); + File tempDir = Files.createTempDir(); + tempDir.deleteOnExit(); + File tempFile = new File(tempDir, gcsFileName); + tempFile.deleteOnExit(); + + LOG.info( + "Downloading GCS file {} to local temp file {}", + filePath, + tempFile.getAbsolutePath()); + + // Copy GCS file to local temp file + ResourceId source = FileSystems.matchNewResource(filePath, false); + try (ReadableByteChannel reader = FileSystems.open(source); + FileOutputStream writer = new FileOutputStream(tempFile)) { + ByteStreams.copy(Channels.newInputStream(reader), writer); + } + + localPath = stagingName + "=" + tempFile.getAbsolutePath(); + LOG.info("Replaced GCS path {} with local path {}", fileToStage, localPath); + } catch (IOException e) { + throw new RuntimeException("Failed to copy GCS file locally: " + filePath, e); + } + } else { + localPath = fileToStage; + } + } else { + // Handle files without staging name + if (fileToStage.startsWith("gs://")) { + try { + // Create temp file with exact same name as GCS file + String gcsFileName = fileToStage.substring(fileToStage.lastIndexOf('/') + 1); + File tempDir = Files.createTempDir(); + tempDir.deleteOnExit(); + File tempFile = new File(tempDir, gcsFileName); + tempFile.deleteOnExit(); + + LOG.info( + "Downloading GCS file {} to local temp file {}", + fileToStage, + tempFile.getAbsolutePath()); + + // Copy GCS file to local temp file + ResourceId source = FileSystems.matchNewResource(fileToStage, false); + try (ReadableByteChannel reader = FileSystems.open(source); + FileOutputStream writer = new FileOutputStream(tempFile)) { + ByteStreams.copy(Channels.newInputStream(reader), writer); + } + + localPath = tempFile.getAbsolutePath(); + LOG.info("Replaced GCS path {} with local path {}", fileToStage, localPath); + } catch (IOException e) { + throw new RuntimeException("Failed to copy GCS file locally: " + fileToStage, e); + } + } else { + localPath = fileToStage; + } + } + processedFiles.add(localPath); + } + + return processedFiles; + } + /** * Construct a runner from the provided options. * @@ -312,6 +401,9 @@ && isServiceEndpoint(dataflowOptions.getDataflowEndpoint())) { } if (dataflowOptions.getFilesToStage() != null) { + // Replace GCS file paths with local file paths + dataflowOptions.setFilesToStage( + replaceGcsFilesWithLocalFiles(dataflowOptions.getFilesToStage())); // The user specifically requested these files, so fail now if they do not exist. // (automatically detected classpath elements are permitted to not exist, so later // staging will not fail on nonexistent files) diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index 01ceac9da585..106b15de6e4d 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -77,6 +77,7 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.Serializable; +import java.io.Writer; import java.nio.channels.FileChannel; import java.nio.charset.StandardCharsets; import java.nio.file.Files; @@ -1155,6 +1156,19 @@ public void testNonExistentStagingLocation() throws IOException { assertValidJob(jobCaptor.getValue()); } + @Test + public void testReplaceGcsFilesWithLocalFilesEmptyList() { + List filesToStage = Collections.emptyList(); + List processedFiles = DataflowRunner.replaceGcsFilesWithLocalFiles(filesToStage); + assertTrue(processedFiles.isEmpty()); + } + + @Test(expected = RuntimeException.class) + public void testReplaceGcsFilesWithLocalFilesIOError() { + List filesToStage = Collections.singletonList("gs://non-existent-bucket/file.jar"); + DataflowRunner.replaceGcsFilesWithLocalFiles(filesToStage); + } + @Test public void testNonExistentProfileLocation() throws IOException { DataflowPipelineOptions options = buildPipelineOptions();