From 3ab51be89cdfa8b1f4d0de72a391e0b87d518ff5 Mon Sep 17 00:00:00 2001 From: leon Date: Sun, 9 Oct 2016 20:49:22 +0800 Subject: [PATCH] 1.0.14-release fix closeListener bug fix close bug --- .../redis/replicator/RedisReplicatorTest.java | 221 ++++++++++++++++++ 1 file changed, 221 insertions(+) diff --git a/src/test/java/com/moilioncircle/redis/replicator/RedisReplicatorTest.java b/src/test/java/com/moilioncircle/redis/replicator/RedisReplicatorTest.java index 4316263f..8b52c509 100644 --- a/src/test/java/com/moilioncircle/redis/replicator/RedisReplicatorTest.java +++ b/src/test/java/com/moilioncircle/redis/replicator/RedisReplicatorTest.java @@ -22,6 +22,8 @@ import com.moilioncircle.redis.replicator.cmd.CommandName; import com.moilioncircle.redis.replicator.cmd.impl.SetParser; import com.moilioncircle.redis.replicator.cmd.impl.ZAddParser; +import com.moilioncircle.redis.replicator.cmd.impl.ZInterStoreParser; +import com.moilioncircle.redis.replicator.cmd.impl.ZUnionStoreParser; import com.moilioncircle.redis.replicator.rdb.RdbFilter; import com.moilioncircle.redis.replicator.rdb.RdbListener; import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueString; @@ -29,6 +31,7 @@ import junit.framework.TestCase; import org.junit.Test; import redis.clients.jedis.Jedis; +import redis.clients.jedis.ZParams; import java.io.IOException; import java.util.concurrent.atomic.AtomicInteger; @@ -90,6 +93,136 @@ public void handle(Replicator replicator, Command command) { assertEquals("ok", ref.get()); } + @Test + public void testZInterStore() throws Exception { + final AtomicReference ref = new AtomicReference<>(null); + new TestTemplate() { + @Override + protected void test(RedisReplicator replicator) { + replicator.addRdbListener(new RdbListener() { + @Override + public void preFullSync(Replicator replicator) { + } + + @Override + public void handle(Replicator replicator, KeyValuePair kv) { + } + + @Override + public void postFullSync(Replicator replicator, long checksum) { + Jedis jedis = new Jedis("localhost", + 6379); + jedis.del("zset1"); + jedis.del("zset2"); + jedis.del("out"); + jedis.zadd("zset1", 1, "one"); + jedis.zadd("zset1", 2, "two"); + jedis.zadd("zset2", 1, "one"); + jedis.zadd("zset2", 2, "two"); + jedis.zadd("zset2", 3, "three"); + //ZINTERSTORE out 2 zset1 zset2 WEIGHTS 2 3 + ZParams zParams = new ZParams(); + zParams.weightsByDouble(2, 3); + zParams.aggregate(ZParams.Aggregate.MIN); + jedis.zinterstore("out", zParams, "zset1", "zset2"); + jedis.close(); + } + }); + replicator.addCommandFilter(new CommandFilter() { + @Override + public boolean accept(Command command) { + return command.name().equals(CommandName.name("ZINTERSTORE")); + } + }); + replicator.addCommandListener(new CommandListener() { + @Override + public void handle(Replicator replicator, Command command) { + ZInterStoreParser.ZInterStoreCommand zInterStoreCommand = (ZInterStoreParser.ZInterStoreCommand) command; + assertEquals("out", zInterStoreCommand.destination); + assertEquals(2, zInterStoreCommand.numkeys); + assertEquals("zset1", zInterStoreCommand.keys[0]); + assertEquals("zset2", zInterStoreCommand.keys[1]); + assertEquals(2.0, zInterStoreCommand.weights[0]); + assertEquals(3.0, zInterStoreCommand.weights[1]); + assertEquals(Boolean.TRUE, zInterStoreCommand.isMin); + ref.compareAndSet(null, "ok"); + } + }); + } + }.testSocket( + "localhost", + 6379, + Configuration.defaultSetting() + .setRetries(0), + 15000); + assertEquals("ok", ref.get()); + } + + @Test + public void testZUnionStore() throws Exception { + final AtomicReference ref = new AtomicReference<>(null); + new TestTemplate() { + @Override + protected void test(RedisReplicator replicator) { + replicator.addRdbListener(new RdbListener() { + @Override + public void preFullSync(Replicator replicator) { + } + + @Override + public void handle(Replicator replicator, KeyValuePair kv) { + } + + @Override + public void postFullSync(Replicator replicator, long checksum) { + Jedis jedis = new Jedis("localhost", + 6379); + jedis.del("zset3"); + jedis.del("zset4"); + jedis.del("out1"); + jedis.zadd("zset3", 1, "one"); + jedis.zadd("zset3", 2, "two"); + jedis.zadd("zset4", 1, "one"); + jedis.zadd("zset4", 2, "two"); + jedis.zadd("zset4", 3, "three"); + //ZINTERSTORE out 2 zset1 zset2 WEIGHTS 2 3 + ZParams zParams = new ZParams(); + zParams.weightsByDouble(2, 3); + zParams.aggregate(ZParams.Aggregate.SUM); + jedis.zunionstore("out1", zParams, "zset3", "zset4"); + jedis.close(); + } + }); + replicator.addCommandFilter(new CommandFilter() { + @Override + public boolean accept(Command command) { + return command.name().equals(CommandName.name("ZUNIONSTORE")); + } + }); + replicator.addCommandListener(new CommandListener() { + @Override + public void handle(Replicator replicator, Command command) { + ZUnionStoreParser.ZUnionStoreCommand zInterStoreCommand = (ZUnionStoreParser.ZUnionStoreCommand) command; + assertEquals("out1", zInterStoreCommand.destination); + assertEquals(2, zInterStoreCommand.numkeys); + assertEquals("zset3", zInterStoreCommand.keys[0]); + assertEquals("zset4", zInterStoreCommand.keys[1]); + assertEquals(2.0, zInterStoreCommand.weights[0]); + assertEquals(3.0, zInterStoreCommand.weights[1]); + assertEquals(Boolean.TRUE, zInterStoreCommand.isSum); + ref.compareAndSet(null, "ok"); + } + }); + } + }.testSocket( + "localhost", + 6379, + Configuration.defaultSetting() + .setRetries(0), + 15000); + assertEquals("ok", ref.get()); + } + @Test public void testZAdd() throws Exception { final AtomicReference ref = new AtomicReference<>(null); @@ -342,6 +475,43 @@ public void handle(Replicator replicator) { assertEquals(1, acc.get()); } + @Test + public void testCloseListener1() throws IOException, InterruptedException { + final AtomicInteger acc = new AtomicInteger(0); + RedisReplicator replicator = new RedisReplicator( + RedisReplicatorTest.class.getClassLoader().getResourceAsStream("dumpV6.rdb"), + Configuration.defaultSetting()); + replicator.addRdbListener(new RdbListener() { + @Override + public void preFullSync(Replicator replicator) { + + } + + @Override + public void handle(Replicator replicator, KeyValuePair kv) { + + } + + @Override + public void postFullSync(Replicator replicator, long checksum) { + try { + replicator.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + }); + replicator.addCloseListener(new CloseListener() { + @Override + public void handle(Replicator replicator) { + acc.incrementAndGet(); + } + }); + replicator.open(); + Thread.sleep(2000); + assertEquals(1, acc.get()); + } + @Test public void testChecksumV6() throws IOException, InterruptedException { RedisReplicator redisReplicator = new RedisReplicator( @@ -391,4 +561,55 @@ public void postFullSync(Replicator replicator, long checksum) { assertEquals(6576517133597126869L, atomicChecksum.get()); redisReplicator.close(); } + + @Test + public void testCount() throws IOException, InterruptedException { + Jedis jedis = new Jedis("127.0.0.1", 6379); + for (int i = 0; i < 8000; i++) { + jedis.del("test_" + i); + jedis.set("test_" + i, "value_" + i); + } + jedis.close(); + + RedisReplicator redisReplicator = new RedisReplicator( + "127.0.0.1", 6379, + Configuration.defaultSetting()); + final AtomicInteger acc = new AtomicInteger(0); + final AtomicReference ref = new AtomicReference<>(null); + redisReplicator.addRdbFilter(new RdbFilter() { + @Override + public boolean accept(KeyValuePair kv) { + return kv.getKey().startsWith("test_"); + } + }); + redisReplicator.addRdbListener(new RdbListener() { + @Override + public void preFullSync(Replicator replicator) { + } + + @Override + public void handle(Replicator replicator, KeyValuePair kv) { + try { + Thread.sleep(1); + } catch (InterruptedException e) { + e.printStackTrace(); + } + acc.incrementAndGet(); + } + + @Override + public void postFullSync(Replicator replicator, long checksum) { + try { + replicator.close(); + } catch (IOException e) { + e.printStackTrace(); + } + assertEquals(8000, acc.get()); + ref.compareAndSet(null, "ok"); + } + }); + redisReplicator.open(); + Thread.sleep(10000); + assertEquals("ok", ref.get()); + } }