Skip to content

Commit

Permalink
feat: Added processor options to parent of replicate and compare.
Browse files Browse the repository at this point in the history
Resolves #156
  • Loading branch information
jruaux committed Sep 15, 2024
1 parent e7da21a commit 20025ee
Show file tree
Hide file tree
Showing 7 changed files with 89 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,26 @@
import java.time.Duration;
import java.util.Collection;

import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.function.FunctionItemProcessor;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.util.Assert;

import com.redis.riot.CompareStatusItemWriter.StatusCount;
import com.redis.riot.core.RiotUtils;
import com.redis.riot.core.Step;
import com.redis.riot.function.StringKeyValue;
import com.redis.riot.function.ToStringKeyValue;
import com.redis.spring.batch.item.redis.RedisItemReader;
import com.redis.spring.batch.item.redis.common.KeyValue;
import com.redis.spring.batch.item.redis.reader.DefaultKeyComparator;
import com.redis.spring.batch.item.redis.reader.KeyComparator;
import com.redis.spring.batch.item.redis.reader.KeyComparison;
import com.redis.spring.batch.item.redis.reader.KeyComparisonItemReader;
import com.redis.spring.batch.item.redis.reader.RedisScanSizeEstimator;

import io.lettuce.core.codec.ByteArrayCodec;
import picocli.CommandLine.ArgGroup;
import picocli.CommandLine.Option;

