Skip to content

Commit

Permalink
Changes to reduce memory pinned while iterating through state backed …
Browse files Browse the repository at this point in the history
…iterable: (#32961)

- remove reference to completed encoded input page from decoder once we have read it.
- re-read from cache after loading the next page to give eviction a chance to remove blocks
  • Loading branch information
scwhittle authored Nov 11, 2024
1 parent 126f278 commit d760383
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,12 @@ public WeightedList<T> decodeFromChunkBoundaryToChunkBoundary() {
T next = next();
rvals.add(next);
}
// We don't support seeking backwards so release the memory of the last
// page if it is completed.
if (inbound.currentStream.available() == 0) {
inbound.position = 0;
inbound.currentStream = EMPTY_STREAM;
}

// Uses the size of the ByteString as an approximation for the heap size occupied by the
// page, considering an overhead of {@link BYTES_LIST_ELEMENT_OVERHEAD} for each element.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ public long getWeight() {
// many different state subcaches.
return 0;
}
};
}

/** A mutable iterable that supports prefetch and is backed by a cache. */
static class CachingStateIterable<T> extends PrefetchableIterables.Default<T> {
Expand Down Expand Up @@ -138,8 +138,8 @@ public long getWeight() {
private static <T> long sumWeight(List<Block<T>> blocks) {
try {
long sum = 0;
for (int i = 0; i < blocks.size(); ++i) {
sum = Math.addExact(sum, blocks.get(i).getWeight());
for (Block<T> block : blocks) {
sum = Math.addExact(sum, block.getWeight());
}
return sum;
} catch (ArithmeticException e) {
Expand Down Expand Up @@ -437,50 +437,59 @@ public boolean hasNext() {
if (currentBlock.getValues().size() > currentCachedBlockValueIndex) {
return true;
}
if (currentBlock.getNextToken() == null) {
final ByteString nextToken = currentBlock.getNextToken();
if (nextToken == null) {
return false;
}
Blocks<T> existing = cache.peek(IterableCacheKey.INSTANCE);
boolean isFirstBlock = ByteString.EMPTY.equals(currentBlock.getNextToken());
// Release the block while we are loading the next one.
currentBlock =
Block.fromValues(new WeightedList<>(Collections.emptyList(), 0L), ByteString.EMPTY);

@Nullable Blocks<T> existing = cache.peek(IterableCacheKey.INSTANCE);
boolean isFirstBlock = ByteString.EMPTY.equals(nextToken);
if (existing == null) {
currentBlock = loadNextBlock(currentBlock.getNextToken());
currentBlock = loadNextBlock(nextToken);
if (isFirstBlock) {
cache.put(
IterableCacheKey.INSTANCE,
new BlocksPrefix<>(Collections.singletonList(currentBlock)));
}
} else if (isFirstBlock) {
currentBlock = existing.getBlocks().get(0);
} else {
if (isFirstBlock) {
currentBlock = existing.getBlocks().get(0);
} else {
checkState(
existing instanceof BlocksPrefix,
"Unexpected blocks type %s, expected a %s.",
existing.getClass(),
BlocksPrefix.class);
List<Block<T>> blocks = existing.getBlocks();
int currentBlockIndex = 0;
for (; currentBlockIndex < blocks.size(); ++currentBlockIndex) {
if (currentBlock
.getNextToken()
.equals(blocks.get(currentBlockIndex).getNextToken())) {
break;
}
checkState(
existing instanceof BlocksPrefix,
"Unexpected blocks type %s, expected a %s.",
existing.getClass(),
BlocksPrefix.class);
List<Block<T>> blocks = existing.getBlocks();
int currentBlockIndex = 0;
for (; currentBlockIndex < blocks.size(); ++currentBlockIndex) {
if (nextToken.equals(blocks.get(currentBlockIndex).getNextToken())) {
break;
}
// Load the next block from cache if it was found.
if (currentBlockIndex + 1 < blocks.size()) {
currentBlock = blocks.get(currentBlockIndex + 1);
} else {
// Otherwise load the block from state API.
currentBlock = loadNextBlock(currentBlock.getNextToken());

// Append this block to the existing set of blocks if it is logically the next one.
if (currentBlockIndex == blocks.size() - 1) {
List<Block<T>> newBlocks = new ArrayList<>(currentBlockIndex + 1);
newBlocks.addAll(blocks);
newBlocks.add(currentBlock);
cache.put(IterableCacheKey.INSTANCE, new BlocksPrefix<>(newBlocks));
}
}
// Take the next block from the cache if it was found.
if (currentBlockIndex + 1 < blocks.size()) {
currentBlock = blocks.get(currentBlockIndex + 1);
} else {
// Otherwise load the block from state API.
// Remove references on the cached values while we are loading the next block.
existing = null;
blocks = null;
currentBlock = loadNextBlock(nextToken);
existing = cache.peek(IterableCacheKey.INSTANCE);
// Append this block to the existing set of blocks if it is logically the next one
// according to the
// tokens.
if (existing != null
&& !existing.getBlocks().isEmpty()
&& nextToken.equals(
existing.getBlocks().get(existing.getBlocks().size() - 1).getNextToken())) {
List<Block<T>> newBlocks = new ArrayList<>(currentBlockIndex + 1);
newBlocks.addAll(existing.getBlocks());
newBlocks.add(currentBlock);
cache.put(IterableCacheKey.INSTANCE, new BlocksPrefix<>(newBlocks));
}
}
}
Expand Down

0 comments on commit d760383

Please sign in to comment.