Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Java: Add the XRANGE command #1501

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion glide-core/src/client/value_conversion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,7 @@ pub(crate) fn expected_type_for_cmd(cmd: &Cmd) -> Option<ExpectedReturnType> {

// TODO use enum to avoid mistakes
match command.as_slice() {
b"HGETALL" | b"XREAD" | b"CONFIG GET" | b"FT.CONFIG GET" | b"HELLO" => {
b"HGETALL" | b"XREAD" | b"CONFIG GET" | b"FT.CONFIG GET" | b"HELLO" | b"XRANGE" => {
Some(ExpectedReturnType::Map)
}
b"INCRBYFLOAT" | b"HINCRBYFLOAT" | b"ZINCRBY" => Some(ExpectedReturnType::Double),
Expand Down
1 change: 1 addition & 0 deletions glide-core/src/protobuf/redis_request.proto
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ enum RequestType {
XLen = 159;
LSet = 165;
XDel = 166;
XRange = 167;
LMove = 168;
}

Expand Down
3 changes: 3 additions & 0 deletions glide-core/src/request_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ pub enum RequestType {
XLen = 159,
LSet = 165,
XDel = 166,
XRange = 167,
LMove = 168,
}

Expand Down Expand Up @@ -338,6 +339,7 @@ impl From<::protobuf::EnumOrUnknown<ProtobufRequestType>> for RequestType {
ProtobufRequestType::XLen => RequestType::XLen,
ProtobufRequestType::LSet => RequestType::LSet,
ProtobufRequestType::XDel => RequestType::XDel,
ProtobufRequestType::XRange => RequestType::XRange,
ProtobufRequestType::LMove => RequestType::LMove,
}
}
Expand Down Expand Up @@ -505,6 +507,7 @@ impl RequestType {
RequestType::XLen => Some(cmd("XLEN")),
RequestType::LSet => Some(cmd("LSET")),
RequestType::XDel => Some(cmd("XDEL")),
RequestType::XRange => Some(cmd("XRANGE")),
RequestType::LMove => Some(cmd("LMOVE")),
}
}
Expand Down
18 changes: 18 additions & 0 deletions java/client/src/main/java/glide/api/BaseClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@
import static redis_request.RedisRequestOuterClass.RequestType.XAdd;
import static redis_request.RedisRequestOuterClass.RequestType.XDel;
import static redis_request.RedisRequestOuterClass.RequestType.XLen;
import static redis_request.RedisRequestOuterClass.RequestType.XRange;
import static redis_request.RedisRequestOuterClass.RequestType.XTrim;
import static redis_request.RedisRequestOuterClass.RequestType.ZAdd;
import static redis_request.RedisRequestOuterClass.RequestType.ZCard;
Expand Down Expand Up @@ -163,6 +164,7 @@
import glide.api.models.commands.geospatial.GeoUnit;
import glide.api.models.commands.geospatial.GeospatialData;
import glide.api.models.commands.stream.StreamAddOptions;
import glide.api.models.commands.stream.StreamRange;
import glide.api.models.commands.stream.StreamTrimOptions;
import glide.api.models.configuration.BaseClientConfiguration;
import glide.api.models.exceptions.RedisException;
Expand Down Expand Up @@ -1244,6 +1246,22 @@ public CompletableFuture<Long> xdel(@NonNull String key, @NonNull String[] ids)
return commandManager.submitNewCommand(XDel, arguments, this::handleLongResponse);
}

@Override
public CompletableFuture<Map<String, String[]>> xrange(
@NonNull String key, @NonNull StreamRange start, @NonNull StreamRange end) {
String[] arguments = ArrayUtils.addFirst(StreamRange.toArgs(start, end), key);
return commandManager.submitNewCommand(
XRange, arguments, response -> castMapOfArrays(handleMapResponse(response), String.class));
}

