From e17b38722f99889641b06c11cc7e256e7efa56b6 Mon Sep 17 00:00:00 2001 From: Claire McCarthy Date: Fri, 1 Dec 2023 09:46:48 -0800 Subject: [PATCH] populate latency breakdowns on commit message --- .../worker/StreamingDataflowWorker.java | 3 +- .../worker/streaming/ActiveWorkState.java | 5 +- .../dataflow/worker/streaming/Work.java | 47 ++++++++++++++++--- .../worker/StreamingDataflowWorkerTest.java | 39 ++++++++++++++- 4 files changed, 85 insertions(+), 9 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index 2b1b22ee282c..c71bec7fe2ef 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -1163,7 +1163,8 @@ public void close() { // Add the output to the commit queue. 
work.setState(State.COMMIT_QUEUED); - outputBuilder.addAllPerWorkItemLatencyAttributions(work.getLatencyAttributions()); + outputBuilder.addAllPerWorkItemLatencyAttributions( + work.getLatencyAttributions(false, constructWorkId(workItem), sampler)); WorkItemCommitRequest commitRequest = outputBuilder.build(); int byteLimit = maxWorkItemCommitBytes; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java index 9858666c40a2..b4d2786cf519 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java @@ -32,6 +32,7 @@ import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.ThreadSafe; +import org.apache.beam.runners.dataflow.worker.DataflowExecutionStateSampler; import org.apache.beam.runners.dataflow.worker.windmill.Windmill; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.KeyedGetDataRequest; import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache; @@ -229,7 +230,9 @@ private static Stream toKeyedGetDataRequestStream( .setKey(shardedKey.key()) .setShardingKey(shardedKey.shardingKey()) .setWorkToken(work.getWorkItem().getWorkToken()) - .addAllLatencyAttribution(work.getLatencyAttributions()) + // TODO(clairemccarthy): plumb real values. 
+ .addAllLatencyAttribution(work.getLatencyAttributions(true, "", + DataflowExecutionStateSampler.instance())) .build()); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java index cc3f6d1b12b2..e723f523652d 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java @@ -21,12 +21,20 @@ import java.util.ArrayList; import java.util.Collection; import java.util.EnumMap; +import java.util.IntSummaryStatistics; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.function.Consumer; import java.util.function.Supplier; import javax.annotation.concurrent.NotThreadSafe; +import org.apache.beam.runners.dataflow.worker.ActiveMessageMetadata; +import org.apache.beam.runners.dataflow.worker.DataflowExecutionStateSampler; import org.apache.beam.runners.dataflow.worker.windmill.Windmill; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.LatencyAttribution; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.LatencyAttribution.ActiveLatencyBreakdown; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.LatencyAttribution.ActiveLatencyBreakdown.ActiveElementMetadata; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.LatencyAttribution.ActiveLatencyBreakdown.Distribution; import org.joda.time.Duration; import org.joda.time.Instant; @@ -101,7 +109,8 @@ private void recordGetWorkStreamLatencies( } } - public Collection getLatencyAttributions() { + public Collection getLatencyAttributions(Boolean isHeartbeat, + String workId, DataflowExecutionStateSampler sampler) { List list = new ArrayList<>(); for 
(Windmill.LatencyAttribution.State state : Windmill.LatencyAttribution.State.values()) {
       Duration duration = totalDurationPerState.getOrDefault(state, Duration.ZERO);
@@ -111,15 +120,58 @@ public Collection getLatencyAttributions() {
       if (duration.equals(Duration.ZERO)) {
         continue;
       }
-      list.add(
-          Windmill.LatencyAttribution.newBuilder()
-              .setState(state)
-              .setTotalDurationMillis(duration.getMillis())
-              .build());
+      LatencyAttribution.Builder laBuilder = Windmill.LatencyAttribution.newBuilder();
+      if (state == LatencyAttribution.State.ACTIVE) {
+        laBuilder = addActiveLatencyBreakdownToBuilder(isHeartbeat, laBuilder,
+            workId, sampler);
+      }
+      Windmill.LatencyAttribution la = laBuilder
+          .setState(state)
+          .setTotalDurationMillis(duration.getMillis())
+          .build();
+      list.add(la);
     }
     return list;
   }
 
+  /**
+   * Appends one {@link ActiveLatencyBreakdown} to {@code builder} for every entry in the map that
+   * {@code sampler.getProcessingDistributionsForWorkId(workId)} returns. The entry key is used as
+   * the user step name and the entry's statistics populate the processing-times distribution.
+   *
+   * @param isHeartbeat forwarded by the caller; this method does not read it yet
+   * @param builder the attribution builder to extend; it is mutated and returned
+   * @param workId the id passed to the sampler to look up distributions
+   * @param sampler the source of the per-step processing distributions
+   * @return {@code builder}, untouched when the sampler returns {@code null} for {@code workId}
+   */
+  private static LatencyAttribution.Builder addActiveLatencyBreakdownToBuilder(
+      Boolean isHeartbeat,
+      LatencyAttribution.Builder builder,
+      String workId,
+      DataflowExecutionStateSampler sampler) {
+    Map<String, IntSummaryStatistics> processingDistributions =
+        sampler.getProcessingDistributionsForWorkId(workId);
+    if (processingDistributions == null) {
+      return builder;
+    }
+    for (Entry<String, IntSummaryStatistics> entry : processingDistributions.entrySet()) {
+      IntSummaryStatistics stats = entry.getValue();
+      Distribution distribution =
+          Distribution.newBuilder()
+              .setCount(stats.getCount())
+              .setMin(stats.getMin())
+              .setMax(stats.getMax())
+              // The cast drops the fractional part of the average.
+              .setMean((long) stats.getAverage())
+              .setSum(stats.getSum())
+              .build();
+      builder.addActiveLatencyBreakdown(
+          ActiveLatencyBreakdown.newBuilder()
+              .setUserStepName(entry.getKey())
+              .setProcessingTimesDistribution(distribution)
+              .build());
+    }
+    return builder;
+  }
+
   boolean isStuckCommittingAt(Instant stuckCommitDeadline) {
     return currentState.state() == Work.State.COMMITTING
         && currentState.startTime().isBefore(stuckCommitDeadline);
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
index 6826607513d9..f8e558bc5f55 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
@@ -174,6 +174,7 @@
 import org.hamcrest.Matchers;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
+import org.junit.Assert;
 import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
@@ -3274,7 +3275,8 @@ public void testLatencyAttributionProtobufsPopulated() {
     work.setState(Work.State.COMMITTING);
     clock.sleep(Duration.millis(60));
 
-    Iterator<LatencyAttribution> it = work.getLatencyAttributions().iterator();
+    Iterator<LatencyAttribution> it = work.getLatencyAttributions(false,
+        "", DataflowExecutionStateSampler.instance()).iterator();
     assertTrue(it.hasNext());
     LatencyAttribution lat = it.next();
     assertSame(State.QUEUED, lat.getState());
@@ -3505,6 +3507,48 @@ public void testLatencyAttributionPopulatedInCommitRequest() throws Exception {
     }
   }
 
+  /**
+   * Runs one work item through a source, a {@code SlowDoFn} and a sink, then checks that the
+   * commit's ACTIVE latency attribution carries exactly one breakdown, named
+   * {@code DEFAULT_PARDO_USER_NAME}, with a processing-times distribution and no active-message
+   * metadata.
+   */
+  @Test
+  public void testDoFnLatencyBreakdownsReportedOnCommit() throws Exception {
+    List<ParallelInstruction> instructions =
+        Arrays.asList(
+            makeSourceInstruction(StringUtf8Coder.of()),
+            makeDoFnInstruction(new SlowDoFn(), 0, StringUtf8Coder.of()),
+            makeSinkInstruction(StringUtf8Coder.of(), 0));
+
+    FakeWindmillServer server = new FakeWindmillServer(errorCollector);
+    StreamingDataflowWorkerOptions options = createTestingPipelineOptions(server);
+    options.setActiveWorkRefreshPeriodMillis(100);
+    StreamingDataflowWorker worker = makeWorker(instructions, options, true /* publishCounters */);
+    worker.start();
+
+    server.whenGetWorkCalled().thenReturn(makeInput(0, TimeUnit.MILLISECONDS.toMicros(0)));
+
+    Map<Long, Windmill.WorkItemCommitRequest> result = server.waitForAndGetCommits(1);
+    Windmill.WorkItemCommitRequest commit = result.get(0L);
+
+    // Without this flag the test would pass vacuously when no ACTIVE attribution is present.
+    boolean sawActiveAttribution = false;
+    for (LatencyAttribution la : commit.getPerWorkItemLatencyAttributionsList()) {
+      if (la.getState() != State.ACTIVE) {
+        continue;
+      }
+      sawActiveAttribution = true;
+      assertThat(la.getActiveLatencyBreakdownCount(), equalTo(1));
+      assertThat(
+          la.getActiveLatencyBreakdown(0).getUserStepName(), equalTo(DEFAULT_PARDO_USER_NAME));
+      Assert.assertTrue(la.getActiveLatencyBreakdown(0).hasProcessingTimesDistribution());
+      Assert.assertFalse(la.getActiveLatencyBreakdown(0).hasActiveMessageMetadata());
+    }
+    Assert.assertTrue("commit has no ACTIVE latency attribution", sawActiveAttribution);
+
+    worker.stop();
+  }
+
   @Test
   public void testLimitOnOutputBundleSize() throws Exception {
     // This verifies that ReadOperation, StreamingModeExecutionContext, and windmill sinks