Skip to content

Commit

Permalink
Exponential as a default reconnection policy (#196)
Browse files Browse the repository at this point in the history
* Exponential as a default reconnection policy

* Add respect of user defined retry limit

* That one test which still needs no reconnect policy

* Add custom delay for linear policy

* clean up old variables
  • Loading branch information
seba-aln authored Oct 1, 2024
1 parent 16f9086 commit 0d47e58
Show file tree
Hide file tree
Showing 15 changed files with 599 additions and 50 deletions.
50 changes: 50 additions & 0 deletions examples/native_threads/subscribe_with_retry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import logging
import sys
import time

from pubnub.pnconfiguration import PNConfiguration
from pubnub.pubnub import PubNub, SubscribeListener
from pubnub.enums import PNReconnectionPolicy, PNStatusCategory


class TestListener(SubscribeListener):
    """Subscribe listener that prints incoming events and records a disconnect."""

    # Set to True by status() once a disconnect status is received; the
    # script's main loop polls this flag to know when to exit.
    disconnected = False
    # Not written by any code visible in this example.
    status_result = None

    def status(self, pubnub, status):
        """Flag the listener as disconnected when a disconnect status arrives."""
        if status.category != PNStatusCategory.PNDisconnectedCategory:
            return
        print('Could not connect. Exiting...')
        self.disconnected = True

    def message(self, pubnub, message):
        """Print the attributes of a received message."""
        print(f'Message:\n{message.__dict__}')

    def presence(self, pubnub, presence):
        """Print the attributes of a received presence event."""
        print(f'Presence:\n{presence.__dict__}')


# Send the SDK's log records (DEBUG and above) to stdout so the reconnection
# attempts are visible while the example runs.
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.DEBUG)
logger = logging.getLogger("pubnub")
logger.setLevel(logging.DEBUG)
logger.addHandler(handler)


# Client configuration: demo keys, plain HTTP against a local origin, and the
# exponential reconnection policy.
config = PNConfiguration()
config.subscribe_key = "demo"
config.publish_key = "demo"
config.user_id = 'example'
config.enable_subscribe = True
config.reconnect_policy = PNReconnectionPolicy.EXPONENTIAL
config.origin = '127.0.0.1'
config.ssl = False

listener = TestListener()

pubnub = PubNub(config)
pubnub.add_listener(listener)
sub = pubnub.subscribe().channels(['example']).execute()

# Poll until the listener reports a disconnect status, then exit.
while True:
    if listener.disconnected:
        break
    time.sleep(0.5)
print('Disconnected. Bye.')
59 changes: 59 additions & 0 deletions examples/pubnub_asyncio/subscribe_with_retry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import asyncio
import logging
import sys

from pubnub.callbacks import SubscribeCallback
from pubnub.models.consumer.common import PNStatus
from pubnub.pubnub_asyncio import PubNubAsyncio
from pubnub.pnconfiguration import PNConfiguration
from pubnub.enums import PNReconnectionPolicy, PNStatusCategory

# Client configuration: demo keys, plain HTTP against a local origin.
# NOTE(review): the reconnection policy is NONE although this example is named
# "subscribe_with_retry" — confirm that this is intended.
config = PNConfiguration()
config.subscribe_key = "demo"
config.publish_key = "demo"
config.enable_subscribe = True
config.uuid = "test-uuid"
config.origin = "127.0.0.1"
config.ssl = False
config.reconnect_policy = PNReconnectionPolicy.NONE

pubnub = PubNubAsyncio(config)

# Send the SDK's log records (WARNING and above) to stdout.
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.WARNING)
logger = logging.getLogger("pubnub")
logger.setLevel(logging.WARNING)
logger.addHandler(handler)


class SampleCallback(SubscribeCallback):
    """Subscribe callback that remembers the most recent event of each kind."""

    # Latest objects handed to the corresponding handler; None until one arrives.
    status_result = None
    message_result = None
    presence_result = None

    def status(self, pubnub, status):
        """Store the latest status object."""
        self.status_result = status

    def message(self, pubnub, message):
        """Store the latest message object."""
        self.message_result = message

    def presence(self, pubnub, presence):
        """Store the latest presence object."""
        self.presence_result = presence


async def main():
    """Subscribe to ``my_channel`` and wait until a disconnect status is seen.

    The callback's last status is checked once per second; the coroutine
    returns after printing a notice when that status is a ``PNStatus`` whose
    category is ``PNDisconnectedCategory``.
    """
    callback = SampleCallback()
    pubnub.add_listener(callback)
    pubnub.subscribe().channels("my_channel").execute()

    while True:
        last_status = callback.status_result
        # status_result stays None until the first status callback fires.
        if isinstance(last_status, PNStatus) and \
                last_status.category == PNStatusCategory.PNDisconnectedCategory:
            print('Could not connect. Exiting...')
            break
        await asyncio.sleep(1)


