diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/SdkHarnessOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/SdkHarnessOptions.java index 6e5843f533db..78ea34503e54 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/SdkHarnessOptions.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/SdkHarnessOptions.java @@ -20,6 +20,7 @@ import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; import com.fasterxml.jackson.annotation.JsonCreator; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -386,4 +387,21 @@ static List getConfiguredLoggerFromOptions(SdkHarnessOptions loggingOpti } return configuredLoggers; } + + @Hidden + @Description( + "Timeout used for cache of bundle processors. Defaults to a minute for batch and an hour for streaming.") + @Default.InstanceFactory(BundleProcessorCacheTimeoutFactory.class) + Duration getBundleProcessorCacheTimeout(); + + void setBundleProcessorCacheTimeout(Duration duration); + + class BundleProcessorCacheTimeoutFactory implements DefaultValueFactory { + @Override + public Duration create(PipelineOptions options) { + return options.as(StreamingOptions.class).isStreaming() + ? Duration.ofHours(1) + : Duration.ofMinutes(1); + } + } } diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java index 0d517503b12d..300796ac6f12 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java @@ -83,6 +83,7 @@ import org.apache.beam.sdk.metrics.MetricsEnvironment; import org.apache.beam.sdk.metrics.MetricsEnvironment.MetricsEnvironmentState; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.SdkHarnessOptions; import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.common.ReflectHelpers; @@ -188,7 +189,8 @@ public ProcessBundleHandler( executionStateSampler, REGISTERED_RUNNER_FACTORIES, processWideCache, - new BundleProcessorCache(), + new BundleProcessorCache( + options.as(SdkHarnessOptions.class).getBundleProcessorCacheTimeout()), dataSampler); } @@ -927,25 +929,25 @@ public int hashCode() { return super.hashCode(); } - BundleProcessorCache() { - this.cachedBundleProcessors = + BundleProcessorCache(Duration timeout) { + CacheBuilder> builder = CacheBuilder.newBuilder() - .expireAfterAccess(Duration.ofMinutes(1L)) .removalListener( - removalNotification -> { - ((ConcurrentLinkedQueue) removalNotification.getValue()) - .forEach( - bundleProcessor -> { - bundleProcessor.shutdown(); - }); - }) - .build( - new CacheLoader>() { - @Override - public ConcurrentLinkedQueue load(String s) throws Exception { - return new ConcurrentLinkedQueue<>(); - } - }); + removalNotification -> + removalNotification + .getValue() + .forEach(bundleProcessor -> bundleProcessor.shutdown())); + if (timeout.compareTo(Duration.ZERO) > 0) { + builder = builder.expireAfterAccess(timeout); + } + this.cachedBundleProcessors = + builder.build( + new CacheLoader>() { + @Override + public ConcurrentLinkedQueue load(String s) throws Exception { + return new ConcurrentLinkedQueue<>(); + } + }); // We specifically use a weak hash map so that references will automatically go out of scope // and not need to be freed explicitly from the cache. this.activeBundleProcessors = Collections.synchronizedMap(new WeakHashMap<>()); diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java index 95b404aa6203..a69ea5338dc3 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java @@ -48,6 +48,7 @@ import static org.mockito.Mockito.when; import java.io.IOException; +import java.time.Duration; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -354,6 +355,10 @@ void reset() throws Exception { private static class TestBundleProcessorCache extends BundleProcessorCache { + TestBundleProcessorCache() { + super(Duration.ZERO); + } + @Override BundleProcessor get( InstructionRequest processBundleRequest, @@ -376,7 +381,7 @@ public void testTrySplitBeforeBundleDoesNotFail() { executionStateSampler, ImmutableMap.of(), Caches.noop(), - new BundleProcessorCache(), + new BundleProcessorCache(Duration.ZERO), null /* dataSampler */); BeamFnApi.InstructionResponse response = @@ -407,7 +412,7 @@ public void testProgressBeforeBundleDoesNotFail() throws Exception { executionStateSampler, ImmutableMap.of(), Caches.noop(), - new BundleProcessorCache(), + new BundleProcessorCache(Duration.ZERO), null /* dataSampler */); handler.progress( @@ -487,7 +492,7 @@ public void testOrderOfStartAndFinishCalls() throws Exception { DATA_INPUT_URN, startFinishRecorder, DATA_OUTPUT_URN, startFinishRecorder), Caches.noop(), - new BundleProcessorCache(), + new BundleProcessorCache(Duration.ZERO), null /* dataSampler */); handler.processBundle( @@ -592,7 +597,7 @@ public void testOrderOfSetupTeardownCalls() throws Exception { executionStateSampler, urnToPTransformRunnerFactoryMap, Caches.noop(), - new BundleProcessorCache(), + new BundleProcessorCache(Duration.ZERO), null /* dataSampler */); handler.processBundle( @@ -699,7 +704,7 @@ private static InstructionRequest processBundleRequestFor( public void testBundleProcessorIsFoundWhenActive() { BundleProcessor bundleProcessor = mock(BundleProcessor.class); when(bundleProcessor.getInstructionId()).thenReturn("known"); - BundleProcessorCache cache = new BundleProcessorCache(); + BundleProcessorCache cache = new BundleProcessorCache(Duration.ZERO); // Check that an unknown bundle processor is not found assertNull(cache.find("unknown")); @@ -811,7 +816,7 @@ public void testCreatingPTransformExceptionsArePropagated() throws Exception { throw new IllegalStateException("TestException"); }), Caches.noop(), - new BundleProcessorCache(), + new BundleProcessorCache(Duration.ZERO), null /* dataSampler */); assertThrows( "TestException", @@ -862,7 +867,7 @@ public void testBundleFinalizationIsPropagated() throws Exception { return null; }), Caches.noop(), - new BundleProcessorCache(), + new BundleProcessorCache(Duration.ZERO), null /* dataSampler */); BeamFnApi.InstructionResponse.Builder response = handler.processBundle( @@ -916,7 +921,7 @@ public void testPTransformStartExceptionsArePropagated() { return null; }), Caches.noop(), - new BundleProcessorCache(), + new BundleProcessorCache(Duration.ZERO), null /* dataSampler */); assertThrows( "TestException", @@ -1094,7 +1099,7 @@ public void onCompleted() {} executionStateSampler, urnToPTransformRunnerFactoryMap, Caches.noop(), - new BundleProcessorCache(), + new BundleProcessorCache(Duration.ZERO), null /* dataSampler */); } @@ -1427,7 +1432,7 @@ public void testInstructionIsUnregisteredFromBeamFnDataClientOnSuccess() throws return null; }), Caches.noop(), - new BundleProcessorCache(), + new BundleProcessorCache(Duration.ZERO), null /* dataSampler */); handler.processBundle( BeamFnApi.InstructionRequest.newBuilder() @@ -1500,7 +1505,7 @@ public void testDataProcessingExceptionsArePropagated() throws Exception { return null; }), Caches.noop(), - new BundleProcessorCache(), + new BundleProcessorCache(Duration.ZERO), null /* dataSampler */); assertThrows( "TestException", @@ -1551,7 +1556,7 @@ public void testPTransformFinishExceptionsArePropagated() throws Exception { return null; }), Caches.noop(), - new BundleProcessorCache(), + new BundleProcessorCache(Duration.ZERO), null /* dataSampler */); assertThrows( "TestException", @@ -1647,7 +1652,7 @@ private void doStateCalls(BeamFnStateClient beamFnStateClient) { } }), Caches.noop(), - new BundleProcessorCache(), + new BundleProcessorCache(Duration.ZERO), null /* dataSampler */); handler.processBundle( BeamFnApi.InstructionRequest.newBuilder() @@ -1698,7 +1703,7 @@ private void doStateCalls(BeamFnStateClient beamFnStateClient) { } }), Caches.noop(), - new BundleProcessorCache(), + new BundleProcessorCache(Duration.ZERO), null /* dataSampler */); assertThrows( "State API calls are unsupported", @@ -1787,7 +1792,7 @@ public void reset() { return null; }; - BundleProcessorCache bundleProcessorCache = new BundleProcessorCache(); + BundleProcessorCache bundleProcessorCache = new BundleProcessorCache(Duration.ZERO); ProcessBundleHandler handler = new ProcessBundleHandler( PipelineOptionsFactory.create(), @@ -1930,7 +1935,7 @@ public Object createRunnerForPTransform(Context context) throws IOException { } }), Caches.noop(), - new BundleProcessorCache(), + new BundleProcessorCache(Duration.ZERO), null /* dataSampler */); assertThrows( "Timers are unsupported",