
Commit

address cl comments
m-trieu committed Jan 31, 2024
1 parent e38eab7 commit 91b3e99
Showing 4 changed files with 45 additions and 21 deletions.
@@ -21,6 +21,7 @@

import java.io.PrintWriter;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
@@ -37,6 +38,7 @@
import org.apache.beam.runners.dataflow.worker.DataflowExecutionStateSampler;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.HeartbeatRequest;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.KeyedGetDataRequest;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.WorkItem;
import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache;
import org.apache.beam.sdk.annotations.Internal;
@@ -89,6 +91,20 @@ static ActiveWorkState forTesting(
return new ActiveWorkState(activeWork, computationStateCache);
}

private static Stream<KeyedGetDataRequest> makeHeartbeatKeyedGetDataRequests(
ShardedKey shardedKey, Collection<Work> workQueue, Instant refreshDeadline) {
return workQueue.stream()
.filter(work -> work.getStartTime().isBefore(refreshDeadline))
.map(
work ->
KeyedGetDataRequest.newBuilder()
.setKey(shardedKey.key())
.setShardingKey(shardedKey.shardingKey())
.setWorkToken(work.getWorkItem().getWorkToken())
.addAllLatencyAttribution(work.getLatencyAttributions())
.build());
}
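
The new helper filters a key's queued work down to items whose start time precedes the refresh deadline and maps each one to a KeyedGetDataRequest carrying the key, sharding key, work token, and latency attributions. A minimal, self-contained sketch of that filter-then-map shape, using hypothetical stand-in types rather than the real Work and Windmill protos:

import java.time.Instant;
import java.util.List;
import java.util.stream.Collectors;

final class HeartbeatFilterSketch {
  // Hypothetical stand-ins for Work and KeyedGetDataRequest; illustration only.
  record FakeWork(long workToken, Instant startTime) {}

  record FakeHeartbeat(long key, long shardingKey, long workToken) {}

  static List<FakeHeartbeat> heartbeatsFor(
      long key, long shardingKey, List<FakeWork> queue, Instant refreshDeadline) {
    return queue.stream()
        // Only work that started before the refresh deadline still needs a heartbeat.
        .filter(w -> w.startTime().isBefore(refreshDeadline))
        .map(w -> new FakeHeartbeat(key, shardingKey, w.workToken()))
        .collect(Collectors.toList());
  }
}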

private static String elapsedString(Instant start, Instant end) {
Duration activeFor = new Duration(start, end);
// Duration's toString always starts with "PT"; remove that here.
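
The body of elapsedString is collapsed in this diff. A plausible sketch of the idea, using java.time in place of the Joda-Time types in the real code: Duration.toString() is ISO-8601 (for example "PT1M30S"), so dropping the first two characters leaves a compact "1M30S".

import java.time.Duration;
import java.time.Instant;

final class ElapsedStringSketch {
  // Assumed shape only; the real method uses org.joda.time.Duration and Instant.
  static String elapsed(Instant start, Instant end) {
    return Duration.between(start, end).toString().substring(2);
  }
}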
@@ -159,18 +175,13 @@ synchronized ActivateWorkResult activateWorkForKey(ShardedKey shardedKey, Work w
if (queuedWork.id().equals(work.id())) {
return ActivateWorkResult.DUPLICATE;
} else if (queuedWork.id().cacheToken() == work.id().cacheToken()) {
if (work.id().workToken() > queuedWork.id().workToken()) {
removeIfNotActive(queuedWork, workIterator, workQueue);
workQueue.addLast(work);
return ActivateWorkResult.QUEUED;
} else {
if (work.id().workToken() <= queuedWork.id().workToken()) {
return ActivateWorkResult.STALE;
}
} else if (queuedWork.id().workToken() == work.id().workToken()) {
if (queuedWork.id().cacheToken() != work.id().cacheToken()) {
removeIfNotActive(queuedWork, workIterator, workQueue);
workQueue.addLast(work);
return ActivateWorkResult.QUEUED;

if (!queuedWork.equals(workQueue.peek())) {
// We only want to remove it if it is NOT currently active.
workIterator.remove();
}
}
}
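
The reworked branch drops a superseded queued entry only when it is not the deque head, since the head is the work item currently being processed. A small generic sketch of that guard, mirroring the check above:

import java.util.Deque;
import java.util.Iterator;

final class RemoveIfNotActiveSketch {
  // The head of the per-key deque is the active item, so a stale queued entry is
  // removed via the iterator only when it is not the head. Illustration only.
  static <T> void removeIfNotActive(T queuedWork, Iterator<T> workIterator, Deque<T> workQueue) {
    if (!queuedWork.equals(workQueue.peek())) {
      workIterator.remove();
    }
  }
}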
@@ -245,8 +256,8 @@ private synchronized void removeCompletedWorkFromQueue(
() ->
new IllegalStateException(
String.format(
"Active key %s without work, expected work_token %d, expected cache_token %d",
shardedKey, workId.workToken(), workId.cacheToken())));
"Active key %s without work, expected work_id= %s",
shardedKey, workId)));

if (!completedWork.id().equals(workId)) {
// Work may have been completed due to clearing of stuck commits.
@@ -312,6 +323,10 @@ synchronized ImmutableList<HeartbeatRequest> getKeyHeartbeats(
Instant refreshDeadline, DataflowExecutionStateSampler sampler) {
return activeWork.entrySet().stream()
.flatMap(entry -> toHeartbeatRequestStream(entry, refreshDeadline, sampler))
.flatMap(
entry ->
makeHeartbeatKeyedGetDataRequests(
entry.getKey(), entry.getValue(), refreshDeadline))
.collect(toImmutableList());
}
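
getKeyHeartbeats now composes the per-key helper by flat-mapping every (sharded key, work queue) entry into one flat list. A generic sketch of that composition, with hypothetical type parameters standing in for ShardedKey, Work, and the proto type:

import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class FlattenActiveWorkSketch {
  // Illustration only: K, V, R stand in for ShardedKey, Work, and KeyedGetDataRequest.
  static <K, V, R> List<R> flatten(
      Map<K, List<V>> activeWork, BiFunction<K, List<V>, Stream<R>> perKey) {
    return activeWork.entrySet().stream()
        .flatMap(e -> perKey.apply(e.getKey(), e.getValue()))
        .collect(Collectors.toList());
  }
}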

@@ -206,6 +206,13 @@ public WorkId id() {
return id;
}

@Override
public final String toString() {
return String.format(
"work_id:[%s]; work_item:[%s]; start_time:[%s]; current_state:[%s]",
id, workItem, startTime, currentState);
}
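
A tiny runnable check of the new toString() format string, with hypothetical placeholder values standing in for the real WorkId, WorkItem, start time, and State fields:

final class WorkToStringSketch {
  public static void main(String[] args) {
    // Placeholder values only; they are not what the real fields render as.
    System.out.println(
        String.format(
            "work_id:[%s]; work_item:[%s]; start_time:[%s]; current_state:[%s]",
            "someWorkId", "someWorkItem", "2024-01-31T00:00:00.000Z", "QUEUED"));
  }
}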

public enum State {
QUEUED(Windmill.LatencyAttribution.State.QUEUED),
PROCESSING(Windmill.LatencyAttribution.State.ACTIVE),
@@ -2725,12 +2725,13 @@ public void testActiveWorkForShardedKeys() {
assertTrue(computationState.activateWork(key1Shard1, m2));
Mockito.verify(mockExecutor).execute(m2, m2.getWorkItem().getSerializedSize());
Work m3 = createMockWork(3);
assertTrue(computationState.activateWork(key1Shard1, m3));
boolean activateWork = computationState.activateWork(key1Shard1, m3);
assertTrue(activateWork);
Mockito.verifyNoMoreInteractions(mockExecutor);

// Verify a different shard of key is a separate queue.
Work m4 = createMockWork(3);
assertTrue(computationState.activateWork(key1Shard1, m4));
assertFalse(computationState.activateWork(key1Shard1, m4));
Mockito.verifyNoMoreInteractions(mockExecutor);
assertTrue(computationState.activateWork(key1Shard2, m4));
Mockito.verify(mockExecutor).execute(m4, m4.getWorkItem().getSerializedSize());
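
The flipped assertion expects activateWork to reject a second item that reuses an already-queued token on the same shard, while a different shard still accepts it. A minimal sketch of that per-shard dedup behavior, with hypothetical types rather than the real ComputationState:

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

final class ShardQueueDedupSketch {
  private final Map<String, Deque<Long>> queuedTokensByShard = new HashMap<>();

  // Returns false when the same work token is already queued for this shard.
  boolean activate(String shard, long workToken) {
    Deque<Long> queue = queuedTokensByShard.computeIfAbsent(shard, s -> new ArrayDeque<>());
    if (queue.contains(workToken)) {
      return false; // mirrors the new assertFalse for key1Shard1 above
    }
    queue.addLast(workToken);
    return true;
  }
}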
@@ -77,7 +77,7 @@ private static WorkItem createWorkItem(long workToken, long cacheToken) {
.build();
}

private static WorkId workDedupeToken(long workToken, long cacheToken) {
private static WorkId workDedupeId(long workToken, long cacheToken) {
return WorkId.builder().setCacheToken(cacheToken).setWorkToken(workToken).build();
}

@@ -109,7 +109,7 @@ public void testActivateWorkForKey_EXECUTE_emptyWorkQueueForKey() {

Optional<Work> nextWorkForKey =
activeWorkState.completeWorkAndGetNextWorkForKey(
shardedKey, workDedupeToken(workToken, cacheToken));
shardedKey, workDedupeId(workToken, cacheToken));

assertEquals(ActivateWorkResult.EXECUTE, activateWorkResult);
assertEquals(Optional.empty(), nextWorkForKey);
@@ -151,13 +151,14 @@ public void testActivateWorkForKey_DUPLICATE() {
activeWorkState.activateWorkForKey(shardedKey, secondWork);

assertEquals(ActivateWorkResult.QUEUED, activateWorkResult);
assertFalse(readOnlyActiveWork.get(shardedKey).contains(firstWork));
// Different cacheTokens, so no work should be removed from the queue.
assertTrue(readOnlyActiveWork.get(shardedKey).contains(firstWork));
assertTrue(readOnlyActiveWork.get(shardedKey).contains(secondWork));

Optional<Work> nextWork =
activeWorkState.completeWorkAndGetNextWorkForKey(shardedKey, differentWorkTokenWork.id());
assertTrue(nextWork.isPresent());
assertSame(secondWork, nextWork.get());
assertSame(firstWork, nextWork.get());
}

@Test
@@ -262,7 +263,7 @@ public void testCompleteWorkAndGetNextWorkForKey_noWorkQueueForKey() {
assertEquals(
Optional.empty(),
activeWorkState.completeWorkAndGetNextWorkForKey(
shardedKey("someKey", 1L), workDedupeToken(1L, 1L)));
shardedKey("someKey", 1L), workDedupeId(1L, 1L)));
}

@Test
@@ -276,7 +277,7 @@ public void testCompleteWorkAndGetNextWorkForKey_noWorkQueueForKey() {

activeWorkState.activateWorkForKey(shardedKey, workInQueue);
activeWorkState.completeWorkAndGetNextWorkForKey(
shardedKey, workDedupeToken(otherWorkToken, cacheToken));
shardedKey, workDedupeId(otherWorkToken, cacheToken));

assertEquals(1, readOnlyActiveWork.get(shardedKey).size());
assertEquals(workInQueue, readOnlyActiveWork.get(shardedKey).peek());
@@ -293,7 +294,7 @@ public void testCompleteWorkAndGetNextWorkForKey_noWorkQueueForKey() {

activeWorkState.activateWorkForKey(shardedKey, workInQueue);
activeWorkState.completeWorkAndGetNextWorkForKey(
shardedKey, workDedupeToken(workToken, otherCacheToken));
shardedKey, workDedupeId(workToken, otherCacheToken));

assertEquals(1, readOnlyActiveWork.get(shardedKey).size());
assertEquals(workInQueue, readOnlyActiveWork.get(shardedKey).peek());