Skip to content

Commit

Permalink
Fix unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
seba-aln committed Dec 7, 2023
1 parent 46812db commit 1c9a08b
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 27 deletions.
8 changes: 8 additions & 0 deletions pubnub/event_engine/manage_effects.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from pubnub.endpoints.pubsub.subscribe import Subscribe
from pubnub.enums import PNReconnectionPolicy
from pubnub.exceptions import PubNubException
from pubnub.models.consumer.pn_error_data import PNErrorData
from pubnub.models.consumer.pubsub import PNMessageResult
from pubnub.models.server.subscribe import SubscribeMessage
from pubnub.pubnub import PubNub
Expand Down Expand Up @@ -118,6 +119,13 @@ async def receive_messages_async(self, channels, groups, timetoken, region):
subscribe.cancellation_event(self.stop_event)
response = await subscribe.future()

if response.status is None and response.result is None:

error = PubNubException("Empty response")
response.status = PNStatus()
response.status.error = True
response.status.error_data = PNErrorData(str(error), error)

if response.status.error:
self.logger.warning(f'Recieve messages failed: {response.status.error_data.__dict__}')
recieve_failure = events.ReceiveFailureEvent(response.status.error_data, 1, timetoken=timetoken)
Expand Down
3 changes: 0 additions & 3 deletions pubnub/event_engine/statemachine.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import logging

from asyncio import Event as AsyncioEvent
from typing import List, Optional

from pubnub.event_engine.models import effects, events, states
Expand Down Expand Up @@ -81,12 +80,10 @@ def trigger(self, event: events.PNEvent) -> states.PNTransition:

def dispatch_effects(self):
for effect in self._effect_list:
self._current_stop_event = AsyncioEvent()
self.logger.debug(f'dispatching {effect.__class__.__name__} {id(effect)}')
self._dispatcher.dispatch_effect(effect)

self._effect_list.clear()

def stop(self):
self._enabled = False
self._current_stop_event.set()
7 changes: 5 additions & 2 deletions tests/functional/event_engine/test_managed_effect.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ class FakeConfig:
RECONNECTION_INTERVAL = 1
RECONNECTION_MIN_EXPONENTIAL_BACKOFF = 1
RECONNECTION_MAX_EXPONENTIAL_BACKOFF = 32
maximum_reconnection_retries = 3


class FakePN:
Expand Down Expand Up @@ -67,15 +68,17 @@ def test_dispatch_stop_handshake_reconnect_effect():


def test_dispatch_run_receive_reconnect_effect():
with patch.object(manage_effects.ManagedEffect, 'run') as mocked_run:
with patch.object(manage_effects.ManagedReceiveReconnectEffect, 'run') as mocked_run:
dispatcher = Dispatcher(StateMachine(UnsubscribedState))
dispatcher.set_pn(FakePN())
dispatcher.dispatch_effect(effects.ReceiveReconnectEffect(['chan']))
mocked_run.assert_called()


def test_dispatch_stop_receive_reconnect_effect():
with patch.object(manage_effects.ManagedEffect, 'stop') as mocked_stop:
with patch.object(manage_effects.ManagedReceiveReconnectEffect, 'stop') as mocked_stop:
dispatcher = Dispatcher(StateMachine(UnsubscribedState))
dispatcher.set_pn(FakePN())
dispatcher.dispatch_effect(effects.ReceiveReconnectEffect(['chan']))
dispatcher.dispatch_effect(effects.CancelReceiveReconnectEffect())
mocked_stop.assert_called()
63 changes: 41 additions & 22 deletions tests/functional/event_engine/test_subscribe.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,14 @@
import busypie
import logging
import pytest
import sys

from unittest.mock import patch
from tests.helper import pnconf_env_copy

from pubnub.pubnub_asyncio import PubNubAsyncio, EventEngineSubscriptionManager, SubscribeCallback
from pubnub.event_engine.models import states
from pubnub.models.consumer.common import PNStatus
from pubnub.enums import PNStatusCategory, PNReconnectionPolicy

logging.basicConfig(level=logging.DEBUG, stream=sys.stdout)
from pubnub.enums import PNReconnectionPolicy


class TestCallback(SubscribeCallback):
Expand All @@ -27,8 +24,7 @@ def message_called(self):

