diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 0e638881c70db..e130c254a8b7f 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -530,6 +530,7 @@ public void operationFailed(MetaStoreException e) { executor.execute(() -> { mbean.endDataLedgerCreateOp(); if (rc != BKException.Code.OK) { + log.error("[{}] Error creating ledger rc={} {}", name, rc, BKException.getMessage(rc)); callback.initializeFailed(createManagedLedgerException(rc)); return; } @@ -4192,7 +4193,7 @@ public Clock getClock() { protected boolean checkAndCompleteLedgerOpTask(int rc, LedgerHandle lh, Object ctx) { if (ctx instanceof CompletableFuture) { // ledger-creation is already timed out and callback is already completed so, delete this ledger and return. - if (((CompletableFuture) ctx).complete(lh)) { + if (((CompletableFuture) ctx).complete(lh) || rc == BKException.Code.TimeoutException) { return false; } else { if (rc == BKException.Code.OK) { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index de51f63e06ccd..8620604e41bf0 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -4233,6 +4233,31 @@ public void testNonDurableCursorCreateForInactiveLedger() throws Exception { assertNotNull(ml.newNonDurableCursor(Position)); } + @Test(timeOut = 60 * 1000) + public void testCreateDataLedgerTimeout() throws Exception { + String mlName = UUID.randomUUID().toString(); + ManagedLedgerFactoryImpl factory = null; + ManagedLedger ml = null; + try { + factory = new ManagedLedgerFactoryImpl(metadataStore, bkc); + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMetadataOperationsTimeoutSeconds(5); + bkc.delay(10 * 1000); + ml = factory.open(mlName, config); + fail("Should get a timeout ex"); + } catch (ManagedLedgerException ex) { + assertTrue(ex.getMessage().contains("timeout")); + } finally { + // cleanup. + if (ml != null) { + ml.delete(); + } + if (factory != null) { + factory.shutdown(); + } + } + } + /*** * When a ML tries to create a ledger, it will create a delay task to check if the ledger create request is timeout. * But we should guarantee that the delay task should be canceled after the ledger create request responded. diff --git a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java index 4516cfea01f05..620b1c6fb6a2a 100644 --- a/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java +++ b/testmocks/src/main/java/org/apache/bookkeeper/client/PulsarMockBookKeeper.java @@ -20,6 +20,7 @@ import static com.google.common.base.Preconditions.checkArgument; import com.google.common.collect.Lists; +import io.netty.util.concurrent.DefaultThreadFactory; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -35,6 +36,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -70,6 +72,7 @@ public class PulsarMockBookKeeper extends BookKeeper { final OrderedExecutor orderedExecutor; final ExecutorService executor; + final ScheduledExecutorService scheduler; @Override public ClientConfiguration getConf() { @@ -97,6 +100,7 @@ public static Collection getMockEnsemble() { public PulsarMockBookKeeper(OrderedExecutor orderedExecutor) throws Exception { this.orderedExecutor = orderedExecutor; this.executor = orderedExecutor.chooseThread(); + scheduler = Executors.newScheduledThreadPool(1, new DefaultThreadFactory("mock-bk-scheduler")); } @Override @@ -290,7 +294,7 @@ public void shutdown() { for (PulsarMockLedgerHandle ledger : ledgers.values()) { ledger.entries.clear(); } - + scheduler.shutdown(); ledgers.clear(); } @@ -331,6 +335,15 @@ synchronized CompletableFuture getProgrammedFailure() { return failures.isEmpty() ? defaultResponse : failures.remove(0); } + public void delay(long millis) { + CompletableFuture delayFuture = new CompletableFuture<>(); + scheduler.schedule(() -> { + delayFuture.complete(null); + }, millis, TimeUnit.MILLISECONDS); + failures.add(delayFuture); + } + + public void failNow(int rc) { failAfter(0, rc); }