Skip to content

Commit

Permalink
Java: Add XACK stream command (#1620)
Browse files Browse the repository at this point in the history
* Java: Add XACK stream command (#380)

Signed-off-by: Andrew Carbonetto <[email protected]>
Co-authored-by: Guian Gumpac <[email protected]>
Co-authored-by: Yi-Pin Chen <[email protected]>
  • Loading branch information
3 people authored Jun 20, 2024
1 parent aaa8e45 commit 738d38a
Show file tree
Hide file tree
Showing 7 changed files with 138 additions and 18 deletions.
8 changes: 8 additions & 0 deletions java/client/src/main/java/glide/api/BaseClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@
import static redis_request.RedisRequestOuterClass.RequestType.Type;
import static redis_request.RedisRequestOuterClass.RequestType.Unlink;
import static redis_request.RedisRequestOuterClass.RequestType.Watch;
import static redis_request.RedisRequestOuterClass.RequestType.XAck;
import static redis_request.RedisRequestOuterClass.RequestType.XAdd;
import static redis_request.RedisRequestOuterClass.RequestType.XDel;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupCreate;
Expand Down Expand Up @@ -1535,6 +1536,13 @@ public CompletableFuture<Map<String, Map<String, String[][]>>> xreadgroup(
return commandManager.submitNewCommand(XReadGroup, arguments, this::handleXReadResponse);
}

@Override
public CompletableFuture<Long> xack(
@NonNull String key, @NonNull String group, @NonNull String[] ids) {
String[] args = concatenateArrays(new String[] {key, group}, ids);
return commandManager.submitNewCommand(XAck, args, this::handleLongResponse);
}

@Override
public CompletableFuture<Long> pttl(@NonNull String key) {
return commandManager.submitNewCommand(PTTL, new String[] {key}, this::handleLongResponse);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ CompletableFuture<String> xgroupCreate(
* @param consumer The newly created consumer.
* @return A <code>{@literal Map<String, Map<String, String[][]>>}</code> with stream
* keys, to <code>Map</code> of stream-ids, to an array of pairings with format <code>[[field, entry], [field, entry], ...]<code>.
* Returns code>null</code> if the consumer group does not exist. Returns a code>Map</code> with a value of code>null</code> if the stream is empty.
* Returns <code>null</code> if the consumer group does not exist. Returns a <code>Map</code> with a value of <code>null</code> if the stream is empty.
* @example
* <pre>{@code
* // create a new stream at "mystream", with stream id "1-0"
Expand Down Expand Up @@ -462,7 +462,7 @@ CompletableFuture<Map<String, Map<String, String[][]>>> xreadgroup(
* @param options Options detailing how to read the stream {@link StreamReadGroupOptions}.
* @return A <code>{@literal Map<String, Map<String, String[][]>>}</code> with stream
* keys, to <code>Map</code> of stream-ids, to an array of pairings with format <code>[[field, entry], [field, entry], ...]<code>.
* Returns code>null</code> if the consumer group does not exist. Returns a code>Map</code> with a value of code>null</code> if the stream is empty.
* Returns <code>null</code> if the consumer group does not exist. Returns a <code>Map</code> with a value of <code>null</code> if the stream is empty.
* @example
* <pre>{@code
* // create a new stream at "mystream", with stream id "1-0"
Expand Down Expand Up @@ -493,4 +493,23 @@ CompletableFuture<Map<String, Map<String, String[][]>>> xreadgroup(
String group,
String consumer,
StreamReadGroupOptions options);

/**
* Returns the number of messages that were successfully acknowledged by the consumer group member of a stream.
* This command should be called on a pending message so that such message does not get processed again.
*
* @param key The key of the stream.
* @param group The consumer group name.
* @param ids Stream entry ID to acknowledge and purge messages.
* @return The number of messages that were successfully acknowledged.
* @example
* <pre>{@code
* String entryId = client.xadd("mystream", Map.of("myfield", "mydata")).get();
* // read messages from streamId
* var readResult = client.xreadgroup(Map.of("mystream", entryId), "mygroup", "my0consumer").get();
* // acknowledge messages on stream
* assert 1L == client.xack("mystream", "mygroup", new String[] {entryId}).get();
* </pre>
*/
CompletableFuture<Long> xack(String key, String group, String[] ids);
}
21 changes: 19 additions & 2 deletions java/client/src/main/java/glide/api/models/BaseTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@
import static redis_request.RedisRequestOuterClass.RequestType.Touch;
import static redis_request.RedisRequestOuterClass.RequestType.Type;
import static redis_request.RedisRequestOuterClass.RequestType.Unlink;
import static redis_request.RedisRequestOuterClass.RequestType.XAck;
import static redis_request.RedisRequestOuterClass.RequestType.XAdd;
import static redis_request.RedisRequestOuterClass.RequestType.XDel;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupCreate;
Expand Down Expand Up @@ -3098,7 +3099,7 @@ public T xgroupDelConsumer(@NonNull String key, @NonNull String group, @NonNull
* @return Command Response - A <code>{@literal Map<String, Map<String, String[][]>>}</code> with
* stream keys, to <code>Map</code> of stream-ids, to an array of pairings with format <code>
* [[field, entry], [field, entry], ...]<code>.
* Returns code>null</code> if the consumer group does not exist. Returns a code>Map</code>
* Returns <code>null</code> if the consumer group does not exist. Returns a <code>Map</code>
* with a value of code>null</code> if the stream is empty.
*/
public T xreadgroup(
Expand All @@ -3122,7 +3123,7 @@ public T xreadgroup(
* @return Command Response - A <code>{@literal Map<String, Map<String, String[][]>>}</code> with
* stream keys, to <code>Map</code> of stream-ids, to an array of pairings with format <code>
* [[field, entry], [field, entry], ...]<code>.
* Returns code>null</code> if the consumer group does not exist. Returns a code>Map</code>
* Returns <code>null</code> if the consumer group does not exist. Returns a <code>Map</code>
* with a value of code>null</code> if the stream is empty.
*/
public T xreadgroup(
Expand All @@ -3135,6 +3136,22 @@ public T xreadgroup(
return getThis();
}

/**
* Returns the number of messages that were successfully acknowledged by the consumer group member
* of a stream. This command should be called on a pending message so that such message does not
* get processed again.
*
* @param key The key of the stream.
* @param group The consumer group name.
* @param ids Stream entry ID to acknowledge and purge messages.
* @return Command Response - The number of messages that were successfully acknowledged.
*/
public T xack(@NonNull String key, @NonNull String group, @NonNull String[] ids) {
String[] args = concatenateArrays(new String[] {key, group}, ids);
protobufTransaction.addCommands(buildCommand(XAck, buildArgs(args)));
return getThis();
}

/**
* Returns the remaining time to live of <code>key</code> that has a timeout, in milliseconds.
*
Expand Down
27 changes: 27 additions & 0 deletions java/client/src/test/java/glide/api/RedisClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@
import static redis_request.RedisRequestOuterClass.RequestType.UnWatch;
import static redis_request.RedisRequestOuterClass.RequestType.Unlink;
import static redis_request.RedisRequestOuterClass.RequestType.Watch;
import static redis_request.RedisRequestOuterClass.RequestType.XAck;
import static redis_request.RedisRequestOuterClass.RequestType.XAdd;
import static redis_request.RedisRequestOuterClass.RequestType.XDel;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupCreate;
Expand Down Expand Up @@ -4667,6 +4668,32 @@ public void xreadgroup_with_options() {
assertEquals(completedResult, payload);
}

@SneakyThrows
@Test
public void xack_returns_success() {
// setup
String key = "testKey";
String groupName = "testGroupName";
String[] ids = new String[] {"testId"};
String[] arguments = concatenateArrays(new String[] {key, groupName}, ids);
Long mockResult = 1L;

CompletableFuture<Long> testResponse = new CompletableFuture<>();
testResponse.complete(mockResult);

// match on protobuf request
when(commandManager.<Long>submitNewCommand(eq(XAck), eq(arguments), any()))
.thenReturn(testResponse);

// exercise
CompletableFuture<Long> response = service.xack(key, groupName, ids);
Long payload = response.get();

// verify
assertEquals(testResponse, response);
assertEquals(mockResult, payload);
}

@SneakyThrows
@Test
public void type_returns_success() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@
import static redis_request.RedisRequestOuterClass.RequestType.Touch;
import static redis_request.RedisRequestOuterClass.RequestType.Type;
import static redis_request.RedisRequestOuterClass.RequestType.Unlink;
import static redis_request.RedisRequestOuterClass.RequestType.XAck;
import static redis_request.RedisRequestOuterClass.RequestType.XAdd;
import static redis_request.RedisRequestOuterClass.RequestType.XDel;
import static redis_request.RedisRequestOuterClass.RequestType.XGroupCreate;
Expand Down Expand Up @@ -826,6 +827,9 @@ InfScoreBound.NEGATIVE_INFINITY, new ScoreBoundary(3, false), new Limit(1, 2)),
"key",
"id")));

transaction.xack("key", "group", new String[] {"12345-1", "98765-4"});
results.add(Pair.of(XAck, buildArgs("key", "group", "12345-1", "98765-4")));

transaction.time();
results.add(Pair.of(Time, buildArgs()));

Expand Down
68 changes: 56 additions & 12 deletions java/integTest/src/test/java/glide/SharedCommandTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -3589,7 +3589,7 @@ public void xgroupCreate_xgroupDestroy(BaseClient client) {
@SneakyThrows
@ParameterizedTest(autoCloseArguments = false)
@MethodSource("getClients")
public void xgroupCreateConsumer_xreadgroup_xgroupDelConsumer(BaseClient client) {
public void xgroupCreateConsumer_xgroupDelConsumer_xreadgroup_xack(BaseClient client) {
String key = UUID.randomUUID().toString();
String stringKey = UUID.randomUUID().toString();
String groupName = "group" + UUID.randomUUID();
Expand Down Expand Up @@ -3638,7 +3638,7 @@ public void xgroupCreateConsumer_xreadgroup_xgroupDelConsumer(BaseClient client)
// delete one of the streams
assertEquals(1L, client.xdel(key, new String[] {streamid_1}).get());

// now xreadgroup yeilds one empty stream and one non-empty stream
// now xreadgroup returns one empty stream and one non-empty stream
var result_2 = client.xreadgroup(Map.of(key, "0"), groupName, consumerName).get();
assertEquals(2, result_2.get(key).size());
assertNull(result_2.get(key).get(streamid_1));
Expand All @@ -3647,13 +3647,22 @@ public void xgroupCreateConsumer_xreadgroup_xgroupDelConsumer(BaseClient client)
String streamid_3 = client.xadd(key, Map.of("field3", "value3")).get();
assertNotNull(streamid_3);

// Delete the consumer group and expect 2 pending messages
assertEquals(2L, client.xgroupDelConsumer(key, groupName, consumerName).get());
// xack that streamid_1, and streamid_2 was received
assertEquals(2L, client.xack(key, groupName, new String[] {streamid_1, streamid_2}).get());

// Delete the consumer group and expect 1 pending messages (one was received)
assertEquals(0L, client.xgroupDelConsumer(key, groupName, consumerName).get());

// xack streamid_1, and streamid_2 already received returns 0L
assertEquals(0L, client.xack(key, groupName, new String[] {streamid_1, streamid_2}).get());

// Consume the last message with the previously deleted consumer (creates the consumer anew)
var result_3 = client.xreadgroup(Map.of(key, ">"), groupName, consumerName).get();
assertEquals(1, result_3.get(key).size());

// wrong group, so xack streamid_3 returns 0
assertEquals(0L, client.xack(key, "not_a_group", new String[] {streamid_3}).get());

// Delete the consumer group and expect the pending message
assertEquals(1L, client.xgroupDelConsumer(key, groupName, consumerName).get());

Expand All @@ -3678,19 +3687,15 @@ public void xgroupCreateConsumer_xreadgroup_xgroupDelConsumer(BaseClient client)
public void xreadgroup_return_failures(BaseClient client) {
String key = "{key}:1" + UUID.randomUUID();
String nonStreamKey = "{key}:3" + UUID.randomUUID();
String field1 = "f1_";
String groupName = "group" + UUID.randomUUID();
String zeroStreamId = "0";
String consumerName = "consumer" + UUID.randomUUID();

// setup first entries in streams key1 and key2
Map<String, String> timestamp_1_1_map = new LinkedHashMap<>();
timestamp_1_1_map.put(field1, field1 + "1");
String timestamp_1_1 =
client.xadd(key, timestamp_1_1_map, StreamAddOptions.builder().id("1-1").build()).get();
client.xadd(key, Map.of("f1", "v1"), StreamAddOptions.builder().id("1-1").build()).get();
assertNotNull(timestamp_1_1);

String groupName = "group" + UUID.randomUUID();
String zeroStreamId = "0";
String consumerName = "consumer" + UUID.randomUUID();

// create group and consumer for the group
assertEquals(
OK,
Expand Down Expand Up @@ -3790,6 +3795,45 @@ public void xreadgroup_return_failures(BaseClient client) {
}
}

@SneakyThrows
@ParameterizedTest(autoCloseArguments = false)
@MethodSource("getClients")
public void xack_return_failures(BaseClient client) {
String key = "{key}:1" + UUID.randomUUID();
String nonStreamKey = "{key}:3" + UUID.randomUUID();
String groupName = "group" + UUID.randomUUID();
String zeroStreamId = "0";
String consumerName = "consumer" + UUID.randomUUID();

// setup first entries in streams key1 and key2
String timestamp_1_1 =
client.xadd(key, Map.of("f1", "v1"), StreamAddOptions.builder().id("1-1").build()).get();
assertNotNull(timestamp_1_1);

// create group and consumer for the group
assertEquals(
OK,
client
.xgroupCreate(
key, groupName, zeroStreamId, StreamGroupOptions.builder().makeStream().build())
.get());
assertTrue(client.xgroupCreateConsumer(key, groupName, consumerName).get());

// Empty entity id list throws a RequestException
ExecutionException executionException =
assertThrows(
ExecutionException.class, () -> client.xack(key, groupName, new String[0]).get());
assertInstanceOf(RequestException.class, executionException.getCause());

// Key exists, but it is not a stream
assertEquals(OK, client.set(nonStreamKey, "bar").get());
executionException =
assertThrows(
ExecutionException.class,
() -> client.xack(nonStreamKey, groupName, new String[] {zeroStreamId}).get());
assertInstanceOf(RequestException.class, executionException.getCause());
}

@SneakyThrows
@ParameterizedTest(autoCloseArguments = false)
@MethodSource("getClients")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -751,7 +751,6 @@ private static Object[] streamCommands(BaseTransaction<?> transaction) {
final String groupName1 = "{groupName}-1-" + UUID.randomUUID();
final String groupName2 = "{groupName}-2-" + UUID.randomUUID();
final String consumer1 = "{consumer}-1-" + UUID.randomUUID();
final String consumer2 = "{consumer}-2-" + UUID.randomUUID();

transaction
.xadd(streamKey1, Map.of("field1", "value1"), StreamAddOptions.builder().id("0-1").build())
Expand All @@ -775,6 +774,7 @@ private static Object[] streamCommands(BaseTransaction<?> transaction) {
groupName1,
consumer1,
StreamReadGroupOptions.builder().count(2L).build())
.xack(streamKey1, groupName1, new String[] {"0-3"})
.xgroupDelConsumer(streamKey1, groupName1, consumer1)
.xgroupDestroy(streamKey1, groupName1)
.xgroupDestroy(streamKey1, groupName2)
Expand Down Expand Up @@ -812,7 +812,8 @@ private static Object[] streamCommands(BaseTransaction<?> transaction) {
Map.of(
streamKey1,
Map.of()), // xreadgroup(Map.of(streamKey1, ">"), groupName1, consumer1, options);
1L, // xgroupDelConsumer(streamKey1, groupName1, consumer1)
1L, // xack(streamKey1, groupName1, new String[] {"0-3"})
0L, // xgroupDelConsumer(streamKey1, groupName1, consumer1)
true, // xgroupDestroy(streamKey1, groupName1)
true, // xgroupDestroy(streamKey1, groupName2)
1L, // .xdel(streamKey1, new String[] {"0-1", "0-5"});
Expand Down

0 comments on commit 738d38a

Please sign in to comment.