From db338059439990c1ad33fdd9f84fc243ac57deef Mon Sep 17 00:00:00 2001 From: clmccart Date: Mon, 1 Apr 2024 09:58:43 -0700 Subject: [PATCH] only record per state change processing times for streaming pipelines (#30653) --- .../worker/DataflowExecutionContext.java | 27 +++++++--- .../worker/DataflowExecutionContextTest.java | 50 ++++++++++++++++++- 2 files changed, 68 insertions(+), 9 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java index 080fa7c9dac4..7c9d2fa18879 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java @@ -37,6 +37,8 @@ import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.runners.core.metrics.ExecutionStateSampler; import org.apache.beam.runners.core.metrics.ExecutionStateTracker; +import org.apache.beam.runners.core.metrics.ExecutionStateTracker.ExecutionState; +import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.runners.dataflow.worker.DataflowExecutionContext.DataflowStepContext; import org.apache.beam.runners.dataflow.worker.DataflowOperationContext.DataflowExecutionState; import org.apache.beam.runners.dataflow.worker.counters.CounterFactory; @@ -252,6 +254,8 @@ public static class DataflowExecutionStateTracker extends ExecutionStateTracker private final ContextActivationObserverRegistry contextActivationObserverRegistry; private final String workItemId; + private final boolean isStreaming; + /** * Metadata on the message whose processing is currently being managed by this tracker. If no * message is actively being processed, activeMessageMetadata will be null. @@ -277,6 +281,11 @@ public DataflowExecutionStateTracker( this.otherState = otherState; this.workItemId = workItemId; this.contextActivationObserverRegistry = ContextActivationObserverRegistry.createDefault(); + if (options instanceof DataflowPipelineOptions) { + this.isStreaming = ((DataflowPipelineOptions) options).isStreaming(); + } else { + this.isStreaming = false; + } } @Override @@ -318,12 +327,14 @@ public Closeable enterState(ExecutionState newState) { newState.isProcessElementState && newState instanceof DataflowExecutionState; if (isDataflowProcessElementState) { DataflowExecutionState newDFState = (DataflowExecutionState) newState; - if (newDFState.getStepName() != null && newDFState.getStepName().userName() != null) { - recordActiveMessageInProcessingTimesMap(); - synchronized (this) { - this.activeMessageMetadata = - ActiveMessageMetadata.create( - newDFState.getStepName().userName(), clock.getMillis()); + if (isStreaming) { + if (newDFState.getStepName() != null && newDFState.getStepName().userName() != null) { + recordActiveMessageInProcessingTimesMap(); + synchronized (this) { + this.activeMessageMetadata = + ActiveMessageMetadata.create( + newDFState.getStepName().userName(), clock.getMillis()); + } } } elementExecutionTracker.enter(newDFState.getStepName()); @@ -331,7 +342,9 @@ public Closeable enterState(ExecutionState newState) { return () -> { if (isDataflowProcessElementState) { - recordActiveMessageInProcessingTimesMap(); + if (isStreaming) { + recordActiveMessageInProcessingTimesMap(); + } elementExecutionTracker.exit(); } baseCloseable.close(); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContextTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContextTest.java index 8990768ed200..f4997cb92b25 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContextTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContextTest.java @@ -30,6 +30,8 @@ import java.util.Map; import org.apache.beam.runners.core.metrics.ExecutionStateSampler; import org.apache.beam.runners.core.metrics.ExecutionStateTracker; +import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions; +import org.apache.beam.runners.dataflow.worker.BatchModeExecutionContext.BatchModeExecutionState; import org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext.StreamingModeExecutionState; import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.NoopProfileScope; import org.apache.beam.sdk.options.PipelineOptionsFactory; @@ -123,9 +125,13 @@ public void testContextActivationObserverActivation() throws Exception { @Test public void testDataflowExecutionStateTrackerRecordsActiveMessageMetadata() throws IOException { + DataflowWorkerHarnessOptions options = + PipelineOptionsFactory.fromArgs("--experiments=enable_streaming_engine") + .as(DataflowWorkerHarnessOptions.class); + options.setStreaming(true); DataflowExecutionContext.DataflowExecutionStateTracker tracker = new DataflowExecutionContext.DataflowExecutionStateTracker( - ExecutionStateSampler.instance(), null, null, PipelineOptionsFactory.create(), ""); + ExecutionStateSampler.instance(), null, null, options, ""); StreamingModeExecutionState state = new StreamingModeExecutionState( NameContextsForTests.nameContextForTest(), @@ -151,9 +157,14 @@ public void testDataflowExecutionStateTrackerRecordsActiveMessageMetadata() thro @Test public void testDataflowExecutionStateTrackerRecordsCompletedProcessingTimes() throws IOException { + DataflowWorkerHarnessOptions options = + PipelineOptionsFactory.fromArgs("--experiments=enable_streaming_engine") + .as(DataflowWorkerHarnessOptions.class); + options.setStreaming(true); + DataflowExecutionContext.DataflowExecutionStateTracker tracker = new DataflowExecutionContext.DataflowExecutionStateTracker( - ExecutionStateSampler.instance(), null, null, PipelineOptionsFactory.create(), ""); + ExecutionStateSampler.instance(), null, null, options, ""); // Enter a processing state StreamingModeExecutionState state = @@ -184,4 +195,39 @@ public void testDataflowExecutionStateTrackerRecordsCompletedProcessingTimes() Assert.assertEquals( expectedMetadata.userStepName(), tracker.getActiveMessageMetadata().get().userStepName()); } + + @Test + public void testDataflowExecutionStateTrackerDoesNotRecordCompletedProcessingTimesForBatch() + throws IOException { + DataflowExecutionContext.DataflowExecutionStateTracker tracker = + new DataflowExecutionContext.DataflowExecutionStateTracker( + ExecutionStateSampler.instance(), null, null, PipelineOptionsFactory.create(), ""); + + // Enter a processing state + BatchModeExecutionState state = + new BatchModeExecutionState( + NameContextsForTests.nameContextForTest(), + "testState", + null /* requestingStepName */, + null /* inputIndex */, + null /* metricsContainer */, + NoopProfileScope.NOOP); + tracker.enterState(state); + // Enter a new processing state + BatchModeExecutionState newState = + new BatchModeExecutionState( + NameContextsForTests.nameContextForTest(), + "testState2", + null /* requestingStepName */, + null /* inputIndex */, + null /* metricsContainer */, + NoopProfileScope.NOOP); + tracker.enterState(newState); + + // The first completed state should be recorded and the new state should be active. + Map gotProcessingTimes = tracker.getProcessingTimesByStepCopy(); + Assert.assertEquals(0, gotProcessingTimes.size()); + Assert.assertEquals(0, gotProcessingTimes.keySet().size()); + assertFalse(tracker.getActiveMessageMetadata().isPresent()); + } }