Skip to content

Commit

Permalink
Java: Add the XREADGROUP command (#1613)
Browse files Browse the repository at this point in the history
* Java: Add the `XREADGROUP` command (#376)

* Add XGROUP CreateConsumer, DelConsumer

Signed-off-by: Andrew Carbonetto <[email protected]>

* Add XREADGROUP command

Signed-off-by: Andrew Carbonetto <[email protected]>

* Udpate IT tests

Signed-off-by: Andrew Carbonetto <[email protected]>

* Fix IT tests

Signed-off-by: Andrew Carbonetto <[email protected]>

* SPOTLESS & merge conflict fix

Signed-off-by: Andrew Carbonetto <[email protected]>

* Update for review comments

Signed-off-by: Andrew Carbonetto <[email protected]>

---------

Signed-off-by: Andrew Carbonetto <[email protected]>

* Remove old test from IT suite

Signed-off-by: Andrew Carbonetto <[email protected]>

* Update xreadgroup docs

Signed-off-by: Andrew Carbonetto <[email protected]>

---------

Signed-off-by: Andrew Carbonetto <[email protected]>
  • Loading branch information
acarbonetto authored Jun 20, 2024
1 parent 499e2cf commit 75d784e
Show file tree
Hide file tree
Showing 11 changed files with 584 additions and 16 deletions.
25 changes: 24 additions & 1 deletion glide-core/src/client/value_conversion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -791,6 +791,7 @@ fn convert_to_array_of_pairs(
value_expected_return_type: Option<ExpectedReturnType>,
) -> RedisResult<Value> {
match response {
Value::Nil => Ok(response),
Value::Array(ref array) if array.is_empty() || matches!(array[0], Value::Array(_)) => {
// The server response is an empty array or a RESP3 array of pairs. In RESP3, the values in the pairs are
// already of the correct type, so we do not need to convert them and `response` is in the correct format.
Expand Down Expand Up @@ -852,7 +853,7 @@ pub(crate) fn expected_type_for_cmd(cmd: &Cmd) -> Option<ExpectedReturnType> {
key_type: &Some(ExpectedReturnType::BulkString),
value_type: &Some(ExpectedReturnType::ArrayOfPairs),
}),
b"XREAD" => Some(ExpectedReturnType::Map {
b"XREAD" | b"XREADGROUP" => Some(ExpectedReturnType::Map {
key_type: &Some(ExpectedReturnType::BulkString),
value_type: &Some(ExpectedReturnType::Map {
key_type: &Some(ExpectedReturnType::BulkString),
Expand Down Expand Up @@ -1205,6 +1206,28 @@ mod tests {
));
}

#[test]
fn convert_xreadgroup() {
assert!(matches!(
expected_type_for_cmd(
redis::cmd("XREADGROUP")
.arg("GROUP")
.arg("group")
.arg("consumer")
.arg("streams")
.arg("key")
.arg("id")
),
Some(ExpectedReturnType::Map {
key_type: &Some(ExpectedReturnType::BulkString),
value_type: &Some(ExpectedReturnType::Map {
key_type: &Some(ExpectedReturnType::BulkString),
value_type: &Some(ExpectedReturnType::ArrayOfPairs),
}),
})
));
}

#[test]
fn test_convert_empty_array_to_map_is_nil() {
let mut cmd = redis::cmd("XREAD");
Expand Down
18 changes: 18 additions & 0 deletions java/client/src/main/java/glide/api/BaseClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@
import static redis_request.RedisRequestOuterClass.RequestType.XLen;
import static redis_request.RedisRequestOuterClass.RequestType.XRange;
import static redis_request.RedisRequestOuterClass.RequestType.XRead;
import static redis_request.RedisRequestOuterClass.RequestType.XReadGroup;
import static redis_request.RedisRequestOuterClass.RequestType.XRevRange;
import static redis_request.RedisRequestOuterClass.RequestType.XTrim;
import static redis_request.RedisRequestOuterClass.RequestType.ZAdd;
Expand Down Expand Up @@ -195,6 +196,7 @@
import glide.api.models.commands.stream.StreamAddOptions;
import glide.api.models.commands.stream.StreamGroupOptions;
import glide.api.models.commands.stream.StreamRange;
import glide.api.models.commands.stream.StreamReadGroupOptions;
import glide.api.models.commands.stream.StreamReadOptions;
import glide.api.models.commands.stream.StreamTrimOptions;
import glide.api.models.configuration.BaseClientConfiguration;
Expand Down Expand Up @@ -1429,6 +1431,22 @@ public CompletableFuture<Long> xgroupDelConsumer(
XGroupDelConsumer, new String[] {key, group, consumer}, this::handleLongResponse);
}

@Override
public CompletableFuture<Map<String, Map<String, String[][]>>> xreadgroup(
@NonNull Map<String, String> keysAndIds, @NonNull String group, @NonNull String consumer) {
return xreadgroup(keysAndIds, group, consumer, StreamReadGroupOptions.builder().build());
}

@Override
public CompletableFuture<Map<String, Map<String, String[][]>>> xreadgroup(
@NonNull Map<String, String> keysAndIds,
@NonNull String group,
@NonNull String consumer,
@NonNull StreamReadGroupOptions options) {
String[] arguments = options.toArgs(group, consumer, keysAndIds);
return commandManager.submitNewCommand(XReadGroup, arguments, this::handleXReadResponse);
}

@Override
public CompletableFuture<Long> pttl(@NonNull String key) {
return commandManager.submitNewCommand(PTTL, new String[] {key}, this::handleLongResponse);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import glide.api.models.commands.stream.StreamRange;
import glide.api.models.commands.stream.StreamRange.IdBound;
import glide.api.models.commands.stream.StreamRange.InfRangeBound;
import glide.api.models.commands.stream.StreamReadGroupOptions;
import glide.api.models.commands.stream.StreamReadOptions;
import glide.api.models.commands.stream.StreamTrimOptions;
import java.util.Map;
Expand Down Expand Up @@ -68,7 +69,7 @@ public interface StreamBaseCommands {
* @param keysAndIds A <code>Map</code> of keys and entry ids to read from. The <code>
* Map</code> is composed of a stream's key and the id of the entry after which the stream
* will be read.
* @return A <code>{@literal Map<String, Map<String[][]>>}</code> with stream
* @return A <code>{@literal Map<String, Map<String, String[][]>>}</code> with stream
* keys, to <code>Map</code> of stream-ids, to an array of pairings with format <code>[[field, entry], [field, entry], ...]<code>.
* @example
* <pre>{@code
Expand All @@ -95,7 +96,7 @@ public interface StreamBaseCommands {
* Map</code> is composed of a stream's key and the id of the entry after which the stream
* will be read.
* @param options Options detailing how to read the stream {@link StreamReadOptions}.
* @return A <code>{@literal Map<String, Map<String[][]>>}</code> with stream
* @return A <code>{@literal Map<String, Map<String, String[][]>>}</code> with stream
* keys, to <code>Map</code> of stream-ids, to an array of pairings with format <code>[[field, entry], [field, entry], ...]<code>.
* @example
* <pre>{@code
Expand Down Expand Up @@ -407,4 +408,89 @@ CompletableFuture<String> xgroupCreate(
* }</pre>
*/
CompletableFuture<Long> xgroupDelConsumer(String key, String group, String consumer);

/**
* Reads entries from the given streams owned by a consumer group.
*
* @apiNote When in cluster mode, all keys in <code>keysAndIds</code> must map to the same hash
* slot.
* @see <a href="https://valkey.io/commands/xreadgroup/">valkey.io</a> for details.
* @param keysAndIds A <code>Map</code> of keys and entry ids to read from. The <code>
* Map</code> is composed of a stream's key and the id of the entry after which the stream
* will be read. Use the special id of <code>{@literal ">"}</code> to receive only new messages.
* @param group The consumer group name.
* @param consumer The newly created consumer.
* @return A <code>{@literal Map<String, Map<String, String[][]>>}</code> with stream
* keys, to <code>Map</code> of stream-ids, to an array of pairings with format <code>[[field, entry], [field, entry], ...]<code>.
* Returns code>null</code> if the consumer group does not exist. Returns a code>Map</code> with a value of code>null</code> if the stream is empty.
* @example
* <pre>{@code
* // create a new stream at "mystream", with stream id "1-0"
* Map<String, String> xreadKeys = Map.of("myfield", "mydata");
* String streamId = client.xadd("mystream", Map.of("myfield", "mydata"), StreamAddOptions.builder().id("1-0").build()).get();
* assert client.xgroupCreate("mystream", "mygroup").get().equals("OK"); // create the consumer group "mygroup"
* Map<String, Map<String, String[][]>> streamReadResponse = client.xreadgroup(Map.of("mystream", ">"), "mygroup", "myconsumer").get();
* // Returns "mystream": "1-0": {{"myfield", "mydata"}}
* for (var keyEntry : streamReadResponse.entrySet()) {
* System.out.printf("Key: %s", keyEntry.getKey());
* for (var streamEntry : keyEntry.getValue().entrySet()) {
* Arrays.stream(streamEntry.getValue()).forEach(entity ->
* System.out.printf("stream id: %s; field: %s; value: %s\n", streamEntry.getKey(), entity[0], entity[1])
* );
* }
* }
* assert client.xdel("mystream", "1-0").get() == 1L;
* client.xreadgroup(Map.of("mystream", "0"), "mygroup", "myconsumer").get();
* // Returns "mystream": "1-0": null
* assert streamReadResponse.get("mystream").get("1-0") == null;
* </pre>
*/
CompletableFuture<Map<String, Map<String, String[][]>>> xreadgroup(
Map<String, String> keysAndIds, String group, String consumer);

/**
* Reads entries from the given streams owned by a consumer group.
*
* @apiNote When in cluster mode, all keys in <code>keysAndIds</code> must map to the same hash
* slot.
* @see <a href="https://valkey.io/commands/xreadgroup/">valkey.io</a> for details.
* @param keysAndIds A <code>Map</code> of keys and entry ids to read from. The <code>
* Map</code> is composed of a stream's key and the id of the entry after which the stream
* will be read. Use the special id of <code>{@literal ">"}</code> to receive only new messages.
* @param group The consumer group name.
* @param consumer The newly created consumer.
* @param options Options detailing how to read the stream {@link StreamReadGroupOptions}.
* @return A <code>{@literal Map<String, Map<String, String[][]>>}</code> with stream
* keys, to <code>Map</code> of stream-ids, to an array of pairings with format <code>[[field, entry], [field, entry], ...]<code>.
* Returns code>null</code> if the consumer group does not exist. Returns a code>Map</code> with a value of code>null</code> if the stream is empty.
* @example
* <pre>{@code
* // create a new stream at "mystream", with stream id "1-0"
* Map<String, String> xreadKeys = Map.of("myfield", "mydata");
* String streamId = client.xadd("mystream", Map.of("myfield", "mydata"), StreamAddOptions.builder().id("1-0").build()).get();
* assert client.xgroupCreate("mystream", "mygroup").get().equals("OK"); // create the consumer group "mygroup"
* StreamReadGroupOptions options = StreamReadGroupOptions.builder().count(1).build(); // retrieves only a single message at a time
* Map<String, Map<String, String[][]>> streamReadResponse = client.xreadgroup(Map.of("mystream", ">"), "mygroup", "myconsumer", options).get();
* // Returns "mystream": "1-0": {{"myfield", "mydata"}}
* for (var keyEntry : streamReadResponse.entrySet()) {
* System.out.printf("Key: %s", keyEntry.getKey());
* for (var streamEntry : keyEntry.getValue().entrySet()) {
* Arrays.stream(streamEntry.getValue()).forEach(entity ->
* System.out.printf("stream id: %s; field: %s; value: %s\n", streamEntry.getKey(), entity[0], entity[1])
* );
* }
* }
* assert client.xdel("mystream", "1-0").get() == 1L;
* // read the first 10 items and acknowledge (ACK) them:
* StreamReadGroupOptions options = StreamReadGroupOptions.builder().count(10L).noack().build();
* streamReadResponse = client.xreadgroup(Map.of("mystream", "0"), "mygroup", "myconsumer", options).get();
* // Returns "mystream": "1-0": null
* assert streamReadResponse.get("mystream").get("1-0") == null;
* </pre>
*/
CompletableFuture<Map<String, Map<String, String[][]>>> xreadgroup(
Map<String, String> keysAndIds,
String group,
String consumer,
StreamReadGroupOptions options);
}
58 changes: 56 additions & 2 deletions java/client/src/main/java/glide/api/models/BaseTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@
import static redis_request.RedisRequestOuterClass.RequestType.XLen;
import static redis_request.RedisRequestOuterClass.RequestType.XRange;
import static redis_request.RedisRequestOuterClass.RequestType.XRead;
import static redis_request.RedisRequestOuterClass.RequestType.XReadGroup;
import static redis_request.RedisRequestOuterClass.RequestType.XRevRange;
import static redis_request.RedisRequestOuterClass.RequestType.XTrim;
import static redis_request.RedisRequestOuterClass.RequestType.ZAdd;
Expand Down Expand Up @@ -233,6 +234,7 @@
import glide.api.models.commands.stream.StreamAddOptions.StreamAddOptionsBuilder;
import glide.api.models.commands.stream.StreamGroupOptions;
import glide.api.models.commands.stream.StreamRange;
import glide.api.models.commands.stream.StreamReadGroupOptions;
import glide.api.models.commands.stream.StreamReadOptions;
import glide.api.models.commands.stream.StreamTrimOptions;
import glide.api.models.configuration.ReadFrom;
Expand Down Expand Up @@ -2767,7 +2769,7 @@ public T xadd(
* @param keysAndIds An array of <code>Pair</code>s of keys and entry ids to read from. A <code>
* pair</code> is composed of a stream's key and the id of the entry after which the stream
* will be read.
* @return Command Response - A <code>{@literal Map<String, Map<Object[][]>>}</code> with stream
* @return Command Response - A <code>{@literal Map<String, Map<String, String[][]>>}</code> with stream
* keys, to <code>Map</code> of stream-ids, to an array of pairings with format <code>[[field, entry], [field, entry], ...]<code>.
*/
public T xread(@NonNull Map<String, String> keysAndIds) {
Expand All @@ -2782,7 +2784,7 @@ public T xread(@NonNull Map<String, String> keysAndIds) {
* pair</code> is composed of a stream's key and the id of the entry after which the stream
* will be read.
* @param options options detailing how to read the stream {@link StreamReadOptions}.
* @return Command Response - A <code>{@literal Map<String, Map<Object[][]>>}</code> with stream
* @return Command Response - A <code>{@literal Map<String, Map<String, String[][]>>}</code> with stream
* keys, to <code>Map</code> of stream-ids, to an array of pairings with format <code>[[field, entry], [field, entry], ...]<code>.
*/
public T xread(@NonNull Map<String, String> keysAndIds, @NonNull StreamReadOptions options) {
Expand Down Expand Up @@ -3048,6 +3050,58 @@ public T xgroupDelConsumer(@NonNull String key, @NonNull String group, @NonNull
return getThis();
}

/**
* Reads entries from the given streams owned by a consumer group.
*
* @apiNote When in cluster mode, all keys in <code>keysAndIds</code> must map to the same hash
* slot.
* @see <a href="https://valkey.io/commands/xreadgroup/">valkey.io</a> for details.
* @param keysAndIds A <code>Map</code> of keys and entry ids to read from. The <code>
* Map</code> is composed of a stream's key and the id of the entry after which the stream
* will be read. Use the special id of <code>{@literal Map<String, Map<String, String[][]>>}
* </code> to receive only new messages.
* @param group The consumer group name.
* @param consumer The newly created consumer.
* @return Command Response - A <code>{@literal Map<String, Map<String, String[][]>>}</code> with
* stream keys, to <code>Map</code> of stream-ids, to an array of pairings with format <code>
* [[field, entry], [field, entry], ...]<code>.
* Returns code>null</code> if the consumer group does not exist. Returns a code>Map</code>
* with a value of code>null</code> if the stream is empty.
*/
public T xreadgroup(
@NonNull Map<String, String> keysAndIds, @NonNull String group, @NonNull String consumer) {
return xreadgroup(keysAndIds, group, consumer, StreamReadGroupOptions.builder().build());
}

/**
* Reads entries from the given streams owned by a consumer group.
*
* @apiNote When in cluster mode, all keys in <code>keysAndIds</code> must map to the same hash
* slot.
* @see <a href="https://valkey.io/commands/xreadgroup/">valkey.io</a> for details.
* @param keysAndIds A <code>Map</code> of keys and entry ids to read from. The <code>
* Map</code> is composed of a stream's key and the id of the entry after which the stream
* will be read. Use the special id of <code>{@literal Map<String, Map<String, String[][]>>}
* </code> to receive only new messages.
* @param group The consumer group name.
* @param consumer The newly created consumer.
* @param options Options detailing how to read the stream {@link StreamReadGroupOptions}.
* @return Command Response - A <code>{@literal Map<String, Map<String, String[][]>>}</code> with
* stream keys, to <code>Map</code> of stream-ids, to an array of pairings with format <code>
* [[field, entry], [field, entry], ...]<code>.
* Returns code>null</code> if the consumer group does not exist. Returns a code>Map</code>
* with a value of code>null</code> if the stream is empty.
*/
public T xreadgroup(
@NonNull Map<String, String> keysAndIds,
@NonNull String group,
@NonNull String consumer,
@NonNull StreamReadGroupOptions options) {
protobufTransaction.addCommands(
buildCommand(XReadGroup, buildArgs(options.toArgs(group, consumer, keysAndIds))));
return getThis();
}

/**
* Returns the remaining time to live of <code>key</code> that has a timeout, in milliseconds.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/** Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0 */
package glide.api.models.commands.stream;

import glide.api.commands.StreamBaseCommands;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import lombok.experimental.SuperBuilder;

/**
* Optional arguments for {@link StreamBaseCommands#xreadgroup(Map, String, String,
* StreamReadGroupOptions)}
*
* @see <a href="https://valkey.io/commands/xreadgroup/">redis.io</a>
*/
@SuperBuilder
public final class StreamReadGroupOptions extends StreamReadOptions {

public static final String READ_GROUP_REDIS_API = "GROUP";
public static final String READ_NOACK_REDIS_API = "NOACK";

/**
* If set, messages are not added to the Pending Entries List (PEL). This is equivalent to
* acknowledging the message when it is read.
*/
private boolean noack;

public abstract static class StreamReadGroupOptionsBuilder<
C extends StreamReadGroupOptions, B extends StreamReadGroupOptionsBuilder<C, B>>
extends StreamReadOptions.StreamReadOptionsBuilder<C, B> {
public B noack() {
this.noack = true;
return self();
}
}

/**
* Converts options and the key-to-id input for {@link StreamBaseCommands#xreadgroup(Map, String,
* String, StreamReadGroupOptions)} into a String[].
*
* @return String[]
*/
public String[] toArgs(String group, String consumer, Map<String, String> streams) {
List<String> optionArgs = new ArrayList<>();
optionArgs.add(READ_GROUP_REDIS_API);
optionArgs.add(group);
optionArgs.add(consumer);

if (this.count != null) {
optionArgs.add(READ_COUNT_REDIS_API);
optionArgs.add(count.toString());
}

if (this.block != null) {
optionArgs.add(READ_BLOCK_REDIS_API);
optionArgs.add(block.toString());
}

if (this.noack) {
optionArgs.add(READ_NOACK_REDIS_API);
}

optionArgs.add(READ_STREAMS_REDIS_API);
Set<Map.Entry<String, String>> entrySet = streams.entrySet();
optionArgs.addAll(entrySet.stream().map(Map.Entry::getKey).collect(Collectors.toList()));
optionArgs.addAll(entrySet.stream().map(Map.Entry::getValue).collect(Collectors.toList()));

return optionArgs.toArray(new String[0]);
}
}
Loading

0 comments on commit 75d784e

Please sign in to comment.