diff --git a/CHANGELOG.md b/CHANGELOG.md index 9c1d25ee20..6b7ab09912 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ * Python: Added XADD, XTRIM commands ([#1320](https://github.com/aws/glide-for-redis/pull/1320)) * Python: Added BLPOP and BRPOP commands ([#1369](https://github.com/aws/glide-for-redis/pull/1369)) * Python: Added ZRANGESTORE command ([#1377](https://github.com/aws/glide-for-redis/pull/1377)) +* Python: Added BZPOPMIN and BZPOPMAX commands (TODO: add PR link) #### Fixes diff --git a/glide-core/src/client/value_conversion.rs b/glide-core/src/client/value_conversion.rs index 38b9a485f4..820bb117ab 100644 --- a/glide-core/src/client/value_conversion.rs +++ b/glide-core/src/client/value_conversion.rs @@ -21,6 +21,7 @@ pub(crate) enum ExpectedReturnType { Lolwut, ArrayOfArraysOfDoubleOrNull, ArrayOfKeyValuePairs, + KeyWithMemberAndScore, } pub(crate) fn convert_to_expected_type( @@ -287,6 +288,28 @@ pub(crate) fn convert_to_expected_type( ) .into()), }, + // Used by BZPOPMIN/BZPOPMAX, which return an array consisting of the key of the sorted set that was popped, the popped member, and its score. + // RESP2 returns the score as a string, but RESP3 returns the score as a double. Here we convert string scores into type double. + ExpectedReturnType::KeyWithMemberAndScore => match value { + Value::Nil => Ok(value), + Value::Array(ref array) if array.len() == 3 && matches!(array[2], Value::Double(_)) => { + Ok(value) + } + Value::Array(mut array) + if array.len() == 3 + && matches!(array[2], Value::BulkString(_) | Value::SimpleString(_)) => + { + array[2] = + convert_to_expected_type(array[2].clone(), Some(ExpectedReturnType::Double))?; + Ok(Value::Array(array)) + } + _ => Err(( + ErrorKind::TypeError, + "Response couldn't be converted to an array containing a key, member, and score", + format!("(response was {:?})", value), + ) + .into()), + }, } } @@ -417,6 +440,7 @@ pub(crate) fn expected_type_for_cmd(cmd: &Cmd) -> Option { b"ZRANK" | b"ZREVRANK" => cmd .position(b"WITHSCORE") .map(|_| ExpectedReturnType::ZrankReturnType), + b"BZPOPMIN" | b"BZPOPMAX" => Some(ExpectedReturnType::KeyWithMemberAndScore), b"SPOP" => { if cmd.arg_idx(2).is_some() { Some(ExpectedReturnType::Set) @@ -726,6 +750,70 @@ mod tests { )); } + #[test] + fn convert_bzpopmin_bzpopmax() { + assert!(matches!( + expected_type_for_cmd( + redis::cmd("BZPOPMIN") + .arg("myzset1") + .arg("myzset2") + .arg("1") + ), + Some(ExpectedReturnType::KeyWithMemberAndScore) + )); + + assert!(matches!( + expected_type_for_cmd( + redis::cmd("BZPOPMAX") + .arg("myzset1") + .arg("myzset2") + .arg("1") + ), + Some(ExpectedReturnType::KeyWithMemberAndScore) + )); + + let array_with_double_score = Value::Array(vec![ + Value::BulkString(b"key1".to_vec()), + Value::BulkString(b"member1".to_vec()), + Value::Double(2.0), + ]); + let result = convert_to_expected_type( + array_with_double_score.clone(), + Some(ExpectedReturnType::KeyWithMemberAndScore), + ) + .unwrap(); + assert_eq!(array_with_double_score, result); + + let array_with_string_score = Value::Array(vec![ + Value::BulkString(b"key1".to_vec()), + Value::BulkString(b"member1".to_vec()), + Value::BulkString(b"2.0".to_vec()), + ]); + let result = convert_to_expected_type( + array_with_string_score.clone(), + Some(ExpectedReturnType::KeyWithMemberAndScore), + ) + .unwrap(); + assert_eq!(array_with_double_score, result); + + let converted_nil_value = + convert_to_expected_type(Value::Nil, Some(ExpectedReturnType::KeyWithMemberAndScore)) + .unwrap(); + assert_eq!(Value::Nil, converted_nil_value); + + let array_with_unexpected_length = Value::Array(vec![ + Value::BulkString(b"key1".to_vec()), + Value::BulkString(b"member1".to_vec()), + Value::Double(2.0), + Value::Double(2.0), + ]); + assert!(convert_to_expected_type( + array_with_unexpected_length, + Some(ExpectedReturnType::KeyWithMemberAndScore) + ) + .is_err()); + } + #[test] fn convert_zank_zrevrank_only_if_withsocres_is_included() { assert!(matches!( diff --git a/python/python/glide/async_commands/core.py b/python/python/glide/async_commands/core.py index 8bebfd8abc..d4f64d97be 100644 --- a/python/python/glide/async_commands/core.py +++ b/python/python/glide/async_commands/core.py @@ -2293,6 +2293,39 @@ async def zpopmax( ), ) + async def bzpopmax( + self, keys: List[str], timeout: float + ) -> Optional[List[Union[str, float]]]: + """ + Blocks the connection until it removes and returns a member with the highest score from the first non-empty + sorted set, with the given keys being checked in the order they are provided. + + When in cluster mode, all keys must map to the same hash slot. + + BZPOPMAX is the blocking variant of ZPOPMAX. + + BZPOPMAX is a client blocking command, see https://github.com/aws/glide-for-redis/wiki/General-Concepts#blocking-commands for more details and best practices. + + See https://valkey.io/commands/bzpopmax for more details. + + Args: + keys (List[str]): The keys of the sorted sets. + timeout (float): The number of seconds to wait for a blocking operation to complete. + A value of 0 will block indefinitely. + + Returns: + Optional[List[Union[str, float]]]: An array containing the key where the member was popped out, the member itself, + and the member score. If no member could be popped and the `timeout` expired, returns None. + + Examples: + >>> await client.bzpopmax(["my_sorted_set1", "my_sorted_set2"], 0.5) + ['my_sorted_set1', 'member1', 10.0] # 'member1' with a score of 10.0 has been removed from 'my_sorted_set'. + """ + return cast( + Optional[List[Union[str, float]]], + await self._execute_command(RequestType.BZPopMax, keys + [str(timeout)]), + ) + async def zpopmin( self, key: str, count: Optional[int] = None ) -> Mapping[str, float]: @@ -2325,6 +2358,39 @@ async def zpopmin( ), ) + async def bzpopmin( + self, keys: List[str], timeout: float + ) -> Optional[List[Union[str, float]]]: + """ + Blocks the connection until it removes and returns a member with the lowest score from the first non-empty + sorted set, with the given keys being checked in the order they are provided. + + When in cluster mode, all keys must map to the same hash slot. + + BZPOPMIN is the blocking variant of ZPOPMIN. + + BZPOPMIN is a client blocking command, see https://github.com/aws/glide-for-redis/wiki/General-Concepts#blocking-commands for more details and best practices. + + See https://valkey.io/commands/bzpopmin for more details. + + Args: + keys (List[str]): The keys of the sorted sets. + timeout (float): The number of seconds to wait for a blocking operation to complete. + A value of 0 will block indefinitely. + + Returns: + Optional[List[Union[str, float]]]: An array containing the key where the member was popped out, the member itself, + and the member score. If no member could be popped and the `timeout` expired, returns None. + + Examples: + >>> await client.bzpopmin(["my_sorted_set1", "my_sorted_set2"], 0.5) + ['my_sorted_set1', 'member1', 1.0] # 'member1' with a score of 1.0 has been removed from 'my_sorted_set'. + """ + return cast( + Optional[List[Union[str, float]]], + await self._execute_command(RequestType.BZPopMin, keys + [str(timeout)]), + ) + async def zrange( self, key: str, diff --git a/python/python/glide/async_commands/transaction.py b/python/python/glide/async_commands/transaction.py index 6e6efe7f6a..2cd3184202 100644 --- a/python/python/glide/async_commands/transaction.py +++ b/python/python/glide/async_commands/transaction.py @@ -1626,6 +1626,28 @@ def zpopmax( RequestType.ZPopMax, [key, str(count)] if count else [key] ) + def bzpopmax(self: TTransaction, keys: List[str], timeout: float) -> TTransaction: + """ + Blocks the connection until it removes and returns a member with the highest score from the first non-empty + sorted set, with the given keys being checked in the order they are provided. + + BZPOPMAX is the blocking variant of ZPOPMAX. + + BZPOPMAX is a client blocking command, see https://github.com/aws/glide-for-redis/wiki/General-Concepts#blocking-commands for more details and best practices. + + See https://valkey.io/commands/bzpopmax for more details. + + Args: + keys (List[str]): The keys of the sorted sets. + timeout (float): The number of seconds to wait for a blocking operation to complete. + A value of 0 will block indefinitely. + + Command response: + Optional[List[Union[str, float]]]: An array containing the key where the member was popped out, the member itself, + and the member score. If no member could be popped and the `timeout` expired, returns None. + """ + return self.append_command(RequestType.BZPopMax, keys + [str(timeout)]) + def zpopmin( self: TTransaction, key: str, count: Optional[int] = None ) -> TTransaction: @@ -1649,6 +1671,28 @@ def zpopmin( RequestType.ZPopMin, [key, str(count)] if count else [key] ) + def bzpopmin(self: TTransaction, keys: List[str], timeout: float) -> TTransaction: + """ + Blocks the connection until it removes and returns a member with the lowest score from the first non-empty + sorted set, with the given keys being checked in the order they are provided. + + BZPOPMIN is the blocking variant of ZPOPMIN. + + BZPOPMIN is a client blocking command, see https://github.com/aws/glide-for-redis/wiki/General-Concepts#blocking-commands for more details and best practices. + + See https://valkey.io/commands/bzpopmin for more details. + + Args: + keys (List[str]): The keys of the sorted sets. + timeout (float): The number of seconds to wait for a blocking operation to complete. + A value of 0 will block indefinitely. + + Command response: + Optional[List[Union[str, float]]]: An array containing the key where the member was popped out, the member itself, + and the member score. If no member could be popped and the `timeout` expired, returns None. + """ + return self.append_command(RequestType.BZPopMin, keys + [str(timeout)]) + def zrange( self: TTransaction, key: str, diff --git a/python/python/tests/test_async_client.py b/python/python/tests/test_async_client.py index 0826519387..d6dfe7d05e 100644 --- a/python/python/tests/test_async_client.py +++ b/python/python/tests/test_async_client.py @@ -1887,6 +1887,42 @@ async def test_zpopmin(self, redis_client: TRedisClient): assert await redis_client.zpopmin("non_exisitng_key") == {} + @pytest.mark.parametrize("cluster_mode", [True, False]) + @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) + async def test_bzpopmin(self, redis_client: TRedisClient): + key1 = f"{{testKey}}:{get_random_string(10)}" + key2 = f"{{testKey}}:{get_random_string(10)}" + non_existing_key = f"{{testKey}}:non_existing_key" + + assert await redis_client.zadd(key1, {"a": 1.0, "b": 1.5}) == 2 + assert await redis_client.zadd(key2, {"c": 2.0}) == 1 + assert await redis_client.bzpopmin([key1, key2], 0.5) == [key1, "a", 1.0] + assert await redis_client.bzpopmin([non_existing_key, key2], 0.5) == [ + key2, + "c", + 2.0, + ] + assert await redis_client.bzpopmin(["non_existing_key"], 0.5) is None + + # key exists, but it is not a sorted set + assert await redis_client.set("foo", "value") == OK + with pytest.raises(RequestError): + await redis_client.bzpopmin(["foo"], 0.5) + + # same-slot requirement + if isinstance(redis_client, RedisClusterClient): + with pytest.raises(RequestError) as e: + await redis_client.bzpopmin(["abc", "zxy", "lkn"], 0.5) + assert "CrossSlot" in str(e) + + async def endless_bzpopmin_call(): + await redis_client.bzpopmin(["non_existent_key"], 0) + + # bzpopmax is called against a non-existing key with no timeout, but we wrap the call in an asyncio timeout to + # avoid having the test block forever + with pytest.raises(asyncio.TimeoutError): + await asyncio.wait_for(endless_bzpopmin_call(), timeout=0.5) + @pytest.mark.parametrize("cluster_mode", [True, False]) @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) async def test_zpopmax(self, redis_client: TRedisClient): @@ -1902,6 +1938,42 @@ async def test_zpopmax(self, redis_client: TRedisClient): assert await redis_client.zpopmax("non_exisitng_key") == {} + @pytest.mark.parametrize("cluster_mode", [True, False]) + @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) + async def test_bzpopmax(self, redis_client: TRedisClient): + key1 = f"{{testKey}}:{get_random_string(10)}" + key2 = f"{{testKey}}:{get_random_string(10)}" + non_existing_key = f"{{testKey}}:non_existing_key" + + assert await redis_client.zadd(key1, {"a": 1.0, "b": 1.5}) == 2 + assert await redis_client.zadd(key2, {"c": 2.0}) == 1 + assert await redis_client.bzpopmax([key1, key2], 0.5) == [key1, "b", 1.5] + assert await redis_client.bzpopmax([non_existing_key, key2], 0.5) == [ + key2, + "c", + 2.0, + ] + assert await redis_client.bzpopmax(["non_existing_key"], 0.5) is None + + # key exists, but it is not a sorted set + assert await redis_client.set("foo", "value") == OK + with pytest.raises(RequestError): + await redis_client.bzpopmax(["foo"], 0.5) + + # same-slot requirement + if isinstance(redis_client, RedisClusterClient): + with pytest.raises(RequestError) as e: + await redis_client.bzpopmax(["abc", "zxy", "lkn"], 0.5) + assert "CrossSlot" in str(e) + + async def endless_bzpopmax_call(): + await redis_client.bzpopmax(["non_existent_key"], 0) + + # bzpopmax is called against a non-existing key with no timeout, but we wrap the call in an asyncio timeout to + # avoid having the test block forever + with pytest.raises(asyncio.TimeoutError): + await asyncio.wait_for(endless_bzpopmax_call(), timeout=0.5) + @pytest.mark.parametrize("cluster_mode", [True, False]) @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) async def test_zrange_by_index(self, redis_client: TRedisClient): diff --git a/python/python/tests/test_transaction.py b/python/python/tests/test_transaction.py index 555c14d75e..5e6fa9638e 100644 --- a/python/python/tests/test_transaction.py +++ b/python/python/tests/test_transaction.py @@ -227,12 +227,16 @@ async def transaction_test( args.append([2.0, 3.0]) transaction.zrangestore(key8, key8, RangeByIndex(0, -1)) args.append(3) - transaction.zpopmin(key8) - args.append({"two": 2.0}) + transaction.bzpopmin([key8], 0.5) + args.append([key8, "two", 2.0]) + transaction.bzpopmax([key8], 0.5) + args.append([key8, "four", 4.0]) transaction.zpopmax(key8) - args.append({"four": 4}) + args.append({"three": 3.0}) + transaction.zpopmin(key8) + args.append({}) transaction.zremrangebyscore(key8, InfBound.NEG_INF, InfBound.POS_INF) - args.append(1) + args.append(0) transaction.zremrangebylex(key8, InfBound.NEG_INF, InfBound.POS_INF) args.append(0)