if __name__ == "__main__":
    # The module-level PubNubAsyncio client is created before this point;
    # presumably it is bound to the same default loop — TODO confirm. For that
    # reason the loop is obtained here instead of switching to asyncio.run().
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        # Close the loop even when main() raises or the run is interrupted,
        # so its resources are not leaked.
        loop.close()
52 changes: 34 additions & 18 deletions pubnub/event_engine/effects.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import asyncio
import logging
import math

from typing import Optional, Union
from pubnub.endpoints.presence.heartbeat import Heartbeat
Expand All @@ -14,6 +13,7 @@
from pubnub.event_engine.models import events, invocations
from pubnub.models.consumer.common import PNStatus
from pubnub.workers import BaseMessageWorker
from pubnub.managers import LinearDelay, ExponentialDelay


class Effect:
Expand Down Expand Up @@ -57,14 +57,6 @@ def get_new_stop_event(self):
self.logger.debug(f'creating new stop_event({id(event)}) for {self.__class__.__name__}')
return event

def calculate_reconnection_delay(self, attempts):
if self.reconnection_policy is PNReconnectionPolicy.EXPONENTIAL:
delay = int(math.pow(2, attempts - 5 * math.floor((attempts - 1) / 5)) - 1)
else:
delay = self.interval

return delay


class HandshakeEffect(Effect):
def run(self):
Expand Down Expand Up @@ -157,10 +149,15 @@ def __init__(self, pubnub_instance, event_engine_instance,
invocation: Union[invocations.PNManageableInvocation, invocations.PNCancelInvocation]) -> None:
super().__init__(pubnub_instance, event_engine_instance, invocation)
self.reconnection_policy = pubnub_instance.config.reconnect_policy
self.max_retry_attempts = pubnub_instance.config.maximum_reconnection_retries
self.interval = pubnub_instance.config.RECONNECTION_INTERVAL
self.min_backoff = pubnub_instance.config.RECONNECTION_MIN_EXPONENTIAL_BACKOFF
self.max_backoff = pubnub_instance.config.RECONNECTION_MAX_EXPONENTIAL_BACKOFF
self.interval = pubnub_instance.config.reconnection_interval

if self.reconnection_policy is PNReconnectionPolicy.EXPONENTIAL:
self.max_retry_attempts = ExponentialDelay.MAX_RETRIES
elif self.reconnection_policy is PNReconnectionPolicy.LINEAR:
self.max_retry_attempts = LinearDelay.MAX_RETRIES

if pubnub_instance.config.maximum_reconnection_retries is not None:
self.max_retry_attempts = pubnub_instance.config.maximum_reconnection_retries

def give_up(self, reason: PubNubException, attempt: int, timetoken: int = 0):
self.logger.error(f"GiveUp called on Unspecific event. Reason: {reason}, Attempt: {attempt} TT:{timetoken}")
Expand All @@ -174,13 +171,23 @@ def success(self, timetoken: str, region: Optional[int] = None, **kwargs):
self.logger.error(f"Success called on Unspecific event. TT:{timetoken}, Reg: {region}, KWARGS: {kwargs.keys()}")
raise PubNubException('Unspecified Invocation')

def calculate_reconnection_delay(self, attempts):
if self.reconnection_policy is PNReconnectionPolicy.EXPONENTIAL:
delay = ExponentialDelay.calculate(attempts)
elif self.interval is None:
delay = LinearDelay.calculate(attempts)
else:
delay = self.interval

return delay

def run(self):
if self.reconnection_policy is PNReconnectionPolicy.NONE or self.invocation.attempts > self.max_retry_attempts:
self.give_up(reason=self.invocation.reason, attempt=self.invocation.attempts)
else:
attempts = self.invocation.attempts
delay = self.calculate_reconnection_delay(attempts)
self.logger.warning(f'will reconnect in {delay}s')
self.logger.warning(f'Will reconnect in {delay}s')
if hasattr(self.pubnub, 'event_loop'):
self.run_async(self.delayed_reconnect_async(delay, attempts))

Expand Down Expand Up @@ -314,7 +321,8 @@ def run(self):
async def heartbeat_wait(self, wait_time: int, stop_event):
try:
await asyncio.sleep(wait_time)
self.event_engine.trigger(events.HeartbeatTimesUpEvent())
if not stop_event.is_set():
self.event_engine.trigger(events.HeartbeatTimesUpEvent())
except asyncio.CancelledError:
pass

