Skip to content

Commit

Permalink
populate latency breakdowns on commit message
Browse files Browse the repository at this point in the history
  • Loading branch information
clmccart committed Dec 1, 2023
1 parent 8690565 commit e17b387
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1163,7 +1163,8 @@ public void close() {

// Add the output to the commit queue.
work.setState(State.COMMIT_QUEUED);
outputBuilder.addAllPerWorkItemLatencyAttributions(work.getLatencyAttributions());
outputBuilder.addAllPerWorkItemLatencyAttributions(
work.getLatencyAttributions(false, constructWorkId(workItem), sampler));

WorkItemCommitRequest commitRequest = outputBuilder.build();
int byteLimit = maxWorkItemCommitBytes;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.beam.runners.dataflow.worker.DataflowExecutionStateSampler;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.KeyedGetDataRequest;
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache;
Expand Down Expand Up @@ -229,7 +230,9 @@ private static Stream<KeyedGetDataRequest> toKeyedGetDataRequestStream(
.setKey(shardedKey.key())
.setShardingKey(shardedKey.shardingKey())
.setWorkToken(work.getWorkItem().getWorkToken())
.addAllLatencyAttribution(work.getLatencyAttributions())
// TODO(clairemccarthy): plumb real values.
.addAllLatencyAttribution(work.getLatencyAttributions(true, "",
DataflowExecutionStateSampler.instance()))
.build());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,20 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumMap;
import java.util.IntSummaryStatistics;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.function.Consumer;
import java.util.function.Supplier;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.beam.runners.dataflow.worker.ActiveMessageMetadata;
import org.apache.beam.runners.dataflow.worker.DataflowExecutionStateSampler;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.LatencyAttribution;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.LatencyAttribution.ActiveLatencyBreakdown;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.LatencyAttribution.ActiveLatencyBreakdown.ActiveElementMetadata;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.LatencyAttribution.ActiveLatencyBreakdown.Distribution;
import org.joda.time.Duration;
import org.joda.time.Instant;

Expand Down Expand Up @@ -101,7 +109,8 @@ private void recordGetWorkStreamLatencies(
}
}

