From 7372652e8d07a2946a213edabc7b6a54ca892635 Mon Sep 17 00:00:00 2001 From: Sebastian Molenda Date: Wed, 25 Sep 2024 21:07:58 +0200 Subject: [PATCH 1/8] Expoential as a default reconnection policy --- .../native_threads/subscribe_with_retry.py | 50 +++++++++ .../pubnub_asyncio/subscribe_with_retry.py | 59 ++++++++++ pubnub/event_engine/effects.py | 27 +++-- pubnub/managers.py | 44 +++++--- pubnub/pnconfiguration.py | 2 +- pubnub/pubnub.py | 10 ++ tests/integrational/asyncio/test_subscribe.py | 103 ++++++++++++++++++ .../native_threads/test_subscribe.py | 82 ++++++++++++++ tests/unit/test_reconnection_manager.py | 50 +++++++++ 9 files changed, 398 insertions(+), 29 deletions(-) create mode 100644 examples/native_threads/subscribe_with_retry.py create mode 100644 examples/pubnub_asyncio/subscribe_with_retry.py create mode 100644 tests/unit/test_reconnection_manager.py diff --git a/examples/native_threads/subscribe_with_retry.py b/examples/native_threads/subscribe_with_retry.py new file mode 100644 index 00000000..5c1b43d8 --- /dev/null +++ b/examples/native_threads/subscribe_with_retry.py @@ -0,0 +1,50 @@ +import logging +import sys +import time + +from pubnub.pnconfiguration import PNConfiguration +from pubnub.pubnub import PubNub, SubscribeListener +from pubnub.enums import PNReconnectionPolicy, PNStatusCategory + + +class TestListener(SubscribeListener): + status_result = None + disconnected = False + + def status(self, pubnub, status): + if status.category == PNStatusCategory.PNDisconnectedCategory: + print('Could not connect. Exiting...') + self.disconnected = True + + def message(self, pubnub, message): + print(f'Message:\n{message.__dict__}') + + def presence(self, pubnub, presence): + print(f'Presence:\n{presence.__dict__}') + + +logger = logging.getLogger("pubnub") +logger.setLevel(logging.DEBUG) +handler = logging.StreamHandler(sys.stdout) +handler.setLevel(logging.DEBUG) +logger.addHandler(handler) + + +config = PNConfiguration() +config.subscribe_key = "demo" +config.publish_key = "demo" +config.user_id = 'example' +config.enable_subscribe = True +config.reconnect_policy = PNReconnectionPolicy.EXPONENTIAL +config.origin = '127.0.0.1' +config.ssl = False + +listener = TestListener() + +pubnub = PubNub(config) +pubnub.add_listener(listener) +sub = pubnub.subscribe().channels(['example']).execute() + +while not listener.disconnected: + time.sleep(0.5) +print('Disconnected. Bye.') diff --git a/examples/pubnub_asyncio/subscribe_with_retry.py b/examples/pubnub_asyncio/subscribe_with_retry.py new file mode 100644 index 00000000..15e65e3d --- /dev/null +++ b/examples/pubnub_asyncio/subscribe_with_retry.py @@ -0,0 +1,59 @@ +import asyncio +import logging +import sys + +from pubnub.callbacks import SubscribeCallback +from pubnub.models.consumer.common import PNStatus +from pubnub.pubnub_asyncio import PubNubAsyncio +from pubnub.pnconfiguration import PNConfiguration +from pubnub.enums import PNReconnectionPolicy, PNStatusCategory + +config = PNConfiguration() +config.subscribe_key = "demo" +config.publish_key = "demo" +config.enable_subscribe = True +config.uuid = "test-uuid" +config.origin = "127.0.0.1" +config.ssl = False +config.reconnect_policy = PNReconnectionPolicy.NONE + +pubnub = PubNubAsyncio(config) + +logger = logging.getLogger("pubnub") +logger.setLevel(logging.WARNING) +handler = logging.StreamHandler(sys.stdout) +handler.setLevel(logging.WARNING) +logger.addHandler(handler) + + +class SampleCallback(SubscribeCallback): + message_result = None + status_result = None + presence_result = None + + def status(self, pubnub, status): + self.status_result = status + + def message(self, pubnub, message): + self.message_result = message + + def presence(self, pubnub, presence): + self.presence_result = presence + + +async def main(): + listener = SampleCallback() + pubnub.add_listener(listener) + pubnub.subscribe().channels("my_channel").execute() + while True: + if isinstance(listener.status_result, PNStatus) \ + and listener.status_result.category == PNStatusCategory.PNDisconnectedCategory: + print('Could not connect. Exiting...') + break + await asyncio.sleep(1) + + +if __name__ == "__main__": + loop = asyncio.get_event_loop() + loop.run_until_complete(main()) + loop.close() diff --git a/pubnub/event_engine/effects.py b/pubnub/event_engine/effects.py index 04f5b760..dccc4026 100644 --- a/pubnub/event_engine/effects.py +++ b/pubnub/event_engine/effects.py @@ -1,6 +1,5 @@ import asyncio import logging -import math from typing import Optional, Union from pubnub.endpoints.presence.heartbeat import Heartbeat @@ -14,6 +13,7 @@ from pubnub.event_engine.models import events, invocations from pubnub.models.consumer.common import PNStatus from pubnub.workers import BaseMessageWorker +from pubnub.managers import LinearDelay, ExponentialDelay class Effect: @@ -59,11 +59,9 @@ def get_new_stop_event(self): def calculate_reconnection_delay(self, attempts): if self.reconnection_policy is PNReconnectionPolicy.EXPONENTIAL: - delay = int(math.pow(2, attempts - 5 * math.floor((attempts - 1) / 5)) - 1) + return ExponentialDelay.calculate(attempts - 1) else: - delay = self.interval - - return delay + return LinearDelay.calculate(attempts - 1) class HandshakeEffect(Effect): @@ -157,10 +155,13 @@ def __init__(self, pubnub_instance, event_engine_instance, invocation: Union[invocations.PNManageableInvocation, invocations.PNCancelInvocation]) -> None: super().__init__(pubnub_instance, event_engine_instance, invocation) self.reconnection_policy = pubnub_instance.config.reconnect_policy - self.max_retry_attempts = pubnub_instance.config.maximum_reconnection_retries - self.interval = pubnub_instance.config.RECONNECTION_INTERVAL - self.min_backoff = pubnub_instance.config.RECONNECTION_MIN_EXPONENTIAL_BACKOFF - self.max_backoff = pubnub_instance.config.RECONNECTION_MAX_EXPONENTIAL_BACKOFF + + if self.reconnection_policy is PNReconnectionPolicy.EXPONENTIAL: + self.max_retry_attempts = ExponentialDelay.MAX_RETRIES + elif self.reconnection_policy is PNReconnectionPolicy.LINEAR: + self.max_retry_attempts = LinearDelay.MAX_RETRIES + else: + self.max_retry_attempts = 0 def give_up(self, reason: PubNubException, attempt: int, timetoken: int = 0): self.logger.error(f"GiveUp called on Unspecific event. Reason: {reason}, Attempt: {attempt} TT:{timetoken}") @@ -175,12 +176,16 @@ def success(self, timetoken: str, region: Optional[int] = None, **kwargs): raise PubNubException('Unspecified Invocation') def run(self): - if self.reconnection_policy is PNReconnectionPolicy.NONE or self.invocation.attempts > self.max_retry_attempts: + self.logger.warning(f"Reconnect attempt {self.invocation.attempts} of {self.max_retry_attempts}") + if self.reconnection_policy is PNReconnectionPolicy.NONE: self.give_up(reason=self.invocation.reason, attempt=self.invocation.attempts) else: attempts = self.invocation.attempts delay = self.calculate_reconnection_delay(attempts) - self.logger.warning(f'will reconnect in {delay}s') + if delay < 0: + self.give_up(reason=self.invocation.reason, attempt=self.invocation.attempts) + return + self.logger.warning(f'Will reconnect in {delay}s') if hasattr(self.pubnub, 'event_loop'): self.run_async(self.delayed_reconnect_async(delay, attempts)) diff --git a/pubnub/managers.py b/pubnub/managers.py index 785b75e4..4377ae35 100644 --- a/pubnub/managers.py +++ b/pubnub/managers.py @@ -1,10 +1,11 @@ import logging from abc import abstractmethod, ABCMeta -import math import time import copy import base64 +import random + from cbor2 import loads from . import utils @@ -51,33 +52,21 @@ def get_base_path(self): class ReconnectionManager: - INTERVAL = 3 - MINEXPONENTIALBACKOFF = 1 - MAXEXPONENTIALBACKOFF = 32 - def __init__(self, pubnub): self._pubnub = pubnub self._callback = None self._timer = None self._timer_interval = None - self._connection_errors = 1 + self._connection_errors = 0 def set_reconnection_listener(self, reconnection_callback): assert isinstance(reconnection_callback, ReconnectionCallback) self._callback = reconnection_callback def _recalculate_interval(self): - if self._pubnub.config.reconnect_policy == PNReconnectionPolicy.EXPONENTIAL: - self._timer_interval = int(math.pow(2, self._connection_errors) - 1) - if self._timer_interval > self.MAXEXPONENTIALBACKOFF: - self._timer_interval = self.MINEXPONENTIALBACKOFF - self._connection_errors = 1 - logger.debug("timerInterval > MAXEXPONENTIALBACKOFF at: %s" % utils.datetime_now()) - elif self._timer_interval < 1: - self._timer_interval = self.MINEXPONENTIALBACKOFF - logger.debug("timerInterval = %d at: %s" % (self._timer_interval, utils.datetime_now())) - else: - self._timer_interval = self.INTERVAL + policy = self._pubnub.config.reconnect_policy + calculate = (LinearDelay.calculate if policy == PNReconnectionPolicy.LINEAR else ExponentialDelay.calculate) + self._timer_interval = calculate(self._connection_errors) @abstractmethod def start_polling(self): @@ -89,6 +78,27 @@ def _stop_heartbeat_timer(self): self._timer = None +class LinearDelay: + INTERVAL = 2 + MAX_RETRIES = 10 + + @classmethod + def calculate(cls, attempt: int): + return cls.INTERVAL + round(random.random(), 3) if attempt < cls.MAX_RETRIES else -1 + + +class ExponentialDelay: + MIN_DELAY = 2 + MAX_RETRIES = 6 + MIN_BACKOFF = 2 + MAX_BACKOFF = 150 + + @classmethod + def calculate(cls, attempt: int) -> int: + delay = min(cls.MAX_BACKOFF, cls.MIN_DELAY * (2 ** attempt) + round(random.random(), 3)) + return delay if attempt < cls.MAX_RETRIES else -1 + + class StateManager: def __init__(self): self._channels = {} diff --git a/pubnub/pnconfiguration.py b/pubnub/pnconfiguration.py index de00581d..f6f37139 100644 --- a/pubnub/pnconfiguration.py +++ b/pubnub/pnconfiguration.py @@ -39,7 +39,7 @@ def __init__(self): self.log_verbosity = False self.enable_presence_heartbeat = False self.heartbeat_notification_options = PNHeartbeatNotificationOptions.FAILURES - self.reconnect_policy = PNReconnectionPolicy.NONE + self.reconnect_policy = PNReconnectionPolicy.EXPONENTIAL self.maximum_reconnection_retries = -1 # -1 means unlimited/ 0 means no retries self.daemon = False self.use_random_initialization_vector = True diff --git a/pubnub/pubnub.py b/pubnub/pubnub.py index 94bc201e..052e607a 100644 --- a/pubnub/pubnub.py +++ b/pubnub/pubnub.py @@ -103,6 +103,13 @@ def _register_heartbeat_timer(self): self._recalculate_interval() + if self._timer_interval < 0: + logger.warning("Reconnection retry limit reached. Disconnecting.") + disconnect_status = PNStatus() + disconnect_status.category = PNStatusCategory.PNDisconnectedCategory + self._pubnub._subscription_manager._listener_manager.announce_status(disconnect_status) + return + self._timer = threading.Timer(self._timer_interval, self._call_time) self._timer.daemon = True self._timer.start() @@ -129,6 +136,9 @@ def _call_time_callback(self, resp, status): def start_polling(self): if self._pubnub.config.reconnect_policy == PNReconnectionPolicy.NONE: logger.warning("reconnection policy is disabled, please handle reconnection manually.") + disconnect_status = PNStatus() + disconnect_status.category = PNStatusCategory.PNDisconnectedCategory + self._pubnub._subscription_manager._listener_manager.announce_status(disconnect_status) return logger.debug("reconnection manager start at: %s" % utils.datetime_now()) diff --git a/tests/integrational/asyncio/test_subscribe.py b/tests/integrational/asyncio/test_subscribe.py index 5760d0ae..431895a7 100644 --- a/tests/integrational/asyncio/test_subscribe.py +++ b/tests/integrational/asyncio/test_subscribe.py @@ -4,10 +4,14 @@ import pubnub as pn from unittest.mock import patch +from pubnub.callbacks import SubscribeCallback +from pubnub.models.consumer.common import PNStatus from pubnub.models.consumer.pubsub import PNMessageResult from pubnub.pubnub_asyncio import AsyncioSubscriptionManager, PubNubAsyncio, AsyncioEnvelope, SubscribeListener from tests.helper import gen_channel, pnconf_enc_env_copy, pnconf_env_copy, pnconf_sub_copy from tests.integrational.vcr_asyncio_sleeper import VCR599Listener, VCR599ReconnectionManager +from pubnub.enums import PNReconnectionPolicy, PNStatusCategory +from pubnub.managers import LinearDelay, ExponentialDelay # from tests.integrational.vcr_helper import pn_vcr pn.set_stream_logger('pubnub', logging.DEBUG) @@ -17,6 +21,21 @@ async def patch_pubnub(pubnub): pubnub._subscription_manager._reconnection_manager = VCR599ReconnectionManager(pubnub) +class TestCallback(SubscribeCallback): + message_result = None + status_result = None + presence_result = None + + def status(self, pubnub, status): + self.status_result = status + + def message(self, pubnub, message): + self.message_result = message + + def presence(self, pubnub, presence): + self.presence_result = presence + + # TODO: refactor cassette # @pn_vcr.use_cassette('tests/integrational/fixtures/asyncio/subscription/sub_unsub.json', serializer='pn_json', # filter_query_parameters=['pnsdk', 'ee', 'tr']) @@ -403,3 +422,87 @@ async def test_unsubscribe_all(): assert envelope.status.original_response['status'] == 200 await pubnub.stop() + + +@pytest.mark.asyncio +async def test_subscribe_failing_reconnect_policy_none(): + config = pnconf_env_copy(enable_subscribe=True, + uuid="test-subscribe-failing-reconnect-policy-none", + reconnect_policy=PNReconnectionPolicy.NONE, + origin='127.0.0.1') + pubnub = PubNubAsyncio(config) + + listener = TestCallback() + pubnub.add_listener(listener) + pubnub.subscribe().channels("my_channel").execute() + while True: + if isinstance(listener.status_result, PNStatus) \ + and listener.status_result.category == PNStatusCategory.PNDisconnectedCategory: + break + await asyncio.sleep(1) + + +@pytest.mark.asyncio +async def test_subscribe_failing_reconnect_policy_none(): + config = pnconf_env_copy(enable_subscribe=True, + uuid="test-subscribe-failing-reconnect-policy-none", + reconnect_policy=PNReconnectionPolicy.NONE, + origin='127.0.0.1') + pubnub = PubNubAsyncio(config) + + listener = TestCallback() + pubnub.add_listener(listener) + pubnub.subscribe().channels("my_channel_none").execute() + while True: + if isinstance(listener.status_result, PNStatus) \ + and listener.status_result.category == PNStatusCategory.PNDisconnectedCategory: + break + await asyncio.sleep(0.5) + + +@pytest.mark.asyncio +async def test_subscribe_failing_reconnect_policy_linear(): + # we don't test the actual delay calculation here, just everything around it + def mock_calculate(*args, **kwargs): + return 0.2 if args[0] < LinearDelay.MAX_RETRIES else -1 + + with patch('pubnub.managers.LinearDelay.calculate', wraps=mock_calculate) as calculate_mock: + config = pnconf_env_copy(enable_subscribe=True, + uuid="test-subscribe-failing-reconnect-policy-linear", + reconnect_policy=PNReconnectionPolicy.LINEAR, + origin='127.0.0.1') + pubnub = PubNubAsyncio(config) + + listener = TestCallback() + pubnub.add_listener(listener) + pubnub.subscribe().channels("my_channel_linear").execute() + while True: + if isinstance(listener.status_result, PNStatus) \ + and listener.status_result.category == PNStatusCategory.PNDisconnectedCategory: + break + await asyncio.sleep(0.5) + assert calculate_mock.call_count == LinearDelay.MAX_RETRIES + 1 + + +@pytest.mark.asyncio +async def test_subscribe_failing_reconnect_policy_exponential(): + # we don't test the actual delay calculation here, just everything around it + def mock_calculate(*args, **kwargs): + return 0.2 if args[0] < ExponentialDelay.MAX_RETRIES else -1 + + with patch('pubnub.managers.ExponentialDelay.calculate', wraps=mock_calculate) as calculate_mock: + config = pnconf_env_copy(enable_subscribe=True, + uuid="test-subscribe-failing-reconnect-policy-exponential", + reconnect_policy=PNReconnectionPolicy.EXPONENTIAL, + origin='127.0.0.1') + pubnub = PubNubAsyncio(config) + + listener = TestCallback() + pubnub.add_listener(listener) + pubnub.subscribe().channels("my_channel_linear").execute() + while True: + if isinstance(listener.status_result, PNStatus) \ + and listener.status_result.category == PNStatusCategory.PNDisconnectedCategory: + break + await asyncio.sleep(0.5) + assert calculate_mock.call_count == ExponentialDelay.MAX_RETRIES + 1 diff --git a/tests/integrational/native_threads/test_subscribe.py b/tests/integrational/native_threads/test_subscribe.py index 4b0280ff..10cad907 100644 --- a/tests/integrational/native_threads/test_subscribe.py +++ b/tests/integrational/native_threads/test_subscribe.py @@ -1,9 +1,13 @@ import binascii import logging import unittest +import time import pubnub as pn +from unittest.mock import patch +from pubnub.enums import PNReconnectionPolicy, PNStatusCategory from pubnub.exceptions import PubNubException +from pubnub.managers import LinearDelay, ExponentialDelay from pubnub.models.consumer.channel_group import PNChannelGroupsAddChannelResult, PNChannelGroupsRemoveChannelResult from pubnub.models.consumer.pubsub import PNPublishResult, PNMessageResult from pubnub.pubnub import PubNub, SubscribeListener, NonSubscribeListener @@ -15,6 +19,22 @@ pn.set_stream_logger('pubnub', logging.DEBUG) +class DisconnectListener(SubscribeListener): + status_result = None + disconnected = False + + def status(self, pubnub, status): + if status.category == PNStatusCategory.PNDisconnectedCategory: + print('Could not connect. Exiting...') + self.disconnected = True + + def message(self, pubnub, message): + print(f'Message:\n{message.__dict__}') + + def presence(self, pubnub, presence): + print(f'Presence:\n{presence.__dict__}') + + class TestPubNubSubscription(unittest.TestCase): @pn_vcr.use_cassette('tests/integrational/fixtures/native_threads/subscribe/subscribe_unsubscribe.json', filter_query_parameters=['seqn', 'pnsdk', 'tr', 'tt'], serializer='pn_json', @@ -302,3 +322,65 @@ def test_subscribe_pub_unencrypted_unsubscribe(self): self.fail(e) finally: pubnub.stop() + + def test_subscribe_retry_policy_none(self): + ch = "test-subscribe-retry-policy-none" + pubnub = PubNub(pnconf_env_copy(enable_subscribe=True, daemon=True, origin='127.0.0.1', + reconnect_policy=PNReconnectionPolicy.NONE)) + listener = DisconnectListener() + + try: + pubnub.add_listener(listener) + pubnub.subscribe().channels(ch).execute() + + while not listener.disconnected: + time.sleep(0.5) + + except PubNubException as e: + self.fail(e) + + def test_subscribe_retry_policy_linear(self): + # we don't test the actual delay calculation here, just everything around it + def mock_calculate(*args, **kwargs): + return 0.2 if args[0] < LinearDelay.MAX_RETRIES else -1 + + with patch('pubnub.managers.LinearDelay.calculate', wraps=mock_calculate) as calculate_mock: + ch = "test-subscribe-retry-policy-linear" + pubnub = PubNub(pnconf_env_copy(enable_subscribe=True, daemon=True, origin='127.0.0.1', + reconnect_policy=PNReconnectionPolicy.LINEAR)) + listener = DisconnectListener() + + try: + pubnub.add_listener(listener) + pubnub.subscribe().channels(ch).execute() + + while not listener.disconnected: + time.sleep(0.5) + + except PubNubException as e: + self.fail(e) + + assert calculate_mock.call_count == LinearDelay.MAX_RETRIES + 1 + + def test_subscribe_retry_policy_exponential(self): + # we don't test the actual delay calculation here, just everything around it + def mock_calculate(*args, **kwargs): + return 0.2 if args[0] < ExponentialDelay.MAX_RETRIES else -1 + + with patch('pubnub.managers.ExponentialDelay.calculate', wraps=mock_calculate) as calculate_mock: + ch = "test-subscribe-retry-policy-exponential" + pubnub = PubNub(pnconf_env_copy(enable_subscribe=True, daemon=True, origin='127.0.0.1', + reconnect_policy=PNReconnectionPolicy.EXPONENTIAL)) + listener = DisconnectListener() + + try: + pubnub.add_listener(listener) + pubnub.subscribe().channels(ch).execute() + + while not listener.disconnected: + time.sleep(0.5) + + except PubNubException as e: + self.fail(e) + + assert calculate_mock.call_count == ExponentialDelay.MAX_RETRIES + 1 diff --git a/tests/unit/test_reconnection_manager.py b/tests/unit/test_reconnection_manager.py new file mode 100644 index 00000000..c8912f90 --- /dev/null +++ b/tests/unit/test_reconnection_manager.py @@ -0,0 +1,50 @@ +from pubnub.enums import PNReconnectionPolicy +from pubnub.managers import ReconnectionManager +from pubnub.pnconfiguration import PNConfiguration +from pubnub.pubnub import PubNub + + +def assert_more_or_less(given, expected): + assert expected < given < expected + 1 + + +def test_linear_policy(): + config = PNConfiguration() + config.subscribe_key = "test" + config.publish_key = "test" + config.reconnect_policy = PNReconnectionPolicy.LINEAR + config.uuid = "test" + + pubnub = PubNub(config) + reconnection_manager = ReconnectionManager(pubnub) + + for i in range(0, 10): + reconnection_manager._connection_errors = i + reconnection_manager._recalculate_interval() + assert_more_or_less(reconnection_manager._timer_interval, 2) + + reconnection_manager._connection_errors = 10 + reconnection_manager._recalculate_interval() + assert reconnection_manager._timer_interval == -1 + + +def test_exponential_policy(): + config = PNConfiguration() + config.subscribe_key = "test" + config.publish_key = "test" + config.reconnect_policy = PNReconnectionPolicy.EXPONENTIAL + config.uuid = "test" + + pubnub = PubNub(config) + reconnection_manager = ReconnectionManager(pubnub) + + expected = [2, 4, 8, 16, 32, 64] + + for i in range(0, 6): + reconnection_manager._connection_errors = i + reconnection_manager._recalculate_interval() + assert_more_or_less(reconnection_manager._timer_interval, expected[i]) + + reconnection_manager._connection_errors = 6 + reconnection_manager._recalculate_interval() + assert reconnection_manager._timer_interval == -1 From d9072dca51d5871102b1e20685d30884adfe90cc Mon Sep 17 00:00:00 2001 From: Sebastian Molenda Date: Thu, 26 Sep 2024 10:26:15 +0200 Subject: [PATCH 2/8] Add respect of user defined retry limit --- pubnub/event_engine/effects.py | 12 +++++++----- pubnub/managers.py | 17 ++++++++++++++--- pubnub/pnconfiguration.py | 2 +- pubnub/pubnub.py | 2 +- tests/unit/test_reconnection_manager.py | 12 ++---------- 5 files changed, 25 insertions(+), 20 deletions(-) diff --git a/pubnub/event_engine/effects.py b/pubnub/event_engine/effects.py index dccc4026..c3115ad2 100644 --- a/pubnub/event_engine/effects.py +++ b/pubnub/event_engine/effects.py @@ -157,11 +157,13 @@ def __init__(self, pubnub_instance, event_engine_instance, self.reconnection_policy = pubnub_instance.config.reconnect_policy if self.reconnection_policy is PNReconnectionPolicy.EXPONENTIAL: - self.max_retry_attempts = ExponentialDelay.MAX_RETRIES + self.max_retries = ExponentialDelay.MAX_RETRIES elif self.reconnection_policy is PNReconnectionPolicy.LINEAR: - self.max_retry_attempts = LinearDelay.MAX_RETRIES + self.max_retries = LinearDelay.MAX_RETRIES else: - self.max_retry_attempts = 0 + self.max_retries = 0 + if pubnub_instance.config.maximum_reconnection_retries is not None: + self.max_retries = min(self.max_retries, pubnub_instance.config.maximum_reconnection_retries) def give_up(self, reason: PubNubException, attempt: int, timetoken: int = 0): self.logger.error(f"GiveUp called on Unspecific event. Reason: {reason}, Attempt: {attempt} TT:{timetoken}") @@ -176,13 +178,13 @@ def success(self, timetoken: str, region: Optional[int] = None, **kwargs): raise PubNubException('Unspecified Invocation') def run(self): - self.logger.warning(f"Reconnect attempt {self.invocation.attempts} of {self.max_retry_attempts}") + self.logger.warning(f"Reconnect attempt {self.invocation.attempts} of {self.max_retries}") if self.reconnection_policy is PNReconnectionPolicy.NONE: self.give_up(reason=self.invocation.reason, attempt=self.invocation.attempts) else: attempts = self.invocation.attempts delay = self.calculate_reconnection_delay(attempts) - if delay < 0: + if self.max_retries > 0 and attempts > self.max_retries: self.give_up(reason=self.invocation.reason, attempt=self.invocation.attempts) return self.logger.warning(f'Will reconnect in {delay}s') diff --git a/pubnub/managers.py b/pubnub/managers.py index 4377ae35..994630d5 100644 --- a/pubnub/managers.py +++ b/pubnub/managers.py @@ -68,6 +68,18 @@ def _recalculate_interval(self): calculate = (LinearDelay.calculate if policy == PNReconnectionPolicy.LINEAR else ExponentialDelay.calculate) self._timer_interval = calculate(self._connection_errors) + def _retry_limit_reached(self): + user_limit = self._pubnub.config.maximum_reconnection_retries + policy = self._pubnub.config.reconnect_policy + + if user_limit == 0 or policy == PNReconnectionPolicy.NONE: + return True + elif user_limit == -1: + return False + else: + limit = LinearDelay.MAX_RETRIES if policy == PNReconnectionPolicy.LINEAR else ExponentialDelay.MAX_RETRIES + return self._connection_errors > min(user_limit, limit) + @abstractmethod def start_polling(self): pass @@ -84,7 +96,7 @@ class LinearDelay: @classmethod def calculate(cls, attempt: int): - return cls.INTERVAL + round(random.random(), 3) if attempt < cls.MAX_RETRIES else -1 + return cls.INTERVAL + round(random.random(), 3) class ExponentialDelay: @@ -95,8 +107,7 @@ class ExponentialDelay: @classmethod def calculate(cls, attempt: int) -> int: - delay = min(cls.MAX_BACKOFF, cls.MIN_DELAY * (2 ** attempt) + round(random.random(), 3)) - return delay if attempt < cls.MAX_RETRIES else -1 + return min(cls.MAX_BACKOFF, cls.MIN_DELAY * (2 ** attempt)) + round(random.random(), 3) class StateManager: diff --git a/pubnub/pnconfiguration.py b/pubnub/pnconfiguration.py index f6f37139..e712d70a 100644 --- a/pubnub/pnconfiguration.py +++ b/pubnub/pnconfiguration.py @@ -40,7 +40,7 @@ def __init__(self): self.enable_presence_heartbeat = False self.heartbeat_notification_options = PNHeartbeatNotificationOptions.FAILURES self.reconnect_policy = PNReconnectionPolicy.EXPONENTIAL - self.maximum_reconnection_retries = -1 # -1 means unlimited/ 0 means no retries + self.maximum_reconnection_retries = None # -1 means unlimited/ 0 means no retries self.daemon = False self.use_random_initialization_vector = True self.suppress_leave_events = False diff --git a/pubnub/pubnub.py b/pubnub/pubnub.py index 052e607a..cc511e4b 100644 --- a/pubnub/pubnub.py +++ b/pubnub/pubnub.py @@ -103,7 +103,7 @@ def _register_heartbeat_timer(self): self._recalculate_interval() - if self._timer_interval < 0: + if self._timer_interval < 0 or self._retry_limit_reached(): logger.warning("Reconnection retry limit reached. Disconnecting.") disconnect_status = PNStatus() disconnect_status.category = PNStatusCategory.PNDisconnectedCategory diff --git a/tests/unit/test_reconnection_manager.py b/tests/unit/test_reconnection_manager.py index c8912f90..e14c10bd 100644 --- a/tests/unit/test_reconnection_manager.py +++ b/tests/unit/test_reconnection_manager.py @@ -23,10 +23,6 @@ def test_linear_policy(): reconnection_manager._recalculate_interval() assert_more_or_less(reconnection_manager._timer_interval, 2) - reconnection_manager._connection_errors = 10 - reconnection_manager._recalculate_interval() - assert reconnection_manager._timer_interval == -1 - def test_exponential_policy(): config = PNConfiguration() @@ -38,13 +34,9 @@ def test_exponential_policy(): pubnub = PubNub(config) reconnection_manager = ReconnectionManager(pubnub) - expected = [2, 4, 8, 16, 32, 64] + expected = [2, 4, 8, 16, 32, 64, 128, 150, 150, 150] - for i in range(0, 6): + for i in range(0, 10): reconnection_manager._connection_errors = i reconnection_manager._recalculate_interval() assert_more_or_less(reconnection_manager._timer_interval, expected[i]) - - reconnection_manager._connection_errors = 6 - reconnection_manager._recalculate_interval() - assert reconnection_manager._timer_interval == -1 From 16a1f797842109669964e2a6979e253c84b53e7c Mon Sep 17 00:00:00 2001 From: Sebastian Molenda Date: Thu, 26 Sep 2024 10:52:23 +0200 Subject: [PATCH 3/8] aaaaaaaaaaaaaaasync --- tests/acceptance/subscribe/steps/given_steps.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/tests/acceptance/subscribe/steps/given_steps.py b/tests/acceptance/subscribe/steps/given_steps.py index 9f5e6b9d..58f5eca1 100644 --- a/tests/acceptance/subscribe/steps/given_steps.py +++ b/tests/acceptance/subscribe/steps/given_steps.py @@ -1,6 +1,7 @@ import logging from behave import given +from behave.api.async_step import async_run_until_complete from io import StringIO from pubnub.enums import PNReconnectionPolicy from pubnub.pubnub_asyncio import PubNubAsyncio, EventEngineSubscriptionManager @@ -9,7 +10,8 @@ @given("the demo keyset with event engine enabled") -def step_impl(context: PNContext): +@async_run_until_complete +async def step_impl(context: PNContext): context.log_stream = StringIO() logger = logging.getLogger('pubnub').getChild('subscribe') logger.setLevel(logging.DEBUG) @@ -27,7 +29,8 @@ def step_impl(context: PNContext): @given("a linear reconnection policy with {max_retries} retries") -def step_impl(context: PNContext, max_retries: str): +@async_run_until_complete +async def step_impl(context: PNContext, max_retries: str): context.pubnub.config.reconnect_policy = PNReconnectionPolicy.LINEAR context.pubnub.config.maximum_reconnection_retries = int(max_retries) @@ -38,7 +41,8 @@ def step_impl(context: PNContext, max_retries: str): @given("the demo keyset with Presence EE enabled") -def step_impl(context: PNContext): +@async_run_until_complete +async def step_impl(context: PNContext): context.log_stream_pubnub = StringIO() logger = logging.getLogger('pubnub') logger.setLevel(logging.DEBUG) @@ -66,6 +70,7 @@ def step_impl(context: PNContext): @given("heartbeatInterval set to '{interval}', timeout set to '{timeout}'" " and suppressLeaveEvents set to '{suppress_leave}'") -def step_impl(context: PNContext, interval: str, timeout: str, suppress_leave: str): +@async_run_until_complete +async def step_impl(context: PNContext, interval: str, timeout: str, suppress_leave: str): context.pn_config.set_presence_timeout_with_custom_interval(int(timeout), int(interval)) context.pn_config.suppress_leave_events = True if suppress_leave == 'true' else False From 30c4a9256330d5aa49079a21ff787501a44e433a Mon Sep 17 00:00:00 2001 From: Sebastian Molenda Date: Thu, 26 Sep 2024 13:03:11 +0200 Subject: [PATCH 4/8] Changing names --- pubnub/event_engine/effects.py | 15 ++++++--------- tests/acceptance/subscribe/environment.py | 3 ++- tests/acceptance/subscribe/steps/when_steps.py | 6 ++++-- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/pubnub/event_engine/effects.py b/pubnub/event_engine/effects.py index c3115ad2..89537263 100644 --- a/pubnub/event_engine/effects.py +++ b/pubnub/event_engine/effects.py @@ -157,13 +157,13 @@ def __init__(self, pubnub_instance, event_engine_instance, self.reconnection_policy = pubnub_instance.config.reconnect_policy if self.reconnection_policy is PNReconnectionPolicy.EXPONENTIAL: - self.max_retries = ExponentialDelay.MAX_RETRIES + self.max_retry_attempts = ExponentialDelay.MAX_RETRIES elif self.reconnection_policy is PNReconnectionPolicy.LINEAR: - self.max_retries = LinearDelay.MAX_RETRIES + self.max_retry_attempts = LinearDelay.MAX_RETRIES else: - self.max_retries = 0 + self.max_retry_attempts = 0 if pubnub_instance.config.maximum_reconnection_retries is not None: - self.max_retries = min(self.max_retries, pubnub_instance.config.maximum_reconnection_retries) + self.max_retry_attempts = min(self.max_retry_attempts, pubnub_instance.config.maximum_reconnection_retries) def give_up(self, reason: PubNubException, attempt: int, timetoken: int = 0): self.logger.error(f"GiveUp called on Unspecific event. Reason: {reason}, Attempt: {attempt} TT:{timetoken}") @@ -178,15 +178,12 @@ def success(self, timetoken: str, region: Optional[int] = None, **kwargs): raise PubNubException('Unspecified Invocation') def run(self): - self.logger.warning(f"Reconnect attempt {self.invocation.attempts} of {self.max_retries}") - if self.reconnection_policy is PNReconnectionPolicy.NONE: + self.logger.warning(f"Reconnect attempt {self.invocation.attempts} of {self.max_retry_attempts}") + if self.reconnection_policy is PNReconnectionPolicy.NONE or self.invocation.attempts > self.max_retry_attempts: self.give_up(reason=self.invocation.reason, attempt=self.invocation.attempts) else: attempts = self.invocation.attempts delay = self.calculate_reconnection_delay(attempts) - if self.max_retries > 0 and attempts > self.max_retries: - self.give_up(reason=self.invocation.reason, attempt=self.invocation.attempts) - return self.logger.warning(f'Will reconnect in {delay}s') if hasattr(self.pubnub, 'event_loop'): self.run_async(self.delayed_reconnect_async(delay, attempts)) diff --git a/tests/acceptance/subscribe/environment.py b/tests/acceptance/subscribe/environment.py index dea2c0c7..c949f5aa 100644 --- a/tests/acceptance/subscribe/environment.py +++ b/tests/acceptance/subscribe/environment.py @@ -43,7 +43,8 @@ def before_scenario(context: Context, feature): def after_scenario(context: Context, feature): loop = asyncio.get_event_loop() loop.run_until_complete(context.pubnub.stop()) - loop.run_until_complete(asyncio.sleep(0.1)) + loop.run_until_complete(asyncio.sleep(0.3)) + del context.pubnub for tag in feature.tags: if "contract" in tag: diff --git a/tests/acceptance/subscribe/steps/when_steps.py b/tests/acceptance/subscribe/steps/when_steps.py index 63f4ffab..e5625643 100644 --- a/tests/acceptance/subscribe/steps/when_steps.py +++ b/tests/acceptance/subscribe/steps/when_steps.py @@ -4,12 +4,14 @@ @when('I subscribe') -def step_impl(context: PNContext): +@async_run_until_complete +async def step_impl(context: PNContext): context.pubnub.subscribe().channels('foo').execute() @when('I subscribe with timetoken {timetoken}') -def step_impl(context: PNContext, timetoken: str): # noqa F811 +@async_run_until_complete +async def step_impl(context: PNContext, timetoken: str): # noqa F811 callback = AcceptanceCallback() context.pubnub.add_listener(callback) context.pubnub.subscribe().channels('foo').with_timetoken(int(timetoken)).execute() From 27be4575f6591033c1183a5d6ede7a591406e80c Mon Sep 17 00:00:00 2001 From: Sebastian Molenda Date: Thu, 26 Sep 2024 22:39:54 +0200 Subject: [PATCH 5/8] IT WAS TASK LEAK :( --- pubnub/event_engine/effects.py | 34 ++++++----- pubnub/managers.py | 9 ++- pubnub/pubnub.py | 6 +- tests/acceptance/subscribe/environment.py | 10 +++- tests/integrational/asyncio/test_subscribe.py | 56 +++++++++++++++++-- .../native_threads/test_subscribe.py | 52 ++++++++++++++++- 6 files changed, 141 insertions(+), 26 deletions(-) diff --git a/pubnub/event_engine/effects.py b/pubnub/event_engine/effects.py index 89537263..72c6b918 100644 --- a/pubnub/event_engine/effects.py +++ b/pubnub/event_engine/effects.py @@ -57,12 +57,6 @@ def get_new_stop_event(self): self.logger.debug(f'creating new stop_event({id(event)}) for {self.__class__.__name__}') return event - def calculate_reconnection_delay(self, attempts): - if self.reconnection_policy is PNReconnectionPolicy.EXPONENTIAL: - return ExponentialDelay.calculate(attempts - 1) - else: - return LinearDelay.calculate(attempts - 1) - class HandshakeEffect(Effect): def run(self): @@ -155,15 +149,15 @@ def __init__(self, pubnub_instance, event_engine_instance, invocation: Union[invocations.PNManageableInvocation, invocations.PNCancelInvocation]) -> None: super().__init__(pubnub_instance, event_engine_instance, invocation) self.reconnection_policy = pubnub_instance.config.reconnect_policy + self.interval = pubnub_instance.config.RECONNECTION_INTERVAL if self.reconnection_policy is PNReconnectionPolicy.EXPONENTIAL: self.max_retry_attempts = ExponentialDelay.MAX_RETRIES elif self.reconnection_policy is PNReconnectionPolicy.LINEAR: self.max_retry_attempts = LinearDelay.MAX_RETRIES - else: - self.max_retry_attempts = 0 + if pubnub_instance.config.maximum_reconnection_retries is not None: - self.max_retry_attempts = min(self.max_retry_attempts, pubnub_instance.config.maximum_reconnection_retries) + self.max_retry_attempts = pubnub_instance.config.maximum_reconnection_retries def give_up(self, reason: PubNubException, attempt: int, timetoken: int = 0): self.logger.error(f"GiveUp called on Unspecific event. Reason: {reason}, Attempt: {attempt} TT:{timetoken}") @@ -177,8 +171,15 @@ def success(self, timetoken: str, region: Optional[int] = None, **kwargs): self.logger.error(f"Success called on Unspecific event. TT:{timetoken}, Reg: {region}, KWARGS: {kwargs.keys()}") raise PubNubException('Unspecified Invocation') + def calculate_reconnection_delay(self, attempts): + if self.reconnection_policy is PNReconnectionPolicy.EXPONENTIAL: + delay = ExponentialDelay.calculate(attempts) + else: + delay = LinearDelay.calculate(attempts) + + return delay + def run(self): - self.logger.warning(f"Reconnect attempt {self.invocation.attempts} of {self.max_retry_attempts}") if self.reconnection_policy is PNReconnectionPolicy.NONE or self.invocation.attempts > self.max_retry_attempts: self.give_up(reason=self.invocation.reason, attempt=self.invocation.attempts) else: @@ -318,7 +319,8 @@ def run(self): async def heartbeat_wait(self, wait_time: int, stop_event): try: await asyncio.sleep(wait_time) - self.event_engine.trigger(events.HeartbeatTimesUpEvent()) + if not stop_event.is_set(): + self.event_engine.trigger(events.HeartbeatTimesUpEvent()) except asyncio.CancelledError: pass @@ -346,8 +348,14 @@ def __init__(self, pubnub_instance, event_engine_instance, self.reconnection_policy = pubnub_instance.config.reconnect_policy self.max_retry_attempts = pubnub_instance.config.maximum_reconnection_retries self.interval = pubnub_instance.config.RECONNECTION_INTERVAL - self.min_backoff = pubnub_instance.config.RECONNECTION_MIN_EXPONENTIAL_BACKOFF - self.max_backoff = pubnub_instance.config.RECONNECTION_MAX_EXPONENTIAL_BACKOFF + + def calculate_reconnection_delay(self, attempts): + if self.reconnection_policy is PNReconnectionPolicy.EXPONENTIAL: + delay = ExponentialDelay.calculate(attempts) + else: + delay = LinearDelay.calculate(attempts) + + return delay def run(self): if self.reconnection_policy is PNReconnectionPolicy.NONE or self.invocation.attempts > self.max_retry_attempts: diff --git a/pubnub/managers.py b/pubnub/managers.py index 994630d5..8c460a73 100644 --- a/pubnub/managers.py +++ b/pubnub/managers.py @@ -76,9 +76,12 @@ def _retry_limit_reached(self): return True elif user_limit == -1: return False - else: - limit = LinearDelay.MAX_RETRIES if policy == PNReconnectionPolicy.LINEAR else ExponentialDelay.MAX_RETRIES - return self._connection_errors > min(user_limit, limit) + + policy_limit = (LinearDelay.MAX_RETRIES if policy == PNReconnectionPolicy.LINEAR + else ExponentialDelay.MAX_RETRIES) + if user_limit is not None: + return self._connection_errors >= min(user_limit, policy_limit) + return self._connection_errors > policy_limit @abstractmethod def start_polling(self): diff --git a/pubnub/pubnub.py b/pubnub/pubnub.py index cc511e4b..57ba6229 100644 --- a/pubnub/pubnub.py +++ b/pubnub/pubnub.py @@ -101,15 +101,15 @@ def __init__(self, pubnub): def _register_heartbeat_timer(self): self.stop_heartbeat_timer() - self._recalculate_interval() - - if self._timer_interval < 0 or self._retry_limit_reached(): + if self._retry_limit_reached(): logger.warning("Reconnection retry limit reached. Disconnecting.") disconnect_status = PNStatus() disconnect_status.category = PNStatusCategory.PNDisconnectedCategory self._pubnub._subscription_manager._listener_manager.announce_status(disconnect_status) return + self._recalculate_interval() + self._timer = threading.Timer(self._timer_interval, self._call_time) self._timer.daemon = True self._timer.start() diff --git a/tests/acceptance/subscribe/environment.py b/tests/acceptance/subscribe/environment.py index c949f5aa..8f4740a3 100644 --- a/tests/acceptance/subscribe/environment.py +++ b/tests/acceptance/subscribe/environment.py @@ -43,7 +43,15 @@ def before_scenario(context: Context, feature): def after_scenario(context: Context, feature): loop = asyncio.get_event_loop() loop.run_until_complete(context.pubnub.stop()) - loop.run_until_complete(asyncio.sleep(0.3)) + # asyncio cleaning all pending tasks to eliminate any potential state changes + pending_tasks = asyncio.all_tasks(loop) + for task in pending_tasks: + task.cancel() + try: + loop.run_until_complete(task) + except asyncio.CancelledError: + pass + loop.run_until_complete(asyncio.sleep(1.5)) del context.pubnub for tag in feature.tags: diff --git a/tests/integrational/asyncio/test_subscribe.py b/tests/integrational/asyncio/test_subscribe.py index 431895a7..b7df24db 100644 --- a/tests/integrational/asyncio/test_subscribe.py +++ b/tests/integrational/asyncio/test_subscribe.py @@ -464,7 +464,7 @@ async def test_subscribe_failing_reconnect_policy_none(): async def test_subscribe_failing_reconnect_policy_linear(): # we don't test the actual delay calculation here, just everything around it def mock_calculate(*args, **kwargs): - return 0.2 if args[0] < LinearDelay.MAX_RETRIES else -1 + return 0.2 with patch('pubnub.managers.LinearDelay.calculate', wraps=mock_calculate) as calculate_mock: config = pnconf_env_copy(enable_subscribe=True, @@ -481,14 +481,14 @@ def mock_calculate(*args, **kwargs): and listener.status_result.category == PNStatusCategory.PNDisconnectedCategory: break await asyncio.sleep(0.5) - assert calculate_mock.call_count == LinearDelay.MAX_RETRIES + 1 + assert calculate_mock.call_count == LinearDelay.MAX_RETRIES @pytest.mark.asyncio async def test_subscribe_failing_reconnect_policy_exponential(): # we don't test the actual delay calculation here, just everything around it def mock_calculate(*args, **kwargs): - return 0.2 if args[0] < ExponentialDelay.MAX_RETRIES else -1 + return 0.2 with patch('pubnub.managers.ExponentialDelay.calculate', wraps=mock_calculate) as calculate_mock: config = pnconf_env_copy(enable_subscribe=True, @@ -497,6 +497,30 @@ def mock_calculate(*args, **kwargs): origin='127.0.0.1') pubnub = PubNubAsyncio(config) + listener = TestCallback() + pubnub.add_listener(listener) + pubnub.subscribe().channels("my_channel_exponential").execute() + while True: + if isinstance(listener.status_result, PNStatus) \ + and listener.status_result.category == PNStatusCategory.PNDisconnectedCategory: + break + await asyncio.sleep(0.5) + assert calculate_mock.call_count == ExponentialDelay.MAX_RETRIES + + +@pytest.mark.asyncio +async def test_subscribe_failing_reconnect_policy_linear_with_max_retries(): + # we don't test the actual delay calculation here, just everything around it + def mock_calculate(*args, **kwargs): + return 0.2 + + with patch('pubnub.managers.LinearDelay.calculate', wraps=mock_calculate) as calculate_mock: + config = pnconf_env_copy(enable_subscribe=True, maximum_reconnection_retries=3, + uuid="test-subscribe-failing-reconnect-policy-linear-with-max-retries", + reconnect_policy=PNReconnectionPolicy.LINEAR, + origin='127.0.0.1') + pubnub = PubNubAsyncio(config) + listener = TestCallback() pubnub.add_listener(listener) pubnub.subscribe().channels("my_channel_linear").execute() @@ -505,4 +529,28 @@ def mock_calculate(*args, **kwargs): and listener.status_result.category == PNStatusCategory.PNDisconnectedCategory: break await asyncio.sleep(0.5) - assert calculate_mock.call_count == ExponentialDelay.MAX_RETRIES + 1 + assert calculate_mock.call_count == 3 + + +@pytest.mark.asyncio +async def test_subscribe_failing_reconnect_policy_exponential_with_max_retries(): + # we don't test the actual delay calculation here, just everything around it + def mock_calculate(*args, **kwargs): + return 0.2 + + with patch('pubnub.managers.ExponentialDelay.calculate', wraps=mock_calculate) as calculate_mock: + config = pnconf_env_copy(enable_subscribe=True, maximum_reconnection_retries=3, + uuid="test-subscribe-failing-reconnect-policy-exponential-with-max-retries", + reconnect_policy=PNReconnectionPolicy.EXPONENTIAL, + origin='127.0.0.1') + pubnub = PubNubAsyncio(config) + + listener = TestCallback() + pubnub.add_listener(listener) + pubnub.subscribe().channels("my_channel_exponential").execute() + while True: + if isinstance(listener.status_result, PNStatus) \ + and listener.status_result.category == PNStatusCategory.PNDisconnectedCategory: + break + await asyncio.sleep(0.5) + assert calculate_mock.call_count == 3 diff --git a/tests/integrational/native_threads/test_subscribe.py b/tests/integrational/native_threads/test_subscribe.py index 10cad907..4f7d0113 100644 --- a/tests/integrational/native_threads/test_subscribe.py +++ b/tests/integrational/native_threads/test_subscribe.py @@ -342,7 +342,7 @@ def test_subscribe_retry_policy_none(self): def test_subscribe_retry_policy_linear(self): # we don't test the actual delay calculation here, just everything around it def mock_calculate(*args, **kwargs): - return 0.2 if args[0] < LinearDelay.MAX_RETRIES else -1 + return 0.2 with patch('pubnub.managers.LinearDelay.calculate', wraps=mock_calculate) as calculate_mock: ch = "test-subscribe-retry-policy-linear" @@ -365,7 +365,7 @@ def mock_calculate(*args, **kwargs): def test_subscribe_retry_policy_exponential(self): # we don't test the actual delay calculation here, just everything around it def mock_calculate(*args, **kwargs): - return 0.2 if args[0] < ExponentialDelay.MAX_RETRIES else -1 + return 0.2 with patch('pubnub.managers.ExponentialDelay.calculate', wraps=mock_calculate) as calculate_mock: ch = "test-subscribe-retry-policy-exponential" @@ -384,3 +384,51 @@ def mock_calculate(*args, **kwargs): self.fail(e) assert calculate_mock.call_count == ExponentialDelay.MAX_RETRIES + 1 + + def test_subscribe_retry_policy_linear_with_max_retries(self): + # we don't test the actual delay calculation here, just everything around it + def mock_calculate(*args, **kwargs): + return 0.2 + + with patch('pubnub.managers.LinearDelay.calculate', wraps=mock_calculate) as calculate_mock: + ch = "test-subscribe-retry-policy-linear" + pubnub = PubNub(pnconf_env_copy(enable_subscribe=True, daemon=True, origin='127.0.0.1', + maximum_reconnection_retries=3, + reconnect_policy=PNReconnectionPolicy.LINEAR)) + listener = DisconnectListener() + + try: + pubnub.add_listener(listener) + pubnub.subscribe().channels(ch).execute() + + while not listener.disconnected: + time.sleep(0.5) + + except PubNubException as e: + self.fail(e) + + assert calculate_mock.call_count == 3 + + def test_subscribe_retry_policy_exponential_with_max_retries(self): + # we don't test the actual delay calculation here, just everything around it + def mock_calculate(*args, **kwargs): + return 0.2 + + with patch('pubnub.managers.ExponentialDelay.calculate', wraps=mock_calculate) as calculate_mock: + ch = "test-subscribe-retry-policy-exponential" + pubnub = PubNub(pnconf_env_copy(enable_subscribe=True, daemon=True, origin='127.0.0.1', + maximum_reconnection_retries=3, + reconnect_policy=PNReconnectionPolicy.EXPONENTIAL)) + listener = DisconnectListener() + + try: + pubnub.add_listener(listener) + pubnub.subscribe().channels(ch).execute() + + while not listener.disconnected: + time.sleep(0.5) + + except PubNubException as e: + self.fail(e) + + assert calculate_mock.call_count == 3 From b6746dbcc47527bc54b73b6dc38bb278baa48b0d Mon Sep 17 00:00:00 2001 From: Sebastian Molenda Date: Thu, 26 Sep 2024 22:55:51 +0200 Subject: [PATCH 6/8] That one test which still needs no reconnect policy --- tests/integrational/asyncio/test_unsubscribe_status.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/integrational/asyncio/test_unsubscribe_status.py b/tests/integrational/asyncio/test_unsubscribe_status.py index 95ed40a3..519c23e3 100644 --- a/tests/integrational/asyncio/test_unsubscribe_status.py +++ b/tests/integrational/asyncio/test_unsubscribe_status.py @@ -64,6 +64,7 @@ async def test_access_denied_unsubscribe_operation(): pnconf = pnconf_pam_copy() pnconf.secret_key = None pnconf.enable_subscribe = True + pnconf.reconnect_policy = pn.enums.PNReconnectionPolicy.NONE pubnub = PubNubAsyncio(pnconf) From 2aad937af3138c0ddd94ebe4386a6e0f309bcd68 Mon Sep 17 00:00:00 2001 From: Sebastian Molenda Date: Tue, 1 Oct 2024 14:21:38 +0200 Subject: [PATCH 7/8] Add custom delay for linear policy --- pubnub/event_engine/effects.py | 12 ++++++---- pubnub/managers.py | 9 +++++-- pubnub/pnconfiguration.py | 4 +--- .../acceptance/subscribe/steps/then_steps.py | 1 + tests/integrational/asyncio/test_subscribe.py | 24 +++++++++++++++++++ .../native_threads/test_subscribe.py | 24 +++++++++++++++++++ 6 files changed, 65 insertions(+), 9 deletions(-) diff --git a/pubnub/event_engine/effects.py b/pubnub/event_engine/effects.py index 72c6b918..ae8fd2ad 100644 --- a/pubnub/event_engine/effects.py +++ b/pubnub/event_engine/effects.py @@ -149,7 +149,7 @@ def __init__(self, pubnub_instance, event_engine_instance, invocation: Union[invocations.PNManageableInvocation, invocations.PNCancelInvocation]) -> None: super().__init__(pubnub_instance, event_engine_instance, invocation) self.reconnection_policy = pubnub_instance.config.reconnect_policy - self.interval = pubnub_instance.config.RECONNECTION_INTERVAL + self.interval = pubnub_instance.config.reconnection_interval if self.reconnection_policy is PNReconnectionPolicy.EXPONENTIAL: self.max_retry_attempts = ExponentialDelay.MAX_RETRIES @@ -174,8 +174,10 @@ def success(self, timetoken: str, region: Optional[int] = None, **kwargs): def calculate_reconnection_delay(self, attempts): if self.reconnection_policy is PNReconnectionPolicy.EXPONENTIAL: delay = ExponentialDelay.calculate(attempts) - else: + elif self.interval is None: delay = LinearDelay.calculate(attempts) + else: + delay = self.interval return delay @@ -347,13 +349,15 @@ def __init__(self, pubnub_instance, event_engine_instance, super().__init__(pubnub_instance, event_engine_instance, invocation) self.reconnection_policy = pubnub_instance.config.reconnect_policy self.max_retry_attempts = pubnub_instance.config.maximum_reconnection_retries - self.interval = pubnub_instance.config.RECONNECTION_INTERVAL + self.interval = pubnub_instance.config.reconnection_interval def calculate_reconnection_delay(self, attempts): if self.reconnection_policy is PNReconnectionPolicy.EXPONENTIAL: delay = ExponentialDelay.calculate(attempts) - else: + elif self.interval is None: delay = LinearDelay.calculate(attempts) + else: + delay = self.interval return delay diff --git a/pubnub/managers.py b/pubnub/managers.py index 8c460a73..fc222869 100644 --- a/pubnub/managers.py +++ b/pubnub/managers.py @@ -65,8 +65,13 @@ def set_reconnection_listener(self, reconnection_callback): def _recalculate_interval(self): policy = self._pubnub.config.reconnect_policy - calculate = (LinearDelay.calculate if policy == PNReconnectionPolicy.LINEAR else ExponentialDelay.calculate) - self._timer_interval = calculate(self._connection_errors) + interval = self._pubnub.config.reconnection_interval + if policy == PNReconnectionPolicy.LINEAR and interval is not None: + self._timer_interval = interval + elif policy == PNReconnectionPolicy.LINEAR: + self._timer_interval = LinearDelay.calculate(self._connection_errors) + else: + self._timer_interval = ExponentialDelay.calculate(self._connection_errors) def _retry_limit_reached(self): user_limit = self._pubnub.config.maximum_reconnection_retries diff --git a/pubnub/pnconfiguration.py b/pubnub/pnconfiguration.py index e712d70a..2d88cdee 100644 --- a/pubnub/pnconfiguration.py +++ b/pubnub/pnconfiguration.py @@ -11,9 +11,6 @@ class PNConfiguration(object): DEFAULT_PRESENCE_TIMEOUT = 300 DEFAULT_HEARTBEAT_INTERVAL = 280 ALLOWED_AES_MODES = [AES.MODE_CBC, AES.MODE_GCM] - RECONNECTION_INTERVAL = 3 - RECONNECTION_MIN_EXPONENTIAL_BACKOFF = 1 - RECONNECTION_MAX_EXPONENTIAL_BACKOFF = 32 DEFAULT_CRYPTO_MODULE = LegacyCryptoModule _locked = False @@ -41,6 +38,7 @@ def __init__(self): self.heartbeat_notification_options = PNHeartbeatNotificationOptions.FAILURES self.reconnect_policy = PNReconnectionPolicy.EXPONENTIAL self.maximum_reconnection_retries = None # -1 means unlimited/ 0 means no retries + self.reconnection_interval = None # if None is left the default value from LinearDelay is used self.daemon = False self.use_random_initialization_vector = True self.suppress_leave_events = False diff --git a/tests/acceptance/subscribe/steps/then_steps.py b/tests/acceptance/subscribe/steps/then_steps.py index 26c84c63..ef09d821 100644 --- a/tests/acceptance/subscribe/steps/then_steps.py +++ b/tests/acceptance/subscribe/steps/then_steps.py @@ -125,6 +125,7 @@ async def step_impl(ctx): @async_run_until_complete async def step_impl(context, channel1, channel2): context.pubnub.unsubscribe().channels([channel1, channel2]).execute() + await asyncio.sleep(0.5) @then(u'I don\'t observe any Events and Invocations of the Presence EE') diff --git a/tests/integrational/asyncio/test_subscribe.py b/tests/integrational/asyncio/test_subscribe.py index b7df24db..e98c94b5 100644 --- a/tests/integrational/asyncio/test_subscribe.py +++ b/tests/integrational/asyncio/test_subscribe.py @@ -554,3 +554,27 @@ def mock_calculate(*args, **kwargs): break await asyncio.sleep(0.5) assert calculate_mock.call_count == 3 + + +@pytest.mark.asyncio +async def test_subscribe_failing_reconnect_policy_linear_with_custom_interval(): + # we don't test the actual delay calculation here, just everything around it + def mock_calculate(*args, **kwargs): + return 0.2 + + with patch('pubnub.managers.LinearDelay.calculate', wraps=mock_calculate) as calculate_mock: + config = pnconf_env_copy(enable_subscribe=True, maximum_reconnection_retries=3, reconnection_interval=1, + uuid="test-subscribe-failing-reconnect-policy-linear-with-max-retries", + reconnect_policy=PNReconnectionPolicy.LINEAR, + origin='127.0.0.1') + pubnub = PubNubAsyncio(config) + + listener = TestCallback() + pubnub.add_listener(listener) + pubnub.subscribe().channels("my_channel_linear").execute() + while True: + if isinstance(listener.status_result, PNStatus) \ + and listener.status_result.category == PNStatusCategory.PNDisconnectedCategory: + break + await asyncio.sleep(0.5) + assert calculate_mock.call_count == 0 diff --git a/tests/integrational/native_threads/test_subscribe.py b/tests/integrational/native_threads/test_subscribe.py index 4f7d0113..29b6aa6b 100644 --- a/tests/integrational/native_threads/test_subscribe.py +++ b/tests/integrational/native_threads/test_subscribe.py @@ -432,3 +432,27 @@ def mock_calculate(*args, **kwargs): self.fail(e) assert calculate_mock.call_count == 3 + + def test_subscribe_retry_policy_linear_with_custom_interval(self): + # we don't test the actual delay calculation here, just everything around it + def mock_calculate(*args, **kwargs): + return 0.2 + + with patch('pubnub.managers.LinearDelay.calculate', wraps=mock_calculate) as calculate_mock: + ch = "test-subscribe-retry-policy-linear" + pubnub = PubNub(pnconf_env_copy(enable_subscribe=True, daemon=True, origin='127.0.0.1', + maximum_reconnection_retries=3, reconnection_interval=1, + reconnect_policy=PNReconnectionPolicy.LINEAR)) + listener = DisconnectListener() + + try: + pubnub.add_listener(listener) + pubnub.subscribe().channels(ch).execute() + + while not listener.disconnected: + time.sleep(0.5) + + except PubNubException as e: + self.fail(e) + + assert calculate_mock.call_count == 0 From be2ffa9cc11a84bffb427715e0a702222743c539 Mon Sep 17 00:00:00 2001 From: Sebastian Molenda Date: Tue, 1 Oct 2024 14:27:54 +0200 Subject: [PATCH 8/8] clean up old variables --- tests/acceptance/subscribe/steps/given_steps.py | 2 +- tests/functional/event_engine/test_managed_effect.py | 4 +--- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/tests/acceptance/subscribe/steps/given_steps.py b/tests/acceptance/subscribe/steps/given_steps.py index 58f5eca1..493b7135 100644 --- a/tests/acceptance/subscribe/steps/given_steps.py +++ b/tests/acceptance/subscribe/steps/given_steps.py @@ -60,7 +60,7 @@ async def step_impl(context: PNContext): context.pn_config.enable_presence_heartbeat = True context.pn_config.reconnect_policy = PNReconnectionPolicy.LINEAR context.pn_config.subscribe_request_timeout = 10 - context.pn_config.RECONNECTION_INTERVAL = 2 + context.pn_config.reconnection_interval = 2 context.pn_config.set_presence_timeout(3) context.pubnub = PubNubAsyncio(context.pn_config, subscription_manager=EventEngineSubscriptionManager) diff --git a/tests/functional/event_engine/test_managed_effect.py b/tests/functional/event_engine/test_managed_effect.py index c59049d2..bea019ad 100644 --- a/tests/functional/event_engine/test_managed_effect.py +++ b/tests/functional/event_engine/test_managed_effect.py @@ -15,9 +15,7 @@ class FakeConfig: reconnect_policy = PNReconnectionPolicy.NONE - RECONNECTION_INTERVAL = 1 - RECONNECTION_MIN_EXPONENTIAL_BACKOFF = 1 - RECONNECTION_MAX_EXPONENTIAL_BACKOFF = 32 + reconnection_interval = 1 maximum_reconnection_retries = 3