Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce setDefaultPipelineOptions call on creating TestPipelineOptions #32723

Merged
merged 1 commit into from
Oct 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 0
"modification": 1
}
Original file line number Diff line number Diff line change
Expand Up @@ -565,11 +565,13 @@ static FileSystem getFileSystemInternal(String scheme) {
*
* <p>It will be used in {@link FileSystemRegistrar FileSystemRegistrars} for all schemes.
*
* <p>This is expected only to be used by runners after {@code Pipeline.run}, or in tests.
* <p>Outside of workers where Beam FileSystem API is used (e.g. test methods, user code executed
* during pipeline submission), consider use {@link #registerFileSystemsOnce} if initialize
* FIleSystem of supported schema is the main goal.
*/
@Internal
public static void setDefaultPipelineOptions(PipelineOptions options) {
checkNotNull(options, "options");
checkNotNull(options, "options cannot be null");
long id = options.getOptionsId();
int nextRevision = options.revision();

Expand All @@ -593,6 +595,23 @@ public static void setDefaultPipelineOptions(PipelineOptions options) {
}
}

/**
* Register file systems once if never done before.
*
* <p>This method executes {@link #setDefaultPipelineOptions} only if it has never been run,
* otherwise it returns immediately.
*
* <p>It is internally used by test setup to avoid repeated filesystem registrations (involves
* expensive ServiceLoader calls) when there are multiple pipeline and PipelineOptions object
* initialized, which is commonly seen in test execution.
*/
@Internal
public static synchronized void registerFileSystemsOnce(PipelineOptions options) {
if (FILESYSTEM_REVISION.get() == null) {
setDefaultPipelineOptions(options);
}
}

@VisibleForTesting
static Map<String, FileSystem> verifySchemesAreUnique(
PipelineOptions options, Set<FileSystemRegistrar> registrars) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ public PipelineResult runWithAdditionalOptionArgs(List<String> additionalArgs) {
}
newOptions.setStableUniqueNames(CheckEnabled.ERROR);

FileSystems.setDefaultPipelineOptions(options);
FileSystems.registerFileSystemsOnce(options);
return run(newOptions);
} catch (IOException e) {
throw new RuntimeException(
Expand Down Expand Up @@ -515,7 +515,7 @@ public static PipelineOptions testingPipelineOptions() {
}
options.setStableUniqueNames(CheckEnabled.ERROR);

FileSystems.setDefaultPipelineOptions(options);
FileSystems.registerFileSystemsOnce(options);
return options;
} catch (IOException e) {
throw new RuntimeException(
Expand Down
Loading