Skip to content

Commit

Permalink
Python: add XGROUP CREATECONSUMER and XGROUP DELCONSUMER commands
Browse files Browse the repository at this point in the history
  • Loading branch information
aaron-congo committed Jun 25, 2024
1 parent 51ffa3b commit ed73149
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
* Python: Added XREVRANGE command ([#1625](https://github.com/aws/glide-for-redis/pull/1625))
* Python: Added XREAD command ([#1644](https://github.com/aws/glide-for-redis/pull/1644))
* Python: Added XGROUP CREATE and XGROUP DESTROY commands ([#1646](https://github.com/aws/glide-for-redis/pull/1646))
* Python: Added XGROUP CREATECONSUMER and XGROUP DELCONSUMER commands ([#1658](https://github.com/aws/glide-for-redis/pull/1658))

### Breaking Changes
* Node: Update XREAD to return a Map of Map ([#1494](https://github.com/aws/glide-for-redis/pull/1494))
Expand Down
54 changes: 54 additions & 0 deletions python/python/glide/async_commands/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2862,6 +2862,60 @@ async def xgroup_destroy(self, key: str, group_name: str) -> bool:
await self._execute_command(RequestType.XGroupDestroy, [key, group_name]),
)

async def xgroup_create_consumer(
self, key: str, group_name: str, consumer: str
) -> bool:
"""
Creates a consumer named `consumer` in the consumer group `group_name` for the stream stored at `key`.
See https://valkey.io/commands/xgroup-createconsumer for more details.
Args:
key (str): The key of the stream.
group_name (str): The consumer group name.
consumer (str): The newly created consumer.
Returns:
bool: True if the consumer is created. Otherwise, returns False.
Examples:
>>> await client.xgroup_create_consumer("mystream", "mygroup", "myconsumer")
True # The consumer "myconsumer" was created in consumer group "mygroup" for the stream "mystream".
"""
return cast(
bool,
await self._execute_command(
RequestType.XGroupCreateConsumer, [key, group_name, consumer]
),
)

async def xgroup_del_consumer(
self, key: str, group_name: str, consumer: str
) -> int:
"""
Deletes a consumer named `consumer` in the consumer group `group_name` for the stream stored at `key`.
See https://valkey.io/commands/xgroup-delconsumer for more details.
Args:
key (str): The key of the stream.
group_name (str): The consumer group name.
consumer (str): The consumer to delete.
Returns:
int: The number of pending messages the `consumer` had before it was deleted.
Examples:
>>> await client.xgroup_del_consumer("mystream", "mygroup", "myconsumer")
5 # Consumer "myconsumer" was deleted, and had 5 pending messages unclaimed.
"""
return cast(
int,
await self._execute_command(
RequestType.XGroupDelConsumer, [key, group_name, consumer]
),
)

async def geoadd(
self,
key: str,
Expand Down
40 changes: 40 additions & 0 deletions python/python/glide/async_commands/transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -1999,6 +1999,46 @@ def xgroup_destroy(self: TTransaction, key: str, group_name: str) -> TTransactio
"""
return self.append_command(RequestType.XGroupDestroy, [key, group_name])

def xgroup_create_consumer(
self: TTransaction, key: str, group_name: str, consumer: str
) -> TTransaction:
"""
Creates a consumer named `consumer` in the consumer group `group_name` for the stream stored at `key`.
See https://valkey.io/commands/xgroup-createconsumer for more details.
Args:
key (str): The key of the stream.
group_name (str): The consumer group name.
consumer (str): The newly created consumer.
Command response:
bool: True if the consumer is created. Otherwise, returns False.
"""
return self.append_command(
RequestType.XGroupCreateConsumer, [key, group_name, consumer]
)

def xgroup_del_consumer(
self: TTransaction, key: str, group_name: str, consumer: str
) -> TTransaction:
"""
Deletes a consumer named `consumer` in the consumer group `group_name` for the stream stored at `key`.
See https://valkey.io/commands/xgroup-delconsumer for more details.
Args:
key (str): The key of the stream.
group_name (str): The consumer group name.
consumer (str): The consumer to delete.
Command response:
int: The number of pending messages the `consumer` had before it was deleted.
"""
return self.append_command(
RequestType.XGroupDelConsumer, [key, group_name, consumer]
)

def geoadd(
self: TTransaction,
key: str,
Expand Down
65 changes: 65 additions & 0 deletions python/python/tests/test_async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5153,6 +5153,71 @@ async def test_xgroup_create_xgroup_destroy(
with pytest.raises(RequestError):
await redis_client.xgroup_destroy(string_key, group_name1)

@pytest.mark.parametrize("cluster_mode", [True, False])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_xgroup_create_consumer_xgroup_del_consumer(
self, redis_client: TRedisClient, cluster_mode, protocol, request
):
key = get_random_string(10)
non_existing_key = get_random_string(10)
string_key = get_random_string(10)
group_name = get_random_string(10)
consumer = get_random_string(10)
stream_id0 = "0"

# create group and consumer for the group
assert (
await redis_client.xgroup_create(
key, group_name, stream_id0, StreamGroupOptions(make_stream=True)
)
== OK
)
assert (
await redis_client.xgroup_create_consumer(key, group_name, consumer) is True
)

# attempting to create/delete a consumer for a group that does not exist results in a NOGROUP request error
with pytest.raises(RequestError):
await redis_client.xgroup_create_consumer(
key, "non_existing_group", consumer
)
with pytest.raises(RequestError):
await redis_client.xgroup_del_consumer(key, "non_existing_group", consumer)

# attempt to create consumer for group again
assert (
await redis_client.xgroup_create_consumer(key, group_name, consumer)
is False
)

# attempting to delete a consumer that has not been created yet returns 0
assert (
await redis_client.xgroup_del_consumer(
key, group_name, "non_existing_consumer"
)
== 0
)

# TODO: use XREADGROUP to mark pending messages for the consumer so that we get non-zero return
assert await redis_client.xgroup_del_consumer(key, group_name, consumer) == 0

# attempting to call XGROUP CREATECONSUMER or XGROUP DELCONSUMER with a non-existing key should raise an error
with pytest.raises(RequestError):
await redis_client.xgroup_create_consumer(
non_existing_key, group_name, consumer
)
with pytest.raises(RequestError):
await redis_client.xgroup_del_consumer(
non_existing_key, group_name, consumer
)

# key exists, but it is not a stream
assert await redis_client.set(string_key, "foo") == OK
with pytest.raises(RequestError):
await redis_client.xgroup_create_consumer(string_key, group_name, consumer)
with pytest.raises(RequestError):
await redis_client.xgroup_del_consumer(string_key, group_name, consumer)

@pytest.mark.parametrize("cluster_mode", [True, False])
@pytest.mark.parametrize("protocol", [ProtocolVersion.RESP2, ProtocolVersion.RESP3])
async def test_pfadd(self, redis_client: TRedisClient):
Expand Down
5 changes: 5 additions & 0 deletions python/python/tests/test_transaction.py
Original file line number Diff line number Diff line change
Expand Up @@ -490,12 +490,17 @@ async def transaction_test(

group_name1 = get_random_string(10)
group_name2 = get_random_string(10)
consumer = get_random_string(10)
transaction.xgroup_create(key11, group_name1, "0-0")
args.append(OK)
transaction.xgroup_create(
key11, group_name2, "0-0", StreamGroupOptions(make_stream=True)
)
args.append(OK)
transaction.xgroup_create_consumer(key11, group_name1, consumer)
args.append(True)
transaction.xgroup_del_consumer(key11, group_name1, consumer)
args.append(0)
transaction.xgroup_destroy(key11, group_name1)
args.append(True)
transaction.xgroup_destroy(key11, group_name2)
Expand Down

0 comments on commit ed73149

Please sign in to comment.