Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DO NOT MERGE: Test branch 3.1_fx #255

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -866,4 +866,11 @@ default void skipNonRecoverableLedger(long ledgerId){}
* @return whether this cursor is closed.
*/
boolean isClosed();

/**
* Called by the system to trigger perdiodic rollover in absence of activity.
*/
default boolean periodicRollover() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,11 @@ void asyncSetProperties(Map<String, String> properties, AsyncCallbacks.UpdatePro
*/
void trimConsumedLedgersInBackground(CompletableFuture<?> promise);

/**
* Rollover cursors in background if needed.
*/
default void rolloverCursorsInBackground() {}

/**
* If a ledger is lost, this ledger will be skipped after enabled "autoSkipNonRecoverableData", and the method is
* used to delete information about this ledger in the ManagedCursor.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3093,12 +3093,7 @@ void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, fin
lh1.getId());
}

if (shouldCloseLedger(lh1)) {
if (log.isDebugEnabled()) {
log.debug("[{}] Need to create new metadata ledger for cursor {}", ledger.getName(), name);
}
startCreatingNewMetadataLedger();
}
rolloverLedgerIfNeeded(lh1);

mbean.persistToLedger(true);
mbean.addWriteCursorLedgerSize(data.length);
Expand All @@ -3116,6 +3111,35 @@ void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, fin
}, null);
}

public boolean periodicRollover() {
LedgerHandle lh = cursorLedger;
if (State.Open.equals(STATE_UPDATER.get(this))
&& lh != null && lh.getLength() > 0) {
boolean triggered = rolloverLedgerIfNeeded(lh);
if (triggered) {
log.info("[{}] Periodic rollover triggered for cursor {} (length={} bytes)",
ledger.getName(), name, lh.getLength());
} else {
log.debug("[{}] Periodic rollover skipped for cursor {} (length={} bytes)",
ledger.getName(), name, lh.getLength());

}
return triggered;
}
return false;
}

boolean rolloverLedgerIfNeeded(LedgerHandle lh1) {
if (shouldCloseLedger(lh1)) {
if (log.isDebugEnabled()) {
log.debug("[{}] Need to create new metadata ledger for cursor {}", ledger.getName(), name);
}
startCreatingNewMetadataLedger();
return true;
}
return false;
}

void persistPositionToMetaStore(MarkDeleteEntry mdEntry, final VoidCallback callback) {
final PositionImpl newPosition = mdEntry.newPosition;
STATE_UPDATER.compareAndSet(ManagedCursorImpl.this, State.Open, State.NoLedger);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2828,6 +2828,15 @@ public void operationFailed(MetaStoreException e) {
}
}

@Override
public void rolloverCursorsInBackground() {
if (cursors.hasDurableCursors()) {
executor.execute(() -> {
cursors.forEach(ManagedCursor::periodicRollover);
});
}
}

protected void doDeleteLedgers(List<LedgerInfo> ledgersToDelete) {
PositionImpl currentLastConfirmedEntry = lastConfirmedEntry;
// Update metadata
Expand Down Expand Up @@ -4530,4 +4539,4 @@ public Position getTheSlowestNonDurationReadPosition() {
}
return theSlowestNonDurableReadPosition;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,8 @@ public void recycle() {
entries = null;
nextReadPosition = null;
maxPosition = null;
recyclerHandle.recycle(this);
skipCondition = null;
recyclerHandle.recycle(this);
}

private static final Logger log = LoggerFactory.getLogger(OpReadEntry.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.bookkeeper.mledger.util.ThrowableToStringUtil;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
import org.awaitility.Awaitility;
import org.testng.annotations.Test;

public class ManagedLedgerBkTest extends BookKeeperClusterTestCase {
Expand Down Expand Up @@ -548,6 +549,42 @@ public void testChangeCrcType() throws Exception {
}
}

@Test
public void testPeriodicRollover() throws Exception {
ManagedLedgerFactoryConfig factoryConf = new ManagedLedgerFactoryConfig();
factoryConf.setMaxCacheSize(0);

int rolloverTimeForCursorInSeconds = 5;

@Cleanup("shutdown")
ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, factoryConf);
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setEnsembleSize(1).setWriteQuorumSize(1).setAckQuorumSize(1).setMetadataEnsembleSize(1)
.setMetadataAckQuorumSize(1)
.setLedgerRolloverTimeout(rolloverTimeForCursorInSeconds);
ManagedLedger ledger = factory.open("my-ledger" + testName, config);
ManagedCursor cursor = ledger.openCursor("c1");

Position pos = ledger.addEntry("entry-0".getBytes());
ledger.addEntry("entry-1".getBytes());

List<Entry> entries = cursor.readEntries(2);
assertEquals(2, entries.size());
entries.forEach(Entry::release);

ManagedCursorImpl cursorImpl = (ManagedCursorImpl) cursor;
assertEquals(ManagedCursorImpl.State.NoLedger, cursorImpl.state);

// this creates the ledger
cursor.delete(pos);

Awaitility.await().until(() -> cursorImpl.state == ManagedCursorImpl.State.Open);

Thread.sleep(rolloverTimeForCursorInSeconds * 1000 + 1000);

long currentLedgerId = cursorImpl.getCursorLedger();
assertTrue(cursor.periodicRollover());
Awaitility.await().until(() -> cursorImpl.getCursorLedger() != currentLedgerId);
}

}
42 changes: 21 additions & 21 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ flexible messaging model and an intuitive client API.</description>
<!-- apache commons -->
<commons-compress.version>1.26.0</commons-compress.version>

<bookkeeper.version>4.16.4</bookkeeper.version>
<bookkeeper.version>4.16.5-fx-f4db1a24ab</bookkeeper.version>
<zookeeper.version>3.9.2</zookeeper.version>
<commons-cli.version>1.5.0</commons-cli.version>
<commons-text.version>1.10.0</commons-text.version>
Expand Down Expand Up @@ -2749,26 +2749,26 @@ flexible messaging model and an intuitive client API.</description>
<enabled>false</enabled>
</snapshots>
</repository>
<repository>
<id>datastax-releases</id>
<url>https://repo.aws.dsinternal.org/artifactory/datastax-releases-local</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
<releases>
<enabled>true</enabled>
</releases>
</repository>
<repository>
<id>datastax-snapshots-local</id>
<url>https://repo.aws.dsinternal.org/artifactory/datastax-snapshots-local</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
<releases>
<enabled>false</enabled>
</releases>
</repository>
<!-- <repository>-->
<!-- <id>datastax-releases</id>-->
<!-- <url>https://repo.aws.dsinternal.org/artifactory/datastax-releases-local</url>-->
<!-- <snapshots>-->
<!-- <enabled>false</enabled>-->
<!-- </snapshots>-->
<!-- <releases>-->
<!-- <enabled>true</enabled>-->
<!-- </releases>-->
<!-- </repository>-->
<!-- <repository>-->
<!-- <id>datastax-snapshots-local</id>-->
<!-- <url>https://repo.aws.dsinternal.org/artifactory/datastax-snapshots-local</url>-->
<!-- <snapshots>-->
<!-- <enabled>true</enabled>-->
<!-- </snapshots>-->
<!-- <releases>-->
<!-- <enabled>false</enabled>-->
<!-- </releases>-->
<!-- </repository>-->
<repository>
<id>public-datastax-releases</id>
<url>https://repo.datastax.com/datastax-public-releases-local</url>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,22 +197,21 @@ protected CompletableFuture<Void> deleteAsync(String path) {
}

protected CompletableFuture<Void> deleteIfExistsAsync(String path) {
return cache.exists(path).thenCompose(exists -> {
if (!exists) {
return CompletableFuture.completedFuture(null);
log.info("Deleting path: {}", path);
CompletableFuture<Void> future = new CompletableFuture<>();
cache.delete(path).whenComplete((ignore, ex) -> {
if (ex != null && ex.getCause() instanceof MetadataStoreException.NotFoundException) {
log.info("Path {} did not exist in metadata store", path);
future.complete(null);
} else if (ex != null) {
log.info("Failed to delete path from metadata store: {}", path, ex);
future.completeExceptionally(ex);
} else {
log.info("Deleted path from metadata store: {}", path);
future.complete(null);
}
CompletableFuture<Void> future = new CompletableFuture<>();
cache.delete(path).whenComplete((ignore, ex) -> {
if (ex != null && ex.getCause() instanceof MetadataStoreException.NotFoundException) {
future.complete(null);
} else if (ex != null) {
future.completeExceptionally(ex);
} else {
future.complete(null);
}
});
return future;
});
return future;
}

protected boolean exists(String path) throws MetadataStoreException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public void deleteLocalPolicies(NamespaceName ns) throws MetadataStoreException
}

public CompletableFuture<Void> deleteLocalPoliciesAsync(NamespaceName ns) {
return deleteAsync(joinPath(LOCAL_POLICIES_ROOT, ns.toString()));
return deleteIfExistsAsync(joinPath(LOCAL_POLICIES_ROOT, ns.toString()));
}

public CompletableFuture<Void> deleteLocalPoliciesTenantAsync(String tenant) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public void deletePolicies(NamespaceName ns) throws MetadataStoreException{
}

public CompletableFuture<Void> deletePoliciesAsync(NamespaceName ns){
return deleteAsync(joinPath(BASE_POLICIES_PATH, ns.toString()));
return deleteIfExistsAsync(joinPath(BASE_POLICIES_PATH, ns.toString()));
}

public Optional<Policies> getPolicies(NamespaceName ns) throws MetadataStoreException{
Expand Down Expand Up @@ -152,10 +152,18 @@ public static boolean pathIsNamespaceLocalPolicies(String path) {
&& path.substring(LOCAL_POLICIES_ROOT.length() + 1).contains("/");
}

// clear resource of `/namespace/{namespaceName}` for zk-node
/**
* Clear resource of `/namespace/{namespaceName}` for zk-node.
* @param ns the namespace name
* @return a handle to the results of the operation
* */
//
public CompletableFuture<Void> deleteNamespaceAsync(NamespaceName ns) {
final String namespacePath = joinPath(NAMESPACE_BASE_PATH, ns.toString());
return deleteIfExistsAsync(namespacePath);
// please beware that this will delete all the children of the namespace
// including the ownership nodes (ephemeral nodes)
// see ServiceUnitUtils.path(ns) for the ownership node path
return getStore().deleteRecursive(namespacePath);
}

// clear resource of `/namespace/{tenant}` for zk-node
Expand Down Expand Up @@ -298,11 +306,14 @@ public CompletableFuture<Void> deletePartitionedTopicAsync(TopicName tn) {

public CompletableFuture<Void> clearPartitionedTopicMetadataAsync(NamespaceName namespaceName) {
final String globalPartitionedPath = joinPath(PARTITIONED_TOPIC_PATH, namespaceName.toString());
log.info("Clearing partitioned topic metadata for namespace {}, path is {}",
namespaceName, globalPartitionedPath);
return getStore().deleteRecursive(globalPartitionedPath);
}

public CompletableFuture<Void> clearPartitionedTopicTenantAsync(String tenant) {
final String partitionedTopicPath = joinPath(PARTITIONED_TOPIC_PATH, tenant);
log.info("Clearing partitioned topic metadata for tenant {}, path is {}", tenant, partitionedTopicPath);
return deleteIfExistsAsync(partitionedTopicPath);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,6 @@ public CompletableFuture<List<String>> getExistingPartitions(NamespaceName ns, T
);
}

public CompletableFuture<Void> deletePersistentTopicAsync(TopicName topic) {
String path = MANAGED_LEDGER_PATH + "/" + topic.getPersistenceNamingEncoding();
return store.delete(path, Optional.of(-1L));
}

public CompletableFuture<Void> createPersistentTopicAsync(TopicName topic) {
String path = MANAGED_LEDGER_PATH + "/" + topic.getPersistenceNamingEncoding();
return store.put(path, new byte[0], Optional.of(-1L))
Expand All @@ -93,38 +88,20 @@ public CompletableFuture<Boolean> persistentTopicExists(TopicName topic) {

public CompletableFuture<Void> clearNamespacePersistence(NamespaceName ns) {
String path = MANAGED_LEDGER_PATH + "/" + ns;
return store.exists(path)
.thenCompose(exists -> {
if (exists) {
return store.delete(path, Optional.empty());
} else {
return CompletableFuture.completedFuture(null);
}
});
log.info("Clearing namespace persistence for namespace: {}, path {}", ns, path);
return store.deleteIfExists(path, Optional.empty());
}

public CompletableFuture<Void> clearDomainPersistence(NamespaceName ns) {
String path = MANAGED_LEDGER_PATH + "/" + ns + "/persistent";
return store.exists(path)
.thenCompose(exists -> {
if (exists) {
return store.delete(path, Optional.empty());
} else {
return CompletableFuture.completedFuture(null);
}
});
log.info("Clearing domain persistence for namespace: {}, path {}", ns, path);
return store.deleteIfExists(path, Optional.empty());
}

public CompletableFuture<Void> clearTenantPersistence(String tenant) {
String path = MANAGED_LEDGER_PATH + "/" + tenant;
return store.exists(path)
.thenCompose(exists -> {
if (exists) {
return store.delete(path, Optional.empty());
} else {
return CompletableFuture.completedFuture(null);
}
});
log.info("Clearing tenant persistence for tenant: {}, path {}", tenant, path);
return store.deleteIfExists(path, Optional.empty());
}

void handleNotification(Notification notification) {
Expand Down
Loading
Loading