@Override
public CompletableFuture<Map<String, String[]>> xrange(
@NonNull String key, @NonNull StreamRange start, @NonNull StreamRange end, long count) {
String[] arguments = ArrayUtils.addFirst(StreamRange.toArgs(start, end, count), key);
return commandManager.submitNewCommand(
XRange, arguments, response -> castMapOfArrays(handleMapResponse(response), String.class));
}

@Override
public CompletableFuture<Long> pttl(@NonNull String key) {
return commandManager.submitNewCommand(PTTL, new String[] {key}, this::handleLongResponse);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@

import glide.api.models.commands.stream.StreamAddOptions;
import glide.api.models.commands.stream.StreamAddOptions.StreamAddOptionsBuilder;
import glide.api.models.commands.stream.StreamRange;
import glide.api.models.commands.stream.StreamRange.IdBound;
import glide.api.models.commands.stream.StreamRange.InfRangeBound;
import glide.api.models.commands.stream.StreamTrimOptions;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -105,4 +108,61 @@ public interface StreamBaseCommands {
* }</pre>
*/
CompletableFuture<Long> xdel(String key, String[] ids);

/**
* Returns stream entries matching a given range of IDs.
*
* @param key The key of the stream.
* @param start Starting stream ID bound for range, use {@link IdBound#of} to specify a stream ID,
acarbonetto marked this conversation as resolved.
Show resolved Hide resolved
* or {@link IdBound#ofExclusive} to specify an exclusive bounded stream ID. Use {@link
* InfRangeBound#MIN} to start with the minimum available ID.
* @param end Ending stream ID bound for range, use {@link IdBound#of} to specify a stream ID, or
* {@link IdBound#ofExclusive} to specify an exclusive bounded stream ID. Use {@link
* InfRangeBound#MAX>} to end with the maximum available ID.
* @return A <code>Map</code> of key to stream entry data, where entry data is an array with pairs
* of item, data.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* @return A <code>Map</code> of key to stream entry data, where entry data is an array with pairs
* of item, data.
* @return A <code>Map</code> of entry ID to entry data.

We don't have pairs there - I mean we don't return data as pairs. I'd like to say that the data is usually flat pairs, but I don't know how.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could say "pairings" to distinguish from the Pair object.

* @example
* <pre>{@code
* // Retrieve all stream entries
* Map<String, String[]> result = client.xrange("key", InfRangeBound.MIN, InfRangeBound.MAX).get();
* result.forEach((k, v) -> {
* System.out.println("Stream ID: " + k);
* for (int i = 0; i < v.length;) {
* System.out.println(v[i++] + ": " + v[i++]);
* }
* });
* // Retrieve exactly one stream entry by id
* Map<String, String[]> result = client.xrange("key", IdBound.of(streamId), IdBound.of(streamId)).get();
* System.out.println("Stream ID: " + streamid + " -> " + Arrays.toString(result.get(streamid)));
* }</pre>
*/
CompletableFuture<Map<String, String[]>> xrange(String key, StreamRange start, StreamRange end);

/**
* Returns stream entries matching a given range of IDs.
*
* @param key The key of the stream.
* @param start Starting stream ID bound for range, use {@link IdBound#of} to specify a stream ID,
* or {@link IdBound#ofExclusive} to specify an exclusive bounded stream ID. Use {@link
* InfRangeBound#MIN} to start with the minimum available ID.
* @param end Ending stream ID bound for range, use {@link IdBound#of} to specify a stream ID, or
* {@link IdBound#ofExclusive} to specify an exclusive bounded stream ID. Use {@link
* InfRangeBound#MAX>} to end with the maximum available ID.
* @param count Maximum count of stream entries to return.
* @return A <code>Map</code> of key to stream entry data, where entry data is an array with pairs
* of item, data.
* @example
* <pre>{@code
* // Retrieve the first 2 stream entries
* Map<String, String[]> result = client.xrange("key", InfRangeBound.MIN, InfRangeBound.MAX, 2).get();
* result.forEach((k, v) -> {
* System.out.println("Stream ID: " + k);
* for (int i = 0; i < v.length;) {
* System.out.println(v[i++] + ": " + v[i++]);
* }
* });
* }</pre>
*/
CompletableFuture<Map<String, String[]>> xrange(
String key, StreamRange start, StreamRange end, long count);
}
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@
import static redis_request.RedisRequestOuterClass.RequestType.XAdd;
import static redis_request.RedisRequestOuterClass.RequestType.XDel;
import static redis_request.RedisRequestOuterClass.RequestType.XLen;
import static redis_request.RedisRequestOuterClass.RequestType.XRange;
import static redis_request.RedisRequestOuterClass.RequestType.XTrim;
import static redis_request.RedisRequestOuterClass.RequestType.ZAdd;
import static redis_request.RedisRequestOuterClass.RequestType.ZCard;
Expand Down Expand Up @@ -187,6 +188,7 @@
import glide.api.models.commands.geospatial.GeospatialData;
import glide.api.models.commands.stream.StreamAddOptions;
import glide.api.models.commands.stream.StreamAddOptions.StreamAddOptionsBuilder;
import glide.api.models.commands.stream.StreamRange;
import glide.api.models.commands.stream.StreamTrimOptions;
import java.util.Arrays;
import java.util.Map;
Expand Down Expand Up @@ -2659,12 +2661,55 @@ public T xlen(@NonNull String key) {
* less than the number of entries in <code>ids</code>, if the specified <code>ids</code>
* don't exist in the stream.
*/
public T xdel(String key, String[] ids) {
public T xdel(@NonNull String key, @NonNull String[] ids) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably small enough to sneak in but was this intentional? changes on another command?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. This was an intentionally sneaky move...

ArgsArray commandArgs = buildArgs(ArrayUtils.addFirst(ids, key));
protobufTransaction.addCommands(buildCommand(XDel, commandArgs));
return getThis();
}

/**
* Returns stream entries matching a given range of IDs.
*
* @param key The key of the stream.
* @param start Starting stream ID bound for range, use {@link StreamRange.IdBound#of} to specify
* a stream ID, or {@link StreamRange.IdBound#ofExclusive} to specify an exclusive bounded
* stream ID. Use {@link StreamRange.InfRangeBound#MIN} to start with the minimum available
* ID.
* @param end Ending stream ID bound for range, use {@link StreamRange.IdBound#of} to specify a
* stream ID, or {@link StreamRange.IdBound#ofExclusive} to specify an exclusive bounded
* stream ID. Use {@link StreamRange.InfRangeBound#MAX>} to end with the maximum available ID.
* @return Command Response - A <code>Map</code> of key to stream entry data, where entry data is
* an array with pairs of item, data.
*/
public T xrange(@NonNull String key, @NonNull StreamRange start, @NonNull StreamRange end) {
ArgsArray commandArgs = buildArgs(ArrayUtils.addFirst(StreamRange.toArgs(start, end), key));
protobufTransaction.addCommands(buildCommand(XRange, commandArgs));
return getThis();
}

/**
* Returns stream entries matching a given range of IDs.
*
* @param key The key of the stream.
* @param start Starting stream ID bound for range, use {@link StreamRange.IdBound#of} to specify
* a stream ID, or {@link StreamRange.IdBound#ofExclusive} to specify an exclusive bounded
* stream ID. Use {@link StreamRange.InfRangeBound#MIN} to start with the minimum available
* ID.
* @param end Ending stream ID bound for range, use {@link StreamRange.IdBound#of} to specify a
* stream ID, or {@link StreamRange.IdBound#ofExclusive} to specify an exclusive bounded
* stream ID. Use {@link StreamRange.InfRangeBound#MAX>} to end with the maximum available ID.
* * @param count Maximum count of stream entries to return.
* @return Command Response - A <code>Map</code> of key to stream entry data, where entry data is
* an array with pairs of item, data.
acarbonetto marked this conversation as resolved.
Show resolved Hide resolved
*/
public T xrange(
@NonNull String key, @NonNull StreamRange start, @NonNull StreamRange end, long count) {
ArgsArray commandArgs =
buildArgs(ArrayUtils.addFirst(StreamRange.toArgs(start, end, count), key));
protobufTransaction.addCommands(buildCommand(XRange, commandArgs));
return getThis();
}

/**
* Returns the remaining time to live of <code>key</code> that has a timeout, in milliseconds.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/** Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0 */
package glide.api.models.commands.stream;

import glide.utils.ArrayTransformUtils;
import lombok.Getter;
import lombok.RequiredArgsConstructor;

/**
* Arguments for {@link glide.api.commands.StreamBaseCommands#xrange} and {@link
* glide.api.commands.StreamBaseCommands#xrevrange} to specify the starting and ending range for the
* stream search by stream ID.
*
* @see <a href="https://redis.io/commands/xrange/">redis.io</a>
* @see <a href="https://redis.io/commands/xrevrange/">redis.io</a>
*/
public interface StreamRange {

String getRedisApi();

String MINIMUM_RANGE_REDIS_API = "-";
String MAXIMUM_RANGE_REDIS_API = "+";
String RANGE_COUNT_REDIS_API = "COUNT";

/**
* Enumeration representing minimum or maximum stream entry bounds for the range search, to get
* the first or last stream ID.
*/
@RequiredArgsConstructor
@Getter
enum InfRangeBound implements StreamRange {
MIN(MINIMUM_RANGE_REDIS_API),
MAX(MAXIMUM_RANGE_REDIS_API);

private final String redisApi;
};

/**
* Stream ID used to specify a range of IDs to search. Stream ID bounds can be complete with a
* timestamp and sequence number separated by a dash (<code>"-"</code>), for example <code>
* "1526985054069-0"</code>.<br>
* Stream ID bounds can also be incomplete, with just a timestamp.<br>
* Stream ID bounds are inclusive by default. When <code>isInclusive==false</code>, a <code>"("
* </code> is prepended for the Redis API.
*/
@Getter
class IdBound implements StreamRange {
private final String redisApi;

/**
* Default constructor
*
* @param id The stream id.
*/
private IdBound(String id) {
redisApi = id;
}

/**
* Creates a stream ID boundary by stream id for range search.
*
* @param id The stream id.
*/
public static IdBound of(String id) {
return new IdBound(id);
}

/**
* Creates an incomplete stream ID boundary without the sequence number for range search.
*
* @param timestamp The stream timestamp as ID.
*/
public static IdBound of(long timestamp) {
return new IdBound(Long.toString(timestamp));
}

/**
* Creates an incomplete stream ID exclusive boundary without the sequence number for range
* search.
*
* @param timestamp The stream timestamp as ID.
*/
public static IdBound ofExclusive(long timestamp) {
return new IdBound("(" + timestamp);
}

/**
* Creates a stream ID exclusive boundary by stream id for range search.
*
* @param id The stream id.
*/
public static IdBound ofExclusive(String id) {
return new IdBound("(" + id);
}
}

/**
* Convert StreamRange arguments to a string array
*
* @return arguments converted to an array to be consumed by Redis
*/
static String[] toArgs(StreamRange start, StreamRange end) {
return new String[] {start.getRedisApi(), end.getRedisApi()};
}

/**
* Convert StreamRange arguments to a string array
*
* @return arguments converted to an array to be consumed by Redis
*/
static String[] toArgs(StreamRange start, StreamRange end, long count) {
return ArrayTransformUtils.concatenateArrays(
toArgs(start, end), new String[] {RANGE_COUNT_REDIS_API, Long.toString(count)});
}
}
Loading
Loading