From 27669899d3fd68fd967ba1e3422f6ea2d5ce5219 Mon Sep 17 00:00:00 2001 From: Andrew Carbonetto Date: Thu, 25 Apr 2024 14:58:55 -0700 Subject: [PATCH] Java: Add `XTRIM` command for streams (#1335) * Java: Add `XTRIM` command for streams (#215) * Java: Add XTRIM command Signed-off-by: Andrew Carbonetto * Update imports; add exception test Signed-off-by: Andrew Carbonetto * Add IT test for non-extent key Signed-off-by: Andrew Carbonetto * Update documentation examples Signed-off-by: Andrew Carbonetto * XTRIM: make exact optional Signed-off-by: Andrew Carbonetto * Consolidate stream options into one class Signed-off-by: Andrew Carbonetto * Remove StreamAddOptions and StreamTrimOptions Signed-off-by: Andrew Carbonetto * Update docs for StreamOptions Signed-off-by: Andrew Carbonetto * Pull StreamOption tests into separate file Signed-off-by: Andrew Carbonetto * Remove extra tests Signed-off-by: Andrew Carbonetto * SPOTLESS Signed-off-by: Andrew Carbonetto * Update tests Signed-off-by: Andrew Carbonetto * Move some tests around Signed-off-by: Andrew Carbonetto * Delete test file Signed-off-by: Andrew Carbonetto --------- Signed-off-by: Andrew Carbonetto * Clean up example comment Signed-off-by: Andrew Carbonetto --------- Signed-off-by: Andrew Carbonetto --- .../src/main/java/glide/api/BaseClient.java | 10 +- .../api/commands/StreamBaseCommands.java | 25 ++- .../glide/api/models/BaseTransaction.java | 20 +- ...reamAddOptions.java => StreamOptions.java} | 163 +++++++++------ .../test/java/glide/api/RedisClientTest.java | 187 +++++++++++------- .../glide/api/models/TransactionTests.java | 9 +- .../test/java/glide/SharedCommandTests.java | 42 ++-- .../java/glide/TransactionTestUtilities.java | 14 +- 8 files changed, 313 insertions(+), 157 deletions(-) rename java/client/src/main/java/glide/api/models/commands/{StreamAddOptions.java => StreamOptions.java} (51%) diff --git a/java/client/src/main/java/glide/api/BaseClient.java b/java/client/src/main/java/glide/api/BaseClient.java index a66e2230c0..3b148fc62b 100644 --- a/java/client/src/main/java/glide/api/BaseClient.java +++ b/java/client/src/main/java/glide/api/BaseClient.java @@ -75,6 +75,7 @@ import static redis_request.RedisRequestOuterClass.RequestType.Type; import static redis_request.RedisRequestOuterClass.RequestType.Unlink; import static redis_request.RedisRequestOuterClass.RequestType.XAdd; +import static redis_request.RedisRequestOuterClass.RequestType.XTrim; import static redis_request.RedisRequestOuterClass.RequestType.ZDiff; import static redis_request.RedisRequestOuterClass.RequestType.ZDiffStore; import static redis_request.RedisRequestOuterClass.RequestType.ZLexCount; @@ -111,7 +112,8 @@ import glide.api.models.commands.RangeOptions.ScoredRangeQuery; import glide.api.models.commands.ScriptOptions; import glide.api.models.commands.SetOptions; -import glide.api.models.commands.StreamAddOptions; +import glide.api.models.commands.StreamOptions.StreamAddOptions; +import glide.api.models.commands.StreamOptions.StreamTrimOptions; import glide.api.models.commands.ZaddOptions; import glide.api.models.configuration.BaseClientConfiguration; import glide.api.models.exceptions.RedisException; @@ -927,6 +929,12 @@ public CompletableFuture xadd( return commandManager.submitNewCommand(XAdd, arguments, this::handleStringOrNullResponse); } + @Override + public CompletableFuture xtrim(@NonNull String key, @NonNull StreamTrimOptions options) { + String[] arguments = ArrayUtils.addFirst(options.toArgs(), key); + return commandManager.submitNewCommand(XTrim, arguments, this::handleLongResponse); + } + @Override public CompletableFuture pttl(@NonNull String key) { return commandManager.submitNewCommand(PTTL, new String[] {key}, this::handleLongResponse); diff --git a/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java b/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java index 2fa1ff4daf..fc788b137c 100644 --- a/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java +++ b/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java @@ -1,8 +1,9 @@ /** Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0 */ package glide.api.commands; -import glide.api.models.commands.StreamAddOptions; -import glide.api.models.commands.StreamAddOptions.StreamAddOptionsBuilder; +import glide.api.models.commands.StreamOptions.StreamAddOptions; +import glide.api.models.commands.StreamOptions.StreamAddOptions.StreamAddOptionsBuilder; +import glide.api.models.commands.StreamOptions.StreamTrimOptions; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -52,4 +53,24 @@ public interface StreamBaseCommands { * } */ CompletableFuture xadd(String key, Map values, StreamAddOptions options); + + /** + * Trims the stream by evicting older entries. + * + * @see redis.io for details. + * @param key The key of the stream. + * @param options Stream trim options. + * @return The number of entries deleted from the stream. + * @example + *
{@code
+     * // A nearly exact trimming of the stream to at least a length of 10
+     * Long trimmed = client.xtrim("key", new MaxLen(false, 10L)).get();
+     * System.out.println("Number of trimmed entries from stream: " + trimmed);
+     *
+     * // An exact trimming of the stream by minimum id of "0-3", limit of 10 entries
+     * Long trimmed = client.xtrim("key", new MinId(true, "0-3", 10L)).get();
+     * System.out.println("Number of trimmed entries from stream: " + trimmed);
+     * }
+ */ + CompletableFuture xtrim(String key, StreamTrimOptions options); } diff --git a/java/client/src/main/java/glide/api/models/BaseTransaction.java b/java/client/src/main/java/glide/api/models/BaseTransaction.java index ac4ecb717f..4db361320e 100644 --- a/java/client/src/main/java/glide/api/models/BaseTransaction.java +++ b/java/client/src/main/java/glide/api/models/BaseTransaction.java @@ -90,6 +90,7 @@ import static redis_request.RedisRequestOuterClass.RequestType.Type; import static redis_request.RedisRequestOuterClass.RequestType.Unlink; import static redis_request.RedisRequestOuterClass.RequestType.XAdd; +import static redis_request.RedisRequestOuterClass.RequestType.XTrim; import static redis_request.RedisRequestOuterClass.RequestType.ZDiff; import static redis_request.RedisRequestOuterClass.RequestType.ZDiffStore; import static redis_request.RedisRequestOuterClass.RequestType.ZLexCount; @@ -127,8 +128,9 @@ import glide.api.models.commands.SetOptions; import glide.api.models.commands.SetOptions.ConditionalSet; import glide.api.models.commands.SetOptions.SetOptionsBuilder; -import glide.api.models.commands.StreamAddOptions; -import glide.api.models.commands.StreamAddOptions.StreamAddOptionsBuilder; +import glide.api.models.commands.StreamOptions.StreamAddOptions; +import glide.api.models.commands.StreamOptions.StreamAddOptions.StreamAddOptionsBuilder; +import glide.api.models.commands.StreamOptions.StreamTrimOptions; import glide.api.models.commands.ZaddOptions; import java.util.Arrays; import java.util.Map; @@ -1952,6 +1954,20 @@ public T xadd( return getThis(); } + /** + * Trims the stream by evicting older entries. + * + * @see redis.io for details. + * @param key The key of the stream. + * @param options Stream trim options. + * @return Command Response - The number of entries deleted from the stream. + */ + public T xtrim(@NonNull String key, @NonNull StreamTrimOptions options) { + ArgsArray commandArgs = buildArgs(ArrayUtils.addFirst(options.toArgs(), key)); + protobufTransaction.addCommands(buildCommand(XTrim, commandArgs)); + return getThis(); + } + /** * Returns the remaining time to live of key that has a timeout, in milliseconds. * diff --git a/java/client/src/main/java/glide/api/models/commands/StreamAddOptions.java b/java/client/src/main/java/glide/api/models/commands/StreamOptions.java similarity index 51% rename from java/client/src/main/java/glide/api/models/commands/StreamAddOptions.java rename to java/client/src/main/java/glide/api/models/commands/StreamOptions.java index 34509523ed..9f66b990f8 100644 --- a/java/client/src/main/java/glide/api/models/commands/StreamAddOptions.java +++ b/java/client/src/main/java/glide/api/models/commands/StreamOptions.java @@ -8,42 +8,77 @@ import lombok.Builder; import lombok.NonNull; -/** - * Optional arguments to {@link StreamBaseCommands#xadd(String, Map, StreamAddOptions)} - * - * @see redis.io - */ -@Builder -public final class StreamAddOptions { - - public static final String NO_MAKE_STREAM_REDIS_API = "NOMKSTREAM"; - public static final String ID_WILDCARD_REDIS_API = "*"; - public static final String TRIM_MAXLEN_REDIS_API = "MAXLEN"; - public static final String TRIM_MINID_REDIS_API = "MINID"; - public static final String TRIM_EXACT_REDIS_API = "="; - public static final String TRIM_NOT_EXACT_REDIS_API = "~"; - public static final String TRIM_LIMIT_REDIS_API = "LIMIT"; - - /** If set, the new entry will be added with this id. */ - private final String id; +/** Optional arguments for {@link StreamBaseCommands} */ +public final class StreamOptions { /** - * If set to false, a new stream won't be created if no stream matches the given key. - *
- * Equivalent to NOMKSTREAM in the Redis API. + * Optional arguments to {@link StreamBaseCommands#xadd(String, Map, StreamAddOptions)} + * + * @see redis.io */ - private final Boolean makeStream; + @Builder + public static final class StreamAddOptions { + public static final String NO_MAKE_STREAM_REDIS_API = "NOMKSTREAM"; + public static final String ID_WILDCARD_REDIS_API = "*"; + + /** If set, the new entry will be added with this id. */ + private final String id; + + /** + * If set to false, a new stream won't be created if no stream matches the given + * key.
+ * Equivalent to NOMKSTREAM in the Redis API. + */ + private final Boolean makeStream; + + /** If set, the add operation will also trim the older entries in the stream. */ + private final StreamTrimOptions trim; + + /** + * Converts options for Xadd into a String[]. + * + * @return String[] + */ + public String[] toArgs() { + List optionArgs = new ArrayList<>(); + + if (makeStream != null && !makeStream) { + optionArgs.add(NO_MAKE_STREAM_REDIS_API); + } + + if (trim != null) { + optionArgs.addAll(trim.getRedisApi()); + } + + if (id != null) { + optionArgs.add(id); + } else { + optionArgs.add(ID_WILDCARD_REDIS_API); + } - /** If set, the add operation will also trim the older entries in the stream. */ - private final StreamTrimOptions trim; + return optionArgs.toArray(new String[0]); + } + } + /** + * Optional arguments for {@link StreamBaseCommands#xtrim(String, StreamTrimOptions)} + * + * @see redis.io + */ public abstract static class StreamTrimOptions { + + public static final String TRIM_MAXLEN_REDIS_API = "MAXLEN"; + public static final String TRIM_MINID_REDIS_API = "MINID"; + public static final String TRIM_EXACT_REDIS_API = "="; + public static final String TRIM_NOT_EXACT_REDIS_API = "~"; + public static final String TRIM_LIMIT_REDIS_API = "LIMIT"; + /** * If true, the stream will be trimmed exactly. Equivalent to = in the * Redis API. Otherwise, the stream will be trimmed in a near-exact manner, which is more * efficient, equivalent to ~ in the Redis API. */ - protected boolean exact; + protected Boolean exact; /** If set, sets the maximal amount of entries that will be deleted. */ protected Long limit; @@ -56,9 +91,10 @@ protected List getRedisApi() { List optionArgs = new ArrayList<>(); optionArgs.add(this.getMethod()); - optionArgs.add(this.exact ? TRIM_EXACT_REDIS_API : TRIM_NOT_EXACT_REDIS_API); + if (this.exact != null) { + optionArgs.add(this.exact ? TRIM_EXACT_REDIS_API : TRIM_NOT_EXACT_REDIS_API); + } optionArgs.add(this.getThreshold()); - if (this.limit != null) { optionArgs.add(TRIM_LIMIT_REDIS_API); optionArgs.add(this.limit.toString()); @@ -66,6 +102,22 @@ protected List getRedisApi() { return optionArgs; } + + /** + * Converts options for {@link StreamBaseCommands#xtrim(String, StreamTrimOptions)} into a + * String[]. + * + * @return String[] + */ + public String[] toArgs() { + List optionArgs = new ArrayList<>(); + + if (this.getRedisApi() != null) { + optionArgs.addAll(this.getRedisApi()); + } + + return optionArgs.toArray(new String[0]); + } } /** Option to trim the stream according to minimum ID. */ @@ -73,6 +125,15 @@ public static class MinId extends StreamTrimOptions { /** Trim the stream according to entry ID. Equivalent to MINID in the Redis API. */ private final String threshold; + /** + * Create a trim option to trim stream based on stream ID. + * + * @param threshold Comparison id. + */ + public MinId(@NonNull String threshold) { + this.threshold = threshold; + } + /** * Create a trim option to trim stream based on stream ID. * @@ -87,13 +148,12 @@ public MinId(boolean exact, @NonNull String threshold) { /** * Create a trim option to trim stream based on stream ID. * - * @param exact Whether to match exactly on the threshold. * @param threshold Comparison id. - * @param limit Max number of stream entries to be trimmed. + * @param limit Max number of stream entries to be trimmed for non-exact match. */ - public MinId(boolean exact, @NonNull String threshold, long limit) { + public MinId(@NonNull String threshold, long limit) { + this.exact = false; this.threshold = threshold; - this.exact = exact; this.limit = limit; } @@ -116,6 +176,15 @@ public static class MaxLen extends StreamTrimOptions { */ private final Long threshold; + /** + * Create a Max Length trim option to trim stream based on length. + * + * @param threshold Comparison count. + */ + public MaxLen(long threshold) { + this.threshold = threshold; + } + /** * Create a Max Length trim option to trim stream based on length. * @@ -130,13 +199,12 @@ public MaxLen(boolean exact, long threshold) { /** * Create a Max Length trim option to trim stream entries exceeds the threshold. * - * @param exact Whether to match exactly on the threshold. * @param threshold Comparison count. - * @param limit Max number of stream entries to be trimmed. + * @param limit Max number of stream entries to be trimmed for non-exact match. */ - public MaxLen(boolean exact, long threshold, long limit) { + public MaxLen(long threshold, long limit) { + this.exact = false; this.threshold = threshold; - this.exact = exact; this.limit = limit; } @@ -150,29 +218,4 @@ protected String getThreshold() { return threshold.toString(); } } - - /** - * Converts options for Xadd into a String[]. - * - * @return String[] - */ - public String[] toArgs() { - List optionArgs = new ArrayList<>(); - - if (makeStream != null && !makeStream) { - optionArgs.add(NO_MAKE_STREAM_REDIS_API); - } - - if (trim != null) { - optionArgs.addAll(trim.getRedisApi()); - } - - if (id != null) { - optionArgs.add(id); - } else { - optionArgs.add(ID_WILDCARD_REDIS_API); - } - - return optionArgs.toArray(new String[0]); - } } diff --git a/java/client/src/test/java/glide/api/RedisClientTest.java b/java/client/src/test/java/glide/api/RedisClientTest.java index 2b0206e647..5e4b257e3a 100644 --- a/java/client/src/test/java/glide/api/RedisClientTest.java +++ b/java/client/src/test/java/glide/api/RedisClientTest.java @@ -9,12 +9,12 @@ import static glide.api.models.commands.SetOptions.ConditionalSet.ONLY_IF_DOES_NOT_EXIST; import static glide.api.models.commands.SetOptions.ConditionalSet.ONLY_IF_EXISTS; import static glide.api.models.commands.SetOptions.RETURN_OLD_VALUE; -import static glide.api.models.commands.StreamAddOptions.NO_MAKE_STREAM_REDIS_API; -import static glide.api.models.commands.StreamAddOptions.TRIM_EXACT_REDIS_API; -import static glide.api.models.commands.StreamAddOptions.TRIM_LIMIT_REDIS_API; -import static glide.api.models.commands.StreamAddOptions.TRIM_MAXLEN_REDIS_API; -import static glide.api.models.commands.StreamAddOptions.TRIM_MINID_REDIS_API; -import static glide.api.models.commands.StreamAddOptions.TRIM_NOT_EXACT_REDIS_API; +import static glide.api.models.commands.StreamOptions.StreamAddOptions.NO_MAKE_STREAM_REDIS_API; +import static glide.api.models.commands.StreamOptions.StreamTrimOptions.TRIM_EXACT_REDIS_API; +import static glide.api.models.commands.StreamOptions.StreamTrimOptions.TRIM_LIMIT_REDIS_API; +import static glide.api.models.commands.StreamOptions.StreamTrimOptions.TRIM_MAXLEN_REDIS_API; +import static glide.api.models.commands.StreamOptions.StreamTrimOptions.TRIM_MINID_REDIS_API; +import static glide.api.models.commands.StreamOptions.StreamTrimOptions.TRIM_NOT_EXACT_REDIS_API; import static glide.utils.ArrayTransformUtils.concatenateArrays; import static glide.utils.ArrayTransformUtils.convertMapToKeyValueStringArray; import static glide.utils.ArrayTransformUtils.convertMapToValueKeyStringArray; @@ -111,6 +111,7 @@ import static redis_request.RedisRequestOuterClass.RequestType.Type; import static redis_request.RedisRequestOuterClass.RequestType.Unlink; import static redis_request.RedisRequestOuterClass.RequestType.XAdd; +import static redis_request.RedisRequestOuterClass.RequestType.XTrim; import static redis_request.RedisRequestOuterClass.RequestType.ZDiff; import static redis_request.RedisRequestOuterClass.RequestType.ZDiffStore; import static redis_request.RedisRequestOuterClass.RequestType.ZLexCount; @@ -144,7 +145,10 @@ import glide.api.models.commands.ScriptOptions; import glide.api.models.commands.SetOptions; import glide.api.models.commands.SetOptions.Expiry; -import glide.api.models.commands.StreamAddOptions; +import glide.api.models.commands.StreamOptions.MaxLen; +import glide.api.models.commands.StreamOptions.MinId; +import glide.api.models.commands.StreamOptions.StreamAddOptions; +import glide.api.models.commands.StreamOptions.StreamTrimOptions; import glide.api.models.commands.ZaddOptions; import glide.managers.CommandManager; import glide.managers.ConnectionManager; @@ -156,7 +160,6 @@ import java.util.concurrent.CompletableFuture; import lombok.SneakyThrows; import org.apache.commons.lang3.ArrayUtils; -import org.apache.commons.lang3.tuple.Pair; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -2914,65 +2917,22 @@ public void xadd_returns_success() { assertEquals(returnId, payload); } - @SneakyThrows - @Test - public void xadd_with_nomakestream_maxlen_options_returns_success() { - // setup - String key = "testKey"; - Map fieldValues = new LinkedHashMap<>(); - fieldValues.put("testField1", "testValue1"); - fieldValues.put("testField2", "testValue2"); - StreamAddOptions options = - StreamAddOptions.builder() - .id("id") - .makeStream(false) - .trim(new StreamAddOptions.MaxLen(true, 5L)) - .build(); - - String[] arguments = - new String[] { - key, - NO_MAKE_STREAM_REDIS_API, - TRIM_MAXLEN_REDIS_API, - TRIM_EXACT_REDIS_API, - Long.toString(5L), - "id" - }; - arguments = ArrayUtils.addAll(arguments, convertMapToKeyValueStringArray(fieldValues)); - - String returnId = "testId"; - CompletableFuture testResponse = new CompletableFuture<>(); - testResponse.complete(returnId); - - // match on protobuf request - when(commandManager.submitNewCommand(eq(XAdd), eq(arguments), any())) - .thenReturn(testResponse); - - // exercise - CompletableFuture response = service.xadd(key, fieldValues, options); - String payload = response.get(); - - // verify - assertEquals(testResponse, response); - assertEquals(returnId, payload); - } - private static List getStreamAddOptions() { return List.of( Arguments.of( - Pair.of( - // no TRIM option - StreamAddOptions.builder().id("id").makeStream(Boolean.FALSE).build(), - new String[] {"testKey", NO_MAKE_STREAM_REDIS_API, "id"}), - Pair.of( + // no TRIM option + "test_xadd_no_trim", + StreamAddOptions.builder().id("id").makeStream(Boolean.FALSE).build(), + new String[] {NO_MAKE_STREAM_REDIS_API, "id"}, + Arguments.of( // MAXLEN with LIMIT + "test_xadd_maxlen_with_limit", StreamAddOptions.builder() .id("id") .makeStream(Boolean.TRUE) - .trim(new StreamAddOptions.MaxLen(Boolean.TRUE, 5L, 10L)) + .trim(new MaxLen(5L, 10L)) .build(), new String[] { - "testKey", TRIM_MAXLEN_REDIS_API, TRIM_EXACT_REDIS_API, Long.toString(5L), @@ -2980,29 +2940,29 @@ private static List getStreamAddOptions() { Long.toString(10L), "id" }), - Pair.of( + Arguments.of( // MAXLEN with non exact match + "test_xadd_maxlen_with_non_exact_match", StreamAddOptions.builder() .makeStream(Boolean.FALSE) - .trim(new StreamAddOptions.MaxLen(Boolean.FALSE, 2L)) + .trim(new MaxLen(false, 2L)) .build(), new String[] { - "testKey", NO_MAKE_STREAM_REDIS_API, TRIM_MAXLEN_REDIS_API, TRIM_NOT_EXACT_REDIS_API, Long.toString(2L), "*" }), - Pair.of( + Arguments.of( // MIN ID with LIMIT + "test_xadd_minid_with_limit", StreamAddOptions.builder() .id("id") .makeStream(Boolean.TRUE) - .trim(new StreamAddOptions.MinId(Boolean.TRUE, "testKey", 10L)) + .trim(new MinId("testKey", 10L)) .build(), new String[] { - "testKey", TRIM_MINID_REDIS_API, TRIM_EXACT_REDIS_API, Long.toString(5L), @@ -3010,14 +2970,14 @@ private static List getStreamAddOptions() { Long.toString(10L), "id" }), - Pair.of( + Arguments.of( // MIN ID with non exact match + "test_xadd_minid_with_non_exact_match", StreamAddOptions.builder() .makeStream(Boolean.FALSE) - .trim(new StreamAddOptions.MinId(Boolean.FALSE, "testKey")) + .trim(new MinId(false, "testKey")) .build(), new String[] { - "testKey", NO_MAKE_STREAM_REDIS_API, TRIM_MINID_REDIS_API, TRIM_NOT_EXACT_REDIS_API, @@ -3027,16 +2987,34 @@ private static List getStreamAddOptions() { } @SneakyThrows - @ParameterizedTest + @ParameterizedTest(name = "{0}") @MethodSource("getStreamAddOptions") - public void xadd_with_options_returns_success(Pair optionAndArgs) { + public void xadd_with_options_to_arguments( + String testName, StreamAddOptions options, String[] expectedArgs) { + assertArrayEquals(expectedArgs, options.toArgs()); + } + + @SneakyThrows + @Test + public void xadd_with_nomakestream_maxlen_options_returns_success() { // setup String key = "testKey"; Map fieldValues = new LinkedHashMap<>(); fieldValues.put("testField1", "testValue1"); fieldValues.put("testField2", "testValue2"); + StreamAddOptions options = + StreamAddOptions.builder().id("id").makeStream(false).trim(new MaxLen(true, 5L)).build(); + String[] arguments = - ArrayUtils.addAll(optionAndArgs.getRight(), convertMapToKeyValueStringArray(fieldValues)); + new String[] { + key, + NO_MAKE_STREAM_REDIS_API, + TRIM_MAXLEN_REDIS_API, + TRIM_EXACT_REDIS_API, + Long.toString(5L), + "id" + }; + arguments = ArrayUtils.addAll(arguments, convertMapToKeyValueStringArray(fieldValues)); String returnId = "testId"; CompletableFuture testResponse = new CompletableFuture<>(); @@ -3047,7 +3025,7 @@ public void xadd_with_options_returns_success(Pair o .thenReturn(testResponse); // exercise - CompletableFuture response = service.xadd(key, fieldValues, optionAndArgs.getLeft()); + CompletableFuture response = service.xadd(key, fieldValues, options); String payload = response.get(); // verify @@ -3055,6 +3033,73 @@ public void xadd_with_options_returns_success(Pair o assertEquals(returnId, payload); } + @Test + @SneakyThrows + public void xtrim_with_exact_MinId() { + // setup + String key = "testKey"; + StreamTrimOptions limit = new MinId(true, "id"); + String[] arguments = new String[] {key, TRIM_MINID_REDIS_API, TRIM_EXACT_REDIS_API, "id"}; + Long completedResult = 1L; + + CompletableFuture testResponse = new CompletableFuture<>(); + testResponse.complete(completedResult); + + // match on protobuf request + when(commandManager.submitNewCommand(eq(XTrim), eq(arguments), any())) + .thenReturn(testResponse); + + // exercise + CompletableFuture response = service.xtrim(key, limit); + Long payload = response.get(); + + // verify + assertEquals(testResponse, response); + assertEquals(completedResult, payload); + } + + private static List getStreamTrimOptions() { + return List.of( + Arguments.of( + // MAXLEN just THRESHOLD + "test_xtrim_maxlen", new MaxLen(5L), new String[] {TRIM_MAXLEN_REDIS_API, "5"}), + Arguments.of( + // MAXLEN with LIMIT + "test_xtrim_maxlen_with_limit", + new MaxLen(5L, 10L), + new String[] { + TRIM_MAXLEN_REDIS_API, TRIM_NOT_EXACT_REDIS_API, "5", TRIM_LIMIT_REDIS_API, "10" + }), + Arguments.of( + // MAXLEN with exact + "test_xtrim_exact_maxlen", + new MaxLen(true, 10L), + new String[] {TRIM_MAXLEN_REDIS_API, TRIM_EXACT_REDIS_API, "10"}), + Arguments.of( + // MINID just THRESHOLD + "test_xtrim_minid", new MinId("0-1"), new String[] {TRIM_MINID_REDIS_API, "0-1"}), + Arguments.of( + // MINID with exact + "test_xtrim_exact_minid", + new MinId(true, "0-2"), + new String[] {TRIM_MINID_REDIS_API, TRIM_EXACT_REDIS_API, "0-2"}), + Arguments.of( + // MINID with LIMIT + "test_xtrim_minid_with_limit", + new MinId("0-3", 10L), + new String[] { + TRIM_MINID_REDIS_API, TRIM_NOT_EXACT_REDIS_API, "0-3", TRIM_LIMIT_REDIS_API, "10" + })); + } + + @SneakyThrows + @ParameterizedTest(name = "{0}") + @MethodSource("getStreamTrimOptions") + public void xtrim_with_options_to_arguments( + String testName, StreamTrimOptions options, String[] expectedArgs) { + assertArrayEquals(expectedArgs, options.toArgs()); + } + @SneakyThrows @Test public void type_returns_success() { diff --git a/java/client/src/test/java/glide/api/models/TransactionTests.java b/java/client/src/test/java/glide/api/models/TransactionTests.java index 7cde78d528..c967576c46 100644 --- a/java/client/src/test/java/glide/api/models/TransactionTests.java +++ b/java/client/src/test/java/glide/api/models/TransactionTests.java @@ -12,6 +12,8 @@ import static glide.api.models.commands.RangeOptions.InfScoreBound.NEGATIVE_INFINITY; import static glide.api.models.commands.RangeOptions.InfScoreBound.POSITIVE_INFINITY; import static glide.api.models.commands.SetOptions.RETURN_OLD_VALUE; +import static glide.api.models.commands.StreamOptions.StreamTrimOptions.TRIM_EXACT_REDIS_API; +import static glide.api.models.commands.StreamOptions.StreamTrimOptions.TRIM_MINID_REDIS_API; import static glide.api.models.commands.ZaddOptions.UpdateOptions.SCORE_LESS_THAN_CURRENT; import static org.junit.jupiter.api.Assertions.assertEquals; import static redis_request.RedisRequestOuterClass.RequestType.BZPopMax; @@ -95,6 +97,7 @@ import static redis_request.RedisRequestOuterClass.RequestType.Type; import static redis_request.RedisRequestOuterClass.RequestType.Unlink; import static redis_request.RedisRequestOuterClass.RequestType.XAdd; +import static redis_request.RedisRequestOuterClass.RequestType.XTrim; import static redis_request.RedisRequestOuterClass.RequestType.ZDiff; import static redis_request.RedisRequestOuterClass.RequestType.ZDiffStore; import static redis_request.RedisRequestOuterClass.RequestType.ZLexCount; @@ -123,7 +126,8 @@ import glide.api.models.commands.RangeOptions.RangeByScore; import glide.api.models.commands.RangeOptions.ScoreBoundary; import glide.api.models.commands.SetOptions; -import glide.api.models.commands.StreamAddOptions; +import glide.api.models.commands.StreamOptions.MinId; +import glide.api.models.commands.StreamOptions.StreamAddOptions; import glide.api.models.commands.ZaddOptions; import java.util.LinkedHashMap; import java.util.LinkedList; @@ -460,6 +464,9 @@ InfScoreBound.NEGATIVE_INFINITY, new ScoreBoundary(3, false), new Limit(1, 2)), transaction.xadd("key", Map.of("field1", "foo1"), StreamAddOptions.builder().id("id").build()); results.add(Pair.of(XAdd, buildArgs("key", "id", "field1", "foo1"))); + transaction.xtrim("key", new MinId(true, "id")); + results.add(Pair.of(XTrim, buildArgs("key", TRIM_MINID_REDIS_API, TRIM_EXACT_REDIS_API, "id"))); + transaction.time(); results.add(Pair.of(Time, buildArgs())); diff --git a/java/integTest/src/test/java/glide/SharedCommandTests.java b/java/integTest/src/test/java/glide/SharedCommandTests.java index 859087a9ac..227be290cb 100644 --- a/java/integTest/src/test/java/glide/SharedCommandTests.java +++ b/java/integTest/src/test/java/glide/SharedCommandTests.java @@ -36,7 +36,9 @@ import glide.api.models.commands.RangeOptions.ScoreBoundary; import glide.api.models.commands.ScriptOptions; import glide.api.models.commands.SetOptions; -import glide.api.models.commands.StreamAddOptions; +import glide.api.models.commands.StreamOptions.MaxLen; +import glide.api.models.commands.StreamOptions.MinId; +import glide.api.models.commands.StreamOptions.StreamAddOptions; import glide.api.models.commands.ZaddOptions; import glide.api.models.configuration.NodeAddress; import glide.api.models.configuration.RedisClientConfiguration; @@ -2207,10 +2209,11 @@ public void zrangestore_by_lex(BaseClient client) { @SneakyThrows @ParameterizedTest(autoCloseArguments = false) @MethodSource("getClients") - public void xadd(BaseClient client) { + public void xadd_and_xtrim(BaseClient client) { String key = UUID.randomUUID().toString(); String field1 = UUID.randomUUID().toString(); String field2 = UUID.randomUUID().toString(); + String key2 = UUID.randomUUID().toString(); assertNull( client @@ -2249,9 +2252,7 @@ public void xadd(BaseClient client) { .xadd( key, Map.of(field1, "foo3", field2, "bar3"), - StreamAddOptions.builder() - .trim(new StreamAddOptions.MaxLen(Boolean.TRUE, 2L)) - .build()) + StreamAddOptions.builder().trim(new MaxLen(true, 2L)).build()) .get(); assertNotNull(id); // TODO update test when XLEN is available @@ -2272,9 +2273,7 @@ public void xadd(BaseClient client) { .xadd( key, Map.of(field1, "foo4", field2, "bar4"), - StreamAddOptions.builder() - .trim(new StreamAddOptions.MinId(Boolean.TRUE, id)) - .build()) + StreamAddOptions.builder().trim(new MinId(true, id)).build()) .get()); // TODO update test when XLEN is available if (client instanceof RedisClient) { @@ -2288,11 +2287,28 @@ public void xadd(BaseClient client) { .getSingleValue()); } - /** - * TODO add test to XTRIM on maxlen expect( await client.xtrim(key, { method: "maxlen", - * threshold: 1, exact: true, }), ).toEqual(1); expect(await client.customCommand(["XLEN", - * key])).toEqual(1); - */ + // test xtrim to remove 1 element + assertEquals(1L, client.xtrim(key, new MaxLen(1)).get()); + // TODO update test when XLEN is available + if (client instanceof RedisClient) { + assertEquals(1L, ((RedisClient) client).customCommand(new String[] {"XLEN", key}).get()); + } else if (client instanceof RedisClusterClient) { + assertEquals( + 1L, + ((RedisClusterClient) client) + .customCommand(new String[] {"XLEN", key}) + .get() + .getSingleValue()); + } + + // Key does not exist - returns 0 + assertEquals(0L, client.xtrim(key, new MaxLen(true, 1)).get()); + + // Key exists, but it is not a stream + assertEquals(OK, client.set(key2, "xtrimtest").get()); + ExecutionException executionException = + assertThrows(ExecutionException.class, () -> client.xtrim(key2, new MinId("0-1")).get()); + assertTrue(executionException.getCause() instanceof RequestException); } @SneakyThrows diff --git a/java/integTest/src/test/java/glide/TransactionTestUtilities.java b/java/integTest/src/test/java/glide/TransactionTestUtilities.java index be9f237281..6b1c2bec54 100644 --- a/java/integTest/src/test/java/glide/TransactionTestUtilities.java +++ b/java/integTest/src/test/java/glide/TransactionTestUtilities.java @@ -12,7 +12,8 @@ import glide.api.models.commands.RangeOptions.RangeByIndex; import glide.api.models.commands.RangeOptions.ScoreBoundary; import glide.api.models.commands.SetOptions; -import glide.api.models.commands.StreamAddOptions; +import glide.api.models.commands.StreamOptions.MinId; +import glide.api.models.commands.StreamOptions.StreamAddOptions; import java.util.Map; import java.util.Set; import java.util.UUID; @@ -147,6 +148,7 @@ public static BaseTransaction transactionTest(BaseTransaction baseTransact key9, Map.of("field2", "value2"), StreamAddOptions.builder().id("0-2").build()); baseTransaction.xadd( key9, Map.of("field3", "value3"), StreamAddOptions.builder().id("0-3").build()); + baseTransaction.xtrim(key9, new MinId(true, "0-2")); baseTransaction.configSet(Map.of("timeout", "1000")); baseTransaction.configGet(new String[] {"timeout"}); @@ -255,12 +257,10 @@ public static Object[] transactionTestResult() { new String[] {"one", "two"}, // zdiff(new String[] {zSetKey2, key8}) Map.of("one", 1.0, "two", 2.0), // zdiffWithScores(new String[] {zSetKey2, key8}) new Object[] {zSetKey2, "two", 2.0}, // bzpopmax(new String[] { zsetKey2 }, .1) - "0-1", // xadd(key9, Map.of("field1", "value1"), - // StreamAddOptions.builder().id("0-1").build()); - "0-2", // xadd(key9, Map.of("field2", "value2"), - // StreamAddOptions.builder().id("0-2").build()); - "0-3", // xadd(key9, Map.of("field3", "value3"), - // StreamAddOptions.builder().id("0-3").build()); + "0-1", // xadd(key9, Map.of("field1", "value1"), id("0-1")); + "0-2", // xadd(key9, Map.of("field2", "value2"), id("0-2")); + "0-3", // xadd(key9, Map.of("field3", "value3"), id("0-3")); + 1L, // xtrim(key9, new MinId(true, "0-2")); OK, Map.of("timeout", "1000"), OK,