Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expoential as a default reconnection policy #196

Merged
merged 8 commits into from
Oct 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions examples/native_threads/subscribe_with_retry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import logging
import sys
import time

from pubnub.pnconfiguration import PNConfiguration
from pubnub.pubnub import PubNub, SubscribeListener
from pubnub.enums import PNReconnectionPolicy, PNStatusCategory


class TestListener(SubscribeListener):
status_result = None
disconnected = False

def status(self, pubnub, status):
if status.category == PNStatusCategory.PNDisconnectedCategory:
print('Could not connect. Exiting...')
self.disconnected = True

def message(self, pubnub, message):
print(f'Message:\n{message.__dict__}')

def presence(self, pubnub, presence):
print(f'Presence:\n{presence.__dict__}')


logger = logging.getLogger("pubnub")
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.DEBUG)
logger.addHandler(handler)


config = PNConfiguration()
config.subscribe_key = "demo"
config.publish_key = "demo"
config.user_id = 'example'
config.enable_subscribe = True
config.reconnect_policy = PNReconnectionPolicy.EXPONENTIAL
config.origin = '127.0.0.1'
config.ssl = False

listener = TestListener()

pubnub = PubNub(config)
pubnub.add_listener(listener)
sub = pubnub.subscribe().channels(['example']).execute()

while not listener.disconnected:
time.sleep(0.5)
print('Disconnected. Bye.')
59 changes: 59 additions & 0 deletions examples/pubnub_asyncio/subscribe_with_retry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import asyncio
import logging
import sys

from pubnub.callbacks import SubscribeCallback
from pubnub.models.consumer.common import PNStatus
from pubnub.pubnub_asyncio import PubNubAsyncio
from pubnub.pnconfiguration import PNConfiguration
from pubnub.enums import PNReconnectionPolicy, PNStatusCategory

config = PNConfiguration()
config.subscribe_key = "demo"
config.publish_key = "demo"
config.enable_subscribe = True
config.uuid = "test-uuid"
config.origin = "127.0.0.1"
config.ssl = False
config.reconnect_policy = PNReconnectionPolicy.NONE

pubnub = PubNubAsyncio(config)

logger = logging.getLogger("pubnub")
logger.setLevel(logging.WARNING)
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.WARNING)
logger.addHandler(handler)


class SampleCallback(SubscribeCallback):
message_result = None
status_result = None
presence_result = None

def status(self, pubnub, status):
self.status_result = status

def message(self, pubnub, message):
self.message_result = message

def presence(self, pubnub, presence):
self.presence_result = presence


async def main():
listener = SampleCallback()
pubnub.add_listener(listener)
pubnub.subscribe().channels("my_channel").execute()
while True:
if isinstance(listener.status_result, PNStatus) \
and listener.status_result.category == PNStatusCategory.PNDisconnectedCategory:
print('Could not connect. Exiting...')
break
await asyncio.sleep(1)


if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
52 changes: 34 additions & 18 deletions pubnub/event_engine/effects.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import asyncio
import logging
import math

from typing import Optional, Union
from pubnub.endpoints.presence.heartbeat import Heartbeat
Expand All @@ -14,6 +13,7 @@
from pubnub.event_engine.models import events, invocations
from pubnub.models.consumer.common import PNStatus
from pubnub.workers import BaseMessageWorker
from pubnub.managers import LinearDelay, ExponentialDelay


class Effect:
Expand Down Expand Up @@ -57,14 +57,6 @@ def get_new_stop_event(self):
self.logger.debug(f'creating new stop_event({id(event)}) for {self.__class__.__name__}')
return event

def calculate_reconnection_delay(self, attempts):
if self.reconnection_policy is PNReconnectionPolicy.EXPONENTIAL:
delay = int(math.pow(2, attempts - 5 * math.floor((attempts - 1) / 5)) - 1)
else:
delay = self.interval

return delay


class HandshakeEffect(Effect):
def run(self):
Expand Down Expand Up @@ -157,10 +149,15 @@ def __init__(self, pubnub_instance, event_engine_instance,
invocation: Union[invocations.PNManageableInvocation, invocations.PNCancelInvocation]) -> None:
super().__init__(pubnub_instance, event_engine_instance, invocation)
self.reconnection_policy = pubnub_instance.config.reconnect_policy
self.max_retry_attempts = pubnub_instance.config.maximum_reconnection_retries
self.interval = pubnub_instance.config.RECONNECTION_INTERVAL
self.min_backoff = pubnub_instance.config.RECONNECTION_MIN_EXPONENTIAL_BACKOFF
self.max_backoff = pubnub_instance.config.RECONNECTION_MAX_EXPONENTIAL_BACKOFF
self.interval = pubnub_instance.config.reconnection_interval

if self.reconnection_policy is PNReconnectionPolicy.EXPONENTIAL:
self.max_retry_attempts = ExponentialDelay.MAX_RETRIES
elif self.reconnection_policy is PNReconnectionPolicy.LINEAR:
self.max_retry_attempts = LinearDelay.MAX_RETRIES

