Skip to content

Commit

Permalink
Changes to reduce memory pinned while iterating through state backed …
Browse files Browse the repository at this point in the history
…iterable:

- remove reference to completed encoded input page from decoder once we have read it.
- re-read from cache after loading the next page to give eviction a chance to remove blocks
  • Loading branch information
scwhittle committed Nov 11, 2024
1 parent 27908d5 commit d410d20
Showing 1 changed file with 29 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ public class BeamFnDataGrpcMultiplexer implements AutoCloseable {
receivers;
private final Cache</*instructionId=*/ String, /*unused=*/ Boolean> poisonedInstructionIds;


private static class PoisonedException extends RuntimeException {
public PoisonedException() {
super("Instruction poisoned");
Expand Down Expand Up @@ -107,46 +106,46 @@ public StreamObserver<BeamFnApi.Elements> getOutboundObserver() {
* instruction ids ensuring that the receiver will only see {@link BeamFnApi.Elements} with a
* single instruction id.
*
* <p>The caller must either {@link #unregisterConsumer unregister the consumer} when all messages have
* been processed or {@link #poisonInstructionId(String) poison the instruction} if messages for the instruction
* should be dropped.
* <p>The caller must either {@link #unregisterConsumer unregister the consumer} when all messages
* have been processed or {@link #poisonInstructionId(String) poison the instruction} if messages
* for the instruction should be dropped.
*/
public void registerConsumer(
String instructionId, CloseableFnDataReceiver<BeamFnApi.Elements> receiver) {
receivers.compute(
instructionId,
(unused, existing) -> {
if (existing != null) {
if (!existing.complete(receiver)) {
throw new IllegalArgumentException("Instruction id was registered twice");
}
return existing;
}
if (poisonedInstructionIds.getIfPresent(instructionId) != null) {
throw new IllegalArgumentException("Instruction id was poisoned");
}
return CompletableFuture.completedFuture(receiver);
});
instructionId,
(unused, existing) -> {
if (existing != null) {
if (!existing.complete(receiver)) {
throw new IllegalArgumentException("Instruction id was registered twice");
}
return existing;
}
if (poisonedInstructionIds.getIfPresent(instructionId) != null) {
throw new IllegalArgumentException("Instruction id was poisoned");
}
return CompletableFuture.completedFuture(receiver);
});
}

/**
* Unregisters a previously registered consumer.
* */
/** Unregisters a previously registered consumer. */
public void unregisterConsumer(String instructionId) {
@Nullable CompletableFuture<CloseableFnDataReceiver<BeamFnApi.Elements>> receiverFuture =
@Nullable
CompletableFuture<CloseableFnDataReceiver<BeamFnApi.Elements>> receiverFuture =
receivers.remove(instructionId);
if (receiverFuture != null && !receiverFuture.isDone()) {
// The future must have been inserted by the inbound observer since registerConsumer completes the future.
// The future must have been inserted by the inbound observer since registerConsumer completes
// the future.
throw new IllegalArgumentException("Unregistering consumer which was not registered.");
}
}

/**
* Poisons an instruction id.
*
* <p> Any records for the instruction on the inbound observer will be dropped for the next
* {@link #POISONED_INSTRUCTION_ID_CACHE_TIMEOUT}.
* */
* <p>Any records for the instruction on the inbound observer will be dropped for the next {@link
* #POISONED_INSTRUCTION_ID_CACHE_TIMEOUT}.
*/
public void poisonInstructionId(String instructionId) {
poisonedInstructionIds.put(instructionId, Boolean.TRUE);
@Nullable
Expand Down Expand Up @@ -266,16 +265,17 @@ public void onNext(BeamFnApi.Elements value) {
private void forwardToConsumerForInstructionId(String instructionId, BeamFnApi.Elements value) {
CloseableFnDataReceiver<BeamFnApi.Elements> consumer;
try {
CompletableFuture<CloseableFnDataReceiver<BeamFnApi.Elements>> consumerFuture = receivers.computeIfAbsent(
CompletableFuture<CloseableFnDataReceiver<BeamFnApi.Elements>> consumerFuture =
receivers.computeIfAbsent(
instructionId,
(unused) -> {
if (poisonedInstructionIds.getIfPresent(instructionId) != null) {
throw new PoisonedException();
}
LOG.debug(
"Received data for instruction {} without consumer ready. "
+ "Waiting for consumer to be registered.",
instructionId);
"Received data for instruction {} without consumer ready. "
+ "Waiting for consumer to be registered.",
instructionId);
return new CompletableFuture<>();
});
// The consumer may not be registered until the bundle processor is fully constructed so we
Expand Down

0 comments on commit d410d20

Please sign in to comment.