public Collection<Windmill.LatencyAttribution> getLatencyAttributions() {
public Collection<Windmill.LatencyAttribution> getLatencyAttributions(Boolean isHeartbeat,
String workId, DataflowExecutionStateSampler sampler) {
List<Windmill.LatencyAttribution> list = new ArrayList<>();
for (Windmill.LatencyAttribution.State state : Windmill.LatencyAttribution.State.values()) {
Duration duration = totalDurationPerState.getOrDefault(state, Duration.ZERO);
Expand All @@ -111,15 +120,41 @@ public Collection<Windmill.LatencyAttribution> getLatencyAttributions() {
if (duration.equals(Duration.ZERO)) {
continue;
}
list.add(
Windmill.LatencyAttribution.newBuilder()
.setState(state)
.setTotalDurationMillis(duration.getMillis())
.build());
LatencyAttribution.Builder laBuilder = Windmill.LatencyAttribution.newBuilder();
if (state == LatencyAttribution.State.ACTIVE) {
laBuilder = addActiveLatencyBreakdownToBuilder(isHeartbeat, laBuilder,
workId, sampler);
}
Windmill.LatencyAttribution la = laBuilder
.setState(state)
.setTotalDurationMillis(duration.getMillis())
.build();
list.add(la);
}
return list;
}

/**
 * Augments the given {@link LatencyAttribution} builder with per-user-step
 * processing-time distributions recorded by the sampler for one work item.
 *
 * <p>For each step that has sampled statistics, one {@code ActiveLatencyBreakdown}
 * entry is added carrying the step name and a count/min/max/mean/sum distribution.
 *
 * <p>NOTE(review): {@code isHeartbeat} is accepted but never read here — presumably
 * reserved for heartbeat-specific breakdowns; confirm before removing.
 *
 * @param isHeartbeat whether this attribution is for a heartbeat request (currently unused)
 * @param builder the latency-attribution builder to augment
 * @param workId key used to look up the sampler's per-step distributions
 * @param sampler source of per-step processing-time statistics
 * @return the same builder, possibly augmented; unchanged when no data exists for {@code workId}
 */
private static LatencyAttribution.Builder addActiveLatencyBreakdownToBuilder(Boolean isHeartbeat,
    LatencyAttribution.Builder builder, String workId, DataflowExecutionStateSampler sampler) {
  Map<String, IntSummaryStatistics> distributionsByStep =
      sampler.getProcessingDistributionsForWorkId(workId);
  // The sampler has no record of this work item; leave the builder untouched.
  if (distributionsByStep == null) {
    return builder;
  }
  for (Entry<String, IntSummaryStatistics> stepEntry : distributionsByStep.entrySet()) {
    IntSummaryStatistics stats = stepEntry.getValue();
    Distribution distribution =
        Distribution.newBuilder()
            .setCount(stats.getCount())
            .setMin(stats.getMin())
            .setMax(stats.getMax())
            // Mean is truncated to a whole number of time units by the proto field type.
            .setMean((long) stats.getAverage())
            .setSum(stats.getSum())
            .build();
    builder.addActiveLatencyBreakdown(
        ActiveLatencyBreakdown.newBuilder()
            .setUserStepName(stepEntry.getKey())
            .setProcessingTimesDistribution(distribution)
            .build());
  }
  return builder;
}

boolean isStuckCommittingAt(Instant stuckCommitDeadline) {
return currentState.state() == Work.State.COMMITTING
&& currentState.startTime().isBefore(stuckCommitDeadline);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
Expand Down Expand Up @@ -3274,7 +3275,8 @@ public void testLatencyAttributionProtobufsPopulated() {
work.setState(Work.State.COMMITTING);
clock.sleep(Duration.millis(60));

Iterator<LatencyAttribution> it = work.getLatencyAttributions().iterator();
Iterator<LatencyAttribution> it = work.getLatencyAttributions(false,
"", DataflowExecutionStateSampler.instance()).iterator();
assertTrue(it.hasNext());
LatencyAttribution lat = it.next();
assertSame(State.QUEUED, lat.getState());
Expand Down Expand Up @@ -3505,6 +3507,41 @@ public void testLatencyAttributionPopulatedInCommitRequest() throws Exception {
}
}

@Test
public void testDoFnLatencyBreakdownsReportedOnCommit() throws Exception {
  // Pipeline with a deliberately slow ParDo so the work item accrues ACTIVE time
  // that the sampler can break down per user step.
  List<ParallelInstruction> instructions =
      Arrays.asList(
          makeSourceInstruction(StringUtf8Coder.of()),
          makeDoFnInstruction(new SlowDoFn(), 0, StringUtf8Coder.of()),
          makeSinkInstruction(StringUtf8Coder.of(), 0));

  FakeWindmillServer server = new FakeWindmillServer(errorCollector);
  StreamingDataflowWorkerOptions options = createTestingPipelineOptions(server);
  options.setActiveWorkRefreshPeriodMillis(100);
  StreamingDataflowWorker worker = makeWorker(instructions, options, true /* publishCounters */);
  worker.start();

  server.whenGetWorkCalled().thenReturn(makeInput(0, TimeUnit.MILLISECONDS.toMicros(0)));

  Map<Long, Windmill.WorkItemCommitRequest> result = server.waitForAndGetCommits(1);
  Windmill.WorkItemCommitRequest commit = result.get(0L);
  // Fail with a clear message rather than an NPE if the expected commit is absent.
  Assert.assertNotNull("expected a commit for work token 0", commit);

  // The ACTIVE attribution must carry exactly one breakdown naming the user ParDo
  // step, with a processing-time distribution and no in-flight message metadata
  // (the work item has finished processing by commit time).
  boolean sawActiveAttribution = false;
  for (LatencyAttribution la : commit.getPerWorkItemLatencyAttributionsList()) {
    if (la.getState() == State.ACTIVE) {
      sawActiveAttribution = true;
      assertThat(la.getActiveLatencyBreakdownCount(), equalTo(1));
      assertThat(
          la.getActiveLatencyBreakdown(0).getUserStepName(), equalTo(DEFAULT_PARDO_USER_NAME));
      Assert.assertTrue(la.getActiveLatencyBreakdown(0).hasProcessingTimesDistribution());
      Assert.assertFalse(la.getActiveLatencyBreakdown(0).hasActiveMessageMetadata());
    }
  }
  // Guard against a vacuous pass: the loop body must have run at least once.
  Assert.assertTrue("expected an ACTIVE latency attribution on the commit", sawActiveAttribution);

  worker.stop();
}

@Test
public void testLimitOnOutputBundleSize() throws Exception {
// This verifies that ReadOperation, StreamingModeExecutionContext, and windmill sinks
Expand Down

0 comments on commit e17b387

Please sign in to comment.