diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ProducerCache.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ProducerCache.java index f68c4e9589558..2e10581b35249 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ProducerCache.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ProducerCache.java @@ -32,6 +32,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.common.util.FutureUtil; @Slf4j @@ -61,24 +62,41 @@ record ProducerCacheKey(CacheArea cacheArea, String topic, Object additionalKey) private final CopyOnWriteArrayList> closeFutures = new CopyOnWriteArrayList<>(); public ProducerCache() { - Caffeine builder = Caffeine.newBuilder() + Caffeine> builder = Caffeine.newBuilder() .scheduler(Scheduler.systemScheduler()) - .removalListener((key, producer, cause) -> { + .>removalListener((key, producer, cause) -> { log.info("Closing producer for topic {}, cause {}", key.topic(), cause); CompletableFuture closeFuture = - producer.flushAsync() + CompletableFuture.supplyAsync(() -> producer.flushAsync(), Runnable::run) .orTimeout(FLUSH_OR_CLOSE_TIMEOUT_SECONDS, TimeUnit.SECONDS) .exceptionally(ex -> { - log.error("Error flushing producer for topic {}", key.topic(), ex); + Throwable unwrappedCause = FutureUtil.unwrapCompletionException(ex); + if (unwrappedCause instanceof PulsarClientException.AlreadyClosedException) { + log.error( + "Error flushing producer for topic {} due to " + + "AlreadyClosedException", + key.topic()); + } else { + log.error("Error flushing producer for topic {}", key.topic(), + unwrappedCause); + } return null; }).thenCompose(__ -> producer.closeAsync().orTimeout(FLUSH_OR_CLOSE_TIMEOUT_SECONDS, - TimeUnit.SECONDS) - .exceptionally(ex -> { - log.error("Error closing producer for topic {}", key.topic(), - ex); - return null; - })); + TimeUnit.SECONDS) + ).exceptionally(ex -> { + Throwable unwrappedCause = FutureUtil.unwrapCompletionException(ex); + if (unwrappedCause instanceof PulsarClientException.AlreadyClosedException) { + log.error( + "Error closing producer for topic {} due to " + + "AlreadyClosedException", + key.topic()); + } else { + log.error("Error closing producer for topic {}", key.topic(), + unwrappedCause); + } + return null; + }); if (closed.get()) { closeFutures.add(closeFuture); } diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ProducerCacheTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ProducerCacheTest.java new file mode 100644 index 0000000000000..af95a7901b6e8 --- /dev/null +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ProducerCacheTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.functions.instance; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import java.util.concurrent.CompletableFuture; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClientException; +import org.testng.annotations.Test; + +public class ProducerCacheTest { + + @Test + public void shouldTolerateAlreadyClosedExceptionInClose() { + ProducerCache cache = new ProducerCache(); + Producer producer = mock(Producer.class); + when(producer.flushAsync()).thenReturn(CompletableFuture.completedFuture(null)); + when(producer.closeAsync()).thenReturn( + CompletableFuture.failedFuture(new PulsarClientException.AlreadyClosedException("Already closed"))); + cache.getOrCreateProducer(ProducerCache.CacheArea.CONTEXT_CACHE, "topic", "key", + () -> (Producer) producer); + cache.close(); + } + + @Test + public void shouldTolerateRuntimeExceptionInClose() { + ProducerCache cache = new ProducerCache(); + Producer producer = mock(Producer.class); + when(producer.flushAsync()).thenReturn(CompletableFuture.completedFuture(null)); + when(producer.closeAsync()).thenThrow(new RuntimeException("Some exception")); + cache.getOrCreateProducer(ProducerCache.CacheArea.CONTEXT_CACHE, "topic", "key", + () -> (Producer) producer); + cache.close(); + } + + @Test + public void shouldTolerateRuntimeExceptionInFlush() { + ProducerCache cache = new ProducerCache(); + Producer producer = mock(Producer.class); + when(producer.flushAsync()).thenThrow(new RuntimeException("Some exception")); + when(producer.closeAsync()).thenReturn(CompletableFuture.completedFuture(null)); + cache.getOrCreateProducer(ProducerCache.CacheArea.CONTEXT_CACHE, "topic", "key", + () -> (Producer) producer); + cache.close(); + } + +} \ No newline at end of file