From 8455f6c20fdcb33b75441c4dba7311bd3c7795ca Mon Sep 17 00:00:00 2001 From: Claire McCarthy Date: Fri, 1 Dec 2023 10:49:39 -0800 Subject: [PATCH] populate active message metadata on heartbeats --- .../worker/StreamingDataflowWorker.java | 4 +-- .../worker/streaming/ActiveWorkState.java | 15 +++++---- .../worker/streaming/ComputationState.java | 10 ++++-- .../dataflow/worker/streaming/Work.java | 16 ++++++++++ .../dataflow/worker/FakeWindmillServer.java | 10 ++++-- .../worker/StreamingDataflowWorkerTest.java | 32 +++++++++++++++++++ .../worker/streaming/ActiveWorkStateTest.java | 4 ++- 7 files changed, 76 insertions(+), 15 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index c71bec7fe2ef..eb9298f2c490 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -1877,7 +1877,7 @@ private void refreshActiveWork() { clock.get().minus(Duration.millis(options.getActiveWorkRefreshPeriodMillis())); for (Map.Entry entry : computationMap.entrySet()) { - active.put(entry.getKey(), entry.getValue().getKeysToRefresh(refreshDeadline)); + active.put(entry.getKey(), entry.getValue().getKeysToRefresh(refreshDeadline, sampler)); } metricTrackingWindmillServer.refreshActiveWork(active); @@ -1891,7 +1891,7 @@ private void invalidateStuckCommits() { } } - private static String constructWorkId(Windmill.WorkItem workItem) { + public static String constructWorkId(Windmill.WorkItem workItem) { StringBuilder workIdBuilder = new StringBuilder(33); workIdBuilder.append(Long.toHexString(workItem.getShardingKey())); workIdBuilder.append('-'); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java index b4d2786cf519..c5b77a5de55c 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java @@ -17,6 +17,7 @@ */ package org.apache.beam.runners.dataflow.worker.streaming; +import static org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.constructWorkId; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList.toImmutableList; import java.io.PrintWriter; @@ -211,14 +212,16 @@ private synchronized ImmutableMap getStuckCommitsAt( return stuckCommits.build(); } - synchronized ImmutableList getKeysToRefresh(Instant refreshDeadline) { + synchronized ImmutableList getKeysToRefresh(Instant refreshDeadline, + DataflowExecutionStateSampler sampler) { return activeWork.entrySet().stream() - .flatMap(entry -> toKeyedGetDataRequestStream(entry, refreshDeadline)) + .flatMap(entry -> toKeyedGetDataRequestStream(entry, refreshDeadline, sampler)) .collect(toImmutableList()); } private static Stream toKeyedGetDataRequestStream( - Entry> shardedKeyAndWorkQueue, Instant refreshDeadline) { + Entry> shardedKeyAndWorkQueue, Instant refreshDeadline, + DataflowExecutionStateSampler sampler) { ShardedKey shardedKey = shardedKeyAndWorkQueue.getKey(); Deque workQueue = shardedKeyAndWorkQueue.getValue(); @@ -230,9 +233,9 @@ private static Stream toKeyedGetDataRequestStream( .setKey(shardedKey.key()) .setShardingKey(shardedKey.shardingKey()) .setWorkToken(work.getWorkItem().getWorkToken()) - // TODO(clairemccarthy): plumb real values. - .addAllLatencyAttribution(work.getLatencyAttributions(true, "", - DataflowExecutionStateSampler.instance())) + .addAllLatencyAttribution( + work.getLatencyAttributions(true, constructWorkId(work.getWorkItem()), + sampler)) .build()); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationState.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationState.java index 9d7a9131f584..be0f77843239 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationState.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationState.java @@ -23,6 +23,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentLinkedQueue; import javax.annotation.Nullable; +import org.apache.beam.runners.dataflow.worker.DataflowExecutionStateSampler; import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor; import org.apache.beam.runners.dataflow.worker.windmill.Windmill; import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache; @@ -119,9 +120,12 @@ private void forceExecute(Work work) { executor.forceExecute(work, work.getWorkItem().getSerializedSize()); } - /** Adds any work started before the refreshDeadline to the GetDataRequest builder. */ - public List getKeysToRefresh(Instant refreshDeadline) { - return activeWorkState.getKeysToRefresh(refreshDeadline); + /** + * Adds any work started before the refreshDeadline to the GetDataRequest builder. + */ + public List getKeysToRefresh(Instant refreshDeadline, + DataflowExecutionStateSampler sampler) { + return activeWorkState.getKeysToRefresh(refreshDeadline, sampler); } public void printActiveWork(PrintWriter writer) { diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java index e723f523652d..b6924592d323 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java @@ -136,6 +136,22 @@ public Collection getLatencyAttributions(Boolean is private static LatencyAttribution.Builder addActiveLatencyBreakdownToBuilder(Boolean isHeartbeat, LatencyAttribution.Builder builder, String workId, DataflowExecutionStateSampler sampler) { + if (isHeartbeat) { + ActiveLatencyBreakdown.Builder stepBuilder = ActiveLatencyBreakdown.newBuilder(); + ActiveMessageMetadata activeMessage = sampler.getActiveMessageMetadataForWorkId( + workId); + if (activeMessage == null) { + return builder; + } + stepBuilder.setUserStepName(activeMessage.userStepName); + ActiveElementMetadata.Builder activeElementBuilder = ActiveElementMetadata.newBuilder(); + activeElementBuilder.setProcessingTimeMillis( + System.currentTimeMillis() - activeMessage.startTime); + stepBuilder.setActiveMessageMetadata(activeElementBuilder); + builder.addActiveLatencyBreakdown(stepBuilder.build()); + return builder; + } + Map processingDistributions = sampler.getProcessingDistributionsForWorkId( workId); if (processingDistributions == null) { diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java index 092f5e59a13c..a434b2001207 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java @@ -80,7 +80,7 @@ class FakeWindmillServer extends WindmillServerStub { private final ErrorCollector errorCollector; private final ConcurrentHashMap> droppedStreamingCommits; private int commitsRequested = 0; - private int numGetDataRequests = 0; + private List getDataRequests = new ArrayList<>(); private boolean isReady = true; private boolean dropStreamingCommits = false; @@ -144,7 +144,7 @@ private void validateGetDataRequest(Windmill.GetDataRequest request) { public Windmill.GetDataResponse getData(Windmill.GetDataRequest request) { LOG.info("getDataRequest: {}", request.toString()); validateGetDataRequest(request); - ++numGetDataRequests; + getDataRequests.add(request); GetDataResponse response = dataToOffer.getOrDefault(request); LOG.debug("getDataResponse: {}", response.toString()); return response; @@ -431,7 +431,11 @@ public Windmill.Exception getException() throws InterruptedException { } public int numGetDataRequests() { - return numGetDataRequests; + return getDataRequests.size(); + } + + public List getGetDataRequests() { + return getDataRequests; } public ArrayList getStatsReceived() { diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java index f8e558bc5f55..d064accbba34 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java @@ -3542,6 +3542,38 @@ public void testDoFnLatencyBreakdownsReportedOnCommit() throws Exception { worker.stop(); } + @Test + public void testDoFnActiveMessageMetadataReportedOnHeartbeat() throws Exception { + List instructions = + Arrays.asList( + makeSourceInstruction(StringUtf8Coder.of()), + makeDoFnInstruction(new SlowDoFn(), 0, StringUtf8Coder.of()), + makeSinkInstruction(StringUtf8Coder.of(), 0)); + + FakeWindmillServer server = new FakeWindmillServer(errorCollector); + StreamingDataflowWorkerOptions options = createTestingPipelineOptions(server); + options.setActiveWorkRefreshPeriodMillis(10); + StreamingDataflowWorker worker = makeWorker(instructions, options, true /* publishCounters */); + worker.start(); + + server.whenGetWorkCalled().thenReturn(makeInput(0, TimeUnit.MILLISECONDS.toMicros(0))); + + Map result = server.waitForAndGetCommits(1); + + assertThat(server.numGetDataRequests(), greaterThan(0)); + Windmill.GetDataRequest heartbeat = server.getGetDataRequests().get(2); + + for (LatencyAttribution la : heartbeat.getRequests(0).getRequests(0) + .getLatencyAttributionList()) { + if (la.getState() == State.ACTIVE) { + assertTrue(la.getActiveLatencyBreakdownCount() > 0); + assertTrue(la.getActiveLatencyBreakdown(0).hasActiveMessageMetadata()); + } + } + + worker.stop(); + } + @Test public void testLimitOnOutputBundleSize() throws Exception { // This verifies that ReadOperation, StreamingModeExecutionContext, and windmill sinks diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java index 12ae816de829..97d02968c14f 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java @@ -33,6 +33,7 @@ import java.util.Map; import java.util.Optional; import javax.annotation.Nullable; +import org.apache.beam.runners.dataflow.worker.DataflowExecutionStateSampler; import org.apache.beam.runners.dataflow.worker.streaming.ActiveWorkState.ActivateWorkResult; import org.apache.beam.runners.dataflow.worker.windmill.Windmill; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.KeyedGetDataRequest; @@ -251,7 +252,8 @@ public void testGetKeysToRefresh() { activeWorkState.activateWorkForKey(shardedKey1, freshWork); activeWorkState.activateWorkForKey(shardedKey2, refreshableWork2); - ImmutableList requests = activeWorkState.getKeysToRefresh(refreshDeadline); + ImmutableList requests = activeWorkState.getKeysToRefresh(refreshDeadline, + DataflowExecutionStateSampler.instance()); ImmutableList expected = ImmutableList.of(