diff --git a/CHANGELOG.md b/CHANGELOG.md index 7fb8977d45..12854040b7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,5 @@ #### Changes +* Java: Added PUBSUB CHANNELS, NUMPAT and NUMSUB commands ([#2105](https://github.com/valkey-io/valkey-glide/pull/2105)) * Java: Added binary support for custom command ([#2109](https://github.com/valkey-io/valkey-glide/pull/2109)) * Node: Added SSCAN command ([#2132](https://github.com/valkey-io/valkey-glide/pull/2132)) * Node: Added FUNCTION KILL command ([#2114](https://github.com/valkey-io/valkey-glide/pull/2114)) diff --git a/java/client/src/main/java/glide/api/BaseClient.java b/java/client/src/main/java/glide/api/BaseClient.java index 9c6297f0ad..91182e797e 100644 --- a/java/client/src/main/java/glide/api/BaseClient.java +++ b/java/client/src/main/java/glide/api/BaseClient.java @@ -83,6 +83,9 @@ import static command_request.CommandRequestOuterClass.RequestType.PfAdd; import static command_request.CommandRequestOuterClass.RequestType.PfCount; import static command_request.CommandRequestOuterClass.RequestType.PfMerge; +import static command_request.CommandRequestOuterClass.RequestType.PubSubChannels; +import static command_request.CommandRequestOuterClass.RequestType.PubSubNumPat; +import static command_request.CommandRequestOuterClass.RequestType.PubSubNumSub; import static command_request.CommandRequestOuterClass.RequestType.Publish; import static command_request.CommandRequestOuterClass.RequestType.RPop; import static command_request.CommandRequestOuterClass.RequestType.RPush; @@ -4495,6 +4498,54 @@ public CompletableFuture publish( }); } + @Override + public CompletableFuture pubsubChannels() { + return commandManager.submitNewCommand( + PubSubChannels, + new String[0], + response -> castArray(handleArrayResponse(response), String.class)); + } + + @Override + public CompletableFuture pubsubChannelsBinary() { + return commandManager.submitNewCommand( + PubSubChannels, + new GlideString[0], + response -> castArray(handleArrayResponseBinary(response), GlideString.class)); + } + + @Override + public CompletableFuture pubsubChannels(@NonNull String pattern) { + return commandManager.submitNewCommand( + PubSubChannels, + new String[] {pattern}, + response -> castArray(handleArrayResponse(response), String.class)); + } + + @Override + public CompletableFuture pubsubChannels(@NonNull GlideString pattern) { + return commandManager.submitNewCommand( + PubSubChannels, + new GlideString[] {pattern}, + response -> castArray(handleArrayResponseBinary(response), GlideString.class)); + } + + @Override + public CompletableFuture pubsubNumPat() { + return commandManager.submitNewCommand(PubSubNumPat, new String[0], this::handleLongResponse); + } + + @Override + public CompletableFuture> pubsubNumSub(@NonNull String[] channels) { + return commandManager.submitNewCommand(PubSubNumSub, channels, this::handleMapResponse); + } + + @Override + public CompletableFuture> pubsubNumSub(@NonNull GlideString[] channels) { + return commandManager.submitNewCommand( + PubSubNumSub, channels, this::handleBinaryStringMapResponse); + } + @Override public CompletableFuture watch(@NonNull String[] keys) { return commandManager.submitNewCommand(Watch, keys, this::handleStringResponse); diff --git a/java/client/src/main/java/glide/api/commands/PubSubBaseCommands.java b/java/client/src/main/java/glide/api/commands/PubSubBaseCommands.java index 6906d98e06..d83038b3b3 100644 --- a/java/client/src/main/java/glide/api/commands/PubSubBaseCommands.java +++ b/java/client/src/main/java/glide/api/commands/PubSubBaseCommands.java @@ -2,6 +2,7 @@ package glide.api.commands; import glide.api.models.GlideString; +import java.util.Map; import java.util.concurrent.CompletableFuture; /** @@ -40,4 +41,124 @@ public interface PubSubBaseCommands { * } */ CompletableFuture publish(GlideString message, GlideString channel); + + /** + * Lists the currently active channels. + * + * @apiNote When in cluster mode, the command is routed to all nodes, and aggregates the response + * into a single array. + * @see valkey.io for details. + * @return An Array of all active channels. + * @example + *
{@code
+     * String[] response = client.pubsubChannels().get();
+     * assert Arrays.equals(new String[] { "channel1", "channel2" });
+     * }
+ */ + CompletableFuture pubsubChannels(); + + /** + * Lists the currently active channels.
+ * Unlike of {@link #pubsubChannels()}, returns channel names as {@link GlideString}s. + * + * @apiNote When in cluster mode, the command is routed to all nodes, and aggregates the response + * into a single array. + * @see valkey.io for details. + * @return An Array of all active channels. + * @example + *
{@code
+     * GlideString[] response = client.pubsubChannels().get();
+     * assert Arrays.equals(new GlideString[] { "channel1", "channel2" });
+     * }
+ */ + CompletableFuture pubsubChannelsBinary(); + + /** + * Lists the currently active channels. + * + * @apiNote When in cluster mode, the command is routed to all nodes, and aggregates the response + * into a single array. + * @see valkey.io for details. + * @param pattern A glob-style pattern to match active channels. + * @return An Array of currently active channels matching the given pattern. + * @example + *
{@code
+     * String[] response = client.pubsubChannels("news.*").get();
+     * assert Arrays.equals(new String[] { "news.sports", "news.weather" });
+     * }
+ */ + CompletableFuture pubsubChannels(String pattern); + + /** + * Lists the currently active channels. + * + * @apiNote When in cluster mode, the command is routed to all nodes, and aggregates the response + * into a single array. + * @see valkey.io for details. + * @param pattern A glob-style pattern to match active channels. + * @return An Array of currently active channels matching the given pattern. + * @example + *
{@code
+     * GlideString[] response = client.pubsubChannels(gs("news.*")).get();
+     * assert Arrays.equals(new GlideString[] { gs("news.sports"), gs("news.weather") });
+     * }
+ */ + CompletableFuture pubsubChannels(GlideString pattern); + + /** + * Returns the number of unique patterns that are subscribed to by clients. + * + * @apiNote + *
    + *
  • When in cluster mode, the command is routed to all nodes, and aggregates the response + * into a single array. + *
  • This is the total number of unique patterns all the clients are subscribed to, not + * the count of clients subscribed to patterns. + *
+ * + * @see valkey.io for details. + * @return The number of unique patterns. + * @example + *
{@code
+     * Long result = client.pubsubNumPat().get();
+     * assert result == 3L;
+     * }
+ */ + CompletableFuture pubsubNumPat(); + + /** + * Returns the number of subscribers (exclusive of clients subscribed to patterns) for the + * specified channels. + * + * @apiNote When in cluster mode, the command is routed to all nodes, and aggregates the response + * into a single map. + * @see valkey.io for details. + * @param channels The list of channels to query for the number of subscribers. + * @return A Map where keys are the channel names and values are the numbers of + * subscribers. + * @example + *
{@code
+     * Map result = client.pubsubNumSub(new String[] {"channel1", "channel2"}).get();
+     * assert result.equals(Map.of("channel1", 3L, "channel2", 5L));
+     * }
+ */ + CompletableFuture> pubsubNumSub(String[] channels); + + /** + * Returns the number of subscribers (exclusive of clients subscribed to patterns) for the + * specified channels. + * + * @apiNote When in cluster mode, the command is routed to all nodes, and aggregates the response + * into a single map. + * @see valkey.io for details. + * @param channels The list of channels to query for the number of subscribers. + * @return A Map where keys are the channel names and values are the numbers of + * subscribers. + * @example + *
{@code
+     * Map result = client.pubsubNumSub(new GlideString[] {gs("channel1"), gs("channel2")}).get();
+     * assert result.equals(Map.of(gs("channel1"), 3L, gs("channel2"), 5L));
+     * }
+ */ + CompletableFuture> pubsubNumSub(GlideString[] channels); } diff --git a/java/client/src/main/java/glide/api/models/BaseTransaction.java b/java/client/src/main/java/glide/api/models/BaseTransaction.java index ccf9b956c4..cf2b77401e 100644 --- a/java/client/src/main/java/glide/api/models/BaseTransaction.java +++ b/java/client/src/main/java/glide/api/models/BaseTransaction.java @@ -105,6 +105,9 @@ import static command_request.CommandRequestOuterClass.RequestType.PfCount; import static command_request.CommandRequestOuterClass.RequestType.PfMerge; import static command_request.CommandRequestOuterClass.RequestType.Ping; +import static command_request.CommandRequestOuterClass.RequestType.PubSubChannels; +import static command_request.CommandRequestOuterClass.RequestType.PubSubNumPat; +import static command_request.CommandRequestOuterClass.RequestType.PubSubNumSub; import static command_request.CommandRequestOuterClass.RequestType.Publish; import static command_request.CommandRequestOuterClass.RequestType.RPop; import static command_request.CommandRequestOuterClass.RequestType.RPush; @@ -6295,6 +6298,75 @@ public T publish(@NonNull ArgType message, @NonNull ArgType channel) { return getThis(); } + /** + * Lists the currently active channels. + * + * @apiNote When in cluster mode, the command is routed to all nodes, and aggregates the response + * into a single array. + * @see valkey.io for details. + * @return Command response - An Array of all active channels. + */ + public T pubsubChannels() { + protobufTransaction.addCommands(buildCommand(PubSubChannels)); + return getThis(); + } + + /** + * Lists the currently active channels. + * + * @implNote {@link ArgType} is limited to {@link String} or {@link GlideString}, any other type + * will throw {@link IllegalArgumentException}. + * @apiNote When in cluster mode, the command is routed to all nodes, and aggregates the response + * into a single array. + * @see valkey.io for details. + * @param pattern A glob-style pattern to match active channels. + * @return Command response - An Array of currently active channels matching the + * given pattern. + */ + public T pubsubChannels(@NonNull ArgType pattern) { + checkTypeOrThrow(pattern); + protobufTransaction.addCommands(buildCommand(PubSubChannels, newArgsBuilder().add(pattern))); + return getThis(); + } + + /** + * Returns the number of unique patterns that are subscribed to by clients. + * + * @apiNote + *
    + *
  • When in cluster mode, the command is routed to all nodes, and aggregates the response + * into a single array. + *
  • This is the total number of unique patterns all the clients are subscribed to, not + * the count of clients subscribed to patterns. + *
+ * + * @see valkey.io for details. + * @return Command response - The number of unique patterns. + */ + public T pubsubNumPat() { + protobufTransaction.addCommands(buildCommand(PubSubNumPat)); + return getThis(); + } + + /** + * Returns the number of subscribers (exclusive of clients subscribed to patterns) for the + * specified channels. + * + * @implNote {@link ArgType} is limited to {@link String} or {@link GlideString}, any other type + * will throw {@link IllegalArgumentException}. + * @apiNote When in cluster mode, the command is routed to all nodes, and aggregates the response + * into a single map. + * @see valkey.io for details. + * @param channels The list of channels to query for the number of subscribers. + * @return Command response - A Map where keys are the channel names and values are + * the numbers of subscribers. + */ + public T pubsubNumSub(@NonNull ArgType[] channels) { + checkTypeOrThrow(channels); + protobufTransaction.addCommands(buildCommand(PubSubNumSub, newArgsBuilder().add(channels))); + return getThis(); + } + /** * Gets the union of all the given sets. * diff --git a/java/client/src/main/java/glide/api/models/configuration/ClusterSubscriptionConfiguration.java b/java/client/src/main/java/glide/api/models/configuration/ClusterSubscriptionConfiguration.java index a29ddd3d83..c45d6abb33 100644 --- a/java/client/src/main/java/glide/api/models/configuration/ClusterSubscriptionConfiguration.java +++ b/java/client/src/main/java/glide/api/models/configuration/ClusterSubscriptionConfiguration.java @@ -1,6 +1,8 @@ /** Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0 */ package glide.api.models.configuration; +import static glide.api.models.GlideString.gs; + import glide.api.GlideClusterClient; import glide.api.models.GlideString; import java.util.HashMap; @@ -91,6 +93,17 @@ public ClusterSubscriptionConfigurationBuilder subscription( return this; } + /** + * Add a subscription to a channel or to multiple channels if {@link + * PubSubClusterChannelMode#PATTERN} is used.
+ * See {@link ClusterSubscriptionConfiguration#subscriptions}. + */ + public ClusterSubscriptionConfigurationBuilder subscription( + PubSubClusterChannelMode mode, String channelOrPattern) { + addSubscription(subscriptions, mode, gs(channelOrPattern)); + return this; + } + /** * Set all subscriptions in a bulk. Rewrites previously stored configurations.
* See {@link ClusterSubscriptionConfiguration#subscriptions}. diff --git a/java/client/src/test/java/glide/api/GlideClientTest.java b/java/client/src/test/java/glide/api/GlideClientTest.java index 9c52ba733a..50e812a511 100644 --- a/java/client/src/test/java/glide/api/GlideClientTest.java +++ b/java/client/src/test/java/glide/api/GlideClientTest.java @@ -107,6 +107,9 @@ import static command_request.CommandRequestOuterClass.RequestType.PfCount; import static command_request.CommandRequestOuterClass.RequestType.PfMerge; import static command_request.CommandRequestOuterClass.RequestType.Ping; +import static command_request.CommandRequestOuterClass.RequestType.PubSubChannels; +import static command_request.CommandRequestOuterClass.RequestType.PubSubNumPat; +import static command_request.CommandRequestOuterClass.RequestType.PubSubNumSub; import static command_request.CommandRequestOuterClass.RequestType.Publish; import static command_request.CommandRequestOuterClass.RequestType.RPop; import static command_request.CommandRequestOuterClass.RequestType.RPush; @@ -13518,6 +13521,170 @@ public void publish_returns_success() { assertEquals(OK, payload); } + @SneakyThrows + @Test + public void pubsubChannels_returns_success() { + // setup + String[] arguments = new String[0]; + String[] value = new String[] {"ch1", "ch2"}; + + CompletableFuture testResponse = new CompletableFuture<>(); + testResponse.complete(value); + + // match on protobuf request + when(commandManager.submitNewCommand(eq(PubSubChannels), eq(arguments), any())) + .thenReturn(testResponse); + + // exercise + CompletableFuture response = service.pubsubChannels(); + String[] payload = response.get(); + + // verify + assertEquals(testResponse, response); + assertEquals(value, payload); + } + + @SneakyThrows + @Test + public void pubsubChannelsBinary_returns_success() { + // setup + GlideString[] arguments = new GlideString[0]; + GlideString[] value = new GlideString[] {gs("ch1"), gs("ch2")}; + + CompletableFuture testResponse = new CompletableFuture<>(); + testResponse.complete(value); + + // match on protobuf request + when(commandManager.submitNewCommand(eq(PubSubChannels), eq(arguments), any())) + .thenReturn(testResponse); + + // exercise + CompletableFuture response = service.pubsubChannelsBinary(); + GlideString[] payload = response.get(); + + // verify + assertEquals(testResponse, response); + assertEquals(value, payload); + } + + @SneakyThrows + @Test + public void pubsubChannels_with_pattern_returns_success() { + // setup + String pattern = "ch*"; + String[] arguments = new String[] {pattern}; + String[] value = new String[] {"ch1", "ch2"}; + + CompletableFuture testResponse = new CompletableFuture<>(); + testResponse.complete(value); + + // match on protobuf request + when(commandManager.submitNewCommand(eq(PubSubChannels), eq(arguments), any())) + .thenReturn(testResponse); + + // exercise + CompletableFuture response = service.pubsubChannels(pattern); + String[] payload = response.get(); + + // verify + assertEquals(testResponse, response); + assertEquals(value, payload); + } + + @SneakyThrows + @Test + public void pubsubChannelsBinary_with_pattern_returns_success() { + // setup + GlideString pattern = gs("ch*"); + GlideString[] arguments = new GlideString[] {pattern}; + GlideString[] value = new GlideString[] {gs("ch1"), gs("ch2")}; + + CompletableFuture testResponse = new CompletableFuture<>(); + testResponse.complete(value); + + // match on protobuf request + when(commandManager.submitNewCommand(eq(PubSubChannels), eq(arguments), any())) + .thenReturn(testResponse); + + // exercise + CompletableFuture response = service.pubsubChannels(pattern); + GlideString[] payload = response.get(); + + // verify + assertEquals(testResponse, response); + assertEquals(value, payload); + } + + @SneakyThrows + @Test + public void pubsubNumPat_returns_success() { + // setup + String[] arguments = new String[0]; + Long value = 42L; + + CompletableFuture testResponse = new CompletableFuture<>(); + testResponse.complete(value); + + // match on protobuf request + when(commandManager.submitNewCommand(eq(PubSubNumPat), eq(arguments), any())) + .thenReturn(testResponse); + + // exercise + CompletableFuture response = service.pubsubNumPat(); + Long payload = response.get(); + + // verify + assertEquals(testResponse, response); + assertEquals(value, payload); + } + + @SneakyThrows + @Test + public void pubsubNumSub_returns_success() { + // setup + String[] arguments = new String[] {"ch1", "ch2"}; + Map value = Map.of(); + + CompletableFuture> testResponse = new CompletableFuture<>(); + testResponse.complete(value); + + // match on protobuf request + when(commandManager.>submitNewCommand(eq(PubSubNumSub), eq(arguments), any())) + .thenReturn(testResponse); + + // exercise + CompletableFuture> response = service.pubsubNumSub(arguments); + Map payload = response.get(); + + // verify + assertEquals(testResponse, response); + assertEquals(value, payload); + } + + @SneakyThrows + @Test + public void pubsubNumSubBinary_returns_success() { + // setup + GlideString[] arguments = new GlideString[] {gs("ch1"), gs("ch2")}; + Map value = Map.of(); + + CompletableFuture> testResponse = new CompletableFuture<>(); + testResponse.complete(value); + + // match on protobuf request + when(commandManager.>submitNewCommand( + eq(PubSubNumSub), eq(arguments), any())) + .thenReturn(testResponse); + + // exercise + CompletableFuture> response = service.pubsubNumSub(arguments); + Map payload = response.get(); + + // verify + assertEquals(testResponse, response); + assertEquals(value, payload); + } + @SneakyThrows @Test public void sunion_returns_success() { diff --git a/java/client/src/test/java/glide/api/models/TransactionTests.java b/java/client/src/test/java/glide/api/models/TransactionTests.java index aa49a6849b..c112ff7599 100644 --- a/java/client/src/test/java/glide/api/models/TransactionTests.java +++ b/java/client/src/test/java/glide/api/models/TransactionTests.java @@ -104,6 +104,9 @@ import static command_request.CommandRequestOuterClass.RequestType.PfCount; import static command_request.CommandRequestOuterClass.RequestType.PfMerge; import static command_request.CommandRequestOuterClass.RequestType.Ping; +import static command_request.CommandRequestOuterClass.RequestType.PubSubChannels; +import static command_request.CommandRequestOuterClass.RequestType.PubSubNumPat; +import static command_request.CommandRequestOuterClass.RequestType.PubSubNumSub; import static command_request.CommandRequestOuterClass.RequestType.Publish; import static command_request.CommandRequestOuterClass.RequestType.RPop; import static command_request.CommandRequestOuterClass.RequestType.RPush; @@ -1278,6 +1281,18 @@ InfScoreBound.NEGATIVE_INFINITY, new ScoreBoundary(3, false), new Limit(1, 2)), transaction.publish("msg", "ch1"); results.add(Pair.of(Publish, buildArgs("ch1", "msg"))); + transaction.pubsubChannels(); + results.add(Pair.of(PubSubChannels, buildArgs())); + + transaction.pubsubChannels("pattern"); + results.add(Pair.of(PubSubChannels, buildArgs("pattern"))); + + transaction.pubsubNumPat(); + results.add(Pair.of(PubSubNumPat, buildArgs())); + + transaction.pubsubNumSub(new String[] {"ch1", "ch2"}); + results.add(Pair.of(PubSubNumSub, buildArgs("ch1", "ch2"))); + transaction.lcsIdx("key1", "key2"); results.add(Pair.of(LCS, buildArgs("key1", "key2", IDX_COMMAND_STRING))); diff --git a/java/integTest/src/test/java/glide/PubSubTests.java b/java/integTest/src/test/java/glide/PubSubTests.java index 6ca9b3691f..19166cbbf0 100644 --- a/java/integTest/src/test/java/glide/PubSubTests.java +++ b/java/integTest/src/test/java/glide/PubSubTests.java @@ -2,6 +2,7 @@ package glide; import static glide.TestConfiguration.SERVER_VERSION; +import static glide.TestUtilities.assertDeepEquals; import static glide.TestUtilities.commonClientConfig; import static glide.TestUtilities.commonClusterClientConfig; import static glide.api.BaseClient.OK; @@ -27,6 +28,8 @@ import glide.api.models.configuration.BaseSubscriptionConfiguration.MessageCallback; import glide.api.models.configuration.ClusterSubscriptionConfiguration; import glide.api.models.configuration.ClusterSubscriptionConfiguration.PubSubClusterChannelMode; +import glide.api.models.configuration.RequestRoutingConfiguration.SlotKeyRoute; +import glide.api.models.configuration.RequestRoutingConfiguration.SlotType; import glide.api.models.configuration.StandaloneSubscriptionConfiguration; import glide.api.models.configuration.StandaloneSubscriptionConfiguration.PubSubChannelMode; import glide.api.models.exceptions.ConfigurationError; @@ -49,8 +52,9 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import lombok.SneakyThrows; +import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.tuple.Pair; -import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; @@ -111,10 +115,9 @@ private BaseClient createClientWithSubscriptions( @SneakyThrows private BaseClient createClient(boolean standalone) { - if (standalone) { - return GlideClient.createClient(commonClientConfig().build()).get(); - } - return GlideClusterClient.createClient(commonClusterClientConfig().build()).get(); + return standalone + ? GlideClient.createClient(commonClientConfig().build()).get() + : GlideClusterClient.createClient(commonClusterClientConfig().build()).get(); } /** @@ -128,17 +131,23 @@ private BaseClient createClient(boolean standalone) { private static final int MESSAGE_DELIVERY_DELAY = 500; // ms - @BeforeEach + @AfterEach @SneakyThrows public void cleanup() { for (var client : clients) { if (client instanceof GlideClusterClient) { - ((GlideClusterClient) client).customCommand(new String[] {"unsubscribe"}, ALL_NODES).get(); - ((GlideClusterClient) client).customCommand(new String[] {"punsubscribe"}, ALL_NODES).get(); - ((GlideClusterClient) client).customCommand(new String[] {"sunsubscribe"}, ALL_NODES).get(); + ((GlideClusterClient) client) + .customCommand(new GlideString[] {gs("unsubscribe")}, ALL_NODES) + .get(); + ((GlideClusterClient) client) + .customCommand(new GlideString[] {gs("punsubscribe")}, ALL_NODES) + .get(); + ((GlideClusterClient) client) + .customCommand(new GlideString[] {gs("sunsubscribe")}, ALL_NODES) + .get(); } else { - ((GlideClient) client).customCommand(new String[] {"unsubscribe"}).get(); - ((GlideClient) client).customCommand(new String[] {"punsubscribe"}).get(); + ((GlideClient) client).customCommand(new GlideString[] {gs("unsubscribe")}).get(); + ((GlideClient) client).customCommand(new GlideString[] {gs("punsubscribe")}).get(); } client.close(); } @@ -1232,7 +1241,7 @@ public void pubsub_with_binary(boolean standalone) { createClientWithSubscriptions( standalone, subscriptions, Optional.of(callback), Optional.of(callbackMessages)); var sender = createClient(standalone); - clients.addAll(Arrays.asList(listener, listener2, sender)); + clients.addAll(List.of(listener, listener2, sender)); assertEquals(OK, sender.publish(message.getMessage(), channel).get()); Thread.sleep(MESSAGE_DELIVERY_DELAY); // deliver the messages @@ -1241,4 +1250,222 @@ public void pubsub_with_binary(boolean standalone) { assertEquals(1, callbackMessages.size()); assertEquals(message, callbackMessages.get(0)); } + + @SneakyThrows + @ParameterizedTest(name = "standalone = {0}") + @ValueSource(booleans = {true, false}) + public void pubsub_channels(boolean standalone) { + assumeTrue(SERVER_VERSION.isGreaterThanOrEqualTo("7.0.0"), "This feature added in version 7"); + + // no channels exists yet + var client = createClient(standalone); + assertEquals(0, client.pubsubChannels().get().length); + assertEquals(0, client.pubsubChannelsBinary().get().length); + assertEquals(0, client.pubsubChannels("**").get().length); + assertEquals(0, client.pubsubChannels(gs("**")).get().length); + + var channels = Set.of("test_channel1", "test_channel2", "some_channel"); + String pattern = "test_*"; + + Map> subscriptions = + standalone + ? Map.of( + PubSubChannelMode.EXACT, + channels.stream().map(GlideString::gs).collect(Collectors.toSet())) + : Map.of( + PubSubClusterChannelMode.EXACT, + channels.stream().map(GlideString::gs).collect(Collectors.toSet())); + + var listener = createClientWithSubscriptions(standalone, subscriptions); + clients.addAll(List.of(client, listener)); + + // test without pattern + assertEquals(channels, Set.of(client.pubsubChannels().get())); + assertEquals(channels, Set.of(listener.pubsubChannels().get())); + assertEquals( + channels.stream().map(GlideString::gs).collect(Collectors.toSet()), + Set.of(client.pubsubChannelsBinary().get())); + assertEquals( + channels.stream().map(GlideString::gs).collect(Collectors.toSet()), + Set.of(listener.pubsubChannelsBinary().get())); + + // test with pattern + assertEquals( + Set.of("test_channel1", "test_channel2"), Set.of(client.pubsubChannels(pattern).get())); + assertEquals( + Set.of(gs("test_channel1"), gs("test_channel2")), + Set.of(client.pubsubChannels(gs(pattern)).get())); + assertEquals( + Set.of("test_channel1", "test_channel2"), Set.of(listener.pubsubChannels(pattern).get())); + assertEquals( + Set.of(gs("test_channel1"), gs("test_channel2")), + Set.of(listener.pubsubChannels(gs(pattern)).get())); + + // test with non-matching pattern + assertEquals(0, client.pubsubChannels("non_matching_*").get().length); + assertEquals(0, client.pubsubChannels(gs("non_matching_*")).get().length); + assertEquals(0, listener.pubsubChannels("non_matching_*").get().length); + assertEquals(0, listener.pubsubChannels(gs("non_matching_*")).get().length); + } + + @SneakyThrows + @ParameterizedTest(name = "standalone = {0}") + @ValueSource(booleans = {true, false}) + public void pubsub_numpat(boolean standalone) { + assumeTrue(SERVER_VERSION.isGreaterThanOrEqualTo("7.0.0"), "This feature added in version 7"); + + // no channels exists yet + var client = createClient(standalone); + assertEquals(0, client.pubsubNumPat().get()); + + var patterns = Set.of("news.*", "announcements.*"); + + Map> subscriptions = + standalone + ? Map.of( + PubSubChannelMode.PATTERN, + patterns.stream().map(GlideString::gs).collect(Collectors.toSet())) + : Map.of( + PubSubClusterChannelMode.PATTERN, + patterns.stream().map(GlideString::gs).collect(Collectors.toSet())); + + var listener = createClientWithSubscriptions(standalone, subscriptions); + clients.addAll(List.of(client, listener)); + + assertEquals(2, client.pubsubNumPat().get()); + assertEquals(2, listener.pubsubNumPat().get()); + } + + @SneakyThrows + @ParameterizedTest(name = "standalone = {0}") + @ValueSource(booleans = {true, false}) + public void pubsub_numsub(boolean standalone) { + assumeTrue(SERVER_VERSION.isGreaterThanOrEqualTo("7.0.0"), "This feature added in version 7"); + + // no channels exists yet + var client = createClient(standalone); + var channels = new String[] {"channel1", "channel2", "channel3"}; + assertEquals( + Arrays.stream(channels).collect(Collectors.toMap(c -> c, c -> 0L)), + client.pubsubNumSub(channels).get()); + + Map> subscriptions1 = + standalone + ? Map.of( + PubSubChannelMode.EXACT, Set.of(gs("channel1"), gs("channel2"), gs("channel3"))) + : Map.of( + PubSubClusterChannelMode.EXACT, + Set.of(gs("channel1"), gs("channel2"), gs("channel3"))); + var listener1 = createClientWithSubscriptions(standalone, subscriptions1); + + Map> subscriptions2 = + standalone + ? Map.of(PubSubChannelMode.EXACT, Set.of(gs("channel2"), gs("channel3"))) + : Map.of(PubSubClusterChannelMode.EXACT, Set.of(gs("channel2"), gs("channel3"))); + var listener2 = createClientWithSubscriptions(standalone, subscriptions2); + + Map> subscriptions3 = + standalone + ? Map.of(PubSubChannelMode.EXACT, Set.of(gs("channel3"))) + : Map.of(PubSubClusterChannelMode.EXACT, Set.of(gs("channel3"))); + var listener3 = createClientWithSubscriptions(standalone, subscriptions3); + + Map> subscriptions4 = + standalone + ? Map.of(PubSubChannelMode.PATTERN, Set.of(gs("channel*"))) + : Map.of(PubSubClusterChannelMode.PATTERN, Set.of(gs("channel*"))); + var listener4 = createClientWithSubscriptions(standalone, subscriptions4); + + clients.addAll(List.of(client, listener1, listener2, listener3, listener4)); + + var expected = Map.of("channel1", 1L, "channel2", 2L, "channel3", 3L, "channel4", 0L); + assertEquals(expected, client.pubsubNumSub(ArrayUtils.addFirst(channels, "channel4")).get()); + assertEquals(expected, listener1.pubsubNumSub(ArrayUtils.addFirst(channels, "channel4")).get()); + + var expectedGs = + Map.of(gs("channel1"), 1L, gs("channel2"), 2L, gs("channel3"), 3L, gs("channel4"), 0L); + assertEquals( + expectedGs, + client + .pubsubNumSub( + new GlideString[] {gs("channel1"), gs("channel2"), gs("channel3"), gs("channel4")}) + .get()); + assertEquals( + expectedGs, + listener2 + .pubsubNumSub( + new GlideString[] {gs("channel1"), gs("channel2"), gs("channel3"), gs("channel4")}) + .get()); + } + + @SneakyThrows + @ParameterizedTest(name = "standalone = {0}") + @ValueSource(booleans = {true, false}) + public void pubsub_channels_and_numpat_and_numsub_in_transaction(boolean standalone) { + assumeTrue(SERVER_VERSION.isGreaterThanOrEqualTo("7.0.0"), "This feature added in version 7"); + + var prefix = "{boo}-"; + var route = new SlotKeyRoute(prefix, SlotType.PRIMARY); + var client = createClient(standalone); + var channels = + new String[] {prefix + "test_channel1", prefix + "test_channel2", prefix + "some_channel"}; + var patterns = Set.of(prefix + "news.*", prefix + "announcements.*"); + String pattern = prefix + "test_*"; + + var transaction = + (standalone ? new Transaction() : new ClusterTransaction()) + .pubsubChannels() + .pubsubChannels(pattern) + .pubsubNumPat() + .pubsubNumSub(channels); + // no channels exists yet + var result = + standalone + ? ((GlideClient) client).exec((Transaction) transaction).get() + : ((GlideClusterClient) client).exec((ClusterTransaction) transaction, route).get(); + assertDeepEquals( + new Object[] { + new String[0], // pubsubChannels() + new String[0], // pubsubChannels(pattern) + 0L, // pubsubNumPat() + Arrays.stream(channels) + .collect(Collectors.toMap(c -> c, c -> 0L)), // pubsubNumSub(channels) + }, + result); + + Map> subscriptions = + standalone + ? Map.of( + PubSubChannelMode.EXACT, + Arrays.stream(channels).map(GlideString::gs).collect(Collectors.toSet()), + PubSubChannelMode.PATTERN, + patterns.stream().map(GlideString::gs).collect(Collectors.toSet())) + : Map.of( + PubSubClusterChannelMode.EXACT, + Arrays.stream(channels).map(GlideString::gs).collect(Collectors.toSet()), + PubSubClusterChannelMode.PATTERN, + patterns.stream().map(GlideString::gs).collect(Collectors.toSet())); + + var listener = createClientWithSubscriptions(standalone, subscriptions); + clients.addAll(List.of(client, listener)); + + result = + standalone + ? ((GlideClient) client).exec((Transaction) transaction).get() + : ((GlideClusterClient) client).exec((ClusterTransaction) transaction, route).get(); + + // convert arrays to sets, because we can't compare arrays - they received reordered + result[0] = Set.of((Object[]) result[0]); + result[1] = Set.of((Object[]) result[1]); + + assertDeepEquals( + new Object[] { + Set.of(channels), // pubsubChannels() + Set.of("{boo}-test_channel1", "{boo}-test_channel2"), // pubsubChannels(pattern) + 2L, // pubsubNumPat() + Arrays.stream(channels) + .collect(Collectors.toMap(c -> c, c -> 1L)), // pubsubNumSub(channels) + }, + result); + } }