From 89d5e2f29615c1d4dba41c42a2161d5c5d5f39a8 Mon Sep 17 00:00:00 2001
From: Andrew Crites
Date: Tue, 30 Jul 2024 09:17:21 -0700
Subject: [PATCH] Validate commits in StreamingDataflowWorker (#31822)

* Grabs operational limits from streaming config and plumbs them to WindmillSink, which can then throw an exception or log a warning if outputs are too large.
---
 .../dataflow/worker/OperationalLimits.java    | 64 ++++++++++++
 .../worker/OutputTooLargeException.java       | 38 +++++++
 .../worker/StreamingDataflowWorker.java       | 44 +++++----
 .../worker/StreamingModeExecutionContext.java | 18 ++++
 .../runners/dataflow/worker/WindmillSink.java | 25 +++++
 .../streaming/ComputationWorkExecutor.java    |  7 +-
 ...reamingEngineComputationConfigFetcher.java | 16 ++-
 .../config/StreamingEnginePipelineConfig.java | 10 ++
 .../processing/StreamingWorkScheduler.java    | 22 +++--
 .../worker/StreamingDataflowWorkerTest.java   | 98 +++++++++++++++++--
 .../StreamingModeExecutionContextTest.java    |  2 +
 .../worker/WorkerCustomSourcesTest.java       |  4 +-
 12 files changed, 311 insertions(+), 37 deletions(-)
 create mode 100644 runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OperationalLimits.java
 create mode 100644 runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OutputTooLargeException.java

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OperationalLimits.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OperationalLimits.java
new file mode 100644
index 000000000000..e9ee8f39cba4
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OperationalLimits.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker;
+
+import com.google.auto.value.AutoBuilder;
+
+/** Keep track of any operational limits required by the backend. */
+public class OperationalLimits {
+  // Maximum size of a commit from a single work item.
+  public final long maxWorkItemCommitBytes;
+  // Maximum size of a single output element's serialized key.
+  public final long maxOutputKeyBytes;
+  // Maximum size of a single output element's serialized value.
+  public final long maxOutputValueBytes;
+  // Whether to throw an exception when processing output that violates any of the given limits.
+  public final boolean throwExceptionOnLargeOutput;
+
+  OperationalLimits(
+      long maxWorkItemCommitBytes,
+      long maxOutputKeyBytes,
+      long maxOutputValueBytes,
+      boolean throwExceptionOnLargeOutput) {
+    this.maxWorkItemCommitBytes = maxWorkItemCommitBytes;
+    this.maxOutputKeyBytes = maxOutputKeyBytes;
+    this.maxOutputValueBytes = maxOutputValueBytes;
+    this.throwExceptionOnLargeOutput = throwExceptionOnLargeOutput;
+  }
+
+  @AutoBuilder(ofClass = OperationalLimits.class)
+  public interface Builder {
+    Builder setMaxWorkItemCommitBytes(long bytes);
+
+    Builder setMaxOutputKeyBytes(long bytes);
+
+    Builder setMaxOutputValueBytes(long bytes);
+
+    Builder setThrowExceptionOnLargeOutput(boolean shouldThrow);
+
+    OperationalLimits build();
+  }
+
+  public static Builder builder() {
+    return new AutoBuilder_OperationalLimits_Builder()
+        .setMaxWorkItemCommitBytes(Long.MAX_VALUE)
+        .setMaxOutputKeyBytes(Long.MAX_VALUE)
+        .setMaxOutputValueBytes(Long.MAX_VALUE)
+        .setThrowExceptionOnLargeOutput(false);
+  }
+}
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OutputTooLargeException.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OutputTooLargeException.java
new file mode 100644
index 000000000000..9f4b413841c5
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/OutputTooLargeException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker;
+
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/** Indicates that an output element was too large. */
+public class OutputTooLargeException extends RuntimeException {
+  public OutputTooLargeException(String reason) {
+    super(reason);
+  }
+
+  /** Returns whether an exception was caused by a {@link OutputTooLargeException}.
*/ + public static boolean isCausedByOutputTooLargeException(@Nullable Throwable t) { + while (t != null) { + if (t instanceof OutputTooLargeException) { + return true; + } + t = t.getCause(); + } + return false; + } +} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index 718d93830c41..a07bbfa7f5f3 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -35,7 +35,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; @@ -177,7 +177,7 @@ private StreamingDataflowWorker( WorkFailureProcessor workFailureProcessor, StreamingCounters streamingCounters, MemoryMonitor memoryMonitor, - AtomicInteger maxWorkItemCommitBytes, + AtomicReference operationalLimits, GrpcWindmillStreamFactory windmillStreamFactory, Function executorSupplier, ConcurrentMap stageInfoMap) { @@ -296,7 +296,7 @@ private StreamingDataflowWorker( streamingCounters, hotKeyLogger, sampler, - maxWorkItemCommitBytes, + operationalLimits, ID_GENERATOR, stageInfoMap); @@ -304,7 +304,6 @@ private StreamingDataflowWorker( LOG.debug("WindmillServiceEndpoint: {}", options.getWindmillServiceEndpoint()); LOG.debug("WindmillServicePort: {}", options.getWindmillServicePort()); LOG.debug("LocalWindmillHostport: {}", options.getLocalWindmillHostport()); - LOG.debug("maxWorkItemCommitBytes: {}", maxWorkItemCommitBytes.get()); } public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions options) { @@ -314,7 +313,8 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o StreamingCounters streamingCounters = StreamingCounters.create(); WorkUnitClient dataflowServiceClient = new DataflowWorkUnitClient(options, LOG); BoundedQueueExecutor workExecutor = createWorkUnitExecutor(options); - AtomicInteger maxWorkItemCommitBytes = new AtomicInteger(Integer.MAX_VALUE); + AtomicReference operationalLimits = + new AtomicReference<>(OperationalLimits.builder().build()); WindmillStateCache windmillStateCache = WindmillStateCache.builder() .setSizeMb(options.getWorkerCacheMb()) @@ -332,7 +332,7 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o createConfigFetcherComputationStateCacheAndWindmillClient( options, dataflowServiceClient, - maxWorkItemCommitBytes, + operationalLimits, windmillStreamFactoryBuilder, configFetcher -> ComputationStateCache.create( @@ -390,7 +390,7 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o workFailureProcessor, streamingCounters, memoryMonitor, - maxWorkItemCommitBytes, + operationalLimits, configFetcherComputationStateCacheAndWindmillClient.windmillStreamFactory(), executorSupplier, stageInfo); @@ -406,7 +406,7 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o createConfigFetcherComputationStateCacheAndWindmillClient( DataflowWorkerHarnessOptions options, 
WorkUnitClient dataflowServiceClient, - AtomicInteger maxWorkItemCommitBytes, + AtomicReference operationalLimits, GrpcWindmillStreamFactory.Builder windmillStreamFactoryBuilder, Function computationStateCacheFactory) { ComputationConfig.Fetcher configFetcher; @@ -422,8 +422,9 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o config -> onPipelineConfig( config, + options, dispatcherClient::consumeWindmillDispatcherEndpoints, - maxWorkItemCommitBytes)); + operationalLimits::set)); computationStateCache = computationStateCacheFactory.apply(configFetcher); windmillStreamFactory = windmillStreamFactoryBuilder @@ -469,9 +470,9 @@ static StreamingDataflowWorker forTesting( Supplier clock, Function executorSupplier, int localRetryTimeoutMs, - int maxWorkItemCommitBytesOverrides) { + OperationalLimits limits) { ConcurrentMap stageInfo = new ConcurrentHashMap<>(); - AtomicInteger maxWorkItemCommitBytes = new AtomicInteger(maxWorkItemCommitBytesOverrides); + AtomicReference operationalLimits = new AtomicReference<>(limits); BoundedQueueExecutor workExecutor = createWorkUnitExecutor(options); WindmillStateCache stateCache = WindmillStateCache.builder() @@ -488,8 +489,9 @@ static StreamingDataflowWorker forTesting( config -> onPipelineConfig( config, + options, windmillServer::setWindmillServiceEndpoints, - maxWorkItemCommitBytes)) + operationalLimits::set)) : new StreamingApplianceComputationConfigFetcher(windmillServer::getConfig); ConcurrentMap stateNameMap = new ConcurrentHashMap<>(prePopulatedStateNameMappings); @@ -557,7 +559,7 @@ static StreamingDataflowWorker forTesting( workFailureProcessor, streamingCounters, memoryMonitor, - maxWorkItemCommitBytes, + operationalLimits, options.isEnableStreamingEngine() ? windmillStreamFactory .setHealthCheckIntervalMillis( @@ -570,12 +572,18 @@ static StreamingDataflowWorker forTesting( private static void onPipelineConfig( StreamingEnginePipelineConfig config, + DataflowWorkerHarnessOptions options, Consumer> consumeWindmillServiceEndpoints, - AtomicInteger maxWorkItemCommitBytes) { - if (config.maxWorkItemCommitBytes() != maxWorkItemCommitBytes.get()) { - LOG.info("Setting maxWorkItemCommitBytes to {}", maxWorkItemCommitBytes); - maxWorkItemCommitBytes.set((int) config.maxWorkItemCommitBytes()); - } + Consumer operationalLimits) { + + operationalLimits.accept( + OperationalLimits.builder() + .setMaxWorkItemCommitBytes(config.maxWorkItemCommitBytes()) + .setMaxOutputKeyBytes(config.maxOutputKeyBytes()) + .setMaxOutputValueBytes(config.maxOutputValueBytes()) + .setThrowExceptionOnLargeOutput( + DataflowRunner.hasExperiment(options, "throw_exceptions_on_large_output")) + .build()); if (!config.windmillServiceEndpoints().isEmpty()) { consumeWindmillServiceEndpoints.accept(config.windmillServiceEndpoints()); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java index dd6353060abc..a594dbb1e0f7 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java @@ -129,6 +129,10 @@ public class StreamingModeExecutionContext extends DataflowExecutionContext extends Sink> { private final Coder 
valueCoder; private final Coder> windowsCoder; private StreamingModeExecutionContext context; + private static final Logger LOG = LoggerFactory.getLogger(WindmillSink.class); WindmillSink( String destinationName, @@ -172,6 +175,28 @@ public long add(WindowedValue data) throws IOException { key = context.getSerializedKey(); value = encode(valueCoder, data.getValue()); } + if (key.size() > context.getMaxOutputKeyBytes()) { + if (context.throwExceptionsForLargeOutput()) { + throw new OutputTooLargeException("Key too large: " + key.size()); + } else { + LOG.error( + "Trying to output too large key with size " + + key.size() + + ". Limit is " + + context.getMaxOutputKeyBytes()); + } + } + if (value.size() > context.getMaxOutputValueBytes()) { + if (context.throwExceptionsForLargeOutput()) { + throw new OutputTooLargeException("Value too large: " + value.size()); + } else { + LOG.error( + "Trying to output too large value with size " + + value.size() + + ". Limit is " + + context.getMaxOutputValueBytes()); + } + } Windmill.KeyedMessageBundle.Builder keyedOutput = productionMap.get(key); if (keyedOutput == null) { diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationWorkExecutor.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationWorkExecutor.java index dd34e85bc93c..8a00194887da 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationWorkExecutor.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationWorkExecutor.java @@ -24,6 +24,7 @@ import org.apache.beam.runners.core.metrics.ExecutionStateTracker; import org.apache.beam.runners.dataflow.worker.DataflowMapTaskExecutor; import org.apache.beam.runners.dataflow.worker.DataflowWorkExecutor; +import org.apache.beam.runners.dataflow.worker.OperationalLimits; import org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext; import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher; import org.apache.beam.runners.dataflow.worker.util.common.worker.ElementCounter; @@ -45,7 +46,7 @@ * @implNote Once closed, it cannot be reused. */ // TODO(m-trieu): See if this can be combined/cleaned up with StreamingModeExecutionContext as the -// seperation of responsibilities are unclear. +// separation of responsibilities are unclear. 
@AutoValue @Internal @NotThreadSafe @@ -72,9 +73,11 @@ public final void executeWork( Work work, WindmillStateReader stateReader, SideInputStateFetcher sideInputStateFetcher, + OperationalLimits operationalLimits, Windmill.WorkItemCommitRequest.Builder outputBuilder) throws Exception { - context().start(key, work, stateReader, sideInputStateFetcher, outputBuilder); + context() + .start(key, work, stateReader, sideInputStateFetcher, operationalLimits, outputBuilder); workExecutor().execute(); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java index 51d1507af5fe..850e8c3f24bd 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEngineComputationConfigFetcher.java @@ -157,7 +157,7 @@ private static Optional fetchConfigWithRetry( } } - private static StreamingEnginePipelineConfig createPipelineConfig(StreamingConfigTask config) { + private StreamingEnginePipelineConfig createPipelineConfig(StreamingConfigTask config) { StreamingEnginePipelineConfig.Builder pipelineConfig = StreamingEnginePipelineConfig.builder(); if (config.getUserStepToStateFamilyNameMap() != null) { pipelineConfig.setUserStepToStateFamilyNameMap(config.getUserStepToStateFamilyNameMap()); @@ -187,6 +187,18 @@ private static StreamingEnginePipelineConfig createPipelineConfig(StreamingConfi pipelineConfig.setMaxWorkItemCommitBytes(config.getMaxWorkItemCommitBytes().intValue()); } + if (config.getOperationalLimits() != null) { + if (config.getOperationalLimits().getMaxKeyBytes() > 0 + && config.getOperationalLimits().getMaxKeyBytes() <= Integer.MAX_VALUE) { + pipelineConfig.setMaxOutputKeyBytes(config.getOperationalLimits().getMaxKeyBytes()); + } + if (config.getOperationalLimits().getMaxProductionOutputBytes() > 0 + && config.getOperationalLimits().getMaxProductionOutputBytes() <= Integer.MAX_VALUE) { + pipelineConfig.setMaxOutputValueBytes( + config.getOperationalLimits().getMaxProductionOutputBytes()); + } + } + return pipelineConfig.build(); } @@ -273,7 +285,7 @@ private synchronized void fetchInitialPipelineGlobalConfig() { private Optional fetchGlobalConfig() { return fetchConfigWithRetry(dataflowServiceClient::getGlobalStreamingConfigWorkItem) - .map(StreamingEngineComputationConfigFetcher::createPipelineConfig); + .map(config -> createPipelineConfig(config)); } @FunctionalInterface diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEnginePipelineConfig.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEnginePipelineConfig.java index b5b761ada703..8f1ff93f6a49 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEnginePipelineConfig.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/config/StreamingEnginePipelineConfig.java @@ -34,12 +34,18 @@ public abstract class StreamingEnginePipelineConfig { 
public static StreamingEnginePipelineConfig.Builder builder() { return new AutoValue_StreamingEnginePipelineConfig.Builder() .setMaxWorkItemCommitBytes(DEFAULT_MAX_WORK_ITEM_COMMIT_BYTES) + .setMaxOutputKeyBytes(Long.MAX_VALUE) + .setMaxOutputValueBytes(Long.MAX_VALUE) .setUserStepToStateFamilyNameMap(new HashMap<>()) .setWindmillServiceEndpoints(ImmutableSet.of()); } public abstract long maxWorkItemCommitBytes(); + public abstract long maxOutputKeyBytes(); + + public abstract long maxOutputValueBytes(); + public abstract Map userStepToStateFamilyNameMap(); public abstract ImmutableSet windmillServiceEndpoints(); @@ -48,6 +54,10 @@ public static StreamingEnginePipelineConfig.Builder builder() { public abstract static class Builder { public abstract Builder setMaxWorkItemCommitBytes(long value); + public abstract Builder setMaxOutputKeyBytes(long value); + + public abstract Builder setMaxOutputValueBytes(long value); + public abstract Builder setUserStepToStateFamilyNameMap(Map value); public abstract Builder setWindmillServiceEndpoints(ImmutableSet value); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java index 334ab8efeae2..e9ffa982925b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java @@ -23,7 +23,7 @@ import java.util.Optional; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; import java.util.function.Supplier; import javax.annotation.concurrent.ThreadSafe; @@ -31,6 +31,7 @@ import org.apache.beam.runners.dataflow.worker.DataflowExecutionStateSampler; import org.apache.beam.runners.dataflow.worker.DataflowMapTaskExecutorFactory; import org.apache.beam.runners.dataflow.worker.HotKeyLogger; +import org.apache.beam.runners.dataflow.worker.OperationalLimits; import org.apache.beam.runners.dataflow.worker.ReaderCache; import org.apache.beam.runners.dataflow.worker.WorkItemCancelledException; import org.apache.beam.runners.dataflow.worker.logging.DataflowWorkerLoggingMDC; @@ -81,7 +82,7 @@ public final class StreamingWorkScheduler { private final HotKeyLogger hotKeyLogger; private final ConcurrentMap stageInfoMap; private final DataflowExecutionStateSampler sampler; - private final AtomicInteger maxWorkItemCommitBytes; + private final AtomicReference operationalLimits; public StreamingWorkScheduler( DataflowWorkerHarnessOptions options, @@ -95,7 +96,7 @@ public StreamingWorkScheduler( HotKeyLogger hotKeyLogger, ConcurrentMap stageInfoMap, DataflowExecutionStateSampler sampler, - AtomicInteger maxWorkItemCommitBytes) { + AtomicReference operationalLimits) { this.options = options; this.clock = clock; this.computationWorkExecutorFactory = computationWorkExecutorFactory; @@ -107,7 +108,7 @@ public StreamingWorkScheduler( this.hotKeyLogger = hotKeyLogger; this.stageInfoMap = stageInfoMap; this.sampler = sampler; - this.maxWorkItemCommitBytes = maxWorkItemCommitBytes; + this.operationalLimits = 
operationalLimits; } public static StreamingWorkScheduler create( @@ -123,7 +124,7 @@ public static StreamingWorkScheduler create( StreamingCounters streamingCounters, HotKeyLogger hotKeyLogger, DataflowExecutionStateSampler sampler, - AtomicInteger maxWorkItemCommitBytes, + AtomicReference operationalLimits, IdGenerator idGenerator, ConcurrentMap stageInfoMap) { ComputationWorkExecutorFactory computationWorkExecutorFactory = @@ -148,7 +149,7 @@ public static StreamingWorkScheduler create( hotKeyLogger, stageInfoMap, sampler, - maxWorkItemCommitBytes); + operationalLimits); } private static long computeShuffleBytesRead(Windmill.WorkItem workItem) { @@ -292,7 +293,7 @@ private Windmill.WorkItemCommitRequest validateCommitRequestSize( Windmill.WorkItemCommitRequest commitRequest, String computationId, Windmill.WorkItem workItem) { - int byteLimit = maxWorkItemCommitBytes.get(); + long byteLimit = operationalLimits.get().maxWorkItemCommitBytes; int commitSize = commitRequest.getSerializedSize(); int estimatedCommitSize = commitSize < 0 ? Integer.MAX_VALUE : commitSize; @@ -375,7 +376,12 @@ private ExecuteWorkResult executeWork( // Blocks while executing work. computationWorkExecutor.executeWork( - executionKey, work, stateReader, localSideInputStateFetcher, outputBuilder); + executionKey, + work, + stateReader, + localSideInputStateFetcher, + operationalLimits.get(), + outputBuilder); if (work.isFailed()) { throw new WorkItemCancelledException(workItem.getShardingKey()); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java index 52bc61e59919..8a4369fdbd8d 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java @@ -549,7 +549,6 @@ private Windmill.GetWorkResponse buildSessionInput( List inputs, List timers) throws Exception { - // Windmill.GetWorkResponse.Builder builder = Windmill.GetWorkResponse.newBuilder(); Windmill.WorkItem.Builder builder = Windmill.WorkItem.newBuilder(); builder.setKey(DEFAULT_KEY_BYTES); builder.setShardingKey(DEFAULT_SHARDING_KEY); @@ -849,7 +848,7 @@ private StreamingDataflowWorker makeWorker( streamingDataflowWorkerTestParams.clock(), streamingDataflowWorkerTestParams.executorSupplier(), streamingDataflowWorkerTestParams.localRetryTimeoutMs(), - streamingDataflowWorkerTestParams.maxWorkItemCommitBytes()); + streamingDataflowWorkerTestParams.operationalLimits()); this.computationStateCache = worker.getComputationStateCache(); return worker; } @@ -1216,7 +1215,8 @@ public void testKeyCommitTooLargeException() throws Exception { makeWorker( defaultWorkerParams() .setInstructions(instructions) - .setMaxWorkItemCommitBytes(1000) + .setOperationalLimits( + OperationalLimits.builder().setMaxWorkItemCommitBytes(1000).build()) .publishCounters() .build()); worker.start(); @@ -1271,6 +1271,80 @@ public void testKeyCommitTooLargeException() throws Exception { assertTrue(foundErrors); } + @Test + public void testOutputKeyTooLargeException() throws Exception { + KvCoder kvCoder = KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()); + + List instructions = + Arrays.asList( + makeSourceInstruction(kvCoder), + makeDoFnInstruction(new 
ExceptionCatchingFn(), 0, kvCoder), + makeSinkInstruction(kvCoder, 1)); + + server.setExpectedExceptionCount(1); + + StreamingDataflowWorker worker = + makeWorker( + defaultWorkerParams() + .setInstructions(instructions) + .setOperationalLimits( + OperationalLimits.builder() + .setMaxOutputKeyBytes(15) + .setThrowExceptionOnLargeOutput(true) + .build()) + .build()); + worker.start(); + + // This large key will cause the ExceptionCatchingFn to throw an exception, which will then + // cause it to output a smaller key. + String bigKey = "some_much_too_large_output_key"; + server.whenGetWorkCalled().thenReturn(makeInput(1, 0, bigKey, DEFAULT_SHARDING_KEY)); + server.waitForEmptyWorkQueue(); + + Map result = server.waitForAndGetCommits(1); + assertEquals(1, result.size()); + assertEquals( + makeExpectedOutput(1, 0, bigKey, DEFAULT_SHARDING_KEY, "smaller_key").build(), + removeDynamicFields(result.get(1L))); + } + + @Test + public void testOutputValueTooLargeException() throws Exception { + KvCoder kvCoder = KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()); + + List instructions = + Arrays.asList( + makeSourceInstruction(kvCoder), + makeDoFnInstruction(new ExceptionCatchingFn(), 0, kvCoder), + makeSinkInstruction(kvCoder, 1)); + + server.setExpectedExceptionCount(1); + + StreamingDataflowWorker worker = + makeWorker( + defaultWorkerParams() + .setInstructions(instructions) + .setOperationalLimits( + OperationalLimits.builder() + .setMaxOutputValueBytes(15) + .setThrowExceptionOnLargeOutput(true) + .build()) + .build()); + worker.start(); + + // The first time processing will have value "data1_a_bunch_more_data_output", which is above + // the limit. After throwing the exception, the output should be just "data1", which is small + // enough. + server.whenGetWorkCalled().thenReturn(makeInput(1, 0, "key", DEFAULT_SHARDING_KEY)); + server.waitForEmptyWorkQueue(); + + Map result = server.waitForAndGetCommits(1); + assertEquals(1, result.size()); + assertEquals( + makeExpectedOutput(1, 0, "key", DEFAULT_SHARDING_KEY, "smaller_key").build(), + removeDynamicFields(result.get(1L))); + } + @Test public void testKeyChange() throws Exception { KvCoder kvCoder = KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()); @@ -4021,6 +4095,18 @@ public void processElement(ProcessContext c) { } } + static class ExceptionCatchingFn extends DoFn, KV> { + + @ProcessElement + public void processElement(ProcessContext c) { + try { + c.output(KV.of(c.element().getKey(), c.element().getValue() + "_a_bunch_more_data_output")); + } catch (Exception e) { + c.output(KV.of("smaller_key", c.element().getValue())); + } + } + } + static class ChangeKeysFn extends DoFn, KV> { @ProcessElement @@ -4433,7 +4519,7 @@ private static StreamingDataflowWorkerTestParams.Builder builder() { .setLocalRetryTimeoutMs(-1) .setPublishCounters(false) .setClock(Instant::now) - .setMaxWorkItemCommitBytes(Integer.MAX_VALUE); + .setOperationalLimits(OperationalLimits.builder().build()); } abstract ImmutableMap stateNameMappings(); @@ -4450,7 +4536,7 @@ private static StreamingDataflowWorkerTestParams.Builder builder() { abstract int localRetryTimeoutMs(); - abstract int maxWorkItemCommitBytes(); + abstract OperationalLimits operationalLimits(); @AutoValue.Builder abstract static class Builder { @@ -4484,7 +4570,7 @@ final Builder publishCounters() { abstract Builder setLocalRetryTimeoutMs(int value); - abstract Builder setMaxWorkItemCommitBytes(int maxWorkItemCommitBytes); + abstract Builder setOperationalLimits(OperationalLimits 
operationalLimits); abstract StreamingDataflowWorkerTestParams build(); } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java index 6c46bda5acfe..7988212efde0 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java @@ -157,6 +157,7 @@ public void testTimerInternalsSetTimer() { Watermarks.builder().setInputDataWatermark(new Instant(1000)).build()), stateReader, sideInputStateFetcher, + OperationalLimits.builder().build(), outputBuilder); TimerInternals timerInternals = stepContext.timerInternals(); @@ -206,6 +207,7 @@ public void testTimerInternalsProcessingTimeSkew() { Watermarks.builder().setInputDataWatermark(new Instant(1000)).build()), stateReader, sideInputStateFetcher, + OperationalLimits.builder().build(), outputBuilder); TimerInternals timerInternals = stepContext.timerInternals(); assertTrue(timerTimestamp.isBefore(timerInternals.currentProcessingTime())); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java index 5d8ebd53400c..c79d947ca227 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java @@ -634,6 +634,7 @@ public void testReadUnboundedReader() throws Exception { Watermarks.builder().setInputDataWatermark(new Instant(0)).build()), mock(WindmillStateReader.class), mock(SideInputStateFetcher.class), + OperationalLimits.builder().build(), Windmill.WorkItemCommitRequest.newBuilder()); @SuppressWarnings({"unchecked", "rawtypes"}) @@ -1000,7 +1001,7 @@ public void testFailedWorkItemsAbort() throws Exception { Work.createProcessingContext( COMPUTATION_ID, (a, b) -> Windmill.KeyedGetDataResponse.getDefaultInstance(), - gnored -> {}), + ignored -> {}), Instant::now, Collections.emptyList()); context.start( @@ -1008,6 +1009,7 @@ public void testFailedWorkItemsAbort() throws Exception { dummyWork, mock(WindmillStateReader.class), mock(SideInputStateFetcher.class), + OperationalLimits.builder().build(), Windmill.WorkItemCommitRequest.newBuilder()); @SuppressWarnings({"unchecked", "rawtypes"})
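
Illustrative usage (not part of the patch): the sketch below shows the log-or-throw behavior this change adds to WindmillSink, driven by an OperationalLimits built with the new builder. The OutputSizeCheckExample class name, the 15-byte limit, and the 64-byte probe size are assumptions chosen for the example, mirroring the new worker tests.

import org.apache.beam.runners.dataflow.worker.OperationalLimits;
import org.apache.beam.runners.dataflow.worker.OutputTooLargeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class OutputSizeCheckExample {
  private static final Logger LOG = LoggerFactory.getLogger(OutputSizeCheckExample.class);

  // Mirrors the key-size branch added to WindmillSink.add(): throw when configured to, otherwise log an error.
  static void checkKeySize(long keySize, OperationalLimits limits) {
    if (keySize > limits.maxOutputKeyBytes) {
      if (limits.throwExceptionOnLargeOutput) {
        throw new OutputTooLargeException("Key too large: " + keySize);
      } else {
        LOG.error(
            "Trying to output too large key with size " + keySize + ". Limit is " + limits.maxOutputKeyBytes);
      }
    }
  }

  public static void main(String[] args) {
    // Log-only mode; setThrowExceptionOnLargeOutput(true) (enabled in the worker via the
    // throw_exceptions_on_large_output experiment) would make the oversized key throw instead.
    OperationalLimits limits =
        OperationalLimits.builder().setMaxOutputKeyBytes(15).setThrowExceptionOnLargeOutput(false).build();
    checkKeySize(64, limits);
  }
}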