diff --git a/.github/workflows/run-tests.yml b/.github/workflows/run-tests.yml index 76e8b955..bb6f8cda 100644 --- a/.github/workflows/run-tests.yml +++ b/.github/workflows/run-tests.yml @@ -77,10 +77,12 @@ jobs: cp sdk-specifications/features/encryption/cryptor-module.feature tests/acceptance/encryption mkdir tests/acceptance/encryption/assets/ cp sdk-specifications/features/encryption/assets/* tests/acceptance/encryption/assets/ + cp sdk-specifications/features/subscribe/event-engine/happy-path.feature tests/acceptance/subscribe/happy-path.feature sudo pip3 install -r requirements-dev.txt behave --junit tests/acceptance/pam behave --junit tests/acceptance/encryption/cryptor-module.feature -t=~na=python -k + behave --junit tests/acceptance/subscribe - name: Expose acceptance tests reports uses: actions/upload-artifact@v3 if: always() diff --git a/pubnub/event_engine/manage_effects.py b/pubnub/event_engine/manage_effects.py index beacef57..b78f63f8 100644 --- a/pubnub/event_engine/manage_effects.py +++ b/pubnub/event_engine/manage_effects.py @@ -6,6 +6,7 @@ from pubnub.endpoints.pubsub.subscribe import Subscribe from pubnub.enums import PNReconnectionPolicy from pubnub.exceptions import PubNubException +from pubnub.models.consumer.pn_error_data import PNErrorData from pubnub.models.consumer.pubsub import PNMessageResult from pubnub.models.server.subscribe import SubscribeMessage from pubnub.pubnub import PubNub @@ -118,6 +119,13 @@ async def receive_messages_async(self, channels, groups, timetoken, region): subscribe.cancellation_event(self.stop_event) response = await subscribe.future() + if response.status is None and response.result is None: + + error = PubNubException("Empty response") + response.status = PNStatus() + response.status.error = True + response.status.error_data = PNErrorData(str(error), error) + if response.status.error: self.logger.warning(f'Recieve messages failed: {response.status.error_data.__dict__}') recieve_failure = events.ReceiveFailureEvent(response.status.error_data, 1, timetoken=timetoken) diff --git a/pubnub/event_engine/statemachine.py b/pubnub/event_engine/statemachine.py index c2092de2..4373bf9d 100644 --- a/pubnub/event_engine/statemachine.py +++ b/pubnub/event_engine/statemachine.py @@ -1,6 +1,5 @@ import logging -from asyncio import Event as AsyncioEvent from typing import List, Optional from pubnub.event_engine.models import effects, events, states @@ -81,7 +80,6 @@ def trigger(self, event: events.PNEvent) -> states.PNTransition: def dispatch_effects(self): for effect in self._effect_list: - self._current_stop_event = AsyncioEvent() self.logger.debug(f'dispatching {effect.__class__.__name__} {id(effect)}') self._dispatcher.dispatch_effect(effect) @@ -89,4 +87,3 @@ def dispatch_effects(self): def stop(self): self._enabled = False - self._current_stop_event.set() diff --git a/tests/functional/event_engine/test_managed_effect.py b/tests/functional/event_engine/test_managed_effect.py index ca0032e6..26c46530 100644 --- a/tests/functional/event_engine/test_managed_effect.py +++ b/tests/functional/event_engine/test_managed_effect.py @@ -12,6 +12,7 @@ class FakeConfig: RECONNECTION_INTERVAL = 1 RECONNECTION_MIN_EXPONENTIAL_BACKOFF = 1 RECONNECTION_MAX_EXPONENTIAL_BACKOFF = 32 + maximum_reconnection_retries = 3 class FakePN: @@ -67,15 +68,17 @@ def test_dispatch_stop_handshake_reconnect_effect(): def test_dispatch_run_receive_reconnect_effect(): - with patch.object(manage_effects.ManagedEffect, 'run') as mocked_run: + with patch.object(manage_effects.ManagedReceiveReconnectEffect, 'run') as mocked_run: dispatcher = Dispatcher(StateMachine(UnsubscribedState)) + dispatcher.set_pn(FakePN()) dispatcher.dispatch_effect(effects.ReceiveReconnectEffect(['chan'])) mocked_run.assert_called() def test_dispatch_stop_receive_reconnect_effect(): - with patch.object(manage_effects.ManagedEffect, 'stop') as mocked_stop: + with patch.object(manage_effects.ManagedReceiveReconnectEffect, 'stop') as mocked_stop: dispatcher = Dispatcher(StateMachine(UnsubscribedState)) + dispatcher.set_pn(FakePN()) dispatcher.dispatch_effect(effects.ReceiveReconnectEffect(['chan'])) dispatcher.dispatch_effect(effects.CancelReceiveReconnectEffect()) mocked_stop.assert_called() diff --git a/tests/functional/event_engine/test_subscribe.py b/tests/functional/event_engine/test_subscribe.py index 40a0fc48..37fbaf50 100644 --- a/tests/functional/event_engine/test_subscribe.py +++ b/tests/functional/event_engine/test_subscribe.py @@ -2,7 +2,6 @@ import busypie import logging import pytest -import sys from unittest.mock import patch from tests.helper import pnconf_env_copy @@ -10,9 +9,7 @@ from pubnub.pubnub_asyncio import PubNubAsyncio, EventEngineSubscriptionManager, SubscribeCallback from pubnub.event_engine.models import states from pubnub.models.consumer.common import PNStatus -from pubnub.enums import PNStatusCategory, PNReconnectionPolicy - -logging.basicConfig(level=logging.DEBUG, stream=sys.stdout) +from pubnub.enums import PNReconnectionPolicy class TestCallback(SubscribeCallback): @@ -27,8 +24,7 @@ def message_called(self): def status(self, pubnub, status: PNStatus): self._status_called = True - assert status.error is False - assert status.category is PNStatusCategory.PNConnectedCategory + assert isinstance(status, PNStatus) logging.debug('calling status_callback()') self.status_callback() @@ -57,8 +53,9 @@ async def test_subscribe(): pubnub = PubNubAsyncio(config, subscription_manager=EventEngineSubscriptionManager, custom_event_loop=loop) pubnub.add_listener(callback) pubnub.subscribe().channels('foo').execute() + await delayed_publish('foo', 'test', 1) - await busypie.wait().at_most(10).poll_delay(2).poll_interval(1).until_async(lambda: callback.message_called) + await busypie.wait().at_most(5).poll_delay(1).poll_interval(1).until_async(lambda: callback.message_called) assert pubnub._subscription_manager.event_engine.get_state_name() == states.ReceivingState.__name__ status_callback.assert_called() @@ -66,12 +63,6 @@ async def test_subscribe(): pubnub.unsubscribe_all() pubnub._subscription_manager.stop() - try: - await asyncio.gather(*asyncio.tasks.all_tasks()) - except asyncio.CancelledError: - pass - await pubnub.close_session() - async def delayed_publish(channel, message, delay): pn = PubNubAsyncio(pnconf_env_copy()) @@ -83,20 +74,16 @@ async def delayed_publish(channel, message, delay): async def test_handshaking(): config = pnconf_env_copy() config.enable_subscribe = True + config.subscribe_request_timeout = 3 callback = TestCallback() with patch.object(TestCallback, 'status_callback') as status_callback: pubnub = PubNubAsyncio(config, subscription_manager=EventEngineSubscriptionManager) pubnub.add_listener(callback) pubnub.subscribe().channels('foo').execute() - await busypie.wait().at_most(10).poll_delay(2).poll_interval(1).until_async(lambda: callback.status_called) + await busypie.wait().at_most(10).poll_delay(1).poll_interval(1).until_async(lambda: callback.status_called) assert pubnub._subscription_manager.event_engine.get_state_name() == states.ReceivingState.__name__ status_callback.assert_called() pubnub._subscription_manager.stop() - try: - await asyncio.gather(*asyncio.tasks.all_tasks()) - except asyncio.CancelledError: - pass - await pubnub.close_session() @pytest.mark.asyncio @@ -113,7 +100,16 @@ async def test_handshake_failed_no_reconnect(): pubnub = PubNubAsyncio(config, subscription_manager=EventEngineSubscriptionManager) pubnub.add_listener(callback) pubnub.subscribe().channels('foo').execute() - await asyncio.sleep(4) + + def is_state(state): + return pubnub._subscription_manager.event_engine.get_state_name() == state + + await busypie.wait() \ + .at_most(10) \ + .poll_delay(1) \ + .poll_interval(1) \ + .until_async(lambda: is_state(states.HandshakeFailedState.__name__)) + assert pubnub._subscription_manager.event_engine.get_state_name() == states.HandshakeFailedState.__name__ pubnub._subscription_manager.stop() await pubnub.close_session() @@ -134,9 +130,14 @@ async def test_handshake_failed_reconnect(): pubnub = PubNubAsyncio(config, subscription_manager=EventEngineSubscriptionManager) pubnub.add_listener(callback) pubnub.subscribe().channels('foo').execute() - await asyncio.sleep(7) - assert pubnub._subscription_manager.event_engine.get_state_name() == states.HandshakeReconnectingState.__name__ - await asyncio.sleep(1) - await pubnub.close_session() + def is_state(state): + return pubnub._subscription_manager.event_engine.get_state_name() == state + + await busypie.wait() \ + .at_most(10) \ + .poll_delay(1) \ + .poll_interval(1) \ + .until_async(lambda: is_state(states.HandshakeReconnectingState.__name__)) + assert pubnub._subscription_manager.event_engine.get_state_name() == states.HandshakeReconnectingState.__name__ pubnub._subscription_manager.stop()