Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][misc] Sync commits from apache into 3.1_ds (26 Dec) #356

Merged
merged 5 commits into from
Dec 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,7 @@ public void operationFailed(MetaStoreException e) {
executor.execute(() -> {
mbean.endDataLedgerCreateOp();
if (rc != BKException.Code.OK) {
log.error("[{}] Error creating ledger rc={} {}", name, rc, BKException.getMessage(rc));
callback.initializeFailed(createManagedLedgerException(rc));
return;
}
Expand Down Expand Up @@ -4192,7 +4193,7 @@ public Clock getClock() {
protected boolean checkAndCompleteLedgerOpTask(int rc, LedgerHandle lh, Object ctx) {
if (ctx instanceof CompletableFuture) {
// ledger-creation is already timed out and callback is already completed so, delete this ledger and return.
if (((CompletableFuture) ctx).complete(lh)) {
if (((CompletableFuture) ctx).complete(lh) || rc == BKException.Code.TimeoutException) {
return false;
} else {
if (rc == BKException.Code.OK) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4233,6 +4233,31 @@ public void testNonDurableCursorCreateForInactiveLedger() throws Exception {
assertNotNull(ml.newNonDurableCursor(Position));
}

@Test(timeOut = 60 * 1000)
public void testCreateDataLedgerTimeout() throws Exception {
String mlName = UUID.randomUUID().toString();
ManagedLedgerFactoryImpl factory = null;
ManagedLedger ml = null;
try {
factory = new ManagedLedgerFactoryImpl(metadataStore, bkc);
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setMetadataOperationsTimeoutSeconds(5);
bkc.delay(10 * 1000);
ml = factory.open(mlName, config);
fail("Should get a timeout ex");
} catch (ManagedLedgerException ex) {
assertTrue(ex.getMessage().contains("timeout"));
} finally {
// cleanup.
if (ml != null) {
ml.delete();
}
if (factory != null) {
factory.shutdown();
}
}
}

/***
* When a ML tries to create a ledger, it will create a delay task to check if the ledger create request is timeout.
* But we should guarantee that the delay task should be canceled after the ledger create request responded.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2877,17 +2877,18 @@ protected CompletableFuture<Response> internalGetMessageById(long ledgerId, long
public void readEntryFailed(ManagedLedgerException exception,
Object ctx) {
if (exception instanceof ManagedLedgerException.LedgerNotExistException) {
throw new RestException(Status.NOT_FOUND, "Message id not found");
results.completeExceptionally(
new RestException(Status.NOT_FOUND, "Message id not found"));
}
throw new RestException(exception);
results.completeExceptionally(new RestException(exception));
}

@Override
public void readEntryComplete(Entry entry, Object ctx) {
try {
results.complete(generateResponseWithEntry(entry, (PersistentTopic) topic));
} catch (IOException exception) {
throw new RestException(exception);
results.completeExceptionally(new RestException(exception));
} finally {
if (entry != null) {
entry.release();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1377,6 +1377,10 @@ public void testGetMessageById() throws Exception {
Assert.expectThrows(PulsarAdminException.NotFoundException.class, () -> {
admin.topics().getMessageById(topicName1, id2.getLedgerId(), id2.getEntryId());
});

Assert.expectThrows(PulsarAdminException.ServerSideErrorException.class, () -> {
admin.topics().getMessageById(topicName1, id1.getLedgerId(), id1.getEntryId() + 10);
});
}

@Test
Expand Down
Loading
Loading