Skip to content

Commit

Permalink
Merge pull request valkey-io#1112 from Bit-Quill/java/integ_SanH_add_…
Browse files Browse the repository at this point in the history
…ByAdressNode

Java: enable routing by node address. (#137)
  • Loading branch information
Elen-Ghulam authored Mar 13, 2024
2 parents 09dbcea + caca72e commit 290479b
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
/** Copyright GLIDE-for-Redis Project Contributors - SPDX Identifier: Apache-2.0 */
package glide.api.models.configuration;

import glide.api.models.exceptions.RequestException;
import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;

/** Request routing configuration. */
Expand Down Expand Up @@ -79,4 +81,48 @@ public boolean isSingleNodeRoute() {
return true;
}
}

/** Routes a request to a node by its address */
@Getter
public static class ByAddressRoute implements Route {
/**
* The endpoint of the node. If <code>port</code> is not provided, should be in the <code>
* "address:port"</code> format, where <code>address</code> is the preferred endpoint as shown
* in the output of the <code>CLUSTER SLOTS</code> command.
*/
private final String host;

/**
* The port to access the node. If port is not provided, <code>host</code> is assumed to be in
* the format <code>"address:port"</code>.
*/
private final int port;

public ByAddressRoute(@NonNull String host, int port) {
this.host = host;
this.port = port;
}

public ByAddressRoute(@NonNull String host) {
String[] split = host.split(":");
if (split.length < 2) {
throw new RequestException(
"No port provided, and host is not in the expected format 'hostname:port'. Received: "
+ host);
}

this.host = split[0];

try {
this.port = Integer.parseInt(split[1]);
} catch (NumberFormatException e) {
throw new RequestException("Port must be a valid integer. Received: " + split[1]);
}
}

@Override
public boolean isSingleNodeRoute() {
return true;
}
}
}
11 changes: 10 additions & 1 deletion java/client/src/main/java/glide/managers/CommandManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@

import glide.api.models.ClusterTransaction;
import glide.api.models.Transaction;
import glide.api.models.configuration.RequestRoutingConfiguration.ByAddressRoute;
import glide.api.models.configuration.RequestRoutingConfiguration.Route;
import glide.api.models.configuration.RequestRoutingConfiguration.SimpleRoute;
import glide.api.models.configuration.RequestRoutingConfiguration.SlotIdRoute;
import glide.api.models.configuration.RequestRoutingConfiguration.SlotKeyRoute;
import glide.api.models.exceptions.ClosingException;
import glide.api.models.exceptions.RequestException;
import glide.connectors.handlers.CallbackDispatcher;
import glide.connectors.handlers.ChannelHandler;
import java.util.Optional;
Expand Down Expand Up @@ -231,8 +233,15 @@ private RedisRequest.Builder prepareRedisRequestRoute(RedisRequest.Builder build
.setSlotKey(((SlotKeyRoute) route).getSlotKey())
.setSlotType(
SlotTypes.forNumber(((SlotKeyRoute) route).getSlotType().ordinal()))));
} else if (route instanceof ByAddressRoute) {
builder.setRoute(
Routes.newBuilder()
.setByAddressRoute(
RedisRequestOuterClass.ByAddressRoute.newBuilder()
.setHost(((ByAddressRoute) route).getHost())
.setPort(((ByAddressRoute) route).getPort())));
} else {
throw new IllegalArgumentException("Unknown type of route");
throw new RequestException("Unknown type of route");
}
return builder;
}
Expand Down
28 changes: 27 additions & 1 deletion java/client/src/test/java/glide/managers/CommandManagerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@

import glide.api.models.ClusterTransaction;
import glide.api.models.Transaction;
import glide.api.models.configuration.RequestRoutingConfiguration.ByAddressRoute;
import glide.api.models.configuration.RequestRoutingConfiguration.SimpleRoute;
import glide.api.models.configuration.RequestRoutingConfiguration.SlotIdRoute;
import glide.api.models.configuration.RequestRoutingConfiguration.SlotKeyRoute;
import glide.api.models.configuration.RequestRoutingConfiguration.SlotType;
import glide.api.models.exceptions.RequestException;
import glide.connectors.handlers.ChannelHandler;
import java.util.LinkedList;
import java.util.Map;
Expand Down Expand Up @@ -221,14 +223,38 @@ public void prepare_request_with_slot_key_routes(SlotType slotType) {
() -> assertFalse(requestBuilder.getRoute().hasSlotIdRoute()));
}

