From 74778923c4fd3bfebc36581ddad0a161a4f55a95 Mon Sep 17 00:00:00 2001 From: jruaux Date: Tue, 25 Jun 2024 23:44:18 -0700 Subject: [PATCH] feat: Added empty-stream struct replication --- build.gradle | 1 + connectors/riot-file/riot-file.gradle | 9 +- .../com/redis/riot/file/AmazonS3Args.java | 72 --------------- .../riot/file/AmazonS3ProtocolResolver.java | 21 ----- .../java/com/redis/riot/file/AwsArgs.java | 87 +++++++++++++++++++ ...tialsArgs.java => AwsCredentialsArgs.java} | 11 +-- .../java/com/redis/riot/file/FileArgs.java | 8 +- .../java/com/redis/riot/file/FileUtils.java | 9 +- .../com/redis/riot/file/FileWriterArgs.java | 3 +- .../redis/riot/file/GoogleStorageArgs.java | 8 +- .../redis/riot/core/AbstractJobCommand.java | 64 +++++++------- docs/guide/src/docs/asciidoc/concepts.adoc | 2 - .../guide/src/docs/asciidoc/introduction.adoc | 1 + gradle.properties | 10 +-- gradle/wrapper/gradle-wrapper.properties | 2 +- gradlew | 2 +- .../riot/CompareLoggingWriteListener.java | 20 ++--- .../src/main/java/com/redis/riot/Main.java | 2 + .../java/com/redis/riot/ProcessorArgs.java | 44 +++++----- .../main/java/com/redis/riot/Replicate.java | 6 +- .../com/redis/riot/StreamProcessorArgs.java | 49 +++++++++++ .../riot/function/DropStreamMessageId.java | 29 ------- .../redis/riot/function/StreamOperator.java | 58 +++++++++++++ .../java/com/redis/riot/StackRiotTests.java | 43 +++++++++ .../test/resources/replicate-no-stream-ids | 1 + .../resources/replicate-no-stream-ids-prune | 1 + 26 files changed, 338 insertions(+), 225 deletions(-) delete mode 100644 connectors/riot-file/src/main/java/com/redis/riot/file/AmazonS3Args.java delete mode 100644 connectors/riot-file/src/main/java/com/redis/riot/file/AmazonS3ProtocolResolver.java create mode 100644 connectors/riot-file/src/main/java/com/redis/riot/file/AwsArgs.java rename connectors/riot-file/src/main/java/com/redis/riot/file/{AmazonS3CredentialsArgs.java => AwsCredentialsArgs.java} (62%) create mode 100644 plugins/riot/src/main/java/com/redis/riot/StreamProcessorArgs.java delete mode 100644 plugins/riot/src/main/java/com/redis/riot/function/DropStreamMessageId.java create mode 100644 plugins/riot/src/main/java/com/redis/riot/function/StreamOperator.java create mode 100644 plugins/riot/src/test/resources/replicate-no-stream-ids create mode 100644 plugins/riot/src/test/resources/replicate-no-stream-ids-prune diff --git a/build.gradle b/build.gradle index ecb602687..610d01f6a 100644 --- a/build.gradle +++ b/build.gradle @@ -120,6 +120,7 @@ subprojects { subproj -> configurations { all*.exclude module: 'spring-boot-starter-logging' + all*.exclude module: 'commons-logging' } bootJar { diff --git a/connectors/riot-file/riot-file.gradle b/connectors/riot-file/riot-file.gradle index c6377c0d9..2be09fbb9 100644 --- a/connectors/riot-file/riot-file.gradle +++ b/connectors/riot-file/riot-file.gradle @@ -16,17 +16,14 @@ * limitations under the License. */ dependencies { - api group: 'info.picocli', name: 'picocli', version: picocliVersion + implementation group: 'info.picocli', name: 'picocli', version: picocliVersion annotationProcessor group: 'info.picocli', name: 'picocli-codegen', version: picocliVersion implementation 'org.springframework.batch:spring-batch-infrastructure' implementation group: 'com.redis', name: 'spring-batch-resource', version: springBatchRedisVersion implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-xml' implementation 'org.springframework:spring-oxm' - implementation group: 'org.springframework.cloud', name: 'spring-cloud-aws-context', version: awsVersion - implementation group: 'org.springframework.cloud', name: 'spring-cloud-aws-autoconfigure', version: awsVersion - implementation(group: 'org.springframework.cloud', name: 'spring-cloud-gcp-starter-storage', version: gcpVersion) { - exclude group: 'javax.annotation', module: 'javax.annotation-api' - } + implementation group: 'io.awspring.cloud', name: 'spring-cloud-aws-starter-s3', version: awsVersion + implementation group: 'com.google.cloud', name: 'spring-cloud-gcp-starter-storage', version: gcpVersion testImplementation group: 'com.redis', name: 'spring-batch-redis-test', version: springBatchRedisVersion } diff --git a/connectors/riot-file/src/main/java/com/redis/riot/file/AmazonS3Args.java b/connectors/riot-file/src/main/java/com/redis/riot/file/AmazonS3Args.java deleted file mode 100644 index 52b2ee55c..000000000 --- a/connectors/riot-file/src/main/java/com/redis/riot/file/AmazonS3Args.java +++ /dev/null @@ -1,72 +0,0 @@ -package com.redis.riot.file; - -import org.springframework.core.io.DefaultResourceLoader; -import org.springframework.core.io.Resource; -import org.springframework.util.Assert; - -import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration; -import com.amazonaws.services.s3.AmazonS3Client; -import com.amazonaws.services.s3.AmazonS3ClientBuilder; - -import picocli.CommandLine.ArgGroup; -import picocli.CommandLine.Option; - -public class AmazonS3Args { - - private static final String S3_PROTOCOL_PREFIX = "s3://"; - - @ArgGroup(exclusive = false) - private AmazonS3CredentialsArgs credentialsArgs; - - @Option(names = "--s3-region", description = "Region to use for the AWS client (e.g. us-west-1).", paramLabel = "") - private String region; - - @Option(names = "--s3-endpoint", description = "AWS service endpoint either with or without the protocol (e.g. https://sns.us-west-1.amazonaws.com or sns.us-west-1.amazonaws.com).", paramLabel = "") - private String endpoint; - - public static boolean isSimpleStorageResource(String location) { - Assert.notNull(location, "Location must not be null"); - return location.toLowerCase().startsWith(S3_PROTOCOL_PREFIX); - } - - public Resource resource(String location) { - AmazonS3ClientBuilder clientBuilder = AmazonS3Client.builder(); - if (endpoint == null) { - if (region != null) { - clientBuilder.withRegion(region); - } - } else { - clientBuilder.withEndpointConfiguration(new EndpointConfiguration(endpoint, region)); - } - if (credentialsArgs != null) { - clientBuilder.withCredentials(credentialsArgs.credentials()); - } - AmazonS3ProtocolResolver resolver = new AmazonS3ProtocolResolver(clientBuilder); - resolver.afterPropertiesSet(); - return resolver.resolve(location, new DefaultResourceLoader()); - } - - public AmazonS3CredentialsArgs getCredentialsArgs() { - return credentialsArgs; - } - - public void setCredentialsArgs(AmazonS3CredentialsArgs args) { - this.credentialsArgs = args; - } - - public String getRegion() { - return region; - } - - public void setRegion(String region) { - this.region = region; - } - - public String getEndpoint() { - return endpoint; - } - - public void setEndpoint(String endpoint) { - this.endpoint = endpoint; - } -} diff --git a/connectors/riot-file/src/main/java/com/redis/riot/file/AmazonS3ProtocolResolver.java b/connectors/riot-file/src/main/java/com/redis/riot/file/AmazonS3ProtocolResolver.java deleted file mode 100644 index 6fa4cd6ae..000000000 --- a/connectors/riot-file/src/main/java/com/redis/riot/file/AmazonS3ProtocolResolver.java +++ /dev/null @@ -1,21 +0,0 @@ -package com.redis.riot.file; - -import org.springframework.cloud.aws.core.io.s3.SimpleStorageProtocolResolver; - -import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.AmazonS3ClientBuilder; - -public class AmazonS3ProtocolResolver extends SimpleStorageProtocolResolver { - - private final AmazonS3ClientBuilder clientBuilder; - - public AmazonS3ProtocolResolver(AmazonS3ClientBuilder clientBuilder) { - this.clientBuilder = clientBuilder; - } - - @Override - public AmazonS3 getAmazonS3() { - return clientBuilder.build(); - } - -} diff --git a/connectors/riot-file/src/main/java/com/redis/riot/file/AwsArgs.java b/connectors/riot-file/src/main/java/com/redis/riot/file/AwsArgs.java new file mode 100644 index 000000000..25d017acc --- /dev/null +++ b/connectors/riot-file/src/main/java/com/redis/riot/file/AwsArgs.java @@ -0,0 +1,87 @@ +package com.redis.riot.file; + +import java.net.URI; + +import org.springframework.core.io.Resource; +import org.springframework.util.Assert; +import org.springframework.util.StringUtils; + +import io.awspring.cloud.s3.InMemoryBufferingS3OutputStreamProvider; +import io.awspring.cloud.s3.Location; +import io.awspring.cloud.s3.PropertiesS3ObjectContentTypeResolver; +import io.awspring.cloud.s3.S3Resource; +import picocli.CommandLine.ArgGroup; +import picocli.CommandLine.Option; +import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.S3ClientBuilder; + +public class AwsArgs { + + @ArgGroup(exclusive = false) + private AwsCredentialsArgs credentialsArgs; + + @Option(names = "--s3-region", description = "Region to use for the AWS client (e.g. us-west-1).", paramLabel = "") + private Region region; + + @Option(names = "--s3-endpoint", description = "Service endpoint with which the AWS client should communicate (e.g. https://sns.us-west-1.amazonaws.com).", paramLabel = "") + private URI endpoint; + + public static boolean isSimpleStorageResource(String location) { + Assert.notNull(location, "Location must not be null"); + return location.toLowerCase().startsWith(Location.S3_PROTOCOL_PREFIX); + } + + public Resource resource(String location) { + S3ClientBuilder clientBuilder = S3Client.builder(); + if (region != null) { + clientBuilder.region(region); + } + if (endpoint != null) { + clientBuilder.endpointOverride(endpoint); + } + clientBuilder.credentialsProvider(credentialsProvider()); + S3Client client = clientBuilder.build(); + InMemoryBufferingS3OutputStreamProvider outputStreamProvider = new InMemoryBufferingS3OutputStreamProvider( + client, new PropertiesS3ObjectContentTypeResolver()); + return S3Resource.create(location, client, outputStreamProvider); + } + + private AwsCredentialsProvider credentialsProvider() { + if (credentialsArgs != null && StringUtils.hasText(credentialsArgs.getAccessKey()) + && StringUtils.hasText(credentialsArgs.getSecretKey())) { + AwsBasicCredentials credentials = AwsBasicCredentials.create(credentialsArgs.getAccessKey(), + credentialsArgs.getSecretKey()); + return StaticCredentialsProvider.create(credentials); + } + return AnonymousCredentialsProvider.create(); + } + + public AwsCredentialsArgs getCredentialsArgs() { + return credentialsArgs; + } + + public void setCredentialsArgs(AwsCredentialsArgs args) { + this.credentialsArgs = args; + } + + public Region getRegion() { + return region; + } + + public void setRegion(Region region) { + this.region = region; + } + + public URI getEndpoint() { + return endpoint; + } + + public void setEndpoint(URI endpoint) { + this.endpoint = endpoint; + } +} diff --git a/connectors/riot-file/src/main/java/com/redis/riot/file/AmazonS3CredentialsArgs.java b/connectors/riot-file/src/main/java/com/redis/riot/file/AwsCredentialsArgs.java similarity index 62% rename from connectors/riot-file/src/main/java/com/redis/riot/file/AmazonS3CredentialsArgs.java rename to connectors/riot-file/src/main/java/com/redis/riot/file/AwsCredentialsArgs.java index 82d44ac30..76672152c 100644 --- a/connectors/riot-file/src/main/java/com/redis/riot/file/AmazonS3CredentialsArgs.java +++ b/connectors/riot-file/src/main/java/com/redis/riot/file/AwsCredentialsArgs.java @@ -1,12 +1,8 @@ package com.redis.riot.file; -import com.amazonaws.auth.AWSCredentialsProvider; -import com.amazonaws.auth.AWSStaticCredentialsProvider; -import com.amazonaws.auth.BasicAWSCredentials; - import picocli.CommandLine.Option; -public class AmazonS3CredentialsArgs { +public class AwsCredentialsArgs { @Option(names = "--s3-access", required = true, description = "AWS access key.", paramLabel = "") private String accessKey; @@ -14,11 +10,6 @@ public class AmazonS3CredentialsArgs { @Option(names = "--s3-secret", required = true, arity = "0..1", interactive = true, description = "AWS secret key.", paramLabel = "") private String secretKey; - public AWSCredentialsProvider credentials() { - BasicAWSCredentials credentials = new BasicAWSCredentials(accessKey, secretKey); - return new AWSStaticCredentialsProvider(credentials); - } - public String getAccessKey() { return accessKey; } diff --git a/connectors/riot-file/src/main/java/com/redis/riot/file/FileArgs.java b/connectors/riot-file/src/main/java/com/redis/riot/file/FileArgs.java index 2295eefd5..5bd0884b4 100644 --- a/connectors/riot-file/src/main/java/com/redis/riot/file/FileArgs.java +++ b/connectors/riot-file/src/main/java/com/redis/riot/file/FileArgs.java @@ -16,7 +16,7 @@ public class FileArgs { public static final char DEFAULT_QUOTE_CHARACTER = '"'; @ArgGroup(exclusive = false) - private AmazonS3Args amazonS3Args = new AmazonS3Args(); + private AwsArgs amazonS3Args = new AwsArgs(); @ArgGroup(exclusive = false) private GoogleStorageArgs googleStorageArgs = new GoogleStorageArgs(); @@ -51,7 +51,7 @@ public FileType fileType(Resource resource) { } public Resource resource(String location) throws IOException { - if (AmazonS3Args.isSimpleStorageResource(location)) { + if (AwsArgs.isSimpleStorageResource(location)) { return amazonS3Args.resource(location); } if (GoogleStorageArgs.isGoogleStorageResource(location)) { @@ -91,11 +91,11 @@ public void setGoogleStorageArgs(GoogleStorageArgs args) { this.googleStorageArgs = args; } - public AmazonS3Args getAmazonS3Args() { + public AwsArgs getAmazonS3Args() { return amazonS3Args; } - public void setAmazonS3Args(AmazonS3Args args) { + public void setAmazonS3Args(AwsArgs args) { this.amazonS3Args = args; } diff --git a/connectors/riot-file/src/main/java/com/redis/riot/file/FileUtils.java b/connectors/riot-file/src/main/java/com/redis/riot/file/FileUtils.java index 8e18dc2a2..6aa5aacc1 100644 --- a/connectors/riot-file/src/main/java/com/redis/riot/file/FileUtils.java +++ b/connectors/riot-file/src/main/java/com/redis/riot/file/FileUtils.java @@ -16,6 +16,9 @@ import org.springframework.core.io.Resource; import org.springframework.util.ResourceUtils; +import picocli.CommandLine; +import software.amazon.awssdk.regions.Region; + public abstract class FileUtils { public static final Pattern EXTENSION_PATTERN = Pattern.compile("(?i)\\.(?\\w+)(?:\\.(?gz))?$"); @@ -88,7 +91,7 @@ public static boolean isStdin(String file) { } public static boolean isFile(String file) { - return !AmazonS3Args.isSimpleStorageResource(file) && !GoogleStorageArgs.isGoogleStorageResource(file) + return !AwsArgs.isSimpleStorageResource(file) && !GoogleStorageArgs.isGoogleStorageResource(file) && !ResourceUtils.isUrl(file) && !isStdin(file); } @@ -109,4 +112,8 @@ public static List expand(Path path) { } } + public static void registerConverters(CommandLine commandLine) { + commandLine.registerConverter(Region.class, Region::of); + } + } diff --git a/connectors/riot-file/src/main/java/com/redis/riot/file/FileWriterArgs.java b/connectors/riot-file/src/main/java/com/redis/riot/file/FileWriterArgs.java index e600c0172..92346b2c6 100644 --- a/connectors/riot-file/src/main/java/com/redis/riot/file/FileWriterArgs.java +++ b/connectors/riot-file/src/main/java/com/redis/riot/file/FileWriterArgs.java @@ -4,11 +4,12 @@ import java.io.OutputStream; import java.util.zip.GZIPOutputStream; -import org.springframework.cloud.gcp.core.GcpScope; import org.springframework.core.io.Resource; import org.springframework.core.io.WritableResource; import org.springframework.util.Assert; +import com.google.cloud.spring.core.GcpScope; + import picocli.CommandLine.Option; import picocli.CommandLine.Parameters; diff --git a/connectors/riot-file/src/main/java/com/redis/riot/file/GoogleStorageArgs.java b/connectors/riot-file/src/main/java/com/redis/riot/file/GoogleStorageArgs.java index c064d8405..dc23d0b36 100644 --- a/connectors/riot-file/src/main/java/com/redis/riot/file/GoogleStorageArgs.java +++ b/connectors/riot-file/src/main/java/com/redis/riot/file/GoogleStorageArgs.java @@ -7,15 +7,15 @@ import java.nio.file.Files; import java.util.Base64; -import org.springframework.cloud.gcp.autoconfigure.storage.GcpStorageAutoConfiguration; -import org.springframework.cloud.gcp.core.GcpScope; -import org.springframework.cloud.gcp.core.UserAgentHeaderProvider; -import org.springframework.cloud.gcp.storage.GoogleStorageResource; import org.springframework.core.io.Resource; import org.springframework.util.Assert; import com.google.auth.oauth2.GoogleCredentials; import com.google.cloud.ServiceOptions; +import com.google.cloud.spring.autoconfigure.storage.GcpStorageAutoConfiguration; +import com.google.cloud.spring.core.GcpScope; +import com.google.cloud.spring.core.UserAgentHeaderProvider; +import com.google.cloud.spring.storage.GoogleStorageResource; import com.google.cloud.storage.StorageOptions; import picocli.CommandLine.Option; diff --git a/core/riot-core/src/main/java/com/redis/riot/core/AbstractJobCommand.java b/core/riot-core/src/main/java/com/redis/riot/core/AbstractJobCommand.java index dcc9aa671..a1e36bf8a 100644 --- a/core/riot-core/src/main/java/com/redis/riot/core/AbstractJobCommand.java +++ b/core/riot-core/src/main/java/com/redis/riot/core/AbstractJobCommand.java @@ -57,28 +57,6 @@ public abstract class AbstractJobCommand extends AbstractCommand { protected PlatformTransactionManager transactionManager; protected JobLauncher jobLauncher; - protected Job job(Step... steps) { - return job(Stream.of(steps)); - } - - protected Job job(Iterable> steps) { - return job(StreamSupport.stream(steps.spliterator(), false)); - } - - private Job job(Stream> steps) { - Iterator iterator = steps.map(this::step).iterator(); - SimpleJobBuilder job = jobBuilder().start(iterator.next()); - while (iterator.hasNext()) { - job.next(iterator.next()); - } - return job.build(); - } - - private JobBuilder jobBuilder() { - log.info("Creating job {}", jobName); - return new JobBuilder(jobName, jobRepository); - } - @Override public void afterPropertiesSet() throws Exception { super.afterPropertiesSet(); @@ -98,22 +76,14 @@ public void afterPropertiesSet() throws Exception { } } - private TaskExecutorJobLauncher taskExecutorJobLauncher() throws Exception { - TaskExecutorJobLauncher launcher = new TaskExecutorJobLauncher(); - launcher.setJobRepository(jobRepository); - launcher.setTaskExecutor(new SyncTaskExecutor()); - launcher.afterPropertiesSet(); - return launcher; - } - @Override protected void execute() { - Job job = job(); JobExecution jobExecution; try { + Job job = job(); jobExecution = jobLauncher.run(job, new JobParameters()); } catch (JobExecutionException e) { - throw new RiotException("Could not run job " + job.getName(), e); + throw new RiotException("Could not run job " + jobName, e); } finally { shutdown(); } @@ -127,6 +97,36 @@ protected void execute() { } } + protected Job job(Step... steps) { + return job(Stream.of(steps)); + } + + protected Job job(Iterable> steps) { + return job(StreamSupport.stream(steps.spliterator(), false)); + } + + private Job job(Stream> steps) { + Iterator iterator = steps.map(this::step).iterator(); + SimpleJobBuilder job = jobBuilder().start(iterator.next()); + while (iterator.hasNext()) { + job.next(iterator.next()); + } + return job.build(); + } + + private JobBuilder jobBuilder() { + log.info("Creating job {}", jobName); + return new JobBuilder(jobName, jobRepository); + } + + private TaskExecutorJobLauncher taskExecutorJobLauncher() throws Exception { + TaskExecutorJobLauncher launcher = new TaskExecutorJobLauncher(); + launcher.setJobRepository(jobRepository); + launcher.setTaskExecutor(new SyncTaskExecutor()); + launcher.afterPropertiesSet(); + return launcher; + } + protected abstract void shutdown(); protected boolean shouldShowProgress() { diff --git a/docs/guide/src/docs/asciidoc/concepts.adoc b/docs/guide/src/docs/asciidoc/concepts.adoc index 156d63bab..a75adde2d 100644 --- a/docs/guide/src/docs/asciidoc/concepts.adoc +++ b/docs/guide/src/docs/asciidoc/concepts.adoc @@ -92,8 +92,6 @@ Both snapshot and live replication modes are supported. image::replication-architecture.svg[] -WARNING: Please note that {project-title} replication is NEITHER recommended NOR officially supported by Redis, Inc. - The basic replication mechanism is as follows: 1. Identify source keys to be replicated using scan and/or keyspace notifications depending on the <<_replication_mode,replication mode>>. diff --git a/docs/guide/src/docs/asciidoc/introduction.adoc b/docs/guide/src/docs/asciidoc/introduction.adoc index 492e06ab1..044e050d5 100644 --- a/docs/guide/src/docs/asciidoc/introduction.adoc +++ b/docs/guide/src/docs/asciidoc/introduction.adoc @@ -11,3 +11,4 @@ * Data migration from a Redis database to another * Live replication between two Redis databases +{project-title} is supported by Redis, Inc. on a good faith effort basis. To report bugs, request features, or receive assistance, please https://github.com/redis/riot/issues[file an issue] or contact your Redis account team. diff --git a/gradle.properties b/gradle.properties index f6b74bac8..099fa8ae7 100644 --- a/gradle.properties +++ b/gradle.properties @@ -22,23 +22,23 @@ testSourceCompatibility = 17 testTargetCompatibility = 17 reproducibleBuild = true -bootPluginVersion = 3.2.6 +bootPluginVersion = 3.3.1 dependencyPluginVersion = 1.1.4 gitPluginVersion = 3.0.0 jacocoPluginVersion = 0.8.12 -jdksPluginVersion = 1.11.0 +jdksPluginVersion = 1.12.0 kordampBuildVersion = 3.4.0 kordampPluginVersion = 0.54.0 -awsVersion = 2.2.6.RELEASE +awsVersion = 3.1.1 datafakerVersion = 2.2.2 -gcpVersion = 1.2.8.RELEASE +gcpVersion = 5.4.1 globVersion = 0.9.0 latencyUtilsVersion = 2.0.3 lettucemodVersion = 3.8.0 picocliVersion = 4.7.6 progressbarVersion = 0.10.1 -springBatchRedisVersion = 4.3.10 +springBatchRedisVersion = 4.3.12 testcontainersRedisVersion = 2.2.2 org.gradle.daemon = false diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index b82aa23a4..a4413138c 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,6 +1,6 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-8.7-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-8.8-bin.zip networkTimeout=10000 validateDistributionUrl=true zipStoreBase=GRADLE_USER_HOME diff --git a/gradlew b/gradlew index 1aa94a426..b740cf133 100755 --- a/gradlew +++ b/gradlew @@ -55,7 +55,7 @@ # Darwin, MinGW, and NonStop. # # (3) This script is generated from the Groovy template -# https://github.com/gradle/gradle/blob/HEAD/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt +# https://github.com/gradle/gradle/blob/HEAD/platforms/jvm/plugins-application/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt # within the Gradle project. # # You can find Gradle at https://github.com/gradle/gradle/. diff --git a/plugins/riot/src/main/java/com/redis/riot/CompareLoggingWriteListener.java b/plugins/riot/src/main/java/com/redis/riot/CompareLoggingWriteListener.java index 5fc6b2769..8e5d09ceb 100644 --- a/plugins/riot/src/main/java/com/redis/riot/CompareLoggingWriteListener.java +++ b/plugins/riot/src/main/java/com/redis/riot/CompareLoggingWriteListener.java @@ -7,7 +7,6 @@ import org.slf4j.LoggerFactory; import org.springframework.batch.core.ItemWriteListener; import org.springframework.batch.item.Chunk; -import org.springframework.util.ObjectUtils; import com.redis.spring.batch.item.redis.common.BatchUtils; import com.redis.spring.batch.item.redis.reader.KeyComparison; @@ -33,31 +32,24 @@ public void afterWrite(Chunk> items) { public void log(KeyComparison comparison) { switch (comparison.getStatus()) { case MISSING: - log("Missing key {}", comparison); + log.error("Missing {} {}", comparison.getSource().getType(), key(comparison)); break; case TYPE: - log("Type mismatch on key {}. Expected {} but was {}", comparison, comparison.getSource().getType(), - comparison.getTarget().getType()); + log.error("Type mismatch on key {}. Expected {} but was {}", key(comparison), + comparison.getSource().getType(), comparison.getTarget().getType()); break; case VALUE: - log("Value mismatch on key {}", comparison); + log.error("Value mismatch on {} {}", comparison.getSource().getType(), key(comparison)); break; case TTL: - log("TTL mismatch on key {}. Expected {} but was {}", comparison, comparison.getSource().getTtl(), - comparison.getTarget().getTtl()); + log.error("TTL mismatch on key {}. Expected {} but was {}", key(comparison), + comparison.getSource().getTtl(), comparison.getTarget().getTtl()); break; default: break; } } - private void log(String msg, KeyComparison comparison, Object... args) { - if (log.isErrorEnabled()) { - String key = key(comparison); - log.error(msg, ObjectUtils.addObjectToArray(args, key, 0)); - } - } - private String key(KeyComparison comparison) { return toStringKeyFunction.apply(comparison.getSource().getKey()); } diff --git a/plugins/riot/src/main/java/com/redis/riot/Main.java b/plugins/riot/src/main/java/com/redis/riot/Main.java index 383999a6c..b8ee74761 100644 --- a/plugins/riot/src/main/java/com/redis/riot/Main.java +++ b/plugins/riot/src/main/java/com/redis/riot/Main.java @@ -9,6 +9,7 @@ import com.redis.riot.core.IO; import com.redis.riot.core.PrintExceptionMessageHandler; import com.redis.riot.core.TemplateExpression; +import com.redis.riot.file.FileUtils; import com.redis.riot.operation.OperationCommand; import com.redis.spring.batch.Range; @@ -69,6 +70,7 @@ public static int run(CommandLine commandLine, String... args) { commandLine.registerConverter(Range.class, Range::parse); commandLine.registerConverter(Expression.class, Expression::parse); commandLine.registerConverter(TemplateExpression.class, Expression::parseTemplate); + FileUtils.registerConverters(commandLine); return commandLine.execute(args); } diff --git a/plugins/riot/src/main/java/com/redis/riot/ProcessorArgs.java b/plugins/riot/src/main/java/com/redis/riot/ProcessorArgs.java index f338d633c..dce7c7608 100644 --- a/plugins/riot/src/main/java/com/redis/riot/ProcessorArgs.java +++ b/plugins/riot/src/main/java/com/redis/riot/ProcessorArgs.java @@ -3,16 +3,16 @@ import java.util.ArrayList; import java.util.List; import java.util.function.Consumer; +import java.util.function.UnaryOperator; import org.springframework.batch.item.ItemProcessor; -import org.springframework.batch.item.function.FunctionItemProcessor; import org.springframework.expression.EvaluationContext; import com.redis.riot.core.EvaluationContextArgs; import com.redis.riot.core.Expression; +import com.redis.riot.core.RiotUtils; import com.redis.riot.core.TemplateExpression; import com.redis.riot.function.ConsumerUnaryOperator; -import com.redis.riot.function.DropStreamMessageId; import com.redis.spring.batch.item.redis.common.KeyValue; import picocli.CommandLine.ArgGroup; @@ -35,11 +35,16 @@ public class ProcessorArgs { @Option(names = "--ttls", description = "Propagate key expiration times. True by default.", negatable = true, defaultValue = "true", fallbackValue = "true") private boolean propagateTtl = true; - @Option(names = "--stream-ids", description = "Propagate stream message IDs. True by default.", negatable = true, defaultValue = "true", fallbackValue = "true") - private boolean propagateStreamMessageId = true; + @ArgGroup(exclusive = false) + private StreamProcessorArgs streamProcessorArgs = new StreamProcessorArgs(); public ItemProcessor, KeyValue> keyValueProcessor( EvaluationContext context) { + UnaryOperator> transform = transform(context); + return RiotUtils.processor(streamProcessorArgs.operator(), transform); + } + + private UnaryOperator> transform(EvaluationContext context) { List>> consumers = new ArrayList<>(); if (keyExpression != null) { consumers.add(t -> t.setKey(keyExpression.getValue(context, t))); @@ -50,56 +55,53 @@ public ItemProcessor, KeyValue> keyValu if (ttlExpression != null) { consumers.add(t -> t.setTtl(ttlExpression.getLong(context, t))); } - if (!propagateStreamMessageId) { - consumers.add(new DropStreamMessageId()); - } if (typeExpression != null) { consumers.add(t -> t.setType(typeExpression.getString(context, t))); } if (consumers.isEmpty()) { return null; } - return new FunctionItemProcessor<>(new ConsumerUnaryOperator<>(consumers)); + return new ConsumerUnaryOperator<>(consumers); } public TemplateExpression getKeyExpression() { return keyExpression; } - public void setKeyExpression(TemplateExpression keyExpression) { - this.keyExpression = keyExpression; + public void setKeyExpression(TemplateExpression expression) { + this.keyExpression = expression; } public Expression getTypeExpression() { return typeExpression; } - public void setTypeExpression(Expression typeExpression) { - this.typeExpression = typeExpression; + public void setTypeExpression(Expression expression) { + this.typeExpression = expression; } public Expression getTtlExpression() { return ttlExpression; } - public void setTtlExpression(Expression ttlExpression) { - this.ttlExpression = ttlExpression; + public void setTtlExpression(Expression expression) { + this.ttlExpression = expression; } public boolean isPropagateTtl() { return propagateTtl; } - public void setPropagateTtl(boolean propagateTtls) { - this.propagateTtl = propagateTtls; + public void setPropagateTtl(boolean propagate) { + this.propagateTtl = propagate; } - public boolean isPropagateStreamMessageId() { - return propagateStreamMessageId; + public StreamProcessorArgs getStreamProcessorArgs() { + return streamProcessorArgs; } - public void setPropagateStreamMessageId(boolean propagateStreamMessageIds) { - this.propagateStreamMessageId = propagateStreamMessageIds; + public void setStreamProcessorArgs(StreamProcessorArgs args) { + this.streamProcessorArgs = args; } public EvaluationContextArgs getEvaluationContextArgs() { @@ -114,7 +116,7 @@ public void setEvaluationContextArgs(EvaluationContextArgs args) { public String toString() { return "ProcessorArgs [evaluationContextArgs=" + evaluationContextArgs + ", keyExpression=" + keyExpression + ", typeExpression=" + typeExpression + ", ttlExpression=" + ttlExpression + ", propagateTtl=" - + propagateTtl + ", propagateStreamMessageId=" + propagateStreamMessageId + "]"; + + propagateTtl + ", streamProcessorArgs=" + streamProcessorArgs + "]"; } } diff --git a/plugins/riot/src/main/java/com/redis/riot/Replicate.java b/plugins/riot/src/main/java/com/redis/riot/Replicate.java index 38424e992..f36fef42e 100644 --- a/plugins/riot/src/main/java/com/redis/riot/Replicate.java +++ b/plugins/riot/src/main/java/com/redis/riot/Replicate.java @@ -6,6 +6,7 @@ import org.springframework.batch.core.Job; import org.springframework.batch.item.ItemProcessor; import org.springframework.batch.item.function.FunctionItemProcessor; +import org.springframework.util.Assert; import com.redis.riot.core.RiotUtils; import com.redis.riot.core.Step; @@ -72,7 +73,7 @@ protected Job job() { @Override protected boolean isIgnoreStreamMessageId() { - return !processorArgs.isPropagateStreamMessageId(); + return !processorArgs.getStreamProcessorArgs().isPropagateIds(); } private ItemProcessor, KeyValue> processor() { @@ -80,6 +81,9 @@ private ItemProcessor, KeyValue> proces } private ItemProcessor, KeyValue> keyValueProcessor() { + if (isIgnoreStreamMessageId()) { + Assert.isTrue(isStruct(), "'--no-stream-ids' can only be used with '--struct'"); + } ItemProcessor, KeyValue> processor = processorArgs .keyValueProcessor(evaluationContext(processorArgs)); if (processor == null) { diff --git a/plugins/riot/src/main/java/com/redis/riot/StreamProcessorArgs.java b/plugins/riot/src/main/java/com/redis/riot/StreamProcessorArgs.java new file mode 100644 index 000000000..0229d170f --- /dev/null +++ b/plugins/riot/src/main/java/com/redis/riot/StreamProcessorArgs.java @@ -0,0 +1,49 @@ +package com.redis.riot; + +import java.util.function.UnaryOperator; + +import com.redis.riot.function.StreamOperator; +import com.redis.spring.batch.item.redis.common.KeyValue; + +import picocli.CommandLine.Option; + +public class StreamProcessorArgs { + + @Option(names = "--stream-ids", description = "Propagate stream message IDs. True by default.", negatable = true, defaultValue = "true", fallbackValue = "true") + private boolean propagateIds = true; + + @Option(names = "--stream-prune", description = "Drop empty streams.") + private boolean prune; + + public UnaryOperator> operator() { + if (propagateIds && !prune) { + return null; + } + StreamOperator operator = new StreamOperator(); + operator.setDropMessageIds(!propagateIds); + operator.setPrune(prune); + return operator; + } + + public boolean isPropagateIds() { + return propagateIds; + } + + public void setPropagateIds(boolean propagateIds) { + this.propagateIds = propagateIds; + } + + public boolean isPrune() { + return prune; + } + + public void setPrune(boolean prune) { + this.prune = prune; + } + + @Override + public String toString() { + return "StreamProcessorArgs [propagateIds=" + propagateIds + ", prune=" + prune + "]"; + } + +} diff --git a/plugins/riot/src/main/java/com/redis/riot/function/DropStreamMessageId.java b/plugins/riot/src/main/java/com/redis/riot/function/DropStreamMessageId.java deleted file mode 100644 index f6870a16e..000000000 --- a/plugins/riot/src/main/java/com/redis/riot/function/DropStreamMessageId.java +++ /dev/null @@ -1,29 +0,0 @@ -package com.redis.riot.function; - -import java.util.Collection; -import java.util.function.Consumer; -import java.util.stream.Collectors; - -import com.redis.spring.batch.item.redis.common.DataType; -import com.redis.spring.batch.item.redis.common.KeyValue; - -import io.lettuce.core.StreamMessage; - -@SuppressWarnings("unchecked") -public class DropStreamMessageId implements Consumer> { - - @SuppressWarnings("rawtypes") - @Override - public void accept(KeyValue t) { - if (KeyValue.hasValue(t) && KeyValue.type(t) == DataType.STREAM) { - Collection messages = (Collection) t.getValue(); - t.setValue(messages.stream().map(this::message).collect(Collectors.toList())); - } - } - - @SuppressWarnings("rawtypes") - private StreamMessage message(StreamMessage message) { - return new StreamMessage(message.getStream(), null, message.getBody()); - } - -} diff --git a/plugins/riot/src/main/java/com/redis/riot/function/StreamOperator.java b/plugins/riot/src/main/java/com/redis/riot/function/StreamOperator.java new file mode 100644 index 000000000..1866f98a4 --- /dev/null +++ b/plugins/riot/src/main/java/com/redis/riot/function/StreamOperator.java @@ -0,0 +1,58 @@ +package com.redis.riot.function; + +import java.util.Collection; +import java.util.function.UnaryOperator; +import java.util.stream.Collectors; + +import org.springframework.util.CollectionUtils; + +import com.redis.spring.batch.item.redis.common.DataType; +import com.redis.spring.batch.item.redis.common.KeyValue; + +import io.lettuce.core.StreamMessage; + +public class StreamOperator implements UnaryOperator> { + + private boolean prune; + private boolean dropMessageIds; + + @SuppressWarnings("unchecked") + @Override + public KeyValue apply(KeyValue t) { + if (KeyValue.hasValue(t) && KeyValue.type(t) == DataType.STREAM) { + Collection> messages = (Collection>) t.getValue(); + if (CollectionUtils.isEmpty(messages)) { + if (prune) { + return null; + } + } else { + if (dropMessageIds) { + t.setValue(messages.stream().map(this::message).collect(Collectors.toList())); + } + } + } + return t; + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + private StreamMessage message(StreamMessage message) { + return new StreamMessage(message.getStream(), null, message.getBody()); + } + + public boolean isDropMessageIds() { + return dropMessageIds; + } + + public void setDropMessageIds(boolean dropMessageIds) { + this.dropMessageIds = dropMessageIds; + } + + public boolean isPrune() { + return prune; + } + + public void setPrune(boolean prune) { + this.prune = prune; + } + +} diff --git a/plugins/riot/src/test/java/com/redis/riot/StackRiotTests.java b/plugins/riot/src/test/java/com/redis/riot/StackRiotTests.java index 64628db08..64a291093 100644 --- a/plugins/riot/src/test/java/com/redis/riot/StackRiotTests.java +++ b/plugins/riot/src/test/java/com/redis/riot/StackRiotTests.java @@ -41,6 +41,10 @@ import com.redis.spring.batch.item.redis.common.KeyValue; import com.redis.spring.batch.item.redis.gen.GeneratorItemReader; import com.redis.spring.batch.item.redis.gen.GeneratorOptions; +import com.redis.spring.batch.item.redis.reader.DefaultKeyComparator; +import com.redis.spring.batch.item.redis.reader.KeyComparison; +import com.redis.spring.batch.item.redis.reader.KeyComparison.Status; +import com.redis.spring.batch.item.redis.reader.KeyComparisonItemReader; import com.redis.spring.batch.test.KeyspaceComparison; import com.redis.testcontainers.RedisStackContainer; @@ -564,6 +568,45 @@ void replicate(TestInfo info) throws Throwable { assertCompare(info); } + @Test + void replicateNoStreamIds(TestInfo info) throws Throwable { + String filename = "replicate-no-stream-ids"; + generate(info, generator(73)); + Assertions.assertTrue(redisCommands.dbsize() > 0); + execute(info, filename); + assertDbNotEmpty(redisCommands); + KeyComparisonItemReader reader = comparisonReader(info); + ((DefaultKeyComparator) reader.getComparator()).setIgnoreStreamMessageId(true); + reader.open(new ExecutionContext()); + List> comparisons = readAll(reader); + reader.close(); + KeyspaceComparison comparison = new KeyspaceComparison<>(comparisons); + Assertions.assertFalse(comparison.getAll().isEmpty()); + Assertions.assertEquals(Collections.emptyList(), comparison.mismatches()); + } + + @Test + void replicateNoStreamIdsPrune(TestInfo info) throws Throwable { + String filename = "replicate-no-stream-ids-prune"; + generate(info, generator(73)); + String emptyStream = "stream:empty"; + redisCommands.xadd(emptyStream, Map.of("field", "value")); + redisCommands.xtrim(emptyStream, 0); + Assertions.assertTrue(redisCommands.dbsize() > 0); + execute(info, filename); + assertDbNotEmpty(redisCommands); + KeyComparisonItemReader reader = comparisonReader(info); + ((DefaultKeyComparator) reader.getComparator()).setIgnoreStreamMessageId(true); + reader.open(new ExecutionContext()); + List> comparisons = readAll(reader); + reader.close(); + KeyspaceComparison comparison = new KeyspaceComparison<>(comparisons); + Assertions.assertFalse(comparison.getAll().isEmpty()); + KeyComparison missing = comparison.mismatches().get(0); + Assertions.assertEquals(Status.MISSING, missing.getStatus()); + Assertions.assertEquals(emptyStream, missing.getSource().getKey()); + } + @Test void replicateDryRun(TestInfo info) throws Throwable { String filename = "replicate-dry-run"; diff --git a/plugins/riot/src/test/resources/replicate-no-stream-ids b/plugins/riot/src/test/resources/replicate-no-stream-ids new file mode 100644 index 000000000..7e2fd6785 --- /dev/null +++ b/plugins/riot/src/test/resources/replicate-no-stream-ids @@ -0,0 +1 @@ +riot replicate --struct --batch 10 redis://source redis://target --no-stream-ids \ No newline at end of file diff --git a/plugins/riot/src/test/resources/replicate-no-stream-ids-prune b/plugins/riot/src/test/resources/replicate-no-stream-ids-prune new file mode 100644 index 000000000..0b5998c94 --- /dev/null +++ b/plugins/riot/src/test/resources/replicate-no-stream-ids-prune @@ -0,0 +1 @@ +riot replicate --struct --batch 10 redis://source redis://target --no-stream-ids --stream-prune \ No newline at end of file