diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexer.java index 7068664fd470..aa0dea80b0a1 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexer.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/fn/data/BeamFnDataGrpcMultiplexer.java @@ -62,7 +62,6 @@ public class BeamFnDataGrpcMultiplexer implements AutoCloseable { receivers; private final Cache poisonedInstructionIds; - private static class PoisonedException extends RuntimeException { public PoisonedException() { super("Instruction poisoned"); @@ -107,36 +106,36 @@ public StreamObserver getOutboundObserver() { * instruction ids ensuring that the receiver will only see {@link BeamFnApi.Elements} with a * single instruction id. * - *

The caller must either {@link #unregisterConsumer unregister the consumer} when all messages have - * been processed or {@link #poisonInstructionId(String) poison the instruction} if messages for the instruction - * should be dropped. + *

The caller must either {@link #unregisterConsumer unregister the consumer} when all messages + * have been processed or {@link #poisonInstructionId(String) poison the instruction} if messages + * for the instruction should be dropped. */ public void registerConsumer( String instructionId, CloseableFnDataReceiver receiver) { receivers.compute( - instructionId, - (unused, existing) -> { - if (existing != null) { - if (!existing.complete(receiver)) { - throw new IllegalArgumentException("Instruction id was registered twice"); - } - return existing; - } - if (poisonedInstructionIds.getIfPresent(instructionId) != null) { - throw new IllegalArgumentException("Instruction id was poisoned"); - } - return CompletableFuture.completedFuture(receiver); - }); + instructionId, + (unused, existing) -> { + if (existing != null) { + if (!existing.complete(receiver)) { + throw new IllegalArgumentException("Instruction id was registered twice"); + } + return existing; + } + if (poisonedInstructionIds.getIfPresent(instructionId) != null) { + throw new IllegalArgumentException("Instruction id was poisoned"); + } + return CompletableFuture.completedFuture(receiver); + }); } - /** - * Unregisters a previously registered consumer. - * */ + /** Unregisters a previously registered consumer. */ public void unregisterConsumer(String instructionId) { - @Nullable CompletableFuture> receiverFuture = + @Nullable + CompletableFuture> receiverFuture = receivers.remove(instructionId); if (receiverFuture != null && !receiverFuture.isDone()) { - // The future must have been inserted by the inbound observer since registerConsumer completes the future. + // The future must have been inserted by the inbound observer since registerConsumer completes + // the future. throw new IllegalArgumentException("Unregistering consumer which was not registered."); } } @@ -144,9 +143,9 @@ public void unregisterConsumer(String instructionId) { /** * Poisons an instruction id. * - *

Any records for the instruction on the inbound observer will be dropped for the next - * {@link #POISONED_INSTRUCTION_ID_CACHE_TIMEOUT}. - * */ + *

Any records for the instruction on the inbound observer will be dropped for the next {@link + * #POISONED_INSTRUCTION_ID_CACHE_TIMEOUT}. + */ public void poisonInstructionId(String instructionId) { poisonedInstructionIds.put(instructionId, Boolean.TRUE); @Nullable @@ -266,16 +265,17 @@ public void onNext(BeamFnApi.Elements value) { private void forwardToConsumerForInstructionId(String instructionId, BeamFnApi.Elements value) { CloseableFnDataReceiver consumer; try { - CompletableFuture> consumerFuture = receivers.computeIfAbsent( + CompletableFuture> consumerFuture = + receivers.computeIfAbsent( instructionId, (unused) -> { if (poisonedInstructionIds.getIfPresent(instructionId) != null) { throw new PoisonedException(); } LOG.debug( - "Received data for instruction {} without consumer ready. " - + "Waiting for consumer to be registered.", - instructionId); + "Received data for instruction {} without consumer ready. " + + "Waiting for consumer to be registered.", + instructionId); return new CompletableFuture<>(); }); // The consumer may not be registered until the bundle processor is fully constructed so we