Skip to content

Commit

Permalink
Invoke teardown when DoFn throws in portable runners (#32522)
Browse files Browse the repository at this point in the history
* Invoke teardown when DoFn throws in portable runners

* update CHANGES.md

* adjusted comment and logging
  • Loading branch information
Abacn authored Oct 9, 2024
1 parent 6e570d6 commit c31d81c
Show file tree
Hide file tree
Showing 11 changed files with 27 additions and 11 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test"
"modification": 1
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 0
"modification": 1
}
2 changes: 1 addition & 1 deletion .github/trigger_files/beam_PostCommit_Java_PVR_Samza.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 0
"modification": 1
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 0
"modification": 1
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 0
"modification": 1
}
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
## Bugfixes

* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* (Java) Fixed tearDown not invoked when DoFn throws on Portable Runners ([#18592](https://github.com/apache/beam/issues/18592), [#31381](https://github.com/apache/beam/issues/31381)).

## Security Fixes
* Fixed (CVE-YYYY-NNNN)[https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN] (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)).
Expand Down
1 change: 0 additions & 1 deletion runners/flink/job-server/flink_job_server.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,6 @@ def portableValidatesRunnerTask(String name, boolean streaming, boolean checkpoi
excludeCategories 'org.apache.beam.sdk.testing.UsesCustomWindowMerging'
excludeCategories 'org.apache.beam.sdk.testing.UsesFailureMessage'
excludeCategories 'org.apache.beam.sdk.testing.UsesGaugeMetrics'
excludeCategories 'org.apache.beam.sdk.testing.UsesParDoLifecycle'
excludeCategories 'org.apache.beam.sdk.testing.UsesMapState'
excludeCategories 'org.apache.beam.sdk.testing.UsesMultimapState'
excludeCategories 'org.apache.beam.sdk.testing.UsesSetState'
Expand Down
2 changes: 1 addition & 1 deletion runners/google-cloud-dataflow-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ def commonLegacyExcludeCategories = [
'org.apache.beam.sdk.testing.UsesGaugeMetrics',
'org.apache.beam.sdk.testing.UsesMultimapState',
'org.apache.beam.sdk.testing.UsesTestStream',
'org.apache.beam.sdk.testing.UsesParDoLifecycle',
'org.apache.beam.sdk.testing.UsesParDoLifecycle', // doesn't support remote runner
'org.apache.beam.sdk.testing.UsesMetricsPusher',
'org.apache.beam.sdk.testing.UsesBundleFinalizer',
]
Expand Down
3 changes: 2 additions & 1 deletion runners/samza/job-server/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ def portableValidatesRunnerTask(String name, boolean docker) {
excludeCategories 'org.apache.beam.sdk.testing.UsesCustomWindowMerging'
excludeCategories 'org.apache.beam.sdk.testing.UsesFailureMessage'
excludeCategories 'org.apache.beam.sdk.testing.UsesGaugeMetrics'
excludeCategories 'org.apache.beam.sdk.testing.UsesParDoLifecycle'
excludeCategories 'org.apache.beam.sdk.testing.UsesMapState'
excludeCategories 'org.apache.beam.sdk.testing.UsesMultimapState'
excludeCategories 'org.apache.beam.sdk.testing.UsesSetState'
Expand Down Expand Up @@ -127,6 +126,8 @@ def portableValidatesRunnerTask(String name, boolean docker) {
excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$TimestampTests.testParDoShiftTimestampInvalid'
// TODO(https://github.com/apache/beam/issues/21144)
excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoTest$TimestampTests.testParDoShiftTimestampInvalidZeroAllowed'
// TODO(https://github.com/apache/beam/issues/32520)
excludeTestsMatching 'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionIn*Stateful'
// TODO(https://github.com/apache/beam/issues/21145)
excludeTestsMatching 'org.apache.beam.sdk.transforms.DeduplicateTest.testEventTime'
// TODO(https://github.com/apache/beam/issues/21146)
Expand Down
2 changes: 0 additions & 2 deletions runners/spark/job-server/spark_job_server.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ def portableValidatesRunnerTask(String name, boolean streaming, boolean docker,
excludeCategories 'org.apache.beam.sdk.testing.UsesFailureMessage'
excludeCategories 'org.apache.beam.sdk.testing.UsesGaugeMetrics'
excludeCategories 'org.apache.beam.sdk.testing.UsesPerKeyOrderedDelivery'
excludeCategories 'org.apache.beam.sdk.testing.UsesParDoLifecycle'
excludeCategories 'org.apache.beam.sdk.testing.UsesMapState'
excludeCategories 'org.apache.beam.sdk.testing.UsesSetState'
excludeCategories 'org.apache.beam.sdk.testing.UsesOrderedListState'
Expand Down Expand Up @@ -187,7 +186,6 @@ def portableValidatesRunnerTask(String name, boolean streaming, boolean docker,
excludeCategories 'org.apache.beam.sdk.testing.UsesGaugeMetrics'
excludeCategories 'org.apache.beam.sdk.testing.UsesPerKeyOrderedDelivery'
excludeCategories 'org.apache.beam.sdk.testing.UsesPerKeyOrderInBundle'
excludeCategories 'org.apache.beam.sdk.testing.UsesParDoLifecycle'
excludeCategories 'org.apache.beam.sdk.testing.UsesMapState'
excludeCategories 'org.apache.beam.sdk.testing.UsesMultimapState'
excludeCategories 'org.apache.beam.sdk.testing.UsesSetState'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -596,7 +596,11 @@ public BeamFnApi.InstructionResponse.Builder processBundle(BeamFnApi.Instruction
request.getProcessBundle().getProcessBundleDescriptorId(), bundleProcessor);
return BeamFnApi.InstructionResponse.newBuilder().setProcessBundle(response);
} catch (Exception e) {
// Make sure we clean-up from the active set of bundle processors.
// Make sure we clean up from the active set of bundle processors.
LOG.debug(
"Discard bundleProcessor for {} after exception: {}",
request.getProcessBundle().getProcessBundleDescriptorId(),
e.getMessage());
bundleProcessorCache.discard(bundleProcessor);
throw e;
}
Expand Down Expand Up @@ -1168,13 +1172,26 @@ void discard() {
if (this.bundleCache != null) {
this.bundleCache.clear();
}
// setupFunctions are invoked in createBundleProcessor. Invoke teardownFunction here as the
// BundleProcessor is already removed from cache and won't be re-used.
for (ThrowingRunnable teardownFunction : Lists.reverse(this.getTearDownFunctions())) {
try {
teardownFunction.run();
} catch (Throwable e) {
LOG.warn(
"Exceptions are thrown from DoFn.teardown method when trying to discard "
+ "ProcessBundleHandler",
e);
}
}
getMetricsEnvironmentStateForBundle().discard();
for (BeamFnDataOutboundAggregator aggregator : getOutboundAggregators().values()) {
aggregator.discard();
}
}
}

// this is called in cachedBundleProcessors removal listener
void shutdown() {
for (ThrowingRunnable tearDownFunction : getTearDownFunctions()) {
LOG.debug("Tearing down function {}", tearDownFunction);
Expand Down

0 comments on commit c31d81c

Please sign in to comment.