From 0bdb00a15364821744e64b40c3f8684b4640f6e4 Mon Sep 17 00:00:00 2001 From: Yi Hu Date: Wed, 9 Oct 2024 15:04:01 -0400 Subject: [PATCH] Avoid repeated run of setDefaultPipelineOptionsOnce in TestPipelineOptions.create --- .../beam_PostCommit_Java_PVR_Spark_Batch.json | 2 +- .../org/apache/beam/sdk/io/FileSystems.java | 23 +++++++++++++++++-- .../apache/beam/sdk/testing/TestPipeline.java | 4 ++-- 3 files changed, 24 insertions(+), 5 deletions(-) diff --git a/.github/trigger_files/beam_PostCommit_Java_PVR_Spark_Batch.json b/.github/trigger_files/beam_PostCommit_Java_PVR_Spark_Batch.json index b60f5c4cc3c8..e3d6056a5de9 100644 --- a/.github/trigger_files/beam_PostCommit_Java_PVR_Spark_Batch.json +++ b/.github/trigger_files/beam_PostCommit_Java_PVR_Spark_Batch.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "modification": 0 + "modification": 1 } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java index fb25cac6262f..5ca22749b163 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileSystems.java @@ -565,11 +565,13 @@ static FileSystem getFileSystemInternal(String scheme) { * *

It will be used in {@link FileSystemRegistrar FileSystemRegistrars} for all schemes. * - *

This is expected only to be used by runners after {@code Pipeline.run}, or in tests. + *

Outside of workers where Beam FileSystem API is used (e.g. test methods, user code executed + * during pipeline submission), consider using {@link #registerFileSystemsOnce} if initializing + * FileSystems for the supported schemes is the main goal. */ @Internal public static void setDefaultPipelineOptions(PipelineOptions options) { - checkNotNull(options, "options"); + checkNotNull(options, "options cannot be null"); long id = options.getOptionsId(); int nextRevision = options.revision(); @@ -593,6 +595,23 @@ public static void setDefaultPipelineOptions(PipelineOptions options) { } } + /** + * Register file systems once if never done before. + * + *

This method executes {@link #setDefaultPipelineOptions} only if it has never been run, + * otherwise it returns immediately. + * + *

It is used internally by test setup to avoid repeated filesystem registrations (which involve + * expensive ServiceLoader calls) when multiple Pipeline and PipelineOptions objects are + * initialized, which is common in test execution. + */ + @Internal + public static synchronized void registerFileSystemsOnce(PipelineOptions options) { + if (FILESYSTEM_REVISION.get() == null) { + setDefaultPipelineOptions(options); + } + } + @VisibleForTesting static Map verifySchemesAreUnique( PipelineOptions options, Set registrars) { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java index ed61f7f3d6f2..328bf19c466c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java @@ -373,7 +373,7 @@ public PipelineResult runWithAdditionalOptionArgs(List additionalArgs) { } newOptions.setStableUniqueNames(CheckEnabled.ERROR); - FileSystems.setDefaultPipelineOptions(options); + FileSystems.registerFileSystemsOnce(options); return run(newOptions); } catch (IOException e) { throw new RuntimeException( @@ -515,7 +515,7 @@ public static PipelineOptions testingPipelineOptions() { } options.setStableUniqueNames(CheckEnabled.ERROR); - FileSystems.setDefaultPipelineOptions(options); + FileSystems.registerFileSystemsOnce(options); return options; } catch (IOException e) { throw new RuntimeException(