Skip to content

Commit

Permalink
Add custom delay for linear policy
Browse files Browse the repository at this point in the history
  • Loading branch information
seba-aln committed Oct 1, 2024
1 parent b6746db commit 2aad937
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 9 deletions.
12 changes: 8 additions & 4 deletions pubnub/event_engine/effects.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ def __init__(self, pubnub_instance, event_engine_instance,
invocation: Union[invocations.PNManageableInvocation, invocations.PNCancelInvocation]) -> None:
super().__init__(pubnub_instance, event_engine_instance, invocation)
self.reconnection_policy = pubnub_instance.config.reconnect_policy
self.interval = pubnub_instance.config.RECONNECTION_INTERVAL
self.interval = pubnub_instance.config.reconnection_interval

if self.reconnection_policy is PNReconnectionPolicy.EXPONENTIAL:
self.max_retry_attempts = ExponentialDelay.MAX_RETRIES
Expand All @@ -174,8 +174,10 @@ def success(self, timetoken: str, region: Optional[int] = None, **kwargs):
def calculate_reconnection_delay(self, attempts):
if self.reconnection_policy is PNReconnectionPolicy.EXPONENTIAL:
delay = ExponentialDelay.calculate(attempts)
else:
elif self.interval is None:
delay = LinearDelay.calculate(attempts)
else:
delay = self.interval

return delay

Expand Down Expand Up @@ -347,13 +349,15 @@ def __init__(self, pubnub_instance, event_engine_instance,
super().__init__(pubnub_instance, event_engine_instance, invocation)
self.reconnection_policy = pubnub_instance.config.reconnect_policy
self.max_retry_attempts = pubnub_instance.config.maximum_reconnection_retries
self.interval = pubnub_instance.config.RECONNECTION_INTERVAL
self.interval = pubnub_instance.config.reconnection_interval

def calculate_reconnection_delay(self, attempts):
if self.reconnection_policy is PNReconnectionPolicy.EXPONENTIAL:
delay = ExponentialDelay.calculate(attempts)
else:
elif self.interval is None:
delay = LinearDelay.calculate(attempts)
else:
delay = self.interval

return delay

Expand Down
9 changes: 7 additions & 2 deletions pubnub/managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,13 @@ def set_reconnection_listener(self, reconnection_callback):

def _recalculate_interval(self):
policy = self._pubnub.config.reconnect_policy
calculate = (LinearDelay.calculate if policy == PNReconnectionPolicy.LINEAR else ExponentialDelay.calculate)
self._timer_interval = calculate(self._connection_errors)
interval = self._pubnub.config.reconnection_interval
if policy == PNReconnectionPolicy.LINEAR and interval is not None:
self._timer_interval = interval
elif policy == PNReconnectionPolicy.LINEAR:
self._timer_interval = LinearDelay.calculate(self._connection_errors)
else:
self._timer_interval = ExponentialDelay.calculate(self._connection_errors)

def _retry_limit_reached(self):
user_limit = self._pubnub.config.maximum_reconnection_retries
Expand Down
4 changes: 1 addition & 3 deletions pubnub/pnconfiguration.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@ class PNConfiguration(object):
DEFAULT_PRESENCE_TIMEOUT = 300
DEFAULT_HEARTBEAT_INTERVAL = 280
ALLOWED_AES_MODES = [AES.MODE_CBC, AES.MODE_GCM]
RECONNECTION_INTERVAL = 3
RECONNECTION_MIN_EXPONENTIAL_BACKOFF = 1
RECONNECTION_MAX_EXPONENTIAL_BACKOFF = 32
DEFAULT_CRYPTO_MODULE = LegacyCryptoModule
_locked = False

Expand Down Expand Up @@ -41,6 +38,7 @@ def __init__(self):
self.heartbeat_notification_options = PNHeartbeatNotificationOptions.FAILURES
self.reconnect_policy = PNReconnectionPolicy.EXPONENTIAL
self.maximum_reconnection_retries = None # -1 means unlimited/ 0 means no retries
self.reconnection_interval = None # if None is left the default value from LinearDelay is used
self.daemon = False
self.use_random_initialization_vector = True
self.suppress_leave_events = False
Expand Down
1 change: 1 addition & 0 deletions tests/acceptance/subscribe/steps/then_steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ async def step_impl(ctx):
@async_run_until_complete
async def step_impl(context, channel1, channel2):
context.pubnub.unsubscribe().channels([channel1, channel2]).execute()
await asyncio.sleep(0.5)


@then(u'I don\'t observe any Events and Invocations of the Presence EE')
Expand Down
24 changes: 24 additions & 0 deletions tests/integrational/asyncio/test_subscribe.py
Original file line number Diff line number Diff line change
Expand Up @@ -554,3 +554,27 @@ def mock_calculate(*args, **kwargs):
break
await asyncio.sleep(0.5)
assert calculate_mock.call_count == 3


@pytest.mark.asyncio
async def test_subscribe_failing_reconnect_policy_linear_with_custom_interval():
# we don't test the actual delay calculation here, just everything around it
def mock_calculate(*args, **kwargs):
return 0.2

with patch('pubnub.managers.LinearDelay.calculate', wraps=mock_calculate) as calculate_mock:
config = pnconf_env_copy(enable_subscribe=True, maximum_reconnection_retries=3, reconnection_interval=1,
uuid="test-subscribe-failing-reconnect-policy-linear-with-max-retries",
reconnect_policy=PNReconnectionPolicy.LINEAR,
origin='127.0.0.1')
pubnub = PubNubAsyncio(config)

listener = TestCallback()
pubnub.add_listener(listener)
pubnub.subscribe().channels("my_channel_linear").execute()
while True:
if isinstance(listener.status_result, PNStatus) \
and listener.status_result.category == PNStatusCategory.PNDisconnectedCategory:
break
await asyncio.sleep(0.5)
assert calculate_mock.call_count == 0
24 changes: 24 additions & 0 deletions tests/integrational/native_threads/test_subscribe.py
Original file line number Diff line number Diff line change
Expand Up @@ -432,3 +432,27 @@ def mock_calculate(*args, **kwargs):
self.fail(e)

assert calculate_mock.call_count == 3

def test_subscribe_retry_policy_linear_with_custom_interval(self):
# we don't test the actual delay calculation here, just everything around it
def mock_calculate(*args, **kwargs):
return 0.2

with patch('pubnub.managers.LinearDelay.calculate', wraps=mock_calculate) as calculate_mock:
ch = "test-subscribe-retry-policy-linear"
pubnub = PubNub(pnconf_env_copy(enable_subscribe=True, daemon=True, origin='127.0.0.1',
maximum_reconnection_retries=3, reconnection_interval=1,
reconnect_policy=PNReconnectionPolicy.LINEAR))
listener = DisconnectListener()

try:
pubnub.add_listener(listener)
pubnub.subscribe().channels(ch).execute()

while not listener.disconnected:
time.sleep(0.5)

except PubNubException as e:
self.fail(e)

assert calculate_mock.call_count == 0

0 comments on commit 2aad937

Please sign in to comment.