Skip to content

Commit

Permalink
Python: add BZPOPMIN and BZPOPMAX commands (#266)
Browse files Browse the repository at this point in the history
  • Loading branch information
aaron-congo authored May 9, 2024
1 parent 2f21079 commit 624ee8d
Show file tree
Hide file tree
Showing 6 changed files with 279 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
* Python: Added XADD, XTRIM commands ([#1320](https://github.com/aws/glide-for-redis/pull/1320))
* Python: Added BLPOP and BRPOP commands ([#1369](https://github.com/aws/glide-for-redis/pull/1369))
* Python: Added ZRANGESTORE command ([#1377](https://github.com/aws/glide-for-redis/pull/1377))
* Python: Added BZPOPMIN and BZPOPMAX commands (TODO: add PR link)


#### Fixes
Expand Down
88 changes: 88 additions & 0 deletions glide-core/src/client/value_conversion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub(crate) enum ExpectedReturnType {
Lolwut,
ArrayOfArraysOfDoubleOrNull,
ArrayOfKeyValuePairs,
KeyWithMemberAndScore,
}

pub(crate) fn convert_to_expected_type(
Expand Down Expand Up @@ -287,6 +288,28 @@ pub(crate) fn convert_to_expected_type(
)
.into()),
},
// Used by BZPOPMIN/BZPOPMAX, which return an array consisting of the key of the sorted set that was popped, the popped member, and its score.
// RESP2 returns the score as a string, but RESP3 returns the score as a double. Here we convert string scores into type double.
ExpectedReturnType::KeyWithMemberAndScore => match value {
Value::Nil => Ok(value),
Value::Array(ref array) if array.len() == 3 && matches!(array[2], Value::Double(_)) => {
Ok(value)
}
Value::Array(mut array)
if array.len() == 3
&& matches!(array[2], Value::BulkString(_) | Value::SimpleString(_)) =>
{
array[2] =
convert_to_expected_type(array[2].clone(), Some(ExpectedReturnType::Double))?;
Ok(Value::Array(array))
}
_ => Err((
ErrorKind::TypeError,
"Response couldn't be converted to an array containing a key, member, and score",
format!("(response was {:?})", value),
)
.into()),
},
}
}

Expand Down Expand Up @@ -417,6 +440,7 @@ pub(crate) fn expected_type_for_cmd(cmd: &Cmd) -> Option<ExpectedReturnType> {
b"ZRANK" | b"ZREVRANK" => cmd
.position(b"WITHSCORE")
.map(|_| ExpectedReturnType::ZrankReturnType),
b"BZPOPMIN" | b"BZPOPMAX" => Some(ExpectedReturnType::KeyWithMemberAndScore),
b"SPOP" => {
if cmd.arg_idx(2).is_some() {
Some(ExpectedReturnType::Set)
Expand Down Expand Up @@ -726,6 +750,70 @@ mod tests {
));
}

#[test]
fn convert_bzpopmin_bzpopmax() {
assert!(matches!(
expected_type_for_cmd(
redis::cmd("BZPOPMIN")
.arg("myzset1")
.arg("myzset2")
.arg("1")
),
Some(ExpectedReturnType::KeyWithMemberAndScore)
));

assert!(matches!(
expected_type_for_cmd(
redis::cmd("BZPOPMAX")
.arg("myzset1")
.arg("myzset2")
.arg("1")
),
Some(ExpectedReturnType::KeyWithMemberAndScore)
));

let array_with_double_score = Value::Array(vec![
Value::BulkString(b"key1".to_vec()),
Value::BulkString(b"member1".to_vec()),
Value::Double(2.0),
]);
let result = convert_to_expected_type(
array_with_double_score.clone(),
Some(ExpectedReturnType::KeyWithMemberAndScore),
)
.unwrap();
assert_eq!(array_with_double_score, result);

let array_with_string_score = Value::Array(vec![
Value::BulkString(b"key1".to_vec()),
Value::BulkString(b"member1".to_vec()),
Value::BulkString(b"2.0".to_vec()),
]);
let result = convert_to_expected_type(
array_with_string_score.clone(),
Some(ExpectedReturnType::KeyWithMemberAndScore),
)
.unwrap();
assert_eq!(array_with_double_score, result);

let converted_nil_value =
convert_to_expected_type(Value::Nil, Some(ExpectedReturnType::KeyWithMemberAndScore))
.unwrap();
assert_eq!(Value::Nil, converted_nil_value);

let array_with_unexpected_length = Value::Array(vec![
Value::BulkString(b"key1".to_vec()),
Value::BulkString(b"member1".to_vec()),
Value::Double(2.0),
Value::Double(2.0),
]);
assert!(convert_to_expected_type(
array_with_unexpected_length,
Some(ExpectedReturnType::KeyWithMemberAndScore)
)
.is_err());
}

#[test]
fn convert_zank_zrevrank_only_if_withsocres_is_included() {
assert!(matches!(
Expand Down
66 changes: 66 additions & 0 deletions python/python/glide/async_commands/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2293,6 +2293,39 @@ async def zpopmax(
),
)

async def bzpopmax(
self, keys: List[str], timeout: float
) -> Optional[List[Union[str, float]]]:
"""
Blocks the connection until it removes and returns a member with the highest score from the first non-empty
sorted set, with the given keys being checked in the order they are provided.
When in cluster mode, all keys must map to the same hash slot.
BZPOPMAX is the blocking variant of ZPOPMAX.
BZPOPMAX is a client blocking command, see https://github.com/aws/glide-for-redis/wiki/General-Concepts#blocking-commands for more details and best practices.
See https://valkey.io/commands/bzpopmax for more details.
Args:
keys (List[str]): The keys of the sorted sets.
timeout (float): The number of seconds to wait for a blocking operation to complete.
A value of 0 will block indefinitely.
Returns:
Optional[List[Union[str, float]]]: An array containing the key where the member was popped out, the member itself,
and the member score. If no member could be popped and the `timeout` expired, returns None.
Examples:
>>> await client.bzpopmax(["my_sorted_set1", "my_sorted_set2"], 0.5)
['my_sorted_set1', 'member1', 10.0] # 'member1' with a score of 10.0 has been removed from 'my_sorted_set'.
"""
return cast(
Optional[List[Union[str, float]]],
await self._execute_command(RequestType.BZPopMax, keys + [str(timeout)]),
)

async def zpopmin(
self, key: str, count: Optional[int] = None
) -> Mapping[str, float]:
Expand Down Expand Up @@ -2325,6 +2358,39 @@ async def zpopmin(
),
)

async def bzpopmin(
self, keys: List[str], timeout: float
) -> Optional[List[Union[str, float]]]:
"""
Blocks the connection until it removes and returns a member with the lowest score from the first non-empty
sorted set, with the given keys being checked in the order they are provided.
When in cluster mode, all keys must map to the same hash slot.
BZPOPMIN is the blocking variant of ZPOPMIN.
BZPOPMIN is a client blocking command, see https://github.com/aws/glide-for-redis/wiki/General-Concepts#blocking-commands for more details and best practices.
See https://valkey.io/commands/bzpopmin for more details.
Args:
keys (List[str]): The keys of the sorted sets.
timeout (float): The number of seconds to wait for a blocking operation to complete.
A value of 0 will block indefinitely.
Returns:
Optional[List[Union[str, float]]]: An array containing the key where the member was popped out, the member itself,
and the member score. If no member could be popped and the `timeout` expired, returns None.
Examples:
>>> await client.bzpopmin(["my_sorted_set1", "my_sorted_set2"], 0.5)
['my_sorted_set1', 'member1', 1.0] # 'member1' with a score of 1.0 has been removed from 'my_sorted_set'.
"""
return cast(
Optional[List[Union[str, float]]],
await self._execute_command(RequestType.BZPopMin, keys + [str(timeout)]),
)

async def zrange(
self,
key: str,
Expand Down
44 changes: 44 additions & 0 deletions python/python/glide/async_commands/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -1626,6 +1626,28 @@ def zpopmax(
RequestType.ZPopMax, [key, str(count)] if count else [key]
)

def bzpopmax(self: TTransaction, keys: List[str], timeout: float) -> TTransaction:
"""
Blocks the connection until it removes and returns a member with the highest score from the first non-empty
sorted set, with the given keys being checked in the order they are provided.
BZPOPMAX is the blocking variant of ZPOPMAX.
BZPOPMAX is a client blocking command, see https://github.com/aws/glide-for-redis/wiki/General-Concepts#blocking-commands for more details and best practices.
See https://valkey.io/commands/bzpopmax for more details.
Args:
keys (List[str]): The keys of the sorted sets.
timeout (float): The number of seconds to wait for a blocking operation to complete.
A value of 0 will block indefinitely.
Command response:
Optional[List[Union[str, float]]]: An array containing the key where the member was popped out, the member itself,
and the member score. If no member could be popped and the `timeout` expired, returns None.
"""
return self.append_command(RequestType.BZPopMax, keys + [str(timeout)])

def zpopmin(
self: TTransaction, key: str, count: Optional[int] = None
) -> TTransaction:
Expand All @@ -1649,6 +1671,28 @@ def zpopmin(
RequestType.ZPopMin, [key, str(count)] if count else [key]
)

def bzpopmin(self: TTransaction, keys: List[str], timeout: float) -> TTransaction:
"""
Blocks the connection until it removes and returns a member with the lowest score from the first non-empty
sorted set, with the given keys being checked in the order they are provided.
BZPOPMIN is the blocking variant of ZPOPMIN.
BZPOPMIN is a client blocking command, see https://github.com/aws/glide-for-redis/wiki/General-Concepts#blocking-commands for more details and best practices.
See https://valkey.io/commands/bzpopmin for more details.
Args:
keys (List[str]): The keys of the sorted sets.
timeout (float): The number of seconds to wait for a blocking operation to complete.
A value of 0 will block indefinitely.
Command response:
Optional[List[Union[str, float]]]: An array containing the key where the member was popped out, the member itself,
and the member score. If no member could be popped and the `timeout` expired, returns None.
"""
return self.append_command(RequestType.BZPopMin, keys + [str(timeout)])

def zrange(
self: TTransaction,
key: str,
Expand Down
72 changes: 72 additions & 0 deletions python/python/tests/test_async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1887,6 +1887,42 @@ async def test_zpopmin(self, redis_client: TRedisClient):

assert await redis_client.zpopmin("non_exisitng_key") == {}

@pytest.mark.parametrize("cluster_mode", [True, False])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_bzpopmin(self, redis_client: TRedisClient):
key1 = f"{{testKey}}:{get_random_string(10)}"
key2 = f"{{testKey}}:{get_random_string(10)}"
non_existing_key = f"{{testKey}}:non_existing_key"

assert await redis_client.zadd(key1, {"a": 1.0, "b": 1.5}) == 2
assert await redis_client.zadd(key2, {"c": 2.0}) == 1
assert await redis_client.bzpopmin([key1, key2], 0.5) == [key1, "a", 1.0]
assert await redis_client.bzpopmin([non_existing_key, key2], 0.5) == [
key2,
"c",
2.0,
]
assert await redis_client.bzpopmin(["non_existing_key"], 0.5) is None

# key exists, but it is not a sorted set
assert await redis_client.set("foo", "value") == OK
with pytest.raises(RequestError):
await redis_client.bzpopmin(["foo"], 0.5)

# same-slot requirement
if isinstance(redis_client, RedisClusterClient):
with pytest.raises(RequestError) as e:
await redis_client.bzpopmin(["abc", "zxy", "lkn"], 0.5)
assert "CrossSlot" in str(e)

async def endless_bzpopmin_call():
await redis_client.bzpopmin(["non_existent_key"], 0)

# bzpopmax is called against a non-existing key with no timeout, but we wrap the call in an asyncio timeout to
# avoid having the test block forever
with pytest.raises(asyncio.TimeoutError):
await asyncio.wait_for(endless_bzpopmin_call(), timeout=0.5)

@pytest.mark.parametrize("cluster_mode", [True, False])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_zpopmax(self, redis_client: TRedisClient):
Expand All @@ -1902,6 +1938,42 @@ async def test_zpopmax(self, redis_client: TRedisClient):

assert await redis_client.zpopmax("non_exisitng_key") == {}

@pytest.mark.parametrize("cluster_mode", [True, False])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_bzpopmax(self, redis_client: TRedisClient):
key1 = f"{{testKey}}:{get_random_string(10)}"
key2 = f"{{testKey}}:{get_random_string(10)}"
non_existing_key = f"{{testKey}}:non_existing_key"

assert await redis_client.zadd(key1, {"a": 1.0, "b": 1.5}) == 2
assert await redis_client.zadd(key2, {"c": 2.0}) == 1
assert await redis_client.bzpopmax([key1, key2], 0.5) == [key1, "b", 1.5]
assert await redis_client.bzpopmax([non_existing_key, key2], 0.5) == [
key2,
"c",
2.0,
]
assert await redis_client.bzpopmax(["non_existing_key"], 0.5) is None

# key exists, but it is not a sorted set
assert await redis_client.set("foo", "value") == OK
with pytest.raises(RequestError):
await redis_client.bzpopmax(["foo"], 0.5)

# same-slot requirement
if isinstance(redis_client, RedisClusterClient):
with pytest.raises(RequestError) as e:
await redis_client.bzpopmax(["abc", "zxy", "lkn"], 0.5)
assert "CrossSlot" in str(e)

async def endless_bzpopmax_call():
await redis_client.bzpopmax(["non_existent_key"], 0)

# bzpopmax is called against a non-existing key with no timeout, but we wrap the call in an asyncio timeout to
# avoid having the test block forever
with pytest.raises(asyncio.TimeoutError):
await asyncio.wait_for(endless_bzpopmax_call(), timeout=0.5)

@pytest.mark.parametrize("cluster_mode", [True, False])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_zrange_by_index(self, redis_client: TRedisClient):
Expand Down
12 changes: 8 additions & 4 deletions python/python/tests/test_transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,12 +227,16 @@ async def transaction_test(
args.append([2.0, 3.0])
transaction.zrangestore(key8, key8, RangeByIndex(0, -1))
args.append(3)
transaction.zpopmin(key8)
args.append({"two": 2.0})
transaction.bzpopmin([key8], 0.5)
args.append([key8, "two", 2.0])
transaction.bzpopmax([key8], 0.5)
args.append([key8, "four", 4.0])
transaction.zpopmax(key8)
args.append({"four": 4})
args.append({"three": 3.0})
transaction.zpopmin(key8)
args.append({})
transaction.zremrangebyscore(key8, InfBound.NEG_INF, InfBound.POS_INF)
args.append(1)
args.append(0)
transaction.zremrangebylex(key8, InfBound.NEG_INF, InfBound.POS_INF)
args.append(0)

Expand Down

0 comments on commit 624ee8d

Please sign in to comment.