if pubnub_instance.config.maximum_reconnection_retries is not None:
self.max_retry_attempts = pubnub_instance.config.maximum_reconnection_retries

def give_up(self, reason: PubNubException, attempt: int, timetoken: int = 0):
self.logger.error(f"GiveUp called on Unspecific event. Reason: {reason}, Attempt: {attempt} TT:{timetoken}")
Expand All @@ -174,13 +171,23 @@ def success(self, timetoken: str, region: Optional[int] = None, **kwargs):
self.logger.error(f"Success called on Unspecific event. TT:{timetoken}, Reg: {region}, KWARGS: {kwargs.keys()}")
raise PubNubException('Unspecified Invocation')

def calculate_reconnection_delay(self, attempts):
if self.reconnection_policy is PNReconnectionPolicy.EXPONENTIAL:
delay = ExponentialDelay.calculate(attempts)
elif self.interval is None:
delay = LinearDelay.calculate(attempts)
else:
delay = self.interval

return delay

def run(self):
if self.reconnection_policy is PNReconnectionPolicy.NONE or self.invocation.attempts > self.max_retry_attempts:
self.give_up(reason=self.invocation.reason, attempt=self.invocation.attempts)
else:
attempts = self.invocation.attempts
delay = self.calculate_reconnection_delay(attempts)
self.logger.warning(f'will reconnect in {delay}s')
self.logger.warning(f'Will reconnect in {delay}s')
if hasattr(self.pubnub, 'event_loop'):
self.run_async(self.delayed_reconnect_async(delay, attempts))

Expand Down Expand Up @@ -314,7 +321,8 @@ def run(self):
async def heartbeat_wait(self, wait_time: int, stop_event):
try:
await asyncio.sleep(wait_time)
self.event_engine.trigger(events.HeartbeatTimesUpEvent())
if not stop_event.is_set():
self.event_engine.trigger(events.HeartbeatTimesUpEvent())
except asyncio.CancelledError:
pass

Expand All @@ -341,9 +349,17 @@ def __init__(self, pubnub_instance, event_engine_instance,
super().__init__(pubnub_instance, event_engine_instance, invocation)
self.reconnection_policy = pubnub_instance.config.reconnect_policy
self.max_retry_attempts = pubnub_instance.config.maximum_reconnection_retries
self.interval = pubnub_instance.config.RECONNECTION_INTERVAL
self.min_backoff = pubnub_instance.config.RECONNECTION_MIN_EXPONENTIAL_BACKOFF
self.max_backoff = pubnub_instance.config.RECONNECTION_MAX_EXPONENTIAL_BACKOFF
self.interval = pubnub_instance.config.reconnection_interval

def calculate_reconnection_delay(self, attempts):
if self.reconnection_policy is PNReconnectionPolicy.EXPONENTIAL:
delay = ExponentialDelay.calculate(attempts)
elif self.interval is None:
delay = LinearDelay.calculate(attempts)
else:
delay = self.interval

return delay

def run(self):
if self.reconnection_policy is PNReconnectionPolicy.NONE or self.invocation.attempts > self.max_retry_attempts:
Expand Down
61 changes: 45 additions & 16 deletions pubnub/managers.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import logging
from abc import abstractmethod, ABCMeta

import math
import time
import copy
import base64
import random

from cbor2 import loads

from . import utils
Expand Down Expand Up @@ -51,33 +52,41 @@ def get_base_path(self):


class ReconnectionManager:
INTERVAL = 3
MINEXPONENTIALBACKOFF = 1
MAXEXPONENTIALBACKOFF = 32

def __init__(self, pubnub):
self._pubnub = pubnub
self._callback = None
self._timer = None
self._timer_interval = None
self._connection_errors = 1
self._connection_errors = 0

def set_reconnection_listener(self, reconnection_callback):
assert isinstance(reconnection_callback, ReconnectionCallback)
self._callback = reconnection_callback

def _recalculate_interval(self):
if self._pubnub.config.reconnect_policy == PNReconnectionPolicy.EXPONENTIAL:
self._timer_interval = int(math.pow(2, self._connection_errors) - 1)
if self._timer_interval > self.MAXEXPONENTIALBACKOFF:
self._timer_interval = self.MINEXPONENTIALBACKOFF
self._connection_errors = 1
logger.debug("timerInterval > MAXEXPONENTIALBACKOFF at: %s" % utils.datetime_now())
elif self._timer_interval < 1:
self._timer_interval = self.MINEXPONENTIALBACKOFF
logger.debug("timerInterval = %d at: %s" % (self._timer_interval, utils.datetime_now()))
policy = self._pubnub.config.reconnect_policy
interval = self._pubnub.config.reconnection_interval
if policy == PNReconnectionPolicy.LINEAR and interval is not None:
self._timer_interval = interval
elif policy == PNReconnectionPolicy.LINEAR:
self._timer_interval = LinearDelay.calculate(self._connection_errors)
else:
self._timer_interval = self.INTERVAL
self._timer_interval = ExponentialDelay.calculate(self._connection_errors)

def _retry_limit_reached(self):
user_limit = self._pubnub.config.maximum_reconnection_retries
policy = self._pubnub.config.reconnect_policy

if user_limit == 0 or policy == PNReconnectionPolicy.NONE:
return True
elif user_limit == -1:
return False

policy_limit = (LinearDelay.MAX_RETRIES if policy == PNReconnectionPolicy.LINEAR
else ExponentialDelay.MAX_RETRIES)
if user_limit is not None:
return self._connection_errors >= min(user_limit, policy_limit)
return self._connection_errors > policy_limit

@abstractmethod
def start_polling(self):
Expand All @@ -89,6 +98,26 @@ def _stop_heartbeat_timer(self):
self._timer = None


class LinearDelay:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to provide custom delay other than 2?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, not really. Should we add this option?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added in latest commit. thanks for pointing that out ;)

