Skip to content

Commit

Permalink
Python: add BZPOPMIN and BZPOPMAX commands (valkey-io#1399)
Browse files Browse the repository at this point in the history
* Python: add BZPOPMIN and BZPOPMAX commands (#266)

* Update PR link

* PR suggestions

* Fix rust
  • Loading branch information
aaron-congo authored May 14, 2024
1 parent c726596 commit 8c590a7
Show file tree
Hide file tree
Showing 6 changed files with 296 additions and 4 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
* Python: Added ZRANGESTORE command ([#1377](https://github.com/aws/glide-for-redis/pull/1377))
* Python: Added ZDIFFSTORE command ([#1378](https://github.com/aws/glide-for-redis/pull/1378))
* Python: Added ZDIFF command ([#1401](https://github.com/aws/glide-for-redis/pull/1401))
* Python: Added BZPOPMIN and BZPOPMAX commands ([#1399](https://github.com/aws/glide-for-redis/pull/1399))


#### Fixes
* Python: Fix typing error "‘type’ object is not subscriptable" ([#1203](https://github.com/aws/glide-for-redis/pull/1203))
Expand Down
88 changes: 88 additions & 0 deletions glide-core/src/client/value_conversion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub(crate) enum ExpectedReturnType {
ArrayOfArraysOfDoubleOrNull,
ArrayOfKeyValuePairs,
ZMPopReturnType,
KeyWithMemberAndScore,
}

pub(crate) fn convert_to_expected_type(
Expand Down Expand Up @@ -320,6 +321,28 @@ pub(crate) fn convert_to_expected_type(
)
.into()),
},
// Used by BZPOPMIN/BZPOPMAX, which return an array consisting of the key of the sorted set that was popped, the popped member, and its score.
// RESP2 returns the score as a string, but RESP3 returns the score as a double. Here we convert string scores into type double.
ExpectedReturnType::KeyWithMemberAndScore => match value {
Value::Nil => Ok(value),
Value::Array(ref array) if array.len() == 3 && matches!(array[2], Value::Double(_)) => {
Ok(value)
}
Value::Array(mut array)
if array.len() == 3
&& matches!(array[2], Value::BulkString(_) | Value::SimpleString(_)) =>
{
array[2] =
convert_to_expected_type(array[2].clone(), Some(ExpectedReturnType::Double))?;
Ok(Value::Array(array))
}
_ => Err((
ErrorKind::TypeError,
"Response couldn't be converted to an array containing a key, member, and score",
format!("(response was {:?})", value),
)
.into()),
},
}
}

Expand Down Expand Up @@ -454,6 +477,7 @@ pub(crate) fn expected_type_for_cmd(cmd: &Cmd) -> Option<ExpectedReturnType> {
b"ZRANK" | b"ZREVRANK" => cmd
.position(b"WITHSCORE")
.map(|_| ExpectedReturnType::ZRankReturnType),
b"BZPOPMIN" | b"BZPOPMAX" => Some(ExpectedReturnType::KeyWithMemberAndScore),
b"SPOP" => {
if cmd.arg_idx(2).is_some() {
Some(ExpectedReturnType::Set)
Expand Down Expand Up @@ -815,6 +839,70 @@ mod tests {
));
}

#[test]
fn convert_bzpopmin_bzpopmax() {
assert!(matches!(
expected_type_for_cmd(
redis::cmd("BZPOPMIN")
.arg("myzset1")
.arg("myzset2")
.arg("1")
),
Some(ExpectedReturnType::KeyWithMemberAndScore)
));

assert!(matches!(
expected_type_for_cmd(
redis::cmd("BZPOPMAX")
.arg("myzset1")
.arg("myzset2")
.arg("1")
),
Some(ExpectedReturnType::KeyWithMemberAndScore)
));

let array_with_double_score = Value::Array(vec![
Value::BulkString(b"key1".to_vec()),
Value::BulkString(b"member1".to_vec()),
Value::Double(2.0),
]);
let result = convert_to_expected_type(
array_with_double_score.clone(),
Some(ExpectedReturnType::KeyWithMemberAndScore),
)
.unwrap();
assert_eq!(array_with_double_score, result);

let array_with_string_score = Value::Array(vec![
Value::BulkString(b"key1".to_vec()),
Value::BulkString(b"member1".to_vec()),
Value::BulkString(b"2.0".to_vec()),
]);
let result = convert_to_expected_type(
array_with_string_score.clone(),
Some(ExpectedReturnType::KeyWithMemberAndScore),
)
.unwrap();
assert_eq!(array_with_double_score, result);

let converted_nil_value =
convert_to_expected_type(Value::Nil, Some(ExpectedReturnType::KeyWithMemberAndScore))
.unwrap();
assert_eq!(Value::Nil, converted_nil_value);

let array_with_unexpected_length = Value::Array(vec![
Value::BulkString(b"key1".to_vec()),
Value::BulkString(b"member1".to_vec()),
Value::Double(2.0),
Value::Double(2.0),
]);
assert!(convert_to_expected_type(
array_with_unexpected_length,
Some(ExpectedReturnType::KeyWithMemberAndScore)
)
.is_err());
}

#[test]
fn convert_zank_zrevrank_only_if_withsocres_is_included() {
assert!(matches!(
Expand Down
72 changes: 72 additions & 0 deletions python/python/glide/async_commands/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2285,6 +2285,42 @@ async def zpopmax(
),
)

async def bzpopmax(
self, keys: List[str], timeout: float
) -> Optional[List[Union[str, float]]]:
"""
Pops the member with the highest score from the first non-empty sorted set, with the given keys being checked in
the order that they are given. Blocks the connection when there are no members to remove from any of the given
sorted sets.
When in cluster mode, all keys must map to the same hash slot.
`BZPOPMAX` is the blocking variant of `ZPOPMAX`.
`BZPOPMAX` is a client blocking command, see https://github.com/aws/glide-for-redis/wiki/General-Concepts#blocking-commands for more details and best practices.
See https://valkey.io/commands/bzpopmax for more details.
Args:
keys (List[str]): The keys of the sorted sets.
timeout (float): The number of seconds to wait for a blocking operation to complete.
A value of 0 will block indefinitely.
Returns:
Optional[List[Union[str, float]]]: An array containing the key where the member was popped out, the member itself,
and the member score. If no member could be popped and the `timeout` expired, returns None.
Examples:
>>> await client.zadd("my_sorted_set1", {"member1": 10.0, "member2": 5.0})
2 # Two elements have been added to the sorted set at "my_sorted_set1".
>>> await client.bzpopmax(["my_sorted_set1", "my_sorted_set2"], 0.5)
['my_sorted_set1', 'member1', 10.0] # "member1" with a score of 10.0 has been removed from "my_sorted_set1".
"""
return cast(
Optional[List[Union[str, float]]],
await self._execute_command(RequestType.BZPopMax, keys + [str(timeout)]),
)

async def zpopmin(
self, key: str, count: Optional[int] = None
) -> Mapping[str, float]:
Expand Down Expand Up @@ -2317,6 +2353,42 @@ async def zpopmin(
),
)

async def bzpopmin(
self, keys: List[str], timeout: float
) -> Optional[List[Union[str, float]]]:
"""
Pops the member with the lowest score from the first non-empty sorted set, with the given keys being checked in
the order that they are given. Blocks the connection when there are no members to remove from any of the given
sorted sets.
When in cluster mode, all keys must map to the same hash slot.
`BZPOPMIN` is the blocking variant of `ZPOPMIN`.
`BZPOPMIN` is a client blocking command, see https://github.com/aws/glide-for-redis/wiki/General-Concepts#blocking-commands for more details and best practices.
See https://valkey.io/commands/bzpopmin for more details.
Args:
keys (List[str]): The keys of the sorted sets.
timeout (float): The number of seconds to wait for a blocking operation to complete.
A value of 0 will block indefinitely.
Returns:
Optional[List[Union[str, float]]]: An array containing the key where the member was popped out, the member itself,
and the member score. If no member could be popped and the `timeout` expired, returns None.
Examples:
>>> await client.zadd("my_sorted_set1", {"member1": 10.0, "member2": 5.0})
2 # Two elements have been added to the sorted set at "my_sorted_set1".
>>> await client.bzpopmin(["my_sorted_set1", "my_sorted_set2"], 0.5)
['my_sorted_set1', 'member2', 5.0] # "member2" with a score of 5.0 has been removed from "my_sorted_set1".
"""
return cast(
Optional[List[Union[str, float]]],
await self._execute_command(RequestType.BZPopMin, keys + [str(timeout)]),
)

async def zrange(
self,
key: str,
Expand Down
46 changes: 46 additions & 0 deletions python/python/glide/async_commands/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -1624,6 +1624,29 @@ def zpopmax(
RequestType.ZPopMax, [key, str(count)] if count else [key]
)

def bzpopmax(self: TTransaction, keys: List[str], timeout: float) -> TTransaction:
"""
Pops the member with the highest score from the first non-empty sorted set, with the given keys being checked in
the order that they are given. Blocks the connection when there are no members to remove from any of the given
sorted sets.
`BZPOPMAX` is the blocking variant of `ZPOPMAX`.
`BZPOPMAX` is a client blocking command, see https://github.com/aws/glide-for-redis/wiki/General-Concepts#blocking-commands for more details and best practices.
See https://valkey.io/commands/bzpopmax for more details.
Args:
keys (List[str]): The keys of the sorted sets.
timeout (float): The number of seconds to wait for a blocking operation to complete.
A value of 0 will block indefinitely.
Command response:
Optional[List[Union[str, float]]]: An array containing the key where the member was popped out, the member itself,
and the member score. If no member could be popped and the `timeout` expired, returns None.
"""
return self.append_command(RequestType.BZPopMax, keys + [str(timeout)])

def zpopmin(
self: TTransaction, key: str, count: Optional[int] = None
) -> TTransaction:
Expand All @@ -1647,6 +1670,29 @@ def zpopmin(
RequestType.ZPopMin, [key, str(count)] if count else [key]
)

def bzpopmin(self: TTransaction, keys: List[str], timeout: float) -> TTransaction:
"""
Pops the member with the lowest score from the first non-empty sorted set, with the given keys being checked in
the order that they are given. Blocks the connection when there are no members to remove from any of the given
sorted sets.
`BZPOPMIN` is the blocking variant of `ZPOPMIN`.
`BZPOPMIN` is a client blocking command, see https://github.com/aws/glide-for-redis/wiki/General-Concepts#blocking-commands for more details and best practices.
See https://valkey.io/commands/bzpopmin for more details.
Args:
keys (List[str]): The keys of the sorted sets.
timeout (float): The number of seconds to wait for a blocking operation to complete.
A value of 0 will block indefinitely.
Command response:
Optional[List[Union[str, float]]]: An array containing the key where the member was popped out, the member itself,
and the member score. If no member could be popped and the `timeout` expired, returns None.
"""
return self.append_command(RequestType.BZPopMin, keys + [str(timeout)])

def zrange(
self: TTransaction,
key: str,
Expand Down
80 changes: 80 additions & 0 deletions python/python/tests/test_async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1833,6 +1833,46 @@ async def test_zpopmin(self, redis_client: TRedisClient):

assert await redis_client.zpopmin("non_exisitng_key") == {}

@pytest.mark.parametrize("cluster_mode", [True, False])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_bzpopmin(self, redis_client: TRedisClient):
key1 = f"{{testKey}}:{get_random_string(10)}"
key2 = f"{{testKey}}:{get_random_string(10)}"
non_existing_key = f"{{testKey}}:non_existing_key"

assert await redis_client.zadd(key1, {"a": 1.0, "b": 1.5}) == 2
assert await redis_client.zadd(key2, {"c": 2.0}) == 1
assert await redis_client.bzpopmin([key1, key2], 0.5) == [key1, "a", 1.0]
assert await redis_client.bzpopmin([non_existing_key, key2], 0.5) == [
key2,
"c",
2.0,
]
assert await redis_client.bzpopmin(["non_existing_key"], 0.5) is None

# invalid argument - key list must not be empty
with pytest.raises(RequestError):
await redis_client.bzpopmin([], 0.5)

# key exists, but it is not a sorted set
assert await redis_client.set("foo", "value") == OK
with pytest.raises(RequestError):
await redis_client.bzpopmin(["foo"], 0.5)

# same-slot requirement
if isinstance(redis_client, RedisClusterClient):
with pytest.raises(RequestError) as e:
await redis_client.bzpopmin(["abc", "zxy", "lkn"], 0.5)
assert "CrossSlot" in str(e)

async def endless_bzpopmin_call():
await redis_client.bzpopmin(["non_existent_key"], 0)

# bzpopmax is called against a non-existing key with no timeout, but we wrap the call in an asyncio timeout to
# avoid having the test block forever
with pytest.raises(asyncio.TimeoutError):
await asyncio.wait_for(endless_bzpopmin_call(), timeout=0.5)

@pytest.mark.parametrize("cluster_mode", [True, False])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_zpopmax(self, redis_client: TRedisClient):
Expand All @@ -1852,6 +1892,46 @@ async def test_zpopmax(self, redis_client: TRedisClient):

assert await redis_client.zpopmax("non_exisitng_key") == {}

@pytest.mark.parametrize("cluster_mode", [True, False])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_bzpopmax(self, redis_client: TRedisClient):
key1 = f"{{testKey}}:{get_random_string(10)}"
key2 = f"{{testKey}}:{get_random_string(10)}"
non_existing_key = f"{{testKey}}:non_existing_key"

assert await redis_client.zadd(key1, {"a": 1.0, "b": 1.5}) == 2
assert await redis_client.zadd(key2, {"c": 2.0}) == 1
assert await redis_client.bzpopmax([key1, key2], 0.5) == [key1, "b", 1.5]
assert await redis_client.bzpopmax([non_existing_key, key2], 0.5) == [
key2,
"c",
2.0,
]
assert await redis_client.bzpopmax(["non_existing_key"], 0.5) is None

# invalid argument - key list must not be empty
with pytest.raises(RequestError):
await redis_client.bzpopmax([], 0.5)

# key exists, but it is not a sorted set
assert await redis_client.set("foo", "value") == OK
with pytest.raises(RequestError):
await redis_client.bzpopmax(["foo"], 0.5)

# same-slot requirement
if isinstance(redis_client, RedisClusterClient):
with pytest.raises(RequestError) as e:
await redis_client.bzpopmax(["abc", "zxy", "lkn"], 0.5)
assert "CrossSlot" in str(e)

async def endless_bzpopmax_call():
await redis_client.bzpopmax(["non_existent_key"], 0)

# bzpopmax is called against a non-existing key with no timeout, but we wrap the call in an asyncio timeout to
# avoid having the test block forever
with pytest.raises(asyncio.TimeoutError):
await asyncio.wait_for(endless_bzpopmax_call(), timeout=0.5)

@pytest.mark.parametrize("cluster_mode", [True, False])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_zrange_by_index(self, redis_client: TRedisClient):
Expand Down
12 changes: 8 additions & 4 deletions python/python/tests/test_transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,12 +228,16 @@ async def transaction_test(
args.append([2.0, 3.0])
transaction.zrangestore(key8, key8, RangeByIndex(0, -1))
args.append(3)
transaction.zpopmin(key8)
args.append({"two": 2.0})
transaction.bzpopmin([key8], 0.5)
args.append([key8, "two", 2.0])
transaction.bzpopmax([key8], 0.5)
args.append([key8, "four", 4.0])
transaction.zpopmax(key8)
args.append({"four": 4})
args.append({"three": 3.0})
transaction.zpopmin(key8)
args.append({})
transaction.zremrangebyscore(key8, InfBound.NEG_INF, InfBound.POS_INF)
args.append(1)
args.append(0)
transaction.zremrangebylex(key8, InfBound.NEG_INF, InfBound.POS_INF)
args.append(0)
transaction.zdiffstore(key8, [key8, key8])
Expand Down

0 comments on commit 8c590a7

Please sign in to comment.