1. Redis-replicator

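1.1. Introduction

Join the chat at https://gitter.im/leonchen83/redis-replicator

Redis Replicator is a tool for parsing RDB and AOF files. It fully implements the Redis replication protocol and supports the three synchronization commands SYNC, PSYNC, and PSYNC2. It also supports remote RDB file backup and data synchronization. In this document, "command" refers specifically to Redis write commands (such as set, hmset) and excludes read commands (such as get, hmget).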

1.2. QQ discussion group

479688557

1.3. Contact the author

[email protected]

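1.4. Compatibility statement

The API of the classes in the com.moilioncircle.redis.replicator.cmd.impl package may change incompatibly between versions, because it has to track changes in Redis itself.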

2. Installation

2.1. Prerequisites

jdk 1.7+
maven-3.2.3+
redis 2.6 - 4.0.x

2.2. Maven dependency

    <dependency>
        <groupId>com.moilioncircle</groupId>
        <artifactId>redis-replicator</artifactId>
        <version>2.4.6</version>
    </dependency>

2.3. Install the source into the local Maven repository

    $ mvn clean install package -Dmaven.test.skip=true

2.4. Choosing a version

redis version      redis-replicator version
[2.6, 4.0.x]       [2.3.0, ]
[2.6, 4.0-RC3]     [2.1.0, 2.2.0]
[2.6, 3.2.x]       [1.0.18] (no longer supported)

3. Basic usage

3.1. Replication via socket

        Replicator replicator = new RedisReplicator("redis://127.0.0.1:6379");
        replicator.addRdbListener(new RdbListener.Adaptor() {
            @Override
            public void handle(Replicator replicator, KeyValuePair<?> kv) {
                System.out.println(kv);
            }
        });
        replicator.addCommandListener(new CommandListener() {
            @Override
            public void handle(Replicator replicator, Command command) {
                System.out.println(command);
            }
        });
        replicator.open();

3.2. Read and parse an RDB file

        Replicator replicator = new RedisReplicator("redis:///path/to/dump.rdb");
        replicator.addRdbListener(new RdbListener.Adaptor() {
            @Override
            public void handle(Replicator replicator, KeyValuePair<?> kv) {
                System.out.println(kv);
            }
        });

        replicator.open();

3.3. Read and parse an AOF file

        Replicator replicator = new RedisReplicator("redis:///path/to/appendonly.aof");
        replicator.addCommandListener(new CommandListener() {
            @Override
            public void handle(Replicator replicator, Command command) {
                System.out.println(command);
            }
        });
        replicator.open();
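
For readers curious what a command looks like before it reaches a CommandListener: a plain AOF file stores each write command in the Redis serialization protocol (RESP), as an array of bulk strings. The sketch below decodes that framing using only the JDK. It is not the parser used by redis-replicator; AofCommandSketch, readCommand and describe are hypothetical names chosen for illustration, and the sketch assumes well-formed input without an RDB preamble.

```java
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class AofCommandSketch {

    /** Reads one RESP array of bulk strings; returns null at a clean end of stream. */
    public static byte[][] readCommand(InputStream in) throws IOException {
        int marker = in.read();
        if (marker == -1) return null;
        if (marker != '*') throw new IOException("expected '*', got byte " + marker);
        int count = readIntLine(in);
        byte[][] args = new byte[count][];
        for (int i = 0; i < count; i++) {
            if (in.read() != '$') throw new IOException("expected '$'");
            args[i] = readFully(in, readIntLine(in));
            if (in.read() != '\r' || in.read() != '\n') throw new IOException("expected CRLF");
        }
        return args;
    }

    /** Reads decimal digits up to CRLF, e.g. the "3" in "*3\r\n". */
    private static int readIntLine(InputStream in) throws IOException {
        StringBuilder digits = new StringBuilder();
        int b;
        while ((b = in.read()) != '\r') {
            if (b == -1) throw new EOFException("stream ended inside a length line");
            digits.append((char) b);
        }
        if (in.read() != '\n') throw new IOException("expected LF after CR");
        return Integer.parseInt(digits.toString());
    }

    /** Reads exactly length bytes, looping because read() may return fewer. */
    private static byte[] readFully(InputStream in, int length) throws IOException {
        byte[] buf = new byte[length];
        int off = 0;
        while (off < length) {
            int n = in.read(buf, off, length - off);
            if (n == -1) throw new EOFException("stream ended inside a bulk string");
            off += n;
        }
        return buf;
    }

    /** Decodes every command in raw; one command per line, arguments joined by spaces. */
    public static String describe(byte[] raw) {
        StringBuilder out = new StringBuilder();
        try (InputStream in = new ByteArrayInputStream(raw)) {
            byte[][] cmd;
            while ((cmd = readCommand(in)) != null) {
                if (out.length() > 0) out.append('\n');
                for (int i = 0; i < cmd.length; i++) {
                    if (i > 0) out.append(' ');
                    out.append(new String(cmd[i], StandardCharsets.UTF_8));
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toString();
    }

    public static void main(String[] args) {
        String aof = "*3\r\n$3\r\nSET\r\n$1\r\nk\r\n$1\r\nv\r\n*2\r\n$3\r\nDEL\r\n$1\r\nk\r\n";
        System.out.println(describe(aof.getBytes(StandardCharsets.UTF_8)));
    }
}
```

Element 0 of each decoded array is the command name and the remaining elements are its arguments, which matches how the parsers in section 4.1 index into Object[] command starting at 1.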

3.4. Read a mixed-format file

3.4.1. Redis mixed file format

    [RDB file][AOF tail]
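
An RDB file begins with the ASCII magic string REDIS followed by a four-digit format version, while a plain AOF file begins with a RESP array marker (*). A rough way to check whether a file starts with an RDB section is therefore to look at its first bytes. The sketch below does only that; PreambleSketch, Kind and sniff are hypothetical names, and this is not how redis-replicator itself decides. Note that the first bytes cannot distinguish a standalone RDB file from a mixed file.

```java
import java.nio.charset.StandardCharsets;

public class PreambleSketch {

    public enum Kind { STARTS_WITH_RDB, PLAIN_AOF, UNKNOWN }

    private static final byte[] RDB_MAGIC = "REDIS".getBytes(StandardCharsets.US_ASCII);

    /** Classifies a file by its leading bytes only. */
    public static Kind sniff(byte[] head) {
        if (startsWith(head, RDB_MAGIC)) return Kind.STARTS_WITH_RDB;
        if (head.length > 0 && head[0] == '*') return Kind.PLAIN_AOF;
        return Kind.UNKNOWN;
    }

    private static boolean startsWith(byte[] data, byte[] prefix) {
        if (data.length < prefix.length) return false;
        for (int i = 0; i < prefix.length; i++) {
            if (data[i] != prefix[i]) return false;
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(sniff("REDIS0008".getBytes(StandardCharsets.US_ASCII)));
        System.out.println(sniff("*2\r\n$6\r\nSELECT\r\n".getBytes(StandardCharsets.US_ASCII)));
    }
}
```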

3.4.2. Redis configuration for the mixed file format

    aof-use-rdb-preamble yes

3.4.3. Use Replicator to read a mixed-format file

        final Replicator replicator = new RedisReplicator("redis:///path/to/appendonly.aof");
        replicator.addRdbListener(new RdbListener.Adaptor() {
            @Override
            public void handle(Replicator replicator, KeyValuePair<?> kv) {
                System.out.println(kv);
            }
        });
        replicator.addCommandListener(new CommandListener() {
            @Override
            public void handle(Replicator replicator, Command command) {
                System.out.println(command);
            }
        });

        replicator.open();

3.5. Back up the RDB file of a remote Redis

See RdbBackupExample.java

3.6. Back up the real-time commands of a remote Redis

See CommandBackupExample.java

3.7. Other examples

See examples

4. Advanced topics

4.1. Command extension

4.1.1. First, write a command class

    public static class YourAppendCommand implements Command {
        private final String key;
        private final String value;
    
        public YourAppendCommand(String key, String value) {
            this.key = key;
            this.value = value;
        }
                
        public String getKey() {
            return key;
        }
        
        public String getValue() {
            return value;
        }
    
        @Override
        public String toString() {
            return "YourAppendCommand{" +
                "key='" + key + '\'' +
                ", value='" + value + '\'' +
                '}';
        }
    }

4.1.2. Then write a command parser

    public class YourAppendParser implements CommandParser<YourAppendCommand> {

        @Override
        public YourAppendCommand parse(Object[] command) {
            // command[0] is the command name; the arguments follow as raw byte arrays
            return new YourAppendCommand(new String((byte[]) command[1], UTF_8), new String((byte[]) command[2], UTF_8));
        }
    }

4.1.3. Register the command parser with the replicator

    Replicator replicator = new RedisReplicator("redis://127.0.0.1:6379");
    replicator.addCommandParser(CommandName.name("APPEND"), new YourAppendParser());

4.1.4. Handle the event for the registered command

    replicator.addCommandListener(new CommandListener() {
        @Override
        public void handle(Replicator replicator, Command command) {
            if (command instanceof YourAppendCommand) {
                YourAppendCommand appendCommand = (YourAppendCommand) command;
                // your business logic goes here
            }
        }
    });

4.1.5. Putting it all together

See CommandExtensionExample.java

4.2. Module extension (redis-4.0 and above)

4.2.1. Compile the test modules in the Redis source

    $ cd /path/to/redis-4.0-rc2/src/modules
    $ make

4.2.2. Uncomment the following line in the Redis configuration file redis.conf

    loadmodule /path/to/redis-4.0-rc2/src/modules/hellotype.so

4.2.3. Write a module parser

    public class HelloTypeModuleParser implements ModuleParser<HelloTypeModule> {

        @Override
        public HelloTypeModule parse(RedisInputStream in, int version) throws IOException {
            DefaultRdbModuleParser parser = new DefaultRdbModuleParser(in);
            int elements = parser.loadUnsigned(version).intValue();
            long[] ary = new long[elements];
            int i = 0;
            while (elements-- > 0) {
                ary[i++] = parser.loadSigned(version);
            }
            return new HelloTypeModule(ary);
        }
    }

    public class HelloTypeModule implements Module {
        private final long[] value;

        public HelloTypeModule(long[] value) {
            this.value = value;
        }

        public long[] getValue() {
            return value;
        }

        @Override
        public String toString() {
            return "HelloTypeModule{" +
                    "value=" + Arrays.toString(value) +
                    '}';
        }
    }

4.2.4. Then write a command parser

    public class HelloTypeParser implements CommandParser<HelloTypeCommand> {
        @Override
        public HelloTypeCommand parse(Object[] command) {
            String key = new String((byte[])command[1],Constants.UTF_8);
            long value = Long.parseLong(new String((byte[])command[2],Constants.UTF_8));
            return new HelloTypeCommand(key, value);
        }
    }

    public class HelloTypeCommand implements Command {
        private final String key;
        private final long value;

        public long getValue() {
            return value;
        }

        public String getKey() {
            return key;
        }

        public HelloTypeCommand(String key, long value) {
            this.key = key;
            this.value = value;
        }

        @Override
        public String toString() {
            return "HelloTypeCommand{" +
                    "key='" + key + '\'' +
                    ", value=" + value +
                    '}';
        }

    }

4.2.5. Register the module parser and the command parser, then handle the related events

    public static void main(String[] args) throws IOException {
        Replicator replicator = new RedisReplicator("redis://127.0.0.1:6379");
        replicator.addCommandParser(CommandName.name("hellotype.insert"), new HelloTypeParser());
        replicator.addModuleParser("hellotype", 0, new HelloTypeModuleParser());
        replicator.addRdbListener(new RdbListener.Adaptor() {
            @Override
            public void handle(Replicator replicator, KeyValuePair<?> kv) {
                if (kv instanceof KeyStringValueModule) {
                    System.out.println(kv);
                }
            }
        });

        replicator.addCommandListener(new CommandListener() {
            @Override
            public void handle(Replicator replicator, Command command) {
                if (command instanceof HelloTypeCommand) {
                    System.out.println(command);
                }
            }
        });

        replicator.open();
    }

4.2.6. Putting it all together

See ModuleExtensionExample.java

4.3. Write your own RDB parser

  • Write a class that extends the RdbVisitor abstract class.
  • Register your RdbVisitor through the Replicator.setRdbVisitor method.

4.4. Event timeline

        |                 full resynchronization                  |  partial resynchronization  |
        ↓-----------<--------------<-------------<----------<-----↓--------------<--------------↑
        ↓                                                         ↓                             ↑ <-reconnect
    connect->------->-------------->------------->---------->-------------------->---------------x <-disconnect
                  ↓              ↓          ↓            ↓                   ↓
             prefullsync    auxfields...  rdbs...   postfullsync            cmds...

4.5. Redis URI

Before redis-replicator-2.4.0, a RedisReplicator was constructed in one of the following ways:

    Replicator replicator = new RedisReplicator("127.0.0.1", 6379, Configuration.defaultSetting());
    Replicator replicator = new RedisReplicator(new File("/path/to/dump.rdb"), FileType.RDB, Configuration.defaultSetting());
    Replicator replicator = new RedisReplicator(new File("/path/to/appendonly.aof"), FileType.AOF, Configuration.defaultSetting());
    Replicator replicator = new RedisReplicator(new File("/path/to/appendonly.aof"), FileType.MIXED, Configuration.defaultSetting());

Since redis-replicator-2.4.0, the Redis URI concept simplifies the construction of a RedisReplicator and provides a consistent API:

    Replicator replicator = new RedisReplicator("redis://127.0.0.1:6379");
    Replicator replicator = new RedisReplicator("redis:///path/to/dump.rdb");
    Replicator replicator = new RedisReplicator("redis:///path/to/appendonly.aof");

    // configuration examples
    Replicator replicator = new RedisReplicator("redis://127.0.0.1:6379?authPassword=foobared&readTimeout=10000&ssl=yes");
    Replicator replicator = new RedisReplicator("redis:///path/to/dump.rdb?rateLimit=1000000");
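
To make the shape of these URIs concrete, here is a minimal sketch that splits one into its parts with java.net.URI. It illustrates the URI format only and is not the library's own parser; RedisUriSketch, queryParameters and pointsToFile are hypothetical names. The sketch assumes that a URI without a host (redis:///...) denotes a file, as in the examples above, and it leaves parameter values percent-encoded.

```java
import java.net.URI;
import java.util.LinkedHashMap;
import java.util.Map;

public class RedisUriSketch {

    /** Returns the query parameters in the order they appear; values stay percent-encoded. */
    public static Map<String, String> queryParameters(String uri) {
        Map<String, String> params = new LinkedHashMap<>();
        String query = URI.create(uri).getRawQuery();
        if (query == null || query.isEmpty()) return params;
        for (String pair : query.split("&")) {
            int eq = pair.indexOf('=');
            String name = eq < 0 ? pair : pair.substring(0, eq);
            String value = eq < 0 ? "" : pair.substring(eq + 1);
            params.put(name, value);
        }
        return params;
    }

    /** Assumption: no host component means the path names a local RDB or AOF file. */
    public static boolean pointsToFile(String uri) {
        URI parsed = URI.create(uri);
        String path = parsed.getPath();
        return parsed.getHost() == null && path != null && !path.isEmpty();
    }

    public static void main(String[] args) {
        String socket = "redis://127.0.0.1:6379?authPassword=foobared&readTimeout=10000&ssl=yes";
        String file = "redis:///path/to/dump.rdb?rateLimit=1000000";
        System.out.println(pointsToFile(socket) + " " + queryParameters(socket));
        System.out.println(pointsToFile(file) + " " + queryParameters(file));
    }
}
```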

5. Other topics

5.1. Built-in command parsers

command  command   command      command   command    command
PING     APPEND    SET          SETEX     MSET       DEL
SADD     HMSET     HSET         LSET      EXPIRE     EXPIREAT
GETSET   HSETNX    MSETNX       PSETEX    SETNX      SETRANGE
HDEL     UNLINK    SREM         LPOP      LPUSH      LPUSHX
LREM     RPOP      RPUSH        RPUSHX    ZREM       ZINTERSTORE
INCR     DECR      INCRBY       PERSIST   SELECT     FLUSHALL
FLUSHDB  HINCRBY   ZINCRBY      MOVE      SMOVE      BRPOPLPUSH
PFCOUNT  PFMERGE   SDIFFSTORE   RENAMENX  PEXPIREAT  SINTERSTORE
ZADD     BITFIELD  SUNIONSTORE  RESTORE   LINSERT    ZREMRANGEBYLEX
GEOADD   PEXPIRE   ZUNIONSTORE  EVAL      SCRIPT     ZREMRANGEBYRANK
PUBLISH  BITOP     SETBIT       SWAPDB    PFADD      ZREMRANGEBYSCORE
RENAME   MULTI     EXEC         LTRIM     RPOPLPUSH  SORT

5.2. When an EOFException occurs

  • Adjust the following setting on the Redis server. See redis.conf for details.
    client-output-buffer-limit slave 0 0 0

Warning: this setting may cause the redis-server to run out of memory.

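5.3. Tracing event logs

  • Set the log level to debug.
  • If your project uses log4j2, add the following Logger to its configuration file:
    <Logger name="com.moilioncircle" level="debug">
        <AppenderRef ref="YourAppender"/>
    </Logger>

  • Enable verbose output on the configuration, or through the Redis URI:
    Configuration.defaultSetting().setVerbose(true);
    // redis uri
    "redis://127.0.0.1?verbose=yes"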

5.4. SSL connections

    System.setProperty("javax.net.ssl.trustStore", "/path/to/truststore");
    System.setProperty("javax.net.ssl.trustStorePassword", "password");
    System.setProperty("javax.net.ssl.trustStoreType", "your_type");
    Configuration.defaultSetting().setSsl(true);
    // optional settings
    Configuration.defaultSetting().setSslSocketFactory(sslSocketFactory);
    Configuration.defaultSetting().setSslParameters(sslParameters);
    Configuration.defaultSetting().setHostnameVerifier(hostnameVerifier);

5.5. Redis authentication

    Configuration.defaultSetting().setAuthPassword("foobared");
    // redis uri
    "redis://127.0.0.1:6379?authPassword=foobared"

5.6. Avoiding full synchronization

  • Adjust the following settings on the redis-server:
    repl-backlog-size
    repl-backlog-ttl
    repl-ping-slave-period

repl-ping-slave-period must be less than Configuration.getReadTimeout(). The default Configuration.getReadTimeout() is 30 seconds.
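
The constraint above can be checked before connecting. The sketch below is a hypothetical helper, not part of redis-replicator. It assumes that repl-ping-slave-period is given in seconds, as in redis.conf, and that the read timeout is expressed in milliseconds, which the readTimeout=10000 URI example suggests but the text above does not state. Presumably the reason for the rule is that an otherwise idle link would hit the read timeout before the next ping from the master arrives.

```java
public class PingPeriodCheckSketch {

    /**
     * Returns true when the master pings more often than the client's read timeout.
     * Assumed units: ping period in seconds, read timeout in milliseconds.
     */
    public static boolean pingBeatsReadTimeout(int replPingSlavePeriodSeconds, long readTimeoutMillis) {
        return replPingSlavePeriodSeconds * 1000L < readTimeoutMillis;
    }

    public static void main(String[] args) {
        long readTimeoutMillis = 30_000L; // the 30-second default mentioned above
        System.out.println(pingBeatsReadTimeout(10, readTimeoutMillis));
        System.out.println(pingBeatsReadTimeout(60, readTimeoutMillis));
    }
}
```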

5.7. FullSyncEvent

        Replicator replicator = new RedisReplicator("redis://127.0.0.1:6379");
        final long start = System.currentTimeMillis();
        final AtomicInteger acc = new AtomicInteger(0);
        replicator.addRdbListener(new RdbListener() {
            @Override
            public void preFullSync(Replicator replicator) {
                System.out.println("pre full sync");
            }

            @Override
            public void handle(Replicator replicator, KeyValuePair<?> kv) {
                acc.incrementAndGet();
            }

            @Override
            public void postFullSync(Replicator replicator, long checksum) {
                long end = System.currentTimeMillis();
                System.out.println("time elapsed:" + (end - start));
                System.out.println("rdb event count:" + acc.get());
            }
        });
        replicator.open();

5.8. Handling raw byte arrays

  • Every kv type except KeyStringValueModule exposes its raw byte arrays. This is useful in some cases (for example, HyperLogLog).
        Replicator replicator = new RedisReplicator("redis://127.0.0.1:6379");
        replicator.addRdbListener(new RdbListener.Adaptor() {
            @Override
            public void handle(Replicator replicator, KeyValuePair<?> kv) {
                if (kv instanceof KeyStringValueString) {
                    KeyStringValueString ksvs = (KeyStringValueString) kv;
                    byte[] rawValue = ksvs.getRawValue();
                    // handle raw bytes value
                } else if (kv instanceof KeyStringValueHash) {
                    KeyStringValueHash ksvh = (KeyStringValueHash) kv;
                    Map<byte[], byte[]> rawValue = ksvh.getRawValue();
                    // handle raw bytes value
                } else {
                    ...
                }
            }
        });
        replicator.open();

For convenience, the keys of the Map<byte[], byte[]> returned by KeyStringValueHash.getRawValue can be stored and looked up as value types, that is, by content rather than by array identity:

    KeyStringValueHash ksvh = (KeyStringValueHash) kv;
    Map<byte[], byte[]> rawValue = ksvh.getRawValue();
    byte[] value = new byte[]{2};
    rawValue.put(new byte[]{1}, value);
    System.out.println(rawValue.get(new byte[]{1}) == value); // prints true
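
This behaviour differs from an ordinary java.util.HashMap, where byte[] keys are compared by identity. The sketch below shows the difference and one common way to obtain content-based keys, by wrapping the array in a class that defines equals and hashCode through java.util.Arrays. It is an illustration only and not necessarily how redis-replicator implements its map; ByteKeySketch and Key are hypothetical names.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class ByteKeySketch {

    /** Wraps a byte[] so that equals/hashCode compare content instead of identity. */
    static final class Key {
        private final byte[] bytes;

        Key(byte[] bytes) {
            this.bytes = bytes.clone(); // defensive copy keeps the hash stable
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof Key && Arrays.equals(bytes, ((Key) o).bytes);
        }

        @Override
        public int hashCode() {
            return Arrays.hashCode(bytes);
        }
    }

    /** Looks up an equal-content but distinct array in a plain HashMap. */
    public static boolean plainLookupFinds() {
        Map<byte[], byte[]> map = new HashMap<>();
        map.put(new byte[]{1}, new byte[]{2});
        return map.get(new byte[]{1}) != null;
    }

    /** Same lookup, with keys wrapped so they compare by content. */
    public static boolean wrappedLookupFinds() {
        Map<Key, byte[]> map = new HashMap<>();
        map.put(new Key(new byte[]{1}), new byte[]{2});
        return map.get(new Key(new byte[]{1})) != null;
    }

    public static void main(String[] args) {
        System.out.println("plain byte[] key:  " + plainLookupFinds());   // false
        System.out.println("content-based key: " + wrappedLookupFinds()); // true
    }
}
```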

Command parsing also supports raw byte arrays.

    SetCommand set = (SetCommand) command;
    byte[] rawKey = set.getRawKey();
    byte[] rawValue = set.getRawValue();

5.9. Handling huge KVs

Building on 4.3. Write your own RDB parser, this tool ships with a built-in iterator-style RDB parser for handling huge KVs.
For detailed examples, see:
[1] HugeKVFileExample.java
[2] HugeKVSocketExample.java
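
The general idea behind iterator-style processing is to consume a large value in bounded chunks instead of materializing it in memory at once. The sketch below shows that idea with plain JDK types. It does not use or reproduce the library's classes; BatchSketch and forEachBatch are hypothetical names, and the batch size is an arbitrary choice for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

public class BatchSketch {

    /**
     * Drains source in chunks of at most batchSize elements and passes each chunk
     * to handler. Returns the number of chunks delivered.
     */
    public static <T> int forEachBatch(Iterator<T> source, int batchSize, Consumer<List<T>> handler) {
        if (batchSize <= 0) throw new IllegalArgumentException("batchSize must be positive");
        int batches = 0;
        List<T> batch = new ArrayList<>(batchSize);
        while (source.hasNext()) {
            batch.add(source.next());
            if (batch.size() == batchSize) {
                handler.accept(batch);
                batch = new ArrayList<>(batchSize); // fresh list, so handlers may keep the old one
                batches++;
            }
        }
        if (!batch.isEmpty()) { // deliver the final, possibly shorter, chunk
            handler.accept(batch);
            batches++;
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> members = Arrays.asList("a", "b", "c", "d", "e");
        int count = forEachBatch(members.iterator(), 2, batch -> System.out.println(batch));
        System.out.println("batches: " + count);
    }
}
```

With five elements and a batch size of two, the handler receives three chunks, and at most two elements are held in the working list at any time.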

6. Contributors

7. References

8. Acknowledgements

8.1. YourKit

YourKit is kindly supporting this open source project with its full-featured Java Profiler.
YourKit, LLC is the creator of innovative and intelligent tools for profiling
Java and .NET applications. Take a look at YourKit's leading software products:
YourKit Java Profiler and YourKit .NET Profiler.

8.2. IntelliJ IDEA

IntelliJ IDEA is a Java integrated development environment (IDE) for developing computer software.
It is developed by JetBrains (formerly known as IntelliJ), and is available as an Apache 2 Licensed community edition,
and in a proprietary commercial edition. Both can be used for commercial development.

8.3. Redisson

Redisson is a Redis-based In-Memory Data Grid for Java that offers distributed objects and services (BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) backed by a Redis server. Redisson provides a more convenient and easier way to work with Redis. Redisson objects provide a separation of concerns, which allows you to keep the focus on data modeling and application logic.