Skip to content

Commit

Permalink
test
Browse files Browse the repository at this point in the history
  • Loading branch information
dlg99 committed May 23, 2024
1 parent c1ee50d commit 9b3b4d1
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.Collections2;
import com.google.common.collect.Lists;
import com.google.common.collect.Range;
Expand Down Expand Up @@ -248,6 +249,8 @@ public class ManagedCursorImpl implements ManagedCursor {
// active state cache in ManagedCursor. It should be in sync with the state in activeCursors in ManagedLedger.
private volatile boolean isActive = false;

protected int maxPositionChunkSize = 1024 * 1024;

static class MarkDeleteEntry {
final PositionImpl newPosition;
final MarkDeleteCallback callback;
Expand Down Expand Up @@ -3305,6 +3308,7 @@ private void buildBatchEntryDeletionIndexInfoList(
}

void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, final VoidCallback callback) {
Preconditions.checkArgument(maxPositionChunkSize > 0, "maxPositionChunkSize mus be greater than zero");
long now = System.nanoTime();
PositionImpl position = mdEntry.newPosition;

Expand All @@ -3325,10 +3329,9 @@ void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, fin

long endCompress = System.nanoTime();

int maxSize = 1024 * 1024;
int offset = 0;
final int len = data.readableBytes();
int numParts = 1 + (len / maxSize);
int numParts = 1 + (len / maxPositionChunkSize);

if (log.isDebugEnabled()) {
log.debug("[{}] Cursor {} Appending to ledger={} position={} data size {} bytes, numParts {}",
Expand All @@ -3351,7 +3354,7 @@ void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, fin
int part = 0;
while (part != numParts) {
int remaining = len - offset;
int currentLen = Math.min(maxSize, remaining);
int currentLen = Math.min(maxPositionChunkSize, remaining);
boolean isLast = part == numParts - 1;

if (log.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
Expand All @@ -98,6 +99,7 @@
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.IntRange;
import org.apache.pulsar.common.util.FutureUtil;
Expand Down Expand Up @@ -3576,6 +3578,82 @@ public void operationFailed(ManagedLedgerException exception) {
assertEquals(c.getReadPosition(), readPositionBeforeRecover);
assertEquals(c.getNumberOfEntries(), 2L);
}

@Test(timeOut = 20000)
public void testRecoverCursorCorruptLastEntry() throws Exception {
ManagedLedger ml = factory.open("testRecoverCursorCorruptLastEntry");
ManagedCursorImpl c = (ManagedCursorImpl) ml.openCursor("sub", CommandSubscribe.InitialPosition.Latest);
// force chunking
c.maxPositionChunkSize = 2;

// A new cursor starts out with these values. The rest of the test assumes this, so we assert it here.
assertEquals(c.getMarkDeletedPosition().getEntryId(), -1);
assertEquals(c.getReadPosition().getEntryId(), 0);
assertEquals(ml.getLastConfirmedEntry().getEntryId(), -1);

c.resetCursor(PositionImpl.LATEST);

// A reset cursor starts out with these values. The rest of the test assumes this, so we assert it here.
assertEquals(c.getMarkDeletedPosition().getEntryId(), -1);
assertEquals(c.getReadPosition().getEntryId(), 0);
assertEquals(ml.getLastConfirmedEntry().getEntryId(), -1);

// Trigger the lastConfirmedEntry to move forward
ml.addEntry(new byte[1]);
ml.addEntry(new byte[1]);
ml.addEntry(new byte[1]);
ml.addEntry(new byte[1]);

c.resetCursor(PositionImpl.LATEST);
//corrupt last entry
LedgerHandle cursorLedger = (LedgerHandle)FieldUtils.readDeclaredField(c, "cursorLedger", true);
// can't parse json
cursorLedger.addEntry("{{".getBytes());
// can't parse PositionInfo protobuf
cursorLedger.addEntry("aa".getBytes());

assertEquals(c.getMarkDeletedPosition().getEntryId(), 3);
assertEquals(c.getReadPosition().getEntryId(), 4);
assertEquals(ml.getLastConfirmedEntry().getEntryId(), 3);

// Publish messages to move the lastConfirmedEntry field forward
ml.addEntry(new byte[1]);
ml.addEntry(new byte[1]);

final Position markDeleteBeforeRecover = c.getMarkDeletedPosition();
final Position readPositionBeforeRecover = c.getReadPosition();

ManagedCursorInfo info = ManagedCursorInfo.newBuilder()
.setCursorsLedgerId(c.getCursorLedger())
.setMarkDeleteLedgerId(markDeleteBeforeRecover.getLedgerId())
.setMarkDeleteEntryId(markDeleteBeforeRecover.getEntryId())
.setLastActive(0L)
.build();

CountDownLatch latch = new CountDownLatch(1);
AtomicBoolean failed = new AtomicBoolean(false);
c.recoverFromLedger(info, new VoidCallback() {
@Override
public void operationComplete() {
latch.countDown();
}

@Override
public void operationFailed(ManagedLedgerException exception) {
failed.set(true);
latch.countDown();
}
});

latch.await();
if (failed.get()) {
fail("Cursor recovery should not fail");
}
assertEquals(c.getMarkDeletedPosition(), markDeleteBeforeRecover);
assertEquals(c.getReadPosition(), readPositionBeforeRecover);
assertEquals(c.getNumberOfEntries(), 2L);
}

@Test
void testAlwaysInactive() throws Exception {
ManagedLedger ml = factory.open("testAlwaysInactive");
Expand Down

0 comments on commit 9b3b4d1

Please sign in to comment.