Skip to content

Commit

Permalink
[improve][fn] Improve closing of producers in Pulsar Functions Produc…
Browse files Browse the repository at this point in the history
…erCache invalidation (apache#23734)

(cherry picked from commit 9f046a5)
(cherry picked from commit 6b9e9ac)
  • Loading branch information
lhotari authored and srinath-ctds committed Dec 23, 2024
1 parent 981b3c9 commit 540beac
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.util.FutureUtil;

@Slf4j
Expand Down Expand Up @@ -61,24 +62,41 @@ record ProducerCacheKey(CacheArea cacheArea, String topic, Object additionalKey)
private final CopyOnWriteArrayList<CompletableFuture<Void>> closeFutures = new CopyOnWriteArrayList<>();

public ProducerCache() {
Caffeine<ProducerCacheKey, Producer> builder = Caffeine.newBuilder()
Caffeine<ProducerCacheKey, Producer<?>> builder = Caffeine.newBuilder()
.scheduler(Scheduler.systemScheduler())
.<ProducerCacheKey, Producer>removalListener((key, producer, cause) -> {
.<ProducerCacheKey, Producer<?>>removalListener((key, producer, cause) -> {
log.info("Closing producer for topic {}, cause {}", key.topic(), cause);
CompletableFuture closeFuture =
producer.flushAsync()
CompletableFuture.supplyAsync(() -> producer.flushAsync(), Runnable::run)
.orTimeout(FLUSH_OR_CLOSE_TIMEOUT_SECONDS, TimeUnit.SECONDS)
.exceptionally(ex -> {
log.error("Error flushing producer for topic {}", key.topic(), ex);
Throwable unwrappedCause = FutureUtil.unwrapCompletionException(ex);
if (unwrappedCause instanceof PulsarClientException.AlreadyClosedException) {
log.error(
"Error flushing producer for topic {} due to "
+ "AlreadyClosedException",
key.topic());
} else {
log.error("Error flushing producer for topic {}", key.topic(),
unwrappedCause);
}
return null;
}).thenCompose(__ ->
producer.closeAsync().orTimeout(FLUSH_OR_CLOSE_TIMEOUT_SECONDS,
TimeUnit.SECONDS)
.exceptionally(ex -> {
log.error("Error closing producer for topic {}", key.topic(),
ex);
return null;
}));
TimeUnit.SECONDS)
).exceptionally(ex -> {
Throwable unwrappedCause = FutureUtil.unwrapCompletionException(ex);
if (unwrappedCause instanceof PulsarClientException.AlreadyClosedException) {
log.error(
"Error closing producer for topic {} due to "
+ "AlreadyClosedException",
key.topic());
} else {
log.error("Error closing producer for topic {}", key.topic(),
unwrappedCause);
}
return null;
});
if (closed.get()) {
closeFutures.add(closeFuture);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.functions.instance;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.testng.annotations.Test;

public class ProducerCacheTest {

@Test
public void shouldTolerateAlreadyClosedExceptionInClose() {
ProducerCache cache = new ProducerCache();
Producer producer = mock(Producer.class);
when(producer.flushAsync()).thenReturn(CompletableFuture.completedFuture(null));
when(producer.closeAsync()).thenReturn(
CompletableFuture.failedFuture(new PulsarClientException.AlreadyClosedException("Already closed")));
cache.getOrCreateProducer(ProducerCache.CacheArea.CONTEXT_CACHE, "topic", "key",
() -> (Producer<Object>) producer);
cache.close();
}

@Test
public void shouldTolerateRuntimeExceptionInClose() {
ProducerCache cache = new ProducerCache();
Producer producer = mock(Producer.class);
when(producer.flushAsync()).thenReturn(CompletableFuture.completedFuture(null));
when(producer.closeAsync()).thenThrow(new RuntimeException("Some exception"));
cache.getOrCreateProducer(ProducerCache.CacheArea.CONTEXT_CACHE, "topic", "key",
() -> (Producer<Object>) producer);
cache.close();
}

@Test
public void shouldTolerateRuntimeExceptionInFlush() {
ProducerCache cache = new ProducerCache();
Producer producer = mock(Producer.class);
when(producer.flushAsync()).thenThrow(new RuntimeException("Some exception"));
when(producer.closeAsync()).thenReturn(CompletableFuture.completedFuture(null));
cache.getOrCreateProducer(ProducerCache.CacheArea.CONTEXT_CACHE, "topic", "key",
() -> (Producer<Object>) producer);
cache.close();
}

}

0 comments on commit 540beac

Please sign in to comment.