Skip to content

Commit

Permalink
[fix][client] Create the retry producer async (apache#23157)
Browse files Browse the repository at this point in the history
Co-authored-by: Ómar Yasin <[email protected]>
(cherry picked from commit a025938)
(cherry picked from commit 507d402)
  • Loading branch information
omarkj authored and nikhil-ctds committed Aug 16, 2024
1 parent 8be115b commit ec1012d
Showing 1 changed file with 49 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle

private volatile CompletableFuture<Producer<byte[]>> deadLetterProducer;

private volatile Producer<byte[]> retryLetterProducer;
private volatile CompletableFuture<Producer<byte[]>> retryLetterProducer;
private final ReadWriteLock createProducerLock = new ReentrantReadWriteLock();

protected volatile boolean paused;
Expand Down Expand Up @@ -599,6 +599,7 @@ protected CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType a
Map<String, String> customProperties,
long delayTime,
TimeUnit unit) {

MessageId messageId = message.getMessageId();
if (messageId == null) {
return FutureUtil.failedFuture(new PulsarClientException
Expand All @@ -615,28 +616,8 @@ protected CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType a
}
return FutureUtil.failedFuture(exception);
}
if (delayTime < 0) {
delayTime = 0;
}
if (retryLetterProducer == null) {
createProducerLock.writeLock().lock();
try {
if (retryLetterProducer == null) {
retryLetterProducer = client.newProducer(Schema.AUTO_PRODUCE_BYTES(schema))
.topic(this.deadLetterPolicy.getRetryLetterTopic())
.enableBatching(false)
.enableChunking(true)
.blockIfQueueFull(false)
.create();
}
} catch (Exception e) {
log.error("Create retry letter producer exception with topic: {}",
deadLetterPolicy.getRetryLetterTopic(), e);
return FutureUtil.failedFuture(e);
} finally {
createProducerLock.writeLock().unlock();
}
}

initRetryLetterProducerIfNeeded();
CompletableFuture<Void> result = new CompletableFuture<>();
if (retryLetterProducer != null) {
try {
Expand All @@ -656,7 +637,7 @@ protected CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType a
}
propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES, String.valueOf(reconsumeTimes));
propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_DELAY_TIME,
String.valueOf(unit.toMillis(delayTime)));
String.valueOf(unit.toMillis(delayTime < 0 ? 0 : delayTime)));

MessageId finalMessageId = messageId;
if (reconsumeTimes > this.deadLetterPolicy.getMaxRedeliverCount()
Expand Down Expand Up @@ -685,23 +666,29 @@ protected CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType a
});
} else {
assert retryMessage != null;
TypedMessageBuilder<byte[]> typedMessageBuilderNew = retryLetterProducer
.newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get()))
.value(retryMessage.getData())
.properties(propertiesMap);
if (delayTime > 0) {
typedMessageBuilderNew.deliverAfter(delayTime, unit);
}
if (message.hasKey()) {
typedMessageBuilderNew.key(message.getKey());
}
typedMessageBuilderNew.sendAsync()
.thenCompose(__ -> doAcknowledge(finalMessageId, ackType, Collections.emptyMap(), null))
.thenAccept(v -> result.complete(null))
.exceptionally(ex -> {
result.completeExceptionally(ex);
return null;
});
retryLetterProducer.thenAcceptAsync(rtlProducer -> {
TypedMessageBuilder<byte[]> typedMessageBuilderNew = rtlProducer
.newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get()))
.value(retryMessage.getData())
.properties(propertiesMap);
if (delayTime > 0) {
typedMessageBuilderNew.deliverAfter(delayTime, unit);
}
if (message.hasKey()) {
typedMessageBuilderNew.key(message.getKey());
}
typedMessageBuilderNew.sendAsync()
.thenCompose(__ -> doAcknowledge(finalMessageId, ackType, Collections.emptyMap(), null))
.thenAccept(v -> result.complete(null))
.exceptionally(ex -> {
result.completeExceptionally(ex);
return null;
});
}, internalPinnedExecutor).exceptionally(ex -> {
result.completeExceptionally(ex);
retryLetterProducer = null;
return null;
});
}
} catch (Exception e) {
result.completeExceptionally(e);
Expand All @@ -710,7 +697,7 @@ protected CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType a
MessageId finalMessageId = messageId;
result.exceptionally(ex -> {
log.error("Send to retry letter topic exception with topic: {}, messageId: {}",
retryLetterProducer.getTopic(), finalMessageId, ex);
this.deadLetterPolicy.getRetryLetterTopic(), finalMessageId, ex);
Set<MessageId> messageIds = Collections.singleton(finalMessageId);
unAckedMessageTracker.remove(finalMessageId);
redeliverUnacknowledgedMessages(messageIds);
Expand Down Expand Up @@ -1084,7 +1071,7 @@ public synchronized CompletableFuture<Void> closeAsync() {
ArrayList<CompletableFuture<Void>> closeFutures = new ArrayList<>(4);
closeFutures.add(closeFuture);
if (retryLetterProducer != null) {
closeFutures.add(retryLetterProducer.closeAsync().whenComplete((ignore, ex) -> {
closeFutures.add(retryLetterProducer.thenCompose(p -> p.closeAsync()).whenComplete((ignore, ex) -> {
if (ex != null) {
log.warn("Exception ignored in closing retryLetterProducer of consumer", ex);
}
Expand Down Expand Up @@ -2200,6 +2187,25 @@ private void initDeadLetterProducerIfNeeded() {
}
}

private void initRetryLetterProducerIfNeeded() {
if (retryLetterProducer == null) {
createProducerLock.writeLock().lock();
try {
if (retryLetterProducer == null) {
retryLetterProducer = client
.newProducer(Schema.AUTO_PRODUCE_BYTES(schema))
.topic(this.deadLetterPolicy.getRetryLetterTopic())
.enableBatching(false)
.enableChunking(true)
.blockIfQueueFull(false)
.createAsync();
}
} finally {
createProducerLock.writeLock().unlock();
}
}
}

@Override
public void seek(MessageId messageId) throws PulsarClientException {
try {
Expand Down

0 comments on commit ec1012d

Please sign in to comment.