Skip to content

Commit

Permalink
Python: enable routing by node address. (valkey-io#1038)
Browse files Browse the repository at this point in the history
* Python: enable routing by node address.

* Added correct error handling to the split.

* Fix check

* fix

* Updated host's documentation.

* fix docs

* Update python/python/glide/routes.py

Co-authored-by: Yury-Fridlyand <[email protected]>

* Update node/src/RedisClusterClient.ts

Co-authored-by: Yury-Fridlyand <[email protected]>

---------

Co-authored-by: Shachar Langbeheim <[email protected]>
Co-authored-by: Yury-Fridlyand <[email protected]>
  • Loading branch information
3 people authored Mar 6, 2024
1 parent 94e1186 commit 25a30bd
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 9 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#### Changes

* Node: Allow routing Cluster requests by address. ([#1021](https://github.com/aws/glide-for-redis/pull/1021))
* Python Node: Allow routing Cluster requests by address. ([#1021](https://github.com/aws/glide-for-redis/pull/1021))
* Python: Added HSETNX command. ([#954](https://github.com/aws/glide-for-redis/pull/954))
* Python: Added SISMEMBER command ([#971](https://github.com/aws/glide-for-redis/pull/971))
* Python, Node: Added TYPE command ([#945](https://github.com/aws/glide-for-redis/pull/945), [#980](https://github.com/aws/glide-for-redis/pull/980))
Expand Down
14 changes: 9 additions & 5 deletions node/src/RedisClusterClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import {
createInfo,
createPing,
} from "./Commands";
import { RequestError } from "./Errors";
import { connection_request, redis_request } from "./ProtobufMessage";
import { ClusterTransaction } from "./Transaction";

Expand Down Expand Up @@ -56,11 +57,11 @@ export type SlotKeyTypes = {
export type RouteByAddress = {
type: "routeByAddress";
/**
* DNS name of the host.
*The endpoint of the node. If `port` is not provided, should be in the `${address}:${port}` format, where `address` is the preferred endpoint as shown in the output of the `CLUSTER SLOTS` command.
*/
host: string;
/**
* The port to access on the node. If port is not provided, `host` is assumed to be in the format `{hostname}:{port}`.
* The port to access on the node. If port is not provided, `host` is assumed to be in the format `${address}:${port}`.
*/
port?: number;
};
Expand Down Expand Up @@ -146,7 +147,7 @@ function toProtobufRoute(
const split = host.split(":");

if (split.length !== 2) {
throw new Error(
throw new RequestError(
"No port provided, expected host to be formatted as `{hostname}:{port}`. Received " +
host
);
Expand Down Expand Up @@ -243,15 +244,18 @@ export class RedisClusterClient extends BaseClient {
/** Ping the Redis server.
* See https://redis.io/commands/ping/ for details.
*
* @param message - An optional message to include in the PING command.
* @param message - An optional message to include in the PING command.
* If not provided, the server will respond with "PONG".
* If provided, the server will respond with a copy of the message.
* @param route - The command will be routed to all primaries, unless `route` is provided, in which
* case the client will route the command to the nodes defined by `route`.
* @returns - "PONG" if `message` is not provided, otherwise return a copy of `message`.
*/
public ping(message?: string, route?: Routes): Promise<string> {
return this.createWritePromise(createPing(message), toProtobufRoute(route));
return this.createWritePromise(
createPing(message),
toProtobufRoute(route)
);
}

/** Get information and statistics about the Redis server.
Expand Down
2 changes: 2 additions & 0 deletions python/python/glide/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from glide.routes import (
AllNodes,
AllPrimaries,
ByAddressRoute,
RandomNode,
SlotIdRoute,
SlotKeyRoute,
Expand Down Expand Up @@ -80,6 +81,7 @@
"SlotType",
"AllNodes",
"AllPrimaries",
"ByAddressRoute",
"RandomNode",
"SlotKeyRoute",
"SlotIdRoute",
Expand Down
4 changes: 2 additions & 2 deletions python/python/glide/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from glide.protobuf.connection_request_pb2 import ConnectionRequest
from glide.protobuf.redis_request_pb2 import RedisRequest
from glide.routes import RandomNode, SlotIdRoute, SlotKeyRoute
from glide.routes import ByAddressRoute, RandomNode, SlotIdRoute, SlotKeyRoute

OK: str = "OK"
DEFAULT_READ_BYTES_SIZE: int = pow(2, 16)
Expand All @@ -26,4 +26,4 @@
# When routing to a single node, response will be T
# Otherwise, response will be : {Address : response , ... } with type of Dict[str, T].
TClusterResponse = Union[T, Dict[str, T]]
TSingleNodeRoute = Union[RandomNode, SlotKeyRoute, SlotIdRoute]
TSingleNodeRoute = Union[RandomNode, SlotKeyRoute, SlotIdRoute, ByAddressRoute]
29 changes: 28 additions & 1 deletion python/python/glide/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from enum import Enum
from typing import Optional

from glide.exceptions import RequestError
from glide.protobuf.redis_request_pb2 import RedisRequest, SimpleRoutes
from glide.protobuf.redis_request_pb2 import SlotTypes as ProtoSlotTypes

Expand Down Expand Up @@ -48,6 +49,29 @@ def __init__(self, slot_type: SlotType, slot_id: int) -> None:
self.slot_id = slot_id


class ByAddressRoute(Route):
def __init__(self, host: str, port: Optional[int] = None) -> None:
"""Routes a request to a node by its address
Args:
host (str): The endpoint of the node. If `port` is not provided, should be in the f"{address}:{port}" format, where `address` is the preferred endpoint as shown in the output of the `CLUSTER SLOTS` command.
port (Optional[int]): The port to access on the node. If port is not provided, `host` is assumed to be in the format f"{address}:{port}".
"""
super().__init__()
if port is None:
split = host.split(":")
if len(split) < 2:
raise RequestError(
"No port provided, expected host to be formatted as {hostname}:{port}`. Received "
+ host
)
self.host = split[0]
self.port = int(split[1])
else:
self.host = host
self.port = port


def to_protobuf_slot_type(slot_type: SlotType) -> ProtoSlotTypes.ValueType:
return (
ProtoSlotTypes.Primary
Expand All @@ -71,5 +95,8 @@ def set_protobuf_route(request: RedisRequest, route: Optional[Route]) -> None:
elif isinstance(route, SlotIdRoute):
request.route.slot_id_route.slot_type = to_protobuf_slot_type(route.slot_type)
request.route.slot_id_route.slot_id = route.slot_id
elif isinstance(route, ByAddressRoute):
request.route.by_address_route.host = route.host
request.route.by_address_route.port = route.port
else:
raise Exception(f"Received invalid route type: {type(route)}")
raise RequestError(f"Received invalid route type: {type(route)}")
39 changes: 39 additions & 0 deletions python/python/tests/test_async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from glide.routes import (
AllNodes,
AllPrimaries,
ByAddressRoute,
RandomNode,
Route,
SlotIdRoute,
Expand Down Expand Up @@ -1656,6 +1657,44 @@ async def test_info_random_route(self, redis_client: RedisClusterClient):
assert isinstance(info, str)
assert "# Server" in info

@pytest.mark.parametrize("cluster_mode", [True])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_cluster_route_by_address_reaches_correct_node(
self, redis_client: RedisClusterClient
):
cluster_nodes = await redis_client.custom_command(
["cluster", "nodes"], RandomNode()
)
assert isinstance(cluster_nodes, str)
host = (
[line for line in cluster_nodes.split("\n") if "myself" in line][0]
.split(" ")[1]
.split("@")[0]
)

second_result = await redis_client.custom_command(
["cluster", "nodes"], ByAddressRoute(host)
)

assert cluster_nodes == second_result

host, port = host.split(":")
port_as_int = int(port)

third_result = await redis_client.custom_command(
["cluster", "nodes"], ByAddressRoute(host, port_as_int)
)

assert cluster_nodes == third_result

@pytest.mark.parametrize("cluster_mode", [True])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_cluster_fail_routing_by_address_if_no_port_is_provided(
self, redis_client: RedisClusterClient
):
with pytest.raises(RequestError) as e:
await redis_client.info(route=ByAddressRoute("foo"))


@pytest.mark.asyncio
class TestExceptions:
Expand Down

0 comments on commit 25a30bd

Please sign in to comment.