@Test
public void prepare_request_with_by_address_route() {
CompletableFuture<Response> future = new CompletableFuture<>();
when(channelHandler.write(any(), anyBoolean())).thenReturn(future);
when(channelHandler.isClosed()).thenReturn(false);

ArgumentCaptor<RedisRequest.Builder> captor =
ArgumentCaptor.forClass(RedisRequest.Builder.class);

service.submitNewCommand(
CustomCommand, new String[0], new ByAddressRoute("testhost", 6379), r -> null);
verify(channelHandler).write(captor.capture(), anyBoolean());
var requestBuilder = captor.getValue();

assertAll(
() -> assertTrue(requestBuilder.hasRoute()),
() -> assertTrue(requestBuilder.getRoute().hasByAddressRoute()),
() -> assertEquals("testhost", requestBuilder.getRoute().getByAddressRoute().getHost()),
() -> assertEquals(6379, requestBuilder.getRoute().getByAddressRoute().getPort()),
() -> assertFalse(requestBuilder.getRoute().hasSimpleRoutes()),
() -> assertFalse(requestBuilder.getRoute().hasSlotIdRoute()),
() -> assertFalse(requestBuilder.getRoute().hasSlotKeyRoute()));
}

@Test
public void prepare_request_with_unknown_route_type() {
CompletableFuture<Response> future = new CompletableFuture<>();
when(channelHandler.write(any(), anyBoolean())).thenReturn(future);

var exception =
assertThrows(
IllegalArgumentException.class,
RequestException.class,
() -> service.submitNewCommand(CustomCommand, new String[0], () -> false, r -> null));
assertEquals("Unknown type of route", exception.getMessage());
}
Expand Down
47 changes: 47 additions & 0 deletions java/integTest/src/test/java/glide/cluster/CommandTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@
import static glide.api.models.commands.InfoOptions.Section.MEMORY;
import static glide.api.models.commands.InfoOptions.Section.REPLICATION;
import static glide.api.models.commands.InfoOptions.Section.STATS;
import static glide.api.models.configuration.RequestRoutingConfiguration.ByAddressRoute;
import static glide.api.models.configuration.RequestRoutingConfiguration.SimpleRoute.ALL_NODES;
import static glide.api.models.configuration.RequestRoutingConfiguration.SimpleRoute.ALL_PRIMARIES;
import static glide.api.models.configuration.RequestRoutingConfiguration.SimpleRoute.RANDOM;
import static glide.api.models.configuration.RequestRoutingConfiguration.SlotType.PRIMARY;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand All @@ -30,6 +32,7 @@
import glide.api.models.configuration.RedisClusterClientConfiguration;
import glide.api.models.configuration.RequestRoutingConfiguration.SlotKeyRoute;
import glide.api.models.exceptions.RequestException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -366,4 +369,48 @@ public void config_rewrite_non_existent_config_file() {
assertThrows(ExecutionException.class, () -> clusterClient.configRewrite().get());
assertTrue(executionException.getCause() instanceof RequestException);
}

@Test
@SneakyThrows
public void cluster_route_by_address_reaches_correct_node() {
String initialNode =
(String)
clusterClient
.customCommand(new String[] {"cluster", "nodes"}, RANDOM)
.get()
.getSingleValue();

String host =
Arrays.stream(initialNode.split("\n"))
.filter(line -> line.contains("myself"))
.findFirst()
.map(line -> line.split(" ")[1].split("@")[0])
.orElse(null);
assertNotNull(host);

String specifiedClusterNode1 =
(String)
clusterClient
.customCommand(new String[] {"cluster", "nodes"}, new ByAddressRoute(host))
.get()
.getSingleValue();
assertEquals(initialNode, specifiedClusterNode1);

String[] splitHost = host.split(":");
String specifiedClusterNode2 =
(String)
clusterClient
.customCommand(
new String[] {"cluster", "nodes"},
new ByAddressRoute(splitHost[0], Integer.parseInt(splitHost[1])))
.get()
.getSingleValue();
assertEquals(initialNode, specifiedClusterNode2);
}

@Test
@SneakyThrows
public void cluster_fail_routing_by_address_if_no_port_is_provided() {
assertThrows(RequestException.class, () -> clusterClient.info(new ByAddressRoute("foo")).get());
}
}

0 comments on commit 290479b

Please sign in to comment.