Skip to content

Commit

Permalink
Support poisioning instruction ids to prevent the FnApi data stream f…
Browse files Browse the repository at this point in the history
…rom blocking on failed instructions
  • Loading branch information
scwhittle committed Oct 21, 2024
1 parent 5d0dcfc commit fa7cde6
Show file tree
Hide file tree
Showing 8 changed files with 250 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@
*/
package org.apache.beam.sdk.fn.data;

import java.time.Duration;
import java.util.HashSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.Endpoints;
Expand All @@ -30,6 +33,8 @@
import org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
Expand All @@ -55,7 +60,13 @@ public class BeamFnDataGrpcMultiplexer implements AutoCloseable {
private final ConcurrentMap<
/*instructionId=*/ String, CompletableFuture<CloseableFnDataReceiver<BeamFnApi.Elements>>>
receivers;
private final ConcurrentMap<String, Boolean> erroredInstructionIds;
private final Cache</*instructionId=*/ String, /*unused=*/ Boolean> poisonedInstructionIds;

private static class PoisonedException extends RuntimeException {
public PoisonedException() {
super("Instruction poisoned");
}
};

public BeamFnDataGrpcMultiplexer(
Endpoints.@Nullable ApiServiceDescriptor apiServiceDescriptor,
Expand All @@ -64,7 +75,8 @@ public BeamFnDataGrpcMultiplexer(
baseOutboundObserverFactory) {
this.apiServiceDescriptor = apiServiceDescriptor;
this.receivers = new ConcurrentHashMap<>();
this.erroredInstructionIds = new ConcurrentHashMap<>();
this.poisonedInstructionIds =
CacheBuilder.newBuilder().expireAfterWrite(Duration.ofMinutes(20)).build();
this.inboundObserver = new InboundObserver();
this.outboundObserver =
outboundObserverFactory.outboundObserverFor(baseOutboundObserverFactory, inboundObserver);
Expand All @@ -87,9 +99,24 @@ public StreamObserver<BeamFnApi.Elements> getOutboundObserver() {
return outboundObserver;
}

private CompletableFuture<CloseableFnDataReceiver<BeamFnApi.Elements>> receiverFuture(
/**
* Returns a future that can be awaited upon for processing elements for instruction id.
*
* @param instructionId
* @return null if the instruction id has been poisoned and elements should be ignored.
*/
@SuppressWarnings("nullness")
private @Nullable CompletableFuture<CloseableFnDataReceiver<BeamFnApi.Elements>> receiverFuture(
String instructionId) {
return receivers.computeIfAbsent(instructionId, (unused) -> new CompletableFuture<>());
return receivers.computeIfAbsent(
instructionId,
(unused) -> {
if (poisonedInstructionIds.getIfPresent(instructionId) != null) {
return null;
} else {
return new CompletableFuture<>();
}
});
}

/**
Expand All @@ -104,12 +131,43 @@ private CompletableFuture<CloseableFnDataReceiver<BeamFnApi.Elements>> receiverF
*/
public void registerConsumer(
String instructionId, CloseableFnDataReceiver<BeamFnApi.Elements> receiver) {
receiverFuture(instructionId).complete(receiver);
@Nullable
CompletableFuture<CloseableFnDataReceiver<BeamFnApi.Elements>> receiverFuture =
receiverFuture(instructionId);
if (receiverFuture == null) {
throw new IllegalArgumentException("Instruction id was poisoned");
} else {
receiverFuture.complete(receiver);
}
}

/** Unregisters a consumer. */
public void unregisterConsumer(String instructionId) {
receivers.remove(instructionId);
@Nullable
CompletableFuture<CloseableFnDataReceiver<BeamFnApi.Elements>> receiverFuture =
receivers.remove(instructionId);
if (receiverFuture != null && !receiverFuture.isDone()) {
throw new IllegalStateException("Unregistering consumer which was not registered.");
}
}

public void poisonInstructionId(String instructionId) {
poisonedInstructionIds.put(instructionId, Boolean.TRUE);
@Nullable
CompletableFuture<CloseableFnDataReceiver<BeamFnApi.Elements>> receiverFuture =
receivers.remove(instructionId);
if (receiverFuture != null) {
// Completing exceptionally has no effect if the future was already notified. In that case
// whatever registered the receiver needs to handle cancelling it.
receiverFuture.completeExceptionally(new PoisonedException());
if (!receiverFuture.isCompletedExceptionally()) {
try {
receiverFuture.get().close();
} catch (Exception e) {
LOG.warn("Unexpected error closing existing observer");
}
}
}
}

@VisibleForTesting
Expand Down Expand Up @@ -210,27 +268,42 @@ public void onNext(BeamFnApi.Elements value) {
}

private void forwardToConsumerForInstructionId(String instructionId, BeamFnApi.Elements value) {
if (erroredInstructionIds.containsKey(instructionId)) {
LOG.debug("Ignoring inbound data for failed instruction {}", instructionId);
return;
}
CompletableFuture<CloseableFnDataReceiver<BeamFnApi.Elements>> consumerFuture =
receiverFuture(instructionId);
if (consumerFuture == null) {
LOG.debug(
"Received data for instruction {} which was poisoned, dropping data.", instructionId);
return;
}
CloseableFnDataReceiver<BeamFnApi.Elements> consumer;
if (!consumerFuture.isDone()) {
LOG.debug(
"Received data for instruction {} without consumer ready. "
+ "Waiting for consumer to be registered.",
instructionId);
}
CloseableFnDataReceiver<BeamFnApi.Elements> consumer;
try {
consumer = consumerFuture.get();

// The consumer may not be registered until the bundle processor is fully constructed so we
// conservatively set
// a high timeout. Poisoning will prevent this for occurring for consumers that will not be
// registered.
consumer = consumerFuture.get(3, TimeUnit.HOURS);
/*
* TODO: On failure we should fail any bundles that were impacted eagerly
* instead of relying on the Runner harness to do all the failure handling.
*/
} catch (TimeoutException e) {
LOG.error(
"Timed out waiting to observe consumer data stream for instruction {}",
instructionId,
e);
outboundObserver.onError(e);
return;
} catch (ExecutionException | InterruptedException e) {
if (e.getCause() instanceof PoisonedException) {
LOG.debug("Received data for poisoned instruction {}. Dropping input.", instructionId);
return;
}
LOG.error(
"Client interrupted during handling of data for instruction {}", instructionId, e);
outboundObserver.onError(e);
Expand All @@ -243,7 +316,7 @@ private void forwardToConsumerForInstructionId(String instructionId, BeamFnApi.E
try {
consumer.accept(value);
} catch (Exception e) {
erroredInstructionIds.put(instructionId, true);
poisonInstructionId(instructionId);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ public void testFailedProcessingCausesAdditionalInboundDataToBeIgnored() throws
DESCRIPTOR,
OutboundObserverFactory.clientDirect(),
inboundObserver -> TestStreams.withOnNext(outboundValues::add).build());
final AtomicBoolean closed = new AtomicBoolean();
multiplexer.registerConsumer(
DATA_INSTRUCTION_ID,
new CloseableFnDataReceiver<BeamFnApi.Elements>() {
Expand All @@ -290,7 +291,7 @@ public void flush() throws Exception {

@Override
public void close() throws Exception {
fail("Unexpected call");
closed.set(true);
}

@Override
Expand Down Expand Up @@ -320,6 +321,7 @@ public void accept(BeamFnApi.Elements input) throws Exception {
dataInboundValues,
Matchers.contains(
BeamFnApi.Elements.newBuilder().addData(data.setTransformId("A").build()).build()));
assertTrue(closed.get());
}

@Test
Expand Down
Loading

0 comments on commit fa7cde6

Please sign in to comment.