diff --git a/bucket4j-spring-boot-starter/src/main/java/com/giffing/bucket4j/spring/boot/starter/config/cache/redis/jedis/JedisCacheListener.java b/bucket4j-spring-boot-starter/src/main/java/com/giffing/bucket4j/spring/boot/starter/config/cache/redis/jedis/JedisCacheListener.java index 15926647..10d4fdc5 100644 --- a/bucket4j-spring-boot-starter/src/main/java/com/giffing/bucket4j/spring/boot/starter/config/cache/redis/jedis/JedisCacheListener.java +++ b/bucket4j-spring-boot-starter/src/main/java/com/giffing/bucket4j/spring/boot/starter/config/cache/redis/jedis/JedisCacheListener.java @@ -1,9 +1,14 @@ package com.giffing.bucket4j.spring.boot.starter.config.cache.redis.jedis; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; + import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.ObjectMapper; import com.giffing.bucket4j.spring.boot.starter.config.cache.CacheUpdateEvent; + +import io.micrometer.core.instrument.util.NamedThreadFactory; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -11,6 +16,7 @@ import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.JedisPubSub; +import redis.clients.jedis.exceptions.JedisConnectionException; /** * This class is intended to be used as bean. @@ -46,13 +52,42 @@ public JedisCacheListener(JedisPool jedisPool, String cacheName, Class keyTyp } public void subscribe() { - new Thread(() -> { - try (Jedis jedis = this.jedisPool.getResource()) { - jedis.subscribe(this, updateChannel); - } catch (Exception e) { - log.warn("Failed to instantiate the Jedis subscriber. {}",e.getMessage()); + Thread thread = new Thread(() -> { + AtomicInteger reconnectBackoffTimeMillis = new AtomicInteger(1000); + // Using a NamedThreadFactory for creating a Daemon thread, so it will never block the jvm from closing. + ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("reset-reconnect-backoff-thread")); + ScheduledFuture resetTask = null; + + while(!Thread.currentThread().isInterrupted() && !this.jedisPool.isClosed()){ + try (Jedis jedis = this.jedisPool.getResource()) { + // Schedule a reset of the backoff after 10 seconds. + // This is done in a different thread since subscribe is a blocking call. + resetTask = executorService.schedule(()-> reconnectBackoffTimeMillis.set(1000), 10000, TimeUnit.MILLISECONDS); + + jedis.subscribe(this, updateChannel); + } catch (Exception e) { + log.error("Failed to connect the Jedis subscriber, attempting to reconnect in {} seconds. " + + "Exception was: {}", (reconnectBackoffTimeMillis.get() /1000), e.getMessage()); + + // Cancel the reset of the backoff + if(resetTask != null) { + resetTask.cancel(true); + resetTask = null; + } + + // Wait before trying to reconnect and increase the backoff duration + try { + Thread.sleep(reconnectBackoffTimeMillis.get()); + // exponentially increase the backoff with a max of 30 seconds + reconnectBackoffTimeMillis.set(Math.min((reconnectBackoffTimeMillis.get() * 2), 30000)); + } catch (InterruptedException ignored) { + // ignored, already interrupted so the while loop will stop + } + } } - }, "JedisSubscriberThread").start(); + }, "JedisSubscriberThread"); + thread.setDaemon(true); + thread.start(); } @Override