Skip to content

Commit

Permalink
spotlessApply
Browse files Browse the repository at this point in the history
  • Loading branch information
clmccart committed Dec 1, 2023
1 parent 8455f6c commit 47b5b54
Show file tree
Hide file tree
Showing 11 changed files with 173 additions and 130 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,7 @@
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.DateTimeUtils.MillisProvider;

/**
* Monitors the execution of one or more execution threads.
*/
/** Monitors the execution of one or more execution threads. */
public class ExecutionStateSampler {

private final Set<ExecutionStateTracker> activeTrackers = ConcurrentHashMap.newKeySet();
Expand All @@ -47,8 +45,7 @@ public class ExecutionStateSampler {
new ExecutionStateSampler(SYSTEM_MILLIS_PROVIDER);

protected final MillisProvider clock;
@VisibleForTesting
protected volatile long lastSampleTimeMillis;
@VisibleForTesting protected volatile long lastSampleTimeMillis;

protected ExecutionStateSampler(MillisProvider clock) {
this.clock = clock;
Expand Down Expand Up @@ -149,16 +146,12 @@ public synchronized void stop() {
}
}

/**
* Add the tracker to the sampling set.
*/
/** Add the tracker to the sampling set. */
protected void addTracker(ExecutionStateTracker tracker) {
this.activeTrackers.add(tracker);
}

/**
* Remove the tracker from the sampling set.
*/
/** Remove the tracker from the sampling set. */
protected void removeTracker(ExecutionStateTracker tracker) {
activeTrackers.remove(tracker);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker;

public class ActiveMessageMetadata {
Expand All @@ -9,4 +26,4 @@ public ActiveMessageMetadata(String userStepName, Long startTime) {
this.userStepName = userStepName;
this.startTime = startTime;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -315,8 +315,8 @@ public Closeable enterState(ExecutionState newState) {
if (this.activeMessageMetadata != null) {
recordActiveMessageInProcessingTimesMap();
}
this.activeMessageMetadata = new ActiveMessageMetadata(
newDFState.getStepName().userName(), clock.getMillis());
this.activeMessageMetadata =
new ActiveMessageMetadata(newDFState.getStepName().userName(), clock.getMillis());
}
elementExecutionTracker.enter(newDFState.getStepName());
}
Expand All @@ -343,7 +343,7 @@ public ActiveMessageMetadata getActiveMessageMetadata() {
public Map<String, IntSummaryStatistics> getProcessingTimesByStep() {
return processingTimesByStep;
}

/**
* Transitions the metadata for the currently active message to an entry in the completed
* processing times map. Sets the activeMessageMetadata to null after the entry has been
Expand All @@ -354,12 +354,12 @@ private void recordActiveMessageInProcessingTimesMap() {
return;
}
this.processingTimesByStep.compute(
this.activeMessageMetadata.userStepName, (k, v) -> {
this.activeMessageMetadata.userStepName,
(k, v) -> {
if (v == null) {
v = new IntSummaryStatistics();
}
v.accept(
(int) (System.currentTimeMillis() - this.activeMessageMetadata.startTime));
v.accept((int) (System.currentTimeMillis() - this.activeMessageMetadata.startTime));
return v;
});
this.activeMessageMetadata = null;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.dataflow.worker;

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
Expand All @@ -20,7 +37,8 @@ public class DataflowExecutionStateSampler extends ExecutionStateSampler {
new DataflowExecutionStateSampler(SYSTEM_MILLIS_PROVIDER);

private HashMap<String, DataflowExecutionStateTracker> activeTrackersByWorkId = new HashMap<>();
private HashMap<String, Map<String, IntSummaryStatistics>> completedProcessingMetrics = new HashMap<>();
private HashMap<String, Map<String, IntSummaryStatistics>> completedProcessingMetrics =
new HashMap<>();

public static DataflowExecutionStateSampler instance() {
return INSTANCE;
Expand All @@ -46,15 +64,16 @@ public void addTracker(ExecutionStateTracker tracker) {

private Map<String, IntSummaryStatistics> mergeStepStatsMaps(
Map<String, IntSummaryStatistics> map1, Map<String, IntSummaryStatistics> map2) {
for (Entry<String, IntSummaryStatistics> steps : map2
.entrySet()) {
map1.compute(steps.getKey(), (k, v) -> {
if (v == null) {
return steps.getValue();
}
v.combine(steps.getValue());
return v;
});
for (Entry<String, IntSummaryStatistics> steps : map2.entrySet()) {
map1.compute(
steps.getKey(),
(k, v) -> {
if (v == null) {
return steps.getValue();
}
v.combine(steps.getValue());
return v;
});
}
return map1;
}
Expand Down Expand Up @@ -96,20 +115,20 @@ public ActiveMessageMetadata getActiveMessageMetadataForWorkId(String workId) {
}

@Nullable
public Map<String, IntSummaryStatistics> getProcessingDistributionsForWorkId(
String workId) {
public Map<String, IntSummaryStatistics> getProcessingDistributionsForWorkId(String workId) {
if (!activeTrackersByWorkId.containsKey(workId)) {
if (completedProcessingMetrics.containsKey(workId)) {
return completedProcessingMetrics.get(workId);
}
return null;
}
DataflowExecutionStateTracker tracker = activeTrackersByWorkId.get(workId);
return mergeStepStatsMaps(completedProcessingMetrics.getOrDefault(workId, new HashMap<>()),
return mergeStepStatsMaps(
completedProcessingMetrics.getOrDefault(workId, new HashMap<>()),
tracker.getProcessingTimesByStep());
}

public synchronized void resetForWorkId(String workId) {
completedProcessingMetrics.remove(workId);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -212,15 +212,16 @@ private synchronized ImmutableMap<ShardedKey, Long> getStuckCommitsAt(
return stuckCommits.build();
}

synchronized ImmutableList<KeyedGetDataRequest> getKeysToRefresh(Instant refreshDeadline,
DataflowExecutionStateSampler sampler) {
synchronized ImmutableList<KeyedGetDataRequest> getKeysToRefresh(
Instant refreshDeadline, DataflowExecutionStateSampler sampler) {
return activeWork.entrySet().stream()
.flatMap(entry -> toKeyedGetDataRequestStream(entry, refreshDeadline, sampler))
.collect(toImmutableList());
}

private static Stream<KeyedGetDataRequest> toKeyedGetDataRequestStream(
Entry<ShardedKey, Deque<Work>> shardedKeyAndWorkQueue, Instant refreshDeadline,
Entry<ShardedKey, Deque<Work>> shardedKeyAndWorkQueue,
Instant refreshDeadline,
DataflowExecutionStateSampler sampler) {
ShardedKey shardedKey = shardedKeyAndWorkQueue.getKey();
Deque<Work> workQueue = shardedKeyAndWorkQueue.getValue();
Expand All @@ -234,8 +235,8 @@ private static Stream<KeyedGetDataRequest> toKeyedGetDataRequestStream(
.setShardingKey(shardedKey.shardingKey())
.setWorkToken(work.getWorkItem().getWorkToken())
.addAllLatencyAttribution(
work.getLatencyAttributions(true, constructWorkId(work.getWorkItem()),
sampler))
work.getLatencyAttributions(
true, constructWorkId(work.getWorkItem()), sampler))
.build());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,9 @@ private void forceExecute(Work work) {
executor.forceExecute(work, work.getWorkItem().getSerializedSize());
}

/**
* Adds any work started before the refreshDeadline to the GetDataRequest builder.
*/
public List<Windmill.KeyedGetDataRequest> getKeysToRefresh(Instant refreshDeadline,
DataflowExecutionStateSampler sampler) {
/** Adds any work started before the refreshDeadline to the GetDataRequest builder. */
public List<Windmill.KeyedGetDataRequest> getKeysToRefresh(
Instant refreshDeadline, DataflowExecutionStateSampler sampler) {
return activeWorkState.getKeysToRefresh(refreshDeadline, sampler);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ private void recordGetWorkStreamLatencies(
}
}

public Collection<Windmill.LatencyAttribution> getLatencyAttributions(Boolean isHeartbeat,
String workId, DataflowExecutionStateSampler sampler) {
public Collection<Windmill.LatencyAttribution> getLatencyAttributions(
Boolean isHeartbeat, String workId, DataflowExecutionStateSampler sampler) {
List<Windmill.LatencyAttribution> list = new ArrayList<>();
for (Windmill.LatencyAttribution.State state : Windmill.LatencyAttribution.State.values()) {
Duration duration = totalDurationPerState.getOrDefault(state, Duration.ZERO);
Expand All @@ -122,24 +122,23 @@ public Collection<Windmill.LatencyAttribution> getLatencyAttributions(Boolean is
}
LatencyAttribution.Builder laBuilder = Windmill.LatencyAttribution.newBuilder();
if (state == LatencyAttribution.State.ACTIVE) {
laBuilder = addActiveLatencyBreakdownToBuilder(isHeartbeat, laBuilder,
workId, sampler);
laBuilder = addActiveLatencyBreakdownToBuilder(isHeartbeat, laBuilder, workId, sampler);
}
Windmill.LatencyAttribution la = laBuilder
.setState(state)
.setTotalDurationMillis(duration.getMillis())
.build();
Windmill.LatencyAttribution la =
laBuilder.setState(state).setTotalDurationMillis(duration.getMillis()).build();
list.add(la);
}
return list;
}

private static LatencyAttribution.Builder addActiveLatencyBreakdownToBuilder(Boolean isHeartbeat,
LatencyAttribution.Builder builder, String workId, DataflowExecutionStateSampler sampler) {
private static LatencyAttribution.Builder addActiveLatencyBreakdownToBuilder(
Boolean isHeartbeat,
LatencyAttribution.Builder builder,
String workId,
DataflowExecutionStateSampler sampler) {
if (isHeartbeat) {
ActiveLatencyBreakdown.Builder stepBuilder = ActiveLatencyBreakdown.newBuilder();
ActiveMessageMetadata activeMessage = sampler.getActiveMessageMetadataForWorkId(
workId);
ActiveMessageMetadata activeMessage = sampler.getActiveMessageMetadataForWorkId(workId);
if (activeMessage == null) {
return builder;
}
Expand All @@ -151,20 +150,22 @@ private static LatencyAttribution.Builder addActiveLatencyBreakdownToBuilder(Boo
builder.addActiveLatencyBreakdown(stepBuilder.build());
return builder;
}
Map<String, IntSummaryStatistics> processingDistributions = sampler.getProcessingDistributionsForWorkId(
workId);

Map<String, IntSummaryStatistics> processingDistributions =
sampler.getProcessingDistributionsForWorkId(workId);
if (processingDistributions == null) {
return builder;
}
for (Entry<String, IntSummaryStatistics> entry : processingDistributions.entrySet()) {
ActiveLatencyBreakdown.Builder stepBuilder = ActiveLatencyBreakdown.newBuilder();
stepBuilder.setUserStepName(entry.getKey());
Distribution.Builder distributionBuilder = Distribution.newBuilder()
.setCount(entry.getValue().getCount())
.setMin(entry.getValue().getMin()).setMax(entry.getValue()
.getMax()).setMean((long) entry.getValue().getAverage())
.setSum(entry.getValue().getSum());
Distribution.Builder distributionBuilder =
Distribution.newBuilder()
.setCount(entry.getValue().getCount())
.setMin(entry.getValue().getMin())
.setMax(entry.getValue().getMax())
.setMean((long) entry.getValue().getAverage())
.setSum(entry.getValue().getSum());
stepBuilder.setProcessingTimesDistribution(distributionBuilder.build());
builder.addActiveLatencyBreakdown(stepBuilder.build());
}
Expand Down
Loading

0 comments on commit 47b5b54

Please sign in to comment.