diff --git a/pubnub/event_engine/manage_effects.py b/pubnub/event_engine/manage_effects.py index 00746205..609a20e9 100644 --- a/pubnub/event_engine/manage_effects.py +++ b/pubnub/event_engine/manage_effects.py @@ -3,6 +3,7 @@ import math from typing import Optional, Union +from pubnub.endpoints.presence.heartbeat import Heartbeat from pubnub.endpoints.pubsub.subscribe import Subscribe from pubnub.enums import PNReconnectionPolicy from pubnub.exceptions import PubNubException @@ -313,3 +314,61 @@ def emit_status(self, effect: effects.EmitStatusEffect): pn_status.category = effect.status pn_status.error = False self.pubnub._subscription_manager._listener_manager.announce_status(pn_status) + + +class ManageHeartbeatEffect(ManagedEffect): + def run(self): + channels = self.effect.channels + groups = self.effect.groups + if hasattr(self.pubnub, 'event_loop'): + self.stop_event = self.get_new_stop_event() + + loop: asyncio.AbstractEventLoop = self.pubnub.event_loop + coro = self.heartbeat(channels=channels, groups=groups, stop_event=self.stop_event) + if loop.is_running(): + loop.create_task(coro) + else: + loop.run_until_complete(coro) + + async def heartbeat(self, channels, groups, stop_event): + request = Heartbeat(self.pubnub).channels(channels).channel_groups(groups).cancellation_event(stop_event) + heartbeat = await request.future() + + if heartbeat.status.error: + self.logger.warning(f'Heartbeat failed: {heartbeat.status.error_data.__dict__}') + self.event_engine.trigger(events.HeartbeatFailureEvent(heartbeat.status.error_data, 1)) + else: + self.event_engine.trigger(events.HeartbeatSuccessEvent(channels=channels, groups=groups)) + + +class ManageHeartbeatWaitEffect(ManagedEffect): + def __init__(self, pubnub_instance, event_engine_instance, + effect: Union[effects.PNManageableEffect, effects.PNCancelEffect]) -> None: + super().__init__(pubnub_instance, event_engine_instance, effect) + self.heartbeat_interval = pubnub_instance.config.heartbeat_interval + + def run(self): + if hasattr(self.pubnub, 'event_loop'): + self.stop_event = self.get_new_stop_event() + loop: asyncio.AbstractEventLoop = self.pubnub.event_loop + coro = self.heartbeat_wait(self.heartbeat_interval, stop_event=self.stop_event) + if loop.is_running(): + loop.create_task(coro) + else: + loop.run_until_complete(coro) + + async def heartbeat(self, wait_time: int, stop_event): + try: + await asyncio.sleep(wait_time) + self.event_engine.trigger(events.HeartbeatTimesUpEvent(channels=self.effect.channels, + groups=self.effect.groups)) + except asyncio.CancelledError: + pass + + +class ManageHeartbeatLeaveEffect(ManagedEffect): + pass + + +class ManageHeartbeatDelayedHeartbeatEffect(ManagedEffect): + pass diff --git a/pubnub/event_engine/models/effects.py b/pubnub/event_engine/models/effects.py index 3112584c..50a90375 100644 --- a/pubnub/event_engine/models/effects.py +++ b/pubnub/event_engine/models/effects.py @@ -93,3 +93,48 @@ class EmitStatusEffect(PNEmittableEffect): def __init__(self, status: Union[None, PNStatusCategory]) -> None: super().__init__() self.status = status + + +""" + Presence Effects +""" + + +class HeartbeatEffect(PNEffect): + def __init__(self) -> None: + super().__init__() + + +class HeartbeatWaitEffect(PNEffect): + def __init__(self) -> None: + super().__init__() + + +class HeartbeatCancelWaitEffect(PNEffect): + def __init__(self) -> None: + super().__init__() + + +class HeartbeatLeaveEffect(PNEffect): + def __init__(self) -> None: + super().__init__() + + +class HeartbeatWaitEffect(PNEffect): + def __init__(self) -> None: + super().__init__() + + +class HeartbeatCancelWaitEffect(PNEffect): + def __init__(self) -> None: + super().__init__() + + +class HeartbeatDelayedEffect(PNEffect): + def __init__(self) -> None: + super().__init__() + + +class HeartbeatCancelDelayedEffect(PNEffect): + def __init__(self) -> None: + super().__init__() diff --git a/pubnub/event_engine/models/events.py b/pubnub/event_engine/models/events.py index 35821f82..44a306a5 100644 --- a/pubnub/event_engine/models/events.py +++ b/pubnub/event_engine/models/events.py @@ -97,3 +97,47 @@ class DisconnectEvent(PNEvent): class ReconnectEvent(PNEvent): pass + + +""" + Presence Events +""" + + +class HeartbeatJoinedEvent(PNChannelGroupsEvent): + pass + + +class HeartbeatReconnectEvent(PNChannelGroupsEvent, PNFailureEvent): + def __init__(self, channels: List[str], groups: List[str], reason: PubNubException, attempt: int, + timetoken: int = 0) -> None: + PNChannelGroupsEvent.__init__(self, channels, groups) + PNFailureEvent.__init__(self, reason, attempt, timetoken) + + +class HeartbeatLeftAllEvent(PNEvent): + pass + + +class HeartbeatLeftEvent(PNChannelGroupsEvent): + pass + + +class HeartbeatDisconnectEvent(PNChannelGroupsEvent): + pass + + +class HeartbeatSuccessEvent(PNChannelGroupsEvent): + pass + + +class HeartbeatFailureEvent(PNChannelGroupsEvent, PNFailureEvent): + pass + + +class HeartbeatTimesUpEvent(PNChannelGroupsEvent, PNFailureEvent): + pass + + +class HeartbeatGiveUpEvent(PNChannelGroupsEvent, PNFailureEvent): + pass diff --git a/pubnub/event_engine/models/states.py b/pubnub/event_engine/models/states.py index dc5b65e7..e8f628c1 100644 --- a/pubnub/event_engine/models/states.py +++ b/pubnub/event_engine/models/states.py @@ -550,3 +550,352 @@ def reconnect(self, event: events.ReconnectEvent, context: PNContext) -> PNTrans state=ReceiveReconnectingState, context=self._context ) + + +""" +Presence states +""" + + +class HeartbeatInactiveState(PNState): + def __init__(self, context: PNContext) -> None: + super().__init__(context) + + self._transitions = { + events.HeartbeatJoinedEvent.__name__: self.joined + } + + def joined(self, event: events.HeartbeatJoinedEvent, context: PNContext) -> PNTransition: + self._context.update(context) + + return PNTransition( + state=HeartbeatingState, + context=self._context + ) + + +class HeartbeatStoppedState(PNState): + def __init__(self, context: PNContext) -> None: + super().__init__(context) + + self._transitions = { + events.HeartbeatReconnectEvent.__name__: self.reconnect, + events.HeartbeatLeftAllEvent.__name__: self.left_all, + events.HeartbeatJoinedEvent.__name__: self.joined, + events.HeartbeatLeftEvent.__name__: self.left + } + + def reconnect(self, event: events.HeartbeatReconnectEvent, context: PNContext) -> PNTransition: + self._context.update(context) + + return PNTransition( + state=HeartbeatingState, + context=self._context + ) + + def left_all(self, event: events.HeartbeatLeftAllEvent, context: PNContext) -> PNTransition: + self._context.update(context) + + return PNTransition( + state=HeartbeatInactiveState, + context=self._context + ) + + def joined(self, event: events.HeartbeatJoinedEvent, context: PNContext) -> PNTransition: + self._context.update(context) + + return PNTransition( + state=HeartbeatStoppedState, + context=self._context + ) + + def left(self, event: events.HeartbeatLeftEvent, context: PNContext) -> PNTransition: + self._context.update(context) + + return PNTransition( + state=HeartbeatStoppedState, + context=self._context + ) + + +class HeartbeatFailedState(PNState): + def __init__(self, context: PNContext) -> None: + super().__init__(context) + + self._transitions = { + events.HeartbeatJoinedEvent.__name__: self.joined, + events.HeartbeatLeftEvent.__name__: self.left, + events.HeartbeatReconnectEvent.__name__: self.reconnect, + events.HeartbeatDisconnectEvent.__name__: self.disconnect, + events.HeartbeatLeftAllEvent.__name__: self.left_all + } + + def joined(self, event: events.HeartbeatJoinedEvent, context: PNContext) -> PNTransition: + self._context.update(context) + + return PNTransition( + state=HeartbeatingState, + context=self._context + ) + + def left(self, event: events.HeartbeatLeftEvent, context: PNContext) -> PNTransition: + self._context.update(context) + + return PNTransition( + state=HeartbeatingState, + context=self._context, + effect=effects.HeartbeatLeaveEffect() + ) + + def reconnect(self, event: events.HeartbeatReconnectEvent, context: PNContext) -> PNTransition: + self._context.update(context) + + return PNTransition( + state=HeartbeatingState, + context=self._context + ) + + def disconnect(self, event: events.HeartbeatDisconnectEvent, context: PNContext) -> PNTransition: + self._context.update(context) + + return PNTransition( + state=HeartbeatStoppedState, + context=self._context, + effect=effects.HeartbeatLeaveEffect() + ) + + def left_all(self, event: events.HeartbeatLeftAllEvent, context: PNContext) -> PNTransition: + self._context.update(context) + + return PNTransition( + state=HeartbeatInactiveState, + context=self._context, + effect=effects.HeartbeatLeaveEffect() + ) + + +class HeartbeatingState(PNState): + def __init__(self, context: PNContext) -> None: + super().__init__(context) + self._transitions = { + events.HeartbeatSuccessEvent.__name__: self.cooldown, + events.HeartbeatFailureEvent.__name__: self.failure, + events.HeartbeatDisconnectEvent.__name__: self.disconnect, + events.HeartbeatLeftAllEvent.__name__: self.left_all, + events.HeartbeatJoinedEvent.__name__: self.joined, + events.HeartbeatLeftEvent.__name__: self.left, + events.HeartbeatSuccessEvent.__name__: self.success + } + + def on_enter(self, context: Union[None, PNContext]): + self._context.update(context) + super().on_enter(self._context) + return effects.HeartbeatEffect() + + def cooldown(self, event: events.HeartbeatSuccessEvent, context: PNContext) -> PNTransition: + self._context.update(context) + + return PNTransition( + state=HeartbeatCooldownState, + context=self._context + ) + + def failure(self, event: events.HeartbeatFailureEvent, context: PNContext) -> PNTransition: + self._context.update(context) + + return PNTransition( + state=HeartbeatReconnectingState, + context=self._context + ) + + def disconnect(self, event: events.HeartbeatDisconnectEvent, context: PNContext) -> PNTransition: + self._context.update(context) + + return PNTransition( + state=HeartbeatStoppedState, + context=self._context, + effect=effects.HeartbeatLeaveEffect() + ) + + def left_all(self, event: events.HeartbeatLeftAllEvent, context: PNContext) -> PNTransition: + self._context.update(context) + + return PNTransition( + state=HeartbeatInactiveState, + context=self._context, + effect=effects.HeartbeatLeaveEffect() + ) + + def joined(self, event: events.HeartbeatJoinedEvent, context: PNContext) -> PNTransition: + self._context.update(context) + + return PNTransition( + state=HeartbeatingState, + context=self._context + ) + + def left(self, event: events.HeartbeatLeftEvent, context: PNContext) -> PNTransition: + self._context.update(context) + + return PNTransition( + state=HeartbeatingState, + context=self._context, + effect=effects.HeartbeatLeaveEffect() + ) + + def success(self, event: events.HeartbeatSuccessEvent, context: PNContext) -> PNTransition: + self._context.update(context) + + return PNTransition( + state=HeartbeatCooldownState, + context=self._context + ) + + +class HeartbeatCooldownState(PNState): + def __init__(self, context: PNContext) -> None: + super().__init__(context) + self._transitions = { + events.HeartbeatJoinedEvent.__name__: self.joined, + events.HeartbeatLeftEvent.__name__: self.left, + events.HeartbeatTimesUpEvent.__name__: self.times_up, + events.HeartbeatDisconnectEvent.__name__: self.disconnect, + events.HeartbeatLeftAllEvent.__name__: self.left_all, + + } + + def on_enter(self, context: PNContext): + self._context.update(context) + super().on_enter(self._context) + return effects.HeartbeatWaitEffect(self._context) + + def on_exit(self, context: PNContext): + self._context.update(context) + super().on_exit(self._context) + return effects.HeartbeatCancelWaitEffect(self._context) + + def disconnect(self, event: events.HeartbeatDisconnectEvent, context: PNContext) -> PNTransition: + self._context.update(context) + + return PNTransition( + state=HeartbeatStoppedState, + context=self._context, + effect=effects.HeartbeatLeaveEffect() + ) + + def left_all(self, event: events.HeartbeatLeftAllEvent, context: PNContext) -> PNTransition: + self._context.update(context) + + return PNTransition( + state=HeartbeatInactiveState, + context=self._context, + effect=effects.HeartbeatLeaveEffect() + ) + + def joined(self, event: events.HeartbeatJoinedEvent, context: PNContext) -> PNTransition: + self._context.update(context) + + return PNTransition( + state=HeartbeatingState, + context=self._context + ) + + def left(self, event: events.HeartbeatLeftEvent, context: PNContext) -> PNTransition: + self._context.update(context) + + return PNTransition( + state=HeartbeatingState, + context=self._context, + effect=effects.HeartbeatLeaveEffect() + ) + + def times_up(self, event: events.HeartbeatTimesUpEvent, context: PNContext) -> PNTransition: + self._context.update(context) + + return PNTransition( + state=HeartbeatingState, + context=self._context + ) + + +class HeartbeatReconnectingState(PNState): + def __init__(self, context: PNContext) -> None: + super().__init__(context) + self._transitions = { + events.HeartbeatFailureEvent.__name__: self.failure, + events.HeartbeatJoinedEvent.__name__: self.joined, + events.HeartbeatLeftEvent.__name__: self.left, + events.HeartbeatSuccessEvent.__name__: self.success, + events.HeartbeatGiveUpEvent.__name__: self.give_up, + events.HeartbeatDisconnectEvent.__name__: self.disconnect, + events.HeartbeatLeftAllEvent.__name__: self.left_all + } + + def on_enter(self, context: PNContext): + self._context.update(context) + super().on_enter(self._context) + return effects.HeartbeatDelayedEffect(self._context) + + def on_exit(self, context: PNContext): + self._context.update(context) + super().on_exit(self._context) + return effects.HeartbeatCancelDelayedEffect(self._context) + + def failure(self, event: events.HeartbeatFailureEvent, context: PNContext) -> PNTransition: + self._context.update(context) + + return PNTransition( + state=HeartbeatReconnectingState, + context=self._context + ) + + def joined(self, event: events.HeartbeatJoinedEvent, context: PNContext) -> PNTransition: + self._context.update(context) + + return PNTransition( + state=HeartbeatingState, + context=self._context + ) + + def left(self, event: events.HeartbeatLeftEvent, context: PNContext) -> PNTransition: + self._context.update(context) + + return PNTransition( + state=HeartbeatingState, + context=self._context, + effect=effects.HeartbeatLeaveEffect() + ) + + def success(self, event: events.HeartbeatSuccessEvent, context: PNContext) -> PNTransition: + self._context.update(context) + + return PNTransition( + state=HeartbeatCooldownState, + context=self._context + ) + + def give_up(self, event: events.HeartbeatGiveUpEvent, context: PNContext) -> PNTransition: + self._context.update(context) + + return PNTransition( + state=HeartbeatFailedState, + context=self._context + ) + + def disconnect(self, event: events.HeartbeatDisconnectEvent, context: PNContext) -> PNTransition: + self._context.update(context) + + return PNTransition( + state=HeartbeatStoppedState, + context=self._context, + effect=effects.HeartbeatLeaveEffect() + ) + + def left_all(self, event: events.HeartbeatLeftAllEvent, context: PNContext) -> PNTransition: + self._context.update(context) + + return PNTransition( + state=HeartbeatInactiveState, + context=self._context, + effect=effects.HeartbeatLeaveEffect() + ) diff --git a/pubnub/event_engine/statemachine.py b/pubnub/event_engine/statemachine.py index 4373bf9d..3b5bbe35 100644 --- a/pubnub/event_engine/statemachine.py +++ b/pubnub/event_engine/statemachine.py @@ -12,7 +12,8 @@ class StateMachine: _effect_list: List[effects.PNEffect] _enabled: bool - def __init__(self, initial_state: states.PNState, dispatcher_class: Optional[Dispatcher] = None) -> None: + def __init__(self, initial_state: states.PNState, dispatcher_class: Optional[Dispatcher] = None, + name: str = None) -> None: self._context = states.PNContext() self._current_state = initial_state(self._context) self._listeners = {} @@ -21,7 +22,7 @@ def __init__(self, initial_state: states.PNState, dispatcher_class: Optional[Dis dispatcher_class = Dispatcher self._dispatcher = dispatcher_class(self) self._enabled = True - self.logger = logging.getLogger("pubnub") + self.logger = logging.getLogger("pubnub" if not name else f"pubnub.{name}") def __del__(self): self.logger.debug('Shutting down StateMachine') @@ -48,7 +49,7 @@ def trigger(self, event: events.PNEvent) -> states.PNTransition: effect = self._current_state.on_exit() if effect: - self.logger.debug(f'Invoke effect: {effect.__class__.__name__} {effect.__dict__}') + self.logger.debug(f'Invoke effect: {effect.__class__.__name__}') self._effect_list.append(effect) transition: states.PNTransition = self._current_state.on(event, self._context) @@ -62,7 +63,7 @@ def trigger(self, event: events.PNEvent) -> states.PNTransition: self.logger.debug(f'Invoke effect: {effect.__class__.__name__}') self._effect_list.append(effect) else: - self.logger.debug(f'Invoke effect: {transition.effect.__class__.__name__}{effect.__dict__}') + self.logger.debug(f'Invoke effect: {transition.effect.__class__.__name__}') self._effect_list.append(transition.effect) effect = self._current_state.on_enter(self._context) diff --git a/pubnub/pubnub_asyncio.py b/pubnub/pubnub_asyncio.py index 450c3efb..7a4f8955 100644 --- a/pubnub/pubnub_asyncio.py +++ b/pubnub/pubnub_asyncio.py @@ -558,7 +558,8 @@ class EventEngineSubscriptionManager(SubscriptionManager): loop: asyncio.AbstractEventLoop def __init__(self, pubnub_instance): - self.event_engine = StateMachine(states.UnsubscribedState) + self.event_engine = StateMachine(states.UnsubscribedState, name="subscribe") + self.presence_engine = StateMachine(states.HeartbeatInactiveState, name="presence") self.event_engine.get_dispatcher().set_pn(pubnub_instance) self.loop = asyncio.new_event_loop() @@ -583,12 +584,17 @@ def adapt_subscribe_builder(self, subscribe_operation: SubscribeOperation): groups=subscribe_operation.channel_groups ) self.event_engine.trigger(subscription_event) + self.presence_engine.trigger(events.HeartbeatJoinedEvent( + channels=subscribe_operation.channels, + groups=subscribe_operation.channel_groups + )) def adapt_unsubscribe_builder(self, unsubscribe_operation): if not isinstance(unsubscribe_operation, UnsubscribeOperation): raise PubNubException('Invalid Unsubscribe Operation') event = events.SubscriptionChangedEvent(None, None) self.event_engine.trigger(event) + self.presence_engine.trigger(events.HeartbeatLeftAllEvent()) class AsyncioSubscribeMessageWorker(SubscribeMessageWorker): diff --git a/tests/acceptance/subscribe/steps/given_steps.py b/tests/acceptance/subscribe/steps/given_steps.py index f33905a0..54f9c904 100644 --- a/tests/acceptance/subscribe/steps/given_steps.py +++ b/tests/acceptance/subscribe/steps/given_steps.py @@ -11,7 +11,7 @@ @given("the demo keyset with event engine enabled") def step_impl(context: PNContext): context.log_stream = StringIO() - logger = logging.getLogger('pubnub') + logger = logging.getLogger('pubnub').getChild('subscribe') logger.setLevel(logging.DEBUG) logger.handlers = [] logger.addHandler(logging.StreamHandler(context.log_stream)) @@ -29,3 +29,33 @@ def step_impl(context: PNContext): def step_impl(context: PNContext, max_retries: str): context.pubnub.config.reconnect_policy = PNReconnectionPolicy.LINEAR context.pubnub.config.maximum_reconnection_retries = int(max_retries) + + +""" +Presence engine step definitions +""" + + +@given("the demo keyset with Presence EE enabled") +def step_impl(context: PNContext): + context.log_stream = StringIO() + logger = logging.getLogger('pubnub').getChild('presence') + logger.setLevel(logging.DEBUG) + logger.handlers = [] + logger.addHandler(logging.StreamHandler(context.log_stream)) + + context.pn_config = pnconf_env_acceptance_copy() + context.pn_config.enable_subscribe = True + context.pn_config.enable_presence_heartbeat = True + context.pn_config.reconnect_policy = PNReconnectionPolicy.NONE + context.pubnub = PubNubAsyncio(context.pn_config, subscription_manager=EventEngineSubscriptionManager) + + context.callback = AcceptanceCallback() + context.pubnub.add_listener(context.callback) + + +@given("heartbeatInterval set to '{interval}', timeout set to '{timeout}'" + " and suppressLeaveEvents set to '{suppress_leave}'") +def step_impl(context: PNContext, interval: str, timeout: str, suppress_leave: str): + context.pn_config.set_presence_timeout_with_custom_interval(int(timeout), int(interval)) + context.pn_config.suppress_leave_events = True if suppress_leave == 'true' else False diff --git a/tests/acceptance/subscribe/steps/then_steps.py b/tests/acceptance/subscribe/steps/then_steps.py index 522c0775..6efbe585 100644 --- a/tests/acceptance/subscribe/steps/then_steps.py +++ b/tests/acceptance/subscribe/steps/then_steps.py @@ -68,3 +68,47 @@ async def step_impl(context: PNContext): assert isinstance(status, PNStatus) assert status.category == PNStatusCategory.PNDisconnectedCategory await context.pubnub.stop() + + +""" +Presence engine step definitions +""" + + +@then(u'I wait {wait_time} seconds') +@async_run_until_complete +async def step_impl(context: PNContext, wait_time: str): + await busypie.wait() \ + .at_most(int(wait_time)) \ + .poll_delay(1) \ + .poll_interval(1) + + +@then(u'I observe the following Events and Invocations of the Presence EE') +@async_run_until_complete +async def step_impl(context): + pass + + +@then(u'I wait for getting Presence joined events') +@async_run_until_complete +async def step_impl(context: PNContext): + pass + + +@then(u'I receive an error in my heartbeat response') +@async_run_until_complete +async def step_impl(context): + pass + + +@then(u'I leave {channel1} and {channel2} channels with presence') +@async_run_until_complete +async def step_impl(context): + pass + + +@then(u'I don\'t observe any Events and Invocations of the Presence EE') +@async_run_until_complete +async def step_impl(context): + pass diff --git a/tests/acceptance/subscribe/steps/when_steps.py b/tests/acceptance/subscribe/steps/when_steps.py index b48f1187..ef9cbdd9 100644 --- a/tests/acceptance/subscribe/steps/when_steps.py +++ b/tests/acceptance/subscribe/steps/when_steps.py @@ -14,3 +14,18 @@ def step_impl(context: PNContext, timetoken: str): # noqa F811 callback = AcceptanceCallback() context.pubnub.add_listener(callback) context.pubnub.subscribe().channels('foo').with_timetoken(int(timetoken)).execute() + + +""" +Presence engine step definitions +""" + + +@when(u'I join {channel1}, {channel2}, {channel3} channels') +def step_impl(context, channel1, channel2, channel3): + context.pubnub.subscribe().channels([channel1, channel2, channel3]).execute() + + +@when(u'I join {channel1}, {channel2}, {channel3} channels with presence') +def step_impl(context, channel1, channel2, channel3): + context.pubnub.subscribe().channels([channel1, channel2, channel3]).with_presence().execute()