Skip to content

Commit

Permalink
populate active message metadata on heartbeats
Browse files Browse the repository at this point in the history
  • Loading branch information
clmccart committed Dec 1, 2023
1 parent e17b387 commit 8455f6c
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1877,7 +1877,7 @@ private void refreshActiveWork() {
clock.get().minus(Duration.millis(options.getActiveWorkRefreshPeriodMillis()));

for (Map.Entry<String, ComputationState> entry : computationMap.entrySet()) {
active.put(entry.getKey(), entry.getValue().getKeysToRefresh(refreshDeadline));
active.put(entry.getKey(), entry.getValue().getKeysToRefresh(refreshDeadline, sampler));
}

metricTrackingWindmillServer.refreshActiveWork(active);
Expand All @@ -1891,7 +1891,7 @@ private void invalidateStuckCommits() {
}
}

private static String constructWorkId(Windmill.WorkItem workItem) {
public static String constructWorkId(Windmill.WorkItem workItem) {
StringBuilder workIdBuilder = new StringBuilder(33);
workIdBuilder.append(Long.toHexString(workItem.getShardingKey()));
workIdBuilder.append('-');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.beam.runners.dataflow.worker.streaming;

import static org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.constructWorkId;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList.toImmutableList;

import java.io.PrintWriter;
Expand Down Expand Up @@ -211,14 +212,16 @@ private synchronized ImmutableMap<ShardedKey, Long> getStuckCommitsAt(
return stuckCommits.build();
}

synchronized ImmutableList<KeyedGetDataRequest> getKeysToRefresh(Instant refreshDeadline) {
synchronized ImmutableList<KeyedGetDataRequest> getKeysToRefresh(Instant refreshDeadline,
DataflowExecutionStateSampler sampler) {
return activeWork.entrySet().stream()
.flatMap(entry -> toKeyedGetDataRequestStream(entry, refreshDeadline))
.flatMap(entry -> toKeyedGetDataRequestStream(entry, refreshDeadline, sampler))
.collect(toImmutableList());
}

private static Stream<KeyedGetDataRequest> toKeyedGetDataRequestStream(
Entry<ShardedKey, Deque<Work>> shardedKeyAndWorkQueue, Instant refreshDeadline) {
Entry<ShardedKey, Deque<Work>> shardedKeyAndWorkQueue, Instant refreshDeadline,
DataflowExecutionStateSampler sampler) {
ShardedKey shardedKey = shardedKeyAndWorkQueue.getKey();
Deque<Work> workQueue = shardedKeyAndWorkQueue.getValue();

Expand All @@ -230,9 +233,9 @@ private static Stream<KeyedGetDataRequest> toKeyedGetDataRequestStream(
.setKey(shardedKey.key())
.setShardingKey(shardedKey.shardingKey())
.setWorkToken(work.getWorkItem().getWorkToken())
// TODO(clairemccarthy): plumb real values.
.addAllLatencyAttribution(work.getLatencyAttributions(true, "",
DataflowExecutionStateSampler.instance()))
.addAllLatencyAttribution(
work.getLatencyAttributions(true, constructWorkId(work.getWorkItem()),
sampler))
.build());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import javax.annotation.Nullable;
import org.apache.beam.runners.dataflow.worker.DataflowExecutionStateSampler;
import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache;
Expand Down Expand Up @@ -119,9 +120,12 @@ private void forceExecute(Work work) {
executor.forceExecute(work, work.getWorkItem().getSerializedSize());
}

/** Adds any work started before the refreshDeadline to the GetDataRequest builder. */
public List<Windmill.KeyedGetDataRequest> getKeysToRefresh(Instant refreshDeadline) {
return activeWorkState.getKeysToRefresh(refreshDeadline);
/**
 * Collects the keyed GetData requests for work started before {@code refreshDeadline} by
 * delegating to this computation's active work state.
 *
 * @param refreshDeadline cutoff instant, forwarded unchanged to the active work state
 * @param sampler execution state sampler, forwarded unchanged to the active work state
 * @return the requests produced by the active work state
 */
public List<Windmill.KeyedGetDataRequest> getKeysToRefresh(
    Instant refreshDeadline, DataflowExecutionStateSampler sampler) {
  List<Windmill.KeyedGetDataRequest> keysToRefresh =
      activeWorkState.getKeysToRefresh(refreshDeadline, sampler);
  return keysToRefresh;
}

public void printActiveWork(PrintWriter writer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,22 @@ public Collection<Windmill.LatencyAttribution> getLatencyAttributions(Boolean is

private static LatencyAttribution.Builder addActiveLatencyBreakdownToBuilder(Boolean isHeartbeat,
LatencyAttribution.Builder builder, String workId, DataflowExecutionStateSampler sampler) {
if (isHeartbeat) {
ActiveLatencyBreakdown.Builder stepBuilder = ActiveLatencyBreakdown.newBuilder();
ActiveMessageMetadata activeMessage = sampler.getActiveMessageMetadataForWorkId(
workId);
if (activeMessage == null) {
return builder;
}
stepBuilder.setUserStepName(activeMessage.userStepName);
ActiveElementMetadata.Builder activeElementBuilder = ActiveElementMetadata.newBuilder();
activeElementBuilder.setProcessingTimeMillis(
System.currentTimeMillis() - activeMessage.startTime);
stepBuilder.setActiveMessageMetadata(activeElementBuilder);
builder.addActiveLatencyBreakdown(stepBuilder.build());
return builder;
}

Map<String, IntSummaryStatistics> processingDistributions = sampler.getProcessingDistributionsForWorkId(
workId);
if (processingDistributions == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class FakeWindmillServer extends WindmillServerStub {
private final ErrorCollector errorCollector;
private final ConcurrentHashMap<Long, Consumer<Windmill.CommitStatus>> droppedStreamingCommits;
private int commitsRequested = 0;
private int numGetDataRequests = 0;
private List<Windmill.GetDataRequest> getDataRequests = new ArrayList<>();
private boolean isReady = true;
private boolean dropStreamingCommits = false;

Expand Down Expand Up @@ -144,7 +144,7 @@ private void validateGetDataRequest(Windmill.GetDataRequest request) {
public Windmill.GetDataResponse getData(Windmill.GetDataRequest request) {
LOG.info("getDataRequest: {}", request.toString());
validateGetDataRequest(request);
++numGetDataRequests;
getDataRequests.add(request);
GetDataResponse response = dataToOffer.getOrDefault(request);
LOG.debug("getDataResponse: {}", response.toString());
return response;
Expand Down Expand Up @@ -431,7 +431,11 @@ public Windmill.Exception getException() throws InterruptedException {
}

public int numGetDataRequests() {
return numGetDataRequests;
return getDataRequests.size();
}

/**
 * Returns the GetData requests recorded by this fake, in the order they were appended.
 *
 * <p>The result is a read-only view over the internal list, so callers cannot alter the
 * recorded history; requests recorded later remain visible through the same view.
 *
 * <p>NOTE(review): the backing list does not appear to be synchronized here; confirm that
 * readers only inspect it once the requests of interest have been recorded.
 *
 * @return an unmodifiable view of the recorded requests
 */
public List<Windmill.GetDataRequest> getGetDataRequests() {
  return java.util.Collections.unmodifiableList(getDataRequests);
}

public ArrayList<Windmill.ReportStatsRequest> getStatsReceived() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3542,6 +3542,38 @@ public void testDoFnLatencyBreakdownsReportedOnCommit() throws Exception {
worker.stop();
}

/**
 * Verifies that a recorded GetData request carries active message metadata in the latency
 * breakdown of every ACTIVE latency attribution while a slow DoFn is processing.
 */
@Test
public void testDoFnActiveMessageMetadataReportedOnHeartbeat() throws Exception {
  List<ParallelInstruction> instructions =
      Arrays.asList(
          makeSourceInstruction(StringUtf8Coder.of()),
          makeDoFnInstruction(new SlowDoFn(), 0, StringUtf8Coder.of()),
          makeSinkInstruction(StringUtf8Coder.of(), 0));

  FakeWindmillServer server = new FakeWindmillServer(errorCollector);
  StreamingDataflowWorkerOptions options = createTestingPipelineOptions(server);
  // Short refresh period so active-work refreshes are issued while the DoFn is still running.
  options.setActiveWorkRefreshPeriodMillis(10);
  StreamingDataflowWorker worker = makeWorker(instructions, options, true /* publishCounters */);
  worker.start();

  server.whenGetWorkCalled().thenReturn(makeInput(0, TimeUnit.MILLISECONDS.toMicros(0)));

  // Only the wait matters here; the returned commits are not inspected.
  server.waitForAndGetCommits(1);

  // The request at index 2 is read below, so require at least three recorded requests.
  // Otherwise a short run surfaces as an IndexOutOfBoundsException instead of a clear
  // assertion failure.
  assertThat(server.numGetDataRequests(), greaterThan(2));
  Windmill.GetDataRequest heartbeat = server.getGetDataRequests().get(2);

  for (LatencyAttribution la :
      heartbeat.getRequests(0).getRequests(0).getLatencyAttributionList()) {
    if (la.getState() == State.ACTIVE) {
      assertTrue(la.getActiveLatencyBreakdownCount() > 0);
      assertTrue(la.getActiveLatencyBreakdown(0).hasActiveMessageMetadata());
    }
  }

  worker.stop();
}

@Test
public void testLimitOnOutputBundleSize() throws Exception {
// This verifies that ReadOperation, StreamingModeExecutionContext, and windmill sinks
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.beam.runners.dataflow.worker.DataflowExecutionStateSampler;
import org.apache.beam.runners.dataflow.worker.streaming.ActiveWorkState.ActivateWorkResult;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.KeyedGetDataRequest;
Expand Down Expand Up @@ -251,7 +252,8 @@ public void testGetKeysToRefresh() {
activeWorkState.activateWorkForKey(shardedKey1, freshWork);
activeWorkState.activateWorkForKey(shardedKey2, refreshableWork2);

ImmutableList<KeyedGetDataRequest> requests = activeWorkState.getKeysToRefresh(refreshDeadline);
ImmutableList<KeyedGetDataRequest> requests = activeWorkState.getKeysToRefresh(refreshDeadline,
DataflowExecutionStateSampler.instance());

ImmutableList<GetDataRequestKeyShardingKeyAndWorkToken> expected =
ImmutableList.of(
Expand Down

0 comments on commit 8455f6c

Please sign in to comment.