From 0d47e587cffcfcd2ee380040d98263647033d41c Mon Sep 17 00:00:00 2001 From: Sebastian Molenda Date: Tue, 1 Oct 2024 15:09:02 +0200 Subject: [PATCH] Expoential as a default reconnection policy (#196) * Expoential as a default reconnection policy * Add respect of user defined retry limit * That one test which still needs no reconnect policy * Add custom delay for linear policy * clean up old variables --- .../native_threads/subscribe_with_retry.py | 50 +++++ .../pubnub_asyncio/subscribe_with_retry.py | 59 ++++++ pubnub/event_engine/effects.py | 52 ++++-- pubnub/managers.py | 61 ++++-- pubnub/pnconfiguration.py | 8 +- pubnub/pubnub.py | 10 + tests/acceptance/subscribe/environment.py | 11 +- .../acceptance/subscribe/steps/given_steps.py | 15 +- .../acceptance/subscribe/steps/then_steps.py | 1 + .../acceptance/subscribe/steps/when_steps.py | 6 +- .../event_engine/test_managed_effect.py | 4 +- tests/integrational/asyncio/test_subscribe.py | 175 ++++++++++++++++++ .../asyncio/test_unsubscribe_status.py | 1 + .../native_threads/test_subscribe.py | 154 +++++++++++++++ tests/unit/test_reconnection_manager.py | 42 +++++ 15 files changed, 599 insertions(+), 50 deletions(-) create mode 100644 examples/native_threads/subscribe_with_retry.py create mode 100644 examples/pubnub_asyncio/subscribe_with_retry.py create mode 100644 tests/unit/test_reconnection_manager.py diff --git a/examples/native_threads/subscribe_with_retry.py b/examples/native_threads/subscribe_with_retry.py new file mode 100644 index 00000000..5c1b43d8 --- /dev/null +++ b/examples/native_threads/subscribe_with_retry.py @@ -0,0 +1,50 @@ +import logging +import sys +import time + +from pubnub.pnconfiguration import PNConfiguration +from pubnub.pubnub import PubNub, SubscribeListener +from pubnub.enums import PNReconnectionPolicy, PNStatusCategory + + +class TestListener(SubscribeListener): + status_result = None + disconnected = False + + def status(self, pubnub, status): + if status.category == PNStatusCategory.PNDisconnectedCategory: + print('Could not connect. Exiting...') + self.disconnected = True + + def message(self, pubnub, message): + print(f'Message:\n{message.__dict__}') + + def presence(self, pubnub, presence): + print(f'Presence:\n{presence.__dict__}') + + +logger = logging.getLogger("pubnub") +logger.setLevel(logging.DEBUG) +handler = logging.StreamHandler(sys.stdout) +handler.setLevel(logging.DEBUG) +logger.addHandler(handler) + + +config = PNConfiguration() +config.subscribe_key = "demo" +config.publish_key = "demo" +config.user_id = 'example' +config.enable_subscribe = True +config.reconnect_policy = PNReconnectionPolicy.EXPONENTIAL +config.origin = '127.0.0.1' +config.ssl = False + +listener = TestListener() + +pubnub = PubNub(config) +pubnub.add_listener(listener) +sub = pubnub.subscribe().channels(['example']).execute() + +while not listener.disconnected: + time.sleep(0.5) +print('Disconnected. Bye.') diff --git a/examples/pubnub_asyncio/subscribe_with_retry.py b/examples/pubnub_asyncio/subscribe_with_retry.py new file mode 100644 index 00000000..15e65e3d --- /dev/null +++ b/examples/pubnub_asyncio/subscribe_with_retry.py @@ -0,0 +1,59 @@ +import asyncio +import logging +import sys + +from pubnub.callbacks import SubscribeCallback +from pubnub.models.consumer.common import PNStatus +from pubnub.pubnub_asyncio import PubNubAsyncio +from pubnub.pnconfiguration import PNConfiguration +from pubnub.enums import PNReconnectionPolicy, PNStatusCategory + +config = PNConfiguration() +config.subscribe_key = "demo" +config.publish_key = "demo" +config.enable_subscribe = True +config.uuid = "test-uuid" +config.origin = "127.0.0.1" +config.ssl = False +config.reconnect_policy = PNReconnectionPolicy.NONE + +pubnub = PubNubAsyncio(config) + +logger = logging.getLogger("pubnub") +logger.setLevel(logging.WARNING) +handler = logging.StreamHandler(sys.stdout) +handler.setLevel(logging.WARNING) +logger.addHandler(handler) + + +class SampleCallback(SubscribeCallback): + message_result = None + status_result = None + presence_result = None + + def status(self, pubnub, status): + self.status_result = status + + def message(self, pubnub, message): + self.message_result = message + + def presence(self, pubnub, presence): + self.presence_result = presence + + +async def main(): + listener = SampleCallback() + pubnub.add_listener(listener) + pubnub.subscribe().channels("my_channel").execute() + while True: + if isinstance(listener.status_result, PNStatus) \ + and listener.status_result.category == PNStatusCategory.PNDisconnectedCategory: + print('Could not connect. Exiting...') + break + await asyncio.sleep(1) + + +if __name__ == "__main__": + loop = asyncio.get_event_loop() + loop.run_until_complete(main()) + loop.close() diff --git a/pubnub/event_engine/effects.py b/pubnub/event_engine/effects.py index 04f5b760..ae8fd2ad 100644 --- a/pubnub/event_engine/effects.py +++ b/pubnub/event_engine/effects.py @@ -1,6 +1,5 @@ import asyncio import logging -import math from typing import Optional, Union from pubnub.endpoints.presence.heartbeat import Heartbeat @@ -14,6 +13,7 @@ from pubnub.event_engine.models import events, invocations from pubnub.models.consumer.common import PNStatus from pubnub.workers import BaseMessageWorker +from pubnub.managers import LinearDelay, ExponentialDelay class Effect: @@ -57,14 +57,6 @@ def get_new_stop_event(self): self.logger.debug(f'creating new stop_event({id(event)}) for {self.__class__.__name__}') return event - def calculate_reconnection_delay(self, attempts): - if self.reconnection_policy is PNReconnectionPolicy.EXPONENTIAL: - delay = int(math.pow(2, attempts - 5 * math.floor((attempts - 1) / 5)) - 1) - else: - delay = self.interval - - return delay - class HandshakeEffect(Effect): def run(self): @@ -157,10 +149,15 @@ def __init__(self, pubnub_instance, event_engine_instance, invocation: Union[invocations.PNManageableInvocation, invocations.PNCancelInvocation]) -> None: super().__init__(pubnub_instance, event_engine_instance, invocation) self.reconnection_policy = pubnub_instance.config.reconnect_policy - self.max_retry_attempts = pubnub_instance.config.maximum_reconnection_retries - self.interval = pubnub_instance.config.RECONNECTION_INTERVAL - self.min_backoff = pubnub_instance.config.RECONNECTION_MIN_EXPONENTIAL_BACKOFF - self.max_backoff = pubnub_instance.config.RECONNECTION_MAX_EXPONENTIAL_BACKOFF + self.interval = pubnub_instance.config.reconnection_interval + + if self.reconnection_policy is PNReconnectionPolicy.EXPONENTIAL: + self.max_retry_attempts = ExponentialDelay.MAX_RETRIES + elif self.reconnection_policy is PNReconnectionPolicy.LINEAR: + self.max_retry_attempts = LinearDelay.MAX_RETRIES + + if pubnub_instance.config.maximum_reconnection_retries is not None: + self.max_retry_attempts = pubnub_instance.config.maximum_reconnection_retries def give_up(self, reason: PubNubException, attempt: int, timetoken: int = 0): self.logger.error(f"GiveUp called on Unspecific event. Reason: {reason}, Attempt: {attempt} TT:{timetoken}") @@ -174,13 +171,23 @@ def success(self, timetoken: str, region: Optional[int] = None, **kwargs): self.logger.error(f"Success called on Unspecific event. TT:{timetoken}, Reg: {region}, KWARGS: {kwargs.keys()}") raise PubNubException('Unspecified Invocation') + def calculate_reconnection_delay(self, attempts): + if self.reconnection_policy is PNReconnectionPolicy.EXPONENTIAL: + delay = ExponentialDelay.calculate(attempts) + elif self.interval is None: + delay = LinearDelay.calculate(attempts) + else: + delay = self.interval + + return delay + def run(self): if self.reconnection_policy is PNReconnectionPolicy.NONE or self.invocation.attempts > self.max_retry_attempts: self.give_up(reason=self.invocation.reason, attempt=self.invocation.attempts) else: attempts = self.invocation.attempts delay = self.calculate_reconnection_delay(attempts) - self.logger.warning(f'will reconnect in {delay}s') + self.logger.warning(f'Will reconnect in {delay}s') if hasattr(self.pubnub, 'event_loop'): self.run_async(self.delayed_reconnect_async(delay, attempts)) @@ -314,7 +321,8 @@ def run(self): async def heartbeat_wait(self, wait_time: int, stop_event): try: await asyncio.sleep(wait_time) - self.event_engine.trigger(events.HeartbeatTimesUpEvent()) + if not stop_event.is_set(): + self.event_engine.trigger(events.HeartbeatTimesUpEvent()) except asyncio.CancelledError: pass @@ -341,9 +349,17 @@ def __init__(self, pubnub_instance, event_engine_instance, super().__init__(pubnub_instance, event_engine_instance, invocation) self.reconnection_policy = pubnub_instance.config.reconnect_policy self.max_retry_attempts = pubnub_instance.config.maximum_reconnection_retries - self.interval = pubnub_instance.config.RECONNECTION_INTERVAL - self.min_backoff = pubnub_instance.config.RECONNECTION_MIN_EXPONENTIAL_BACKOFF - self.max_backoff = pubnub_instance.config.RECONNECTION_MAX_EXPONENTIAL_BACKOFF + self.interval = pubnub_instance.config.reconnection_interval + + def calculate_reconnection_delay(self, attempts): + if self.reconnection_policy is PNReconnectionPolicy.EXPONENTIAL: + delay = ExponentialDelay.calculate(attempts) + elif self.interval is None: + delay = LinearDelay.calculate(attempts) + else: + delay = self.interval + + return delay def run(self): if self.reconnection_policy is PNReconnectionPolicy.NONE or self.invocation.attempts > self.max_retry_attempts: diff --git a/pubnub/managers.py b/pubnub/managers.py index 785b75e4..fc222869 100644 --- a/pubnub/managers.py +++ b/pubnub/managers.py @@ -1,10 +1,11 @@ import logging from abc import abstractmethod, ABCMeta -import math import time import copy import base64 +import random + from cbor2 import loads from . import utils @@ -51,33 +52,41 @@ def get_base_path(self): class ReconnectionManager: - INTERVAL = 3 - MINEXPONENTIALBACKOFF = 1 - MAXEXPONENTIALBACKOFF = 32 - def __init__(self, pubnub): self._pubnub = pubnub self._callback = None self._timer = None self._timer_interval = None - self._connection_errors = 1 + self._connection_errors = 0 def set_reconnection_listener(self, reconnection_callback): assert isinstance(reconnection_callback, ReconnectionCallback) self._callback = reconnection_callback def _recalculate_interval(self): - if self._pubnub.config.reconnect_policy == PNReconnectionPolicy.EXPONENTIAL: - self._timer_interval = int(math.pow(2, self._connection_errors) - 1) - if self._timer_interval > self.MAXEXPONENTIALBACKOFF: - self._timer_interval = self.MINEXPONENTIALBACKOFF - self._connection_errors = 1 - logger.debug("timerInterval > MAXEXPONENTIALBACKOFF at: %s" % utils.datetime_now()) - elif self._timer_interval < 1: - self._timer_interval = self.MINEXPONENTIALBACKOFF - logger.debug("timerInterval = %d at: %s" % (self._timer_interval, utils.datetime_now())) + policy = self._pubnub.config.reconnect_policy + interval = self._pubnub.config.reconnection_interval + if policy == PNReconnectionPolicy.LINEAR and interval is not None: + self._timer_interval = interval + elif policy == PNReconnectionPolicy.LINEAR: + self._timer_interval = LinearDelay.calculate(self._connection_errors) else: - self._timer_interval = self.INTERVAL + self._timer_interval = ExponentialDelay.calculate(self._connection_errors) + + def _retry_limit_reached(self): + user_limit = self._pubnub.config.maximum_reconnection_retries + policy = self._pubnub.config.reconnect_policy + + if user_limit == 0 or policy == PNReconnectionPolicy.NONE: + return True + elif user_limit == -1: + return False + + policy_limit = (LinearDelay.MAX_RETRIES if policy == PNReconnectionPolicy.LINEAR + else ExponentialDelay.MAX_RETRIES) + if user_limit is not None: + return self._connection_errors >= min(user_limit, policy_limit) + return self._connection_errors > policy_limit @abstractmethod def start_polling(self): @@ -89,6 +98,26 @@ def _stop_heartbeat_timer(self): self._timer = None +class LinearDelay: + INTERVAL = 2 + MAX_RETRIES = 10 + + @classmethod + def calculate(cls, attempt: int): + return cls.INTERVAL + round(random.random(), 3) + + +class ExponentialDelay: + MIN_DELAY = 2 + MAX_RETRIES = 6 + MIN_BACKOFF = 2 + MAX_BACKOFF = 150 + + @classmethod + def calculate(cls, attempt: int) -> int: + return min(cls.MAX_BACKOFF, cls.MIN_DELAY * (2 ** attempt)) + round(random.random(), 3) + + class StateManager: def __init__(self): self._channels = {} diff --git a/pubnub/pnconfiguration.py b/pubnub/pnconfiguration.py index de00581d..2d88cdee 100644 --- a/pubnub/pnconfiguration.py +++ b/pubnub/pnconfiguration.py @@ -11,9 +11,6 @@ class PNConfiguration(object): DEFAULT_PRESENCE_TIMEOUT = 300 DEFAULT_HEARTBEAT_INTERVAL = 280 ALLOWED_AES_MODES = [AES.MODE_CBC, AES.MODE_GCM] - RECONNECTION_INTERVAL = 3 - RECONNECTION_MIN_EXPONENTIAL_BACKOFF = 1 - RECONNECTION_MAX_EXPONENTIAL_BACKOFF = 32 DEFAULT_CRYPTO_MODULE = LegacyCryptoModule _locked = False @@ -39,8 +36,9 @@ def __init__(self): self.log_verbosity = False self.enable_presence_heartbeat = False self.heartbeat_notification_options = PNHeartbeatNotificationOptions.FAILURES - self.reconnect_policy = PNReconnectionPolicy.NONE - self.maximum_reconnection_retries = -1 # -1 means unlimited/ 0 means no retries + self.reconnect_policy = PNReconnectionPolicy.EXPONENTIAL + self.maximum_reconnection_retries = None # -1 means unlimited/ 0 means no retries + self.reconnection_interval = None # if None is left the default value from LinearDelay is used self.daemon = False self.use_random_initialization_vector = True self.suppress_leave_events = False diff --git a/pubnub/pubnub.py b/pubnub/pubnub.py index 94bc201e..57ba6229 100644 --- a/pubnub/pubnub.py +++ b/pubnub/pubnub.py @@ -101,6 +101,13 @@ def __init__(self, pubnub): def _register_heartbeat_timer(self): self.stop_heartbeat_timer() + if self._retry_limit_reached(): + logger.warning("Reconnection retry limit reached. Disconnecting.") + disconnect_status = PNStatus() + disconnect_status.category = PNStatusCategory.PNDisconnectedCategory + self._pubnub._subscription_manager._listener_manager.announce_status(disconnect_status) + return + self._recalculate_interval() self._timer = threading.Timer(self._timer_interval, self._call_time) @@ -129,6 +136,9 @@ def _call_time_callback(self, resp, status): def start_polling(self): if self._pubnub.config.reconnect_policy == PNReconnectionPolicy.NONE: logger.warning("reconnection policy is disabled, please handle reconnection manually.") + disconnect_status = PNStatus() + disconnect_status.category = PNStatusCategory.PNDisconnectedCategory + self._pubnub._subscription_manager._listener_manager.announce_status(disconnect_status) return logger.debug("reconnection manager start at: %s" % utils.datetime_now()) diff --git a/tests/acceptance/subscribe/environment.py b/tests/acceptance/subscribe/environment.py index dea2c0c7..8f4740a3 100644 --- a/tests/acceptance/subscribe/environment.py +++ b/tests/acceptance/subscribe/environment.py @@ -43,7 +43,16 @@ def before_scenario(context: Context, feature): def after_scenario(context: Context, feature): loop = asyncio.get_event_loop() loop.run_until_complete(context.pubnub.stop()) - loop.run_until_complete(asyncio.sleep(0.1)) + # asyncio cleaning all pending tasks to eliminate any potential state changes + pending_tasks = asyncio.all_tasks(loop) + for task in pending_tasks: + task.cancel() + try: + loop.run_until_complete(task) + except asyncio.CancelledError: + pass + loop.run_until_complete(asyncio.sleep(1.5)) + del context.pubnub for tag in feature.tags: if "contract" in tag: diff --git a/tests/acceptance/subscribe/steps/given_steps.py b/tests/acceptance/subscribe/steps/given_steps.py index 9f5e6b9d..493b7135 100644 --- a/tests/acceptance/subscribe/steps/given_steps.py +++ b/tests/acceptance/subscribe/steps/given_steps.py @@ -1,6 +1,7 @@ import logging from behave import given +from behave.api.async_step import async_run_until_complete from io import StringIO from pubnub.enums import PNReconnectionPolicy from pubnub.pubnub_asyncio import PubNubAsyncio, EventEngineSubscriptionManager @@ -9,7 +10,8 @@ @given("the demo keyset with event engine enabled") -def step_impl(context: PNContext): +@async_run_until_complete +async def step_impl(context: PNContext): context.log_stream = StringIO() logger = logging.getLogger('pubnub').getChild('subscribe') logger.setLevel(logging.DEBUG) @@ -27,7 +29,8 @@ def step_impl(context: PNContext): @given("a linear reconnection policy with {max_retries} retries") -def step_impl(context: PNContext, max_retries: str): +@async_run_until_complete +async def step_impl(context: PNContext, max_retries: str): context.pubnub.config.reconnect_policy = PNReconnectionPolicy.LINEAR context.pubnub.config.maximum_reconnection_retries = int(max_retries) @@ -38,7 +41,8 @@ def step_impl(context: PNContext, max_retries: str): @given("the demo keyset with Presence EE enabled") -def step_impl(context: PNContext): +@async_run_until_complete +async def step_impl(context: PNContext): context.log_stream_pubnub = StringIO() logger = logging.getLogger('pubnub') logger.setLevel(logging.DEBUG) @@ -56,7 +60,7 @@ def step_impl(context: PNContext): context.pn_config.enable_presence_heartbeat = True context.pn_config.reconnect_policy = PNReconnectionPolicy.LINEAR context.pn_config.subscribe_request_timeout = 10 - context.pn_config.RECONNECTION_INTERVAL = 2 + context.pn_config.reconnection_interval = 2 context.pn_config.set_presence_timeout(3) context.pubnub = PubNubAsyncio(context.pn_config, subscription_manager=EventEngineSubscriptionManager) @@ -66,6 +70,7 @@ def step_impl(context: PNContext): @given("heartbeatInterval set to '{interval}', timeout set to '{timeout}'" " and suppressLeaveEvents set to '{suppress_leave}'") -def step_impl(context: PNContext, interval: str, timeout: str, suppress_leave: str): +@async_run_until_complete +async def step_impl(context: PNContext, interval: str, timeout: str, suppress_leave: str): context.pn_config.set_presence_timeout_with_custom_interval(int(timeout), int(interval)) context.pn_config.suppress_leave_events = True if suppress_leave == 'true' else False diff --git a/tests/acceptance/subscribe/steps/then_steps.py b/tests/acceptance/subscribe/steps/then_steps.py index 26c84c63..ef09d821 100644 --- a/tests/acceptance/subscribe/steps/then_steps.py +++ b/tests/acceptance/subscribe/steps/then_steps.py @@ -125,6 +125,7 @@ async def step_impl(ctx): @async_run_until_complete async def step_impl(context, channel1, channel2): context.pubnub.unsubscribe().channels([channel1, channel2]).execute() + await asyncio.sleep(0.5) @then(u'I don\'t observe any Events and Invocations of the Presence EE') diff --git a/tests/acceptance/subscribe/steps/when_steps.py b/tests/acceptance/subscribe/steps/when_steps.py index 63f4ffab..e5625643 100644 --- a/tests/acceptance/subscribe/steps/when_steps.py +++ b/tests/acceptance/subscribe/steps/when_steps.py @@ -4,12 +4,14 @@ @when('I subscribe') -def step_impl(context: PNContext): +@async_run_until_complete +async def step_impl(context: PNContext): context.pubnub.subscribe().channels('foo').execute() @when('I subscribe with timetoken {timetoken}') -def step_impl(context: PNContext, timetoken: str): # noqa F811 +@async_run_until_complete +async def step_impl(context: PNContext, timetoken: str): # noqa F811 callback = AcceptanceCallback() context.pubnub.add_listener(callback) context.pubnub.subscribe().channels('foo').with_timetoken(int(timetoken)).execute() diff --git a/tests/functional/event_engine/test_managed_effect.py b/tests/functional/event_engine/test_managed_effect.py index c59049d2..bea019ad 100644 --- a/tests/functional/event_engine/test_managed_effect.py +++ b/tests/functional/event_engine/test_managed_effect.py @@ -15,9 +15,7 @@ class FakeConfig: reconnect_policy = PNReconnectionPolicy.NONE - RECONNECTION_INTERVAL = 1 - RECONNECTION_MIN_EXPONENTIAL_BACKOFF = 1 - RECONNECTION_MAX_EXPONENTIAL_BACKOFF = 32 + reconnection_interval = 1 maximum_reconnection_retries = 3 diff --git a/tests/integrational/asyncio/test_subscribe.py b/tests/integrational/asyncio/test_subscribe.py index 5760d0ae..e98c94b5 100644 --- a/tests/integrational/asyncio/test_subscribe.py +++ b/tests/integrational/asyncio/test_subscribe.py @@ -4,10 +4,14 @@ import pubnub as pn from unittest.mock import patch +from pubnub.callbacks import SubscribeCallback +from pubnub.models.consumer.common import PNStatus from pubnub.models.consumer.pubsub import PNMessageResult from pubnub.pubnub_asyncio import AsyncioSubscriptionManager, PubNubAsyncio, AsyncioEnvelope, SubscribeListener from tests.helper import gen_channel, pnconf_enc_env_copy, pnconf_env_copy, pnconf_sub_copy from tests.integrational.vcr_asyncio_sleeper import VCR599Listener, VCR599ReconnectionManager +from pubnub.enums import PNReconnectionPolicy, PNStatusCategory +from pubnub.managers import LinearDelay, ExponentialDelay # from tests.integrational.vcr_helper import pn_vcr pn.set_stream_logger('pubnub', logging.DEBUG) @@ -17,6 +21,21 @@ async def patch_pubnub(pubnub): pubnub._subscription_manager._reconnection_manager = VCR599ReconnectionManager(pubnub) +class TestCallback(SubscribeCallback): + message_result = None + status_result = None + presence_result = None + + def status(self, pubnub, status): + self.status_result = status + + def message(self, pubnub, message): + self.message_result = message + + def presence(self, pubnub, presence): + self.presence_result = presence + + # TODO: refactor cassette # @pn_vcr.use_cassette('tests/integrational/fixtures/asyncio/subscription/sub_unsub.json', serializer='pn_json', # filter_query_parameters=['pnsdk', 'ee', 'tr']) @@ -403,3 +422,159 @@ async def test_unsubscribe_all(): assert envelope.status.original_response['status'] == 200 await pubnub.stop() + + +@pytest.mark.asyncio +async def test_subscribe_failing_reconnect_policy_none(): + config = pnconf_env_copy(enable_subscribe=True, + uuid="test-subscribe-failing-reconnect-policy-none", + reconnect_policy=PNReconnectionPolicy.NONE, + origin='127.0.0.1') + pubnub = PubNubAsyncio(config) + + listener = TestCallback() + pubnub.add_listener(listener) + pubnub.subscribe().channels("my_channel").execute() + while True: + if isinstance(listener.status_result, PNStatus) \ + and listener.status_result.category == PNStatusCategory.PNDisconnectedCategory: + break + await asyncio.sleep(1) + + +@pytest.mark.asyncio +async def test_subscribe_failing_reconnect_policy_none(): + config = pnconf_env_copy(enable_subscribe=True, + uuid="test-subscribe-failing-reconnect-policy-none", + reconnect_policy=PNReconnectionPolicy.NONE, + origin='127.0.0.1') + pubnub = PubNubAsyncio(config) + + listener = TestCallback() + pubnub.add_listener(listener) + pubnub.subscribe().channels("my_channel_none").execute() + while True: + if isinstance(listener.status_result, PNStatus) \ + and listener.status_result.category == PNStatusCategory.PNDisconnectedCategory: + break + await asyncio.sleep(0.5) + + +@pytest.mark.asyncio +async def test_subscribe_failing_reconnect_policy_linear(): + # we don't test the actual delay calculation here, just everything around it + def mock_calculate(*args, **kwargs): + return 0.2 + + with patch('pubnub.managers.LinearDelay.calculate', wraps=mock_calculate) as calculate_mock: + config = pnconf_env_copy(enable_subscribe=True, + uuid="test-subscribe-failing-reconnect-policy-linear", + reconnect_policy=PNReconnectionPolicy.LINEAR, + origin='127.0.0.1') + pubnub = PubNubAsyncio(config) + + listener = TestCallback() + pubnub.add_listener(listener) + pubnub.subscribe().channels("my_channel_linear").execute() + while True: + if isinstance(listener.status_result, PNStatus) \ + and listener.status_result.category == PNStatusCategory.PNDisconnectedCategory: + break + await asyncio.sleep(0.5) + assert calculate_mock.call_count == LinearDelay.MAX_RETRIES + + +@pytest.mark.asyncio +async def test_subscribe_failing_reconnect_policy_exponential(): + # we don't test the actual delay calculation here, just everything around it + def mock_calculate(*args, **kwargs): + return 0.2 + + with patch('pubnub.managers.ExponentialDelay.calculate', wraps=mock_calculate) as calculate_mock: + config = pnconf_env_copy(enable_subscribe=True, + uuid="test-subscribe-failing-reconnect-policy-exponential", + reconnect_policy=PNReconnectionPolicy.EXPONENTIAL, + origin='127.0.0.1') + pubnub = PubNubAsyncio(config) + + listener = TestCallback() + pubnub.add_listener(listener) + pubnub.subscribe().channels("my_channel_exponential").execute() + while True: + if isinstance(listener.status_result, PNStatus) \ + and listener.status_result.category == PNStatusCategory.PNDisconnectedCategory: + break + await asyncio.sleep(0.5) + assert calculate_mock.call_count == ExponentialDelay.MAX_RETRIES + + +@pytest.mark.asyncio +async def test_subscribe_failing_reconnect_policy_linear_with_max_retries(): + # we don't test the actual delay calculation here, just everything around it + def mock_calculate(*args, **kwargs): + return 0.2 + + with patch('pubnub.managers.LinearDelay.calculate', wraps=mock_calculate) as calculate_mock: + config = pnconf_env_copy(enable_subscribe=True, maximum_reconnection_retries=3, + uuid="test-subscribe-failing-reconnect-policy-linear-with-max-retries", + reconnect_policy=PNReconnectionPolicy.LINEAR, + origin='127.0.0.1') + pubnub = PubNubAsyncio(config) + + listener = TestCallback() + pubnub.add_listener(listener) + pubnub.subscribe().channels("my_channel_linear").execute() + while True: + if isinstance(listener.status_result, PNStatus) \ + and listener.status_result.category == PNStatusCategory.PNDisconnectedCategory: + break + await asyncio.sleep(0.5) + assert calculate_mock.call_count == 3 + + +@pytest.mark.asyncio +async def test_subscribe_failing_reconnect_policy_exponential_with_max_retries(): + # we don't test the actual delay calculation here, just everything around it + def mock_calculate(*args, **kwargs): + return 0.2 + + with patch('pubnub.managers.ExponentialDelay.calculate', wraps=mock_calculate) as calculate_mock: + config = pnconf_env_copy(enable_subscribe=True, maximum_reconnection_retries=3, + uuid="test-subscribe-failing-reconnect-policy-exponential-with-max-retries", + reconnect_policy=PNReconnectionPolicy.EXPONENTIAL, + origin='127.0.0.1') + pubnub = PubNubAsyncio(config) + + listener = TestCallback() + pubnub.add_listener(listener) + pubnub.subscribe().channels("my_channel_exponential").execute() + while True: + if isinstance(listener.status_result, PNStatus) \ + and listener.status_result.category == PNStatusCategory.PNDisconnectedCategory: + break + await asyncio.sleep(0.5) + assert calculate_mock.call_count == 3 + + +@pytest.mark.asyncio +async def test_subscribe_failing_reconnect_policy_linear_with_custom_interval(): + # we don't test the actual delay calculation here, just everything around it + def mock_calculate(*args, **kwargs): + return 0.2 + + with patch('pubnub.managers.LinearDelay.calculate', wraps=mock_calculate) as calculate_mock: + config = pnconf_env_copy(enable_subscribe=True, maximum_reconnection_retries=3, reconnection_interval=1, + uuid="test-subscribe-failing-reconnect-policy-linear-with-max-retries", + reconnect_policy=PNReconnectionPolicy.LINEAR, + origin='127.0.0.1') + pubnub = PubNubAsyncio(config) + + listener = TestCallback() + pubnub.add_listener(listener) + pubnub.subscribe().channels("my_channel_linear").execute() + while True: + if isinstance(listener.status_result, PNStatus) \ + and listener.status_result.category == PNStatusCategory.PNDisconnectedCategory: + break + await asyncio.sleep(0.5) + assert calculate_mock.call_count == 0 diff --git a/tests/integrational/asyncio/test_unsubscribe_status.py b/tests/integrational/asyncio/test_unsubscribe_status.py index 95ed40a3..519c23e3 100644 --- a/tests/integrational/asyncio/test_unsubscribe_status.py +++ b/tests/integrational/asyncio/test_unsubscribe_status.py @@ -64,6 +64,7 @@ async def test_access_denied_unsubscribe_operation(): pnconf = pnconf_pam_copy() pnconf.secret_key = None pnconf.enable_subscribe = True + pnconf.reconnect_policy = pn.enums.PNReconnectionPolicy.NONE pubnub = PubNubAsyncio(pnconf) diff --git a/tests/integrational/native_threads/test_subscribe.py b/tests/integrational/native_threads/test_subscribe.py index 4b0280ff..29b6aa6b 100644 --- a/tests/integrational/native_threads/test_subscribe.py +++ b/tests/integrational/native_threads/test_subscribe.py @@ -1,9 +1,13 @@ import binascii import logging import unittest +import time import pubnub as pn +from unittest.mock import patch +from pubnub.enums import PNReconnectionPolicy, PNStatusCategory from pubnub.exceptions import PubNubException +from pubnub.managers import LinearDelay, ExponentialDelay from pubnub.models.consumer.channel_group import PNChannelGroupsAddChannelResult, PNChannelGroupsRemoveChannelResult from pubnub.models.consumer.pubsub import PNPublishResult, PNMessageResult from pubnub.pubnub import PubNub, SubscribeListener, NonSubscribeListener @@ -15,6 +19,22 @@ pn.set_stream_logger('pubnub', logging.DEBUG) +class DisconnectListener(SubscribeListener): + status_result = None + disconnected = False + + def status(self, pubnub, status): + if status.category == PNStatusCategory.PNDisconnectedCategory: + print('Could not connect. Exiting...') + self.disconnected = True + + def message(self, pubnub, message): + print(f'Message:\n{message.__dict__}') + + def presence(self, pubnub, presence): + print(f'Presence:\n{presence.__dict__}') + + class TestPubNubSubscription(unittest.TestCase): @pn_vcr.use_cassette('tests/integrational/fixtures/native_threads/subscribe/subscribe_unsubscribe.json', filter_query_parameters=['seqn', 'pnsdk', 'tr', 'tt'], serializer='pn_json', @@ -302,3 +322,137 @@ def test_subscribe_pub_unencrypted_unsubscribe(self): self.fail(e) finally: pubnub.stop() + + def test_subscribe_retry_policy_none(self): + ch = "test-subscribe-retry-policy-none" + pubnub = PubNub(pnconf_env_copy(enable_subscribe=True, daemon=True, origin='127.0.0.1', + reconnect_policy=PNReconnectionPolicy.NONE)) + listener = DisconnectListener() + + try: + pubnub.add_listener(listener) + pubnub.subscribe().channels(ch).execute() + + while not listener.disconnected: + time.sleep(0.5) + + except PubNubException as e: + self.fail(e) + + def test_subscribe_retry_policy_linear(self): + # we don't test the actual delay calculation here, just everything around it + def mock_calculate(*args, **kwargs): + return 0.2 + + with patch('pubnub.managers.LinearDelay.calculate', wraps=mock_calculate) as calculate_mock: + ch = "test-subscribe-retry-policy-linear" + pubnub = PubNub(pnconf_env_copy(enable_subscribe=True, daemon=True, origin='127.0.0.1', + reconnect_policy=PNReconnectionPolicy.LINEAR)) + listener = DisconnectListener() + + try: + pubnub.add_listener(listener) + pubnub.subscribe().channels(ch).execute() + + while not listener.disconnected: + time.sleep(0.5) + + except PubNubException as e: + self.fail(e) + + assert calculate_mock.call_count == LinearDelay.MAX_RETRIES + 1 + + def test_subscribe_retry_policy_exponential(self): + # we don't test the actual delay calculation here, just everything around it + def mock_calculate(*args, **kwargs): + return 0.2 + + with patch('pubnub.managers.ExponentialDelay.calculate', wraps=mock_calculate) as calculate_mock: + ch = "test-subscribe-retry-policy-exponential" + pubnub = PubNub(pnconf_env_copy(enable_subscribe=True, daemon=True, origin='127.0.0.1', + reconnect_policy=PNReconnectionPolicy.EXPONENTIAL)) + listener = DisconnectListener() + + try: + pubnub.add_listener(listener) + pubnub.subscribe().channels(ch).execute() + + while not listener.disconnected: + time.sleep(0.5) + + except PubNubException as e: + self.fail(e) + + assert calculate_mock.call_count == ExponentialDelay.MAX_RETRIES + 1 + + def test_subscribe_retry_policy_linear_with_max_retries(self): + # we don't test the actual delay calculation here, just everything around it + def mock_calculate(*args, **kwargs): + return 0.2 + + with patch('pubnub.managers.LinearDelay.calculate', wraps=mock_calculate) as calculate_mock: + ch = "test-subscribe-retry-policy-linear" + pubnub = PubNub(pnconf_env_copy(enable_subscribe=True, daemon=True, origin='127.0.0.1', + maximum_reconnection_retries=3, + reconnect_policy=PNReconnectionPolicy.LINEAR)) + listener = DisconnectListener() + + try: + pubnub.add_listener(listener) + pubnub.subscribe().channels(ch).execute() + + while not listener.disconnected: + time.sleep(0.5) + + except PubNubException as e: + self.fail(e) + + assert calculate_mock.call_count == 3 + + def test_subscribe_retry_policy_exponential_with_max_retries(self): + # we don't test the actual delay calculation here, just everything around it + def mock_calculate(*args, **kwargs): + return 0.2 + + with patch('pubnub.managers.ExponentialDelay.calculate', wraps=mock_calculate) as calculate_mock: + ch = "test-subscribe-retry-policy-exponential" + pubnub = PubNub(pnconf_env_copy(enable_subscribe=True, daemon=True, origin='127.0.0.1', + maximum_reconnection_retries=3, + reconnect_policy=PNReconnectionPolicy.EXPONENTIAL)) + listener = DisconnectListener() + + try: + pubnub.add_listener(listener) + pubnub.subscribe().channels(ch).execute() + + while not listener.disconnected: + time.sleep(0.5) + + except PubNubException as e: + self.fail(e) + + assert calculate_mock.call_count == 3 + + def test_subscribe_retry_policy_linear_with_custom_interval(self): + # we don't test the actual delay calculation here, just everything around it + def mock_calculate(*args, **kwargs): + return 0.2 + + with patch('pubnub.managers.LinearDelay.calculate', wraps=mock_calculate) as calculate_mock: + ch = "test-subscribe-retry-policy-linear" + pubnub = PubNub(pnconf_env_copy(enable_subscribe=True, daemon=True, origin='127.0.0.1', + maximum_reconnection_retries=3, reconnection_interval=1, + reconnect_policy=PNReconnectionPolicy.LINEAR)) + listener = DisconnectListener() + + try: + pubnub.add_listener(listener) + pubnub.subscribe().channels(ch).execute() + + while not listener.disconnected: + time.sleep(0.5) + + except PubNubException as e: + self.fail(e) + + assert calculate_mock.call_count == 0 diff --git a/tests/unit/test_reconnection_manager.py b/tests/unit/test_reconnection_manager.py new file mode 100644 index 00000000..e14c10bd --- /dev/null +++ b/tests/unit/test_reconnection_manager.py @@ -0,0 +1,42 @@ +from pubnub.enums import PNReconnectionPolicy +from pubnub.managers import ReconnectionManager +from pubnub.pnconfiguration import PNConfiguration +from pubnub.pubnub import PubNub + + +def assert_more_or_less(given, expected): + assert expected < given < expected + 1 + + +def test_linear_policy(): + config = PNConfiguration() + config.subscribe_key = "test" + config.publish_key = "test" + config.reconnect_policy = PNReconnectionPolicy.LINEAR + config.uuid = "test" + + pubnub = PubNub(config) + reconnection_manager = ReconnectionManager(pubnub) + + for i in range(0, 10): + reconnection_manager._connection_errors = i + reconnection_manager._recalculate_interval() + assert_more_or_less(reconnection_manager._timer_interval, 2) + + +def test_exponential_policy(): + config = PNConfiguration() + config.subscribe_key = "test" + config.publish_key = "test" + config.reconnect_policy = PNReconnectionPolicy.EXPONENTIAL + config.uuid = "test" + + pubnub = PubNub(config) + reconnection_manager = ReconnectionManager(pubnub) + + expected = [2, 4, 8, 16, 32, 64, 128, 150, 150, 150] + + for i in range(0, 10): + reconnection_manager._connection_errors = i + reconnection_manager._recalculate_interval() + assert_more_or_less(reconnection_manager._timer_interval, expected[i])