Skip to content

Commit

Permalink
Improve RedisWrapper.java
Browse files Browse the repository at this point in the history
  • Loading branch information
caoli5288 committed Feb 28, 2020
1 parent 4c124cd commit bfebe8f
Showing 1 changed file with 52 additions and 7 deletions.
59 changes: 52 additions & 7 deletions src/main/java/com/mengcraft/simpleorm/RedisWrapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.mengcraft.simpleorm.redis.RedisMessageTopic;
import lombok.Cleanup;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.SneakyThrows;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.json.simple.JSONObject;
Expand All @@ -18,30 +19,38 @@
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.params.SetParams;

import java.io.Closeable;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;

import static com.mengcraft.simpleorm.ORM.nil;

public class RedisWrapper {
public class RedisWrapper implements Closeable {

private final JedisResources resources;
private MessageFilter messageFilter;

private RedisWrapper(JedisPool pool) {
resources = pool::getResource;
resources = new GenericJedisResources(pool);
}

private RedisWrapper(JedisSentinelPool sentinels) {
resources = sentinels::getResource;
resources = new SentinelJedisResources(sentinels);
}

@Override
@SneakyThrows
public void close() {
resources.close();
}

public static RedisWrapper b(String sentinel, String url, int conn) {
Expand Down Expand Up @@ -120,14 +129,19 @@ public void open(int database, Consumer<Jedis> consumer) {
}
}

public synchronized void subscribe(String channel, Consumer<byte[]> consumer) {
public void subscribe(String channel, Consumer<byte[]> consumer) {
subscribe(channel, consumer, command -> new Thread(command).start());
}

public synchronized void subscribe(String channel, Consumer<byte[]> consumer, Executor executor) {
if (nil(messageFilter)) {
messageFilter = new MessageFilter();
new Thread(() -> open(client -> client.subscribe(messageFilter, channel.getBytes(StandardCharsets.UTF_8)))).start();
messageFilter.handled.put(channel, consumer);
executor.execute(() -> open(client -> client.subscribe(messageFilter, channel.getBytes(StandardCharsets.UTF_8))));
} else if (!messageFilter.handled.containsKey(channel)) {
messageFilter.handled.put(channel, consumer);
messageFilter.subscribe(channel.getBytes(StandardCharsets.UTF_8));
}
messageFilter.handled.put(channel, consumer);
}

public synchronized void unsubscribeAll() {
Expand Down Expand Up @@ -218,11 +232,42 @@ public RedisLiveObjectBucket getLiveObjectBucket(String bucket) {
return new RedisLiveObjectBucket(this, bucket);
}

private interface JedisResources {
private interface JedisResources extends Closeable {

Jedis getResource();
}

@RequiredArgsConstructor
public class SentinelJedisResources implements JedisResources {

private final JedisSentinelPool pool;

@Override
public Jedis getResource() {
return pool.getResource();
}

@Override
public void close() {
pool.close();
}
}

@RequiredArgsConstructor
public class GenericJedisResources implements JedisResources {

private final JedisPool pool;
@Override
public Jedis getResource() {
return pool.getResource();
}

@Override
public void close() {
pool.close();
}
}

private static class MessageFilter extends BinaryJedisPubSub {

private final Multimap<String, Consumer<byte[]>> handled = HashMultimap.create();
Expand Down

0 comments on commit bfebe8f

Please sign in to comment.