From 47b5b546984416c678a739993c07846a8868bf73 Mon Sep 17 00:00:00 2001 From: Claire McCarthy Date: Fri, 1 Dec 2023 15:34:28 -0800 Subject: [PATCH] spotlessApply --- .../core/metrics/ExecutionStateSampler.java | 15 ++--- .../worker/ActiveMessageMetadata.java | 19 +++++- .../worker/DataflowExecutionContext.java | 12 ++-- .../worker/DataflowExecutionStateSampler.java | 47 ++++++++++---- .../worker/streaming/ActiveWorkState.java | 11 ++-- .../worker/streaming/ComputationState.java | 8 +-- .../dataflow/worker/streaming/Work.java | 41 ++++++------ .../worker/DataflowExecutionContextTest.java | 64 +++++++++--------- .../DataflowExecutionStateSamplerTest.java | 65 ++++++++++++------- .../worker/StreamingDataflowWorkerTest.java | 17 +++-- .../worker/streaming/ActiveWorkStateTest.java | 4 +- 11 files changed, 173 insertions(+), 130 deletions(-) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateSampler.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateSampler.java index 644b0dd5a244..9478c218b075 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateSampler.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/ExecutionStateSampler.java @@ -34,9 +34,7 @@ import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.DateTimeUtils.MillisProvider; -/** - * Monitors the execution of one or more execution threads. - */ +/** Monitors the execution of one or more execution threads. */ public class ExecutionStateSampler { private final Set activeTrackers = ConcurrentHashMap.newKeySet(); @@ -47,8 +45,7 @@ public class ExecutionStateSampler { new ExecutionStateSampler(SYSTEM_MILLIS_PROVIDER); protected final MillisProvider clock; - @VisibleForTesting - protected volatile long lastSampleTimeMillis; + @VisibleForTesting protected volatile long lastSampleTimeMillis; protected ExecutionStateSampler(MillisProvider clock) { this.clock = clock; @@ -149,16 +146,12 @@ public synchronized void stop() { } } - /** - * Add the tracker to the sampling set. - */ + /** Add the tracker to the sampling set. */ protected void addTracker(ExecutionStateTracker tracker) { this.activeTrackers.add(tracker); } - /** - * Remove the tracker from the sampling set. - */ + /** Remove the tracker from the sampling set. */ protected void removeTracker(ExecutionStateTracker tracker) { activeTrackers.remove(tracker); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ActiveMessageMetadata.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ActiveMessageMetadata.java index c70075b1ff29..ffb26491d20f 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ActiveMessageMetadata.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/ActiveMessageMetadata.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.beam.runners.dataflow.worker; public class ActiveMessageMetadata { @@ -9,4 +26,4 @@ public ActiveMessageMetadata(String userStepName, Long startTime) { this.userStepName = userStepName; this.startTime = startTime; } -} \ No newline at end of file +} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java index d26a601607b3..857d0bed9e54 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContext.java @@ -315,8 +315,8 @@ public Closeable enterState(ExecutionState newState) { if (this.activeMessageMetadata != null) { recordActiveMessageInProcessingTimesMap(); } - this.activeMessageMetadata = new ActiveMessageMetadata( - newDFState.getStepName().userName(), clock.getMillis()); + this.activeMessageMetadata = + new ActiveMessageMetadata(newDFState.getStepName().userName(), clock.getMillis()); } elementExecutionTracker.enter(newDFState.getStepName()); } @@ -343,7 +343,7 @@ public ActiveMessageMetadata getActiveMessageMetadata() { public Map getProcessingTimesByStep() { return processingTimesByStep; } - + /** * Transitions the metadata for the currently active message to an entry in the completed * processing times map. Sets the activeMessageMetadata to null after the entry has been @@ -354,12 +354,12 @@ private void recordActiveMessageInProcessingTimesMap() { return; } this.processingTimesByStep.compute( - this.activeMessageMetadata.userStepName, (k, v) -> { + this.activeMessageMetadata.userStepName, + (k, v) -> { if (v == null) { v = new IntSummaryStatistics(); } - v.accept( - (int) (System.currentTimeMillis() - this.activeMessageMetadata.startTime)); + v.accept((int) (System.currentTimeMillis() - this.activeMessageMetadata.startTime)); return v; }); this.activeMessageMetadata = null; diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateSampler.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateSampler.java index 424b0f4920d0..5bb02aa27d84 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateSampler.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateSampler.java @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.beam.runners.dataflow.worker; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; @@ -20,7 +37,8 @@ public class DataflowExecutionStateSampler extends ExecutionStateSampler { new DataflowExecutionStateSampler(SYSTEM_MILLIS_PROVIDER); private HashMap activeTrackersByWorkId = new HashMap<>(); - private HashMap> completedProcessingMetrics = new HashMap<>(); + private HashMap> completedProcessingMetrics = + new HashMap<>(); public static DataflowExecutionStateSampler instance() { return INSTANCE; @@ -46,15 +64,16 @@ public void addTracker(ExecutionStateTracker tracker) { private Map mergeStepStatsMaps( Map map1, Map map2) { - for (Entry steps : map2 - .entrySet()) { - map1.compute(steps.getKey(), (k, v) -> { - if (v == null) { - return steps.getValue(); - } - v.combine(steps.getValue()); - return v; - }); + for (Entry steps : map2.entrySet()) { + map1.compute( + steps.getKey(), + (k, v) -> { + if (v == null) { + return steps.getValue(); + } + v.combine(steps.getValue()); + return v; + }); } return map1; } @@ -96,8 +115,7 @@ public ActiveMessageMetadata getActiveMessageMetadataForWorkId(String workId) { } @Nullable - public Map getProcessingDistributionsForWorkId( - String workId) { + public Map getProcessingDistributionsForWorkId(String workId) { if (!activeTrackersByWorkId.containsKey(workId)) { if (completedProcessingMetrics.containsKey(workId)) { return completedProcessingMetrics.get(workId); @@ -105,11 +123,12 @@ public Map getProcessingDistributionsForWorkId( return null; } DataflowExecutionStateTracker tracker = activeTrackersByWorkId.get(workId); - return mergeStepStatsMaps(completedProcessingMetrics.getOrDefault(workId, new HashMap<>()), + return mergeStepStatsMaps( + completedProcessingMetrics.getOrDefault(workId, new HashMap<>()), tracker.getProcessingTimesByStep()); } public synchronized void resetForWorkId(String workId) { completedProcessingMetrics.remove(workId); } -} \ No newline at end of file +} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java index c5b77a5de55c..3665ab4bd6cd 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java @@ -212,15 +212,16 @@ private synchronized ImmutableMap getStuckCommitsAt( return stuckCommits.build(); } - synchronized ImmutableList getKeysToRefresh(Instant refreshDeadline, - DataflowExecutionStateSampler sampler) { + synchronized ImmutableList getKeysToRefresh( + Instant refreshDeadline, DataflowExecutionStateSampler sampler) { return activeWork.entrySet().stream() .flatMap(entry -> toKeyedGetDataRequestStream(entry, refreshDeadline, sampler)) .collect(toImmutableList()); } private static Stream toKeyedGetDataRequestStream( - Entry> shardedKeyAndWorkQueue, Instant refreshDeadline, + Entry> shardedKeyAndWorkQueue, + Instant refreshDeadline, DataflowExecutionStateSampler sampler) { ShardedKey shardedKey = shardedKeyAndWorkQueue.getKey(); Deque workQueue = shardedKeyAndWorkQueue.getValue(); @@ -234,8 +235,8 @@ private static Stream toKeyedGetDataRequestStream( .setShardingKey(shardedKey.shardingKey()) .setWorkToken(work.getWorkItem().getWorkToken()) .addAllLatencyAttribution( - work.getLatencyAttributions(true, constructWorkId(work.getWorkItem()), - sampler)) + work.getLatencyAttributions( + true, constructWorkId(work.getWorkItem()), sampler)) .build()); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationState.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationState.java index be0f77843239..410b4f46eca2 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationState.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationState.java @@ -120,11 +120,9 @@ private void forceExecute(Work work) { executor.forceExecute(work, work.getWorkItem().getSerializedSize()); } - /** - * Adds any work started before the refreshDeadline to the GetDataRequest builder. - */ - public List getKeysToRefresh(Instant refreshDeadline, - DataflowExecutionStateSampler sampler) { + /** Adds any work started before the refreshDeadline to the GetDataRequest builder. */ + public List getKeysToRefresh( + Instant refreshDeadline, DataflowExecutionStateSampler sampler) { return activeWorkState.getKeysToRefresh(refreshDeadline, sampler); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java index b6924592d323..4e4f7de538e2 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java @@ -109,8 +109,8 @@ private void recordGetWorkStreamLatencies( } } - public Collection getLatencyAttributions(Boolean isHeartbeat, - String workId, DataflowExecutionStateSampler sampler) { + public Collection getLatencyAttributions( + Boolean isHeartbeat, String workId, DataflowExecutionStateSampler sampler) { List list = new ArrayList<>(); for (Windmill.LatencyAttribution.State state : Windmill.LatencyAttribution.State.values()) { Duration duration = totalDurationPerState.getOrDefault(state, Duration.ZERO); @@ -122,24 +122,23 @@ public Collection getLatencyAttributions(Boolean is } LatencyAttribution.Builder laBuilder = Windmill.LatencyAttribution.newBuilder(); if (state == LatencyAttribution.State.ACTIVE) { - laBuilder = addActiveLatencyBreakdownToBuilder(isHeartbeat, laBuilder, - workId, sampler); + laBuilder = addActiveLatencyBreakdownToBuilder(isHeartbeat, laBuilder, workId, sampler); } - Windmill.LatencyAttribution la = laBuilder - .setState(state) - .setTotalDurationMillis(duration.getMillis()) - .build(); + Windmill.LatencyAttribution la = + laBuilder.setState(state).setTotalDurationMillis(duration.getMillis()).build(); list.add(la); } return list; } - private static LatencyAttribution.Builder addActiveLatencyBreakdownToBuilder(Boolean isHeartbeat, - LatencyAttribution.Builder builder, String workId, DataflowExecutionStateSampler sampler) { + private static LatencyAttribution.Builder addActiveLatencyBreakdownToBuilder( + Boolean isHeartbeat, + LatencyAttribution.Builder builder, + String workId, + DataflowExecutionStateSampler sampler) { if (isHeartbeat) { ActiveLatencyBreakdown.Builder stepBuilder = ActiveLatencyBreakdown.newBuilder(); - ActiveMessageMetadata activeMessage = sampler.getActiveMessageMetadataForWorkId( - workId); + ActiveMessageMetadata activeMessage = sampler.getActiveMessageMetadataForWorkId(workId); if (activeMessage == null) { return builder; } @@ -151,20 +150,22 @@ private static LatencyAttribution.Builder addActiveLatencyBreakdownToBuilder(Boo builder.addActiveLatencyBreakdown(stepBuilder.build()); return builder; } - - Map processingDistributions = sampler.getProcessingDistributionsForWorkId( - workId); + + Map processingDistributions = + sampler.getProcessingDistributionsForWorkId(workId); if (processingDistributions == null) { return builder; } for (Entry entry : processingDistributions.entrySet()) { ActiveLatencyBreakdown.Builder stepBuilder = ActiveLatencyBreakdown.newBuilder(); stepBuilder.setUserStepName(entry.getKey()); - Distribution.Builder distributionBuilder = Distribution.newBuilder() - .setCount(entry.getValue().getCount()) - .setMin(entry.getValue().getMin()).setMax(entry.getValue() - .getMax()).setMean((long) entry.getValue().getAverage()) - .setSum(entry.getValue().getSum()); + Distribution.Builder distributionBuilder = + Distribution.newBuilder() + .setCount(entry.getValue().getCount()) + .setMin(entry.getValue().getMin()) + .setMax(entry.getValue().getMax()) + .setMean((long) entry.getValue().getAverage()) + .setSum(entry.getValue().getSum()); stepBuilder.setProcessingTimesDistribution(distributionBuilder.build()); builder.addActiveLatencyBreakdown(stepBuilder.build()); } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContextTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContextTest.java index a6d634ea127e..dcf4bfcaee42 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContextTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionContextTest.java @@ -25,18 +25,13 @@ import java.io.Closeable; import java.io.IOException; import java.util.Arrays; -import java.util.HashMap; import java.util.HashSet; import java.util.IntSummaryStatistics; import java.util.Map; import org.apache.beam.runners.core.metrics.ExecutionStateSampler; import org.apache.beam.runners.core.metrics.ExecutionStateTracker; -import org.apache.beam.runners.dataflow.worker.DataflowOperationContext.DataflowExecutionState; import org.apache.beam.runners.dataflow.worker.StreamingModeExecutionContext.StreamingModeExecutionState; -import org.apache.beam.runners.dataflow.worker.counters.NameContext; -import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler; import org.apache.beam.runners.dataflow.worker.profiler.ScopedProfiler.NoopProfileScope; -import org.apache.beam.runners.dataflow.worker.streaming.ExecutionState; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.junit.Assert; import org.junit.Test; @@ -130,28 +125,27 @@ public void testContextActivationObserverActivation() throws Exception { public void testDataflowExecutionStateTrackerRecordsActiveMessageMetadata() throws IOException { DataflowExecutionContext.DataflowExecutionStateTracker tracker = new DataflowExecutionContext.DataflowExecutionStateTracker( - ExecutionStateSampler.instance(), + ExecutionStateSampler.instance(), null, null, PipelineOptionsFactory.create(), ""); + StreamingModeExecutionState state = + new StreamingModeExecutionState( + NameContextsForTests.nameContextForTest(), + PROCESS_STATE_NAME, null, - null, - PipelineOptionsFactory.create(), - ""); - StreamingModeExecutionState state = new StreamingModeExecutionState( - NameContextsForTests.nameContextForTest(), PROCESS_STATE_NAME, null, NoopProfileScope.NOOP, - null); + NoopProfileScope.NOOP, + null); Closeable closure = tracker.enterState(state); // After entering a process state, we should have an active message tracked. - ActiveMessageMetadata expectedMetadata = new ActiveMessageMetadata( - NameContextsForTests.nameContextForTest().userName(), 1l); - Assert.assertEquals(expectedMetadata.userStepName, - tracker.getActiveMessageMetadata().userStepName); + ActiveMessageMetadata expectedMetadata = + new ActiveMessageMetadata(NameContextsForTests.nameContextForTest().userName(), 1l); + Assert.assertEquals( + expectedMetadata.userStepName, tracker.getActiveMessageMetadata().userStepName); closure.close(); // Once the state closes, the active message should get cleared. - Assert.assertEquals(null, - tracker.getActiveMessageMetadata()); + Assert.assertEquals(null, tracker.getActiveMessageMetadata()); } @Test @@ -159,21 +153,25 @@ public void testDataflowExecutionStateTrackerRecordsCompletedProcessingTimes() throws IOException { DataflowExecutionContext.DataflowExecutionStateTracker tracker = new DataflowExecutionContext.DataflowExecutionStateTracker( - ExecutionStateSampler.instance(), - null, - null, - PipelineOptionsFactory.create(), - ""); + ExecutionStateSampler.instance(), null, null, PipelineOptionsFactory.create(), ""); // Enter a processing state - StreamingModeExecutionState state = new StreamingModeExecutionState( - NameContextsForTests.nameContextForTest(), PROCESS_STATE_NAME, null, NoopProfileScope.NOOP, - null); + StreamingModeExecutionState state = + new StreamingModeExecutionState( + NameContextsForTests.nameContextForTest(), + PROCESS_STATE_NAME, + null, + NoopProfileScope.NOOP, + null); tracker.enterState(state); // Enter a new processing state - StreamingModeExecutionState newState = new StreamingModeExecutionState( - NameContextsForTests.nameContextForTest(), PROCESS_STATE_NAME, null, NoopProfileScope.NOOP, - null); + StreamingModeExecutionState newState = + new StreamingModeExecutionState( + NameContextsForTests.nameContextForTest(), + PROCESS_STATE_NAME, + null, + NoopProfileScope.NOOP, + null); tracker.enterState(newState); // The first completed state should be recorded and the new state should be active. @@ -182,9 +180,9 @@ public void testDataflowExecutionStateTrackerRecordsCompletedProcessingTimes() Assert.assertEquals( new HashSet<>(Arrays.asList(NameContextsForTests.nameContextForTest().userName())), gotProcessingTimes.keySet()); - ActiveMessageMetadata expectedMetadata = new ActiveMessageMetadata( - NameContextsForTests.nameContextForTest().userName(), 1l); - Assert.assertEquals(expectedMetadata.userStepName, - tracker.getActiveMessageMetadata().userStepName); + ActiveMessageMetadata expectedMetadata = + new ActiveMessageMetadata(NameContextsForTests.nameContextForTest().userName(), 1l); + Assert.assertEquals( + expectedMetadata.userStepName, tracker.getActiveMessageMetadata().userStepName); } } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateSamplerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateSamplerTest.java index 01e3503c14db..1737da963e3e 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateSamplerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/DataflowExecutionStateSamplerTest.java @@ -1,8 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.apache.beam.runners.dataflow.worker; import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.anEmptyMap; -import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -31,8 +46,9 @@ public void setUp() { sampler = DataflowExecutionStateSampler.newForTest(clock); } - private final TestOperationContext.TestDataflowExecutionState step1act1 = new TestOperationContext.TestDataflowExecutionState( - createNameContext("test-stage1"), "activity1"); + private final TestOperationContext.TestDataflowExecutionState step1act1 = + new TestOperationContext.TestDataflowExecutionState( + createNameContext("test-stage1"), "activity1"); private NameContext createNameContext(String userName) { return NameContext.create("", "", "", userName); @@ -41,15 +57,13 @@ private NameContext createNameContext(String userName) { @Test public void testAddTrackerRemoveTrackerActiveMessageMetadataGetsUpdated() { String workId = "work-item-id1"; - ActiveMessageMetadata testMetadata = new ActiveMessageMetadata( - step1act1.getStepName().userName(), - clock.getMillis()); + ActiveMessageMetadata testMetadata = + new ActiveMessageMetadata(step1act1.getStepName().userName(), clock.getMillis()); DataflowExecutionStateTracker trackerMock = createMockTracker(workId); when(trackerMock.getActiveMessageMetadata()).thenReturn(testMetadata); sampler.addTracker(trackerMock); - assertThat(sampler.getActiveMessageMetadataForWorkId(workId), - equalTo(testMetadata)); + assertThat(sampler.getActiveMessageMetadataForWorkId(workId), equalTo(testMetadata)); sampler.removeTracker(trackerMock); Assert.assertNull(sampler.getActiveMessageMetadataForWorkId(workId)); @@ -66,8 +80,8 @@ public void testRemoveTrackerCompletedProcessingTimesGetsUpdated() { sampler.addTracker(trackerMock); sampler.removeTracker(trackerMock); - assertThat(sampler.getProcessingDistributionsForWorkId(workId), - equalTo(testCompletedProcessingTimes)); + assertThat( + sampler.getProcessingDistributionsForWorkId(workId), equalTo(testCompletedProcessingTimes)); } @Test @@ -79,19 +93,17 @@ public void testGetCompletedProcessingTimesAndActiveMessageFromActiveTracker() { testSummaryStats.accept(3); testSummaryStats.accept(5); testCompletedProcessingTimes.put("some-step", testSummaryStats); - ActiveMessageMetadata testMetadata = new ActiveMessageMetadata( - step1act1.getStepName().userName(), - clock.getMillis()); + ActiveMessageMetadata testMetadata = + new ActiveMessageMetadata(step1act1.getStepName().userName(), clock.getMillis()); DataflowExecutionStateTracker trackerMock = createMockTracker(workId); when(trackerMock.getActiveMessageMetadata()).thenReturn(testMetadata); when(trackerMock.getProcessingTimesByStep()).thenReturn(testCompletedProcessingTimes); sampler.addTracker(trackerMock); - assertThat(sampler.getActiveMessageMetadataForWorkId(workId), - equalTo(testMetadata)); - assertThat(sampler.getProcessingDistributionsForWorkId(workId), - equalTo(testCompletedProcessingTimes)); + assertThat(sampler.getActiveMessageMetadataForWorkId(workId), equalTo(testMetadata)); + assertThat( + sampler.getProcessingDistributionsForWorkId(workId), equalTo(testCompletedProcessingTimes)); } @Test @@ -104,20 +116,25 @@ public void testResetForWorkIdClearsMaps() { sampler.addTracker(tracker1Mock); sampler.addTracker(tracker2Mock); - assertThat(sampler.getActiveMessageMetadataForWorkId(workId1), + assertThat( + sampler.getActiveMessageMetadataForWorkId(workId1), equalTo(tracker1Mock.getActiveMessageMetadata())); - assertThat(sampler.getProcessingDistributionsForWorkId(workId1), + assertThat( + sampler.getProcessingDistributionsForWorkId(workId1), equalTo(tracker1Mock.getProcessingTimesByStep())); - assertThat(sampler.getActiveMessageMetadataForWorkId(workId2), + assertThat( + sampler.getActiveMessageMetadataForWorkId(workId2), equalTo(tracker2Mock.getActiveMessageMetadata())); - assertThat(sampler.getProcessingDistributionsForWorkId(workId2), + assertThat( + sampler.getProcessingDistributionsForWorkId(workId2), equalTo(tracker2Mock.getProcessingTimesByStep())); sampler.removeTracker(tracker1Mock); sampler.removeTracker(tracker2Mock); sampler.resetForWorkId(workId2); - assertThat(sampler.getProcessingDistributionsForWorkId(workId1), + assertThat( + sampler.getProcessingDistributionsForWorkId(workId1), equalTo(tracker1Mock.getProcessingTimesByStep())); Assert.assertNull(sampler.getProcessingDistributionsForWorkId(workId2)); } @@ -127,4 +144,4 @@ private DataflowExecutionStateTracker createMockTracker(String workItemId) { when(trackerMock.getWorkItemId()).thenReturn(workItemId); return trackerMock; } -} \ No newline at end of file +} diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java index d064accbba34..6a363b710dba 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java @@ -3275,8 +3275,8 @@ public void testLatencyAttributionProtobufsPopulated() { work.setState(Work.State.COMMITTING); clock.sleep(Duration.millis(60)); - Iterator it = work.getLatencyAttributions(false, - "", DataflowExecutionStateSampler.instance()).iterator(); + Iterator it = + work.getLatencyAttributions(false, "", DataflowExecutionStateSampler.instance()).iterator(); assertTrue(it.hasNext()); LatencyAttribution lat = it.next(); assertSame(State.QUEUED, lat.getState()); @@ -3526,14 +3526,13 @@ public void testDoFnLatencyBreakdownsReportedOnCommit() throws Exception { Map result = server.waitForAndGetCommits(1); Windmill.WorkItemCommitRequest commit = result.get(0L); - Windmill.LatencyAttribution.Builder laBuilder = LatencyAttribution.newBuilder() - .setState(State.ACTIVE) - .setTotalDurationMillis(100); + Windmill.LatencyAttribution.Builder laBuilder = + LatencyAttribution.newBuilder().setState(State.ACTIVE).setTotalDurationMillis(100); for (LatencyAttribution la : commit.getPerWorkItemLatencyAttributionsList()) { if (la.getState() == State.ACTIVE) { assertThat(la.getActiveLatencyBreakdownCount(), equalTo(1)); - assertThat(la.getActiveLatencyBreakdown(0).getUserStepName(), - equalTo(DEFAULT_PARDO_USER_NAME)); + assertThat( + la.getActiveLatencyBreakdown(0).getUserStepName(), equalTo(DEFAULT_PARDO_USER_NAME)); Assert.assertTrue(la.getActiveLatencyBreakdown(0).hasProcessingTimesDistribution()); Assert.assertFalse(la.getActiveLatencyBreakdown(0).hasActiveMessageMetadata()); } @@ -3563,8 +3562,8 @@ public void testDoFnActiveMessageMetadataReportedOnHeartbeat() throws Exception assertThat(server.numGetDataRequests(), greaterThan(0)); Windmill.GetDataRequest heartbeat = server.getGetDataRequests().get(2); - for (LatencyAttribution la : heartbeat.getRequests(0).getRequests(0) - .getLatencyAttributionList()) { + for (LatencyAttribution la : + heartbeat.getRequests(0).getRequests(0).getLatencyAttributionList()) { if (la.getState() == State.ACTIVE) { assertTrue(la.getActiveLatencyBreakdownCount() > 0); assertTrue(la.getActiveLatencyBreakdown(0).hasActiveMessageMetadata()); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java index 97d02968c14f..ce9e3d6bde4f 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java @@ -252,8 +252,8 @@ public void testGetKeysToRefresh() { activeWorkState.activateWorkForKey(shardedKey1, freshWork); activeWorkState.activateWorkForKey(shardedKey2, refreshableWork2); - ImmutableList requests = activeWorkState.getKeysToRefresh(refreshDeadline, - DataflowExecutionStateSampler.instance()); + ImmutableList requests = + activeWorkState.getKeysToRefresh(refreshDeadline, DataflowExecutionStateSampler.instance()); ImmutableList expected = ImmutableList.of(