Skip to content

Commit

Permalink
Compare with lastConfirmedEntry if the ledger handle belongs to the m…
Browse files Browse the repository at this point in the history
…anaged ledger
  • Loading branch information
BewareMyPower committed Aug 12, 2024
1 parent 38a4c84 commit 0d143a9
Show file tree
Hide file tree
Showing 6 changed files with 143 additions and 133 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4050,6 +4050,8 @@ public static ManagedLedgerException createManagedLedgerException(int bkErrorCod
public static ManagedLedgerException createManagedLedgerException(Throwable t) {
if (t instanceof org.apache.bookkeeper.client.api.BKException) {
return createManagedLedgerException(((org.apache.bookkeeper.client.api.BKException) t).getCode());
} else if (t instanceof ManagedLedgerException) {
return (ManagedLedgerException) t;
} else if (t instanceof CompletionException
&& !(t.getCause() instanceof CompletionException) /* check to avoid stackoverlflow */) {
return createManagedLedgerException(t.getCause());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public void invalidateEntriesBeforeTimestamp(long timestamp) {
@Override
public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader,
final AsyncCallbacks.ReadEntriesCallback callback, Object ctx) {
lh.readAsync(firstEntry, lastEntry).thenAcceptAsync(
ReadEntryUtils.readAsync(ml, lh, firstEntry, lastEntry).thenAcceptAsync(
ledgerEntries -> {
List<Entry> entries = new ArrayList<>();
long totalSize = 0;
Expand Down Expand Up @@ -107,7 +107,7 @@ public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boole
@Override
public void asyncReadEntry(ReadHandle lh, Position position, AsyncCallbacks.ReadEntryCallback callback,
Object ctx) {
lh.readAsync(position.getEntryId(), position.getEntryId()).whenCompleteAsync(
ReadEntryUtils.readAsync(ml, lh, position.getEntryId(), position.getEntryId()).whenCompleteAsync(
(ledgerEntries, exception) -> {
if (exception != null) {
ml.invalidateLedgerHandle(lh);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.client.api.BKException;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
Expand Down Expand Up @@ -250,7 +249,7 @@ private void asyncReadEntry0(ReadHandle lh, Position position, final ReadEntryCa
manager.mlFactoryMBean.recordCacheHit(cachedEntry.getLength());
callback.readEntryComplete(cachedEntry, ctx);
} else {
readAsync(lh, position.getEntryId(), position.getEntryId()).thenAcceptAsync(
ReadEntryUtils.readAsync(ml, lh, position.getEntryId(), position.getEntryId()).thenAcceptAsync(
ledgerEntries -> {
try {
Iterator<LedgerEntry> iterator = ledgerEntries.iterator();
Expand Down Expand Up @@ -430,7 +429,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
CompletableFuture<List<EntryImpl>> readFromStorage(ReadHandle lh,
long firstEntry, long lastEntry, boolean shouldCacheEntry) {
final int entriesToRead = (int) (lastEntry - firstEntry) + 1;
CompletableFuture<List<EntryImpl>> readResult = readAsync(lh, firstEntry, lastEntry)
CompletableFuture<List<EntryImpl>> readResult = ReadEntryUtils.readAsync(ml, lh, firstEntry, lastEntry)
.thenApply(
ledgerEntries -> {
requireNonNull(ml.getName());
Expand Down Expand Up @@ -513,22 +512,5 @@ public void invalidateEntriesBeforeTimestamp(long timestamp) {
manager.entriesRemoved(evictedPair.getRight(), evictedPair.getLeft());
}

private CompletableFuture<LedgerEntries> readAsync(ReadHandle handle, long firstEntry, long lastEntry) {
final var lastConfirmedEntry = ml.getLastConfirmedEntry();
if (lastConfirmedEntry == null) {
return CompletableFuture.failedFuture(new IllegalStateException("LastConfirmedEntry is null when reading "
+ handle.getId()));
}
if (handle.getId() > lastConfirmedEntry.getLedgerId()) {
return CompletableFuture.failedFuture(new IllegalStateException("LastConfirmedEntry is "
+ lastConfirmedEntry + " while trying to read ledger " + handle.getId()));
}
if (handle.getId() == lastConfirmedEntry.getLedgerId() && lastEntry > lastConfirmedEntry.getEntryId()) {
return CompletableFuture.failedFuture(new IllegalStateException("Last ConfirmedEntry is "
+ lastConfirmedEntry + " while trying to read entry " + lastEntry));
}
return handle.readUnconfirmedAsync(firstEntry, lastEntry);
}

private static final Logger log = LoggerFactory.getLogger(RangeEntryCacheImpl.class);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.bookkeeper.mledger.impl.cache;

import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;

class ReadEntryUtils {

static CompletableFuture<LedgerEntries> readAsync(ManagedLedger ml, ReadHandle handle, long firstEntry,
long lastEntry) {
if (ml.getOptionalLedgerInfo(handle.getId()).isEmpty()) {
// The read handle comes from another managed ledger, in this case, we can only compare the entry range with
// the LAC of that read handle. Specifically, it happens when this method is called by a
// ReadOnlyManagedLedgerImpl object.
return handle.readAsync(firstEntry, lastEntry);
}
// Compare the entry range with the lastConfirmedEntry maintained by the managed ledger because the entry cache
// of `ShadowManagedLedgerImpl` reads entries via `ReadOnlyLedgerHandle`, which never updates `lastAddConfirmed`
final var lastConfirmedEntry = ml.getLastConfirmedEntry();
if (lastConfirmedEntry == null) {
return CompletableFuture.failedFuture(new ManagedLedgerException(
"LastConfirmedEntry is null when reading ledger " + handle.getId()));
}
if (handle.getId() > lastConfirmedEntry.getLedgerId()) {
return CompletableFuture.failedFuture(new ManagedLedgerException("LastConfirmedEntry is "
+ lastConfirmedEntry + " when reading ledger " + handle.getId()));
}
if (handle.getId() == lastConfirmedEntry.getLedgerId() && lastEntry > lastConfirmedEntry.getEntryId()) {
return CompletableFuture.failedFuture(new ManagedLedgerException("LastConfirmedEntry is "
+ lastConfirmedEntry + " when reading entry " + lastEntry));
}
return handle.readUnconfirmedAsync(firstEntry, lastEntry);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
Expand All @@ -45,6 +46,7 @@
import org.apache.bookkeeper.mledger.impl.cache.EntryCache;
import org.apache.bookkeeper.mledger.impl.cache.EntryCacheDisabled;
import org.apache.bookkeeper.mledger.impl.cache.EntryCacheManager;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.testng.Assert;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -392,6 +394,9 @@ void entryCacheDisabledAsyncReadEntry() throws Exception {
EntryCache entryCache = cacheManager.getEntryCache(ml1);

final CountDownLatch counter = new CountDownLatch(1);
when(ml1.getLastConfirmedEntry()).thenReturn(PositionFactory.create(1L, 1L));
when(ml1.getOptionalLedgerInfo(lh.getId())).thenReturn(Optional.of(mock(
MLDataFormats.ManagedLedgerInfo.LedgerInfo.class)));
entryCache.asyncReadEntry(lh, PositionFactory.create(1L,1L), new AsyncCallbacks.ReadEntryCallback() {
public void readEntryComplete(Entry entry, Object ctx) {
Assert.assertNotEquals(entry, null);
Expand All @@ -406,7 +411,7 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
}, null);
counter.await();

verify(lh).readAsync(anyLong(), anyLong());
verify(lh).readUnconfirmedAsync(anyLong(), anyLong());
}

}
Loading

0 comments on commit 0d143a9

Please sign in to comment.