diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OperationalLimits.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OperationalLimits.java index e9ee8f39cba4..47e36e498507 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OperationalLimits.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OperationalLimits.java @@ -27,18 +27,11 @@ public class OperationalLimits { public final long maxOutputKeyBytes; // Maximum size of a single output element's serialized value. public final long maxOutputValueBytes; - // Whether to throw an exception when processing output that violates any of the given limits. - public final boolean throwExceptionOnLargeOutput; - OperationalLimits( - long maxWorkItemCommitBytes, - long maxOutputKeyBytes, - long maxOutputValueBytes, - boolean throwExceptionOnLargeOutput) { + OperationalLimits(long maxWorkItemCommitBytes, long maxOutputKeyBytes, long maxOutputValueBytes) { this.maxWorkItemCommitBytes = maxWorkItemCommitBytes; this.maxOutputKeyBytes = maxOutputKeyBytes; this.maxOutputValueBytes = maxOutputValueBytes; - this.throwExceptionOnLargeOutput = throwExceptionOnLargeOutput; } @AutoBuilder(ofClass = OperationalLimits.class) @@ -49,8 +42,6 @@ public interface Builder { Builder setMaxOutputValueBytes(long bytes); - Builder setThrowExceptionOnLargeOutput(boolean shouldThrow); - OperationalLimits build(); } @@ -58,7 +49,6 @@ public static Builder builder() { return new AutoBuilder_OperationalLimits_Builder() .setMaxWorkItemCommitBytes(Long.MAX_VALUE) .setMaxOutputKeyBytes(Long.MAX_VALUE) - .setMaxOutputValueBytes(Long.MAX_VALUE) - .setThrowExceptionOnLargeOutput(false); + .setMaxOutputValueBytes(Long.MAX_VALUE); } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index 90f072be997e..1af677382092 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -445,7 +445,6 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o config -> onPipelineConfig( config, - options, dispatcherClient::consumeWindmillDispatcherEndpoints, operationalLimits::set)); computationStateCache = computationStateCacheFactory.apply(configFetcher); @@ -515,7 +514,6 @@ static StreamingDataflowWorker forTesting( config -> onPipelineConfig( config, - options, windmillServer::setWindmillServiceEndpoints, operationalLimits::set)) : new StreamingApplianceComputationConfigFetcher(windmillServer::getConfig); @@ -598,7 +596,6 @@ static StreamingDataflowWorker forTesting( private static void onPipelineConfig( StreamingEnginePipelineConfig config, - DataflowWorkerHarnessOptions options, Consumer> consumeWindmillServiceEndpoints, Consumer operationalLimits) { @@ -607,8 +604,6 @@ private static void onPipelineConfig( .setMaxWorkItemCommitBytes(config.maxWorkItemCommitBytes()) .setMaxOutputKeyBytes(config.maxOutputKeyBytes()) .setMaxOutputValueBytes(config.maxOutputValueBytes()) - .setThrowExceptionOnLargeOutput( - DataflowRunner.hasExperiment(options, "throw_exceptions_on_large_output")) .build()); if (!config.windmillServiceEndpoints().isEmpty()) { diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java index a594dbb1e0f7..f25f6294da86 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java @@ -107,6 +107,7 @@ public class StreamingModeExecutionContext extends DataflowExecutionContext stateNameMap; private final WindmillStateCache.ForComputation stateCache; private final ReaderCache readerCache; + private final boolean throwExceptionOnLargeOutput; private volatile long backlogBytes; /** @@ -152,7 +153,8 @@ public StreamingModeExecutionContext( MetricsContainerRegistry metricsContainerRegistry, DataflowExecutionStateTracker executionStateTracker, StreamingModeExecutionStateRegistry executionStateRegistry, - long sinkByteLimit) { + long sinkByteLimit, + boolean throwExceptionOnLargeOutput) { super( counterFactory, metricsContainerRegistry, @@ -165,6 +167,7 @@ public StreamingModeExecutionContext( this.stateNameMap = ImmutableMap.copyOf(stateNameMap); this.stateCache = stateCache; this.backlogBytes = UnboundedReader.BACKLOG_UNKNOWN; + this.throwExceptionOnLargeOutput = throwExceptionOnLargeOutput; } @VisibleForTesting @@ -181,7 +184,7 @@ public long getMaxOutputValueBytes() { } public boolean throwExceptionsForLargeOutput() { - return operationalLimits.throwExceptionOnLargeOutput; + return throwExceptionOnLargeOutput; } public boolean workIsFailed() { diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/ComputationWorkExecutorFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/ComputationWorkExecutorFactory.java index f9b1b45be6c1..20c1247b2168 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/ComputationWorkExecutorFactory.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/ComputationWorkExecutorFactory.java @@ -68,6 +68,10 @@ final class ComputationWorkExecutorFactory { private static final Logger LOG = LoggerFactory.getLogger(ComputationWorkExecutorFactory.class); private static final String DISABLE_SINK_BYTE_LIMIT_EXPERIMENT = "disable_limiting_bundle_sink_bytes"; + // Whether to throw an exception when processing output that violates any of the operational + // limits. + private static final String THROW_EXCEPTIONS_ON_LARGE_OUTPUT_EXPERIMENT = + "throw_exceptions_on_large_output"; private final DataflowWorkerHarnessOptions options; private final DataflowMapTaskExecutorFactory mapTaskExecutorFactory; @@ -90,6 +94,7 @@ final class ComputationWorkExecutorFactory { private final long maxSinkBytes; private final IdGenerator idGenerator; + private final boolean throwExceptionOnLargeOutput; ComputationWorkExecutorFactory( DataflowWorkerHarnessOptions options, @@ -113,6 +118,8 @@ final class ComputationWorkExecutorFactory { hasExperiment(options, DISABLE_SINK_BYTE_LIMIT_EXPERIMENT) ? Long.MAX_VALUE : StreamingDataflowWorker.MAX_SINK_BYTES; + this.throwExceptionOnLargeOutput = + hasExperiment(options, THROW_EXCEPTIONS_ON_LARGE_OUTPUT_EXPERIMENT); } private static Nodes.ParallelInstructionNode extractReadNode( @@ -255,7 +262,8 @@ private StreamingModeExecutionContext createExecutionContext( stageInfo.metricsContainerRegistry(), executionStateTracker, stageInfo.executionStateRegistry(), - maxSinkBytes); + maxSinkBytes, + throwExceptionOnLargeOutput); } private DataflowMapTaskExecutor createMapTaskExecutor( diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java index d16ed2942fd9..b41ad391d878 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java @@ -1280,13 +1280,9 @@ public void testOutputKeyTooLargeException() throws Exception { StreamingDataflowWorker worker = makeWorker( - defaultWorkerParams() + defaultWorkerParams("--experiments=throw_exceptions_on_large_output") .setInstructions(instructions) - .setOperationalLimits( - OperationalLimits.builder() - .setMaxOutputKeyBytes(15) - .setThrowExceptionOnLargeOutput(true) - .build()) + .setOperationalLimits(OperationalLimits.builder().setMaxOutputKeyBytes(15).build()) .build()); worker.start(); @@ -1317,13 +1313,10 @@ public void testOutputValueTooLargeException() throws Exception { StreamingDataflowWorker worker = makeWorker( - defaultWorkerParams() + defaultWorkerParams("--experiments=throw_exceptions_on_large_output") .setInstructions(instructions) .setOperationalLimits( - OperationalLimits.builder() - .setMaxOutputValueBytes(15) - .setThrowExceptionOnLargeOutput(true) - .build()) + OperationalLimits.builder().setMaxOutputValueBytes(15).build()) .build()); worker.start(); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java index 8445e8ede852..86ed8f552d16 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java @@ -127,7 +127,8 @@ public void setUp() { PipelineOptionsFactory.create(), "test-work-item-id"), executionStateRegistry, - Long.MAX_VALUE); + Long.MAX_VALUE, + /*throwExceptionOnLargeOutput=*/ false); } private static Work createMockWork(Windmill.WorkItem workItem, Watermarks watermarks) { diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java index 5c149a65f4ce..f2e03b453fd8 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java @@ -610,7 +610,8 @@ public void testReadUnboundedReader() throws Exception { PipelineOptionsFactory.create(), "test-work-item-id"), executionStateRegistry, - Long.MAX_VALUE); + Long.MAX_VALUE, + /*throwExceptionOnLargeOutput=*/ false); options.setNumWorkers(5); int maxElements = 10; @@ -978,7 +979,8 @@ public void testFailedWorkItemsAbort() throws Exception { PipelineOptionsFactory.create(), "test-work-item-id"), executionStateRegistry, - Long.MAX_VALUE); + Long.MAX_VALUE, + /*throwExceptionOnLargeOutput=*/ false); options.setNumWorkers(5); int maxElements = 100;