INTERVAL = 2
MAX_RETRIES = 10

@classmethod
def calculate(cls, attempt: int):
return cls.INTERVAL + round(random.random(), 3)


class ExponentialDelay:
MIN_DELAY = 2
MAX_RETRIES = 6
MIN_BACKOFF = 2
MAX_BACKOFF = 150

@classmethod
def calculate(cls, attempt: int) -> int:
return min(cls.MAX_BACKOFF, cls.MIN_DELAY * (2 ** attempt)) + round(random.random(), 3)


class StateManager:
def __init__(self):
self._channels = {}
Expand Down
8 changes: 3 additions & 5 deletions pubnub/pnconfiguration.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@ class PNConfiguration(object):
DEFAULT_PRESENCE_TIMEOUT = 300
DEFAULT_HEARTBEAT_INTERVAL = 280
ALLOWED_AES_MODES = [AES.MODE_CBC, AES.MODE_GCM]
RECONNECTION_INTERVAL = 3
RECONNECTION_MIN_EXPONENTIAL_BACKOFF = 1
RECONNECTION_MAX_EXPONENTIAL_BACKOFF = 32
DEFAULT_CRYPTO_MODULE = LegacyCryptoModule
_locked = False

Expand All @@ -39,8 +36,9 @@ def __init__(self):
self.log_verbosity = False
self.enable_presence_heartbeat = False
self.heartbeat_notification_options = PNHeartbeatNotificationOptions.FAILURES
self.reconnect_policy = PNReconnectionPolicy.NONE
self.maximum_reconnection_retries = -1 # -1 means unlimited/ 0 means no retries
self.reconnect_policy = PNReconnectionPolicy.EXPONENTIAL
self.maximum_reconnection_retries = None # -1 means unlimited/ 0 means no retries
self.reconnection_interval = None # if None is left the default value from LinearDelay is used
self.daemon = False
self.use_random_initialization_vector = True
self.suppress_leave_events = False
Expand Down
10 changes: 10 additions & 0 deletions pubnub/pubnub.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,13 @@ def __init__(self, pubnub):
def _register_heartbeat_timer(self):
self.stop_heartbeat_timer()

if self._retry_limit_reached():

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know how it works in Python, but should announcing disconnected status being part of EE?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. This one is for non-event engine subscribe (yeah, python has more than one subscribe)

logger.warning("Reconnection retry limit reached. Disconnecting.")
disconnect_status = PNStatus()
disconnect_status.category = PNStatusCategory.PNDisconnectedCategory
self._pubnub._subscription_manager._listener_manager.announce_status(disconnect_status)
return

self._recalculate_interval()

self._timer = threading.Timer(self._timer_interval, self._call_time)
Expand Down Expand Up @@ -129,6 +136,9 @@ def _call_time_callback(self, resp, status):
def start_polling(self):
if self._pubnub.config.reconnect_policy == PNReconnectionPolicy.NONE:
logger.warning("reconnection policy is disabled, please handle reconnection manually.")
disconnect_status = PNStatus()
disconnect_status.category = PNStatusCategory.PNDisconnectedCategory
self._pubnub._subscription_manager._listener_manager.announce_status(disconnect_status)
return

logger.debug("reconnection manager start at: %s" % utils.datetime_now())
Expand Down
11 changes: 10 additions & 1 deletion tests/acceptance/subscribe/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,16 @@ def before_scenario(context: Context, feature):
def after_scenario(context: Context, feature):
loop = asyncio.get_event_loop()
loop.run_until_complete(context.pubnub.stop())
loop.run_until_complete(asyncio.sleep(0.1))
# asyncio cleaning all pending tasks to eliminate any potential state changes
pending_tasks = asyncio.all_tasks(loop)
for task in pending_tasks:
task.cancel()
try:
loop.run_until_complete(task)
except asyncio.CancelledError:
pass
loop.run_until_complete(asyncio.sleep(1.5))
del context.pubnub

for tag in feature.tags:
if "contract" in tag:
Expand Down
Loading
Loading