From 6d313c0ff77561184ed84dea5c27a8e5562a125a Mon Sep 17 00:00:00 2001 From: Edwin Heuver Date: Tue, 2 Jan 2024 12:02:01 +0100 Subject: [PATCH 1/2] Implement reconnecting mechanism for update listener --- .../cache/redis/jedis/JedisCacheListener.java | 38 +++++++++++++++++-- 1 file changed, 34 insertions(+), 4 deletions(-) diff --git a/bucket4j-spring-boot-starter/src/main/java/com/giffing/bucket4j/spring/boot/starter/config/cache/redis/jedis/JedisCacheListener.java b/bucket4j-spring-boot-starter/src/main/java/com/giffing/bucket4j/spring/boot/starter/config/cache/redis/jedis/JedisCacheListener.java index 15926647..f5ce238c 100644 --- a/bucket4j-spring-boot-starter/src/main/java/com/giffing/bucket4j/spring/boot/starter/config/cache/redis/jedis/JedisCacheListener.java +++ b/bucket4j-spring-boot-starter/src/main/java/com/giffing/bucket4j/spring/boot/starter/config/cache/redis/jedis/JedisCacheListener.java @@ -1,5 +1,8 @@ package com.giffing.bucket4j.spring.boot.starter.config.cache.redis.jedis; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; + import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.ObjectMapper; @@ -11,6 +14,7 @@ import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPubSub; +import redis.clients.jedis.exceptions.JedisConnectionException; /** * This class is intended to be used as bean. @@ -47,10 +51,36 @@ public JedisCacheListener(JedisPool jedisPool, String cacheName, Class keyTyp public void subscribe() { new Thread(() -> { - try (Jedis jedis = this.jedisPool.getResource()) { - jedis.subscribe(this, updateChannel); - } catch (Exception e) { - log.warn("Failed to instantiate the Jedis subscriber. {}",e.getMessage()); + AtomicInteger reconnectBackoffTimeMillis = new AtomicInteger(1000); + ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); + ScheduledFuture resetTask = null; + + while(!Thread.currentThread().isInterrupted() && !this.jedisPool.isClosed()){ + try (Jedis jedis = this.jedisPool.getResource()) { + // Schedule a reset of the backoff after 10 seconds. + // This is done in a different thread since subscribe is a blocking call. + resetTask = executorService.schedule(()-> reconnectBackoffTimeMillis.set(1000), 10000, TimeUnit.MILLISECONDS); + + jedis.subscribe(this, updateChannel); + } catch (Exception e) { + log.error("Failed to connect the Jedis subscriber, attempting to reconnect in {} seconds. " + + "Exception was: {}", (reconnectBackoffTimeMillis.get() /1000), e.getMessage()); + + // Cancel the reset of the backoff + if(resetTask != null) { + resetTask.cancel(true); + resetTask = null; + } + + // Wait before trying to reconnect and increase the backoff duration + try { + Thread.sleep(reconnectBackoffTimeMillis.get()); + //exponential increase backoff with a max of 1 minute + reconnectBackoffTimeMillis.set(Math.min((reconnectBackoffTimeMillis.get() * 2), 60000)); + } catch (InterruptedException ignored) { + Thread.currentThread().interrupt(); + } + } } }, "JedisSubscriberThread").start(); } From dc4c4d968671a9e92327aa89092a2dee77dd1eb2 Mon Sep 17 00:00:00 2001 From: Edwin Heuver Date: Wed, 3 Jan 2024 15:51:53 +0100 Subject: [PATCH 2/2] Make threads deamon and lower max backoff --- .../cache/redis/jedis/JedisCacheListener.java | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/bucket4j-spring-boot-starter/src/main/java/com/giffing/bucket4j/spring/boot/starter/config/cache/redis/jedis/JedisCacheListener.java b/bucket4j-spring-boot-starter/src/main/java/com/giffing/bucket4j/spring/boot/starter/config/cache/redis/jedis/JedisCacheListener.java index f5ce238c..10d4fdc5 100644 --- a/bucket4j-spring-boot-starter/src/main/java/com/giffing/bucket4j/spring/boot/starter/config/cache/redis/jedis/JedisCacheListener.java +++ b/bucket4j-spring-boot-starter/src/main/java/com/giffing/bucket4j/spring/boot/starter/config/cache/redis/jedis/JedisCacheListener.java @@ -7,6 +7,8 @@ import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.ObjectMapper; import com.giffing.bucket4j.spring.boot.starter.config.cache.CacheUpdateEvent; + +import io.micrometer.core.instrument.util.NamedThreadFactory; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -50,9 +52,10 @@ public JedisCacheListener(JedisPool jedisPool, String cacheName, Class keyTyp } public void subscribe() { - new Thread(() -> { + Thread thread = new Thread(() -> { AtomicInteger reconnectBackoffTimeMillis = new AtomicInteger(1000); - ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); + // Using a NamedThreadFactory for creating a Daemon thread, so it will never block the jvm from closing. + ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("reset-reconnect-backoff-thread")); ScheduledFuture resetTask = null; while(!Thread.currentThread().isInterrupted() && !this.jedisPool.isClosed()){ @@ -75,14 +78,16 @@ public void subscribe() { // Wait before trying to reconnect and increase the backoff duration try { Thread.sleep(reconnectBackoffTimeMillis.get()); - //exponential increase backoff with a max of 1 minute - reconnectBackoffTimeMillis.set(Math.min((reconnectBackoffTimeMillis.get() * 2), 60000)); + // exponentially increase the backoff with a max of 30 seconds + reconnectBackoffTimeMillis.set(Math.min((reconnectBackoffTimeMillis.get() * 2), 30000)); } catch (InterruptedException ignored) { - Thread.currentThread().interrupt(); + // ignored, already interrupted so the while loop will stop } } } - }, "JedisSubscriberThread").start(); + }, "JedisSubscriberThread"); + thread.setDaemon(true); + thread.start(); } @Override