Expand All @@ -341,9 +349,17 @@ def __init__(self, pubnub_instance, event_engine_instance,
super().__init__(pubnub_instance, event_engine_instance, invocation)
self.reconnection_policy = pubnub_instance.config.reconnect_policy
self.max_retry_attempts = pubnub_instance.config.maximum_reconnection_retries
self.interval = pubnub_instance.config.RECONNECTION_INTERVAL
self.min_backoff = pubnub_instance.config.RECONNECTION_MIN_EXPONENTIAL_BACKOFF
self.max_backoff = pubnub_instance.config.RECONNECTION_MAX_EXPONENTIAL_BACKOFF
self.interval = pubnub_instance.config.reconnection_interval

def calculate_reconnection_delay(self, attempts):
if self.reconnection_policy is PNReconnectionPolicy.EXPONENTIAL:
delay = ExponentialDelay.calculate(attempts)
elif self.interval is None:
delay = LinearDelay.calculate(attempts)
else:
delay = self.interval

return delay

def run(self):
if self.reconnection_policy is PNReconnectionPolicy.NONE or self.invocation.attempts > self.max_retry_attempts:
Expand Down
61 changes: 45 additions & 16 deletions pubnub/managers.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import logging
from abc import abstractmethod, ABCMeta

import math
import time
import copy
import base64
import random

from cbor2 import loads

from . import utils
Expand Down Expand Up @@ -51,33 +52,41 @@ def get_base_path(self):


class ReconnectionManager:
INTERVAL = 3
MINEXPONENTIALBACKOFF = 1
MAXEXPONENTIALBACKOFF = 32

def __init__(self, pubnub):
self._pubnub = pubnub
self._callback = None
self._timer = None
self._timer_interval = None
self._connection_errors = 1
self._connection_errors = 0

def set_reconnection_listener(self, reconnection_callback):
assert isinstance(reconnection_callback, ReconnectionCallback)
self._callback = reconnection_callback

def _recalculate_interval(self):
if self._pubnub.config.reconnect_policy == PNReconnectionPolicy.EXPONENTIAL:
self._timer_interval = int(math.pow(2, self._connection_errors) - 1)
if self._timer_interval > self.MAXEXPONENTIALBACKOFF:
self._timer_interval = self.MINEXPONENTIALBACKOFF
self._connection_errors = 1
logger.debug("timerInterval > MAXEXPONENTIALBACKOFF at: %s" % utils.datetime_now())
elif self._timer_interval < 1:
self._timer_interval = self.MINEXPONENTIALBACKOFF
logger.debug("timerInterval = %d at: %s" % (self._timer_interval, utils.datetime_now()))
policy = self._pubnub.config.reconnect_policy
interval = self._pubnub.config.reconnection_interval
if policy == PNReconnectionPolicy.LINEAR and interval is not None:
self._timer_interval = interval
elif policy == PNReconnectionPolicy.LINEAR:
self._timer_interval = LinearDelay.calculate(self._connection_errors)
else:
self._timer_interval = self.INTERVAL
self._timer_interval = ExponentialDelay.calculate(self._connection_errors)

def _retry_limit_reached(self):
    """Return True when no further reconnection attempt should be made.

    Rules, in order:
    * a user limit of 0 or a NONE policy stops retrying immediately;
    * a user limit of -1 never stops;
    * otherwise the policy's MAX_RETRIES applies, lowered by the user limit
      when one is configured.
    """
    config = self._pubnub.config
    user_limit = config.maximum_reconnection_retries
    policy = config.reconnect_policy

    if user_limit == 0 or policy == PNReconnectionPolicy.NONE:
        return True
    if user_limit == -1:
        return False

    # Any policy other than LINEAR falls back to the exponential limit.
    if policy == PNReconnectionPolicy.LINEAR:
        policy_limit = LinearDelay.MAX_RETRIES
    else:
        policy_limit = ExponentialDelay.MAX_RETRIES

    if user_limit is None:
        # NOTE(review): this branch compares with ">" while the user-limit
        # branch below uses ">=" — confirm the asymmetry is intended.
        return self._connection_errors > policy_limit
    return self._connection_errors >= min(user_limit, policy_limit)

@abstractmethod
def start_polling(self):
Expand All @@ -89,6 +98,26 @@ def _stop_heartbeat_timer(self):
self._timer = None


class LinearDelay:
    """Constant reconnection delay with a small random jitter."""

    # Base delay returned by calculate() before jitter is added.
    INTERVAL = 2
    # Default retry limit that callers read for the linear policy.
    MAX_RETRIES = 10

    @classmethod
    def calculate(cls, attempt: int):
        """Return ``INTERVAL`` plus a random jitter in [0, 1], rounded to 3 places.

        ``attempt`` is accepted for signature parity with the exponential
        policy; it does not influence the result.
        """
        jitter = round(random.random(), 3)
        return cls.INTERVAL + jitter


class ExponentialDelay:
    """Exponential back-off delay with a small random jitter.

    The base delay is ``MIN_DELAY * 2 ** attempt``, capped at ``MAX_BACKOFF``.
    The jitter is added after the cap, so the result may exceed
    ``MAX_BACKOFF`` by up to 1.
    """

    # Multiplier applied to 2 ** attempt.
    MIN_DELAY = 2
    # Default retry limit that callers read for the exponential policy.
    MAX_RETRIES = 6
    # Not used by calculate(); kept because it is a public class attribute.
    MIN_BACKOFF = 2
    # Upper bound for the base delay (before jitter).
    MAX_BACKOFF = 150

    @classmethod
    def calculate(cls, attempt: int) -> float:
        """Return the delay for the given attempt number.

        Args:
            attempt: Number of the reconnection attempt; the base delay
                doubles with each increment.

        Returns:
            The capped base delay plus a random jitter in [0, 1] rounded to
            3 places. The result is always a float because of the jitter.
        """
        base_delay = min(cls.MAX_BACKOFF, cls.MIN_DELAY * (2 ** attempt))
        return base_delay + round(random.random(), 3)


class StateManager:
def __init__(self):
self._channels = {}
Expand Down
8 changes: 3 additions & 5 deletions pubnub/pnconfiguration.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@ class PNConfiguration(object):
DEFAULT_PRESENCE_TIMEOUT = 300
DEFAULT_HEARTBEAT_INTERVAL = 280
ALLOWED_AES_MODES = [AES.MODE_CBC, AES.MODE_GCM]
RECONNECTION_INTERVAL = 3
RECONNECTION_MIN_EXPONENTIAL_BACKOFF = 1
RECONNECTION_MAX_EXPONENTIAL_BACKOFF = 32
DEFAULT_CRYPTO_MODULE = LegacyCryptoModule
_locked = False

Expand All @@ -39,8 +36,9 @@ def __init__(self):
self.log_verbosity = False
self.enable_presence_heartbeat = False
self.heartbeat_notification_options = PNHeartbeatNotificationOptions.FAILURES
self.reconnect_policy = PNReconnectionPolicy.NONE
self.maximum_reconnection_retries = -1 # -1 means unlimited/ 0 means no retries
self.reconnect_policy = PNReconnectionPolicy.EXPONENTIAL
self.maximum_reconnection_retries = None # -1 means unlimited/ 0 means no retries
self.reconnection_interval = None # if None is left the default value from LinearDelay is used
self.daemon = False
self.use_random_initialization_vector = True
self.suppress_leave_events = False
Expand Down
10 changes: 10 additions & 0 deletions pubnub/pubnub.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,13 @@ def __init__(self, pubnub):
def _register_heartbeat_timer(self):
self.stop_heartbeat_timer()

if self._retry_limit_reached():
logger.warning("Reconnection retry limit reached. Disconnecting.")
disconnect_status = PNStatus()
disconnect_status.category = PNStatusCategory.PNDisconnectedCategory
self._pubnub._subscription_manager._listener_manager.announce_status(disconnect_status)
return

self._recalculate_interval()

self._timer = threading.Timer(self._timer_interval, self._call_time)
Expand Down Expand Up @@ -129,6 +136,9 @@ def _call_time_callback(self, resp, status):
def start_polling(self):
if self._pubnub.config.reconnect_policy == PNReconnectionPolicy.NONE:
logger.warning("reconnection policy is disabled, please handle reconnection manually.")
disconnect_status = PNStatus()
disconnect_status.category = PNStatusCategory.PNDisconnectedCategory
self._pubnub._subscription_manager._listener_manager.announce_status(disconnect_status)
return

logger.debug("reconnection manager start at: %s" % utils.datetime_now())
Expand Down
11 changes: 10 additions & 1 deletion tests/acceptance/subscribe/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,16 @@ def before_scenario(context: Context, feature):
def after_scenario(context: Context, feature):
loop = asyncio.get_event_loop()
loop.run_until_complete(context.pubnub.stop())
loop.run_until_complete(asyncio.sleep(0.1))
# asyncio cleaning all pending tasks to eliminate any potential state changes
pending_tasks = asyncio.all_tasks(loop)
for task in pending_tasks:
task.cancel()
try:
loop.run_until_complete(task)
except asyncio.CancelledError:
pass
loop.run_until_complete(asyncio.sleep(1.5))
del context.pubnub

for tag in feature.tags:
if "contract" in tag:
Expand Down
Loading

0 comments on commit 0d47e58

Please sign in to comment.