diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 844f7f54a62a85..65df1154af6ad1 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -197,7 +197,7 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle private volatile CompletableFuture> deadLetterProducer; - private volatile Producer retryLetterProducer; + private volatile CompletableFuture> retryLetterProducer; private final ReadWriteLock createProducerLock = new ReentrantReadWriteLock(); protected volatile boolean paused; @@ -599,6 +599,7 @@ protected CompletableFuture doReconsumeLater(Message message, AckType a Map customProperties, long delayTime, TimeUnit unit) { + MessageId messageId = message.getMessageId(); if (messageId == null) { return FutureUtil.failedFuture(new PulsarClientException @@ -615,28 +616,8 @@ protected CompletableFuture doReconsumeLater(Message message, AckType a } return FutureUtil.failedFuture(exception); } - if (delayTime < 0) { - delayTime = 0; - } - if (retryLetterProducer == null) { - createProducerLock.writeLock().lock(); - try { - if (retryLetterProducer == null) { - retryLetterProducer = client.newProducer(Schema.AUTO_PRODUCE_BYTES(schema)) - .topic(this.deadLetterPolicy.getRetryLetterTopic()) - .enableBatching(false) - .enableChunking(true) - .blockIfQueueFull(false) - .create(); - } - } catch (Exception e) { - log.error("Create retry letter producer exception with topic: {}", - deadLetterPolicy.getRetryLetterTopic(), e); - return FutureUtil.failedFuture(e); - } finally { - createProducerLock.writeLock().unlock(); - } - } + + initRetryLetterProducerIfNeeded(); CompletableFuture result = new CompletableFuture<>(); if (retryLetterProducer != null) { try { @@ -656,7 +637,7 @@ protected CompletableFuture doReconsumeLater(Message message, AckType a } propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES, String.valueOf(reconsumeTimes)); propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_DELAY_TIME, - String.valueOf(unit.toMillis(delayTime))); + String.valueOf(unit.toMillis(delayTime < 0 ? 0 : delayTime))); MessageId finalMessageId = messageId; if (reconsumeTimes > this.deadLetterPolicy.getMaxRedeliverCount() @@ -685,23 +666,29 @@ protected CompletableFuture doReconsumeLater(Message message, AckType a }); } else { assert retryMessage != null; - TypedMessageBuilder typedMessageBuilderNew = retryLetterProducer - .newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get())) - .value(retryMessage.getData()) - .properties(propertiesMap); - if (delayTime > 0) { - typedMessageBuilderNew.deliverAfter(delayTime, unit); - } - if (message.hasKey()) { - typedMessageBuilderNew.key(message.getKey()); - } - typedMessageBuilderNew.sendAsync() - .thenCompose(__ -> doAcknowledge(finalMessageId, ackType, Collections.emptyMap(), null)) - .thenAccept(v -> result.complete(null)) - .exceptionally(ex -> { - result.completeExceptionally(ex); - return null; - }); + retryLetterProducer.thenAcceptAsync(rtlProducer -> { + TypedMessageBuilder typedMessageBuilderNew = rtlProducer + .newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get())) + .value(retryMessage.getData()) + .properties(propertiesMap); + if (delayTime > 0) { + typedMessageBuilderNew.deliverAfter(delayTime, unit); + } + if (message.hasKey()) { + typedMessageBuilderNew.key(message.getKey()); + } + typedMessageBuilderNew.sendAsync() + .thenCompose(__ -> doAcknowledge(finalMessageId, ackType, Collections.emptyMap(), null)) + .thenAccept(v -> result.complete(null)) + .exceptionally(ex -> { + result.completeExceptionally(ex); + return null; + }); + }, internalPinnedExecutor).exceptionally(ex -> { + result.completeExceptionally(ex); + retryLetterProducer = null; + return null; + }); } } catch (Exception e) { result.completeExceptionally(e); @@ -710,7 +697,7 @@ protected CompletableFuture doReconsumeLater(Message message, AckType a MessageId finalMessageId = messageId; result.exceptionally(ex -> { log.error("Send to retry letter topic exception with topic: {}, messageId: {}", - retryLetterProducer.getTopic(), finalMessageId, ex); + this.deadLetterPolicy.getRetryLetterTopic(), finalMessageId, ex); Set messageIds = Collections.singleton(finalMessageId); unAckedMessageTracker.remove(finalMessageId); redeliverUnacknowledgedMessages(messageIds); @@ -1084,7 +1071,7 @@ public synchronized CompletableFuture closeAsync() { ArrayList> closeFutures = new ArrayList<>(4); closeFutures.add(closeFuture); if (retryLetterProducer != null) { - closeFutures.add(retryLetterProducer.closeAsync().whenComplete((ignore, ex) -> { + closeFutures.add(retryLetterProducer.thenCompose(p -> p.closeAsync()).whenComplete((ignore, ex) -> { if (ex != null) { log.warn("Exception ignored in closing retryLetterProducer of consumer", ex); } @@ -2200,6 +2187,25 @@ private void initDeadLetterProducerIfNeeded() { } } + private void initRetryLetterProducerIfNeeded() { + if (retryLetterProducer == null) { + createProducerLock.writeLock().lock(); + try { + if (retryLetterProducer == null) { + retryLetterProducer = client + .newProducer(Schema.AUTO_PRODUCE_BYTES(schema)) + .topic(this.deadLetterPolicy.getRetryLetterTopic()) + .enableBatching(false) + .enableChunking(true) + .blockIfQueueFull(false) + .createAsync(); + } + } finally { + createProducerLock.writeLock().unlock(); + } + } + } + @Override public void seek(MessageId messageId) throws PulsarClientException { try {