Skip to content
This repository has been archived by the owner on Apr 1, 2024. It is now read-only.

Commit

Permalink
[fix] [ml] Fix orphan scheduled task for ledger create timeout check (a…
Browse files Browse the repository at this point in the history
…pache#21542)

### Motivation

When an ML tries to create a new ledger, it will create a delay task to check if the ledger create request is timeout<sup>[1]</sup>.

However, we should cancel this delay task after the request to create new ledgers is finished. Otherwise, these tasks will cost unnecessary CPU resources.

### Modifications

Cancel the scheduled task after the create ledger request is finished
  • Loading branch information
poorbarcode authored and Technoboy- committed Nov 11, 2023
1 parent 467e9c0 commit cf78b71
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -3985,7 +3984,7 @@ public static ManagedLedgerException createManagedLedgerException(Throwable t) {
*/
protected void asyncCreateLedger(BookKeeper bookKeeper, ManagedLedgerConfig config, DigestType digestType,
CreateCallback cb, Map<String, byte[]> metadata) {
AtomicBoolean ledgerCreated = new AtomicBoolean(false);
CompletableFuture<LedgerHandle> ledgerFutureHook = new CompletableFuture<>();
Map<String, byte[]> finalMetadata = new HashMap<>();
finalMetadata.putAll(ledgerMetadata);
finalMetadata.putAll(metadata);
Expand All @@ -3998,33 +3997,39 @@ protected void asyncCreateLedger(BookKeeper bookKeeper, ManagedLedgerConfig conf
));
} catch (EnsemblePlacementPolicyConfig.ParseEnsemblePlacementPolicyConfigException e) {
log.error("[{}] Serialize the placement configuration failed", name, e);
cb.createComplete(Code.UnexpectedConditionException, null, ledgerCreated);
cb.createComplete(Code.UnexpectedConditionException, null, ledgerFutureHook);
return;
}
}
createdLedgerCustomMetadata = finalMetadata;

try {
bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(),
config.getAckQuorumSize(), digestType, config.getPassword(), cb, ledgerCreated, finalMetadata);
config.getAckQuorumSize(), digestType, config.getPassword(), cb, ledgerFutureHook, finalMetadata);
} catch (Throwable cause) {
log.error("[{}] Encountered unexpected error when creating ledger",
name, cause);
cb.createComplete(Code.UnexpectedConditionException, null, ledgerCreated);
ledgerFutureHook.completeExceptionally(cause);
cb.createComplete(Code.UnexpectedConditionException, null, ledgerFutureHook);
return;
}
scheduledExecutor.schedule(() -> {
if (!ledgerCreated.get()) {

ScheduledFuture timeoutChecker = scheduledExecutor.schedule(() -> {
if (!ledgerFutureHook.isDone()
&& ledgerFutureHook.completeExceptionally(new TimeoutException(name + " Create ledger timeout"))) {
if (log.isDebugEnabled()) {
log.debug("[{}] Timeout creating ledger", name);
}
cb.createComplete(BKException.Code.TimeoutException, null, ledgerCreated);
cb.createComplete(BKException.Code.TimeoutException, null, ledgerFutureHook);
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] Ledger already created when timeout task is triggered", name);
}
}
}, config.getMetadataOperationsTimeoutSeconds(), TimeUnit.SECONDS);

ledgerFutureHook.whenComplete((ignore, ex) -> {
timeoutChecker.cancel(false);
});
}

public Clock getClock() {
Expand All @@ -4033,16 +4038,12 @@ public Clock getClock() {

/**
* check if ledger-op task is already completed by timeout-task. If completed then delete the created ledger
*
* @param rc
* @param lh
* @param ctx
* @return
*/
protected boolean checkAndCompleteLedgerOpTask(int rc, LedgerHandle lh, Object ctx) {
if (ctx instanceof AtomicBoolean) {
if (ctx instanceof CompletableFuture) {
// ledger-creation is already timed out and callback is already completed so, delete this ledger and return.
if (((AtomicBoolean) (ctx)).compareAndSet(false, true)) {
if (((CompletableFuture) ctx).complete(lh)) {
return false;
} else {
if (rc == BKException.Code.OK) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,13 @@
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
Expand All @@ -90,6 +92,8 @@
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.common.util.BoundedScheduledExecutorService;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
Expand Down Expand Up @@ -136,6 +140,7 @@
import org.apache.pulsar.metadata.api.extended.SessionEvent;
import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
import org.awaitility.Awaitility;
import org.awaitility.reflect.WhiteboxImpl;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
Expand Down Expand Up @@ -3086,9 +3091,9 @@ public void testManagedLedgerWithCreateLedgerTimeOut() throws Exception {

latch.await(config.getMetadataOperationsTimeoutSeconds() + 2, TimeUnit.SECONDS);
assertEquals(response.get(), BKException.Code.TimeoutException);
assertTrue(ctxHolder.get() instanceof AtomicBoolean);
AtomicBoolean ledgerCreated = (AtomicBoolean) ctxHolder.get();
assertFalse(ledgerCreated.get());
assertTrue(ctxHolder.get() instanceof CompletableFuture);
CompletableFuture ledgerCreateHook = (CompletableFuture) ctxHolder.get();
assertTrue(ledgerCreateHook.isCompletedExceptionally());

ledger.close();
}
Expand Down Expand Up @@ -4098,4 +4103,52 @@ public void testNonDurableCursorCreateForInactiveLedger() throws Exception {
Position Position = new PositionImpl(-1L, -1L);
assertNotNull(ml.newNonDurableCursor(Position));
}

/***
* When a ML tries to create a ledger, it will create a delay task to check if the ledger create request is timeout.
* But we should guarantee that the delay task should be canceled after the ledger create request responded.
*/
@Test
public void testNoOrphanScheduledTasksAfterCloseML() throws Exception {
String mlName = UUID.randomUUID().toString();
ManagedLedgerFactoryImpl factory = new ManagedLedgerFactoryImpl(metadataStore, bkc);
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setMetadataOperationsTimeoutSeconds(3600);

// Calculate pending task count.
long pendingTaskCountBefore = calculatePendingTaskCount(factory.getScheduledExecutor());
// Trigger create & close ML 1000 times.
for (int i = 0; i < 1000; i++) {
ManagedLedger ml = factory.open(mlName, config);
ml.close();
}
// Verify there is no orphan scheduled task.
long pendingTaskCountAfter = calculatePendingTaskCount(factory.getScheduledExecutor());
// Maybe there are other components also appended scheduled tasks, so leave 100 tasks to avoid flaky.
assertTrue(pendingTaskCountAfter - pendingTaskCountBefore < 100);
}

/**
* Calculate how many pending tasks in {@link OrderedScheduler}
*/
private long calculatePendingTaskCount(OrderedScheduler orderedScheduler) {
ExecutorService[] threads = WhiteboxImpl.getInternalState(orderedScheduler, "threads");
long taskCounter = 0;
for (ExecutorService thread : threads) {
BoundedScheduledExecutorService boundedScheduledExecutorService =
WhiteboxImpl.getInternalState(thread, "delegate");
BlockingQueue<Runnable> queue = WhiteboxImpl.getInternalState(boundedScheduledExecutorService, "queue");
for (Runnable r : queue) {
if (r instanceof FutureTask) {
FutureTask futureTask = (FutureTask) r;
if (!futureTask.isCancelled() && !futureTask.isDone()) {
taskCounter++;
}
} else {
taskCounter++;
}
}
}
return taskCounter;
}
}

0 comments on commit cf78b71

Please sign in to comment.