Skip to content

Commit

Permalink
1.0.14-release
Browse files Browse the repository at this point in the history
fix closeListener bug
fix close bug
  • Loading branch information
leonchen83 committed Oct 9, 2016
1 parent 13f5eed commit 3ab51be
Showing 1 changed file with 221 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@
import com.moilioncircle.redis.replicator.cmd.CommandName;
import com.moilioncircle.redis.replicator.cmd.impl.SetParser;
import com.moilioncircle.redis.replicator.cmd.impl.ZAddParser;
import com.moilioncircle.redis.replicator.cmd.impl.ZInterStoreParser;
import com.moilioncircle.redis.replicator.cmd.impl.ZUnionStoreParser;
import com.moilioncircle.redis.replicator.rdb.RdbFilter;
import com.moilioncircle.redis.replicator.rdb.RdbListener;
import com.moilioncircle.redis.replicator.rdb.datatype.KeyStringValueString;
import com.moilioncircle.redis.replicator.rdb.datatype.KeyValuePair;
import junit.framework.TestCase;
import org.junit.Test;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.ZParams;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -90,6 +93,136 @@ public void handle(Replicator replicator, Command command) {
assertEquals("ok", ref.get());
}

@Test
public void testZInterStore() throws Exception {
final AtomicReference<String> ref = new AtomicReference<>(null);
new TestTemplate() {
@Override
protected void test(RedisReplicator replicator) {
replicator.addRdbListener(new RdbListener() {
@Override
public void preFullSync(Replicator replicator) {
}

@Override
public void handle(Replicator replicator, KeyValuePair<?> kv) {
}

@Override
public void postFullSync(Replicator replicator, long checksum) {
Jedis jedis = new Jedis("localhost",
6379);
jedis.del("zset1");
jedis.del("zset2");
jedis.del("out");
jedis.zadd("zset1", 1, "one");
jedis.zadd("zset1", 2, "two");
jedis.zadd("zset2", 1, "one");
jedis.zadd("zset2", 2, "two");
jedis.zadd("zset2", 3, "three");
//ZINTERSTORE out 2 zset1 zset2 WEIGHTS 2 3
ZParams zParams = new ZParams();
zParams.weightsByDouble(2, 3);
zParams.aggregate(ZParams.Aggregate.MIN);
jedis.zinterstore("out", zParams, "zset1", "zset2");
jedis.close();
}
});
replicator.addCommandFilter(new CommandFilter() {
@Override
public boolean accept(Command command) {
return command.name().equals(CommandName.name("ZINTERSTORE"));
}
});
replicator.addCommandListener(new CommandListener() {
@Override
public void handle(Replicator replicator, Command command) {
ZInterStoreParser.ZInterStoreCommand zInterStoreCommand = (ZInterStoreParser.ZInterStoreCommand) command;
assertEquals("out", zInterStoreCommand.destination);
assertEquals(2, zInterStoreCommand.numkeys);
assertEquals("zset1", zInterStoreCommand.keys[0]);
assertEquals("zset2", zInterStoreCommand.keys[1]);
assertEquals(2.0, zInterStoreCommand.weights[0]);
assertEquals(3.0, zInterStoreCommand.weights[1]);
assertEquals(Boolean.TRUE, zInterStoreCommand.isMin);
ref.compareAndSet(null, "ok");
}
});
}
}.testSocket(
"localhost",
6379,
Configuration.defaultSetting()
.setRetries(0),
15000);
assertEquals("ok", ref.get());
}

