Skip to content

Commit

Permalink
Merge pull request #5 from Edwin9292/Jedis-subscriber-auto-reconnect
Browse files Browse the repository at this point in the history
Implement reconnecting mechanism for update listener
  • Loading branch information
Edwin9292 authored Jan 4, 2024
2 parents a183f10 + dc4c4d9 commit 87699bc
Showing 1 changed file with 41 additions and 6 deletions.
Original file line number Diff line number Diff line change
@@ -1,16 +1,22 @@
package com.giffing.bucket4j.spring.boot.starter.config.cache.redis.jedis;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.giffing.bucket4j.spring.boot.starter.config.cache.CacheUpdateEvent;

import io.micrometer.core.instrument.util.NamedThreadFactory;
import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationEventPublisher;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPubSub;
import redis.clients.jedis.exceptions.JedisConnectionException;

/**
* This class is intended to be used as bean.
Expand Down Expand Up @@ -46,13 +52,42 @@ public JedisCacheListener(JedisPool jedisPool, String cacheName, Class<K> keyTyp
}

public void subscribe() {
new Thread(() -> {
try (Jedis jedis = this.jedisPool.getResource()) {
jedis.subscribe(this, updateChannel);
} catch (Exception e) {
log.warn("Failed to instantiate the Jedis subscriber. {}",e.getMessage());
Thread thread = new Thread(() -> {
AtomicInteger reconnectBackoffTimeMillis = new AtomicInteger(1000);
// Using a NamedThreadFactory for creating a Daemon thread, so it will never block the jvm from closing.
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("reset-reconnect-backoff-thread"));
ScheduledFuture<?> resetTask = null;

while(!Thread.currentThread().isInterrupted() && !this.jedisPool.isClosed()){
try (Jedis jedis = this.jedisPool.getResource()) {
// Schedule a reset of the backoff after 10 seconds.
// This is done in a different thread since subscribe is a blocking call.
resetTask = executorService.schedule(()-> reconnectBackoffTimeMillis.set(1000), 10000, TimeUnit.MILLISECONDS);

jedis.subscribe(this, updateChannel);
} catch (Exception e) {
log.error("Failed to connect the Jedis subscriber, attempting to reconnect in {} seconds. " +
"Exception was: {}", (reconnectBackoffTimeMillis.get() /1000), e.getMessage());

// Cancel the reset of the backoff
if(resetTask != null) {
resetTask.cancel(true);
resetTask = null;
}

// Wait before trying to reconnect and increase the backoff duration
try {
Thread.sleep(reconnectBackoffTimeMillis.get());
// exponentially increase the backoff with a max of 30 seconds
reconnectBackoffTimeMillis.set(Math.min((reconnectBackoffTimeMillis.get() * 2), 30000));
} catch (InterruptedException ignored) {
// ignored, already interrupted so the while loop will stop
}
}
}
}, "JedisSubscriberThread").start();
}, "JedisSubscriberThread");
thread.setDaemon(true);
thread.start();
}

@Override
Expand Down

0 comments on commit 87699bc

Please sign in to comment.