From a45811c4f508da0606adbfc4f808674e01304190 Mon Sep 17 00:00:00 2001 From: Sebastian Molenda Date: Tue, 1 Oct 2024 10:09:40 +0200 Subject: [PATCH] add option to use custom interval --- pubnub/event_engine/effects.py | 12 ++++++---- pubnub/managers.py | 9 +++++-- pubnub/pnconfiguration.py | 4 +--- tests/integrational/asyncio/test_subscribe.py | 24 +++++++++++++++++++ .../native_threads/test_subscribe.py | 24 +++++++++++++++++++ 5 files changed, 64 insertions(+), 9 deletions(-) diff --git a/pubnub/event_engine/effects.py b/pubnub/event_engine/effects.py index 72c6b918..ae8fd2ad 100644 --- a/pubnub/event_engine/effects.py +++ b/pubnub/event_engine/effects.py @@ -149,7 +149,7 @@ def __init__(self, pubnub_instance, event_engine_instance, invocation: Union[invocations.PNManageableInvocation, invocations.PNCancelInvocation]) -> None: super().__init__(pubnub_instance, event_engine_instance, invocation) self.reconnection_policy = pubnub_instance.config.reconnect_policy - self.interval = pubnub_instance.config.RECONNECTION_INTERVAL + self.interval = pubnub_instance.config.reconnection_interval if self.reconnection_policy is PNReconnectionPolicy.EXPONENTIAL: self.max_retry_attempts = ExponentialDelay.MAX_RETRIES @@ -174,8 +174,10 @@ def success(self, timetoken: str, region: Optional[int] = None, **kwargs): def calculate_reconnection_delay(self, attempts): if self.reconnection_policy is PNReconnectionPolicy.EXPONENTIAL: delay = ExponentialDelay.calculate(attempts) - else: + elif self.interval is None: delay = LinearDelay.calculate(attempts) + else: + delay = self.interval return delay @@ -347,13 +349,15 @@ def __init__(self, pubnub_instance, event_engine_instance, super().__init__(pubnub_instance, event_engine_instance, invocation) self.reconnection_policy = pubnub_instance.config.reconnect_policy self.max_retry_attempts = pubnub_instance.config.maximum_reconnection_retries - self.interval = pubnub_instance.config.RECONNECTION_INTERVAL + self.interval = pubnub_instance.config.reconnection_interval def calculate_reconnection_delay(self, attempts): if self.reconnection_policy is PNReconnectionPolicy.EXPONENTIAL: delay = ExponentialDelay.calculate(attempts) - else: + elif self.interval is None: delay = LinearDelay.calculate(attempts) + else: + delay = self.interval return delay diff --git a/pubnub/managers.py b/pubnub/managers.py index 8c460a73..fc222869 100644 --- a/pubnub/managers.py +++ b/pubnub/managers.py @@ -65,8 +65,13 @@ def set_reconnection_listener(self, reconnection_callback): def _recalculate_interval(self): policy = self._pubnub.config.reconnect_policy - calculate = (LinearDelay.calculate if policy == PNReconnectionPolicy.LINEAR else ExponentialDelay.calculate) - self._timer_interval = calculate(self._connection_errors) + interval = self._pubnub.config.reconnection_interval + if policy == PNReconnectionPolicy.LINEAR and interval is not None: + self._timer_interval = interval + elif policy == PNReconnectionPolicy.LINEAR: + self._timer_interval = LinearDelay.calculate(self._connection_errors) + else: + self._timer_interval = ExponentialDelay.calculate(self._connection_errors) def _retry_limit_reached(self): user_limit = self._pubnub.config.maximum_reconnection_retries diff --git a/pubnub/pnconfiguration.py b/pubnub/pnconfiguration.py index e712d70a..2d88cdee 100644 --- a/pubnub/pnconfiguration.py +++ b/pubnub/pnconfiguration.py @@ -11,9 +11,6 @@ class PNConfiguration(object): DEFAULT_PRESENCE_TIMEOUT = 300 DEFAULT_HEARTBEAT_INTERVAL = 280 ALLOWED_AES_MODES = [AES.MODE_CBC, AES.MODE_GCM] - RECONNECTION_INTERVAL = 3 - RECONNECTION_MIN_EXPONENTIAL_BACKOFF = 1 - RECONNECTION_MAX_EXPONENTIAL_BACKOFF = 32 DEFAULT_CRYPTO_MODULE = LegacyCryptoModule _locked = False @@ -41,6 +38,7 @@ def __init__(self): self.heartbeat_notification_options = PNHeartbeatNotificationOptions.FAILURES self.reconnect_policy = PNReconnectionPolicy.EXPONENTIAL self.maximum_reconnection_retries = None # -1 means unlimited/ 0 means no retries + self.reconnection_interval = None # if None is left the default value from LinearDelay is used self.daemon = False self.use_random_initialization_vector = True self.suppress_leave_events = False diff --git a/tests/integrational/asyncio/test_subscribe.py b/tests/integrational/asyncio/test_subscribe.py index b7df24db..e98c94b5 100644 --- a/tests/integrational/asyncio/test_subscribe.py +++ b/tests/integrational/asyncio/test_subscribe.py @@ -554,3 +554,27 @@ def mock_calculate(*args, **kwargs): break await asyncio.sleep(0.5) assert calculate_mock.call_count == 3 + + +@pytest.mark.asyncio +async def test_subscribe_failing_reconnect_policy_linear_with_custom_interval(): + # we don't test the actual delay calculation here, just everything around it + def mock_calculate(*args, **kwargs): + return 0.2 + + with patch('pubnub.managers.LinearDelay.calculate', wraps=mock_calculate) as calculate_mock: + config = pnconf_env_copy(enable_subscribe=True, maximum_reconnection_retries=3, reconnection_interval=1, + uuid="test-subscribe-failing-reconnect-policy-linear-with-max-retries", + reconnect_policy=PNReconnectionPolicy.LINEAR, + origin='127.0.0.1') + pubnub = PubNubAsyncio(config) + + listener = TestCallback() + pubnub.add_listener(listener) + pubnub.subscribe().channels("my_channel_linear").execute() + while True: + if isinstance(listener.status_result, PNStatus) \ + and listener.status_result.category == PNStatusCategory.PNDisconnectedCategory: + break + await asyncio.sleep(0.5) + assert calculate_mock.call_count == 0 diff --git a/tests/integrational/native_threads/test_subscribe.py b/tests/integrational/native_threads/test_subscribe.py index 4f7d0113..29b6aa6b 100644 --- a/tests/integrational/native_threads/test_subscribe.py +++ b/tests/integrational/native_threads/test_subscribe.py @@ -432,3 +432,27 @@ def mock_calculate(*args, **kwargs): self.fail(e) assert calculate_mock.call_count == 3 + + def test_subscribe_retry_policy_linear_with_custom_interval(self): + # we don't test the actual delay calculation here, just everything around it + def mock_calculate(*args, **kwargs): + return 0.2 + + with patch('pubnub.managers.LinearDelay.calculate', wraps=mock_calculate) as calculate_mock: + ch = "test-subscribe-retry-policy-linear" + pubnub = PubNub(pnconf_env_copy(enable_subscribe=True, daemon=True, origin='127.0.0.1', + maximum_reconnection_retries=3, reconnection_interval=1, + reconnect_policy=PNReconnectionPolicy.LINEAR)) + listener = DisconnectListener() + + try: + pubnub.add_listener(listener) + pubnub.subscribe().channels(ch).execute() + + while not listener.disconnected: + time.sleep(0.5) + + except PubNubException as e: + self.fail(e) + + assert calculate_mock.call_count == 0