@Test
public void testZUnionStore() throws Exception {
final AtomicReference<String> ref = new AtomicReference<>(null);
new TestTemplate() {
@Override
protected void test(RedisReplicator replicator) {
replicator.addRdbListener(new RdbListener() {
@Override
public void preFullSync(Replicator replicator) {
}

@Override
public void handle(Replicator replicator, KeyValuePair<?> kv) {
}

@Override
public void postFullSync(Replicator replicator, long checksum) {
Jedis jedis = new Jedis("localhost",
6379);
jedis.del("zset3");
jedis.del("zset4");
jedis.del("out1");
jedis.zadd("zset3", 1, "one");
jedis.zadd("zset3", 2, "two");
jedis.zadd("zset4", 1, "one");
jedis.zadd("zset4", 2, "two");
jedis.zadd("zset4", 3, "three");
//ZINTERSTORE out 2 zset1 zset2 WEIGHTS 2 3
ZParams zParams = new ZParams();
zParams.weightsByDouble(2, 3);
zParams.aggregate(ZParams.Aggregate.SUM);
jedis.zunionstore("out1", zParams, "zset3", "zset4");
jedis.close();
}
});
replicator.addCommandFilter(new CommandFilter() {
@Override
public boolean accept(Command command) {
return command.name().equals(CommandName.name("ZUNIONSTORE"));
}
});
replicator.addCommandListener(new CommandListener() {
@Override
public void handle(Replicator replicator, Command command) {
ZUnionStoreParser.ZUnionStoreCommand zInterStoreCommand = (ZUnionStoreParser.ZUnionStoreCommand) command;
assertEquals("out1", zInterStoreCommand.destination);
assertEquals(2, zInterStoreCommand.numkeys);
assertEquals("zset3", zInterStoreCommand.keys[0]);
assertEquals("zset4", zInterStoreCommand.keys[1]);
assertEquals(2.0, zInterStoreCommand.weights[0]);
assertEquals(3.0, zInterStoreCommand.weights[1]);
assertEquals(Boolean.TRUE, zInterStoreCommand.isSum);
ref.compareAndSet(null, "ok");
}
});
}
}.testSocket(
"localhost",
6379,
Configuration.defaultSetting()
.setRetries(0),
15000);
assertEquals("ok", ref.get());
}

@Test
public void testZAdd() throws Exception {
final AtomicReference<String> ref = new AtomicReference<>(null);
Expand Down Expand Up @@ -342,6 +475,43 @@ public void handle(Replicator replicator) {
assertEquals(1, acc.get());
}

@Test
public void testCloseListener1() throws IOException, InterruptedException {
final AtomicInteger acc = new AtomicInteger(0);
RedisReplicator replicator = new RedisReplicator(
RedisReplicatorTest.class.getClassLoader().getResourceAsStream("dumpV6.rdb"),
Configuration.defaultSetting());
replicator.addRdbListener(new RdbListener() {
@Override
public void preFullSync(Replicator replicator) {

}

@Override
public void handle(Replicator replicator, KeyValuePair<?> kv) {

}

@Override
public void postFullSync(Replicator replicator, long checksum) {
try {
replicator.close();
} catch (IOException e) {
e.printStackTrace();
}
}
});
replicator.addCloseListener(new CloseListener() {
@Override
public void handle(Replicator replicator) {
acc.incrementAndGet();
}
});
replicator.open();
Thread.sleep(2000);
assertEquals(1, acc.get());
}

@Test
public void testChecksumV6() throws IOException, InterruptedException {
RedisReplicator redisReplicator = new RedisReplicator(
Expand Down Expand Up @@ -391,4 +561,55 @@ public void postFullSync(Replicator replicator, long checksum) {
assertEquals(6576517133597126869L, atomicChecksum.get());
redisReplicator.close();
}

@Test
public void testCount() throws IOException, InterruptedException {
Jedis jedis = new Jedis("127.0.0.1", 6379);
for (int i = 0; i < 8000; i++) {
jedis.del("test_" + i);
jedis.set("test_" + i, "value_" + i);
}
jedis.close();

RedisReplicator redisReplicator = new RedisReplicator(
"127.0.0.1", 6379,
Configuration.defaultSetting());
final AtomicInteger acc = new AtomicInteger(0);
final AtomicReference<String> ref = new AtomicReference<>(null);
redisReplicator.addRdbFilter(new RdbFilter() {
@Override
public boolean accept(KeyValuePair<?> kv) {
return kv.getKey().startsWith("test_");
}
});
redisReplicator.addRdbListener(new RdbListener() {
@Override
public void preFullSync(Replicator replicator) {
}

@Override
public void handle(Replicator replicator, KeyValuePair<?> kv) {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
acc.incrementAndGet();
}

@Override
public void postFullSync(Replicator replicator, long checksum) {
try {
replicator.close();
} catch (IOException e) {
e.printStackTrace();
}
assertEquals(8000, acc.get());
ref.compareAndSet(null, "ok");
}
});
redisReplicator.open();
Thread.sleep(10000);
assertEquals("ok", ref.get());
}
}

0 comments on commit 3ab51be

Please sign in to comment.