public abstract class AbstractCompareCommand extends AbstractReplicateCommand {
Expand All @@ -33,6 +43,41 @@ public abstract class AbstractCompareCommand extends AbstractReplicateCommand {
@Option(names = "--ttl-tolerance", description = "Max TTL offset in millis to consider keys equal (default: ${DEFAULT-VALUE}).", paramLabel = "<ms>")
private long ttlToleranceMillis = DEFAULT_TTL_TOLERANCE.toMillis();

@ArgGroup(exclusive = false)
private EvaluationContextArgs evaluationContextArgs = new EvaluationContextArgs();

@ArgGroup(exclusive = false, heading = "Processor options%n")
private KeyValueProcessorArgs processorArgs = new KeyValueProcessorArgs();

protected ItemProcessor<KeyValue<byte[], Object>, KeyValue<byte[], Object>> processor() {
return RiotUtils.processor(new KeyValueFilter<>(ByteArrayCodec.INSTANCE, log), keyValueProcessor());
}

protected abstract boolean isStruct();

private ItemProcessor<KeyValue<byte[], Object>, KeyValue<byte[], Object>> keyValueProcessor() {
if (isIgnoreStreamMessageId()) {
Assert.isTrue(isStruct(), "--no-stream-id can only be used with --struct");
}
StandardEvaluationContext evaluationContext = evaluationContext();
log.info("Creating processor with {}", processorArgs);
ItemProcessor<KeyValue<String, Object>, KeyValue<String, Object>> processor = processorArgs
.processor(evaluationContext);
if (processor == null) {
return null;
}
ToStringKeyValue<byte[]> code = new ToStringKeyValue<>(ByteArrayCodec.INSTANCE);
StringKeyValue<byte[]> decode = new StringKeyValue<>(ByteArrayCodec.INSTANCE);
return RiotUtils.processor(new FunctionItemProcessor<>(code), processor, new FunctionItemProcessor<>(decode));
}

private StandardEvaluationContext evaluationContext() {
log.info("Creating SpEL evaluation context with {}", evaluationContextArgs);
StandardEvaluationContext evaluationContext = evaluationContextArgs.evaluationContext();
configure(evaluationContext);
return evaluationContext;
}

private String compareMessage(Collection<StatusCount> counts) {
StringBuilder builder = new StringBuilder();
counts.stream().map(CompareStepListener::toString).forEach(s -> builder.append(STATUS_DELIMITER).append(s));
Expand Down Expand Up @@ -70,6 +115,7 @@ protected KeyComparisonItemReader<byte[], byte[]> compareReader() {
RedisItemReader<byte[], byte[], Object> target = compareTargetReader();
KeyComparisonItemReader<byte[], byte[]> reader = new KeyComparisonItemReader<>(source, target);
reader.setComparator(keyComparator());
reader.setProcessor(processor());
return reader;
}

Expand All @@ -83,7 +129,9 @@ private KeyComparator<byte[]> keyComparator() {
return comparator;
}

protected abstract boolean isIgnoreStreamMessageId();
protected boolean isIgnoreStreamMessageId() {
return !processorArgs.isPropagateIds();
}

private RedisItemReader<byte[], byte[], Object> compareSourceReader() {
RedisItemReader<byte[], byte[], Object> reader = compareRedisReader();
Expand Down Expand Up @@ -128,4 +176,12 @@ public void setTtlToleranceMillis(long tolerance) {
this.ttlToleranceMillis = tolerance;
}

public KeyValueProcessorArgs getProcessorArgs() {
return processorArgs;
}

public void setProcessorArgs(KeyValueProcessorArgs args) {
this.processorArgs = args;
}

}
5 changes: 5 additions & 0 deletions plugins/riot/src/main/java/com/redis/riot/Compare.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ public class Compare extends AbstractCompareCommand {
@Option(names = "--quick", description = "Skip value comparison.")
private boolean quick;

@Override
protected boolean isStruct() {
return true;
}

@Override
protected boolean isQuickCompare() {
return quick;
Expand Down
54 changes: 1 addition & 53 deletions plugins/riot/src/main/java/com/redis/riot/Replicate.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,8 @@
import java.util.List;

import org.springframework.batch.core.Job;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.function.FunctionItemProcessor;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.util.Assert;

import com.redis.riot.core.RiotUtils;
import com.redis.riot.core.Step;
import com.redis.riot.function.StringKeyValue;
import com.redis.riot.function.ToStringKeyValue;
import com.redis.spring.batch.item.redis.RedisItemReader;
import com.redis.spring.batch.item.redis.RedisItemReader.ReaderMode;
import com.redis.spring.batch.item.redis.RedisItemWriter;
Expand Down Expand Up @@ -45,12 +38,6 @@ public enum CompareMode {
@Option(names = "--struct", description = "Enable data structure-specific replication")
private boolean struct;

@ArgGroup(exclusive = false)
private EvaluationContextArgs evaluationContextArgs = new EvaluationContextArgs();

@ArgGroup(exclusive = false, heading = "Processor options%n")
private KeyValueProcessorArgs processorArgs = new KeyValueProcessorArgs();

@ArgGroup(exclusive = false)
private RedisWriterArgs targetRedisWriterArgs = new RedisWriterArgs();

Expand All @@ -76,38 +63,6 @@ protected Job job() {
return job(steps);
}

@Override
protected boolean isIgnoreStreamMessageId() {
return !processorArgs.isPropagateIds();
}

private ItemProcessor<KeyValue<byte[], Object>, KeyValue<byte[], Object>> processor() {
return RiotUtils.processor(new KeyValueFilter<>(ByteArrayCodec.INSTANCE, log), keyValueProcessor());
}

private ItemProcessor<KeyValue<byte[], Object>, KeyValue<byte[], Object>> keyValueProcessor() {
if (isIgnoreStreamMessageId()) {
Assert.isTrue(isStruct(), "--no-stream-id can only be used with --struct");
}
StandardEvaluationContext evaluationContext = evaluationContext();
log.info("Creating processor with {}", processorArgs);
ItemProcessor<KeyValue<String, Object>, KeyValue<String, Object>> processor = processorArgs
.processor(evaluationContext);
if (processor == null) {
return null;
}
ToStringKeyValue<byte[]> code = new ToStringKeyValue<>(ByteArrayCodec.INSTANCE);
StringKeyValue<byte[]> decode = new StringKeyValue<>(ByteArrayCodec.INSTANCE);
return RiotUtils.processor(new FunctionItemProcessor<>(code), processor, new FunctionItemProcessor<>(decode));
}

private StandardEvaluationContext evaluationContext() {
log.info("Creating SpEL evaluation context with {}", evaluationContextArgs);
StandardEvaluationContext evaluationContext = evaluationContextArgs.evaluationContext();
configure(evaluationContext);
return evaluationContext;
}

@Override
protected void configureTargetRedisWriter(RedisItemWriter<?, ?, ?> writer) {
super.configureTargetRedisWriter(writer);
Expand Down Expand Up @@ -198,6 +153,7 @@ public void setTargetRedisWriterArgs(RedisWriterArgs redisWriterArgs) {
this.targetRedisWriterArgs = redisWriterArgs;
}

@Override
public boolean isStruct() {
return struct;
}
Expand All @@ -206,14 +162,6 @@ public void setStruct(boolean type) {
this.struct = type;
}

public KeyValueProcessorArgs getProcessorArgs() {
return processorArgs;
}

public void setProcessorArgs(KeyValueProcessorArgs args) {
this.processorArgs = args;
}

public boolean isLogKeys() {
return logKeys;
}
Expand Down
20 changes: 10 additions & 10 deletions plugins/riot/src/test/java/com/redis/riot/RiotTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,17 +60,17 @@ protected static double abv(Map<String, String> beer) {
return Double.parseDouble(beer.get("abv"));
}

protected void execute(Replicate replication, TestInfo info) throws Exception {
protected void execute(Replicate replicate, TestInfo info) throws Exception {
System.setProperty(SimpleLogger.LOG_KEY_PREFIX + ReplicateWriteLogger.class.getName(), "error");
replication.getJobArgs().getProgressArgs().setStyle(ProgressStyle.NONE);
replication.setJobName(name(info));
replication.setJobRepository(jobRepository);
replication.setSourceRedisUri(redisURI);
replication.getSourceRedisArgs().setCluster(getRedisServer().isRedisCluster());
replication.setTargetRedisUri(targetRedisURI);
replication.getTargetRedisArgs().setCluster(getTargetRedisServer().isRedisCluster());
replication.getSourceRedisReaderArgs().setIdleTimeout(DEFAULT_IDLE_TIMEOUT_SECONDS);
replication.call();
replicate.getJobArgs().getProgressArgs().setStyle(ProgressStyle.NONE);
replicate.setJobName(name(info));
replicate.setJobRepository(jobRepository);
replicate.setSourceRedisUri(redisURI);
replicate.getSourceRedisArgs().setCluster(getRedisServer().isRedisCluster());
replicate.setTargetRedisUri(targetRedisURI);
replicate.getTargetRedisArgs().setCluster(getTargetRedisServer().isRedisCluster());
replicate.getSourceRedisReaderArgs().setIdleTimeout(DEFAULT_IDLE_TIMEOUT_SECONDS);
replicate.call();
}

@Test
Expand Down
14 changes: 14 additions & 0 deletions plugins/riot/src/test/java/com/redis/riot/StackRiotTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@
import com.redis.lettucemod.timeseries.MRangeOptions;
import com.redis.lettucemod.timeseries.RangeResult;
import com.redis.lettucemod.timeseries.TimeRange;
import com.redis.riot.Replicate.CompareMode;
import com.redis.riot.core.Expression;
import com.redis.riot.core.ProgressStyle;
import com.redis.riot.core.QuietMapAccessor;
import com.redis.riot.file.xml.XmlItemReader;
import com.redis.riot.file.xml.XmlItemReaderBuilder;
Expand Down Expand Up @@ -741,6 +743,18 @@ void replicateStruct(TestInfo info) throws Throwable {
execute(info, filename);
}

@Test
void compareKeyProcessor(TestInfo info) throws Throwable {
GeneratorItemReader gen = generator(1, DataType.HASH);
generate(info, gen);
Long sourceSize = redisCommands.dbsize();
Assertions.assertTrue(sourceSize > 0);
execute(testInfo(info, "replicate"), "replicate-key-processor-compare-none");
Assertions.assertEquals(sourceSize, targetRedisCommands.dbsize());
Assertions.assertEquals(redisCommands.hgetall("gen:1"), targetRedisCommands.hgetall("prefix:gen:1"));
execute(info, "compare-key-processor");
}

@Test
void keyProcessor(TestInfo info) throws Throwable {
String key1 = "key1";
Expand Down
1 change: 1 addition & 0 deletions plugins/riot/src/test/resources/compare-key-processor
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
riot compare --batch 1 --key-proc="prefix:#{key}" redis://source redis://target
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
riot replicate --batch 1 --key-proc="prefix:#{key}" --compare none redis://source redis://target

0 comments on commit 20025ee

Please sign in to comment.