Skip to content

Commit

Permalink
[fix][ml] Topic load timeout due to ml data ledger future never finis…
Browse files Browse the repository at this point in the history
…hes (apache#23772)

### Motivation

**Background**
There is a mechanism that repeatedly prevents the callback of ML data ledger creation:
- Start a scheduled task to check whether the creation will be timeout.
- Received a callback
  - Check whether the future(`@param ctx` of `BK.createAsync`) has been done or not.
  - If done: it means the creation has timeout before the creation is completed
  - Otherwise: it is a real callback from BK.

**Issue:**
But the timeout event will call the same callback as above, then the steps are as follows, which you ca reproduce by the test `testCreateDataLedgerTimeout`:
- Start creating a data ledger
  - Call `BK.createAsync`
- Timeout
  - Mark the future(`@param ctx` of `BK.createAsync`) as completed exceptionally.
  - Trigger the callback related to ledger creation.
    - Check whether the future(`@param ctx` of `BK.createAsync`) has been done or not.
    - If done: do nothing.
- Creation is compelled.
  - Trigger the callback related to ledger creation.
    - Check whether the future(`@param ctx` of `BK.createAsync`) has been done or not.
    - If done: do nothing.
- Issue: The callback for ledger creation will never be called.

![Screenshot 2024-12-24 at 00 14 38](https://github.com/user-attachments/assets/44ed19d2-7238-45a4-9186-c127f6ed14f7)
![Screenshot 2024-12-24 at 00 14 08](https://github.com/user-attachments/assets/349f39ff-7e98-4a09-9af2-f80082339592)

### Modifications

Fix the issue

### Documentation

<!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->

- [ ] `doc` <!-- Your PR contains doc changes. -->
- [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
- [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
- [ ] `doc-complete` <!-- Docs have been already added -->

### Matching PR in forked repository

PR in forked repository: x

(cherry picked from commit 9699dc2)
(cherry picked from commit 2189b60)
  • Loading branch information
poorbarcode authored and srinath-ctds committed Dec 26, 2024
1 parent 32a8368 commit d4ecfaf
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,7 @@ public void operationFailed(MetaStoreException e) {
executor.execute(() -> {
mbean.endDataLedgerCreateOp();
if (rc != BKException.Code.OK) {
log.error("[{}] Error creating ledger rc={} {}", name, rc, BKException.getMessage(rc));
callback.initializeFailed(createManagedLedgerException(rc));
return;
}
Expand Down Expand Up @@ -4192,7 +4193,7 @@ public Clock getClock() {
protected boolean checkAndCompleteLedgerOpTask(int rc, LedgerHandle lh, Object ctx) {
if (ctx instanceof CompletableFuture) {
// ledger-creation is already timed out and callback is already completed so, delete this ledger and return.
if (((CompletableFuture) ctx).complete(lh)) {
if (((CompletableFuture) ctx).complete(lh) || rc == BKException.Code.TimeoutException) {
return false;
} else {
if (rc == BKException.Code.OK) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4233,6 +4233,31 @@ public void testNonDurableCursorCreateForInactiveLedger() throws Exception {
assertNotNull(ml.newNonDurableCursor(Position));
}

@Test(timeOut = 60 * 1000)
public void testCreateDataLedgerTimeout() throws Exception {
String mlName = UUID.randomUUID().toString();
ManagedLedgerFactoryImpl factory = null;
ManagedLedger ml = null;
try {
factory = new ManagedLedgerFactoryImpl(metadataStore, bkc);
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setMetadataOperationsTimeoutSeconds(5);
bkc.delay(10 * 1000);
ml = factory.open(mlName, config);
fail("Should get a timeout ex");
} catch (ManagedLedgerException ex) {
assertTrue(ex.getMessage().contains("timeout"));
} finally {
// cleanup.
if (ml != null) {
ml.delete();
}
if (factory != null) {
factory.shutdown();
}
}
}

/***
* When a ML tries to create a ledger, it will create a delay task to check if the ledger create request is timeout.
* But we should guarantee that the delay task should be canceled after the ledger create request responded.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static com.google.common.base.Preconditions.checkArgument;
import com.google.common.collect.Lists;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand All @@ -35,6 +36,7 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -70,6 +72,7 @@ public class PulsarMockBookKeeper extends BookKeeper {

final OrderedExecutor orderedExecutor;
final ExecutorService executor;
final ScheduledExecutorService scheduler;

@Override
public ClientConfiguration getConf() {
Expand Down Expand Up @@ -97,6 +100,7 @@ public static Collection<BookieId> getMockEnsemble() {
public PulsarMockBookKeeper(OrderedExecutor orderedExecutor) throws Exception {
this.orderedExecutor = orderedExecutor;
this.executor = orderedExecutor.chooseThread();
scheduler = Executors.newScheduledThreadPool(1, new DefaultThreadFactory("mock-bk-scheduler"));
}

@Override
Expand Down Expand Up @@ -290,7 +294,7 @@ public void shutdown() {
for (PulsarMockLedgerHandle ledger : ledgers.values()) {
ledger.entries.clear();
}

scheduler.shutdown();
ledgers.clear();
}

Expand Down Expand Up @@ -331,6 +335,15 @@ synchronized CompletableFuture<Void> getProgrammedFailure() {
return failures.isEmpty() ? defaultResponse : failures.remove(0);
}

public void delay(long millis) {
CompletableFuture<Void> delayFuture = new CompletableFuture<>();
scheduler.schedule(() -> {
delayFuture.complete(null);
}, millis, TimeUnit.MILLISECONDS);
failures.add(delayFuture);
}


public void failNow(int rc) {
failAfter(0, rc);
}
Expand Down

0 comments on commit d4ecfaf

Please sign in to comment.