Skip to content

Commit

Permalink
Java: Add XTRIM command for streams (valkey-io#1335)
Browse files Browse the repository at this point in the history
* Java: Add `XTRIM` command for streams (#215)

* Java: Add XTRIM command

Signed-off-by: Andrew Carbonetto <[email protected]>

* Update imports; add exception test

Signed-off-by: Andrew Carbonetto <[email protected]>

* Add IT test for non-extent key

Signed-off-by: Andrew Carbonetto <[email protected]>

* Update documentation examples

Signed-off-by: Andrew Carbonetto <[email protected]>

* XTRIM: make exact optional

Signed-off-by: Andrew Carbonetto <[email protected]>

* Consolidate stream options into one class

Signed-off-by: Andrew Carbonetto <[email protected]>

* Remove StreamAddOptions and StreamTrimOptions

Signed-off-by: Andrew Carbonetto <[email protected]>

* Update docs for StreamOptions

Signed-off-by: Andrew Carbonetto <[email protected]>

* Pull StreamOption tests into separate file

Signed-off-by: Andrew Carbonetto <[email protected]>

* Remove extra tests

Signed-off-by: Andrew Carbonetto <[email protected]>

* SPOTLESS

Signed-off-by: Andrew Carbonetto <[email protected]>

* Update tests

Signed-off-by: Andrew Carbonetto <[email protected]>

* Move some tests around

Signed-off-by: Andrew Carbonetto <[email protected]>

* Delete test file

Signed-off-by: Andrew Carbonetto <[email protected]>

---------

Signed-off-by: Andrew Carbonetto <[email protected]>

* Clean up example comment

Signed-off-by: Andrew Carbonetto <[email protected]>

---------

Signed-off-by: Andrew Carbonetto <[email protected]>
  • Loading branch information
acarbonetto authored Apr 25, 2024
1 parent e30f012 commit 2766989
Show file tree
Hide file tree
Showing 8 changed files with 313 additions and 157 deletions.
10 changes: 9 additions & 1 deletion java/client/src/main/java/glide/api/BaseClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
import static redis_request.RedisRequestOuterClass.RequestType.Type;
import static redis_request.RedisRequestOuterClass.RequestType.Unlink;
import static redis_request.RedisRequestOuterClass.RequestType.XAdd;
import static redis_request.RedisRequestOuterClass.RequestType.XTrim;
import static redis_request.RedisRequestOuterClass.RequestType.ZDiff;
import static redis_request.RedisRequestOuterClass.RequestType.ZDiffStore;
import static redis_request.RedisRequestOuterClass.RequestType.ZLexCount;
Expand Down Expand Up @@ -111,7 +112,8 @@
import glide.api.models.commands.RangeOptions.ScoredRangeQuery;
import glide.api.models.commands.ScriptOptions;
import glide.api.models.commands.SetOptions;
import glide.api.models.commands.StreamAddOptions;
import glide.api.models.commands.StreamOptions.StreamAddOptions;
import glide.api.models.commands.StreamOptions.StreamTrimOptions;
import glide.api.models.commands.ZaddOptions;
import glide.api.models.configuration.BaseClientConfiguration;
import glide.api.models.exceptions.RedisException;
Expand Down Expand Up @@ -927,6 +929,12 @@ public CompletableFuture<String> xadd(
return commandManager.submitNewCommand(XAdd, arguments, this::handleStringOrNullResponse);
}

@Override
public CompletableFuture<Long> xtrim(@NonNull String key, @NonNull StreamTrimOptions options) {
String[] arguments = ArrayUtils.addFirst(options.toArgs(), key);
return commandManager.submitNewCommand(XTrim, arguments, this::handleLongResponse);
}

@Override
public CompletableFuture<Long> pttl(@NonNull String key) {
return commandManager.submitNewCommand(PTTL, new String[] {key}, this::handleLongResponse);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
/** Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0 */
package glide.api.commands;

import glide.api.models.commands.StreamAddOptions;
import glide.api.models.commands.StreamAddOptions.StreamAddOptionsBuilder;
import glide.api.models.commands.StreamOptions.StreamAddOptions;
import glide.api.models.commands.StreamOptions.StreamAddOptions.StreamAddOptionsBuilder;
import glide.api.models.commands.StreamOptions.StreamTrimOptions;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

Expand Down Expand Up @@ -52,4 +53,24 @@ public interface StreamBaseCommands {
* }</pre>
*/
CompletableFuture<String> xadd(String key, Map<String, String> values, StreamAddOptions options);

/**
* Trims the stream by evicting older entries.
*
* @see <a href="https://redis.io/commands/xtrim/">redis.io</a> for details.
* @param key The key of the stream.
* @param options Stream trim options.
* @return The number of entries deleted from the stream.
* @example
* <pre>{@code
* // A nearly exact trimming of the stream to at least a length of 10
* Long trimmed = client.xtrim("key", new MaxLen(false, 10L)).get();
* System.out.println("Number of trimmed entries from stream: " + trimmed);
*
* // An exact trimming of the stream by minimum id of "0-3", limit of 10 entries
* Long trimmed = client.xtrim("key", new MinId(true, "0-3", 10L)).get();
* System.out.println("Number of trimmed entries from stream: " + trimmed);
* }</pre>
*/
CompletableFuture<Long> xtrim(String key, StreamTrimOptions options);
}
20 changes: 18 additions & 2 deletions java/client/src/main/java/glide/api/models/BaseTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@
import static redis_request.RedisRequestOuterClass.RequestType.Type;
import static redis_request.RedisRequestOuterClass.RequestType.Unlink;
import static redis_request.RedisRequestOuterClass.RequestType.XAdd;
import static redis_request.RedisRequestOuterClass.RequestType.XTrim;
import static redis_request.RedisRequestOuterClass.RequestType.ZDiff;
import static redis_request.RedisRequestOuterClass.RequestType.ZDiffStore;
import static redis_request.RedisRequestOuterClass.RequestType.ZLexCount;
Expand Down Expand Up @@ -127,8 +128,9 @@
import glide.api.models.commands.SetOptions;
import glide.api.models.commands.SetOptions.ConditionalSet;
import glide.api.models.commands.SetOptions.SetOptionsBuilder;
import glide.api.models.commands.StreamAddOptions;
import glide.api.models.commands.StreamAddOptions.StreamAddOptionsBuilder;
import glide.api.models.commands.StreamOptions.StreamAddOptions;
import glide.api.models.commands.StreamOptions.StreamAddOptions.StreamAddOptionsBuilder;
import glide.api.models.commands.StreamOptions.StreamTrimOptions;
import glide.api.models.commands.ZaddOptions;
import java.util.Arrays;
import java.util.Map;
Expand Down Expand Up @@ -1952,6 +1954,20 @@ public T xadd(
return getThis();
}

/**
* Trims the stream by evicting older entries.
*
* @see <a href="https://redis.io/commands/xtrim/">redis.io</a> for details.
* @param key The key of the stream.
* @param options Stream trim options.
* @return Command Response - The number of entries deleted from the stream.
*/
public T xtrim(@NonNull String key, @NonNull StreamTrimOptions options) {
ArgsArray commandArgs = buildArgs(ArrayUtils.addFirst(options.toArgs(), key));
protobufTransaction.addCommands(buildCommand(XTrim, commandArgs));
return getThis();
}

/**
* Returns the remaining time to live of <code>key</code> that has a timeout, in milliseconds.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,42 +8,77 @@
import lombok.Builder;
import lombok.NonNull;

/**
* Optional arguments to {@link StreamBaseCommands#xadd(String, Map, StreamAddOptions)}
*
* @see <a href="https://redis.io/commands/xadd/">redis.io</a>
*/
@Builder
public final class StreamAddOptions {

public static final String NO_MAKE_STREAM_REDIS_API = "NOMKSTREAM";
public static final String ID_WILDCARD_REDIS_API = "*";
public static final String TRIM_MAXLEN_REDIS_API = "MAXLEN";
public static final String TRIM_MINID_REDIS_API = "MINID";
public static final String TRIM_EXACT_REDIS_API = "=";
public static final String TRIM_NOT_EXACT_REDIS_API = "~";
public static final String TRIM_LIMIT_REDIS_API = "LIMIT";

/** If set, the new entry will be added with this <code>id</code>. */
private final String id;
/** Optional arguments for {@link StreamBaseCommands} */
public final class StreamOptions {

/**
* If set to <code>false</code>, a new stream won't be created if no stream matches the given key.
* <br>
* Equivalent to <code>NOMKSTREAM</code> in the Redis API.
* Optional arguments to {@link StreamBaseCommands#xadd(String, Map, StreamAddOptions)}
*
* @see <a href="https://redis.io/commands/xadd/">redis.io</a>
*/
private final Boolean makeStream;
@Builder
public static final class StreamAddOptions {
public static final String NO_MAKE_STREAM_REDIS_API = "NOMKSTREAM";
public static final String ID_WILDCARD_REDIS_API = "*";

/** If set, the new entry will be added with this <code>id</code>. */
private final String id;

/**
* If set to <code>false</code>, a new stream won't be created if no stream matches the given
* key. <br>
* Equivalent to <code>NOMKSTREAM</code> in the Redis API.
*/
private final Boolean makeStream;

/** If set, the add operation will also trim the older entries in the stream. */
private final StreamTrimOptions trim;

/**
* Converts options for Xadd into a String[].
*
* @return String[]
*/
public String[] toArgs() {
List<String> optionArgs = new ArrayList<>();

if (makeStream != null && !makeStream) {
optionArgs.add(NO_MAKE_STREAM_REDIS_API);
}

if (trim != null) {
optionArgs.addAll(trim.getRedisApi());
}

if (id != null) {
optionArgs.add(id);
} else {
optionArgs.add(ID_WILDCARD_REDIS_API);
}

/** If set, the add operation will also trim the older entries in the stream. */
private final StreamTrimOptions trim;
return optionArgs.toArray(new String[0]);
}
}

/**
* Optional arguments for {@link StreamBaseCommands#xtrim(String, StreamTrimOptions)}
*
* @see <a href="https://redis.io/commands/xtrim/">redis.io</a>
*/
public abstract static class StreamTrimOptions {

public static final String TRIM_MAXLEN_REDIS_API = "MAXLEN";
public static final String TRIM_MINID_REDIS_API = "MINID";
public static final String TRIM_EXACT_REDIS_API = "=";
public static final String TRIM_NOT_EXACT_REDIS_API = "~";
public static final String TRIM_LIMIT_REDIS_API = "LIMIT";

/**
* If <code>true</code>, the stream will be trimmed exactly. Equivalent to <code>=</code> in the
* Redis API. Otherwise, the stream will be trimmed in a near-exact manner, which is more
* efficient, equivalent to <code>~</code> in the Redis API.
*/
protected boolean exact;
protected Boolean exact;

/** If set, sets the maximal amount of entries that will be deleted. */
protected Long limit;
Expand All @@ -56,23 +91,49 @@ protected List<String> getRedisApi() {
List<String> optionArgs = new ArrayList<>();

optionArgs.add(this.getMethod());
optionArgs.add(this.exact ? TRIM_EXACT_REDIS_API : TRIM_NOT_EXACT_REDIS_API);
if (this.exact != null) {
optionArgs.add(this.exact ? TRIM_EXACT_REDIS_API : TRIM_NOT_EXACT_REDIS_API);
}
optionArgs.add(this.getThreshold());

if (this.limit != null) {
optionArgs.add(TRIM_LIMIT_REDIS_API);
optionArgs.add(this.limit.toString());
}

return optionArgs;
}

/**
* Converts options for {@link StreamBaseCommands#xtrim(String, StreamTrimOptions)} into a
* String[].
*
* @return String[]
*/
public String[] toArgs() {
List<String> optionArgs = new ArrayList<>();

if (this.getRedisApi() != null) {
optionArgs.addAll(this.getRedisApi());
}

return optionArgs.toArray(new String[0]);
}
}

/** Option to trim the stream according to minimum ID. */
public static class MinId extends StreamTrimOptions {
/** Trim the stream according to entry ID. Equivalent to <code>MINID</code> in the Redis API. */
private final String threshold;

/**
* Create a trim option to trim stream based on stream ID.
*
* @param threshold Comparison id.
*/
public MinId(@NonNull String threshold) {
this.threshold = threshold;
}

/**
* Create a trim option to trim stream based on stream ID.
*
Expand All @@ -87,13 +148,12 @@ public MinId(boolean exact, @NonNull String threshold) {
/**
* Create a trim option to trim stream based on stream ID.
*
* @param exact Whether to match exactly on the threshold.
* @param threshold Comparison id.
* @param limit Max number of stream entries to be trimmed.
* @param limit Max number of stream entries to be trimmed for non-exact match.
*/
public MinId(boolean exact, @NonNull String threshold, long limit) {
public MinId(@NonNull String threshold, long limit) {
this.exact = false;
this.threshold = threshold;
this.exact = exact;
this.limit = limit;
}

Expand All @@ -116,6 +176,15 @@ public static class MaxLen extends StreamTrimOptions {
*/
private final Long threshold;

/**
* Create a Max Length trim option to trim stream based on length.
*
* @param threshold Comparison count.
*/
public MaxLen(long threshold) {
this.threshold = threshold;
}

/**
* Create a Max Length trim option to trim stream based on length.
*
Expand All @@ -130,13 +199,12 @@ public MaxLen(boolean exact, long threshold) {
/**
* Create a Max Length trim option to trim stream entries exceeds the threshold.
*
* @param exact Whether to match exactly on the threshold.
* @param threshold Comparison count.
* @param limit Max number of stream entries to be trimmed.
* @param limit Max number of stream entries to be trimmed for non-exact match.
*/
public MaxLen(boolean exact, long threshold, long limit) {
public MaxLen(long threshold, long limit) {
this.exact = false;
this.threshold = threshold;
this.exact = exact;
this.limit = limit;
}

Expand All @@ -150,29 +218,4 @@ protected String getThreshold() {
return threshold.toString();
}
}

/**
* Converts options for Xadd into a String[].
*
* @return String[]
*/
public String[] toArgs() {
List<String> optionArgs = new ArrayList<>();

if (makeStream != null && !makeStream) {
optionArgs.add(NO_MAKE_STREAM_REDIS_API);
}

if (trim != null) {
optionArgs.addAll(trim.getRedisApi());
}

if (id != null) {
optionArgs.add(id);
} else {
optionArgs.add(ID_WILDCARD_REDIS_API);
}

return optionArgs.toArray(new String[0]);
}
}
Loading

0 comments on commit 2766989

Please sign in to comment.