Skip to content

Commit

Permalink
Merge pull request #32991 from apache/debugsparkpvr
Browse files Browse the repository at this point in the history
Suppress errors in JvmInitializer#beforeProcessing if successfully initialized before
  • Loading branch information
Abacn authored Nov 4, 2024
2 parents 79c1d29 + 78421b5 commit c4edb7b
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 4
"modification": 5
}
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ public RemoteEnvironment createEnvironment(Environment environment, String worke
try {
fnHarness.get();
} catch (Throwable t) {
// Print stacktrace to stderr. Could be useful if underlying error not surfaced earlier
t.printStackTrace();
executor.shutdownNow();
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.beam.sdk.fn;

import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.sdk.harness.JvmInitializer;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.common.ReflectHelpers;
Expand All @@ -25,6 +26,8 @@

/** Helpers for executing {@link JvmInitializer} implementations. */
public class JvmInitializers {
private static final AtomicBoolean initialized = new AtomicBoolean(false);

/**
* Finds all registered implementations of JvmInitializer and executes their {@code onStartup}
* methods. Should be called in worker harness implementations at the very beginning of their main
Expand All @@ -50,10 +53,23 @@ public static void runBeforeProcessing(PipelineOptions options) {
// We load the logger in the method to minimize the amount of class loading that happens
// during class initialization.
Logger logger = LoggerFactory.getLogger(JvmInitializers.class);
for (JvmInitializer initializer : ReflectHelpers.loadServicesOrdered(JvmInitializer.class)) {
logger.info("Running JvmInitializer#beforeProcessing for {}", initializer);
initializer.beforeProcessing(options);
logger.info("Completed JvmInitializer#beforeProcessing for {}", initializer);

try {
for (JvmInitializer initializer : ReflectHelpers.loadServicesOrdered(JvmInitializer.class)) {
logger.info("Running JvmInitializer#beforeProcessing for {}", initializer);
initializer.beforeProcessing(options);
logger.info("Completed JvmInitializer#beforeProcessing for {}", initializer);
}
initialized.compareAndSet(false, true);
} catch (Error e) {
if (initialized.get()) {
logger.warn(
"Error at JvmInitializer#beforeProcessing. This error is suppressed after "
+ "previous success runs. It is expected on Embedded environment",
e);
} else {
throw e;
}
}
}
}

0 comments on commit c4edb7b

Please sign in to comment.