Skip to content

Commit

Permalink
Java: Add the XRANGE command (valkey-io#1501)
Browse files Browse the repository at this point in the history
* Java: Add the XRANGE command (#325)

* Java: Add the XRANGE command

Signed-off-by: Andrew Carbonetto <[email protected]>

* CARGO FMT

Signed-off-by: Andrew Carbonetto <[email protected]>

* Clean documentation

Signed-off-by: Andrew Carbonetto <[email protected]>

* Add empty test

Signed-off-by: Andrew Carbonetto <[email protected]>

* Add empty test

Signed-off-by: Andrew Carbonetto <[email protected]>

* Update XRANGE for self-review

Signed-off-by: Andrew Carbonetto <[email protected]>

* Use XRANGE bounding objects

Signed-off-by: Andrew Carbonetto <[email protected]>

* Add StreamRange documentation

Signed-off-by: Andrew Carbonetto <[email protected]>

* SPOTLESS

Signed-off-by: Andrew Carbonetto <[email protected]>

* Fix transaction tests

Signed-off-by: Andrew Carbonetto <[email protected]>

* SPOTLESS

Signed-off-by: Andrew Carbonetto <[email protected]>

---------

Signed-off-by: Andrew Carbonetto <[email protected]>

* XRANGE udpates for review

Signed-off-by: Andrew Carbonetto <[email protected]>

* SPOTLESS

Signed-off-by: Andrew Carbonetto <[email protected]>

---------

Signed-off-by: Andrew Carbonetto <[email protected]>
  • Loading branch information
acarbonetto authored and cyip10 committed Jun 24, 2024
1 parent fa6c58d commit 242b4f1
Show file tree
Hide file tree
Showing 11 changed files with 466 additions and 2 deletions.
2 changes: 1 addition & 1 deletion glide-core/src/client/value_conversion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,7 @@ pub(crate) fn expected_type_for_cmd(cmd: &Cmd) -> Option<ExpectedReturnType> {

// TODO use enum to avoid mistakes
match command.as_slice() {
b"HGETALL" | b"XREAD" | b"CONFIG GET" | b"FT.CONFIG GET" | b"HELLO" => {
b"HGETALL" | b"XREAD" | b"CONFIG GET" | b"FT.CONFIG GET" | b"HELLO" | b"XRANGE" => {
Some(ExpectedReturnType::Map)
}
b"INCRBYFLOAT" | b"HINCRBYFLOAT" | b"ZINCRBY" => Some(ExpectedReturnType::Double),
Expand Down
1 change: 1 addition & 0 deletions glide-core/src/protobuf/redis_request.proto
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ enum RequestType {
XLen = 159;
LSet = 165;
XDel = 166;
XRange = 167;
LMove = 168;
BLMove = 169;
GetDel = 170;
Expand Down
3 changes: 3 additions & 0 deletions glide-core/src/request_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ pub enum RequestType {
XLen = 159,
LSet = 165,
XDel = 166,
XRange = 167,
LMove = 168,
BLMove = 169,
GetDel = 170,
Expand Down Expand Up @@ -341,6 +342,7 @@ impl From<::protobuf::EnumOrUnknown<ProtobufRequestType>> for RequestType {
ProtobufRequestType::XLen => RequestType::XLen,
ProtobufRequestType::LSet => RequestType::LSet,
ProtobufRequestType::XDel => RequestType::XDel,
ProtobufRequestType::XRange => RequestType::XRange,
ProtobufRequestType::LMove => RequestType::LMove,
ProtobufRequestType::BLMove => RequestType::BLMove,
ProtobufRequestType::GetDel => RequestType::GetDel,
Expand Down Expand Up @@ -511,6 +513,7 @@ impl RequestType {
RequestType::XLen => Some(cmd("XLEN")),
RequestType::LSet => Some(cmd("LSET")),
RequestType::XDel => Some(cmd("XDEL")),
RequestType::XRange => Some(cmd("XRANGE")),
RequestType::LMove => Some(cmd("LMOVE")),
RequestType::BLMove => Some(cmd("BLMOVE")),
RequestType::GetDel => Some(cmd("GETDEL")),
Expand Down
18 changes: 18 additions & 0 deletions java/client/src/main/java/glide/api/BaseClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@
import static redis_request.RedisRequestOuterClass.RequestType.XAdd;
import static redis_request.RedisRequestOuterClass.RequestType.XDel;
import static redis_request.RedisRequestOuterClass.RequestType.XLen;
import static redis_request.RedisRequestOuterClass.RequestType.XRange;
import static redis_request.RedisRequestOuterClass.RequestType.XTrim;
import static redis_request.RedisRequestOuterClass.RequestType.ZAdd;
import static redis_request.RedisRequestOuterClass.RequestType.ZCard;
Expand Down Expand Up @@ -166,6 +167,7 @@
import glide.api.models.commands.geospatial.GeoUnit;
import glide.api.models.commands.geospatial.GeospatialData;
import glide.api.models.commands.stream.StreamAddOptions;
import glide.api.models.commands.stream.StreamRange;
import glide.api.models.commands.stream.StreamTrimOptions;
import glide.api.models.configuration.BaseClientConfiguration;
import glide.api.models.exceptions.RedisException;
Expand Down Expand Up @@ -1253,6 +1255,22 @@ public CompletableFuture<Long> xdel(@NonNull String key, @NonNull String[] ids)
return commandManager.submitNewCommand(XDel, arguments, this::handleLongResponse);
}

@Override
public CompletableFuture<Map<String, String[]>> xrange(
@NonNull String key, @NonNull StreamRange start, @NonNull StreamRange end) {
String[] arguments = ArrayUtils.addFirst(StreamRange.toArgs(start, end), key);
return commandManager.submitNewCommand(
XRange, arguments, response -> castMapOfArrays(handleMapResponse(response), String.class));
}

@Override
public CompletableFuture<Map<String, String[]>> xrange(
@NonNull String key, @NonNull StreamRange start, @NonNull StreamRange end, long count) {
String[] arguments = ArrayUtils.addFirst(StreamRange.toArgs(start, end, count), key);
return commandManager.submitNewCommand(
XRange, arguments, response -> castMapOfArrays(handleMapResponse(response), String.class));
}

@Override
public CompletableFuture<Long> pttl(@NonNull String key) {
return commandManager.submitNewCommand(PTTL, new String[] {key}, this::handleLongResponse);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@

import glide.api.models.commands.stream.StreamAddOptions;
import glide.api.models.commands.stream.StreamAddOptions.StreamAddOptionsBuilder;
import glide.api.models.commands.stream.StreamRange;
import glide.api.models.commands.stream.StreamRange.IdBound;
import glide.api.models.commands.stream.StreamRange.InfRangeBound;
import glide.api.models.commands.stream.StreamTrimOptions;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -105,4 +108,77 @@ public interface StreamBaseCommands {
* }</pre>
*/
CompletableFuture<Long> xdel(String key, String[] ids);

/**
* Returns stream entries matching a given range of IDs.
*
* @param key The key of the stream.
* @param start Starting stream ID bound for range.
* <ul>
* <li>Use {@link IdBound#of} to specify a stream ID.
* <li>Use {@link IdBound#ofExclusive} to specify an exclusive bounded stream ID.
* <li>Use {@link InfRangeBound#MIN} to start with the minimum available ID.
* </ul>
*
* @param end Ending stream ID bound for range.
* <ul>
* <li>Use {@link IdBound#of} to specify a stream ID.
* <li>Use {@link IdBound#ofExclusive} to specify an exclusive bounded stream ID.
* <li>Use {@link InfRangeBound#MAX} to end with the maximum available ID.
* </ul>
*
* @return @return A <code>Map</code> of key to stream entry data, where entry data is an array of
* item pairings.
* @example
* <pre>{@code
* // Retrieve all stream entries
* Map<String, String[]> result = client.xrange("key", InfRangeBound.MIN, InfRangeBound.MAX).get();
* result.forEach((k, v) -> {
* System.out.println("Stream ID: " + k);
* for (int i = 0; i < v.length;) {
* System.out.println(v[i++] + ": " + v[i++]);
* }
* });
* // Retrieve exactly one stream entry by id
* Map<String, String[]> result = client.xrange("key", IdBound.of(streamId), IdBound.of(streamId)).get();
* System.out.println("Stream ID: " + streamid + " -> " + Arrays.toString(result.get(streamid)));
* }</pre>
*/
CompletableFuture<Map<String, String[]>> xrange(String key, StreamRange start, StreamRange end);

/**
* Returns stream entries matching a given range of IDs.
*
* @param key The key of the stream.
* @param start Starting stream ID bound for range.
* <ul>
* <li>Use {@link IdBound#of} to specify a stream ID.
* <li>Use {@link IdBound#ofExclusive} to specify an exclusive bounded stream ID.
* <li>Use {@link InfRangeBound#MIN} to start with the minimum available ID.
* </ul>
*
* @param end Ending stream ID bound for range.
* <ul>
* <li>Use {@link IdBound#of} to specify a stream ID.
* <li>Use {@link IdBound#ofExclusive} to specify an exclusive bounded stream ID.
* <li>Use {@link InfRangeBound#MAX} to end with the maximum available ID.
* </ul>
*
* @param count Maximum count of stream entries to return.
* @return A <code>Map</code> of key to stream entry data, where entry data is an array of item
* pairings.
* @example
* <pre>{@code
* // Retrieve the first 2 stream entries
* Map<String, String[]> result = client.xrange("key", InfRangeBound.MIN, InfRangeBound.MAX, 2).get();
* result.forEach((k, v) -> {
* System.out.println("Stream ID: " + k);
* for (int i = 0; i < v.length;) {
* System.out.println(v[i++] + ": " + v[i++]);
* }
* });
* }</pre>
*/
CompletableFuture<Map<String, String[]>> xrange(
String key, StreamRange start, StreamRange end, long count);
}
65 changes: 64 additions & 1 deletion java/client/src/main/java/glide/api/models/BaseTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@
import static redis_request.RedisRequestOuterClass.RequestType.XAdd;
import static redis_request.RedisRequestOuterClass.RequestType.XDel;
import static redis_request.RedisRequestOuterClass.RequestType.XLen;
import static redis_request.RedisRequestOuterClass.RequestType.XRange;
import static redis_request.RedisRequestOuterClass.RequestType.XTrim;
import static redis_request.RedisRequestOuterClass.RequestType.ZAdd;
import static redis_request.RedisRequestOuterClass.RequestType.ZCard;
Expand Down Expand Up @@ -190,6 +191,7 @@
import glide.api.models.commands.geospatial.GeospatialData;
import glide.api.models.commands.stream.StreamAddOptions;
import glide.api.models.commands.stream.StreamAddOptions.StreamAddOptionsBuilder;
import glide.api.models.commands.stream.StreamRange;
import glide.api.models.commands.stream.StreamTrimOptions;
import java.util.Arrays;
import java.util.Map;
Expand Down Expand Up @@ -2676,12 +2678,73 @@ public T xlen(@NonNull String key) {
* less than the number of entries in <code>ids</code>, if the specified <code>ids</code>
* don't exist in the stream.
*/
public T xdel(String key, String[] ids) {
public T xdel(@NonNull String key, @NonNull String[] ids) {
ArgsArray commandArgs = buildArgs(ArrayUtils.addFirst(ids, key));
protobufTransaction.addCommands(buildCommand(XDel, commandArgs));
return getThis();
}

/**
* Returns stream entries matching a given range of IDs.
*
* @param key The key of the stream.
* @param start Starting stream ID bound for range.
* <ul>
* <li>Use {@link StreamRange.IdBound#of} to specify a stream ID.
* <li>Use {@link StreamRange.IdBound#ofExclusive} to specify an exclusive bounded stream
* ID.
* <li>Use {@link StreamRange.InfRangeBound#MIN} to start with the minimum available ID.
* </ul>
*
* @param end Ending stream ID bound for range.
* <ul>
* <li>Use {@link StreamRange.IdBound#of} to specify a stream ID.
* <li>Use {@link StreamRange.IdBound#ofExclusive} to specify an exclusive bounded stream
* ID.
* <li>Use {@link StreamRange.InfRangeBound#MAX} to end with the maximum available ID.
* </ul>
*
* @return Command Response - A <code>Map</code> of key to stream entry data, where entry data is
* an array of item pairings.
*/
public T xrange(@NonNull String key, @NonNull StreamRange start, @NonNull StreamRange end) {
ArgsArray commandArgs = buildArgs(ArrayUtils.addFirst(StreamRange.toArgs(start, end), key));
protobufTransaction.addCommands(buildCommand(XRange, commandArgs));
return getThis();
}

/**
* Returns stream entries matching a given range of IDs.
*
* @param key The key of the stream.
* @param start Starting stream ID bound for range.
* <ul>
* <li>Use {@link StreamRange.IdBound#of} to specify a stream ID.
* <li>Use {@link StreamRange.IdBound#ofExclusive} to specify an exclusive bounded stream
* ID.
* <li>Use {@link StreamRange.InfRangeBound#MIN} to start with the minimum available ID.
* </ul>
*
* @param end Ending stream ID bound for range.
* <ul>
* <li>Use {@link StreamRange.IdBound#of} to specify a stream ID.
* <li>Use {@link StreamRange.IdBound#ofExclusive} to specify an exclusive bounded stream
* ID.
* <li>Use {@link StreamRange.InfRangeBound#MAX} to end with the maximum available ID.
* </ul>
*
* @param count Maximum count of stream entries to return.
* @return Command Response - A <code>Map</code> of key to stream entry data, where entry data is
* an array of item pairings.
*/
public T xrange(
@NonNull String key, @NonNull StreamRange start, @NonNull StreamRange end, long count) {
ArgsArray commandArgs =
buildArgs(ArrayUtils.addFirst(StreamRange.toArgs(start, end, count), key));
protobufTransaction.addCommands(buildCommand(XRange, commandArgs));
return getThis();
}

/**
* Returns the remaining time to live of <code>key</code> that has a timeout, in milliseconds.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/** Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0 */
package glide.api.models.commands.stream;

import glide.utils.ArrayTransformUtils;
import lombok.Getter;
import lombok.RequiredArgsConstructor;

/**
* Arguments for {@link glide.api.commands.StreamBaseCommands#xrange} and {@link
* glide.api.commands.StreamBaseCommands#xrevrange} to specify the starting and ending range for the
* stream search by stream ID.
*
* @see <a href="https://redis.io/commands/xrange/">redis.io</a>
* @see <a href="https://redis.io/commands/xrevrange/">redis.io</a>
*/
public interface StreamRange {

String getRedisApi();

String MINIMUM_RANGE_REDIS_API = "-";
String MAXIMUM_RANGE_REDIS_API = "+";
String RANGE_COUNT_REDIS_API = "COUNT";

/**
* Enumeration representing minimum or maximum stream entry bounds for the range search, to get
* the first or last stream ID.
*/
@RequiredArgsConstructor
@Getter
enum InfRangeBound implements StreamRange {
MIN(MINIMUM_RANGE_REDIS_API),
MAX(MAXIMUM_RANGE_REDIS_API);

private final String redisApi;
};

/**
* Stream ID used to specify a range of IDs to search. Stream ID bounds can be complete with a
* timestamp and sequence number separated by a dash (<code>"-"</code>), for example <code>
* "1526985054069-0"</code>.<br>
* Stream ID bounds can also be incomplete, with just a timestamp.<br>
* Stream ID bounds are inclusive by default. When <code>isInclusive==false</code>, a <code>"("
* </code> is prepended for the Redis API.
*/
@Getter
class IdBound implements StreamRange {
private final String redisApi;

/**
* Default constructor
*
* @param id The stream id.
*/
private IdBound(String id) {
redisApi = id;
}

/**
* Creates a stream ID boundary by stream id for range search.
*
* @param id The stream id.
*/
public static IdBound of(String id) {
return new IdBound(id);
}

/**
* Creates an incomplete stream ID boundary without the sequence number for range search.
*
* @param timestamp The stream timestamp as ID.
*/
public static IdBound of(long timestamp) {
return new IdBound(Long.toString(timestamp));
}

/**
* Creates an incomplete stream ID exclusive boundary without the sequence number for range
* search.
*
* @param timestamp The stream timestamp as ID.
*/
public static IdBound ofExclusive(long timestamp) {
return new IdBound("(" + timestamp);
}

/**
* Creates a stream ID exclusive boundary by stream id for range search.
*
* @param id The stream id.
*/
public static IdBound ofExclusive(String id) {
return new IdBound("(" + id);
}
}

/**
* Convert StreamRange arguments to a string array
*
* @return arguments converted to an array to be consumed by Redis
*/
static String[] toArgs(StreamRange start, StreamRange end) {
return new String[] {start.getRedisApi(), end.getRedisApi()};
}

/**
* Convert StreamRange arguments to a string array
*
* @return arguments converted to an array to be consumed by Redis
*/
static String[] toArgs(StreamRange start, StreamRange end, long count) {
return ArrayTransformUtils.concatenateArrays(
toArgs(start, end), new String[] {RANGE_COUNT_REDIS_API, Long.toString(count)});
}
}
Loading

0 comments on commit 242b4f1

Please sign in to comment.