From 3e939cf9e0094e297e5da156f832e2e042f907a5 Mon Sep 17 00:00:00 2001 From: ikolomi Date: Tue, 25 Jun 2024 15:27:33 +0300 Subject: [PATCH] Pubsub fixups: 1. Logging instead of exceptions on the pending futures due to disconnect notifications. 2. Typos fixes --- glide-core/src/socket_listener.rs | 2 +- .../glide/async_commands/cluster_commands.py | 2 +- python/python/glide/async_commands/core.py | 5 ++-- .../async_commands/standalone_commands.py | 2 +- python/python/glide/config.py | 2 +- python/python/glide/redis_client.py | 24 +++++++++---------- 6 files changed, 18 insertions(+), 19 deletions(-) diff --git a/glide-core/src/socket_listener.rs b/glide-core/src/socket_listener.rs index 2c9f91d753..99abe9f236 100644 --- a/glide-core/src/socket_listener.rs +++ b/glide-core/src/socket_listener.rs @@ -527,7 +527,7 @@ async fn push_manager_loop(mut push_rx: mpsc::UnboundedReceiver, write let result = push_rx.recv().await; match result { None => { - log_trace("push manager loop", "got None as from push manager"); + log_error("push manager loop", "got None from push manager"); return; } Some(push_msg) => { diff --git a/python/python/glide/async_commands/cluster_commands.py b/python/python/glide/async_commands/cluster_commands.py index 65957bc5f7..a30ca943cc 100644 --- a/python/python/glide/async_commands/cluster_commands.py +++ b/python/python/glide/async_commands/cluster_commands.py @@ -460,7 +460,7 @@ async def sort_store( async def publish(self, message: str, channel: str, sharded: bool = False) -> int: """ - Publish message on pubsub channel. + Publish a message on pubsub channel. This command aggregates PUBLISH and SPUBLISH commands functionalities. The mode is selected using the 'sharded' parameter See https://valkey.io/commands/publish and https://valkey.io/commands/spublish for more details. diff --git a/python/python/glide/async_commands/core.py b/python/python/glide/async_commands/core.py index f509d647bc..5d3840893b 100644 --- a/python/python/glide/async_commands/core.py +++ b/python/python/glide/async_commands/core.py @@ -5056,7 +5056,8 @@ async def getex( @dataclass class PubSubMsg: - """Describes incoming pubsub message + """ + Describes the incoming pubsub message Attributes: message (str): Incoming message. @@ -5087,7 +5088,7 @@ async def get_pubsub_message(self) -> PubSubMsg: def try_get_pubsub_message(self) -> Optional[PubSubMsg]: """ - Tries to returns the next pubsub message. + Tries to return the next pubsub message. Throws WrongConfiguration in cases: 1. No pubsub subscriptions are configured for the client 2. Callback is configured with the pubsub subsciptions diff --git a/python/python/glide/async_commands/standalone_commands.py b/python/python/glide/async_commands/standalone_commands.py index dbab238b60..7f006d39dc 100644 --- a/python/python/glide/async_commands/standalone_commands.py +++ b/python/python/glide/async_commands/standalone_commands.py @@ -418,7 +418,7 @@ async def sort_store( async def publish(self, message: str, channel: str) -> TOK: """ - Publish message on pubsub channel. + Publish a message on pubsub channel. See https://valkey.io/commands/publish for more details. Args: diff --git a/python/python/glide/config.py b/python/python/glide/config.py index b20f5304d0..0372d8ff39 100644 --- a/python/python/glide/config.py +++ b/python/python/glide/config.py @@ -386,7 +386,7 @@ class PubSubSubscriptions: channels_and_patterns (Dict[ClusterClientConfiguration.PubSubChannelModes, Set[str]]): Channels and patterns by modes. callback (Optional[Callable[[CoreCommands.PubSubMsg, Any], None]]): - Optional callback to accept the incomming messages. + Optional callback to accept the incoming messages. context (Any): Arbitrary context to pass to the callback. """ diff --git a/python/python/glide/redis_client.py b/python/python/glide/redis_client.py index edb9fd4122..953a9fef8b 100644 --- a/python/python/glide/redis_client.py +++ b/python/python/glide/redis_client.py @@ -263,7 +263,7 @@ async def get_pubsub_message(self) -> CoreCommands.PubSubMsg: if not self.config._is_pubsub_configured(): raise WrongConfiguration( - "The operation will never complete since there was no pubsbub subscriptions applied to the client." + "The operation will never complete since there was no pubsub subscriptions applied to the client." ) if self.config._get_pubsub_callback_and_context()[0] is not None: @@ -323,13 +323,11 @@ def _notification_to_pubsub_message_safe( Dict[str, Any], value_from_pointer(response.resp_pointer) ) message_kind = push_notification["kind"] - if message_kind == "Disconnect": - # cancel all futures since we dont know how many (if any) messages wont arrive - # TODO: consider cancelling a single future - self._cancel_pubsub_futures_with_exception_safe( - ConnectionError( - "Warning, transport disconnect occured, messages might be lost" - ) + if message_kind == "Disconnection": + ClientLogger.log( + LogLevel.WARN, + "disconnect notification", + "Transport disconnected, messages might be lost", ) elif ( message_kind == "Message" @@ -353,11 +351,11 @@ def _notification_to_pubsub_message_safe( ): pass else: - err_msg = f"Unsupported push message: '{message_kind}'" - ClientLogger.log(LogLevel.ERROR, "pubsub message", err_msg) - # cancel all futures since its a serious - # TODO: consider cancelling a single future - self._cancel_pubsub_futures_with_exception_safe(ConnectionError(err_msg)) + ClientLogger.log( + LogLevel.WARN, + "unknown notification", + f"Unknown notification message: '{message_kind}'", + ) return pubsub_message