Heartbeats (#29963)
* Adds sending of new HeartbeatRequest protos to StreamingDataflowWorker. If Windmill returns HeartbeatResponses marking any work items as failed, processing of those work items is aborted as soon as possible (a sketch of the round trip follows this list).

* Adds sending new HeartbeatRequest protos when using streaming RPCs (streaming engine). Also adds a test.

* Adds new test for custom source reader exiting early for failed work. Adds special exception for handling failed work.

* Removes some extra cache invalidations and unneeded log statements.

* Adds a streaming_engine prefix to the experiment enabling heartbeats and changes the exception in the state reader to WorkItemFailedException.

* Adds check that heartbeat response sets failed before failing work.

* Adds the ability to plumb experiments to the test server for GrpcWindmillServerTest so we can test the new-style heartbeats.

* Changes StreamingDataflowWorkerTest to look for latency attribution in new-style heartbeat requests since that's what FakeWindmillServer returns now.
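Taken together, the changes below move active-work refresh from KeyedGetDataRequest protos to a dedicated heartbeat round trip. The following fragment is an illustrative sketch, not code from the commit: the proto builder and setter names are assumed from the standard generated-proto conventions implied by the getters used in the diff, and the token values are made up.

import org.apache.beam.runners.dataflow.worker.windmill.Windmill;

class HeartbeatRoundTripSketch {
  static void example() {
    // Worker -> Windmill: describe an in-flight work item as a HeartbeatRequest.
    Windmill.HeartbeatRequest request =
        Windmill.HeartbeatRequest.newBuilder()
            .setShardingKey(17L)
            .setWorkToken(1001L)
            .setCacheToken(5L)
            .build();

    // Windmill -> worker: failed=true marks work the server no longer recognizes;
    // the worker aborts it as soon as possible and drops its commit.
    Windmill.HeartbeatResponse response =
        Windmill.HeartbeatResponse.newBuilder()
            .setShardingKey(request.getShardingKey())
            .setWorkToken(request.getWorkToken())
            .setCacheToken(request.getCacheToken())
            .setFailed(true)
            .build();
  }
}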
acrites authored Jan 23, 2024
1 parent 821a169 commit 5e7edc4
Showing 24 changed files with 801 additions and 126 deletions.
@@ -27,7 +27,7 @@
import javax.annotation.concurrent.GuardedBy;
import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
-import org.apache.beam.runners.dataflow.worker.windmill.Windmill.KeyedGetDataRequest;
+import org.apache.beam.runners.dataflow.worker.windmill.Windmill.HeartbeatRequest;
import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub;
import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream;
import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamPool;
@@ -239,25 +239,37 @@ public Windmill.GlobalData getSideInputData(Windmill.GlobalDataRequest request)
}

/** Tells windmill processing is ongoing for the given keys. */
-  public void refreshActiveWork(Map<String, List<KeyedGetDataRequest>> active) {
-    activeHeartbeats.set(active.size());
+  public void refreshActiveWork(Map<String, List<HeartbeatRequest>> heartbeats) {
+    activeHeartbeats.set(heartbeats.size());
try {
if (useStreamingRequests) {
// With streaming requests, always send the request even when it is empty, to ensure that
// we trigger health checks for the stream even when it is idle.
GetDataStream stream = streamPool.getStream();
try {
-          stream.refreshActiveWork(active);
+          stream.refreshActiveWork(heartbeats);
} finally {
streamPool.releaseStream(stream);
}
-      } else if (!active.isEmpty()) {
+      } else if (!heartbeats.isEmpty()) {
+        // This code path is only used by appliance which sends heartbeats (used to refresh active
+        // work) as KeyedGetDataRequests. So we must translate the HeartbeatRequest to a
+        // KeyedGetDataRequest here regardless of the value of sendKeyedGetDataRequests.
Windmill.GetDataRequest.Builder builder = Windmill.GetDataRequest.newBuilder();
-        for (Map.Entry<String, List<KeyedGetDataRequest>> entry : active.entrySet()) {
-          builder.addRequests(
-              Windmill.ComputationGetDataRequest.newBuilder()
-                  .setComputationId(entry.getKey())
-                  .addAllRequests(entry.getValue()));
+        for (Map.Entry<String, List<HeartbeatRequest>> entry : heartbeats.entrySet()) {
+          Windmill.ComputationGetDataRequest.Builder perComputationBuilder =
+              Windmill.ComputationGetDataRequest.newBuilder();
+          perComputationBuilder.setComputationId(entry.getKey());
+          for (HeartbeatRequest request : entry.getValue()) {
+            perComputationBuilder.addRequests(
+                Windmill.KeyedGetDataRequest.newBuilder()
+                    .setShardingKey(request.getShardingKey())
+                    .setWorkToken(request.getWorkToken())
+                    .setCacheToken(request.getCacheToken())
+                    .addAllLatencyAttribution(request.getLatencyAttributionList())
+                    .build());
+          }
+          builder.addRequests(perComputationBuilder.build());
}
server.getData(builder.build());
}
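The appliance branch above rewrites each HeartbeatRequest field-for-field into the KeyedGetDataRequest shape that appliance still understands. The same translation, factored into a standalone helper for clarity; this is a sketch rather than commit code, and it uses only the accessors visible in the diff:

import org.apache.beam.runners.dataflow.worker.windmill.Windmill.HeartbeatRequest;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill.KeyedGetDataRequest;

final class HeartbeatTranslationSketch {
  // Mirrors the inner loop of refreshActiveWork(): every heartbeat field has a
  // direct KeyedGetDataRequest counterpart, so the translation loses nothing.
  static KeyedGetDataRequest toKeyedGetDataRequest(HeartbeatRequest request) {
    return KeyedGetDataRequest.newBuilder()
        .setShardingKey(request.getShardingKey())
        .setWorkToken(request.getWorkToken())
        .setCacheToken(request.getCacheToken())
        .addAllLatencyAttribution(request.getLatencyAttributionList())
        .build();
  }
}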
@@ -112,6 +112,14 @@ protected PubsubReaderIterator(Windmill.WorkItem work) {
super(work);
}

+    @Override
+    public boolean advance() throws IOException {
+      if (context.workIsFailed()) {
+        return false;
+      }
+      return super.advance();
+    }
+
@Override
protected WindowedValue<T> decodeMessage(Windmill.Message message) throws IOException {
T value;
@@ -85,6 +85,7 @@
import org.apache.beam.runners.dataflow.worker.status.LastExceptionDataProvider;
import org.apache.beam.runners.dataflow.worker.status.StatusDataProvider;
import org.apache.beam.runners.dataflow.worker.status.WorkerStatusPages;
+import org.apache.beam.runners.dataflow.worker.streaming.ActiveWorkState.FailedTokens;
import org.apache.beam.runners.dataflow.worker.streaming.Commit;
import org.apache.beam.runners.dataflow.worker.streaming.ComputationState;
import org.apache.beam.runners.dataflow.worker.streaming.ExecutionState;
@@ -422,6 +423,7 @@ public void run() {

this.publishCounters = publishCounters;
this.windmillServer = options.getWindmillServerStub();
+    this.windmillServer.setProcessHeartbeatResponses(this::handleHeartbeatResponses);
this.metricTrackingWindmillServer =
new MetricTrackingWindmillServerStub(windmillServer, memoryMonitor, windmillServiceEnabled);
this.metricTrackingWindmillServer.start();
@@ -982,6 +984,9 @@ private void process(
String counterName = "dataflow_source_bytes_processed-" + mapTask.getSystemName();

try {
+      if (work.isFailed()) {
+        throw new WorkItemCancelledException(workItem.getShardingKey());
+      }
executionState = computationState.getExecutionStateQueue().poll();
if (executionState == null) {
MutableNetwork<Node, Edge> mapTaskNetwork = mapTaskToNetwork.apply(mapTask);
@@ -1098,7 +1103,8 @@ public void close() {
work.setState(State.PROCESSING);
}
};
-            });
+            },
+            work::isFailed);
SideInputStateFetcher localSideInputStateFetcher = sideInputStateFetcher.byteTrackingView();

// If the read output KVs, then we can decode Windmill's byte key into a userland
@@ -1136,12 +1142,16 @@ public void close() {
synchronizedProcessingTime,
stateReader,
localSideInputStateFetcher,
-        outputBuilder);
+        outputBuilder,
+        work::isFailed);

// Blocks while executing work.
executionState.workExecutor().execute();

-      // Reports source bytes processed to workitemcommitrequest if available.
+      if (work.isFailed()) {
+        throw new WorkItemCancelledException(workItem.getShardingKey());
+      }
+      // Reports source bytes processed to WorkItemCommitRequest if available.
try {
long sourceBytesProcessed = 0;
HashMap<String, ElementCounter> counters =
@@ -1234,6 +1244,12 @@ public void close() {
+ "Work will not be retried locally.",
computationId,
key.toStringUtf8());
+      } else if (WorkItemCancelledException.isWorkItemCancelledException(t)) {
+        LOG.debug(
+            "Execution of work for computation '{}' on key '{}' failed. "
+                + "Work will not be retried locally.",
+            computationId,
+            workItem.getShardingKey());
} else {
LastExceptionDataProvider.reportException(t);
LOG.debug("Failed work: {}", work);
@@ -1369,6 +1385,10 @@ private void commitLoop() {
// Adds the commit to the commitStream if it fits, returning true iff it is consumed.
private boolean addCommitToStream(Commit commit, CommitWorkStream commitStream) {
Preconditions.checkNotNull(commit);
+    // Drop commits for failed work. Such commits will be dropped by Windmill anyway.
+    if (commit.work().isFailed()) {
+      return true;
+    }
final ComputationState state = commit.computationState();
final Windmill.WorkItemCommitRequest request = commit.request();
final int size = commit.getSize();
@@ -1896,6 +1916,25 @@ private void sendWorkerUpdatesToDataflowService(
}
}

+  public void handleHeartbeatResponses(List<Windmill.ComputationHeartbeatResponse> responses) {
+    for (Windmill.ComputationHeartbeatResponse computationHeartbeatResponse : responses) {
+      // Maps sharding key to (work token, cache token) for work that should be marked failed.
+      Map<Long, List<FailedTokens>> failedWork = new HashMap<>();
+      for (Windmill.HeartbeatResponse heartbeatResponse :
+          computationHeartbeatResponse.getHeartbeatResponsesList()) {
+        if (heartbeatResponse.getFailed()) {
+          failedWork
+              .computeIfAbsent(heartbeatResponse.getShardingKey(), key -> new ArrayList<>())
+              .add(
+                  new FailedTokens(
+                      heartbeatResponse.getWorkToken(), heartbeatResponse.getCacheToken()));
+        }
+      }
+      ComputationState state = computationMap.get(computationHeartbeatResponse.getComputationId());
+      if (state != null) state.failWork(failedWork);
+    }
+  }
+
/**
* Sends a GetData request to Windmill for all sufficiently old active work.
*
@@ -1904,15 +1943,15 @@ private void sendWorkerUpdatesToDataflowService(
* StreamingDataflowWorkerOptions#getActiveWorkRefreshPeriodMillis}.
*/
private void refreshActiveWork() {
-    Map<String, List<Windmill.KeyedGetDataRequest>> active = new HashMap<>();
+    Map<String, List<Windmill.HeartbeatRequest>> heartbeats = new HashMap<>();
Instant refreshDeadline =
clock.get().minus(Duration.millis(options.getActiveWorkRefreshPeriodMillis()));

for (Map.Entry<String, ComputationState> entry : computationMap.entrySet()) {
-      active.put(entry.getKey(), entry.getValue().getKeysToRefresh(refreshDeadline, sampler));
+      heartbeats.put(entry.getKey(), entry.getValue().getKeyHeartbeats(refreshDeadline, sampler));
}

-    metricTrackingWindmillServer.refreshActiveWork(active);
+    metricTrackingWindmillServer.refreshActiveWork(heartbeats);
}

private void invalidateStuckCommits() {
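To see how the new handleHeartbeatResponses() path behaves, consider one computation reporting two work items, only one of which has failed. This is a hedged illustration: the proto builder names are assumed from the getters above, and the worker instance is presumed already constructed.

import java.util.Collections;
import org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;

class HeartbeatResponseSketch {
  static void failOneOfTwo(StreamingDataflowWorker worker) {
    Windmill.ComputationHeartbeatResponse response =
        Windmill.ComputationHeartbeatResponse.newBuilder()
            .setComputationId("computation-1")
            .addHeartbeatResponses(
                Windmill.HeartbeatResponse.newBuilder()
                    .setShardingKey(17L)
                    .setWorkToken(1001L)
                    .setCacheToken(5L)
                    .setFailed(true)) // forwarded to failWork(), aborted early
            .addHeartbeatResponses(
                Windmill.HeartbeatResponse.newBuilder()
                    .setShardingKey(18L)
                    .setWorkToken(1002L)
                    .setCacheToken(6L)
                    .setFailed(false)) // ignored; this work keeps running
            .build();

    // Only the failed (work token, cache token) pair reaches ComputationState.failWork().
    worker.handleHeartbeatResponses(Collections.singletonList(response));
  }
}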
@@ -112,6 +112,7 @@ public class StreamingModeExecutionContext extends DataflowExecutionContext<Step
private Windmill.WorkItemCommitRequest.Builder outputBuilder;
private UnboundedSource.UnboundedReader<?> activeReader;
private volatile long backlogBytes;
+  private Supplier<Boolean> workIsFailed;

public StreamingModeExecutionContext(
CounterFactory counterFactory,
@@ -135,13 +136,18 @@ public StreamingModeExecutionContext(
this.stateNameMap = ImmutableMap.copyOf(stateNameMap);
this.stateCache = stateCache;
this.backlogBytes = UnboundedSource.UnboundedReader.BACKLOG_UNKNOWN;
+    this.workIsFailed = () -> Boolean.FALSE;
}

@VisibleForTesting
public long getBacklogBytes() {
return backlogBytes;
}

+  public boolean workIsFailed() {
+    return workIsFailed.get();
+  }
+
public void start(
@Nullable Object key,
Windmill.WorkItem work,
@@ -150,9 +156,11 @@ public void start(
@Nullable Instant synchronizedProcessingTime,
WindmillStateReader stateReader,
SideInputStateFetcher sideInputStateFetcher,
-      Windmill.WorkItemCommitRequest.Builder outputBuilder) {
+      Windmill.WorkItemCommitRequest.Builder outputBuilder,
+      @Nullable Supplier<Boolean> workFailed) {
this.key = key;
this.work = work;
+    this.workIsFailed = (workFailed != null) ? workFailed : () -> Boolean.FALSE;
this.computationKey =
WindmillComputationKey.create(computationId, work.getKey(), work.getShardingKey());
this.sideInputStateFetcher = sideInputStateFetcher;
@@ -429,7 +437,7 @@ <T, W extends BoundedWindow> void writePCollectionViewData(

/**
 * Execution states in Streaming are shared between multiple map-task executors. Thus this class
- * needs to be thread safe for multiple writers. A single stage could have have multiple executors
+ * needs to be thread safe for multiple writers. A single stage could have multiple executors
 * running concurrently.
*/
public static class StreamingModeExecutionState
@@ -670,7 +678,7 @@ class StepContext extends DataflowExecutionContext.DataflowStepContext
private NavigableSet<TimerData> modifiedUserSynchronizedProcessingTimersOrdered = null;
// A list of timer keys that were modified by user processing earlier in this bundle. This
// serves a tombstone, so
-    // that we know not to fire any bundle tiemrs that were moddified.
+    // that we know not to fire any bundle timers that were modified.
private Table<String, StateNamespace, TimerData> modifiedUserTimerKeys = null;

public StepContext(DataflowOperationContext operationContext) {
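StreamingModeExecutionContext never tracks failure itself: start() borrows a Supplier&lt;Boolean&gt; (wired as work::isFailed by StreamingDataflowWorker), so a flag flipped by the heartbeat handler on another thread becomes visible in the middle of a bundle. A distilled, self-contained sketch of that design; the class names here are illustrative, not Beam's:

import java.util.function.Supplier;

class FailableWork {
  private volatile boolean failed = false; // volatile: written by the heartbeat thread

  void fail() {
    failed = true;
  }

  boolean isFailed() {
    return failed;
  }
}

class ExecutionContextSketch {
  private Supplier<Boolean> workIsFailed = () -> Boolean.FALSE;

  // Mirrors start(): fall back to "never failed" when no supplier is given.
  void start(Supplier<Boolean> workFailed) {
    this.workIsFailed = (workFailed != null) ? workFailed : () -> Boolean.FALSE;
  }

  boolean workIsFailed() {
    return workIsFailed.get();
  }

  public static void main(String[] args) {
    FailableWork work = new FailableWork();
    ExecutionContextSketch context = new ExecutionContextSketch();
    context.start(work::isFailed);
    work.fail(); // e.g. triggered by a failed HeartbeatResponse
    System.out.println(context.workIsFailed()); // true: readers stop at the next advance()
  }
}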
@@ -99,6 +99,14 @@ class UngroupedWindmillReaderIterator extends WindmillReaderIteratorBase {
super(work);
}

+    @Override
+    public boolean advance() throws IOException {
+      if (context.workIsFailed()) {
+        return false;
+      }
+      return super.advance();
+    }
+
@Override
protected WindowedValue<T> decodeMessage(Windmill.Message message) throws IOException {
Instant timestampMillis =
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker;
+
+/** Indicates that the work item was cancelled and should not be retried. */
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+public class WorkItemCancelledException extends RuntimeException {
+  public WorkItemCancelledException(long sharding_key) {
+    super("Work item cancelled for key " + sharding_key);
+  }
+
+  /** Returns whether an exception was caused by a {@link WorkItemCancelledException}. */
+  public static boolean isWorkItemCancelledException(Throwable t) {
+    while (t != null) {
+      if (t instanceof WorkItemCancelledException) {
+        return true;
+      }
+      t = t.getCause();
+    }
+    return false;
+  }
+}
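Because a cancellation can surface from anywhere in the processing stack, callers detect it by walking the cause chain rather than by catch type alone. A quick illustration (not from the commit):

import org.apache.beam.runners.dataflow.worker.WorkItemCancelledException;

class CancelledDetectionSketch {
  public static void main(String[] args) {
    Throwable wrapped =
        new RuntimeException("bundle failed", new WorkItemCancelledException(17L));

    // Found even though it is not the outermost exception:
    System.out.println(WorkItemCancelledException.isWorkItemCancelledException(wrapped)); // true

    // Unrelated failures are left alone and retried as before:
    System.out.println(
        WorkItemCancelledException.isWorkItemCancelledException(
            new RuntimeException("unrelated"))); // false
  }
}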
@@ -836,7 +836,8 @@ public boolean advance() throws IOException {
while (true) {
if (elemsRead >= maxElems
|| Instant.now().isAfter(endTime)
-            || context.isSinkFullHintSet()) {
+            || context.isSinkFullHintSet()
+            || context.workIsFailed()) {
return false;
}
try {
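This is the third reader to gain the same early-exit guard (after PubsubReaderIterator and UngroupedWindmillReaderIterator above): consult the context's failure flag before doing any further work, so a failed item stops yielding elements at the next advance(). The pattern in isolation, as a minimal sketch over a simplified interface rather than Beam's actual reader hierarchy:

import java.io.IOException;
import java.util.function.Supplier;

abstract class GuardedReaderIterator<T> {
  private final Supplier<Boolean> workIsFailed;

  GuardedReaderIterator(Supplier<Boolean> workIsFailed) {
    this.workIsFailed = workIsFailed;
  }

  /** Returns false as soon as the work item is known to be failed. */
  public final boolean advance() throws IOException {
    if (workIsFailed.get()) {
      return false; // Windmill would reject the commit anyway; stop reading now.
    }
    return doAdvance();
  }

  /** Reader-specific logic: decode the next message, track watermarks, etc. */
  protected abstract boolean doAdvance() throws IOException;
}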