diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 37f84e988eb57..5113fb46063f4 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -29,6 +29,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; +import com.google.common.base.Preconditions; import com.google.common.collect.Collections2; import com.google.common.collect.Lists; import com.google.common.collect.Range; @@ -248,6 +249,8 @@ public class ManagedCursorImpl implements ManagedCursor { // active state cache in ManagedCursor. It should be in sync with the state in activeCursors in ManagedLedger. private volatile boolean isActive = false; + protected int maxPositionChunkSize = 1024 * 1024; + static class MarkDeleteEntry { final PositionImpl newPosition; final MarkDeleteCallback callback; @@ -3305,6 +3308,7 @@ private void buildBatchEntryDeletionIndexInfoList( } void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, final VoidCallback callback) { + Preconditions.checkArgument(maxPositionChunkSize > 0, "maxPositionChunkSize mus be greater than zero"); long now = System.nanoTime(); PositionImpl position = mdEntry.newPosition; @@ -3325,10 +3329,9 @@ void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, fin long endCompress = System.nanoTime(); - int maxSize = 1024 * 1024; int offset = 0; final int len = data.readableBytes(); - int numParts = 1 + (len / maxSize); + int numParts = 1 + (len / maxPositionChunkSize); if (log.isDebugEnabled()) { log.debug("[{}] Cursor {} Appending to ledger={} position={} data size {} bytes, numParts {}", @@ -3351,7 +3354,7 @@ void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, fin int part = 0; while (part != numParts) { int remaining = len - offset; - int currentLen = Math.min(maxSize, remaining); + int currentLen = Math.min(maxPositionChunkSize, remaining); boolean isLast = part == numParts - 1; if (log.isDebugEnabled()) { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index 2afc0696982f4..8c34bcb2e6e25 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -75,6 +75,7 @@ import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.BookKeeper.DigestType; import org.apache.bookkeeper.client.LedgerEntry; +import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.client.api.ReadHandle; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback; @@ -98,6 +99,7 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo; import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo; import org.apache.bookkeeper.test.MockedBookKeeperTestCase; +import org.apache.commons.lang3.reflect.FieldUtils; import org.apache.pulsar.common.api.proto.CommandSubscribe; import org.apache.pulsar.common.api.proto.IntRange; import org.apache.pulsar.common.util.FutureUtil; @@ -3576,6 +3578,82 @@ public void operationFailed(ManagedLedgerException exception) { assertEquals(c.getReadPosition(), readPositionBeforeRecover); assertEquals(c.getNumberOfEntries(), 2L); } + + @Test(timeOut = 20000) + public void testRecoverCursorCorruptLastEntry() throws Exception { + ManagedLedger ml = factory.open("testRecoverCursorCorruptLastEntry"); + ManagedCursorImpl c = (ManagedCursorImpl) ml.openCursor("sub", CommandSubscribe.InitialPosition.Latest); + // force chunking + c.maxPositionChunkSize = 2; + + // A new cursor starts out with these values. The rest of the test assumes this, so we assert it here. + assertEquals(c.getMarkDeletedPosition().getEntryId(), -1); + assertEquals(c.getReadPosition().getEntryId(), 0); + assertEquals(ml.getLastConfirmedEntry().getEntryId(), -1); + + c.resetCursor(PositionImpl.LATEST); + + // A reset cursor starts out with these values. The rest of the test assumes this, so we assert it here. + assertEquals(c.getMarkDeletedPosition().getEntryId(), -1); + assertEquals(c.getReadPosition().getEntryId(), 0); + assertEquals(ml.getLastConfirmedEntry().getEntryId(), -1); + + // Trigger the lastConfirmedEntry to move forward + ml.addEntry(new byte[1]); + ml.addEntry(new byte[1]); + ml.addEntry(new byte[1]); + ml.addEntry(new byte[1]); + + c.resetCursor(PositionImpl.LATEST); + //corrupt last entry + LedgerHandle cursorLedger = (LedgerHandle)FieldUtils.readDeclaredField(c, "cursorLedger", true); + // can't parse json + cursorLedger.addEntry("{{".getBytes()); + // can't parse PositionInfo protobuf + cursorLedger.addEntry("aa".getBytes()); + + assertEquals(c.getMarkDeletedPosition().getEntryId(), 3); + assertEquals(c.getReadPosition().getEntryId(), 4); + assertEquals(ml.getLastConfirmedEntry().getEntryId(), 3); + + // Publish messages to move the lastConfirmedEntry field forward + ml.addEntry(new byte[1]); + ml.addEntry(new byte[1]); + + final Position markDeleteBeforeRecover = c.getMarkDeletedPosition(); + final Position readPositionBeforeRecover = c.getReadPosition(); + + ManagedCursorInfo info = ManagedCursorInfo.newBuilder() + .setCursorsLedgerId(c.getCursorLedger()) + .setMarkDeleteLedgerId(markDeleteBeforeRecover.getLedgerId()) + .setMarkDeleteEntryId(markDeleteBeforeRecover.getEntryId()) + .setLastActive(0L) + .build(); + + CountDownLatch latch = new CountDownLatch(1); + AtomicBoolean failed = new AtomicBoolean(false); + c.recoverFromLedger(info, new VoidCallback() { + @Override + public void operationComplete() { + latch.countDown(); + } + + @Override + public void operationFailed(ManagedLedgerException exception) { + failed.set(true); + latch.countDown(); + } + }); + + latch.await(); + if (failed.get()) { + fail("Cursor recovery should not fail"); + } + assertEquals(c.getMarkDeletedPosition(), markDeleteBeforeRecover); + assertEquals(c.getReadPosition(), readPositionBeforeRecover); + assertEquals(c.getNumberOfEntries(), 2L); + } + @Test void testAlwaysInactive() throws Exception { ManagedLedger ml = factory.open("testAlwaysInactive");