From 738d38a099c08570c8224835734be549118ef0fc Mon Sep 17 00:00:00 2001 From: Andrew Carbonetto Date: Thu, 20 Jun 2024 16:22:02 -0700 Subject: [PATCH] Java: Add `XACK` stream command (#1620) * Java: Add XACK stream command (#380) Signed-off-by: Andrew Carbonetto Co-authored-by: Guian Gumpac Co-authored-by: Yi-Pin Chen --- .../src/main/java/glide/api/BaseClient.java | 8 +++ .../api/commands/StreamBaseCommands.java | 23 ++++++- .../glide/api/models/BaseTransaction.java | 21 +++++- .../test/java/glide/api/RedisClientTest.java | 27 ++++++++ .../glide/api/models/TransactionTests.java | 4 ++ .../test/java/glide/SharedCommandTests.java | 68 +++++++++++++++---- .../java/glide/TransactionTestUtilities.java | 5 +- 7 files changed, 138 insertions(+), 18 deletions(-) diff --git a/java/client/src/main/java/glide/api/BaseClient.java b/java/client/src/main/java/glide/api/BaseClient.java index fd166a00a8..6050d4b275 100644 --- a/java/client/src/main/java/glide/api/BaseClient.java +++ b/java/client/src/main/java/glide/api/BaseClient.java @@ -121,6 +121,7 @@ import static redis_request.RedisRequestOuterClass.RequestType.Type; import static redis_request.RedisRequestOuterClass.RequestType.Unlink; import static redis_request.RedisRequestOuterClass.RequestType.Watch; +import static redis_request.RedisRequestOuterClass.RequestType.XAck; import static redis_request.RedisRequestOuterClass.RequestType.XAdd; import static redis_request.RedisRequestOuterClass.RequestType.XDel; import static redis_request.RedisRequestOuterClass.RequestType.XGroupCreate; @@ -1535,6 +1536,13 @@ public CompletableFuture>> xreadgroup( return commandManager.submitNewCommand(XReadGroup, arguments, this::handleXReadResponse); } + @Override + public CompletableFuture xack( + @NonNull String key, @NonNull String group, @NonNull String[] ids) { + String[] args = concatenateArrays(new String[] {key, group}, ids); + return commandManager.submitNewCommand(XAck, args, this::handleLongResponse); + } + @Override public CompletableFuture pttl(@NonNull String key) { return commandManager.submitNewCommand(PTTL, new String[] {key}, this::handleLongResponse); diff --git a/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java b/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java index 1c10c2b992..b49c6472c6 100644 --- a/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java +++ b/java/client/src/main/java/glide/api/commands/StreamBaseCommands.java @@ -422,7 +422,7 @@ CompletableFuture xgroupCreate( * @param consumer The newly created consumer. * @return A {@literal Map>} with stream * keys, to Map of stream-ids, to an array of pairings with format [[field, entry], [field, entry], ...]. - * Returns code>null if the consumer group does not exist. Returns a code>Map with a value of code>null if the stream is empty. + * Returns null if the consumer group does not exist. Returns a Map with a value of null if the stream is empty. * @example *
{@code
      * // create a new stream at "mystream", with stream id "1-0"
@@ -462,7 +462,7 @@ CompletableFuture>> xreadgroup(
      * @param options Options detailing how to read the stream {@link StreamReadGroupOptions}.
      * @return A {@literal Map>} with stream
      *      keys, to Map of stream-ids, to an array of pairings with format [[field, entry], [field, entry], ...].
-     *      Returns code>null if the consumer group does not exist. Returns a code>Map with a value of code>null if the stream is empty.
+     *      Returns null if the consumer group does not exist. Returns a Map with a value of null if the stream is empty.
      * @example
      *     
{@code
      * // create a new stream at "mystream", with stream id "1-0"
@@ -493,4 +493,23 @@ CompletableFuture>> xreadgroup(
             String group,
             String consumer,
             StreamReadGroupOptions options);
+
+    /**
+     * Returns the number of messages that were successfully acknowledged by the consumer group member of a stream.
+     * This command should be called on a pending message so that such message does not get processed again.
+     *
+     * @param key The key of the stream.
+     * @param group The consumer group name.
+     * @param ids Stream entry ID to acknowledge and purge messages.
+     * @return The number of messages that were successfully acknowledged.
+     * @example
+     *     
{@code
+     * String entryId = client.xadd("mystream", Map.of("myfield", "mydata")).get();
+     * // read messages from streamId
+     * var readResult = client.xreadgroup(Map.of("mystream", entryId), "mygroup", "my0consumer").get();
+     * // acknowledge messages on stream
+     * assert 1L == client.xack("mystream", "mygroup", new String[] {entryId}).get();
+     * 
+ */ + CompletableFuture xack(String key, String group, String[] ids); } diff --git a/java/client/src/main/java/glide/api/models/BaseTransaction.java b/java/client/src/main/java/glide/api/models/BaseTransaction.java index db5aa05698..ddf145c8d6 100644 --- a/java/client/src/main/java/glide/api/models/BaseTransaction.java +++ b/java/client/src/main/java/glide/api/models/BaseTransaction.java @@ -149,6 +149,7 @@ import static redis_request.RedisRequestOuterClass.RequestType.Touch; import static redis_request.RedisRequestOuterClass.RequestType.Type; import static redis_request.RedisRequestOuterClass.RequestType.Unlink; +import static redis_request.RedisRequestOuterClass.RequestType.XAck; import static redis_request.RedisRequestOuterClass.RequestType.XAdd; import static redis_request.RedisRequestOuterClass.RequestType.XDel; import static redis_request.RedisRequestOuterClass.RequestType.XGroupCreate; @@ -3098,7 +3099,7 @@ public T xgroupDelConsumer(@NonNull String key, @NonNull String group, @NonNull * @return Command Response - A {@literal Map>} with * stream keys, to Map of stream-ids, to an array of pairings with format * [[field, entry], [field, entry], ...]. - * Returns code>null if the consumer group does not exist. Returns a code>Map + * Returns null if the consumer group does not exist. Returns a Map * with a value of code>null if the stream is empty. */ public T xreadgroup( @@ -3122,7 +3123,7 @@ public T xreadgroup( * @return Command Response - A {@literal Map>} with * stream keys, to Map of stream-ids, to an array of pairings with format * [[field, entry], [field, entry], ...]. - * Returns code>null if the consumer group does not exist. Returns a code>Map + * Returns null if the consumer group does not exist. Returns a Map * with a value of code>null if the stream is empty. */ public T xreadgroup( @@ -3135,6 +3136,22 @@ public T xreadgroup( return getThis(); } + /** + * Returns the number of messages that were successfully acknowledged by the consumer group member + * of a stream. This command should be called on a pending message so that such message does not + * get processed again. + * + * @param key The key of the stream. + * @param group The consumer group name. + * @param ids Stream entry ID to acknowledge and purge messages. + * @return Command Response - The number of messages that were successfully acknowledged. + */ + public T xack(@NonNull String key, @NonNull String group, @NonNull String[] ids) { + String[] args = concatenateArrays(new String[] {key, group}, ids); + protobufTransaction.addCommands(buildCommand(XAck, buildArgs(args))); + return getThis(); + } + /** * Returns the remaining time to live of key that has a timeout, in milliseconds. * diff --git a/java/client/src/test/java/glide/api/RedisClientTest.java b/java/client/src/test/java/glide/api/RedisClientTest.java index c3ea91c96b..438af62eab 100644 --- a/java/client/src/test/java/glide/api/RedisClientTest.java +++ b/java/client/src/test/java/glide/api/RedisClientTest.java @@ -189,6 +189,7 @@ import static redis_request.RedisRequestOuterClass.RequestType.UnWatch; import static redis_request.RedisRequestOuterClass.RequestType.Unlink; import static redis_request.RedisRequestOuterClass.RequestType.Watch; +import static redis_request.RedisRequestOuterClass.RequestType.XAck; import static redis_request.RedisRequestOuterClass.RequestType.XAdd; import static redis_request.RedisRequestOuterClass.RequestType.XDel; import static redis_request.RedisRequestOuterClass.RequestType.XGroupCreate; @@ -4667,6 +4668,32 @@ public void xreadgroup_with_options() { assertEquals(completedResult, payload); } + @SneakyThrows + @Test + public void xack_returns_success() { + // setup + String key = "testKey"; + String groupName = "testGroupName"; + String[] ids = new String[] {"testId"}; + String[] arguments = concatenateArrays(new String[] {key, groupName}, ids); + Long mockResult = 1L; + + CompletableFuture testResponse = new CompletableFuture<>(); + testResponse.complete(mockResult); + + // match on protobuf request + when(commandManager.submitNewCommand(eq(XAck), eq(arguments), any())) + .thenReturn(testResponse); + + // exercise + CompletableFuture response = service.xack(key, groupName, ids); + Long payload = response.get(); + + // verify + assertEquals(testResponse, response); + assertEquals(mockResult, payload); + } + @SneakyThrows @Test public void type_returns_success() { diff --git a/java/client/src/test/java/glide/api/models/TransactionTests.java b/java/client/src/test/java/glide/api/models/TransactionTests.java index a9f8702dcc..92ec6102a6 100644 --- a/java/client/src/test/java/glide/api/models/TransactionTests.java +++ b/java/client/src/test/java/glide/api/models/TransactionTests.java @@ -165,6 +165,7 @@ import static redis_request.RedisRequestOuterClass.RequestType.Touch; import static redis_request.RedisRequestOuterClass.RequestType.Type; import static redis_request.RedisRequestOuterClass.RequestType.Unlink; +import static redis_request.RedisRequestOuterClass.RequestType.XAck; import static redis_request.RedisRequestOuterClass.RequestType.XAdd; import static redis_request.RedisRequestOuterClass.RequestType.XDel; import static redis_request.RedisRequestOuterClass.RequestType.XGroupCreate; @@ -826,6 +827,9 @@ InfScoreBound.NEGATIVE_INFINITY, new ScoreBoundary(3, false), new Limit(1, 2)), "key", "id"))); + transaction.xack("key", "group", new String[] {"12345-1", "98765-4"}); + results.add(Pair.of(XAck, buildArgs("key", "group", "12345-1", "98765-4"))); + transaction.time(); results.add(Pair.of(Time, buildArgs())); diff --git a/java/integTest/src/test/java/glide/SharedCommandTests.java b/java/integTest/src/test/java/glide/SharedCommandTests.java index f8a1e52119..d4ead60a7a 100644 --- a/java/integTest/src/test/java/glide/SharedCommandTests.java +++ b/java/integTest/src/test/java/glide/SharedCommandTests.java @@ -3589,7 +3589,7 @@ public void xgroupCreate_xgroupDestroy(BaseClient client) { @SneakyThrows @ParameterizedTest(autoCloseArguments = false) @MethodSource("getClients") - public void xgroupCreateConsumer_xreadgroup_xgroupDelConsumer(BaseClient client) { + public void xgroupCreateConsumer_xgroupDelConsumer_xreadgroup_xack(BaseClient client) { String key = UUID.randomUUID().toString(); String stringKey = UUID.randomUUID().toString(); String groupName = "group" + UUID.randomUUID(); @@ -3638,7 +3638,7 @@ public void xgroupCreateConsumer_xreadgroup_xgroupDelConsumer(BaseClient client) // delete one of the streams assertEquals(1L, client.xdel(key, new String[] {streamid_1}).get()); - // now xreadgroup yeilds one empty stream and one non-empty stream + // now xreadgroup returns one empty stream and one non-empty stream var result_2 = client.xreadgroup(Map.of(key, "0"), groupName, consumerName).get(); assertEquals(2, result_2.get(key).size()); assertNull(result_2.get(key).get(streamid_1)); @@ -3647,13 +3647,22 @@ public void xgroupCreateConsumer_xreadgroup_xgroupDelConsumer(BaseClient client) String streamid_3 = client.xadd(key, Map.of("field3", "value3")).get(); assertNotNull(streamid_3); - // Delete the consumer group and expect 2 pending messages - assertEquals(2L, client.xgroupDelConsumer(key, groupName, consumerName).get()); + // xack that streamid_1, and streamid_2 was received + assertEquals(2L, client.xack(key, groupName, new String[] {streamid_1, streamid_2}).get()); + + // Delete the consumer group and expect 1 pending messages (one was received) + assertEquals(0L, client.xgroupDelConsumer(key, groupName, consumerName).get()); + + // xack streamid_1, and streamid_2 already received returns 0L + assertEquals(0L, client.xack(key, groupName, new String[] {streamid_1, streamid_2}).get()); // Consume the last message with the previously deleted consumer (creates the consumer anew) var result_3 = client.xreadgroup(Map.of(key, ">"), groupName, consumerName).get(); assertEquals(1, result_3.get(key).size()); + // wrong group, so xack streamid_3 returns 0 + assertEquals(0L, client.xack(key, "not_a_group", new String[] {streamid_3}).get()); + // Delete the consumer group and expect the pending message assertEquals(1L, client.xgroupDelConsumer(key, groupName, consumerName).get()); @@ -3678,19 +3687,15 @@ public void xgroupCreateConsumer_xreadgroup_xgroupDelConsumer(BaseClient client) public void xreadgroup_return_failures(BaseClient client) { String key = "{key}:1" + UUID.randomUUID(); String nonStreamKey = "{key}:3" + UUID.randomUUID(); - String field1 = "f1_"; + String groupName = "group" + UUID.randomUUID(); + String zeroStreamId = "0"; + String consumerName = "consumer" + UUID.randomUUID(); // setup first entries in streams key1 and key2 - Map timestamp_1_1_map = new LinkedHashMap<>(); - timestamp_1_1_map.put(field1, field1 + "1"); String timestamp_1_1 = - client.xadd(key, timestamp_1_1_map, StreamAddOptions.builder().id("1-1").build()).get(); + client.xadd(key, Map.of("f1", "v1"), StreamAddOptions.builder().id("1-1").build()).get(); assertNotNull(timestamp_1_1); - String groupName = "group" + UUID.randomUUID(); - String zeroStreamId = "0"; - String consumerName = "consumer" + UUID.randomUUID(); - // create group and consumer for the group assertEquals( OK, @@ -3790,6 +3795,45 @@ public void xreadgroup_return_failures(BaseClient client) { } } + @SneakyThrows + @ParameterizedTest(autoCloseArguments = false) + @MethodSource("getClients") + public void xack_return_failures(BaseClient client) { + String key = "{key}:1" + UUID.randomUUID(); + String nonStreamKey = "{key}:3" + UUID.randomUUID(); + String groupName = "group" + UUID.randomUUID(); + String zeroStreamId = "0"; + String consumerName = "consumer" + UUID.randomUUID(); + + // setup first entries in streams key1 and key2 + String timestamp_1_1 = + client.xadd(key, Map.of("f1", "v1"), StreamAddOptions.builder().id("1-1").build()).get(); + assertNotNull(timestamp_1_1); + + // create group and consumer for the group + assertEquals( + OK, + client + .xgroupCreate( + key, groupName, zeroStreamId, StreamGroupOptions.builder().makeStream().build()) + .get()); + assertTrue(client.xgroupCreateConsumer(key, groupName, consumerName).get()); + + // Empty entity id list throws a RequestException + ExecutionException executionException = + assertThrows( + ExecutionException.class, () -> client.xack(key, groupName, new String[0]).get()); + assertInstanceOf(RequestException.class, executionException.getCause()); + + // Key exists, but it is not a stream + assertEquals(OK, client.set(nonStreamKey, "bar").get()); + executionException = + assertThrows( + ExecutionException.class, + () -> client.xack(nonStreamKey, groupName, new String[] {zeroStreamId}).get()); + assertInstanceOf(RequestException.class, executionException.getCause()); + } + @SneakyThrows @ParameterizedTest(autoCloseArguments = false) @MethodSource("getClients") diff --git a/java/integTest/src/test/java/glide/TransactionTestUtilities.java b/java/integTest/src/test/java/glide/TransactionTestUtilities.java index 2b66d17995..6d44d664dd 100644 --- a/java/integTest/src/test/java/glide/TransactionTestUtilities.java +++ b/java/integTest/src/test/java/glide/TransactionTestUtilities.java @@ -751,7 +751,6 @@ private static Object[] streamCommands(BaseTransaction transaction) { final String groupName1 = "{groupName}-1-" + UUID.randomUUID(); final String groupName2 = "{groupName}-2-" + UUID.randomUUID(); final String consumer1 = "{consumer}-1-" + UUID.randomUUID(); - final String consumer2 = "{consumer}-2-" + UUID.randomUUID(); transaction .xadd(streamKey1, Map.of("field1", "value1"), StreamAddOptions.builder().id("0-1").build()) @@ -775,6 +774,7 @@ private static Object[] streamCommands(BaseTransaction transaction) { groupName1, consumer1, StreamReadGroupOptions.builder().count(2L).build()) + .xack(streamKey1, groupName1, new String[] {"0-3"}) .xgroupDelConsumer(streamKey1, groupName1, consumer1) .xgroupDestroy(streamKey1, groupName1) .xgroupDestroy(streamKey1, groupName2) @@ -812,7 +812,8 @@ private static Object[] streamCommands(BaseTransaction transaction) { Map.of( streamKey1, Map.of()), // xreadgroup(Map.of(streamKey1, ">"), groupName1, consumer1, options); - 1L, // xgroupDelConsumer(streamKey1, groupName1, consumer1) + 1L, // xack(streamKey1, groupName1, new String[] {"0-3"}) + 0L, // xgroupDelConsumer(streamKey1, groupName1, consumer1) true, // xgroupDestroy(streamKey1, groupName1) true, // xgroupDestroy(streamKey1, groupName2) 1L, // .xdel(streamKey1, new String[] {"0-1", "0-5"});