Skip to content

Commit

Permalink
[improve] [broker] improve read entry error log for troubleshooting (a…
Browse files Browse the repository at this point in the history
…pache#21169)

(cherry picked from commit 65706c6)
(cherry picked from commit faee123)
  • Loading branch information
poorbarcode authored and mukesh-ctds committed Apr 15, 2024
1 parent 8d1e249 commit 7fe00d4
Show file tree
Hide file tree
Showing 8 changed files with 55 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -823,6 +823,11 @@ public void readEntryComplete(Entry entry, Object ctx) {
result.entry = entry;
counter.countDown();
}

@Override
public String toString() {
return String.format("Cursor [{}] get Nth entry", ManagedCursorImpl.this);
}
}, null);

counter.await(ledger.getConfig().getMetadataOperationsTimeoutSeconds(), TimeUnit.SECONDS);
Expand Down Expand Up @@ -1536,6 +1541,11 @@ public synchronized void readEntryFailed(ManagedLedgerException mle, Object ctx)
callback.readEntriesFailed(exception.get(), ctx);
}
}

@Override
public String toString() {
return String.format("Cursor [{}] async replay entries", ManagedCursorImpl.this);
}
};

positions.stream().filter(position -> !alreadyAcknowledgedPositions.contains(position))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1263,6 +1263,12 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
log.error("Error read entry for position {}", nextPos, exception);
future.completeExceptionally(exception);
}

@Override
public String toString() {
return String.format("ML [{}] get earliest message publish time of pos",
ManagedLedgerImpl.this.name);
}
}, null);

return future;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,8 @@ public void asyncReadEntry(PositionImpl position, AsyncCallbacks.ReadEntryCallba
this.getLedgerHandle(position.getLedgerId())
.thenAccept((ledger) -> asyncReadEntry(ledger, position, callback, ctx))
.exceptionally((ex) -> {
log.error("[{}] Error opening ledger for reading at position {} - {}", this.name, position,
ex.getMessage());
log.error("[{}] Error opening ledger for reading at position {} - {}. Op: {}", this.name,
position, ex.getMessage(), callback);
callback.readEntryFailed(ManagedLedgerException.getManagedLedgerException(ex.getCause()), ctx);
return null;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2780,6 +2780,12 @@ public void readEntryComplete(Entry entry, Object ctx) {
}
}
}

@Override
public String toString() {
return String.format("Topic [{}] get entry batch size",
PersistentTopicsBase.this.topicName);
}
}, null);
} catch (NullPointerException npe) {
batchSizeFuture.completeExceptionally(new RestException(Status.NOT_FOUND, "Message not found"));
Expand Down Expand Up @@ -2881,6 +2887,12 @@ public void readEntryComplete(Entry entry, Object ctx) {
}
}
}

@Override
public String toString() {
return String.format("Topic [{}] internal get message by id",
PersistentTopicsBase.this.topicName);
}
}, null);
return results;
});
Expand Down Expand Up @@ -3087,6 +3099,12 @@ public void readEntryComplete(Entry entry, Object ctx) {
public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
future.completeExceptionally(exception);
}

@Override
public String toString() {
return String.format("Topic [{}] internal examine message async",
PersistentTopicsBase.this.topicName);
}
}, null);
return future;
} catch (ManagedLedgerException exception) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,12 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
public void readEntryComplete(Entry entry, Object ctx) {
future.complete(entry);
}

@Override
public String toString() {
return String.format("Replication [{}] peek Nth message",
PersistentReplicator.this.producer.getProducerName());
}
}, null);

return future;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -878,6 +878,12 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
public void readEntryComplete(Entry entry, Object ctx) {
future.complete(entry);
}

@Override
public String toString() {
return String.format("Subscription [{}-{}] async replay entries", PersistentSubscription.this.topicName,
PersistentSubscription.this.subName);
}
}, null);

return future;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3123,7 +3123,7 @@ public boolean isOldestMessageExpired(ManagedCursor cursor, int messageTTLInSeco
// if AutoSkipNonRecoverableData is set to true, just return true here.
return true;
} else {
log.warn("[{}] Error while getting the oldest message", topic, e);
log.warn("[{}] [{}] Error while getting the oldest message", topic, cursor.toString(), e);
}
} finally {
if (entry != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,12 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
hasInvalidIndex.set(true);
}
}

@Override
public String toString() {
return String.format("Transaction buffer [{}] recover from snapshot",
SnapshotSegmentAbortedTxnProcessorImpl.this.topic.getName());
}
}, null);
});
openManagedLedgerAndHandleSegmentsFuture.complete(null);
Expand Down

0 comments on commit 7fe00d4

Please sign in to comment.