From 51840211f8376d4c64b3f046ccdb5cf9345657a4 Mon Sep 17 00:00:00 2001 From: Martin Trieu Date: Thu, 19 Oct 2023 17:37:51 -0700 Subject: [PATCH 01/11] check for clientId representing a retry before activating and completing work --- .../worker/StreamingDataflowWorker.java | 13 +- .../worker/streaming/ActiveWorkState.java | 69 ++++--- .../worker/streaming/ComputationState.java | 4 +- .../dataflow/worker/streaming/Work.java | 10 + .../dataflow/worker/streaming/WorkId.java | 62 +++++++ .../worker/StreamingDataflowWorkerTest.java | 66 +++---- .../worker/streaming/ActiveWorkStateTest.java | 172 ++++++++++++------ 7 files changed, 282 insertions(+), 114 deletions(-) create mode 100644 runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/WorkId.java diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index d915b77995db..e023b1ad980b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -96,6 +96,7 @@ import org.apache.beam.runners.dataflow.worker.streaming.Work; import org.apache.beam.runners.dataflow.worker.streaming.Work.State; import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher; +import org.apache.beam.runners.dataflow.worker.streaming.WorkId; import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor; import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor; import org.apache.beam.runners.dataflow.worker.util.common.worker.ElementCounter; @@ -1298,7 +1299,7 @@ public void close() { // Consider the item invalid. It will eventually be retried by Windmill if it still needs to // be processed. computationState.completeWorkAndScheduleNextWorkForKey( - ShardedKey.create(key, workItem.getShardingKey()), workItem.getWorkToken()); + ShardedKey.create(key, workItem.getShardingKey()), work.id()); } } finally { // Update total processing time counters. Updating in finally clause ensures that @@ -1376,7 +1377,10 @@ private void commitLoop() { for (Windmill.WorkItemCommitRequest workRequest : entry.getValue().getRequestsList()) { computationState.completeWorkAndScheduleNextWorkForKey( ShardedKey.create(workRequest.getKey(), workRequest.getShardingKey()), - workRequest.getWorkToken()); + WorkId.builder() + .setCacheToken(workRequest.getCacheToken()) + .setWorkToken(workRequest.getWorkToken()) + .build()); } } } @@ -1411,7 +1415,10 @@ private boolean addCommitToStream(Commit commit, CommitWorkStream commitStream) // was deemed stuck. 
state.completeWorkAndScheduleNextWorkForKey( ShardedKey.create(request.getKey(), request.getShardingKey()), - request.getWorkToken()); + WorkId.builder() + .setCacheToken(request.getCacheToken()) + .setWorkToken(request.getWorkToken()) + .build()); })) { return true; } else { diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java index 54942dfeee1f..1336da1950a1 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java @@ -21,6 +21,7 @@ import java.io.PrintWriter; import java.util.ArrayDeque; +import java.util.Collection; import java.util.Deque; import java.util.HashMap; import java.util.List; @@ -57,7 +58,7 @@ public final class ActiveWorkState { private static final Logger LOG = LoggerFactory.getLogger(ActiveWorkState.class); - /* The max number of keys in COMMITTING or COMMIT_QUEUED status to be shown.*/ + /* The max number of keys in COMMITTING or COMMIT_QUEUED status to be shown for observability.*/ private static final int MAX_PRINTABLE_COMMIT_PENDING_KEYS = 50; /** @@ -88,6 +89,12 @@ static ActiveWorkState forTesting( return new ActiveWorkState(activeWork, computationStateCache); } + private static String elapsedString(Instant start, Instant end) { + Duration activeFor = new Duration(start, end); + // Duration's toString always starts with "PT"; remove that here. + return activeFor.toString().substring(2); + } + /** * Activates {@link Work} for the {@link ShardedKey}. Outcome can be 1 of 3 {@link * ActivateWorkResult} @@ -112,11 +119,19 @@ synchronized ActivateWorkResult activateWorkForKey(ShardedKey shardedKey, Work w return ActivateWorkResult.EXECUTE; } - // Ensure we don't already have this work token queued. + // Check to see if we have this work token queued. for (Work queuedWork : workQueue) { - if (queuedWork.getWorkItem().getWorkToken() == work.getWorkItem().getWorkToken()) { + // Work tokens and cache tokens are equal. + if (queuedWork.id().equals(work.id())) { return ActivateWorkResult.DUPLICATE; } + + if (queuedWork.id().cacheToken() == work.id().cacheToken()) { + if (work.id().workToken() > queuedWork.id().workToken()) { + workQueue.addLast(work); + return ActivateWorkResult.QUEUED; + } + } } // Queue the work for later processing. @@ -175,19 +190,22 @@ synchronized void failWorkForKey(Map> failedWork) { * #activeWork}. */ synchronized Optional completeWorkAndGetNextWorkForKey( - ShardedKey shardedKey, long workToken) { + ShardedKey shardedKey, WorkId workId) { @Nullable Queue workQueue = activeWork.get(shardedKey); if (workQueue == null) { // Work may have been completed due to clearing of stuck commits. 
- LOG.warn("Unable to complete inactive work for key {} and token {}.", shardedKey, workToken); + LOG.warn( + "Unable to complete inactive work for sharded_key {} and work_id {}.", + shardedKey, + workId); return Optional.empty(); } - removeCompletedWorkFromQueue(workQueue, shardedKey, workToken); + removeCompletedWorkFromQueue(workQueue, shardedKey, workId); return getNextWork(workQueue, shardedKey); } private synchronized void removeCompletedWorkFromQueue( - Queue workQueue, ShardedKey shardedKey, long workToken) { + Queue workQueue, ShardedKey shardedKey, WorkId workId) { // avoid Preconditions.checkState here to prevent eagerly evaluating the // format string parameters for the error message. Work completedWork = @@ -196,16 +214,19 @@ private synchronized void removeCompletedWorkFromQueue( () -> new IllegalStateException( String.format( - "Active key %s without work, expected token %d", - shardedKey, workToken))); + "Active key %s without work, expected work_token %d, expected cache_token %d", + shardedKey, workId.workToken(), workId.cacheToken()))); - if (completedWork.getWorkItem().getWorkToken() != workToken) { + if (!completedWork.id().equals(workId)) { // Work may have been completed due to clearing of stuck commits. LOG.warn( - "Unable to complete due to token mismatch for key {} and token {}, actual token was {}.", + "Unable to complete due to token mismatch for " + + "key {}," + + "expected work_id {}, " + + "actual work_id was {}", shardedKey, - workToken, - completedWork.getWorkItem().getWorkToken()); + workId, + completedWork.id()); return; } @@ -227,21 +248,21 @@ private synchronized Optional getNextWork(Queue workQueue, ShardedKe * before the stuckCommitDeadline. */ synchronized void invalidateStuckCommits( - Instant stuckCommitDeadline, BiConsumer shardedKeyAndWorkTokenConsumer) { - for (Entry shardedKeyAndWorkToken : + Instant stuckCommitDeadline, BiConsumer shardedKeyAndWorkIdConsumer) { + for (Entry shardedKeyAndWorkId : getStuckCommitsAt(stuckCommitDeadline).entrySet()) { - ShardedKey shardedKey = shardedKeyAndWorkToken.getKey(); - long workToken = shardedKeyAndWorkToken.getValue(); + ShardedKey shardedKey = shardedKeyAndWorkId.getKey(); + WorkId workId = shardedKeyAndWorkId.getValue(); computationStateCache.invalidate(shardedKey.key(), shardedKey.shardingKey()); - shardedKeyAndWorkTokenConsumer.accept(shardedKey, workToken); + shardedKeyAndWorkIdConsumer.accept(shardedKey, workId); } } - private synchronized ImmutableMap getStuckCommitsAt( + private synchronized ImmutableMap getStuckCommitsAt( Instant stuckCommitDeadline) { // Determine the stuck commit keys but complete them outside the loop iterating over // activeWork as completeWork may delete the entry from activeWork. 
- ImmutableMap.Builder stuckCommits = ImmutableMap.builder(); + ImmutableMap.Builder stuckCommits = ImmutableMap.builder(); for (Entry> entry : activeWork.entrySet()) { ShardedKey shardedKey = entry.getKey(); @Nullable Work work = entry.getValue().peek(); @@ -251,7 +272,7 @@ private synchronized ImmutableMap getStuckCommitsAt( "Detected key {} stuck in COMMITTING state since {}, completing it with error.", shardedKey, work.getStateStartTime()); - stuckCommits.put(shardedKey, work.getWorkItem().getWorkToken()); + stuckCommits.put(shardedKey, work.id()); } } } @@ -333,12 +354,6 @@ synchronized void printActiveWork(PrintWriter writer, Instant now) { } } - private static String elapsedString(Instant start, Instant end) { - Duration activeFor = new Duration(start, end); - // Duration's toString always starts with "PT"; remove that here. - return activeFor.toString().substring(2); - } - enum ActivateWorkResult { QUEUED, EXECUTE, diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationState.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationState.java index 8207a6ef2f09..be84fa195b00 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationState.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationState.java @@ -107,9 +107,9 @@ public void failWork(Map> failedWork) { /** * Marks the work for the given shardedKey as complete. Schedules queued work for the key if any. */ - public void completeWorkAndScheduleNextWorkForKey(ShardedKey shardedKey, long workToken) { + public void completeWorkAndScheduleNextWorkForKey(ShardedKey shardedKey, WorkId workId) { activeWorkState - .completeWorkAndGetNextWorkForKey(shardedKey, workToken) + .completeWorkAndGetNextWorkForKey(shardedKey, workId) .ifPresent(this::forceExecute); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java index 8d4ba33a1abc..b3167248d224 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java @@ -48,6 +48,7 @@ public class Work implements Runnable { private final Instant startTime; private final Map totalDurationPerState; private final Consumer processWorkFn; + private final WorkId id; private TimedState currentState; private volatile boolean isFailed; @@ -60,6 +61,11 @@ private Work(Windmill.WorkItem workItem, Supplier clock, Consumer this.totalDurationPerState = new EnumMap<>(Windmill.LatencyAttribution.State.class); this.currentState = TimedState.initialState(startTime); this.isFailed = false; + this.id = + WorkId.builder() + .setCacheToken(workItem.getCacheToken()) + .setWorkToken(workItem.getWorkToken()) + .build(); } public static Work create( @@ -196,6 +202,10 @@ boolean isStuckCommittingAt(Instant stuckCommitDeadline) { && currentState.startTime().isBefore(stuckCommitDeadline); } + public WorkId id() { + return id; + } + public enum State { QUEUED(Windmill.LatencyAttribution.State.QUEUED), PROCESSING(Windmill.LatencyAttribution.State.ACTIVE), diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/WorkId.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/WorkId.java new file mode 100644 index 000000000000..045eb641928e --- /dev/null +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/WorkId.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.streaming; + +import com.google.auto.value.AutoValue; +import org.apache.beam.vendor.grpc.v1p54p0.com.google.common.primitives.Longs; + +/** + * A composite key used to identify a unit of {@link Work}. If multiple units of {@link Work} have + * the same workToken AND cacheToken, the {@link Work} is a duplicate. If multiple units of {@link + * Work} have the same workToken, but different cacheTokens, the {@link Work} is a retry. If + * multiple units of {@link Work} have the same cacheToken, but different workTokens, the {@link + * Work} is obsolete. 
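+ * <p>An illustrative example of the three cases (the token values below are arbitrary and do
+ * not come from this patch):
+ *
+ * <pre>{@code
+ * WorkId original = WorkId.builder().setWorkToken(10L).setCacheToken(5L).build();
+ * WorkId duplicate = WorkId.builder().setWorkToken(10L).setCacheToken(5L).build();
+ * WorkId retry = WorkId.builder().setWorkToken(10L).setCacheToken(7L).build();
+ * WorkId obsolete = WorkId.builder().setWorkToken(3L).setCacheToken(5L).build();
+ *
+ * original.equals(duplicate); // true: same workToken AND cacheToken.
+ * retry.isRetryOf(original); // true: same workToken, different cacheToken.
+ * original.compareTo(obsolete) > 0; // true: ordering considers the workToken only.
+ * }</pre>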
+ */ +@AutoValue +public abstract class WorkId implements Comparable { + + public static Builder builder() { + return new AutoValue_WorkId.Builder(); + } + + abstract long cacheToken(); + + abstract long workToken(); + + boolean isRetryOf(WorkId other) { + return other.workToken() == workToken() && other.cacheToken() != cacheToken(); + } + + boolean isForSameWork(WorkId other) { + return workToken() == other.workToken(); + } + + @Override + public final int compareTo(WorkId other) { + return Longs.compare(workToken(), other.workToken()); + } + + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder setCacheToken(long value); + + public abstract Builder setWorkToken(long value); + + public abstract WorkId build(); + } +} diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java index 5de67a371a65..e1574f6fc084 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java @@ -267,6 +267,7 @@ public Long get() { @Rule public ErrorCollector errorCollector = new ErrorCollector(); WorkUnitClient mockWorkUnitClient = mock(WorkUnitClient.class); HotKeyLogger hotKeyLogger = mock(HotKeyLogger.class); + volatile boolean stop = false; public StreamingDataflowWorkerTest(Boolean streamingEngine) { this.streamingEngine = streamingEngine; @@ -286,13 +287,17 @@ private static CounterUpdate getCounter(Iterable counters, String return null; } - static Work createMockWork(long workToken) { - return createMockWork(workToken, work -> {}); + static Work createMockWork(long workToken, long cacheToken) { + return createMockWork(workToken, cacheToken, work -> {}); } - static Work createMockWork(long workToken, Consumer processWorkFn) { + static Work createMockWork(long workToken, long cacheToken, Consumer processWorkFn) { return Work.create( - Windmill.WorkItem.newBuilder().setKey(ByteString.EMPTY).setWorkToken(workToken).build(), + Windmill.WorkItem.newBuilder() + .setKey(ByteString.EMPTY) + .setWorkToken(workToken) + .setCacheToken(cacheToken) + .build(), Instant::now, Collections.emptyList(), processWorkFn); @@ -2651,7 +2656,7 @@ public void testUnboundedSourceWorkRetry() throws Exception { } @Test - public void testActiveWork() throws Exception { + public void testActiveWork() { BoundedQueueExecutor mockExecutor = Mockito.mock(BoundedQueueExecutor.class); ComputationState computationState = new ComputationState( @@ -2664,39 +2669,39 @@ public void testActiveWork() throws Exception { ShardedKey key1 = ShardedKey.create(ByteString.copyFromUtf8("key1"), 1); ShardedKey key2 = ShardedKey.create(ByteString.copyFromUtf8("key2"), 2); - Work m1 = createMockWork(1); + Work m1 = createMockWork(1, 1); assertTrue(computationState.activateWork(key1, m1)); Mockito.verify(mockExecutor).execute(m1, m1.getWorkItem().getSerializedSize()); - computationState.completeWorkAndScheduleNextWorkForKey(key1, 1); + computationState.completeWorkAndScheduleNextWorkForKey(key1, m1.id()); Mockito.verifyNoMoreInteractions(mockExecutor); // Verify work queues. 
- Work m2 = createMockWork(2); + Work m2 = createMockWork(2, 2); assertTrue(computationState.activateWork(key1, m2)); Mockito.verify(mockExecutor).execute(m2, m2.getWorkItem().getSerializedSize()); - Work m3 = createMockWork(3); + Work m3 = createMockWork(3, 3); assertTrue(computationState.activateWork(key1, m3)); Mockito.verifyNoMoreInteractions(mockExecutor); // Verify another key is a separate queue. - Work m4 = createMockWork(4); + Work m4 = createMockWork(4, 4); assertTrue(computationState.activateWork(key2, m4)); Mockito.verify(mockExecutor).execute(m4, m4.getWorkItem().getSerializedSize()); - computationState.completeWorkAndScheduleNextWorkForKey(key2, 4); + computationState.completeWorkAndScheduleNextWorkForKey(key2, m4.id()); Mockito.verifyNoMoreInteractions(mockExecutor); - computationState.completeWorkAndScheduleNextWorkForKey(key1, 2); + computationState.completeWorkAndScheduleNextWorkForKey(key1, m2.id()); Mockito.verify(mockExecutor).forceExecute(m3, m3.getWorkItem().getSerializedSize()); - computationState.completeWorkAndScheduleNextWorkForKey(key1, 3); + computationState.completeWorkAndScheduleNextWorkForKey(key1, m3.id()); Mockito.verifyNoMoreInteractions(mockExecutor); // Verify duplicate work dropped. - Work m5 = createMockWork(5); + Work m5 = createMockWork(5, 5); computationState.activateWork(key1, m5); Mockito.verify(mockExecutor).execute(m5, m5.getWorkItem().getSerializedSize()); assertFalse(computationState.activateWork(key1, m5)); Mockito.verifyNoMoreInteractions(mockExecutor); - computationState.completeWorkAndScheduleNextWorkForKey(key1, 5); + computationState.completeWorkAndScheduleNextWorkForKey(key1, m5.id()); Mockito.verifyNoMoreInteractions(mockExecutor); } @@ -2714,22 +2719,22 @@ public void testActiveWorkForShardedKeys() throws Exception { ShardedKey key1Shard1 = ShardedKey.create(ByteString.copyFromUtf8("key1"), 1); ShardedKey key1Shard2 = ShardedKey.create(ByteString.copyFromUtf8("key1"), 2); - Work m1 = createMockWork(1); + Work m1 = createMockWork(1, 1); assertTrue(computationState.activateWork(key1Shard1, m1)); Mockito.verify(mockExecutor).execute(m1, m1.getWorkItem().getSerializedSize()); - computationState.completeWorkAndScheduleNextWorkForKey(key1Shard1, 1); + computationState.completeWorkAndScheduleNextWorkForKey(key1Shard1, m1.id()); Mockito.verifyNoMoreInteractions(mockExecutor); // Verify work queues. - Work m2 = createMockWork(2); + Work m2 = createMockWork(2, 2); assertTrue(computationState.activateWork(key1Shard1, m2)); Mockito.verify(mockExecutor).execute(m2, m2.getWorkItem().getSerializedSize()); - Work m3 = createMockWork(3); + Work m3 = createMockWork(3, 3); assertTrue(computationState.activateWork(key1Shard1, m3)); Mockito.verifyNoMoreInteractions(mockExecutor); // Verify a different shard of key is a separate queue. 
- Work m4 = createMockWork(3); + Work m4 = createMockWork(3, 3); assertFalse(computationState.activateWork(key1Shard1, m4)); Mockito.verifyNoMoreInteractions(mockExecutor); assertTrue(computationState.activateWork(key1Shard2, m4)); @@ -2737,7 +2742,7 @@ public void testActiveWorkForShardedKeys() throws Exception { // Verify duplicate work dropped assertFalse(computationState.activateWork(key1Shard2, m4)); - computationState.completeWorkAndScheduleNextWorkForKey(key1Shard2, 3); + computationState.completeWorkAndScheduleNextWorkForKey(key1Shard2, m3.id()); Mockito.verifyNoMoreInteractions(mockExecutor); } @@ -2782,8 +2787,8 @@ public void testMaxThreadMetric() throws Exception { } }; - Work m2 = createMockWork(2, sleepProcessWorkFn); - Work m3 = createMockWork(3, sleepProcessWorkFn); + Work m2 = createMockWork(2, 2, sleepProcessWorkFn); + Work m3 = createMockWork(3, 3, sleepProcessWorkFn); assertTrue(computationState.activateWork(key1Shard1, m2)); assertTrue(computationState.activateWork(key1Shard1, m3)); @@ -2823,7 +2828,7 @@ public void testActiveThreadMetric() throws Exception { ComputationState computationState = new ComputationState( "computation", - defaultMapTask(Arrays.asList(makeSourceInstruction(StringUtf8Coder.of()))), + defaultMapTask(Collections.singletonList(makeSourceInstruction(StringUtf8Coder.of()))), executor, ImmutableMap.of(), null); @@ -2841,11 +2846,11 @@ public void testActiveThreadMetric() throws Exception { } }; - Work m2 = createMockWork(2, sleepProcessWorkFn); + Work m2 = createMockWork(2, 2, sleepProcessWorkFn); - Work m3 = createMockWork(3, sleepProcessWorkFn); + Work m3 = createMockWork(3, 3, sleepProcessWorkFn); - Work m4 = createMockWork(4, sleepProcessWorkFn); + Work m4 = createMockWork(4, 4, sleepProcessWorkFn); assertEquals(0, executor.activeCount()); assertTrue(computationState.activateWork(key1Shard1, m2)); @@ -2892,7 +2897,7 @@ public void testOutstandingBytesMetric() throws Exception { ComputationState computationState = new ComputationState( "computation", - defaultMapTask(Arrays.asList(makeSourceInstruction(StringUtf8Coder.of()))), + defaultMapTask(Collections.singletonList(makeSourceInstruction(StringUtf8Coder.of()))), executor, ImmutableMap.of(), null); @@ -2965,7 +2970,7 @@ public void testOutstandingBundlesMetric() throws Exception { ComputationState computationState = new ComputationState( "computation", - defaultMapTask(Arrays.asList(makeSourceInstruction(StringUtf8Coder.of()))), + defaultMapTask(Collections.singletonList(makeSourceInstruction(StringUtf8Coder.of()))), executor, ImmutableMap.of(), null); @@ -3203,8 +3208,7 @@ public void testExceptionInvalidatesCache() throws Exception { // The commit will include a timer to clean up state - this timer is irrelevant // for the current test. Also remove source_bytes_processed because it's dynamic. 
setValuesTimestamps( - removeDynamicFields(commit) - .toBuilder() + removeDynamicFields(commit).toBuilder() .clearOutputTimers() .clearSourceBytesProcessed()) .build(), diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java index b384bb03185d..dd4a2bed5564 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java @@ -32,11 +32,10 @@ import java.util.HashMap; import java.util.Map; import java.util.Optional; -import javax.annotation.Nullable; import org.apache.beam.runners.dataflow.worker.DataflowExecutionStateSampler; import org.apache.beam.runners.dataflow.worker.streaming.ActiveWorkState.ActivateWorkResult; -import org.apache.beam.runners.dataflow.worker.windmill.Windmill; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.HeartbeatRequest; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItem; import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache; import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.ByteString; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; @@ -50,9 +49,9 @@ @RunWith(JUnit4.class) public class ActiveWorkStateTest { - @Rule public transient Timeout globalTimeout = Timeout.seconds(600); private final WindmillStateCache.ForComputation computationStateCache = mock(WindmillStateCache.ForComputation.class); + @Rule public transient Timeout globalTimeout = Timeout.seconds(600); private Map> readOnlyActiveWork; private ActiveWorkState activeWorkState; @@ -61,26 +60,27 @@ private static ShardedKey shardedKey(String str, long shardKey) { return ShardedKey.create(ByteString.copyFromUtf8(str), shardKey); } - private static Work emptyWork() { - return createWork(null); - } - - private static Work createWork(@Nullable Windmill.WorkItem workItem) { + private static Work createWork(WorkItem workItem) { return Work.create(workItem, Instant::now, Collections.emptyList(), unused -> {}); } - private static Work expiredWork(Windmill.WorkItem workItem) { + private static Work expiredWork(WorkItem workItem) { return Work.create(workItem, () -> Instant.EPOCH, Collections.emptyList(), unused -> {}); } - private static Windmill.WorkItem createWorkItem(long workToken) { - return Windmill.WorkItem.newBuilder() + private static WorkItem createWorkItem(long workToken, long cacheToken) { + return WorkItem.newBuilder() .setKey(ByteString.copyFromUtf8("")) .setShardingKey(1) .setWorkToken(workToken) + .setCacheToken(cacheToken) .build(); } + private static WorkId workDedupeToken(long workToken, long cacheToken) { + return WorkId.builder().setCacheToken(cacheToken).setWorkToken(workToken).build(); + } + @Before public void setup() { Map> readWriteActiveWorkMap = new HashMap<>(); @@ -92,7 +92,8 @@ public void setup() { @Test public void testActivateWorkForKey_EXECUTE_unknownKey() { ActivateWorkResult activateWorkResult = - activeWorkState.activateWorkForKey(shardedKey("someKey", 1L), emptyWork()); + activeWorkState.activateWorkForKey( + shardedKey("someKey", 1L), createWork(createWorkItem(1L, 1L))); assertEquals(ActivateWorkResult.EXECUTE, 
activateWorkResult); } @@ -101,12 +102,14 @@ public void testActivateWorkForKey_EXECUTE_unknownKey() { public void testActivateWorkForKey_EXECUTE_emptyWorkQueueForKey() { ShardedKey shardedKey = shardedKey("someKey", 1L); long workToken = 1L; - + long cacheToken = 2L; ActivateWorkResult activateWorkResult = - activeWorkState.activateWorkForKey(shardedKey, createWork(createWorkItem(workToken))); + activeWorkState.activateWorkForKey( + shardedKey, createWork(createWorkItem(workToken, cacheToken))); Optional nextWorkForKey = - activeWorkState.completeWorkAndGetNextWorkForKey(shardedKey, workToken); + activeWorkState.completeWorkAndGetNextWorkForKey( + shardedKey, workDedupeToken(workToken, cacheToken)); assertEquals(ActivateWorkResult.EXECUTE, activateWorkResult); assertEquals(Optional.empty(), nextWorkForKey); @@ -116,24 +119,79 @@ public void testActivateWorkForKey_EXECUTE_emptyWorkQueueForKey() { @Test public void testActivateWorkForKey_DUPLICATE() { long workToken = 10L; + long cacheToken = 5L; ShardedKey shardedKey = shardedKey("someKey", 1L); // ActivateWork with the same shardedKey, and the same workTokens. - activeWorkState.activateWorkForKey(shardedKey, createWork(createWorkItem(workToken))); + activeWorkState.activateWorkForKey( + shardedKey, createWork(createWorkItem(workToken, cacheToken))); + ActivateWorkResult activateWorkResult = + activeWorkState.activateWorkForKey( + shardedKey, createWork(createWorkItem(workToken, cacheToken))); + + assertEquals(ActivateWorkResult.DUPLICATE, activateWorkResult); + } + + @Test + public void testActivateWorkForKey_withMatchingWorkTokenAndDifferentCacheToken_QUEUED() { + long workToken = 10L; + long cacheToken1 = 5L; + long cacheToken2 = 7L; + + Work firstWork = createWork(createWorkItem(workToken, cacheToken1)); + Work secondWork = createWork(createWorkItem(workToken, cacheToken2)); + ShardedKey shardedKey = shardedKey("someKey", 1L); + + // ActivateWork with the same shardedKey, and the same workTokens, but different cacheTokens. 
+ activeWorkState.activateWorkForKey(shardedKey, firstWork); + ActivateWorkResult activateWorkResult = + activeWorkState.activateWorkForKey(shardedKey, secondWork); + + assertEquals(ActivateWorkResult.QUEUED, activateWorkResult); + assertEquals(secondWork, readOnlyActiveWork.get(shardedKey).peek()); + } + + @Test + public void testActivateWorkForKey_withMatchingWorkId_newerWorkQueued_DUPLICATE() { + long workToken = 10L; + long cacheToken1 = 5L; + + Work newerWork = createWork(createWorkItem(workToken, cacheToken1)); + Work olderWork = expiredWork(createWorkItem(workToken, cacheToken1)); + ShardedKey shardedKey = shardedKey("someKey", 1L); + + activeWorkState.activateWorkForKey(shardedKey, newerWork); ActivateWorkResult activateWorkResult = - activeWorkState.activateWorkForKey(shardedKey, createWork(createWorkItem(workToken))); + activeWorkState.activateWorkForKey(shardedKey, olderWork); assertEquals(ActivateWorkResult.DUPLICATE, activateWorkResult); } + @Test + public void testActivateWorkForKey_withMatchingWorkId_olderWorkQueued_QUEUED() { + long workToken = 10L; + long cacheToken1 = 5L; + + Work newerWork = createWork(createWorkItem(workToken, cacheToken1)); + Work olderWork = expiredWork(createWorkItem(workToken, cacheToken1)); + ShardedKey shardedKey = shardedKey("someKey", 1L); + + activeWorkState.activateWorkForKey(shardedKey, olderWork); + ActivateWorkResult activateWorkResult = + activeWorkState.activateWorkForKey(shardedKey, newerWork); + + assertEquals(ActivateWorkResult.QUEUED, activateWorkResult); + assertEquals(newerWork, readOnlyActiveWork.get(shardedKey).peek()); + } + @Test public void testActivateWorkForKey_QUEUED() { ShardedKey shardedKey = shardedKey("someKey", 1L); // ActivateWork with the same shardedKey, but different workTokens. 
- activeWorkState.activateWorkForKey(shardedKey, createWork(createWorkItem(1L))); + activeWorkState.activateWorkForKey(shardedKey, createWork(createWorkItem(1L, 1L))); ActivateWorkResult activateWorkResult = - activeWorkState.activateWorkForKey(shardedKey, createWork(createWorkItem(2L))); + activeWorkState.activateWorkForKey(shardedKey, createWork(createWorkItem(2L, 2L))); assertEquals(ActivateWorkResult.QUEUED, activateWorkResult); } @@ -142,18 +200,39 @@ public void testActivateWorkForKey_QUEUED() { public void testCompleteWorkAndGetNextWorkForKey_noWorkQueueForKey() { assertEquals( Optional.empty(), - activeWorkState.completeWorkAndGetNextWorkForKey(shardedKey("someKey", 1L), 10L)); + activeWorkState.completeWorkAndGetNextWorkForKey( + shardedKey("someKey", 1L), workDedupeToken(1L, 1L))); } @Test - public void testCompleteWorkAndGetNextWorkForKey_currentWorkInQueueDoesNotMatchWorkToComplete() { - long workTokenToComplete = 1L; + public void + testCompleteWorkAndGetNextWorkForKey_currentWorkInQueueWorkTokenDoesNotMatchWorkToComplete() { + long workTokenInQueue = 2L; + long otherWorkToken = 1L; + long cacheToken = 1L; + Work workInQueue = createWork(createWorkItem(workTokenInQueue, cacheToken)); + ShardedKey shardedKey = shardedKey("someKey", 1L); + + activeWorkState.activateWorkForKey(shardedKey, workInQueue); + activeWorkState.completeWorkAndGetNextWorkForKey( + shardedKey, workDedupeToken(otherWorkToken, cacheToken)); - Work workInQueue = createWork(createWorkItem(2L)); + assertEquals(1, readOnlyActiveWork.get(shardedKey).size()); + assertEquals(workInQueue, readOnlyActiveWork.get(shardedKey).peek()); + } + + @Test + public void + testCompleteWorkAndGetNextWorkForKey_currentWorkInQueueCacheTokenDoesNotMatchWorkToComplete() { + long cacheTokenInQueue = 2L; + long otherCacheToken = 1L; + long workToken = 1L; + Work workInQueue = createWork(createWorkItem(workToken, cacheTokenInQueue)); ShardedKey shardedKey = shardedKey("someKey", 1L); activeWorkState.activateWorkForKey(shardedKey, workInQueue); - activeWorkState.completeWorkAndGetNextWorkForKey(shardedKey, workTokenToComplete); + activeWorkState.completeWorkAndGetNextWorkForKey( + shardedKey, workDedupeToken(workToken, otherCacheToken)); assertEquals(1, readOnlyActiveWork.get(shardedKey).size()); assertEquals(workInQueue, readOnlyActiveWork.get(shardedKey).peek()); @@ -161,15 +240,13 @@ public void testCompleteWorkAndGetNextWorkForKey_currentWorkInQueueDoesNotMatchW @Test public void testCompleteWorkAndGetNextWorkForKey_removesWorkFromQueueWhenComplete() { - long workTokenToComplete = 1L; - - Work activeWork = createWork(createWorkItem(workTokenToComplete)); - Work nextWork = createWork(createWorkItem(2L)); + Work activeWork = createWork(createWorkItem(1L, 1L)); + Work nextWork = createWork(createWorkItem(2L, 2L)); ShardedKey shardedKey = shardedKey("someKey", 1L); activeWorkState.activateWorkForKey(shardedKey, activeWork); activeWorkState.activateWorkForKey(shardedKey, nextWork); - activeWorkState.completeWorkAndGetNextWorkForKey(shardedKey, workTokenToComplete); + activeWorkState.completeWorkAndGetNextWorkForKey(shardedKey, activeWork.id()); assertEquals(nextWork, readOnlyActiveWork.get(shardedKey).peek()); assertEquals(1, readOnlyActiveWork.get(shardedKey).size()); @@ -178,37 +255,33 @@ public void testCompleteWorkAndGetNextWorkForKey_removesWorkFromQueueWhenComplet @Test public void testCompleteWorkAndGetNextWorkForKey_removesQueueIfNoWorkPresent() { - Work workInQueue = createWork(createWorkItem(1L)); + Work workInQueue = 
createWork(createWorkItem(1L, 1L)); ShardedKey shardedKey = shardedKey("someKey", 1L); activeWorkState.activateWorkForKey(shardedKey, workInQueue); - activeWorkState.completeWorkAndGetNextWorkForKey( - shardedKey, workInQueue.getWorkItem().getWorkToken()); + activeWorkState.completeWorkAndGetNextWorkForKey(shardedKey, workInQueue.id()); assertFalse(readOnlyActiveWork.containsKey(shardedKey)); } @Test public void testCompleteWorkAndGetNextWorkForKey_returnsWorkIfPresent() { - Work workToBeCompleted = createWork(createWorkItem(1L)); - Work nextWork = createWork(createWorkItem(2L)); + Work workToBeCompleted = createWork(createWorkItem(1L, 1L)); + Work nextWork = createWork(createWorkItem(2L, 2L)); ShardedKey shardedKey = shardedKey("someKey", 1L); activeWorkState.activateWorkForKey(shardedKey, workToBeCompleted); activeWorkState.activateWorkForKey(shardedKey, nextWork); - activeWorkState.completeWorkAndGetNextWorkForKey( - shardedKey, workToBeCompleted.getWorkItem().getWorkToken()); + activeWorkState.completeWorkAndGetNextWorkForKey(shardedKey, workToBeCompleted.id()); Optional nextWorkOpt = - activeWorkState.completeWorkAndGetNextWorkForKey( - shardedKey, workToBeCompleted.getWorkItem().getWorkToken()); + activeWorkState.completeWorkAndGetNextWorkForKey(shardedKey, workToBeCompleted.id()); assertTrue(nextWorkOpt.isPresent()); assertSame(nextWork, nextWorkOpt.get()); Optional endOfWorkQueue = - activeWorkState.completeWorkAndGetNextWorkForKey( - shardedKey, nextWork.getWorkItem().getWorkToken()); + activeWorkState.completeWorkAndGetNextWorkForKey(shardedKey, nextWork.id()); assertFalse(endOfWorkQueue.isPresent()); assertFalse(readOnlyActiveWork.containsKey(shardedKey)); @@ -216,11 +289,11 @@ public void testCompleteWorkAndGetNextWorkForKey_returnsWorkIfPresent() { @Test public void testInvalidateStuckCommits() { - Map invalidatedCommits = new HashMap<>(); + Map invalidatedCommits = new HashMap<>(); - Work stuckWork1 = expiredWork(createWorkItem(1L)); + Work stuckWork1 = expiredWork(createWorkItem(1L, 1L)); stuckWork1.setState(Work.State.COMMITTING); - Work stuckWork2 = expiredWork(createWorkItem(2L)); + Work stuckWork2 = expiredWork(createWorkItem(2L, 2L)); stuckWork2.setState(Work.State.COMMITTING); ShardedKey shardedKey1 = shardedKey("someKey", 1L); ShardedKey shardedKey2 = shardedKey("anotherKey", 2L); @@ -230,10 +303,8 @@ public void testInvalidateStuckCommits() { activeWorkState.invalidateStuckCommits(Instant.now(), invalidatedCommits::put); - assertThat(invalidatedCommits) - .containsEntry(shardedKey1, stuckWork1.getWorkItem().getWorkToken()); - assertThat(invalidatedCommits) - .containsEntry(shardedKey2, stuckWork2.getWorkItem().getWorkToken()); + assertThat(invalidatedCommits).containsEntry(shardedKey1, stuckWork1.id()); + assertThat(invalidatedCommits).containsEntry(shardedKey2, stuckWork2.id()); verify(computationStateCache).invalidate(shardedKey1.key(), shardedKey1.shardingKey()); verify(computationStateCache).invalidate(shardedKey2.key(), shardedKey2.shardingKey()); } @@ -241,11 +312,10 @@ public void testInvalidateStuckCommits() { @Test public void testGetKeyHeartbeats() { Instant refreshDeadline = Instant.now(); - - Work freshWork = createWork(createWorkItem(3L)); - Work refreshableWork1 = expiredWork(createWorkItem(1L)); + Work freshWork = createWork(createWorkItem(3L, 3L)); + Work refreshableWork1 = expiredWork(createWorkItem(1L, 1L)); refreshableWork1.setState(Work.State.COMMITTING); - Work refreshableWork2 = expiredWork(createWorkItem(2L)); + Work refreshableWork2 = 
expiredWork(createWorkItem(2L, 2L)); refreshableWork2.setState(Work.State.COMMITTING); ShardedKey shardedKey1 = shardedKey("someKey", 1L); ShardedKey shardedKey2 = shardedKey("anotherKey", 2L); From 30f5ce1d656c7d3b964c8e8b0ad3dcc136ae9a4b Mon Sep 17 00:00:00 2001 From: Martin Trieu Date: Thu, 2 Nov 2023 16:55:55 -0700 Subject: [PATCH 02/11] change logic for activateWorkForKey --- .../worker/streaming/ActiveWorkState.java | 115 ++++++++++-------- .../worker/streaming/ComputationState.java | 5 +- .../worker/streaming/ActiveWorkStateTest.java | 83 ++++++++++--- 3 files changed, 135 insertions(+), 68 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java index 1336da1950a1..110eb3813435 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java @@ -21,9 +21,9 @@ import java.io.PrintWriter; import java.util.ArrayDeque; -import java.util.Collection; import java.util.Deque; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -95,6 +95,35 @@ private static String elapsedString(Instant start, Instant end) { return activeFor.toString().substring(2); } + private static void removeIfNotActive( + Work queuedWork, Iterator workIterator, Deque workQueue) { + // Check to see if the queuedWork is active. We only want to remove it if it is NOT currently + // active. + if (!queuedWork.equals(workQueue.peek())) workIterator.remove(); + } + + private static Stream toHeartbeatRequestStream( + Entry> shardedKeyAndWorkQueue, + Instant refreshDeadline, + DataflowExecutionStateSampler sampler) { + ShardedKey shardedKey = shardedKeyAndWorkQueue.getKey(); + Deque workQueue = shardedKeyAndWorkQueue.getValue(); + + return workQueue.stream() + .filter(work -> work.getStartTime().isBefore(refreshDeadline)) + // Don't send heartbeats for queued work we already know is failed. + .filter(work -> !work.isFailed()) + .map( + work -> + Windmill.HeartbeatRequest.newBuilder() + .setShardingKey(shardedKey.shardingKey()) + .setWorkToken(work.getWorkItem().getWorkToken()) + .setCacheToken(work.getWorkItem().getCacheToken()) + .addAllLatencyAttribution( + work.getLatencyAttributions(true, work.getLatencyTrackingId(), sampler)) + .build()); + } + /** * Activates {@link Work} for the {@link ShardedKey}. Outcome can be 1 of 3 {@link * ActivateWorkResult} @@ -106,7 +135,11 @@ private static String elapsedString(Instant start, Instant end) { * the {@link ShardedKey}'s work queue, mark the {@link Work} as a duplicate. * *
<p>
3. QUEUED: A work queue for the {@link ShardedKey} exists, and the work is not in the key's - * work queue, queue the work for later processing. + * work queue, OR the work in the work queue is stale, OR the work in the queue has a matching + * work token but different cache token, queue the work for later processing. + * + *
<p>
4. STALE: A work queue for the {@link ShardedKey} exists, and there is a queued {@link Work} + * with a greater workToken than the passed in {@link Work}. */ synchronized ActivateWorkResult activateWorkForKey(ShardedKey shardedKey, Work work) { Deque workQueue = activeWork.getOrDefault(shardedKey, new ArrayDeque<>()); @@ -120,14 +153,22 @@ synchronized ActivateWorkResult activateWorkForKey(ShardedKey shardedKey, Work w } // Check to see if we have this work token queued. - for (Work queuedWork : workQueue) { - // Work tokens and cache tokens are equal. + Iterator workIterator = workQueue.iterator(); + while (workIterator.hasNext()) { + Work queuedWork = workIterator.next(); if (queuedWork.id().equals(work.id())) { return ActivateWorkResult.DUPLICATE; - } - - if (queuedWork.id().cacheToken() == work.id().cacheToken()) { + } else if (queuedWork.id().cacheToken() == work.id().cacheToken()) { if (work.id().workToken() > queuedWork.id().workToken()) { + removeIfNotActive(queuedWork, workIterator, workQueue); + workQueue.addLast(work); + return ActivateWorkResult.QUEUED; + } else { + return ActivateWorkResult.STALE; + } + } else if (queuedWork.id().workToken() == work.id().workToken()) { + if (queuedWork.id().cacheToken() != work.id().cacheToken()) { + removeIfNotActive(queuedWork, workIterator, workQueue); workQueue.addLast(work); return ActivateWorkResult.QUEUED; } @@ -139,16 +180,6 @@ synchronized ActivateWorkResult activateWorkForKey(ShardedKey shardedKey, Work w return ActivateWorkResult.QUEUED; } - public static final class FailedTokens { - public long workToken; - public long cacheToken; - - public FailedTokens(long workToken, long cacheToken) { - this.workToken = workToken; - this.cacheToken = cacheToken; - } - } - /** * Fails any active work matching an element of the input Map. * @@ -239,7 +270,6 @@ private synchronized Optional getNextWork(Queue workQueue, ShardedKe if (!nextWork.isPresent()) { Preconditions.checkState(workQueue == activeWork.remove(shardedKey)); } - return nextWork; } @@ -266,14 +296,12 @@ private synchronized ImmutableMap getStuckCommitsAt( for (Entry> entry : activeWork.entrySet()) { ShardedKey shardedKey = entry.getKey(); @Nullable Work work = entry.getValue().peek(); - if (work != null) { - if (work.isStuckCommittingAt(stuckCommitDeadline)) { - LOG.error( - "Detected key {} stuck in COMMITTING state since {}, completing it with error.", - shardedKey, - work.getStateStartTime()); - stuckCommits.put(shardedKey, work.id()); - } + if (work != null && work.isStuckCommittingAt(stuckCommitDeadline)) { + LOG.error( + "Detected key {} stuck in COMMITTING state since {}, completing it with error.", + shardedKey, + work.getStateStartTime()); + stuckCommits.put(shardedKey, work.id()); } } @@ -287,28 +315,6 @@ synchronized ImmutableList getKeyHeartbeats( .collect(toImmutableList()); } - private static Stream toHeartbeatRequestStream( - Entry> shardedKeyAndWorkQueue, - Instant refreshDeadline, - DataflowExecutionStateSampler sampler) { - ShardedKey shardedKey = shardedKeyAndWorkQueue.getKey(); - Deque workQueue = shardedKeyAndWorkQueue.getValue(); - - return workQueue.stream() - .filter(work -> work.getStartTime().isBefore(refreshDeadline)) - // Don't send heartbeats for queued work we already know is failed. 
- .filter(work -> !work.isFailed()) - .map( - work -> - Windmill.HeartbeatRequest.newBuilder() - .setShardingKey(shardedKey.shardingKey()) - .setWorkToken(work.getWorkItem().getWorkToken()) - .setCacheToken(work.getWorkItem().getCacheToken()) - .addAllLatencyAttribution( - work.getLatencyAttributions(true, work.getLatencyTrackingId(), sampler)) - .build()); - } - synchronized void printActiveWork(PrintWriter writer, Instant now) { writer.println( " getExecutionStateQueue() { /** * Mark the given {@link ShardedKey} and {@link Work} as active, and schedules execution of {@link - * Work} if there is no active {@link Work} for the {@link ShardedKey} already processing. + * Work} if there is no active {@link Work} for the {@link ShardedKey} already processing. Returns + * whether the {@link Work} will be activated, either immediately or sometime in the future. */ public boolean activateWork(ShardedKey shardedKey, Work work) { switch (activeWorkState.activateWorkForKey(shardedKey, work)) { case DUPLICATE: + // Fall through intentionally. Work was not and will not be activated in these cases. + case STALE: return false; case QUEUED: return true; diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java index dd4a2bed5564..693f02dabc8b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java @@ -133,57 +133,104 @@ public void testActivateWorkForKey_DUPLICATE() { } @Test - public void testActivateWorkForKey_withMatchingWorkTokenAndDifferentCacheToken_QUEUED() { + public void + testActivateWorkForKey_withMatchingWorkTokenAndDifferentCacheToken_queuedWorkIsNotActive_QUEUED() { long workToken = 10L; long cacheToken1 = 5L; - long cacheToken2 = 7L; + long cacheToken2 = cacheToken1 + 2L; Work firstWork = createWork(createWorkItem(workToken, cacheToken1)); Work secondWork = createWork(createWorkItem(workToken, cacheToken2)); + Work differentWorkTokenWork = createWork(createWorkItem(1L, 1L)); ShardedKey shardedKey = shardedKey("someKey", 1L); + activeWorkState.activateWorkForKey(shardedKey, differentWorkTokenWork); // ActivateWork with the same shardedKey, and the same workTokens, but different cacheTokens. 
activeWorkState.activateWorkForKey(shardedKey, firstWork); ActivateWorkResult activateWorkResult = activeWorkState.activateWorkForKey(shardedKey, secondWork); assertEquals(ActivateWorkResult.QUEUED, activateWorkResult); - assertEquals(secondWork, readOnlyActiveWork.get(shardedKey).peek()); + assertFalse(readOnlyActiveWork.get(shardedKey).contains(firstWork)); + assertTrue(readOnlyActiveWork.get(shardedKey).contains(secondWork)); + + Optional nextWork = + activeWorkState.completeWorkAndGetNextWorkForKey(shardedKey, differentWorkTokenWork.id()); + assertTrue(nextWork.isPresent()); + assertSame(secondWork, nextWork.get()); } @Test - public void testActivateWorkForKey_withMatchingWorkId_newerWorkQueued_DUPLICATE() { + public void + testActivateWorkForKey_withMatchingWorkTokenAndDifferentCacheToken_queuedWorkIsActive_QUEUED() { long workToken = 10L; long cacheToken1 = 5L; + long cacheToken2 = 7L; - Work newerWork = createWork(createWorkItem(workToken, cacheToken1)); - Work olderWork = expiredWork(createWorkItem(workToken, cacheToken1)); + Work firstWork = createWork(createWorkItem(workToken, cacheToken1)); + Work secondWork = createWork(createWorkItem(workToken, cacheToken2)); ShardedKey shardedKey = shardedKey("someKey", 1L); - activeWorkState.activateWorkForKey(shardedKey, newerWork); + // ActivateWork with the same shardedKey, and the same workTokens, but different cacheTokens. + activeWorkState.activateWorkForKey(shardedKey, firstWork); ActivateWorkResult activateWorkResult = - activeWorkState.activateWorkForKey(shardedKey, olderWork); + activeWorkState.activateWorkForKey(shardedKey, secondWork); - assertEquals(ActivateWorkResult.DUPLICATE, activateWorkResult); + assertEquals(ActivateWorkResult.QUEUED, activateWorkResult); + assertEquals(firstWork, readOnlyActiveWork.get(shardedKey).peek()); + assertTrue(readOnlyActiveWork.get(shardedKey).contains(secondWork)); + Optional nextWork = + activeWorkState.completeWorkAndGetNextWorkForKey(shardedKey, firstWork.id()); + assertTrue(nextWork.isPresent()); + assertSame(secondWork, nextWork.get()); } @Test - public void testActivateWorkForKey_withMatchingWorkId_olderWorkQueued_QUEUED() { - long workToken = 10L; - long cacheToken1 = 5L; + public void + testActivateWorkForKey_matchingCacheTokens_newWorkTokenGreater_queuedWorkIsActive_QUEUED() { + long cacheToken = 1L; + long newWorkToken = 10L; + long queuedWorkToken = newWorkToken / 2; - Work newerWork = createWork(createWorkItem(workToken, cacheToken1)); - Work olderWork = expiredWork(createWorkItem(workToken, cacheToken1)); + Work queuedWork = createWork(createWorkItem(queuedWorkToken, cacheToken)); + Work newWork = createWork(createWorkItem(newWorkToken, cacheToken)); ShardedKey shardedKey = shardedKey("someKey", 1L); - activeWorkState.activateWorkForKey(shardedKey, olderWork); - ActivateWorkResult activateWorkResult = - activeWorkState.activateWorkForKey(shardedKey, newerWork); + activeWorkState.activateWorkForKey(shardedKey, queuedWork); + ActivateWorkResult activateWorkResult = activeWorkState.activateWorkForKey(shardedKey, newWork); + + // newWork should be queued and queuedWork should not be removed since it is currently active. 
+ assertEquals(ActivateWorkResult.QUEUED, activateWorkResult); + assertTrue(readOnlyActiveWork.get(shardedKey).contains(newWork)); + assertEquals(queuedWork, readOnlyActiveWork.get(shardedKey).peek()); + } + + @Test + public void + testActivateWorkForKey_matchingCacheTokens_newWorkTokenGreater_queuedWorkNotActive_QUEUED() { + long cacheToken = 1L; + long newWorkToken = 10L; + long queuedWorkToken = newWorkToken / 2; + + Work differentWorkTokenWork = createWork(createWorkItem(1L, 1L)); + Work queuedWork = createWork(createWorkItem(queuedWorkToken, cacheToken)); + Work newWork = createWork(createWorkItem(newWorkToken, cacheToken)); + ShardedKey shardedKey = shardedKey("someKey", 1L); + activeWorkState.activateWorkForKey(shardedKey, differentWorkTokenWork); + activeWorkState.activateWorkForKey(shardedKey, queuedWork); + ActivateWorkResult activateWorkResult = activeWorkState.activateWorkForKey(shardedKey, newWork); + + // newWork should be queued and queuedWork should not be removed since it is currently active. assertEquals(ActivateWorkResult.QUEUED, activateWorkResult); - assertEquals(newerWork, readOnlyActiveWork.get(shardedKey).peek()); + assertTrue(readOnlyActiveWork.get(shardedKey).contains(newWork)); + assertFalse(readOnlyActiveWork.get(shardedKey).contains(queuedWork)); + assertEquals(differentWorkTokenWork, readOnlyActiveWork.get(shardedKey).peek()); } + @Test + public void testActivateWorkForKey_matchingCacheTokens_newWorkTokenLesser_STALE() {} + @Test public void testActivateWorkForKey_QUEUED() { ShardedKey shardedKey = shardedKey("someKey", 1L); From feb276bed68220cdb0061f7704cfcb45801e420c Mon Sep 17 00:00:00 2001 From: Martin Trieu Date: Thu, 2 Nov 2023 18:46:17 -0700 Subject: [PATCH 03/11] add test cases for ActiveWorkState --- .../worker/streaming/ActiveWorkStateTest.java | 26 ++++++++++++++----- 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java index 693f02dabc8b..f375b4fbbe60 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java @@ -208,20 +208,19 @@ public void testActivateWorkForKey_DUPLICATE() { @Test public void testActivateWorkForKey_matchingCacheTokens_newWorkTokenGreater_queuedWorkNotActive_QUEUED() { - long cacheToken = 1L; + long matchingCacheToken = 1L; long newWorkToken = 10L; long queuedWorkToken = newWorkToken / 2; - Work differentWorkTokenWork = createWork(createWorkItem(1L, 1L)); - Work queuedWork = createWork(createWorkItem(queuedWorkToken, cacheToken)); - Work newWork = createWork(createWorkItem(newWorkToken, cacheToken)); + Work differentWorkTokenWork = createWork(createWorkItem(100L, 100L)); + Work queuedWork = createWork(createWorkItem(queuedWorkToken, matchingCacheToken)); + Work newWork = createWork(createWorkItem(newWorkToken, matchingCacheToken)); ShardedKey shardedKey = shardedKey("someKey", 1L); activeWorkState.activateWorkForKey(shardedKey, differentWorkTokenWork); activeWorkState.activateWorkForKey(shardedKey, queuedWork); ActivateWorkResult activateWorkResult = activeWorkState.activateWorkForKey(shardedKey, newWork); - // newWork should be queued and 
queuedWork should not be removed since it is currently active. assertEquals(ActivateWorkResult.QUEUED, activateWorkResult); assertTrue(readOnlyActiveWork.get(shardedKey).contains(newWork)); assertFalse(readOnlyActiveWork.get(shardedKey).contains(queuedWork)); @@ -229,7 +228,22 @@ public void testActivateWorkForKey_DUPLICATE() { } @Test - public void testActivateWorkForKey_matchingCacheTokens_newWorkTokenLesser_STALE() {} + public void testActivateWorkForKey_matchingCacheTokens_newWorkTokenLesser_STALE() { + long cacheToken = 1L; + long queuedWorkToken = 10L; + long newWorkToken = queuedWorkToken / 2; + + Work queuedWork = createWork(createWorkItem(queuedWorkToken, cacheToken)); + Work newWork = createWork(createWorkItem(newWorkToken, cacheToken)); + ShardedKey shardedKey = shardedKey("someKey", 1L); + + activeWorkState.activateWorkForKey(shardedKey, queuedWork); + ActivateWorkResult activateWorkResult = activeWorkState.activateWorkForKey(shardedKey, newWork); + + assertEquals(ActivateWorkResult.STALE, activateWorkResult); + assertFalse(readOnlyActiveWork.get(shardedKey).contains(newWork)); + assertEquals(queuedWork, readOnlyActiveWork.get(shardedKey).peek()); + } @Test public void testActivateWorkForKey_QUEUED() { From 310381b31d94e8fcb2d76ffeef14fe03ccc7834b Mon Sep 17 00:00:00 2001 From: Martin Trieu Date: Thu, 2 Nov 2023 19:22:03 -0700 Subject: [PATCH 04/11] update StreamingDataflowWorker and test to reflect new behavior --- .../worker/StreamingDataflowWorker.java | 2 +- .../worker/StreamingDataflowWorkerTest.java | 59 ++++++++++--------- 2 files changed, 31 insertions(+), 30 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index e023b1ad980b..0c78f7821fa2 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -95,8 +95,8 @@ import org.apache.beam.runners.dataflow.worker.streaming.WeightedBoundedQueue; import org.apache.beam.runners.dataflow.worker.streaming.Work; import org.apache.beam.runners.dataflow.worker.streaming.Work.State; -import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher; import org.apache.beam.runners.dataflow.worker.streaming.WorkId; +import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher; import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor; import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor; import org.apache.beam.runners.dataflow.worker.util.common.worker.ElementCounter; diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java index e1574f6fc084..85b598fd7676 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java @@ -267,7 +267,6 @@ public Long get() { @Rule public ErrorCollector errorCollector = 
new ErrorCollector(); WorkUnitClient mockWorkUnitClient = mock(WorkUnitClient.class); HotKeyLogger hotKeyLogger = mock(HotKeyLogger.class); - volatile boolean stop = false; public StreamingDataflowWorkerTest(Boolean streamingEngine) { this.streamingEngine = streamingEngine; @@ -287,17 +286,13 @@ private static CounterUpdate getCounter(Iterable counters, String return null; } - static Work createMockWork(long workToken, long cacheToken) { - return createMockWork(workToken, cacheToken, work -> {}); + static Work createMockWork(long workToken) { + return createMockWork(workToken, work -> {}); } - static Work createMockWork(long workToken, long cacheToken, Consumer processWorkFn) { + static Work createMockWork(long workToken, Consumer processWorkFn) { return Work.create( - Windmill.WorkItem.newBuilder() - .setKey(ByteString.EMPTY) - .setWorkToken(workToken) - .setCacheToken(cacheToken) - .build(), + Windmill.WorkItem.newBuilder().setKey(ByteString.EMPTY).setWorkToken(workToken).build(), Instant::now, Collections.emptyList(), processWorkFn); @@ -2656,7 +2651,7 @@ public void testUnboundedSourceWorkRetry() throws Exception { } @Test - public void testActiveWork() { + public void testActiveWork() throws Exception { BoundedQueueExecutor mockExecutor = Mockito.mock(BoundedQueueExecutor.class); ComputationState computationState = new ComputationState( @@ -2669,22 +2664,22 @@ public void testActiveWork() { ShardedKey key1 = ShardedKey.create(ByteString.copyFromUtf8("key1"), 1); ShardedKey key2 = ShardedKey.create(ByteString.copyFromUtf8("key2"), 2); - Work m1 = createMockWork(1, 1); + Work m1 = createMockWork(1); assertTrue(computationState.activateWork(key1, m1)); Mockito.verify(mockExecutor).execute(m1, m1.getWorkItem().getSerializedSize()); computationState.completeWorkAndScheduleNextWorkForKey(key1, m1.id()); Mockito.verifyNoMoreInteractions(mockExecutor); // Verify work queues. - Work m2 = createMockWork(2, 2); + Work m2 = createMockWork(2); assertTrue(computationState.activateWork(key1, m2)); Mockito.verify(mockExecutor).execute(m2, m2.getWorkItem().getSerializedSize()); - Work m3 = createMockWork(3, 3); + Work m3 = createMockWork(3); assertTrue(computationState.activateWork(key1, m3)); Mockito.verifyNoMoreInteractions(mockExecutor); // Verify another key is a separate queue. - Work m4 = createMockWork(4, 4); + Work m4 = createMockWork(4); assertTrue(computationState.activateWork(key2, m4)); Mockito.verify(mockExecutor).execute(m4, m4.getWorkItem().getSerializedSize()); computationState.completeWorkAndScheduleNextWorkForKey(key2, m4.id()); @@ -2696,7 +2691,7 @@ public void testActiveWork() { Mockito.verifyNoMoreInteractions(mockExecutor); // Verify duplicate work dropped. 
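    // (With WorkId-based dedupe, "duplicate" means the same work token AND the same cache token;
    // createMockWork leaves cache_token unset, so both activations of m5 carry identical ids.)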
- Work m5 = createMockWork(5, 5); + Work m5 = createMockWork(5); computationState.activateWork(key1, m5); Mockito.verify(mockExecutor).execute(m5, m5.getWorkItem().getSerializedSize()); assertFalse(computationState.activateWork(key1, m5)); @@ -2706,7 +2701,7 @@ public void testActiveWork() { } @Test - public void testActiveWorkForShardedKeys() throws Exception { + public void testActiveWorkForShardedKeys() { BoundedQueueExecutor mockExecutor = Mockito.mock(BoundedQueueExecutor.class); ComputationState computationState = new ComputationState( @@ -2719,30 +2714,30 @@ public void testActiveWorkForShardedKeys() throws Exception { ShardedKey key1Shard1 = ShardedKey.create(ByteString.copyFromUtf8("key1"), 1); ShardedKey key1Shard2 = ShardedKey.create(ByteString.copyFromUtf8("key1"), 2); - Work m1 = createMockWork(1, 1); + Work m1 = createMockWork(1); assertTrue(computationState.activateWork(key1Shard1, m1)); Mockito.verify(mockExecutor).execute(m1, m1.getWorkItem().getSerializedSize()); computationState.completeWorkAndScheduleNextWorkForKey(key1Shard1, m1.id()); Mockito.verifyNoMoreInteractions(mockExecutor); // Verify work queues. - Work m2 = createMockWork(2, 2); + Work m2 = createMockWork(2); assertTrue(computationState.activateWork(key1Shard1, m2)); Mockito.verify(mockExecutor).execute(m2, m2.getWorkItem().getSerializedSize()); - Work m3 = createMockWork(3, 3); + Work m3 = createMockWork(3); assertTrue(computationState.activateWork(key1Shard1, m3)); Mockito.verifyNoMoreInteractions(mockExecutor); // Verify a different shard of key is a separate queue. - Work m4 = createMockWork(3, 3); - assertFalse(computationState.activateWork(key1Shard1, m4)); + Work m4 = createMockWork(3); + assertTrue(computationState.activateWork(key1Shard1, m4)); Mockito.verifyNoMoreInteractions(mockExecutor); assertTrue(computationState.activateWork(key1Shard2, m4)); Mockito.verify(mockExecutor).execute(m4, m4.getWorkItem().getSerializedSize()); // Verify duplicate work dropped assertFalse(computationState.activateWork(key1Shard2, m4)); - computationState.completeWorkAndScheduleNextWorkForKey(key1Shard2, m3.id()); + computationState.completeWorkAndScheduleNextWorkForKey(key1Shard2, m4.id()); Mockito.verifyNoMoreInteractions(mockExecutor); } @@ -2787,8 +2782,8 @@ public void testMaxThreadMetric() throws Exception { } }; - Work m2 = createMockWork(2, 2, sleepProcessWorkFn); - Work m3 = createMockWork(3, 3, sleepProcessWorkFn); + Work m2 = createMockWork(2, sleepProcessWorkFn); + Work m3 = createMockWork(3, sleepProcessWorkFn); assertTrue(computationState.activateWork(key1Shard1, m2)); assertTrue(computationState.activateWork(key1Shard1, m3)); @@ -2846,11 +2841,11 @@ public void testActiveThreadMetric() throws Exception { } }; - Work m2 = createMockWork(2, 2, sleepProcessWorkFn); + Work m2 = createMockWork(2, sleepProcessWorkFn); - Work m3 = createMockWork(3, 3, sleepProcessWorkFn); + Work m3 = createMockWork(3, sleepProcessWorkFn); - Work m4 = createMockWork(4, 4, sleepProcessWorkFn); + Work m4 = createMockWork(4, sleepProcessWorkFn); assertEquals(0, executor.activeCount()); assertTrue(computationState.activateWork(key1Shard1, m2)); @@ -3208,7 +3203,8 @@ public void testExceptionInvalidatesCache() throws Exception { // The commit will include a timer to clean up state - this timer is irrelevant // for the current test. Also remove source_bytes_processed because it's dynamic. 
setValuesTimestamps( - removeDynamicFields(commit).toBuilder() + removeDynamicFields(commit) + .toBuilder() .clearOutputTimers() .clearSourceBytesProcessed()) .build(), @@ -3312,7 +3308,12 @@ public void testActiveWorkFailure() throws Exception { @Test public void testLatencyAttributionProtobufsPopulated() { FakeClock clock = new FakeClock(); - Work work = Work.create(null, clock, Collections.emptyList(), unused -> {}); + Work work = + Work.create( + Windmill.WorkItem.newBuilder().setKey(ByteString.EMPTY).setWorkToken(1L).build(), + clock, + Collections.emptyList(), + unused -> {}); clock.sleep(Duration.millis(10)); work.setState(Work.State.PROCESSING); From e38eab7c6ba63f21c5029eeeae40b77c4928fe54 Mon Sep 17 00:00:00 2001 From: Martin Trieu Date: Thu, 2 Nov 2023 19:25:15 -0700 Subject: [PATCH 05/11] remove unused methods in WorkId --- .../dataflow/worker/streaming/WorkId.java | 16 +--------------- 1 file changed, 1 insertion(+), 15 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/WorkId.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/WorkId.java index 045eb641928e..d56b56c184c9 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/WorkId.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/WorkId.java @@ -18,7 +18,6 @@ package org.apache.beam.runners.dataflow.worker.streaming; import com.google.auto.value.AutoValue; -import org.apache.beam.vendor.grpc.v1p54p0.com.google.common.primitives.Longs; /** * A composite key used to identify a unit of {@link Work}. If multiple units of {@link Work} have @@ -28,7 +27,7 @@ * Work} is obsolete. 
*/ @AutoValue -public abstract class WorkId implements Comparable { +public abstract class WorkId { public static Builder builder() { return new AutoValue_WorkId.Builder(); @@ -38,19 +37,6 @@ public static Builder builder() { abstract long workToken(); - boolean isRetryOf(WorkId other) { - return other.workToken() == workToken() && other.cacheToken() != cacheToken(); - } - - boolean isForSameWork(WorkId other) { - return workToken() == other.workToken(); - } - - @Override - public final int compareTo(WorkId other) { - return Longs.compare(workToken(), other.workToken()); - } - @AutoValue.Builder public abstract static class Builder { public abstract Builder setCacheToken(long value); From 91b3e993857e88cd7f739360cb6bc748cd605900 Mon Sep 17 00:00:00 2001 From: Martin Trieu Date: Thu, 16 Nov 2023 13:24:19 -0800 Subject: [PATCH 06/11] address cl comments --- .../worker/streaming/ActiveWorkState.java | 39 +++++++++++++------ .../dataflow/worker/streaming/Work.java | 7 ++++ .../worker/StreamingDataflowWorkerTest.java | 5 ++- .../worker/streaming/ActiveWorkStateTest.java | 15 +++---- 4 files changed, 45 insertions(+), 21 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java index 110eb3813435..c60b1707cbd4 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java @@ -21,6 +21,7 @@ import java.io.PrintWriter; import java.util.ArrayDeque; +import java.util.Collection; import java.util.Deque; import java.util.HashMap; import java.util.Iterator; @@ -37,6 +38,7 @@ import org.apache.beam.runners.dataflow.worker.DataflowExecutionStateSampler; import org.apache.beam.runners.dataflow.worker.windmill.Windmill; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.HeartbeatRequest; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.KeyedGetDataRequest; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItem; import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache; import org.apache.beam.sdk.annotations.Internal; @@ -89,6 +91,20 @@ static ActiveWorkState forTesting( return new ActiveWorkState(activeWork, computationStateCache); } + private static Stream makeHeartbeatKeyedGetDataRequests( + ShardedKey shardedKey, Collection workQueue, Instant refreshDeadline) { + return workQueue.stream() + .filter(work -> work.getStartTime().isBefore(refreshDeadline)) + .map( + work -> + KeyedGetDataRequest.newBuilder() + .setKey(shardedKey.key()) + .setShardingKey(shardedKey.shardingKey()) + .setWorkToken(work.getWorkItem().getWorkToken()) + .addAllLatencyAttribution(work.getLatencyAttributions()) + .build()); + } + private static String elapsedString(Instant start, Instant end) { Duration activeFor = new Duration(start, end); // Duration's toString always starts with "PT"; remove that here. 
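For context, the "PT" trimming relies on Joda-Time's ISO-8601 rendering of durations. A minimal standalone sketch of the same idea (illustrative only, not part of the patch):

    import org.joda.time.Duration;
    import org.joda.time.Instant;

    public final class ElapsedStringSketch {
      public static void main(String[] args) {
        Instant start = Instant.EPOCH;
        Instant end = start.plus(Duration.standardSeconds(90));
        // Joda renders this duration as "PT90S"; dropping the leading "PT"
        // leaves the friendlier "90S".
        String iso = new Duration(start, end).toString();
        System.out.println(iso.substring(2)); // prints "90S"
      }
    }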
@@ -159,18 +175,13 @@ synchronized ActivateWorkResult activateWorkForKey(ShardedKey shardedKey, Work w if (queuedWork.id().equals(work.id())) { return ActivateWorkResult.DUPLICATE; } else if (queuedWork.id().cacheToken() == work.id().cacheToken()) { - if (work.id().workToken() > queuedWork.id().workToken()) { - removeIfNotActive(queuedWork, workIterator, workQueue); - workQueue.addLast(work); - return ActivateWorkResult.QUEUED; - } else { + if (work.id().workToken() <= queuedWork.id().workToken()) { return ActivateWorkResult.STALE; } - } else if (queuedWork.id().workToken() == work.id().workToken()) { - if (queuedWork.id().cacheToken() != work.id().cacheToken()) { - removeIfNotActive(queuedWork, workIterator, workQueue); - workQueue.addLast(work); - return ActivateWorkResult.QUEUED; + + if (!queuedWork.equals(workQueue.peek())) { + // We only want to remove it if it is NOT currently active. + workIterator.remove(); } } } @@ -245,8 +256,8 @@ private synchronized void removeCompletedWorkFromQueue( () -> new IllegalStateException( String.format( - "Active key %s without work, expected work_token %d, expected cache_token %d", - shardedKey, workId.workToken(), workId.cacheToken()))); + "Active key %s without work, expected work_id= %s", + shardedKey, workId))); if (!completedWork.id().equals(workId)) { // Work may have been completed due to clearing of stuck commits. @@ -312,6 +323,10 @@ synchronized ImmutableList getKeyHeartbeats( Instant refreshDeadline, DataflowExecutionStateSampler sampler) { return activeWork.entrySet().stream() .flatMap(entry -> toHeartbeatRequestStream(entry, refreshDeadline, sampler)) + .flatMap( + entry -> + makeHeartbeatKeyedGetDataRequests( + entry.getKey(), entry.getValue(), refreshDeadline)) .collect(toImmutableList()); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java index b3167248d224..644869c6fe4c 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java @@ -206,6 +206,13 @@ public WorkId id() { return id; } + @Override + public final String toString() { + return String.format( + "work_id:[%s]; work_item:[%s]; start_time:[%s]; current_state:[%s]", + id, workItem, startTime, currentState); + } + public enum State { QUEUED(Windmill.LatencyAttribution.State.QUEUED), PROCESSING(Windmill.LatencyAttribution.State.ACTIVE), diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java index 85b598fd7676..413f875def19 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java @@ -2725,12 +2725,13 @@ public void testActiveWorkForShardedKeys() { assertTrue(computationState.activateWork(key1Shard1, m2)); Mockito.verify(mockExecutor).execute(m2, m2.getWorkItem().getSerializedSize()); Work m3 = createMockWork(3); - assertTrue(computationState.activateWork(key1Shard1, 
m3)); + boolean activateWork = computationState.activateWork(key1Shard1, m3); + assertTrue(activateWork); Mockito.verifyNoMoreInteractions(mockExecutor); // Verify a different shard of key is a separate queue. Work m4 = createMockWork(3); - assertTrue(computationState.activateWork(key1Shard1, m4)); + assertFalse(computationState.activateWork(key1Shard1, m4)); Mockito.verifyNoMoreInteractions(mockExecutor); assertTrue(computationState.activateWork(key1Shard2, m4)); Mockito.verify(mockExecutor).execute(m4, m4.getWorkItem().getSerializedSize()); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java index f375b4fbbe60..f2ba6739cdf1 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java @@ -77,7 +77,7 @@ private static WorkItem createWorkItem(long workToken, long cacheToken) { .build(); } - private static WorkId workDedupeToken(long workToken, long cacheToken) { + private static WorkId workDedupeId(long workToken, long cacheToken) { return WorkId.builder().setCacheToken(cacheToken).setWorkToken(workToken).build(); } @@ -109,7 +109,7 @@ public void testActivateWorkForKey_EXECUTE_emptyWorkQueueForKey() { Optional nextWorkForKey = activeWorkState.completeWorkAndGetNextWorkForKey( - shardedKey, workDedupeToken(workToken, cacheToken)); + shardedKey, workDedupeId(workToken, cacheToken)); assertEquals(ActivateWorkResult.EXECUTE, activateWorkResult); assertEquals(Optional.empty(), nextWorkForKey); @@ -151,13 +151,14 @@ public void testActivateWorkForKey_DUPLICATE() { activeWorkState.activateWorkForKey(shardedKey, secondWork); assertEquals(ActivateWorkResult.QUEUED, activateWorkResult); - assertFalse(readOnlyActiveWork.get(shardedKey).contains(firstWork)); + // Different cacheTokens, so no work should be removed from the queue. 
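+    // Per the WorkId contract, a matching work token with a new cache token marks a Windmill
+    // retry; the retry queues behind the original attempt rather than displacing it.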
+ assertTrue(readOnlyActiveWork.get(shardedKey).contains(firstWork)); assertTrue(readOnlyActiveWork.get(shardedKey).contains(secondWork)); Optional nextWork = activeWorkState.completeWorkAndGetNextWorkForKey(shardedKey, differentWorkTokenWork.id()); assertTrue(nextWork.isPresent()); - assertSame(secondWork, nextWork.get()); + assertSame(firstWork, nextWork.get()); } @Test @@ -262,7 +263,7 @@ public void testCompleteWorkAndGetNextWorkForKey_noWorkQueueForKey() { assertEquals( Optional.empty(), activeWorkState.completeWorkAndGetNextWorkForKey( - shardedKey("someKey", 1L), workDedupeToken(1L, 1L))); + shardedKey("someKey", 1L), workDedupeId(1L, 1L))); } @Test @@ -276,7 +277,7 @@ public void testCompleteWorkAndGetNextWorkForKey_noWorkQueueForKey() { activeWorkState.activateWorkForKey(shardedKey, workInQueue); activeWorkState.completeWorkAndGetNextWorkForKey( - shardedKey, workDedupeToken(otherWorkToken, cacheToken)); + shardedKey, workDedupeId(otherWorkToken, cacheToken)); assertEquals(1, readOnlyActiveWork.get(shardedKey).size()); assertEquals(workInQueue, readOnlyActiveWork.get(shardedKey).peek()); @@ -293,7 +294,7 @@ public void testCompleteWorkAndGetNextWorkForKey_noWorkQueueForKey() { activeWorkState.activateWorkForKey(shardedKey, workInQueue); activeWorkState.completeWorkAndGetNextWorkForKey( - shardedKey, workDedupeToken(workToken, otherCacheToken)); + shardedKey, workDedupeId(workToken, otherCacheToken)); assertEquals(1, readOnlyActiveWork.get(shardedKey).size()); assertEquals(workInQueue, readOnlyActiveWork.get(shardedKey).peek()); From 89686df70ee1921861ab487faf34676415780ab8 Mon Sep 17 00:00:00 2001 From: Martin Trieu Date: Fri, 2 Feb 2024 14:38:42 -0800 Subject: [PATCH 07/11] rebase, address cl comments --- .../worker/StreamingDataflowWorker.java | 13 +- .../worker/streaming/ActiveWorkState.java | 81 +++++--- .../worker/streaming/ComputationState.java | 9 +- .../dataflow/worker/streaming/Work.java | 10 + .../dataflow/worker/streaming/WorkId.java | 48 +++++ .../worker/StreamingDataflowWorkerTest.java | 14 +- .../worker/streaming/ActiveWorkStateTest.java | 194 ++++++++++++++---- 7 files changed, 291 insertions(+), 78 deletions(-) create mode 100644 runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/WorkId.java diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index e8ca3a2834f9..c459cf8ae1c1 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -97,6 +97,7 @@ import org.apache.beam.runners.dataflow.worker.streaming.WeightedBoundedQueue; import org.apache.beam.runners.dataflow.worker.streaming.Work; import org.apache.beam.runners.dataflow.worker.streaming.Work.State; +import org.apache.beam.runners.dataflow.worker.streaming.WorkId; import org.apache.beam.runners.dataflow.worker.streaming.sideinput.SideInputStateFetcher; import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor; import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor; @@ -1308,7 +1309,7 @@ public void close() { // Consider the item invalid. 
It will eventually be retried by Windmill if it still needs to // be processed. computationState.completeWorkAndScheduleNextWorkForKey( - ShardedKey.create(key, workItem.getShardingKey()), workItem.getWorkToken()); + ShardedKey.create(key, workItem.getShardingKey()), work.id()); } } finally { // Update total processing time counters. Updating in finally clause ensures that @@ -1386,7 +1387,10 @@ private void commitLoop() { for (Windmill.WorkItemCommitRequest workRequest : entry.getValue().getRequestsList()) { computationState.completeWorkAndScheduleNextWorkForKey( ShardedKey.create(workRequest.getKey(), workRequest.getShardingKey()), - workRequest.getWorkToken()); + WorkId.builder() + .setCacheToken(workRequest.getCacheToken()) + .setWorkToken(workRequest.getWorkToken()) + .build()); } } } @@ -1428,7 +1432,10 @@ private boolean addCommitToStream(Commit commit, CommitWorkStream commitStream) activeCommitBytes.addAndGet(-size); state.completeWorkAndScheduleNextWorkForKey( ShardedKey.create(request.getKey(), request.getShardingKey()), - request.getWorkToken()); + WorkId.builder() + .setCacheToken(request.getCacheToken()) + .setWorkToken(request.getWorkToken()) + .build()); })) { return true; } else { diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java index b4b469323932..ca8aeb037a30 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java @@ -24,6 +24,7 @@ import java.util.ArrayDeque; import java.util.Deque; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -60,7 +61,7 @@ public final class ActiveWorkState { private static final Logger LOG = LoggerFactory.getLogger(ActiveWorkState.class); - /* The max number of keys in COMMITTING or COMMIT_QUEUED status to be shown.*/ + /* The max number of keys in COMMITTING or COMMIT_QUEUED status to be shown for observability.*/ private static final int MAX_PRINTABLE_COMMIT_PENDING_KEYS = 50; /** @@ -106,7 +107,7 @@ private static String elapsedString(Instant start, Instant end) { } /** - * Activates {@link Work} for the {@link ShardedKey}. Outcome can be 1 of 3 {@link + * Activates {@link Work} for the {@link ShardedKey}. Outcome can be 1 of 4 {@link * ActivateWorkResult} * *
1. EXECUTE: The {@link ShardedKey} has not been seen before, create a {@link Queue} @@ -116,7 +117,11 @@ private static String elapsedString(Instant start, Instant end) { * the {@link ShardedKey}'s work queue, mark the {@link Work} as a duplicate. * *
3. QUEUED: A work queue for the {@link ShardedKey} exists, and the work is not in the key's -   * work queue, queue the work for later processing. +   * work queue, OR the queued work is stale, OR the queued work has a matching work token but a +   * different cache token; queue the work for later processing. +   * +   *
4. STALE: A work queue for the {@link ShardedKey} exists, and there is a queued {@link Work} + * with a greater workToken than the passed in {@link Work}. */ synchronized ActivateWorkResult activateWorkForKey(ShardedKey shardedKey, Work work) { Deque workQueue = activeWork.getOrDefault(shardedKey, new ArrayDeque<>()); @@ -129,11 +134,22 @@ synchronized ActivateWorkResult activateWorkForKey(ShardedKey shardedKey, Work w return ActivateWorkResult.EXECUTE; } - // Ensure we don't already have this work token queued. - for (Work queuedWork : workQueue) { - if (queuedWork.getWorkItem().getWorkToken() == work.getWorkItem().getWorkToken()) { + // Check to see if we have this work token queued. + Iterator workIterator = workQueue.iterator(); + while (workIterator.hasNext()) { + Work queuedWork = workIterator.next(); + if (queuedWork.id().equals(work.id())) { return ActivateWorkResult.DUPLICATE; } + if (queuedWork.id().cacheToken() == work.id().cacheToken()) { + if (work.id().workToken() > queuedWork.id().workToken()) { + removeIfNotActive(queuedWork, workIterator, workQueue); + workQueue.addLast(work); + // Continue here to possibly remove more non-active stale work that is queued. + } else { + return ActivateWorkResult.STALE; + } + } } // Queue the work for later processing. @@ -213,34 +229,38 @@ private void decrementActiveWorkBudget(Work work) { * #activeWork}. */ synchronized Optional completeWorkAndGetNextWorkForKey( - ShardedKey shardedKey, long workToken) { + ShardedKey shardedKey, WorkId workId) { @Nullable Queue workQueue = activeWork.get(shardedKey); if (workQueue == null) { // Work may have been completed due to clearing of stuck commits. - LOG.warn("Unable to complete inactive work for key {} and token {}.", shardedKey, workToken); + LOG.warn("Unable to complete inactive work for key {} and token {}.", shardedKey, workId); return Optional.empty(); } - removeCompletedWorkFromQueue(workQueue, shardedKey, workToken); + removeCompletedWorkFromQueue(workQueue, shardedKey, workId); return getNextWork(workQueue, shardedKey); } private synchronized void removeCompletedWorkFromQueue( - Queue workQueue, ShardedKey shardedKey, long workToken) { + Queue workQueue, ShardedKey shardedKey, WorkId workId) { + // avoid Preconditions.checkState here to prevent eagerly evaluating the + // format string parameters for the error message. Work completedWork = workQueue.peek(); if (completedWork == null) { // Work may have been completed due to clearing of stuck commits. - LOG.warn( - String.format("Active key %s without work, expected token %d", shardedKey, workToken)); + LOG.warn("Active key {} without work, expected token {}", shardedKey, workId); return; } - if (completedWork.getWorkItem().getWorkToken() != workToken) { + if (!completedWork.id().equals(workId)) { // Work may have been completed due to clearing of stuck commits. LOG.warn( - "Unable to complete due to token mismatch for key {} and token {}, actual token was {}.", + "Unable to complete due to token mismatch for " + + "key {}," + + "expected work_id {}, " + + "actual work_id was {}", shardedKey, - workToken, - completedWork.getWorkItem().getWorkToken()); + workId, + completedWork.id()); return; } @@ -263,21 +283,21 @@ private synchronized Optional getNextWork(Queue workQueue, ShardedKe * before the stuckCommitDeadline. 
*/ synchronized void invalidateStuckCommits( - Instant stuckCommitDeadline, BiConsumer shardedKeyAndWorkTokenConsumer) { - for (Entry shardedKeyAndWorkToken : + Instant stuckCommitDeadline, BiConsumer shardedKeyAndWorkTokenConsumer) { + for (Entry shardedKeyAndWorkId : getStuckCommitsAt(stuckCommitDeadline).entrySet()) { - ShardedKey shardedKey = shardedKeyAndWorkToken.getKey(); - long workToken = shardedKeyAndWorkToken.getValue(); + ShardedKey shardedKey = shardedKeyAndWorkId.getKey(); + WorkId workId = shardedKeyAndWorkId.getValue(); computationStateCache.invalidate(shardedKey.key(), shardedKey.shardingKey()); - shardedKeyAndWorkTokenConsumer.accept(shardedKey, workToken); + shardedKeyAndWorkTokenConsumer.accept(shardedKey, workId); } } - private synchronized ImmutableMap getStuckCommitsAt( + private synchronized ImmutableMap getStuckCommitsAt( Instant stuckCommitDeadline) { // Determine the stuck commit keys but complete them outside the loop iterating over // activeWork as completeWork may delete the entry from activeWork. - ImmutableMap.Builder stuckCommits = ImmutableMap.builder(); + ImmutableMap.Builder stuckCommits = ImmutableMap.builder(); for (Entry> entry : activeWork.entrySet()) { ShardedKey shardedKey = entry.getKey(); @Nullable Work work = entry.getValue().peek(); @@ -287,7 +307,7 @@ private synchronized ImmutableMap getStuckCommitsAt( "Detected key {} stuck in COMMITTING state since {}, completing it with error.", shardedKey, work.getStateStartTime()); - stuckCommits.put(shardedKey, work.getWorkItem().getWorkToken()); + stuckCommits.put(shardedKey, work.id()); } } } @@ -320,7 +340,8 @@ private static Stream toHeartbeatRequestStream( .setWorkToken(work.getWorkItem().getWorkToken()) .setCacheToken(work.getWorkItem().getCacheToken()) .addAllLatencyAttribution( - work.getLatencyAttributions(true, work.getLatencyTrackingId(), sampler)) + work.getLatencyAttributions( + /* isHeartbeat= */ true, work.getLatencyTrackingId(), sampler)) .build()); } @@ -383,9 +404,17 @@ synchronized void printActiveWork(PrintWriter writer, Instant now) { writer.println("
"); } + private static void removeIfNotActive( + Work queuedWork, Iterator workIterator, Deque workQueue) { + // Check to see if the queuedWork is active. We only want to remove it if it is NOT currently + // active. + if (!queuedWork.equals(workQueue.peek())) workIterator.remove(); + } + enum ActivateWorkResult { QUEUED, EXECUTE, - DUPLICATE + DUPLICATE, + STALE } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationState.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationState.java index 8207a6ef2f09..2682e91d8227 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationState.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationState.java @@ -81,11 +81,14 @@ public ConcurrentLinkedQueue getExecutionStateQueue() { /** * Mark the given {@link ShardedKey} and {@link Work} as active, and schedules execution of {@link - * Work} if there is no active {@link Work} for the {@link ShardedKey} already processing. + * Work} if there is no active {@link Work} for the {@link ShardedKey} already processing. Returns + * whether the {@link Work} will be activated, either immediately or sometime in the future. */ public boolean activateWork(ShardedKey shardedKey, Work work) { switch (activeWorkState.activateWorkForKey(shardedKey, work)) { case DUPLICATE: + // Fall through intentionally. Work was not and will not be activated in these cases. + case STALE: return false; case QUEUED: return true; @@ -107,9 +110,9 @@ public void failWork(Map> failedWork) { /** * Marks the work for the given shardedKey as complete. Schedules queued work for the key if any. 
*/ - public void completeWorkAndScheduleNextWorkForKey(ShardedKey shardedKey, long workToken) { + public void completeWorkAndScheduleNextWorkForKey(ShardedKey shardedKey, WorkId workId) { activeWorkState - .completeWorkAndGetNextWorkForKey(shardedKey, workToken) + .completeWorkAndGetNextWorkForKey(shardedKey, workId) .ifPresent(this::forceExecute); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java index 6c85c615af15..99cdaad200e4 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java @@ -47,6 +47,7 @@ public class Work implements Runnable { private final Instant startTime; private final Map totalDurationPerState; private final Consumer processWorkFn; + private final WorkId id; private TimedState currentState; private volatile boolean isFailed; @@ -58,6 +59,11 @@ private Work(Windmill.WorkItem workItem, Supplier clock, Consumer this.totalDurationPerState = new EnumMap<>(Windmill.LatencyAttribution.State.class); this.currentState = TimedState.initialState(startTime); this.isFailed = false; + this.id = + WorkId.builder() + .setCacheToken(workItem.getCacheToken()) + .setWorkToken(workItem.getWorkToken()) + .build(); } public static Work create( @@ -116,6 +122,10 @@ public String getLatencyTrackingId() { return workIdBuilder.toString(); } + public WorkId id() { + return id; + } + private void recordGetWorkStreamLatencies( Collection getWorkStreamLatencies) { for (Windmill.LatencyAttribution latency : getWorkStreamLatencies) { diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/WorkId.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/WorkId.java new file mode 100644 index 000000000000..d56b56c184c9 --- /dev/null +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/WorkId.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.streaming; + +import com.google.auto.value.AutoValue; + +/** + * A composite key used to identify a unit of {@link Work}. If multiple units of {@link Work} have + * the same workToken AND cacheToken, the {@link Work} is a duplicate. If multiple units of {@link + * Work} have the same workToken, but different cacheTokens, the {@link Work} is a retry. 
If + * multiple units of {@link Work} have the same cacheToken, but different workTokens, the {@link + * Work} is obsolete. + */ +@AutoValue +public abstract class WorkId { + + public static Builder builder() { + return new AutoValue_WorkId.Builder(); + } + + abstract long cacheToken(); + + abstract long workToken(); + + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder setCacheToken(long value); + + public abstract Builder setWorkToken(long value); + + public abstract WorkId build(); + } +} diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java index e7eedcf3780a..cf2a74bf7cdf 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java @@ -2677,7 +2677,7 @@ public void testActiveWork() throws Exception { Work m1 = createMockWork(1); assertTrue(computationState.activateWork(key1, m1)); Mockito.verify(mockExecutor).execute(m1, m1.getWorkItem().getSerializedSize()); - computationState.completeWorkAndScheduleNextWorkForKey(key1, 1); + computationState.completeWorkAndScheduleNextWorkForKey(key1, m1.id()); Mockito.verifyNoMoreInteractions(mockExecutor); // Verify work queues. @@ -2692,12 +2692,12 @@ public void testActiveWork() throws Exception { Work m4 = createMockWork(4); assertTrue(computationState.activateWork(key2, m4)); Mockito.verify(mockExecutor).execute(m4, m4.getWorkItem().getSerializedSize()); - computationState.completeWorkAndScheduleNextWorkForKey(key2, 4); + computationState.completeWorkAndScheduleNextWorkForKey(key2, m4.id()); Mockito.verifyNoMoreInteractions(mockExecutor); - computationState.completeWorkAndScheduleNextWorkForKey(key1, 2); + computationState.completeWorkAndScheduleNextWorkForKey(key1, m2.id()); Mockito.verify(mockExecutor).forceExecute(m3, m3.getWorkItem().getSerializedSize()); - computationState.completeWorkAndScheduleNextWorkForKey(key1, 3); + computationState.completeWorkAndScheduleNextWorkForKey(key1, m3.id()); Mockito.verifyNoMoreInteractions(mockExecutor); // Verify duplicate work dropped. @@ -2706,7 +2706,7 @@ public void testActiveWork() throws Exception { Mockito.verify(mockExecutor).execute(m5, m5.getWorkItem().getSerializedSize()); assertFalse(computationState.activateWork(key1, m5)); Mockito.verifyNoMoreInteractions(mockExecutor); - computationState.completeWorkAndScheduleNextWorkForKey(key1, 5); + computationState.completeWorkAndScheduleNextWorkForKey(key1, m5.id()); Mockito.verifyNoMoreInteractions(mockExecutor); } @@ -2727,7 +2727,7 @@ public void testActiveWorkForShardedKeys() throws Exception { Work m1 = createMockWork(1); assertTrue(computationState.activateWork(key1Shard1, m1)); Mockito.verify(mockExecutor).execute(m1, m1.getWorkItem().getSerializedSize()); - computationState.completeWorkAndScheduleNextWorkForKey(key1Shard1, 1); + computationState.completeWorkAndScheduleNextWorkForKey(key1Shard1, m1.id()); Mockito.verifyNoMoreInteractions(mockExecutor); // Verify work queues. 
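As an aside, completion now matches on the composite id rather than the bare work token; createMockWork sets no cache token, so the proto default of 0 applies. A hypothetical equivalent of the m1.id() call above:

    WorkId expectedId = WorkId.builder().setWorkToken(1L).setCacheToken(0L).build();
    // expectedId.equals(m1.id()) holds; completeWorkAndScheduleNextWorkForKey
    // dequeues the head of the key's queue only when both fields match.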
@@ -2747,7 +2747,7 @@ public void testActiveWorkForShardedKeys() throws Exception { // Verify duplicate work dropped assertFalse(computationState.activateWork(key1Shard2, m4)); - computationState.completeWorkAndScheduleNextWorkForKey(key1Shard2, 3); + computationState.completeWorkAndScheduleNextWorkForKey(key1Shard2, m4.id()); Mockito.verifyNoMoreInteractions(mockExecutor); } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java index 82ff24c03bb8..170e6e3046cf 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java @@ -69,11 +69,16 @@ private static Work expiredWork(Windmill.WorkItem workItem) { return Work.create(workItem, () -> Instant.EPOCH, Collections.emptyList(), unused -> {}); } - private static Windmill.WorkItem createWorkItem(long workToken) { + private static WorkId workId(long workToken, long cacheToken) { + return WorkId.builder().setCacheToken(cacheToken).setWorkToken(workToken).build(); + } + + private static Windmill.WorkItem createWorkItem(long workToken, long cacheToken) { return Windmill.WorkItem.newBuilder() .setKey(ByteString.copyFromUtf8("")) .setShardingKey(1) .setWorkToken(workToken) + .setCacheToken(cacheToken) .build(); } @@ -98,12 +103,14 @@ public void testActivateWorkForKey_EXECUTE_unknownKey() { public void testActivateWorkForKey_EXECUTE_emptyWorkQueueForKey() { ShardedKey shardedKey = shardedKey("someKey", 1L); long workToken = 1L; + long cacheToken = 2L; ActivateWorkResult activateWorkResult = - activeWorkState.activateWorkForKey(shardedKey, createWork(createWorkItem(workToken))); + activeWorkState.activateWorkForKey( + shardedKey, createWork(createWorkItem(workToken, cacheToken))); Optional nextWorkForKey = - activeWorkState.completeWorkAndGetNextWorkForKey(shardedKey, workToken); + activeWorkState.completeWorkAndGetNextWorkForKey(shardedKey, workId(workToken, cacheToken)); assertEquals(ActivateWorkResult.EXECUTE, activateWorkResult); assertEquals(Optional.empty(), nextWorkForKey); @@ -116,9 +123,9 @@ public void testActivateWorkForKey_DUPLICATE() { ShardedKey shardedKey = shardedKey("someKey", 1L); // ActivateWork with the same shardedKey, and the same workTokens. - activeWorkState.activateWorkForKey(shardedKey, createWork(createWorkItem(workToken))); + activeWorkState.activateWorkForKey(shardedKey, createWork(createWorkItem(workToken, 1L))); ActivateWorkResult activateWorkResult = - activeWorkState.activateWorkForKey(shardedKey, createWork(createWorkItem(workToken))); + activeWorkState.activateWorkForKey(shardedKey, createWork(createWorkItem(workToken, 1L))); assertEquals(ActivateWorkResult.DUPLICATE, activateWorkResult); } @@ -128,9 +135,9 @@ public void testActivateWorkForKey_QUEUED() { ShardedKey shardedKey = shardedKey("someKey", 1L); // ActivateWork with the same shardedKey, but different workTokens. 
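    // (Both items below share cache token 1L, so this also exercises the same-cache-token
    // path: the newer work token is queued while the older one, already at the head of the
    // queue, stays active.)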
- activeWorkState.activateWorkForKey(shardedKey, createWork(createWorkItem(1L))); + activeWorkState.activateWorkForKey(shardedKey, createWork(createWorkItem(1L, 1L))); ActivateWorkResult activateWorkResult = - activeWorkState.activateWorkForKey(shardedKey, createWork(createWorkItem(2L))); + activeWorkState.activateWorkForKey(shardedKey, createWork(createWorkItem(2L, 1L))); assertEquals(ActivateWorkResult.QUEUED, activateWorkResult); } @@ -139,18 +146,22 @@ public void testActivateWorkForKey_QUEUED() { public void testCompleteWorkAndGetNextWorkForKey_noWorkQueueForKey() { assertEquals( Optional.empty(), - activeWorkState.completeWorkAndGetNextWorkForKey(shardedKey("someKey", 1L), 10L)); + activeWorkState.completeWorkAndGetNextWorkForKey( + shardedKey("someKey", 1L), workId(1L, 1L))); } @Test - public void testCompleteWorkAndGetNextWorkForKey_currentWorkInQueueDoesNotMatchWorkToComplete() { - long workTokenToComplete = 1L; - - Work workInQueue = createWork(createWorkItem(2L)); + public void + testCompleteWorkAndGetNextWorkForKey_currentWorkInQueueWorkTokenDoesNotMatchWorkToComplete() { + long workTokenInQueue = 2L; + long otherWorkToken = 1L; + long cacheToken = 1L; + Work workInQueue = createWork(createWorkItem(workTokenInQueue, cacheToken)); ShardedKey shardedKey = shardedKey("someKey", 1L); activeWorkState.activateWorkForKey(shardedKey, workInQueue); - activeWorkState.completeWorkAndGetNextWorkForKey(shardedKey, workTokenToComplete); + activeWorkState.completeWorkAndGetNextWorkForKey( + shardedKey, workId(otherWorkToken, cacheToken)); assertEquals(1, readOnlyActiveWork.get(shardedKey).size()); assertEquals(workInQueue, readOnlyActiveWork.get(shardedKey).peek()); @@ -158,15 +169,13 @@ public void testCompleteWorkAndGetNextWorkForKey_currentWorkInQueueDoesNotMatchW @Test public void testCompleteWorkAndGetNextWorkForKey_removesWorkFromQueueWhenComplete() { - long workTokenToComplete = 1L; - - Work activeWork = createWork(createWorkItem(workTokenToComplete)); - Work nextWork = createWork(createWorkItem(2L)); + Work activeWork = createWork(createWorkItem(1L, 1L)); + Work nextWork = createWork(createWorkItem(2L, 2L)); ShardedKey shardedKey = shardedKey("someKey", 1L); activeWorkState.activateWorkForKey(shardedKey, activeWork); activeWorkState.activateWorkForKey(shardedKey, nextWork); - activeWorkState.completeWorkAndGetNextWorkForKey(shardedKey, workTokenToComplete); + activeWorkState.completeWorkAndGetNextWorkForKey(shardedKey, activeWork.id()); assertEquals(nextWork, readOnlyActiveWork.get(shardedKey).peek()); assertEquals(1, readOnlyActiveWork.get(shardedKey).size()); @@ -175,37 +184,33 @@ public void testCompleteWorkAndGetNextWorkForKey_removesWorkFromQueueWhenComplet @Test public void testCompleteWorkAndGetNextWorkForKey_removesQueueIfNoWorkPresent() { - Work workInQueue = createWork(createWorkItem(1L)); + Work workInQueue = createWork(createWorkItem(1L, 1L)); ShardedKey shardedKey = shardedKey("someKey", 1L); activeWorkState.activateWorkForKey(shardedKey, workInQueue); - activeWorkState.completeWorkAndGetNextWorkForKey( - shardedKey, workInQueue.getWorkItem().getWorkToken()); + activeWorkState.completeWorkAndGetNextWorkForKey(shardedKey, workInQueue.id()); assertFalse(readOnlyActiveWork.containsKey(shardedKey)); } @Test public void testCompleteWorkAndGetNextWorkForKey_returnsWorkIfPresent() { - Work workToBeCompleted = createWork(createWorkItem(1L)); - Work nextWork = createWork(createWorkItem(2L)); + Work workToBeCompleted = createWork(createWorkItem(1L, 1L)); + Work nextWork = 
createWork(createWorkItem(2L, 2L)); ShardedKey shardedKey = shardedKey("someKey", 1L); activeWorkState.activateWorkForKey(shardedKey, workToBeCompleted); activeWorkState.activateWorkForKey(shardedKey, nextWork); - activeWorkState.completeWorkAndGetNextWorkForKey( - shardedKey, workToBeCompleted.getWorkItem().getWorkToken()); + activeWorkState.completeWorkAndGetNextWorkForKey(shardedKey, workToBeCompleted.id()); Optional nextWorkOpt = - activeWorkState.completeWorkAndGetNextWorkForKey( - shardedKey, workToBeCompleted.getWorkItem().getWorkToken()); + activeWorkState.completeWorkAndGetNextWorkForKey(shardedKey, workToBeCompleted.id()); assertTrue(nextWorkOpt.isPresent()); assertSame(nextWork, nextWorkOpt.get()); Optional endOfWorkQueue = - activeWorkState.completeWorkAndGetNextWorkForKey( - shardedKey, nextWork.getWorkItem().getWorkToken()); + activeWorkState.completeWorkAndGetNextWorkForKey(shardedKey, nextWork.id()); assertFalse(endOfWorkQueue.isPresent()); assertFalse(readOnlyActiveWork.containsKey(shardedKey)); @@ -283,11 +288,11 @@ public void testCurrentActiveWorkBudget_correctlyAggregatesActiveWorkBudget_mult @Test public void testInvalidateStuckCommits() { - Map invalidatedCommits = new HashMap<>(); + Map invalidatedCommits = new HashMap<>(); - Work stuckWork1 = expiredWork(createWorkItem(1L)); + Work stuckWork1 = expiredWork(createWorkItem(1L, 1L)); stuckWork1.setState(Work.State.COMMITTING); - Work stuckWork2 = expiredWork(createWorkItem(2L)); + Work stuckWork2 = expiredWork(createWorkItem(2L, 1L)); stuckWork2.setState(Work.State.COMMITTING); ShardedKey shardedKey1 = shardedKey("someKey", 1L); ShardedKey shardedKey2 = shardedKey("anotherKey", 2L); @@ -297,22 +302,133 @@ public void testInvalidateStuckCommits() { activeWorkState.invalidateStuckCommits(Instant.now(), invalidatedCommits::put); - assertThat(invalidatedCommits) - .containsEntry(shardedKey1, stuckWork1.getWorkItem().getWorkToken()); - assertThat(invalidatedCommits) - .containsEntry(shardedKey2, stuckWork2.getWorkItem().getWorkToken()); + assertThat(invalidatedCommits).containsEntry(shardedKey1, stuckWork1.id()); + assertThat(invalidatedCommits).containsEntry(shardedKey2, stuckWork2.id()); verify(computationStateCache).invalidate(shardedKey1.key(), shardedKey1.shardingKey()); verify(computationStateCache).invalidate(shardedKey2.key(), shardedKey2.shardingKey()); } + @Test + public void + testActivateWorkForKey_withMatchingWorkTokenAndDifferentCacheToken_queuedWorkIsNotActive_QUEUED() { + long workToken = 10L; + long cacheToken1 = 5L; + long cacheToken2 = cacheToken1 + 2L; + + Work firstWork = createWork(createWorkItem(workToken, cacheToken1)); + Work secondWork = createWork(createWorkItem(workToken, cacheToken2)); + Work differentWorkTokenWork = createWork(createWorkItem(1L, 1L)); + ShardedKey shardedKey = shardedKey("someKey", 1L); + + activeWorkState.activateWorkForKey(shardedKey, differentWorkTokenWork); + // ActivateWork with the same shardedKey, and the same workTokens, but different cacheTokens. 
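+    // (Same work token, new cache token: per the WorkId contract, Windmill is retrying the
+    // queued work.)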
+ activeWorkState.activateWorkForKey(shardedKey, firstWork); + ActivateWorkResult activateWorkResult = + activeWorkState.activateWorkForKey(shardedKey, secondWork); + + assertEquals(ActivateWorkResult.QUEUED, activateWorkResult); + assertFalse(readOnlyActiveWork.get(shardedKey).contains(firstWork)); + assertTrue(readOnlyActiveWork.get(shardedKey).contains(secondWork)); + + Optional nextWork = + activeWorkState.completeWorkAndGetNextWorkForKey(shardedKey, differentWorkTokenWork.id()); + assertTrue(nextWork.isPresent()); + assertSame(secondWork, nextWork.get()); + } + + @Test + public void + testActivateWorkForKey_withMatchingWorkTokenAndDifferentCacheToken_queuedWorkIsActive_QUEUED() { + long workToken = 10L; + long cacheToken1 = 5L; + long cacheToken2 = 7L; + + Work firstWork = createWork(createWorkItem(workToken, cacheToken1)); + Work secondWork = createWork(createWorkItem(workToken, cacheToken2)); + ShardedKey shardedKey = shardedKey("someKey", 1L); + + // ActivateWork with the same shardedKey, and the same workTokens, but different cacheTokens. + activeWorkState.activateWorkForKey(shardedKey, firstWork); + ActivateWorkResult activateWorkResult = + activeWorkState.activateWorkForKey(shardedKey, secondWork); + + assertEquals(ActivateWorkResult.QUEUED, activateWorkResult); + assertEquals(firstWork, readOnlyActiveWork.get(shardedKey).peek()); + assertTrue(readOnlyActiveWork.get(shardedKey).contains(secondWork)); + Optional nextWork = + activeWorkState.completeWorkAndGetNextWorkForKey(shardedKey, firstWork.id()); + assertTrue(nextWork.isPresent()); + assertSame(secondWork, nextWork.get()); + } + + @Test + public void + testActivateWorkForKey_matchingCacheTokens_newWorkTokenGreater_queuedWorkIsActive_QUEUED() { + long cacheToken = 1L; + long newWorkToken = 10L; + long queuedWorkToken = newWorkToken / 2; + + Work queuedWork = createWork(createWorkItem(queuedWorkToken, cacheToken)); + Work newWork = createWork(createWorkItem(newWorkToken, cacheToken)); + ShardedKey shardedKey = shardedKey("someKey", 1L); + + activeWorkState.activateWorkForKey(shardedKey, queuedWork); + ActivateWorkResult activateWorkResult = activeWorkState.activateWorkForKey(shardedKey, newWork); + + // newWork should be queued and queuedWork should not be removed since it is currently active. 
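+    // (The head of the queue is the in-flight item; removing it mid-processing would orphan
+    // that execution, so the newer work waits and takes over when the head completes.)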
+ assertEquals(ActivateWorkResult.QUEUED, activateWorkResult); + assertTrue(readOnlyActiveWork.get(shardedKey).contains(newWork)); + assertEquals(queuedWork, readOnlyActiveWork.get(shardedKey).peek()); + } + + @Test + public void + testActivateWorkForKey_matchingCacheTokens_newWorkTokenGreater_queuedWorkNotActive_QUEUED() { + long matchingCacheToken = 1L; + long newWorkToken = 10L; + long queuedWorkToken = newWorkToken / 2; + + Work differentWorkTokenWork = createWork(createWorkItem(100L, 100L)); + Work queuedWork = createWork(createWorkItem(queuedWorkToken, matchingCacheToken)); + Work newWork = createWork(createWorkItem(newWorkToken, matchingCacheToken)); + ShardedKey shardedKey = shardedKey("someKey", 1L); + + activeWorkState.activateWorkForKey(shardedKey, differentWorkTokenWork); + activeWorkState.activateWorkForKey(shardedKey, queuedWork); + ActivateWorkResult activateWorkResult = activeWorkState.activateWorkForKey(shardedKey, newWork); + + assertEquals(ActivateWorkResult.QUEUED, activateWorkResult); + assertTrue(readOnlyActiveWork.get(shardedKey).contains(newWork)); + assertFalse(readOnlyActiveWork.get(shardedKey).contains(queuedWork)); + assertEquals(differentWorkTokenWork, readOnlyActiveWork.get(shardedKey).peek()); + } + + @Test + public void testActivateWorkForKey_matchingCacheTokens_newWorkTokenLesser_STALE() { + long cacheToken = 1L; + long queuedWorkToken = 10L; + long newWorkToken = queuedWorkToken / 2; + + Work queuedWork = createWork(createWorkItem(queuedWorkToken, cacheToken)); + Work newWork = createWork(createWorkItem(newWorkToken, cacheToken)); + ShardedKey shardedKey = shardedKey("someKey", 1L); + + activeWorkState.activateWorkForKey(shardedKey, queuedWork); + ActivateWorkResult activateWorkResult = activeWorkState.activateWorkForKey(shardedKey, newWork); + + assertEquals(ActivateWorkResult.STALE, activateWorkResult); + assertFalse(readOnlyActiveWork.get(shardedKey).contains(newWork)); + assertEquals(queuedWork, readOnlyActiveWork.get(shardedKey).peek()); + } + @Test public void testGetKeyHeartbeats() { Instant refreshDeadline = Instant.now(); - Work freshWork = createWork(createWorkItem(3L)); - Work refreshableWork1 = expiredWork(createWorkItem(1L)); + Work freshWork = createWork(createWorkItem(3L, 3L)); + Work refreshableWork1 = expiredWork(createWorkItem(1L, 1L)); refreshableWork1.setState(Work.State.COMMITTING); - Work refreshableWork2 = expiredWork(createWorkItem(2L)); + Work refreshableWork2 = expiredWork(createWorkItem(2L, 2L)); refreshableWork2.setState(Work.State.COMMITTING); ShardedKey shardedKey1 = shardedKey("someKey", 1L); ShardedKey shardedKey2 = shardedKey("anotherKey", 2L); From cba40592af3da08021ae6d111a6af33f0d8ca7bf Mon Sep 17 00:00:00 2001 From: Martin Trieu Date: Mon, 5 Feb 2024 12:14:23 -0800 Subject: [PATCH 08/11] fix failing tests --- .../worker/streaming/ActiveWorkState.java | 27 +++---- .../worker/StreamingDataflowWorkerTest.java | 80 ++++++++++++++++--- 2 files changed, 84 insertions(+), 23 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java index ca8aeb037a30..b2911ce8d48e 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java +++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java @@ -24,13 +24,14 @@ import java.util.ArrayDeque; import java.util.Deque; import java.util.HashMap; -import java.util.Iterator; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Optional; import java.util.Queue; import java.util.concurrent.atomic.AtomicReference; +import java.util.Set; import java.util.function.BiConsumer; import java.util.stream.Stream; import javax.annotation.Nullable; @@ -77,7 +78,7 @@ public final class ActiveWorkState { /** * Current budget that is being processed or queued on the user worker. Incremented when work is * activated in {@link #activateWorkForKey(ShardedKey, Work)}, and decremented when work is - * completed in {@link #completeWorkAndGetNextWorkForKey(ShardedKey, long)}. + * completed in {@link #completeWorkAndGetNextWorkForKey(ShardedKey, WorkId)}. */ private final AtomicReference activeGetWorkBudget; @@ -135,16 +136,17 @@ synchronized ActivateWorkResult activateWorkForKey(ShardedKey shardedKey, Work w } // Check to see if we have this work token queued. - Iterator workIterator = workQueue.iterator(); - while (workIterator.hasNext()) { - Work queuedWork = workIterator.next(); + // This set is for adding remove-able WorkItems if they exist in the workQueue. We add them to + // this set since a ConcurrentModificationException will be thrown if we modify the workQueue + // and then resume iteration. + Set queuedWorkToRemove = new HashSet<>(); + for (Work queuedWork : workQueue) { if (queuedWork.id().equals(work.id())) { return ActivateWorkResult.DUPLICATE; } if (queuedWork.id().cacheToken() == work.id().cacheToken()) { if (work.id().workToken() > queuedWork.id().workToken()) { - removeIfNotActive(queuedWork, workIterator, workQueue); - workQueue.addLast(work); + queuedWorkToRemove.add(queuedWork.id()); // Continue here to possibly remove more non-active stale work that is queued. } else { return ActivateWorkResult.STALE; @@ -152,6 +154,10 @@ synchronized ActivateWorkResult activateWorkForKey(ShardedKey shardedKey, Work w } } + workQueue.removeIf( + queuedWork -> + queuedWorkToRemove.contains(queuedWork.id()) && !queuedWork.equals(workQueue.peek())); + // Queue the work for later processing. workQueue.addLast(work); incrementActiveWorkBudget(work); @@ -404,13 +410,6 @@ synchronized void printActiveWork(PrintWriter writer, Instant now) { writer.println("
"); } - private static void removeIfNotActive( - Work queuedWork, Iterator workIterator, Deque workQueue) { - // Check to see if the queuedWork is active. We only want to remove it if it is NOT currently - // active. - if (!queuedWork.equals(workQueue.peek())) workIterator.remove(); - } - enum ActivateWorkResult { QUEUED, EXECUTE, diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java index cf2a74bf7cdf..1035fada0ff9 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java @@ -556,7 +556,8 @@ private Windmill.GetWorkResponse makeInput( + shardingKey + " work_token: " + index - + " cache_token: 3" + + " cache_token: " + + (index + 1) + " hot_key_info {" + " hot_key_age_usec: 1000000" + " }" @@ -579,6 +580,47 @@ private Windmill.GetWorkResponse makeInput( Collections.singletonList(DEFAULT_WINDOW))); } + private Windmill.GetWorkResponse makeInput( + int workToken, int cacheToken, long timestamp, String key, long shardingKey) + throws Exception { + return buildInput( + "work {" + + " computation_id: \"" + + DEFAULT_COMPUTATION_ID + + "\"" + + " input_data_watermark: 0" + + " work {" + + " key: \"" + + key + + "\"" + + " sharding_key: " + + shardingKey + + " work_token: " + + workToken + + " cache_token: " + + cacheToken + + " hot_key_info {" + + " hot_key_age_usec: 1000000" + + " }" + + " message_bundles {" + + " source_computation_id: \"" + + DEFAULT_SOURCE_COMPUTATION_ID + + "\"" + + " messages {" + + " timestamp: " + + timestamp + + " data: \"data" + + workToken + + "\"" + + " }" + + " }" + + " }" + + "}", + CoderUtils.encodeToByteArray( + CollectionCoder.of(IntervalWindow.getCoder()), + Collections.singletonList(DEFAULT_WINDOW))); + } + /** * Returns a {@link * org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItemCommitRequest} builder parsed @@ -655,7 +697,9 @@ private StringBuilder initializeExpectedCommitRequest( requestBuilder.append("work_token: "); requestBuilder.append(index); requestBuilder.append(" "); - requestBuilder.append("cache_token: 3 "); + requestBuilder.append("cache_token: "); + requestBuilder.append(index + 1); + requestBuilder.append(" "); if (hasSourceBytesProcessed) requestBuilder.append("source_bytes_processed: 0 "); return requestBuilder; @@ -3286,11 +3330,20 @@ public void testActiveWorkFailure() throws Exception { StreamingDataflowWorker worker = makeWorker(instructions, options, true /* publishCounters */); worker.start(); + GetWorkResponse workItem = + makeInput(0, TimeUnit.MILLISECONDS.toMicros(0), "key", DEFAULT_SHARDING_KEY); + int failedWorkToken = 1; + int failedCacheToken = 5; + GetWorkResponse workItemToFail = + makeInput( + failedWorkToken, + failedCacheToken, + TimeUnit.MILLISECONDS.toMicros(0), + "key", + DEFAULT_SHARDING_KEY); + // Queue up two work items for the same key. 
-    server
-        .whenGetWorkCalled()
-        .thenReturn(makeInput(0, TimeUnit.MILLISECONDS.toMicros(0), "key", DEFAULT_SHARDING_KEY))
-        .thenReturn(makeInput(1, TimeUnit.MILLISECONDS.toMicros(0), "key", DEFAULT_SHARDING_KEY));
+    server.whenGetWorkCalled().thenReturn(workItem).thenReturn(workItemToFail);
     server.waitForEmptyWorkQueue();

     // Mock Windmill sending a heartbeat response failing the second work item while the first
@@ -3300,8 +3353,8 @@
     failedHeartbeat
         .setComputationId(DEFAULT_COMPUTATION_ID)
         .addHeartbeatResponsesBuilder()
-        .setCacheToken(3)
-        .setWorkToken(1)
+        .setCacheToken(failedCacheToken)
+        .setWorkToken(failedWorkToken)
         .setShardingKey(DEFAULT_SHARDING_KEY)
         .setFailed(true);
     server.sendFailedHeartbeats(Collections.singletonList(failedHeartbeat.build()));
@@ -3318,7 +3371,16 @@
   @Test
   public void testLatencyAttributionProtobufsPopulated() {
     FakeClock clock = new FakeClock();
-    Work work = Work.create(null, clock, Collections.emptyList(), unused -> {});
+    Work work =
+        Work.create(
+            Windmill.WorkItem.newBuilder()
+                .setKey(ByteString.EMPTY)
+                .setWorkToken(1L)
+                .setCacheToken(1L)
+                .build(),
+            clock,
+            Collections.emptyList(),
+            unused -> {});

     clock.sleep(Duration.millis(10));
     work.setState(Work.State.PROCESSING);

From c2a094c45386bd318cfd62005258fd7625e6a442 Mon Sep 17 00:00:00 2001
From: Martin Trieu
Date: Thu, 8 Feb 2024 16:43:43 -0800
Subject: [PATCH 09/11] fix failing tests

---
 .../worker/StreamingDataflowWorker.java       |  6 ++++-
 .../worker/streaming/ActiveWorkStateTest.java | 23 +++++++++++--------
 2 files changed, 18 insertions(+), 11 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index c459cf8ae1c1..68f149cd862f 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -1410,7 +1410,11 @@ private boolean addCommitToStream(Commit commit, CommitWorkStream commitStream)
         .forComputation(state.getComputationId())
         .invalidate(request.getKey(), request.getShardingKey());
     state.completeWorkAndScheduleNextWorkForKey(
-        ShardedKey.create(request.getKey(), request.getShardingKey()), request.getWorkToken());
+        ShardedKey.create(request.getKey(), request.getShardingKey()),
+        WorkId.builder()
+            .setWorkToken(request.getWorkToken())
+            .setCacheToken(request.getCacheToken())
+            .build());
     return true;
   }

diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java
index 170e6e3046cf..b96010c79dd5 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java
@@ -94,7 +94,7 @@ public void setup() {
   public void testActivateWorkForKey_EXECUTE_unknownKey() {
     ActivateWorkResult activateWorkResult =
        activeWorkState.activateWorkForKey(
-            shardedKey("someKey", 1L), createWork(createWorkItem(1L)));
+            shardedKey("someKey", 1L), createWork(createWorkItem(1L, 1L)));

     assertEquals(ActivateWorkResult.EXECUTE, activateWorkResult);
   }
@@ -219,8 +219,8 @@ public void testCompleteWorkAndGetNextWorkForKey_returnsWorkIfPresent() {
   @Test
   public void testCurrentActiveWorkBudget_correctlyAggregatesActiveWorkBudget_oneShardKey() {
     ShardedKey shardedKey = shardedKey("someKey", 1L);
-    Work work1 = createWork(createWorkItem(1L));
-    Work work2 = createWork(createWorkItem(2L));
+    Work work1 = createWork(createWorkItem(1L, 1L));
+    Work work2 = createWork(createWorkItem(2L, 2L));

     activeWorkState.activateWorkForKey(shardedKey, work1);
     activeWorkState.activateWorkForKey(shardedKey, work2);
@@ -235,7 +235,7 @@
     assertThat(activeWorkState.currentActiveWorkBudget()).isEqualTo(expectedActiveBudget1);

     activeWorkState.completeWorkAndGetNextWorkForKey(
-        shardedKey, work1.getWorkItem().getWorkToken());
+        shardedKey, work1.id());

     GetWorkBudget expectedActiveBudget2 =
         GetWorkBudget.builder()
@@ -249,13 +249,13 @@
   @Test
   public void testCurrentActiveWorkBudget_correctlyAggregatesActiveWorkBudget_whenWorkCompleted() {
     ShardedKey shardedKey = shardedKey("someKey", 1L);
-    Work work1 = createWork(createWorkItem(1L));
-    Work work2 = createWork(createWorkItem(2L));
+    Work work1 = createWork(createWorkItem(1L, 1L));
+    Work work2 = createWork(createWorkItem(2L, 2L));

     activeWorkState.activateWorkForKey(shardedKey, work1);
     activeWorkState.activateWorkForKey(shardedKey, work2);
     activeWorkState.completeWorkAndGetNextWorkForKey(
-        shardedKey, work1.getWorkItem().getWorkToken());
+        shardedKey, work1.id());

     GetWorkBudget expectedActiveBudget =
         GetWorkBudget.builder()
@@ -270,8 +270,8 @@
   public void testCurrentActiveWorkBudget_correctlyAggregatesActiveWorkBudget_multipleShardKeys() {
     ShardedKey shardedKey1 = shardedKey("someKey", 1L);
     ShardedKey shardedKey2 = shardedKey("someKey", 2L);
-    Work work1 = createWork(createWorkItem(1L));
-    Work work2 = createWork(createWorkItem(2L));
+    Work work1 = createWork(createWorkItem(1L, 1L));
+    Work work2 = createWork(createWorkItem(2L, 2L));

     activeWorkState.activateWorkForKey(shardedKey1, work1);
     activeWorkState.activateWorkForKey(shardedKey2, work2);
@@ -327,12 +327,15 @@ public void testInvalidateStuckCommits() {
     activeWorkState.activateWorkForKey(shardedKey, secondWork);

     assertEquals(ActivateWorkResult.QUEUED, activateWorkResult);
-    assertFalse(readOnlyActiveWork.get(shardedKey).contains(firstWork));
     assertTrue(readOnlyActiveWork.get(shardedKey).contains(secondWork));

     Optional<Work> nextWork =
         activeWorkState.completeWorkAndGetNextWorkForKey(shardedKey, differentWorkTokenWork.id());
     assertTrue(nextWork.isPresent());
+    assertSame(firstWork, nextWork.get());
+    nextWork =
+        activeWorkState.completeWorkAndGetNextWorkForKey(shardedKey, firstWork.id());
+    assertTrue(nextWork.isPresent());
     assertSame(secondWork, nextWork.get());
   }

From 2e219d1bc1b35489f741d7d62cf6950d08519264 Mon Sep 17 00:00:00 2001
From: Martin Trieu
Date: Thu, 8 Feb 2024 22:33:07 -0800
Subject: [PATCH 10/11] fix formatting

---
 .../dataflow/worker/streaming/ActiveWorkState.java     | 2 +-
 .../dataflow/worker/streaming/ActiveWorkStateTest.java | 9 +++------
 2 files changed, 4 insertions(+), 7 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java
index b2911ce8d48e..5082ef75e0f5 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java
@@ -30,8 +30,8 @@
 import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.Queue;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
 import java.util.stream.Stream;
 import javax.annotation.Nullable;

diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java
index b96010c79dd5..c581638d98bf 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java
@@ -234,8 +234,7 @@
     assertThat(activeWorkState.currentActiveWorkBudget()).isEqualTo(expectedActiveBudget1);

-    activeWorkState.completeWorkAndGetNextWorkForKey(
-        shardedKey, work1.id());
+    activeWorkState.completeWorkAndGetNextWorkForKey(shardedKey, work1.id());

     GetWorkBudget expectedActiveBudget2 =
         GetWorkBudget.builder()
@@ -254,8 +253,7 @@
     activeWorkState.activateWorkForKey(shardedKey, work1);
     activeWorkState.activateWorkForKey(shardedKey, work2);
-    activeWorkState.completeWorkAndGetNextWorkForKey(
-        shardedKey, work1.id());
+    activeWorkState.completeWorkAndGetNextWorkForKey(shardedKey, work1.id());

     GetWorkBudget expectedActiveBudget =
         GetWorkBudget.builder()
@@ -333,8 +331,7 @@ public void testInvalidateStuckCommits() {
     activeWorkState.completeWorkAndGetNextWorkForKey(shardedKey, differentWorkTokenWork.id());
     assertTrue(nextWork.isPresent());
     assertSame(firstWork, nextWork.get());
-    nextWork =
-        activeWorkState.completeWorkAndGetNextWorkForKey(shardedKey, firstWork.id());
+    nextWork = activeWorkState.completeWorkAndGetNextWorkForKey(shardedKey, firstWork.id());
     assertTrue(nextWork.isPresent());
     assertSame(secondWork, nextWork.get());
   }

From 279b669661887904ef1ea7e9eb77b64e3343e768 Mon Sep 17 00:00:00 2001
From: Martin Trieu
Date: Mon, 12 Feb 2024 23:11:38 -0800
Subject: [PATCH 11/11] address cl comments

---
 .../worker/StreamingDataflowWorker.java       |  21 ++--
 .../worker/streaming/ActiveWorkState.java     | 107 +++++++-----------
 .../worker/streaming/ComputationState.java    |   5 +-
 3 files changed, 54 insertions(+), 79 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 68f149cd862f..13d4c52867d5 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -87,7 +87,6 @@
 import org.apache.beam.runners.dataflow.worker.status.LastExceptionDataProvider;
 import org.apache.beam.runners.dataflow.worker.status.StatusDataProvider;
 import org.apache.beam.runners.dataflow.worker.status.WorkerStatusPages;
-import org.apache.beam.runners.dataflow.worker.streaming.ActiveWorkState.FailedTokens;
 import org.apache.beam.runners.dataflow.worker.streaming.Commit;
 import org.apache.beam.runners.dataflow.worker.streaming.ComputationState;
 import org.apache.beam.runners.dataflow.worker.streaming.ExecutionState;
@@ -105,6 +104,7 @@
 import org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill.ComputationHeartbeatResponse;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill.LatencyAttribution;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItemCommitRequest;
 import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub;
@@ -1971,20 +1971,19 @@ private void sendWorkerUpdatesToDataflowService(
     }
   }

-  public void handleHeartbeatResponses(List<Windmill.ComputationHeartbeatResponse> responses) {
-    for (Windmill.ComputationHeartbeatResponse computationHeartbeatResponse : responses) {
+  public void handleHeartbeatResponses(List<ComputationHeartbeatResponse> responses) {
+    for (ComputationHeartbeatResponse computationHeartbeatResponse : responses) {
       // Maps sharding key to (work token, cache token) for work that should be marked failed.
-      Map<Long, List<FailedTokens>> failedWork = new HashMap<>();
+      Multimap<Long, WorkId> failedWork = ArrayListMultimap.create();
       for (Windmill.HeartbeatResponse heartbeatResponse :
           computationHeartbeatResponse.getHeartbeatResponsesList()) {
         if (heartbeatResponse.getFailed()) {
-          failedWork
-              .computeIfAbsent(heartbeatResponse.getShardingKey(), key -> new ArrayList<>())
-              .add(
-                  FailedTokens.newBuilder()
-                      .setWorkToken(heartbeatResponse.getWorkToken())
-                      .setCacheToken(heartbeatResponse.getCacheToken())
-                      .build());
+          failedWork.put(
+              heartbeatResponse.getShardingKey(),
+              WorkId.builder()
+                  .setWorkToken(heartbeatResponse.getWorkToken())
+                  .setCacheToken(heartbeatResponse.getCacheToken())
+                  .build());
         }
       }
       ComputationState state = computationMap.get(computationHeartbeatResponse.getComputationId());
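
A quick aside on the data-structure swap above, since it is the crux of this commit: one sharding key can have several retries in flight at once, so the failed-work index is naturally a multimap from sharding key to WorkId. Below is a minimal, self-contained sketch of that grouping step. It is illustrative only, not part of the patch: it uses plain Guava rather than Beam's vendored copy, and small record stand-ins for the real WorkId and Windmill.HeartbeatResponse types.

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import java.util.List;

final class FailedWorkGroupingSketch {

  // Stand-in for the worker's WorkId value class (work token + cache token).
  record WorkId(long workToken, long cacheToken) {}

  // Stand-in for the fields of Windmill.HeartbeatResponse consulted here.
  record HeartbeatResponse(long shardingKey, long workToken, long cacheToken, boolean failed) {}

  // Mirrors the loop in handleHeartbeatResponses: keep only failed responses and
  // group their (work token, cache token) ids by sharding key.
  static Multimap<Long, WorkId> groupFailedWork(List<HeartbeatResponse> responses) {
    Multimap<Long, WorkId> failedWork = ArrayListMultimap.create();
    for (HeartbeatResponse response : responses) {
      if (response.failed()) {
        failedWork.put(
            response.shardingKey(), new WorkId(response.workToken(), response.cacheToken()));
      }
    }
    return failedWork;
  }

  public static void main(String[] args) {
    Multimap<Long, WorkId> failedWork =
        groupFailedWork(
            List.of(
                new HeartbeatResponse(1L, 10L, 100L, true),
                new HeartbeatResponse(1L, 11L, 101L, true),
                new HeartbeatResponse(2L, 12L, 102L, false)));
    System.out.println(failedWork.get(1L)); // both failed ids for sharding key 1
    System.out.println(failedWork.get(2L)); // [] - Multimap.get never returns null
  }
}

One property this leans on: Multimap.get returns an empty collection for an absent key, which is what lets the explicit null check drop out of failWorkForKey later in this commit.
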
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java
index 5082ef75e0f5..a989206408e7 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java
@@ -19,18 +19,16 @@
 import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList.toImmutableList;

-import com.google.auto.value.AutoValue;
 import java.io.PrintWriter;
 import java.util.ArrayDeque;
+import java.util.Collection;
 import java.util.Deque;
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.Queue;
-import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
 import java.util.stream.Stream;
 import javax.annotation.Nullable;
@@ -48,6 +46,7 @@
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Multimap;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
@@ -107,6 +106,29 @@ private static String elapsedString(Instant start, Instant end) {
     return activeFor.toString().substring(2);
   }

+  private static Stream<Windmill.HeartbeatRequest> toHeartbeatRequestStream(
+      Entry<ShardedKey, Deque<Work>> shardedKeyAndWorkQueue,
+      Instant refreshDeadline,
+      DataflowExecutionStateSampler sampler) {
+    ShardedKey shardedKey = shardedKeyAndWorkQueue.getKey();
+    Deque<Work> workQueue = shardedKeyAndWorkQueue.getValue();
+
+    return workQueue.stream()
+        .filter(work -> work.getStartTime().isBefore(refreshDeadline))
+        // Don't send heartbeats for queued work we already know is failed.
+        .filter(work -> !work.isFailed())
+        .map(
+            work ->
+                Windmill.HeartbeatRequest.newBuilder()
+                    .setShardingKey(shardedKey.shardingKey())
+                    .setWorkToken(work.getWorkItem().getWorkToken())
+                    .setCacheToken(work.getWorkItem().getCacheToken())
+                    .addAllLatencyAttribution(
+                        work.getLatencyAttributions(
+                            /* isHeartbeat= */ true, work.getLatencyTrackingId(), sampler))
+                    .build());
+  }
+
   /**
    * Activates {@link Work} for the {@link ShardedKey}. Outcome can be 1 of 4 {@link
    * ActivateWorkResult}
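
The helper added above encodes two refresh rules: only work that started before the refresh deadline needs a heartbeat, and work already marked failed is skipped entirely. A minimal sketch of just that filtering follows, again not part of the patch, with simplified stand-in types (java.time instead of Joda time, and a plain record instead of Work; all names here are illustrative only):

import java.time.Instant;
import java.util.List;
import java.util.stream.Stream;

final class HeartbeatFilterSketch {

  // Stand-in for the fields of Work consulted when building heartbeat requests.
  record QueuedWork(long workToken, long cacheToken, Instant startTime, boolean failed) {}

  record HeartbeatRequest(long shardingKey, long workToken, long cacheToken) {}

  // Mirrors the two filters in toHeartbeatRequestStream: long-running work gets a
  // heartbeat; work already known to be failed does not.
  static Stream<HeartbeatRequest> heartbeatsFor(
      long shardingKey, List<QueuedWork> workQueue, Instant refreshDeadline) {
    return workQueue.stream()
        .filter(work -> work.startTime().isBefore(refreshDeadline))
        .filter(work -> !work.failed())
        .map(work -> new HeartbeatRequest(shardingKey, work.workToken(), work.cacheToken()));
  }
}

Filtering on isFailed avoids refreshing items the worker already knows Windmill has abandoned, which is exactly the comment's intent in the method above.
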
@@ -136,17 +158,20 @@ synchronized ActivateWorkResult activateWorkForKey(ShardedKey shardedKey, Work w
     }

     // Check to see if we have this work token queued.
-    // This set collects the ids of stale work to remove from the workQueue. We collect them here
-    // since removing from the workQueue while iterating over it would throw a
-    // ConcurrentModificationException.
-    Set<WorkId> queuedWorkToRemove = new HashSet<>();
-    for (Work queuedWork : workQueue) {
+    Iterator<Work> workIterator = workQueue.iterator();
+    while (workIterator.hasNext()) {
+      Work queuedWork = workIterator.next();
       if (queuedWork.id().equals(work.id())) {
         return ActivateWorkResult.DUPLICATE;
       }
       if (queuedWork.id().cacheToken() == work.id().cacheToken()) {
         if (work.id().workToken() > queuedWork.id().workToken()) {
-          queuedWorkToRemove.add(queuedWork.id());
+          // Check to see if the queuedWork is active. We only want to remove it if it is NOT
+          // currently active.
+          if (!queuedWork.equals(workQueue.peek())) {
+            workIterator.remove();
+            decrementActiveWorkBudget(queuedWork);
+          }
           // Continue here to possibly remove more non-active stale work that is queued.
         } else {
           return ActivateWorkResult.STALE;
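
This loop is the retry-detection core of the whole series, so a compact sketch of just the decision rule may help. It is a simplification under stated assumptions, not the worker's implementation: the activeWork map and budget accounting (decrementActiveWorkBudget) are elided, and WorkId is again a record stand-in.

import java.util.Deque;
import java.util.Iterator;

final class ActivationSketch {

  enum ActivateWorkResult { QUEUED, EXECUTE, DUPLICATE, STALE }

  record WorkId(long workToken, long cacheToken) {}

  // Decision rule applied per queued item. The queue head is the item currently
  // executing, so it is never evicted even when a newer retry supersedes it.
  static ActivateWorkResult activate(Deque<WorkId> queue, WorkId incoming) {
    if (queue.isEmpty()) {
      queue.addLast(incoming);
      return ActivateWorkResult.EXECUTE; // first work for this key: run immediately
    }
    Iterator<WorkId> it = queue.iterator();
    while (it.hasNext()) {
      WorkId queued = it.next();
      if (queued.equals(incoming)) {
        // Same work token and cache token: the same retry delivered twice.
        return ActivateWorkResult.DUPLICATE;
      }
      if (queued.cacheToken() == incoming.cacheToken()) {
        if (incoming.workToken() > queued.workToken()) {
          // Newer retry of the same logical work: drop the stale queued entry
          // unless it is the one currently executing (the queue head).
          if (!queued.equals(queue.peek())) {
            it.remove();
          }
          // Keep scanning; more stale entries may remain.
        } else {
          // Incoming work is an older retry than what is queued: ignore it.
          return ActivateWorkResult.STALE;
        }
      }
    }
    queue.addLast(incoming);
    return ActivateWorkResult.QUEUED;
  }
}

Reading the comparison in the patch, a matching cache token identifies the same logical work item across deliveries, while the work-token comparison within that group is what separates a newer retry from a stale redelivery.
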
@@ -154,61 +179,36 @@ synchronized ActivateWorkResult activateWorkForKey(ShardedKey shardedKey, Work w
       }
     }

-    workQueue.removeIf(
-        queuedWork ->
-            queuedWorkToRemove.contains(queuedWork.id()) && !queuedWork.equals(workQueue.peek()));
-
     // Queue the work for later processing.
     workQueue.addLast(work);
     incrementActiveWorkBudget(work);
     return ActivateWorkResult.QUEUED;
   }

-  @AutoValue
-  public abstract static class FailedTokens {
-    public static Builder newBuilder() {
-      return new AutoValue_ActiveWorkState_FailedTokens.Builder();
-    }
-
-    public abstract long workToken();
-
-    public abstract long cacheToken();
-
-    @AutoValue.Builder
-    public abstract static class Builder {
-      public abstract Builder setWorkToken(long value);
-
-      public abstract Builder setCacheToken(long value);
-
-      public abstract FailedTokens build();
-    }
-  }
-
   /**
    * Fails any active work matching an element of the input Map.
    *
    * @param failedWork a map from sharding_key to tokens for the corresponding work.
    */
-  synchronized void failWorkForKey(Map<Long, List<FailedTokens>> failedWork) {
+  synchronized void failWorkForKey(Multimap<Long, WorkId> failedWork) {
     // Note we can't construct a ShardedKey and look it up in activeWork directly since
     // HeartbeatResponse doesn't include the user key.
     for (Entry<ShardedKey, Deque<Work>> entry : activeWork.entrySet()) {
-      List<FailedTokens> failedTokens = failedWork.get(entry.getKey().shardingKey());
-      if (failedTokens == null) continue;
-      for (FailedTokens failedToken : failedTokens) {
+      Collection<WorkId> failedWorkIds = failedWork.get(entry.getKey().shardingKey());
+      for (WorkId failedWorkId : failedWorkIds) {
         for (Work queuedWork : entry.getValue()) {
           WorkItem workItem = queuedWork.getWorkItem();
-          if (workItem.getWorkToken() == failedToken.workToken()
-              && workItem.getCacheToken() == failedToken.cacheToken()) {
+          if (workItem.getWorkToken() == failedWorkId.workToken()
+              && workItem.getCacheToken() == failedWorkId.cacheToken()) {
             LOG.debug(
                 "Failing work "
                     + computationStateCache.getComputation()
                     + " "
                     + entry.getKey().shardingKey()
                     + " "
-                    + failedToken.workToken()
+                    + failedWorkId.workToken()
                     + " "
-                    + failedToken.cacheToken()
+                    + failedWorkId.cacheToken()
                     + ". The work will be retried and is not lost.");
             queuedWork.setFailed();
             break;
@@ -328,29 +328,6 @@ synchronized ImmutableList<Windmill.HeartbeatRequest> getKeyHeartbeats(
         .collect(toImmutableList());
   }

-  private static Stream<Windmill.HeartbeatRequest> toHeartbeatRequestStream(
-      Entry<ShardedKey, Deque<Work>> shardedKeyAndWorkQueue,
-      Instant refreshDeadline,
-      DataflowExecutionStateSampler sampler) {
-    ShardedKey shardedKey = shardedKeyAndWorkQueue.getKey();
-    Deque<Work> workQueue = shardedKeyAndWorkQueue.getValue();
-
-    return workQueue.stream()
-        .filter(work -> work.getStartTime().isBefore(refreshDeadline))
-        // Don't send heartbeats for queued work we already know is failed.
-        .filter(work -> !work.isFailed())
-        .map(
-            work ->
-                Windmill.HeartbeatRequest.newBuilder()
-                    .setShardingKey(shardedKey.shardingKey())
-                    .setWorkToken(work.getWorkItem().getWorkToken())
-                    .setCacheToken(work.getWorkItem().getCacheToken())
-                    .addAllLatencyAttribution(
-                        work.getLatencyAttributions(
-                            /* isHeartbeat= */ true, work.getLatencyTrackingId(), sampler))
-                    .build());
-  }
-
   /**
    * Returns the current aggregate {@link GetWorkBudget} that is active on the user worker. Active
    * means that the work is received from Windmill, being processed or queued to be processed in
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationState.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationState.java
index 2682e91d8227..33ef4950f9a3 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationState.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationState.java
@@ -19,18 +19,17 @@
 import com.google.api.services.dataflow.model.MapTask;
 import java.io.PrintWriter;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.dataflow.worker.DataflowExecutionStateSampler;
-import org.apache.beam.runners.dataflow.worker.streaming.ActiveWorkState.FailedTokens;
 import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill.HeartbeatRequest;
 import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Multimap;
 import org.joda.time.Instant;

 /**
@@ -103,7 +102,7 @@ public boolean activateWork(ShardedKey shardedKey, Work work) {
     }
   }

-  public void failWork(Map<Long, List<FailedTokens>> failedWork) {
+  public void failWork(Multimap<Long, WorkId> failedWork) {
     activeWorkState.failWorkForKey(failedWork);
   }
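
To close out, a sketch of how the final pieces compose end to end: heartbeat responses are grouped into a Multimap (as in handleHeartbeatResponses), handed to ComputationState.failWork, and matched against queued work by sharding key plus both tokens. This is a simplified illustration, not the worker's code: it keys active work on the raw sharding key (the real code keys on ShardedKey and compares shardingKey()), uses plain Guava, and record or class stand-ins throughout.

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

final class FailWorkSketch {

  record WorkId(long workToken, long cacheToken) {}

  static final class Work {
    final WorkId id;
    boolean failed;

    Work(WorkId id) {
      this.id = id;
    }
  }

  // Mirrors failWorkForKey: mark matching queued work failed in place. The entry stays
  // queued, so completion still drains the queue in order, while the refresh path simply
  // stops sending heartbeats for it.
  static void failWork(Map<Long, Deque<Work>> activeWork, Multimap<Long, WorkId> failedWork) {
    for (Map.Entry<Long, Deque<Work>> entry : activeWork.entrySet()) {
      for (WorkId failedId : failedWork.get(entry.getKey())) {
        for (Work queued : entry.getValue()) {
          if (queued.id.equals(failedId)) {
            queued.failed = true;
            break;
          }
        }
      }
    }
  }

  public static void main(String[] args) {
    Map<Long, Deque<Work>> activeWork = new HashMap<>();
    Deque<Work> queue = new ArrayDeque<>();
    queue.addLast(new Work(new WorkId(10L, 100L)));
    queue.addLast(new Work(new WorkId(11L, 101L)));
    activeWork.put(1L, queue);

    Multimap<Long, WorkId> failedWork = ArrayListMultimap.create();
    failedWork.put(1L, new WorkId(11L, 101L));

    failWork(activeWork, failedWork);
    queue.forEach(work -> System.out.println(work.id + " failed=" + work.failed));
  }
}

Note that both the work token and the cache token must match before anything is failed or completed, which appears to be the invariant this series is after: a bare work token cannot by itself distinguish a retry of the same work from a distinct attempt.
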