From bfebe8fc3811bc6c7d9850c4662a7e60d90a4e41 Mon Sep 17 00:00:00 2001 From: caoli5288 Date: Fri, 28 Feb 2020 13:41:07 +0800 Subject: [PATCH] Improve RedisWrapper.java --- .../com/mengcraft/simpleorm/RedisWrapper.java | 59 ++++++++++++++++--- 1 file changed, 52 insertions(+), 7 deletions(-) diff --git a/src/main/java/com/mengcraft/simpleorm/RedisWrapper.java b/src/main/java/com/mengcraft/simpleorm/RedisWrapper.java index dc9ad3a..4762280 100644 --- a/src/main/java/com/mengcraft/simpleorm/RedisWrapper.java +++ b/src/main/java/com/mengcraft/simpleorm/RedisWrapper.java @@ -7,6 +7,7 @@ import com.mengcraft.simpleorm.redis.RedisMessageTopic; import lombok.Cleanup; import lombok.NonNull; +import lombok.RequiredArgsConstructor; import lombok.SneakyThrows; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import org.json.simple.JSONObject; @@ -18,6 +19,7 @@ import redis.clients.jedis.exceptions.JedisConnectionException; import redis.clients.jedis.params.SetParams; +import java.io.Closeable; import java.net.URI; import java.nio.charset.StandardCharsets; import java.util.Collection; @@ -25,23 +27,30 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.Function; import static com.mengcraft.simpleorm.ORM.nil; -public class RedisWrapper { +public class RedisWrapper implements Closeable { private final JedisResources resources; private MessageFilter messageFilter; private RedisWrapper(JedisPool pool) { - resources = pool::getResource; + resources = new GenericJedisResources(pool); } private RedisWrapper(JedisSentinelPool sentinels) { - resources = sentinels::getResource; + resources = new SentinelJedisResources(sentinels); + } + + @Override + @SneakyThrows + public void close() { + resources.close(); } public static RedisWrapper b(String sentinel, String url, int conn) { @@ -120,14 +129,19 @@ public void open(int database, Consumer consumer) { } } - public synchronized void subscribe(String channel, Consumer consumer) { + public void subscribe(String channel, Consumer consumer) { + subscribe(channel, consumer, command -> new Thread(command).start()); + } + + public synchronized void subscribe(String channel, Consumer consumer, Executor executor) { if (nil(messageFilter)) { messageFilter = new MessageFilter(); - new Thread(() -> open(client -> client.subscribe(messageFilter, channel.getBytes(StandardCharsets.UTF_8)))).start(); + messageFilter.handled.put(channel, consumer); + executor.execute(() -> open(client -> client.subscribe(messageFilter, channel.getBytes(StandardCharsets.UTF_8)))); } else if (!messageFilter.handled.containsKey(channel)) { + messageFilter.handled.put(channel, consumer); messageFilter.subscribe(channel.getBytes(StandardCharsets.UTF_8)); } - messageFilter.handled.put(channel, consumer); } public synchronized void unsubscribeAll() { @@ -218,11 +232,42 @@ public RedisLiveObjectBucket getLiveObjectBucket(String bucket) { return new RedisLiveObjectBucket(this, bucket); } - private interface JedisResources { + private interface JedisResources extends Closeable { Jedis getResource(); } + @RequiredArgsConstructor + public class SentinelJedisResources implements JedisResources { + + private final JedisSentinelPool pool; + + @Override + public Jedis getResource() { + return pool.getResource(); + } + + @Override + public void close() { + pool.close(); + } + } + + @RequiredArgsConstructor + public class GenericJedisResources implements JedisResources { + + private final JedisPool pool; + @Override + public Jedis getResource() { + return pool.getResource(); + } + + @Override + public void close() { + pool.close(); + } + } + private static class MessageFilter extends BinaryJedisPubSub { private final Multimap> handled = HashMultimap.create();