Skip to content

Commit

Permalink
1.0.4-SNAPSHOT
Browse files Browse the repository at this point in the history
verbose support
  • Loading branch information
leonchen83 committed Aug 26, 2016
1 parent e6eb36a commit 827073d
Show file tree
Hide file tree
Showing 8 changed files with 57 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
public abstract class AbstractReplicator implements Replicator {
protected RedisInputStream inputStream;
protected BlockingQueue<Object> eventQueue;
protected Configuration configuration;
protected final ConcurrentHashMap<CommandName, CommandParser<? extends Command>> commands = new ConcurrentHashMap<>();
protected final List<CommandFilter> filters = new CopyOnWriteArrayList<>();
protected final List<CommandListener> listeners = new CopyOnWriteArrayList<>();
Expand Down Expand Up @@ -140,6 +141,11 @@ public void submitEvent(Object object) throws InterruptedException {
eventQueue.put(object);
}

@Override
public boolean verbose() {
return configuration == null ? false : configuration.isVerbose();
}

@Override
public void buildInCommandParserRegister() {
addCommandParser(CommandName.name("PING"), new PingParser());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ public static Configuration defaultSetting() {
*/
private int eventQueueSize = 1000;

/**
* trace event log
*/
private boolean verbose = false;

/**
* psync master run id
*/
Expand Down Expand Up @@ -191,4 +196,31 @@ public Configuration setEventQueueSize(int eventQueueSize) {
this.eventQueueSize = eventQueueSize;
return this;
}

public boolean isVerbose() {
return verbose;
}

public Configuration setVerbose(boolean verbose) {
this.verbose = verbose;
return this;
}

@Override
public String toString() {
return "Configuration{" +
"connectionTimeout=" + connectionTimeout +
", readTimeout=" + readTimeout +
", receiveBufferSize=" + receiveBufferSize +
", sendBufferSize=" + sendBufferSize +
", retries=" + retries +
", bufferSize=" + bufferSize +
", authPassword='" + authPassword + '\'' +
", discardRdbEvent=" + discardRdbEvent +
", eventQueueSize=" + eventQueueSize +
", verbose=" + verbose +
", masterRunId='" + masterRunId + '\'' +
", offset=" + offset +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ public RedisFileReplicator(File file, Configuration configuration) throws FileNo
}

public RedisFileReplicator(InputStream in, Configuration configuration) {
this.inputStream = new RedisInputStream(in, configuration.getBufferSize());
this.eventQueue = new ArrayBlockingQueue<>(configuration.getEventQueueSize());
this.configuration = configuration;
this.inputStream = new RedisInputStream(in, this.configuration.getBufferSize());
this.eventQueue = new ArrayBlockingQueue<>(this.configuration.getEventQueueSize());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,11 @@ public void submitEvent(Object object) throws InterruptedException {
replicator.submitEvent(object);
}

@Override
public boolean verbose() {
return replicator.verbose();
}

@Override
public void open() throws IOException {
replicator.open();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ class RedisSocketReplicator extends AbstractReplicator {

private final String host;
private final int port;
private final Configuration configuration;
private RedisOutputStream outputStream;
private Socket socket;
private ReplyParser replyParser;
Expand Down Expand Up @@ -115,7 +114,8 @@ public void handle(long len) {
});
//command
if (obj instanceof Object[]) {
if (logger.isDebugEnabled()) logger.debug(Arrays.deepToString((Object[]) obj));
if (configuration.isVerbose() && logger.isDebugEnabled())
logger.debug(Arrays.deepToString((Object[]) obj));

Object[] command = (Object[]) obj;
CommandName cmdName = CommandName.name((String) command[0]);
Expand All @@ -130,7 +130,7 @@ public void handle(long len) {
Command parsedCommand = operations.parse(cmdName, params);

//submit event
this.eventQueue.put(parsedCommand);
this.submitEvent(parsedCommand);
} else {
if (logger.isInfoEnabled()) logger.info("Redis reply:" + obj);
}
Expand Down Expand Up @@ -183,7 +183,7 @@ private void parseDump(final AbstractReplicator replicator) throws IOException {
String reply = (String) replyParser.parse(new BulkReplyHandler() {
@Override
public String handle(long len, RedisInputStream in) throws IOException {
if (logger.isDebugEnabled()) logger.debug("RDB dump file size:" + len);
logger.info("RDB dump file size:" + len);
if (configuration.isDiscardRdbEvent()) {
logger.info("Discard " + len + " bytes");
in.skip(len);
Expand All @@ -195,7 +195,8 @@ public String handle(long len, RedisInputStream in) throws IOException {
}
});
//sync command
if (!reply.equals("OK")) throw new AssertionError("SYNC failed." + reply);
if (reply.equals("OK")) return;
throw new AssertionError("SYNC failed." + reply);
}

private void auth(String password) throws IOException {
Expand Down Expand Up @@ -305,7 +306,7 @@ public void close() throws IOException {
heartBeat.cancel();
heartBeat = null;
}
if (logger.isInfoEnabled()) logger.info("channel closed");
logger.info("channel closed");
}

private enum SyncMode {SYNC, PSYNC}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,5 +65,7 @@ public interface Replicator extends Closeable {

void submitEvent(Object object) throws InterruptedException;

boolean verbose();

void open() throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ protected long rdbLoad() throws IOException, InterruptedException {
throw new AssertionError("Un-except value-type:" + type);
}
if (kv == null) continue;
if (logger.isDebugEnabled()) logger.debug(kv);
if (replicator.verbose() && logger.isDebugEnabled()) logger.debug(kv);
//submit event
this.replicator.submitEvent(kv);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ protected long rdbLoad() throws IOException, InterruptedException {
throw new AssertionError("Un-except value-type:" + type);
}
if (kv == null) continue;
if (logger.isDebugEnabled()) logger.debug(kv);
if (replicator.verbose() && logger.isDebugEnabled()) logger.debug(kv);
//submit event
this.replicator.submitEvent(kv);
}
Expand Down

0 comments on commit 827073d

Please sign in to comment.