diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java index 110eb3813435..c60b1707cbd4 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java @@ -21,6 +21,7 @@ import java.io.PrintWriter; import java.util.ArrayDeque; +import java.util.Collection; import java.util.Deque; import java.util.HashMap; import java.util.Iterator; @@ -37,6 +38,7 @@ import org.apache.beam.runners.dataflow.worker.DataflowExecutionStateSampler; import org.apache.beam.runners.dataflow.worker.windmill.Windmill; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.HeartbeatRequest; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill.KeyedGetDataRequest; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItem; import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache; import org.apache.beam.sdk.annotations.Internal; @@ -89,6 +91,20 @@ static ActiveWorkState forTesting( return new ActiveWorkState(activeWork, computationStateCache); } + private static Stream makeHeartbeatKeyedGetDataRequests( + ShardedKey shardedKey, Collection workQueue, Instant refreshDeadline) { + return workQueue.stream() + .filter(work -> work.getStartTime().isBefore(refreshDeadline)) + .map( + work -> + KeyedGetDataRequest.newBuilder() + .setKey(shardedKey.key()) + .setShardingKey(shardedKey.shardingKey()) + .setWorkToken(work.getWorkItem().getWorkToken()) + .addAllLatencyAttribution(work.getLatencyAttributions()) + .build()); + } + private static String elapsedString(Instant start, Instant end) { Duration activeFor = new Duration(start, end); // Duration's toString always starts with "PT"; remove that here. @@ -159,18 +175,13 @@ synchronized ActivateWorkResult activateWorkForKey(ShardedKey shardedKey, Work w if (queuedWork.id().equals(work.id())) { return ActivateWorkResult.DUPLICATE; } else if (queuedWork.id().cacheToken() == work.id().cacheToken()) { - if (work.id().workToken() > queuedWork.id().workToken()) { - removeIfNotActive(queuedWork, workIterator, workQueue); - workQueue.addLast(work); - return ActivateWorkResult.QUEUED; - } else { + if (work.id().workToken() <= queuedWork.id().workToken()) { return ActivateWorkResult.STALE; } - } else if (queuedWork.id().workToken() == work.id().workToken()) { - if (queuedWork.id().cacheToken() != work.id().cacheToken()) { - removeIfNotActive(queuedWork, workIterator, workQueue); - workQueue.addLast(work); - return ActivateWorkResult.QUEUED; + + if (!queuedWork.equals(workQueue.peek())) { + // We only want to remove it if it is NOT currently active. + workIterator.remove(); } } } @@ -245,8 +256,8 @@ private synchronized void removeCompletedWorkFromQueue( () -> new IllegalStateException( String.format( - "Active key %s without work, expected work_token %d, expected cache_token %d", - shardedKey, workId.workToken(), workId.cacheToken()))); + "Active key %s without work, expected work_id= %s", + shardedKey, workId))); if (!completedWork.id().equals(workId)) { // Work may have been completed due to clearing of stuck commits. @@ -312,6 +323,10 @@ synchronized ImmutableList getKeyHeartbeats( Instant refreshDeadline, DataflowExecutionStateSampler sampler) { return activeWork.entrySet().stream() .flatMap(entry -> toHeartbeatRequestStream(entry, refreshDeadline, sampler)) + .flatMap( + entry -> + makeHeartbeatKeyedGetDataRequests( + entry.getKey(), entry.getValue(), refreshDeadline)) .collect(toImmutableList()); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java index b3167248d224..644869c6fe4c 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java @@ -206,6 +206,13 @@ public WorkId id() { return id; } + @Override + public final String toString() { + return String.format( + "work_id:[%s]; work_item:[%s]; start_time:[%s]; current_state:[%s]", + id, workItem, startTime, currentState); + } + public enum State { QUEUED(Windmill.LatencyAttribution.State.QUEUED), PROCESSING(Windmill.LatencyAttribution.State.ACTIVE), diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java index 85b598fd7676..413f875def19 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java @@ -2725,12 +2725,13 @@ public void testActiveWorkForShardedKeys() { assertTrue(computationState.activateWork(key1Shard1, m2)); Mockito.verify(mockExecutor).execute(m2, m2.getWorkItem().getSerializedSize()); Work m3 = createMockWork(3); - assertTrue(computationState.activateWork(key1Shard1, m3)); + boolean activateWork = computationState.activateWork(key1Shard1, m3); + assertTrue(activateWork); Mockito.verifyNoMoreInteractions(mockExecutor); // Verify a different shard of key is a separate queue. Work m4 = createMockWork(3); - assertTrue(computationState.activateWork(key1Shard1, m4)); + assertFalse(computationState.activateWork(key1Shard1, m4)); Mockito.verifyNoMoreInteractions(mockExecutor); assertTrue(computationState.activateWork(key1Shard2, m4)); Mockito.verify(mockExecutor).execute(m4, m4.getWorkItem().getSerializedSize()); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java index f375b4fbbe60..f2ba6739cdf1 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java @@ -77,7 +77,7 @@ private static WorkItem createWorkItem(long workToken, long cacheToken) { .build(); } - private static WorkId workDedupeToken(long workToken, long cacheToken) { + private static WorkId workDedupeId(long workToken, long cacheToken) { return WorkId.builder().setCacheToken(cacheToken).setWorkToken(workToken).build(); } @@ -109,7 +109,7 @@ public void testActivateWorkForKey_EXECUTE_emptyWorkQueueForKey() { Optional nextWorkForKey = activeWorkState.completeWorkAndGetNextWorkForKey( - shardedKey, workDedupeToken(workToken, cacheToken)); + shardedKey, workDedupeId(workToken, cacheToken)); assertEquals(ActivateWorkResult.EXECUTE, activateWorkResult); assertEquals(Optional.empty(), nextWorkForKey); @@ -151,13 +151,14 @@ public void testActivateWorkForKey_DUPLICATE() { activeWorkState.activateWorkForKey(shardedKey, secondWork); assertEquals(ActivateWorkResult.QUEUED, activateWorkResult); - assertFalse(readOnlyActiveWork.get(shardedKey).contains(firstWork)); + // Different cacheTokens, so no work should be removed from the queue. + assertTrue(readOnlyActiveWork.get(shardedKey).contains(firstWork)); assertTrue(readOnlyActiveWork.get(shardedKey).contains(secondWork)); Optional nextWork = activeWorkState.completeWorkAndGetNextWorkForKey(shardedKey, differentWorkTokenWork.id()); assertTrue(nextWork.isPresent()); - assertSame(secondWork, nextWork.get()); + assertSame(firstWork, nextWork.get()); } @Test @@ -262,7 +263,7 @@ public void testCompleteWorkAndGetNextWorkForKey_noWorkQueueForKey() { assertEquals( Optional.empty(), activeWorkState.completeWorkAndGetNextWorkForKey( - shardedKey("someKey", 1L), workDedupeToken(1L, 1L))); + shardedKey("someKey", 1L), workDedupeId(1L, 1L))); } @Test @@ -276,7 +277,7 @@ public void testCompleteWorkAndGetNextWorkForKey_noWorkQueueForKey() { activeWorkState.activateWorkForKey(shardedKey, workInQueue); activeWorkState.completeWorkAndGetNextWorkForKey( - shardedKey, workDedupeToken(otherWorkToken, cacheToken)); + shardedKey, workDedupeId(otherWorkToken, cacheToken)); assertEquals(1, readOnlyActiveWork.get(shardedKey).size()); assertEquals(workInQueue, readOnlyActiveWork.get(shardedKey).peek()); @@ -293,7 +294,7 @@ public void testCompleteWorkAndGetNextWorkForKey_noWorkQueueForKey() { activeWorkState.activateWorkForKey(shardedKey, workInQueue); activeWorkState.completeWorkAndGetNextWorkForKey( - shardedKey, workDedupeToken(workToken, otherCacheToken)); + shardedKey, workDedupeId(workToken, otherCacheToken)); assertEquals(1, readOnlyActiveWork.get(shardedKey).size()); assertEquals(workInQueue, readOnlyActiveWork.get(shardedKey).peek());