Skip to content

Commit

Permalink
[improve][storage] Periodically rollover Cursor ledgers (#257)
Browse files Browse the repository at this point in the history
  • Loading branch information
eolivelli committed May 1, 2024
1 parent b94cec9 commit c61decc
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -866,4 +866,11 @@ default void skipNonRecoverableLedger(long ledgerId){}
* @return whether this cursor is closed.
*/
boolean isClosed();

/**
* Called by the system to trigger perdiodic rollover in absence of activity.
*/
default boolean periodicRollover() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,11 @@ void asyncSetProperties(Map<String, String> properties, AsyncCallbacks.UpdatePro
*/
void trimConsumedLedgersInBackground(CompletableFuture<?> promise);

/**
* Rollover cursors in background if needed.
*/
default void rolloverCursorsInBackground() {}

/**
* If a ledger is lost, this ledger will be skipped after enabled "autoSkipNonRecoverableData", and the method is
* used to delete information about this ledger in the ManagedCursor.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3093,12 +3093,7 @@ void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, fin
lh1.getId());
}

if (shouldCloseLedger(lh1)) {
if (log.isDebugEnabled()) {
log.debug("[{}] Need to create new metadata ledger for cursor {}", ledger.getName(), name);
}
startCreatingNewMetadataLedger();
}
rolloverLedgerIfNeeded(lh1);

mbean.persistToLedger(true);
mbean.addWriteCursorLedgerSize(data.length);
Expand All @@ -3116,6 +3111,35 @@ void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, fin
}, null);
}

public boolean periodicRollover() {
LedgerHandle lh = cursorLedger;
if (State.Open.equals(STATE_UPDATER.get(this))
&& lh != null && lh.getLength() > 0) {
boolean triggered = rolloverLedgerIfNeeded(lh);
if (triggered) {
log.info("[{}] Periodic rollover triggered for cursor {} (length={} bytes)",
ledger.getName(), name, lh.getLength());
} else {
log.debug("[{}] Periodic rollover skipped for cursor {} (length={} bytes)",
ledger.getName(), name, lh.getLength());

}
return triggered;
}
return false;
}

boolean rolloverLedgerIfNeeded(LedgerHandle lh1) {
if (shouldCloseLedger(lh1)) {
if (log.isDebugEnabled()) {
log.debug("[{}] Need to create new metadata ledger for cursor {}", ledger.getName(), name);
}
startCreatingNewMetadataLedger();
return true;
}
return false;
}

void persistPositionToMetaStore(MarkDeleteEntry mdEntry, final VoidCallback callback) {
final PositionImpl newPosition = mdEntry.newPosition;
STATE_UPDATER.compareAndSet(ManagedCursorImpl.this, State.Open, State.NoLedger);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2828,6 +2828,15 @@ public void operationFailed(MetaStoreException e) {
}
}

@Override
public void rolloverCursorsInBackground() {
if (cursors.hasDurableCursors()) {
executor.execute(() -> {
cursors.forEach(ManagedCursor::periodicRollover);
});
}
}

protected void doDeleteLedgers(List<LedgerInfo> ledgersToDelete) {
PositionImpl currentLastConfirmedEntry = lastConfirmedEntry;
// Update metadata
Expand Down Expand Up @@ -4530,4 +4539,4 @@ public Position getTheSlowestNonDurationReadPosition() {
}
return theSlowestNonDurableReadPosition;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.bookkeeper.mledger.util.ThrowableToStringUtil;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
import org.awaitility.Awaitility;
import org.testng.annotations.Test;

public class ManagedLedgerBkTest extends BookKeeperClusterTestCase {
Expand Down Expand Up @@ -548,6 +549,42 @@ public void testChangeCrcType() throws Exception {
}
}

@Test
public void testPeriodicRollover() throws Exception {
ManagedLedgerFactoryConfig factoryConf = new ManagedLedgerFactoryConfig();
factoryConf.setMaxCacheSize(0);

int rolloverTimeForCursorInSeconds = 5;

@Cleanup("shutdown")
ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, factoryConf);
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setEnsembleSize(1).setWriteQuorumSize(1).setAckQuorumSize(1).setMetadataEnsembleSize(1)
.setMetadataAckQuorumSize(1)
.setLedgerRolloverTimeout(rolloverTimeForCursorInSeconds);
ManagedLedger ledger = factory.open("my-ledger" + testName, config);
ManagedCursor cursor = ledger.openCursor("c1");

Position pos = ledger.addEntry("entry-0".getBytes());
ledger.addEntry("entry-1".getBytes());

List<Entry> entries = cursor.readEntries(2);
assertEquals(2, entries.size());
entries.forEach(Entry::release);

ManagedCursorImpl cursorImpl = (ManagedCursorImpl) cursor;
assertEquals(ManagedCursorImpl.State.NoLedger, cursorImpl.state);

// this creates the ledger
cursor.delete(pos);

Awaitility.await().until(() -> cursorImpl.state == ManagedCursorImpl.State.Open);

Thread.sleep(rolloverTimeForCursorInSeconds * 1000 + 1000);

long currentLedgerId = cursorImpl.getCursorLedger();
assertTrue(cursor.periodicRollover());
Awaitility.await().until(() -> cursorImpl.getCursorLedger() != currentLedgerId);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2114,6 +2114,7 @@ private void checkConsumedLedgers() {
Optional.ofNullable(((PersistentTopic) t).getManagedLedger()).ifPresent(
managedLedger -> {
managedLedger.trimConsumedLedgersInBackground(Futures.NULL_PROMISE);
managedLedger.rolloverCursorsInBackground();
}
);
}
Expand Down

0 comments on commit c61decc

Please sign in to comment.