diff --git a/CHANGELOG.md b/CHANGELOG.md index 54e6b6b29a..ee1e877d58 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -53,9 +53,9 @@ * Python: Added XREVRANGE command ([#1625](https://github.com/aws/glide-for-redis/pull/1625)) * Python: Added XREAD command ([#1644](https://github.com/aws/glide-for-redis/pull/1644)) * Python: Added XGROUP CREATE and XGROUP DESTROY commands ([#1646](https://github.com/aws/glide-for-redis/pull/1646)) +* Python: Added XGROUP CREATECONSUMER and XGROUP DELCONSUMER commands ([#1658](https://github.com/aws/glide-for-redis/pull/1658)) * Python: Added LOLWUT command ([#1657](https://github.com/aws/glide-for-redis/pull/1657)) - ### Breaking Changes * Node: Update XREAD to return a Map of Map ([#1494](https://github.com/aws/glide-for-redis/pull/1494)) diff --git a/python/python/glide/async_commands/core.py b/python/python/glide/async_commands/core.py index 5b8dec4a6a..2be433c1f1 100644 --- a/python/python/glide/async_commands/core.py +++ b/python/python/glide/async_commands/core.py @@ -2862,6 +2862,60 @@ async def xgroup_destroy(self, key: str, group_name: str) -> bool: await self._execute_command(RequestType.XGroupDestroy, [key, group_name]), ) + async def xgroup_create_consumer( + self, key: str, group_name: str, consumer_name: str + ) -> bool: + """ + Creates a consumer named `consumer_name` in the consumer group `group_name` for the stream stored at `key`. + + See https://valkey.io/commands/xgroup-createconsumer for more details. + + Args: + key (str): The key of the stream. + group_name (str): The consumer group name. + consumer_name (str): The newly created consumer. + + Returns: + bool: True if the consumer is created. Otherwise, returns False. + + Examples: + >>> await client.xgroup_create_consumer("mystream", "mygroup", "myconsumer") + True # The consumer "myconsumer" was created in consumer group "mygroup" for the stream "mystream". + """ + return cast( + bool, + await self._execute_command( + RequestType.XGroupCreateConsumer, [key, group_name, consumer_name] + ), + ) + + async def xgroup_del_consumer( + self, key: str, group_name: str, consumer_name: str + ) -> int: + """ + Deletes a consumer named `consumer_name` in the consumer group `group_name` for the stream stored at `key`. + + See https://valkey.io/commands/xgroup-delconsumer for more details. + + Args: + key (str): The key of the stream. + group_name (str): The consumer group name. + consumer_name (str): The consumer to delete. + + Returns: + int: The number of pending messages the `consumer` had before it was deleted. + + Examples: + >>> await client.xgroup_del_consumer("mystream", "mygroup", "myconsumer") + 5 # Consumer "myconsumer" was deleted, and had 5 pending messages unclaimed. + """ + return cast( + int, + await self._execute_command( + RequestType.XGroupDelConsumer, [key, group_name, consumer_name] + ), + ) + async def geoadd( self, key: str, diff --git a/python/python/glide/async_commands/transaction.py b/python/python/glide/async_commands/transaction.py index 829b6744cb..1a821d8305 100644 --- a/python/python/glide/async_commands/transaction.py +++ b/python/python/glide/async_commands/transaction.py @@ -1999,6 +1999,46 @@ def xgroup_destroy(self: TTransaction, key: str, group_name: str) -> TTransactio """ return self.append_command(RequestType.XGroupDestroy, [key, group_name]) + def xgroup_create_consumer( + self: TTransaction, key: str, group_name: str, consumer_name: str + ) -> TTransaction: + """ + Creates a consumer named `consumer_name` in the consumer group `group_name` for the stream stored at `key`. + + See https://valkey.io/commands/xgroup-createconsumer for more details. + + Args: + key (str): The key of the stream. + group_name (str): The consumer group name. + consumer_name (str): The newly created consumer. + + Command response: + bool: True if the consumer is created. Otherwise, returns False. + """ + return self.append_command( + RequestType.XGroupCreateConsumer, [key, group_name, consumer_name] + ) + + def xgroup_del_consumer( + self: TTransaction, key: str, group_name: str, consumer_name: str + ) -> TTransaction: + """ + Deletes a consumer named `consumer_name` in the consumer group `group_name` for the stream stored at `key`. + + See https://valkey.io/commands/xgroup-delconsumer for more details. + + Args: + key (str): The key of the stream. + group_name (str): The consumer group name. + consumer_name (str): The consumer to delete. + + Command response: + int: The number of pending messages the `consumer` had before it was deleted. + """ + return self.append_command( + RequestType.XGroupDelConsumer, [key, group_name, consumer_name] + ) + def geoadd( self: TTransaction, key: str, diff --git a/python/python/tests/test_async_client.py b/python/python/tests/test_async_client.py index b74f01e2a8..26f54849a6 100644 --- a/python/python/tests/test_async_client.py +++ b/python/python/tests/test_async_client.py @@ -5153,6 +5153,71 @@ async def test_xgroup_create_xgroup_destroy( with pytest.raises(RequestError): await redis_client.xgroup_destroy(string_key, group_name1) + @pytest.mark.parametrize("cluster_mode", [True, False]) + @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) + async def test_xgroup_create_consumer_xgroup_del_consumer( + self, redis_client: TRedisClient, cluster_mode, protocol, request + ): + key = get_random_string(10) + non_existing_key = get_random_string(10) + string_key = get_random_string(10) + group_name = get_random_string(10) + consumer = get_random_string(10) + stream_id0 = "0" + + # create group and consumer for the group + assert ( + await redis_client.xgroup_create( + key, group_name, stream_id0, StreamGroupOptions(make_stream=True) + ) + == OK + ) + assert ( + await redis_client.xgroup_create_consumer(key, group_name, consumer) is True + ) + + # attempting to create/delete a consumer for a group that does not exist results in a NOGROUP request error + with pytest.raises(RequestError): + await redis_client.xgroup_create_consumer( + key, "non_existing_group", consumer + ) + with pytest.raises(RequestError): + await redis_client.xgroup_del_consumer(key, "non_existing_group", consumer) + + # attempt to create consumer for group again + assert ( + await redis_client.xgroup_create_consumer(key, group_name, consumer) + is False + ) + + # attempting to delete a consumer that has not been created yet returns 0 + assert ( + await redis_client.xgroup_del_consumer( + key, group_name, "non_existing_consumer" + ) + == 0 + ) + + # TODO: use XREADGROUP to mark pending messages for the consumer so that we get non-zero return + assert await redis_client.xgroup_del_consumer(key, group_name, consumer) == 0 + + # attempting to call XGROUP CREATECONSUMER or XGROUP DELCONSUMER with a non-existing key should raise an error + with pytest.raises(RequestError): + await redis_client.xgroup_create_consumer( + non_existing_key, group_name, consumer + ) + with pytest.raises(RequestError): + await redis_client.xgroup_del_consumer( + non_existing_key, group_name, consumer + ) + + # key exists, but it is not a stream + assert await redis_client.set(string_key, "foo") == OK + with pytest.raises(RequestError): + await redis_client.xgroup_create_consumer(string_key, group_name, consumer) + with pytest.raises(RequestError): + await redis_client.xgroup_del_consumer(string_key, group_name, consumer) + @pytest.mark.parametrize("cluster_mode", [True, False]) @pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3]) async def test_pfadd(self, redis_client: TRedisClient): diff --git a/python/python/tests/test_transaction.py b/python/python/tests/test_transaction.py index 060825a3e2..aa7641a301 100644 --- a/python/python/tests/test_transaction.py +++ b/python/python/tests/test_transaction.py @@ -490,12 +490,17 @@ async def transaction_test( group_name1 = get_random_string(10) group_name2 = get_random_string(10) + consumer = get_random_string(10) transaction.xgroup_create(key11, group_name1, "0-0") args.append(OK) transaction.xgroup_create( key11, group_name2, "0-0", StreamGroupOptions(make_stream=True) ) args.append(OK) + transaction.xgroup_create_consumer(key11, group_name1, consumer) + args.append(True) + transaction.xgroup_del_consumer(key11, group_name1, consumer) + args.append(0) transaction.xgroup_destroy(key11, group_name1) args.append(True) transaction.xgroup_destroy(key11, group_name2)