def status(self, pubnub, status: PNStatus):
self._status_called = True
assert status.error is False
assert status.category is PNStatusCategory.PNConnectedCategory
assert isinstance(status, PNStatus)
logging.debug('calling status_callback()')
self.status_callback()

Expand Down Expand Up @@ -57,20 +53,21 @@ async def test_subscribe():
pubnub = PubNubAsyncio(config, subscription_manager=EventEngineSubscriptionManager, custom_event_loop=loop)
pubnub.add_listener(callback)
pubnub.subscribe().channels('foo').execute()

await delayed_publish('foo', 'test', 1)
await busypie.wait().at_most(10).poll_delay(2).poll_interval(1).until_async(lambda: callback.message_called)
await busypie.wait().at_most(5).poll_delay(1).poll_interval(1).until_async(lambda: callback.message_called)
assert pubnub._subscription_manager.event_engine.get_state_name() == states.ReceivingState.__name__

status_callback.assert_called()
message_callback.assert_called()
pubnub.unsubscribe_all()
pubnub._subscription_manager.stop()

try:
await asyncio.gather(*asyncio.tasks.all_tasks())
except asyncio.CancelledError:
pass
await pubnub.close_session()
# try:
# await asyncio.gather(*asyncio.tasks.all_tasks())
# except asyncio.CancelledError:
# pass
# await pubnub.close_session()


async def delayed_publish(channel, message, delay):
Expand All @@ -83,20 +80,22 @@ async def delayed_publish(channel, message, delay):
async def test_handshaking():
config = pnconf_env_copy()
config.enable_subscribe = True
config.subscribe_request_timeout = 3
callback = TestCallback()
with patch.object(TestCallback, 'status_callback') as status_callback:
pubnub = PubNubAsyncio(config, subscription_manager=EventEngineSubscriptionManager)
pubnub.add_listener(callback)
pubnub.subscribe().channels('foo').execute()
await busypie.wait().at_most(10).poll_delay(2).poll_interval(1).until_async(lambda: callback.status_called)
await busypie.wait().at_most(10).poll_delay(1).poll_interval(1).until_async(lambda: callback.status_called)
assert pubnub._subscription_manager.event_engine.get_state_name() == states.ReceivingState.__name__
status_callback.assert_called()
pubnub._subscription_manager.stop()
try:
await asyncio.gather(*asyncio.tasks.all_tasks())
except asyncio.CancelledError:
pass
await pubnub.close_session()

# try:
# await asyncio.gather(*asyncio.tasks.all_tasks())
# except asyncio.CancelledError:
# pass
# await pubnub.close_session()


@pytest.mark.asyncio
Expand All @@ -113,7 +112,16 @@ async def test_handshake_failed_no_reconnect():
pubnub = PubNubAsyncio(config, subscription_manager=EventEngineSubscriptionManager)
pubnub.add_listener(callback)
pubnub.subscribe().channels('foo').execute()
await asyncio.sleep(4)

def is_state(state):
return pubnub._subscription_manager.event_engine.get_state_name() == state

await busypie.wait() \
.at_most(10) \
.poll_delay(1) \
.poll_interval(1) \
.until_async(lambda: is_state(states.HandshakeFailedState.__name__))

assert pubnub._subscription_manager.event_engine.get_state_name() == states.HandshakeFailedState.__name__
pubnub._subscription_manager.stop()
await pubnub.close_session()
Expand All @@ -134,9 +142,20 @@ async def test_handshake_failed_reconnect():
pubnub = PubNubAsyncio(config, subscription_manager=EventEngineSubscriptionManager)
pubnub.add_listener(callback)
pubnub.subscribe().channels('foo').execute()
await asyncio.sleep(7)

def is_state(state):
return pubnub._subscription_manager.event_engine.get_state_name() == state

await busypie.wait() \
.at_most(10) \
.poll_delay(1) \
.poll_interval(1) \
.until_async(lambda: is_state(states.HandshakeReconnectingState.__name__))
assert pubnub._subscription_manager.event_engine.get_state_name() == states.HandshakeReconnectingState.__name__
await asyncio.sleep(1)

await pubnub.close_session()
# try:
# await asyncio.gather(*asyncio.tasks.all_tasks())
# except asyncio.CancelledError:
# pass
# await pubnub.close_session()
pubnub._subscription_manager.stop()

0 comments on commit 1c9a08b

Please sign in to comment.