From 9693ef1f225fe5a49934b8d54f735c76bd7b223c Mon Sep 17 00:00:00 2001 From: Sebastian Molenda Date: Fri, 26 Jan 2024 15:09:24 +0100 Subject: [PATCH] Presence engine, refactors and everything else including meaning of life --- pubnub/event_engine/manage_effects.py | 152 +++++++++++++----- pubnub/event_engine/models/effects.py | 15 +- pubnub/event_engine/models/events.py | 11 +- pubnub/event_engine/models/states.py | 110 +++++++++++-- pubnub/event_engine/statemachine.py | 5 +- pubnub/features.py | 8 +- pubnub/managers.py | 3 + pubnub/pubnub_asyncio.py | 14 +- pubnub/pubnub_core.py | 1 - pubnub/workers.py | 132 ++++++++++----- tests/acceptance/subscribe/environment.py | 9 +- .../acceptance/subscribe/steps/given_steps.py | 10 +- .../acceptance/subscribe/steps/then_steps.py | 45 +++--- 13 files changed, 374 insertions(+), 141 deletions(-) diff --git a/pubnub/event_engine/manage_effects.py b/pubnub/event_engine/manage_effects.py index 45f4fe01..bde55741 100644 --- a/pubnub/event_engine/manage_effects.py +++ b/pubnub/event_engine/manage_effects.py @@ -8,11 +8,12 @@ from pubnub.endpoints.pubsub.subscribe import Subscribe from pubnub.enums import PNReconnectionPolicy from pubnub.exceptions import PubNubException -from pubnub.models.consumer.pubsub import PNMessageResult +from pubnub.features import feature_enabled from pubnub.models.server.subscribe import SubscribeMessage from pubnub.pubnub import PubNub from pubnub.event_engine.models import effects, events from pubnub.models.consumer.common import PNStatus +from pubnub.workers import BaseMessageWorker class ManagedEffect: @@ -73,15 +74,23 @@ def run(self): async def handshake_async(self, channels, groups, stop_event, timetoken: int = 0): request = Subscribe(self.pubnub).channels(channels).channel_groups(groups).cancellation_event(stop_event) + if feature_enabled('PN_MAINTAIN_PRESENCE_STATE'): + # request.set_state(self._context.states) # stub for state handling + pass + request.timetoken(0) - handshake = await request.future() + response = await request.future() - if handshake.status.error: - self.logger.warning(f'Handshake failed: {handshake.status.error_data.__dict__}') - handshake_failure = events.HandshakeFailureEvent(handshake.status.error_data, 1, timetoken=timetoken) + if isinstance(response, PubNubException): + self.logger.warning(f'Handshake failed: {str(response)}') + handshake_failure = events.HandshakeFailureEvent(str(response), 1, timetoken=timetoken) + self.event_engine.trigger(handshake_failure) + elif response.status.error: + self.logger.warning(f'Handshake failed: {response.status.error_data.__dict__}') + handshake_failure = events.HandshakeFailureEvent(response.status.error_data, 1, timetoken=timetoken) self.event_engine.trigger(handshake_failure) else: - cursor = handshake.result['t'] + cursor = response.result['t'] timetoken = timetoken if timetoken > 0 else cursor['t'] region = cursor['r'] handshake_success = events.HandshakeSuccessEvent(timetoken, region) @@ -110,18 +119,18 @@ def run(self): pass async def receive_messages_async(self, channels, groups, timetoken, region): - subscribe = Subscribe(self.pubnub) + request = Subscribe(self.pubnub) if channels: - subscribe.channels(channels) + request.channels(channels) if groups: - subscribe.channel_groups(groups) + request.channel_groups(groups) if timetoken: - subscribe.timetoken(timetoken) + request.timetoken(timetoken) if region: - subscribe.region(region) + request.region(region) - subscribe.cancellation_event(self.stop_event) - response = await subscribe.future() + request.cancellation_event(self.stop_event) + response = await request.future() if response.status is None and response.result is None: self.logger.warning('Recieve messages failed: Empty response') @@ -196,25 +205,34 @@ async def delayed_reconnect_async(self, delay, attempt): self.stop_event = self.get_new_stop_event() await asyncio.sleep(delay) - request = Subscribe(self.pubnub) \ - .channels(self.effect.channels) \ - .channel_groups(self.effect.groups) \ - .timetoken(self.get_timetoken()) \ - .cancellation_event(self.stop_event) + request = Subscribe(self.pubnub).timetoken(self.get_timetoken()).cancellation_event(self.stop_event) + + if self.effect.channels: + request.channels(self.effect.channels) + if self.effect.groups: + request.channel_groups(self.effect.groups) if self.effect.region: request.region(self.effect.region) - reconnect = await request.future() + if feature_enabled('PN_MAINTAIN_PRESENCE_STATE'): + # subscribe.set_state(self._context.states) # stub for state handling + pass + + response = await request.future() + + if isinstance(response, PubNubException): + self.logger.warning(f'Reconnect failed: {str(response)}') + self.failure(str(response), attempt, self.get_timetoken()) - if reconnect.status.error: - self.logger.warning(f'Reconnect failed: {reconnect.status.error_data.__dict__}') - self.failure(reconnect.status.error_data, attempt, self.get_timetoken()) + elif response.status.error: + self.logger.warning(f'Reconnect failed: {response.status.error_data.__dict__}') + self.failure(response.status.error_data, attempt, self.get_timetoken()) else: - cursor = reconnect.result['t'] + cursor = response.result['t'] timetoken = int(self.effect.timetoken) if self.effect.timetoken else cursor['t'] region = cursor['r'] - messages = reconnect.result['m'] + messages = response.result['m'] self.success(timetoken=timetoken, region=region, messages=messages) def stop(self): @@ -286,12 +304,21 @@ def run(self): async def heartbeat(self, channels, groups, stop_event): request = Heartbeat(self.pubnub).channels(channels).channel_groups(groups).cancellation_event(stop_event) - heartbeat = await request.future() - if heartbeat.status.error: - self.logger.warning(f'Heartbeat failed: {heartbeat.status.error_data.__dict__}') + if feature_enabled('PN_MAINTAIN_PRESENCE_STATE'): + # subscribe.set_state(self._context.states) # stub for state handling + pass + + response = await request.future() + + if isinstance(response, PubNubException): + self.logger.warning(f'Heartbeat failed: {str(response)}') + self.event_engine.trigger(events.HeartbeatFailureEvent(channels=channels, groups=groups, + reason=response.status.error_data, attempt=1)) + elif response.status.error: + self.logger.warning(f'Heartbeat failed: {response.status.error_data.__dict__}') self.event_engine.trigger(events.HeartbeatFailureEvent(channels=channels, groups=groups, - reason=heartbeat.status.error_data, attempt=1)) + reason=response.status.error_data, attempt=1)) else: self.event_engine.trigger(events.HeartbeatSuccessEvent(channels=channels, groups=groups)) @@ -314,8 +341,7 @@ def run(self): async def heartbeat_wait(self, wait_time: int, stop_event): try: await asyncio.sleep(wait_time) - self.event_engine.trigger(events.HeartbeatTimesUpEvent(channels=self.effect.channels, - groups=self.effect.groups)) + self.event_engine.trigger(events.HeartbeatTimesUpEvent()) except asyncio.CancelledError: pass @@ -341,8 +367,55 @@ async def leave(self, channels, groups, stop_event): self.logger.warning(f'Heartbeat failed: {leave.status.error_data.__dict__}') -class ManagedHeartbeatDelayedHeartbeatEffect(ManagedEffect): - pass +class ManagedHeartbeatDelayedEffect(ManagedEffect): + def __init__(self, pubnub_instance, event_engine_instance, + effect: Union[effects.PNManageableEffect, effects.PNCancelEffect]) -> None: + super().__init__(pubnub_instance, event_engine_instance, effect) + self.reconnection_policy = pubnub_instance.config.reconnect_policy + self.max_retry_attempts = pubnub_instance.config.maximum_reconnection_retries + self.interval = pubnub_instance.config.RECONNECTION_INTERVAL + self.min_backoff = pubnub_instance.config.RECONNECTION_MIN_EXPONENTIAL_BACKOFF + self.max_backoff = pubnub_instance.config.RECONNECTION_MAX_EXPONENTIAL_BACKOFF + + def calculate_reconnection_delay(self, attempts): + if self.reconnection_policy is PNReconnectionPolicy.LINEAR: + delay = self.interval + + elif self.reconnection_policy is PNReconnectionPolicy.EXPONENTIAL: + delay = int(math.pow(2, attempts - 5 * math.floor((attempts - 1) / 5)) - 1) + return delay + + def run(self): + channels = self.effect.channels + groups = self.effect.groups + if hasattr(self.pubnub, 'event_loop'): + self.stop_event = self.get_new_stop_event() + + loop: asyncio.AbstractEventLoop = self.pubnub.event_loop + coro = self.heartbeat(channels=channels, groups=groups, attempt=1, stop_event=self.stop_event) + if loop.is_running(): + self.task = loop.create_task(coro) + else: + self.task = loop.run_until_complete(coro) + + async def heartbeat(self, channels, groups, attempt, stop_event): + request = Heartbeat(self.pubnub).channels(channels).channel_groups(groups).cancellation_event(stop_event) + delay = self.calculate_reconnection_delay(attempt) + await asyncio.sleep(delay) + + response = await request.future() + if isinstance(response, PubNubException): + self.logger.warning(f'Heartbeat failed: {str(response)}') + self.event_engine.trigger(events.HeartbeatFailureEvent(channels=channels, groups=groups, + reason=response.status.error_data, + attempt=attempt + 1)) + elif response.status.error: + self.logger.warning(f'Heartbeat failed: {response.status.error_data.__dict__}') + self.event_engine.trigger(events.HeartbeatFailureEvent(channels=channels, groups=groups, + reason=response.status.error_data, + attempt=attempt + 1)) + else: + self.event_engine.trigger(events.HeartbeatSuccessEvent(channels=channels, groups=groups)) class ManagedEffectFactory: @@ -353,7 +426,7 @@ class ManagedEffectFactory: effects.ReceiveReconnectEffect.__name__: ManagedReceiveReconnectEffect, effects.HeartbeatEffect.__name__: ManagedHeartbeatEffect, effects.HeartbeatWaitEffect.__name__: ManagedHeartbeatWaitEffect, - effects.HeartbeatDelayedEffect.__name__: ManagedHeartbeatDelayedHeartbeatEffect, + effects.HeartbeatDelayedEffect.__name__: ManagedHeartbeatDelayedEffect, effects.HeartbeatLeaveEffect.__name__: ManagedHeartbeatLeaveEffect, } @@ -369,9 +442,11 @@ def create(self, effect: ManagedEffect): class EmitEffect: pubnub: PubNub + message_worker: BaseMessageWorker def set_pn(self, pubnub: PubNub): self.pubnub = pubnub + self.message_worker = BaseMessageWorker(pubnub) def emit(self, effect: effects.PNEmittableEffect): if isinstance(effect, effects.EmitMessagesEffect): @@ -380,17 +455,10 @@ def emit(self, effect: effects.PNEmittableEffect): self.emit_status(effect) def emit_message(self, effect: effects.EmitMessagesEffect): + self.message_worker._listener_manager = self.pubnub._subscription_manager._listener_manager for message in effect.messages: subscribe_message = SubscribeMessage().from_json(message) - pn_message_result = PNMessageResult( - message=subscribe_message.payload, - subscription=subscribe_message.subscription_match, - channel=subscribe_message.channel, - timetoken=int(message['p']['t']), - user_metadata=subscribe_message.publish_metadata, - publisher=subscribe_message.issuing_client_id - ) - self.pubnub._subscription_manager._listener_manager.announce_message(pn_message_result) + self.message_worker._process_incoming_payload(subscribe_message) def emit_status(self, effect: effects.EmitStatusEffect): pn_status = PNStatus() diff --git a/pubnub/event_engine/models/effects.py b/pubnub/event_engine/models/effects.py index 4e503399..34bf7a49 100644 --- a/pubnub/event_engine/models/effects.py +++ b/pubnub/event_engine/models/effects.py @@ -118,14 +118,25 @@ class HeartbeatCancelWaitEffect(PNCancelEffect): class HeartbeatLeaveEffect(PNManageableEffect): - def __init__(self, channels: Union[None, List[str]] = None, groups: Union[None, List[str]] = None) -> None: + def __init__(self, channels: Union[None, List[str]] = None, groups: Union[None, List[str]] = None, + suppress_leave: bool = False) -> None: super().__init__() self.channels = channels self.groups = groups + self.suppress_leave = suppress_leave class HeartbeatDelayedEffect(PNManageableEffect): - pass + def __init__(self, + channels: Union[None, List[str]] = None, + groups: Union[None, List[str]] = None, + attempts: Union[None, int] = None, + reason: Union[None, PubNubException] = None): + super().__init__() + self.channels = channels + self.groups = groups + self.attempts = attempts + self.reason = reason class HeartbeatCancelDelayedEffect(PNCancelEffect): diff --git a/pubnub/event_engine/models/events.py b/pubnub/event_engine/models/events.py index c83e5628..e3dfeac0 100644 --- a/pubnub/event_engine/models/events.py +++ b/pubnub/event_engine/models/events.py @@ -117,7 +117,9 @@ class HeartbeatLeftAllEvent(PNEvent): class HeartbeatLeftEvent(PNChannelGroupsEvent): - pass + def __init__(self, channels: List[str], groups: List[str], suppress_leave: bool = False) -> None: + PNChannelGroupsEvent.__init__(self, channels, groups) + self.suppress_leave = suppress_leave class HeartbeatDisconnectEvent(PNChannelGroupsEvent): @@ -135,11 +137,8 @@ def __init__(self, channels: List[str], groups: List[str], reason: PubNubExcepti PNFailureEvent.__init__(self, reason, attempt, timetoken) -class HeartbeatTimesUpEvent(PNChannelGroupsEvent, PNFailureEvent): - def __init__(self, channels: List[str], groups: List[str], reason: PubNubException, attempt: int, - timetoken: int = 0) -> None: - PNChannelGroupsEvent.__init__(self, channels, groups) - PNFailureEvent.__init__(self, reason, attempt, timetoken) +class HeartbeatTimesUpEvent(PNEvent): + pass class HeartbeatGiveUpEvent(PNChannelGroupsEvent, PNFailureEvent): diff --git a/pubnub/event_engine/models/states.py b/pubnub/event_engine/models/states.py index c420367c..e4bed1a5 100644 --- a/pubnub/event_engine/models/states.py +++ b/pubnub/event_engine/models/states.py @@ -597,6 +597,8 @@ def reconnect(self, event: events.HeartbeatReconnectEvent, context: PNContext) - def left_all(self, event: events.HeartbeatLeftAllEvent, context: PNContext) -> PNTransition: self._context.update(context) + self._context.channels = [] + self._context.groups = [] return PNTransition( state=HeartbeatInactiveState, @@ -613,6 +615,11 @@ def joined(self, event: events.HeartbeatJoinedEvent, context: PNContext) -> PNTr def left(self, event: events.HeartbeatLeftEvent, context: PNContext) -> PNTransition: self._context.update(context) + for channel in event.channels: + self._context.channels.remove(channel) + + for group in event.groups: + self._context.groups.remove(group) return PNTransition( state=HeartbeatStoppedState, @@ -642,11 +649,20 @@ def joined(self, event: events.HeartbeatJoinedEvent, context: PNContext) -> PNTr def left(self, event: events.HeartbeatLeftEvent, context: PNContext) -> PNTransition: self._context.update(context) + for channel in event.channels: + self._context.channels.remove(channel) + + for group in event.groups: + self._context.groups.remove(group) + + effect = None + if not event.suppress_leave: + effect = effects.HeartbeatLeaveEffect(channels=self._context.channels, groups=self._context.groups) return PNTransition( state=HeartbeatingState, context=self._context, - effect=effects.HeartbeatLeaveEffect(channels=self._context.channels, groups=self._context.groups) + effect=effect ) def reconnect(self, event: events.HeartbeatReconnectEvent, context: PNContext) -> PNTransition: @@ -660,19 +676,29 @@ def reconnect(self, event: events.HeartbeatReconnectEvent, context: PNContext) - def disconnect(self, event: events.HeartbeatDisconnectEvent, context: PNContext) -> PNTransition: self._context.update(context) + effect = None + if not event.suppress_leave: + effect = effects.HeartbeatLeaveEffect(channels=self._context.channels, groups=self._context.groups) + return PNTransition( state=HeartbeatStoppedState, context=self._context, - effect=effects.HeartbeatLeaveEffect(channels=self._context.channels, groups=self._context.groups) + effect=effect ) def left_all(self, event: events.HeartbeatLeftAllEvent, context: PNContext) -> PNTransition: self._context.update(context) + self._context.channels = [] + self._context.groups = [] + + effect = None + if not event.suppress_leave: + effect = effects.HeartbeatLeaveEffect(channels=self._context.channels, groups=self._context.groups) return PNTransition( state=HeartbeatInactiveState, context=self._context, - effect=effects.HeartbeatLeaveEffect(channels=self._context.channels, groups=self._context.groups) + effect=effect ) @@ -704,19 +730,29 @@ def failure(self, event: events.HeartbeatFailureEvent, context: PNContext) -> PN def disconnect(self, event: events.HeartbeatDisconnectEvent, context: PNContext) -> PNTransition: self._context.update(context) + effect = None + if not event.suppress_leave: + effect = effects.HeartbeatLeaveEffect(channels=self._context.channels, groups=self._context.groups) + return PNTransition( state=HeartbeatStoppedState, context=self._context, - effect=effects.HeartbeatLeaveEffect(channels=self._context.channels, groups=self._context.groups) + effect=effect ) def left_all(self, event: events.HeartbeatLeftAllEvent, context: PNContext) -> PNTransition: self._context.update(context) + self._context.channels = [] + self._context.groups = [] + + effect = None + if not event.suppress_leave: + effect = effects.HeartbeatLeaveEffect(channels=self._context.channels, groups=self._context.groups) return PNTransition( state=HeartbeatInactiveState, context=self._context, - effect=effects.HeartbeatLeaveEffect(channels=self._context.channels, groups=self._context.groups) + effect=effect ) def joined(self, event: events.HeartbeatJoinedEvent, context: PNContext) -> PNTransition: @@ -729,11 +765,20 @@ def joined(self, event: events.HeartbeatJoinedEvent, context: PNContext) -> PNTr def left(self, event: events.HeartbeatLeftEvent, context: PNContext) -> PNTransition: self._context.update(context) + for channel in event.channels: + self._context.channels.remove(channel) + + for group in event.groups: + self._context.groups.remove(group) + + effect = None + if not event.suppress_leave: + effect = effects.HeartbeatLeaveEffect(channels=self._context.channels, groups=self._context.groups) return PNTransition( state=HeartbeatingState, context=self._context, - effect=effects.HeartbeatLeaveEffect(channels=self._context.channels, groups=self._context.groups) + effect=effect ) def success(self, event: events.HeartbeatSuccessEvent, context: PNContext) -> PNTransition: @@ -769,19 +814,29 @@ def on_exit(self): def disconnect(self, event: events.HeartbeatDisconnectEvent, context: PNContext) -> PNTransition: self._context.update(context) + effect = None + if not event.suppress_leave: + effect = effects.HeartbeatLeaveEffect(channels=self._context.channels, groups=self._context.groups) + return PNTransition( state=HeartbeatStoppedState, context=self._context, - effect=effects.HeartbeatLeaveEffect(channels=self._context.channels, groups=self._context.groups) + effect=effect ) def left_all(self, event: events.HeartbeatLeftAllEvent, context: PNContext) -> PNTransition: self._context.update(context) + self._context.channels = [] + self._context.groups = [] + + effect = None + if not event.suppress_leave: + effect = effects.HeartbeatLeaveEffect(channels=self._context.channels, groups=self._context.groups) return PNTransition( state=HeartbeatInactiveState, context=self._context, - effect=effects.HeartbeatLeaveEffect(channels=self._context.channels, groups=self._context.groups) + effect=effect ) def joined(self, event: events.HeartbeatJoinedEvent, context: PNContext) -> PNTransition: @@ -794,11 +849,20 @@ def joined(self, event: events.HeartbeatJoinedEvent, context: PNContext) -> PNTr def left(self, event: events.HeartbeatLeftEvent, context: PNContext) -> PNTransition: self._context.update(context) + for channel in event.channels: + self._context.channels.remove(channel) + + for group in event.groups: + self._context.groups.remove(group) + + effect = None + if not event.suppress_leave: + effect = effects.HeartbeatLeaveEffect(channels=self._context.channels, groups=self._context.groups) return PNTransition( state=HeartbeatingState, context=self._context, - effect=effects.HeartbeatLeaveEffect(channels=self._context.channels, groups=self._context.groups) + effect=effect ) def times_up(self, event: events.HeartbeatTimesUpEvent, context: PNContext) -> PNTransition: @@ -826,7 +890,8 @@ def __init__(self, context: PNContext) -> None: def on_enter(self, context: PNContext): self._context.update(context) super().on_enter(self._context) - return effects.HeartbeatDelayedEffect(self._context) + return effects.HeartbeatDelayedEffect(channels=self._context.channels, groups=self._context.groups, + attempts=1, reason=None) def on_exit(self): super().on_exit() @@ -850,11 +915,20 @@ def joined(self, event: events.HeartbeatJoinedEvent, context: PNContext) -> PNTr def left(self, event: events.HeartbeatLeftEvent, context: PNContext) -> PNTransition: self._context.update(context) + for channel in event.channels: + self._context.channels.remove(channel) + + for group in event.groups: + self._context.groups.remove(group) + + effect = None + if not event.suppress_leave: + effect = effects.HeartbeatLeaveEffect(channels=self._context.channels, groups=self._context.groups) return PNTransition( state=HeartbeatingState, context=self._context, - effect=effects.HeartbeatLeaveEffect(channels=self._context.channels, groups=self._context.groups) + effect=effect ) def success(self, event: events.HeartbeatSuccessEvent, context: PNContext) -> PNTransition: @@ -876,17 +950,27 @@ def give_up(self, event: events.HeartbeatGiveUpEvent, context: PNContext) -> PNT def disconnect(self, event: events.HeartbeatDisconnectEvent, context: PNContext) -> PNTransition: self._context.update(context) + effect = None + if not event.suppress_leave: + effect = effects.HeartbeatLeaveEffect(channels=self._context.channels, groups=self._context.groups) + return PNTransition( state=HeartbeatStoppedState, context=self._context, - effect=effects.HeartbeatLeaveEffect(channels=self._context.channels, groups=self._context.groups) + effect=effect ) def left_all(self, event: events.HeartbeatLeftAllEvent, context: PNContext) -> PNTransition: self._context.update(context) + self._context.channels = [] + self._context.groups = [] + + effect = None + if not event.suppress_leave: + effect = effects.HeartbeatLeaveEffect(channels=self._context.channels, groups=self._context.groups) return PNTransition( state=HeartbeatInactiveState, context=self._context, - effect=effects.HeartbeatLeaveEffect(channels=self._context.channels, groups=self._context.groups) + effect=effect ) diff --git a/pubnub/event_engine/statemachine.py b/pubnub/event_engine/statemachine.py index 81e0a85a..103847c1 100644 --- a/pubnub/event_engine/statemachine.py +++ b/pubnub/event_engine/statemachine.py @@ -39,6 +39,7 @@ def get_dispatcher(self) -> Dispatcher: return self._dispatcher def trigger(self, event: events.PNEvent) -> states.PNTransition: + self.logger.debug(f'Current State: {self.get_state_name()}') self.logger.debug(f'Triggered event: {event.__class__.__name__}({event.__dict__}) on {self.get_state_name()}') if not self._enabled: @@ -70,7 +71,7 @@ def trigger(self, event: events.PNEvent) -> states.PNTransition: effect = self._current_state.on_enter(self._context) if effect: - self.logger.debug(f'Invoke effect: {effect.__class__.__name__} StateMachine ({id(self)})') + self.logger.debug(f'Invoke effect: {effect.__class__.__name__}') self._effect_list.append(effect) else: @@ -82,7 +83,7 @@ def trigger(self, event: events.PNEvent) -> states.PNTransition: def dispatch_effects(self): for effect in self._effect_list: - self.logger.debug(f'dispatching {effect.__class__.__name__} {id(effect)}') + self.logger.debug(f'Dispatching {effect.__class__.__name__} {id(effect)}') self._dispatcher.dispatch_effect(effect) self._effect_list.clear() diff --git a/pubnub/features.py b/pubnub/features.py index 95d5fc7e..d0e8c333 100644 --- a/pubnub/features.py +++ b/pubnub/features.py @@ -2,7 +2,9 @@ from pubnub.exceptions import PubNubException flags = { - 'PN_ENABLE_ENTITIES': getenv('PN_ENABLE_ENTITIES', False) + 'PN_ENABLE_ENTITIES': getenv('PN_ENABLE_ENTITIES', False), + 'PN_ENABLE_EVENT_ENGINE': getenv('PN_ENABLE_EVENT_ENGINE', False), + 'PN_MAINTAIN_PRESENCE_STATE': getenv('PN_MAINTAIN_PRESENCE_STATE', False), } @@ -18,3 +20,7 @@ def inner(method): return not_implemented return method return inner + + +def feature_enabled(flag): + return flags[flag] diff --git a/pubnub/managers.py b/pubnub/managers.py index 181e122d..785b75e4 100644 --- a/pubnub/managers.py +++ b/pubnub/managers.py @@ -365,6 +365,9 @@ def _handle_endpoint_call(self, raw_result, status): def _register_heartbeat_timer(self): self._stop_heartbeat_timer() + def get_custom_params(self): + return {} + class TelemetryManager: TIMESTAMP_DIVIDER = 1000 diff --git a/pubnub/pubnub_asyncio.py b/pubnub/pubnub_asyncio.py index c550c135..1ad84ab6 100644 --- a/pubnub/pubnub_asyncio.py +++ b/pubnub/pubnub_asyncio.py @@ -16,6 +16,7 @@ from pubnub.endpoints.presence.heartbeat import Heartbeat from pubnub.endpoints.presence.leave import Leave from pubnub.endpoints.pubsub.subscribe import Subscribe +from pubnub.features import feature_enabled from pubnub.pubnub_core import PubNubCore from pubnub.workers import SubscribeMessageWorker from pubnub.managers import SubscriptionManager, PublishSequenceManager, ReconnectionManager, TelemetryManager @@ -47,7 +48,9 @@ def __init__(self, config, custom_event_loop=None, subscription_manager=None): self._connector = aiohttp.TCPConnector(verify_ssl=True, loop=self.event_loop) if not subscription_manager: - subscription_manager = AsyncioSubscriptionManager + subscription_manager = ( + EventEngineSubscriptionManager if feature_enabled('PN_ENABLE_EVENT_ENGINE') + else AsyncioSubscriptionManager) if self.config.enable_subscribe: self._subscription_manager = subscription_manager(self) @@ -593,7 +596,14 @@ def adapt_unsubscribe_builder(self, unsubscribe_operation): raise PubNubException('Invalid Unsubscribe Operation') event = events.SubscriptionChangedEvent(None, None) self.event_engine.trigger(event) - # self.presence_engine.trigger(events.HeartbeatLeftAllEvent()) + self.presence_engine.trigger(event=events.HeartbeatLeftEvent( + channels=unsubscribe_operation.channels, + groups=unsubscribe_operation.channel_groups, + suppress_leave=self._pubnub.config.suppress_leave_events + )) + + def get_custom_params(self): + return {'ee': 1} class AsyncioSubscribeMessageWorker(SubscribeMessageWorker): diff --git a/pubnub/pubnub_core.py b/pubnub/pubnub_core.py index fc55059b..a0d6fd6c 100644 --- a/pubnub/pubnub_core.py +++ b/pubnub/pubnub_core.py @@ -92,7 +92,6 @@ class PubNubCore: MAX_SEQUENCE = 65535 __metaclass__ = ABCMeta - _plugins = [] __crypto = None def __init__(self, config): diff --git a/pubnub/workers.py b/pubnub/workers.py index 81eb5b78..70a18d30 100644 --- a/pubnub/workers.py +++ b/pubnub/workers.py @@ -1,48 +1,38 @@ import logging from abc import abstractmethod - -from .enums import PNStatusCategory, PNOperationType -from .models.consumer.common import PNStatus -from .models.consumer.objects_v2.channel import PNChannelMetadataResult -from .models.consumer.objects_v2.memberships import PNMembershipResult -from .models.consumer.objects_v2.uuid import PNUUIDMetadataResult -from .models.consumer.pn_error_data import PNErrorData -from .utils import strip_right -from .models.consumer.pubsub import ( +from typing import Union + +from pubnub.enums import PNStatusCategory, PNOperationType +from pubnub.managers import ListenerManager +from pubnub.models.consumer.common import PNStatus +from pubnub.models.consumer.objects_v2.channel import PNChannelMetadataResult +from pubnub.models.consumer.objects_v2.memberships import PNMembershipResult +from pubnub.models.consumer.objects_v2.uuid import PNUUIDMetadataResult +from pubnub.models.consumer.pn_error_data import PNErrorData +from pubnub.utils import strip_right +from pubnub.models.consumer.pubsub import ( PNPresenceEventResult, PNMessageResult, PNSignalMessageResult, PNMessageActionResult, PNFileMessageResult ) -from .models.server.subscribe import SubscribeMessage, PresenceEnvelope -from .endpoints.file_operations.get_file_url import GetFileDownloadUrl +from pubnub.models.server.subscribe import SubscribeMessage, PresenceEnvelope +from pubnub.endpoints.file_operations.get_file_url import GetFileDownloadUrl logger = logging.getLogger("pubnub") -class SubscribeMessageWorker(object): +class BaseMessageWorker: + # _pubnub: PubNub + _listener_manager: Union[ListenerManager, None] = None + TYPE_MESSAGE = 0 TYPE_SIGNAL = 1 TYPE_OBJECT = 2 TYPE_MESSAGE_ACTION = 3 TYPE_FILE_MESSAGE = 4 - def __init__(self, pubnub_instance, listener_manager_instance, queue_instance, event): - # assert isinstance(pubnub_instnace, PubNubCore) - # assert isinstance(listener_manager_instance, ListenerManager) - # assert isinstance(queue_instance, utils.Queue) - + def __init__(self, pubnub_instance) -> None: self._pubnub = pubnub_instance - self._listener_manager = listener_manager_instance - self._queue = queue_instance - self._is_running = None - self._event = event - - def run(self): - self._take_message() - - @abstractmethod - def _take_message(self): - pass def _get_url_for_file_event_message(self, channel, extracted_message): return GetFileDownloadUrl(self._pubnub)\ @@ -55,10 +45,7 @@ def _process_message(self, message_input): return message_input, None else: try: - return self._pubnub.config.crypto.decrypt( - self._pubnub.config.cipher_key, - message_input - ), None + return self._pubnub.crypto.decrypt(message_input), None except Exception as exception: logger.warning("could not decrypt message: \"%s\", due to error %s" % (message_input, str(exception))) @@ -67,10 +54,41 @@ def _process_message(self, message_input): pn_status.error_data = PNErrorData(str(exception), exception) pn_status.error = True pn_status.operation = PNOperationType.PNSubscribeOperation - self._listener_manager.announce_status(pn_status) + self.announce(pn_status) return message_input, exception - def _process_incoming_payload(self, message): + def announce(self, result): + if not self._listener_manager: + return + + if isinstance(result, PNStatus): + self._listener_manager.announce_status(result) + + elif isinstance(result, PNPresenceEventResult): + self._listener_manager.announce_presence(result) + + elif isinstance(result, PNChannelMetadataResult): + self._listener_manager.announce_channel(result) + + elif isinstance(result, PNUUIDMetadataResult): + self._listener_manager.announce_uuid(result) + + elif isinstance(result, PNMembershipResult): + self._listener_manager.announce_membership(result) + + elif isinstance(result, PNFileMessageResult): + self._listener_manager.announce_file_message(result) + + elif isinstance(result, PNSignalMessageResult): + self._listener_manager.announce_signal(result) + + elif isinstance(result, PNMessageActionResult): + self._listener_manager.announce_message_action(result) + + elif isinstance(result, PNMessageResult): + self._listener_manager.announce_message(result) + + def _process_incoming_payload(self, message: SubscribeMessage): assert isinstance(message, SubscribeMessage) channel = message.channel @@ -105,26 +123,35 @@ def _process_incoming_payload(self, message): leave=message.payload.get('leave', None), timeout=message.payload.get('timeout', None) ) - self._listener_manager.announce_presence(pn_presence_event_result) + + self.announce(pn_presence_event_result) + return pn_presence_event_result + elif message.type == SubscribeMessageWorker.TYPE_OBJECT: if message.payload['type'] == 'channel': channel_result = PNChannelMetadataResult( event=message.payload['event'], data=message.payload['data'] ) - self._listener_manager.announce_channel(channel_result) + self.announce(channel_result) + return channel_result + elif message.payload['type'] == 'uuid': uuid_result = PNUUIDMetadataResult( event=message.payload['event'], data=message.payload['data'] ) - self._listener_manager.announce_uuid(uuid_result) + self.announce(uuid_result) + return uuid_result + elif message.payload['type'] == 'membership': membership_result = PNMembershipResult( event=message.payload['event'], data=message.payload['data'] ) - self._listener_manager.announce_membership(membership_result) + self.announce(membership_result) + return membership_result + elif message.type == SubscribeMessageWorker.TYPE_FILE_MESSAGE: extracted_message, _ = self._process_message(message.payload) download_url = self._get_url_for_file_event_message(channel, extracted_message) @@ -139,8 +166,8 @@ def _process_incoming_payload(self, message): file_id=extracted_message["file"]["id"], file_name=extracted_message["file"]["name"] ) - - self._listener_manager.announce_file_message(pn_file_result) + self.announce(pn_file_result) + return pn_file_result else: extracted_message, error = self._process_message(message.payload) @@ -157,7 +184,8 @@ def _process_incoming_payload(self, message): timetoken=publish_meta_data.publish_timetoken, publisher=publisher ) - self._listener_manager.announce_signal(pn_signal_result) + self.announce(pn_signal_result) + return pn_signal_result elif message.type == SubscribeMessageWorker.TYPE_MESSAGE_ACTION: message_action = extracted_message['data'] @@ -176,4 +204,24 @@ def _process_incoming_payload(self, message): publisher=publisher, error=error ) - self._listener_manager.announce_message(pn_message_result) + self.announce(pn_message_result) + return pn_message_result + + +class SubscribeMessageWorker(BaseMessageWorker): + def __init__(self, pubnub_instance, listener_manager_instance, queue_instance, event): + # assert isinstance(pubnub_instnace, PubNubCore) + # assert isinstance(listener_manager_instance, ListenerManager) + # assert isinstance(queue_instance, utils.Queue) + super().__init__(pubnub_instance) + self._listener_manager = listener_manager_instance + self._queue = queue_instance + self._is_running = None + self._event = event + + def run(self): + self._take_message() + + @abstractmethod + def _take_message(self): + pass diff --git a/tests/acceptance/subscribe/environment.py b/tests/acceptance/subscribe/environment.py index 4700ef12..a73cff8b 100644 --- a/tests/acceptance/subscribe/environment.py +++ b/tests/acceptance/subscribe/environment.py @@ -1,3 +1,4 @@ +import asyncio import requests from behave.runner import Context @@ -40,7 +41,9 @@ def before_scenario(context: Context, feature): def after_scenario(context: Context, feature): - context.pubnub.unsubscribe_all() + loop = asyncio.get_event_loop() + loop.run_until_complete(context.pubnub.stop()) + for tag in feature.tags: if "contract" in tag: response = requests.get(MOCK_SERVER_URL + CONTRACT_EXPECT_ENDPOINT) @@ -48,5 +51,5 @@ def after_scenario(context: Context, feature): response_json = response.json() - assert not response_json["expectations"]["failed"] - assert not response_json["expectations"]["pending"] + assert not response_json["expectations"]["failed"], str(response_json["expectations"]["failed"]) + assert not response_json["expectations"]["pending"], str(response_json["expectations"]["pending"]) diff --git a/tests/acceptance/subscribe/steps/given_steps.py b/tests/acceptance/subscribe/steps/given_steps.py index 54f9c904..7ff9dec3 100644 --- a/tests/acceptance/subscribe/steps/given_steps.py +++ b/tests/acceptance/subscribe/steps/given_steps.py @@ -38,6 +38,12 @@ def step_impl(context: PNContext, max_retries: str): @given("the demo keyset with Presence EE enabled") def step_impl(context: PNContext): + context.log_stream_pubnub = StringIO() + logger = logging.getLogger('pubnub') + logger.setLevel(logging.DEBUG) + logger.handlers = [] + logger.addHandler(logging.StreamHandler(context.log_stream_pubnub)) + context.log_stream = StringIO() logger = logging.getLogger('pubnub').getChild('presence') logger.setLevel(logging.DEBUG) @@ -47,7 +53,9 @@ def step_impl(context: PNContext): context.pn_config = pnconf_env_acceptance_copy() context.pn_config.enable_subscribe = True context.pn_config.enable_presence_heartbeat = True - context.pn_config.reconnect_policy = PNReconnectionPolicy.NONE + context.pn_config.reconnect_policy = PNReconnectionPolicy.LINEAR + context.pn_config.subscribe_request_timeout = 10 + context.pn_config.set_presence_timeout(3) context.pubnub = PubNubAsyncio(context.pn_config, subscription_manager=EventEngineSubscriptionManager) context.callback = AcceptanceCallback() diff --git a/tests/acceptance/subscribe/steps/then_steps.py b/tests/acceptance/subscribe/steps/then_steps.py index 943a4ae8..afea3a50 100644 --- a/tests/acceptance/subscribe/steps/then_steps.py +++ b/tests/acceptance/subscribe/steps/then_steps.py @@ -12,15 +12,11 @@ @then("I receive the message in my subscribe response") @async_run_until_complete async def step_impl(context: PNContext): - try: - await busypie.wait() \ - .at_most(15) \ - .poll_delay(1) \ - .poll_interval(1) \ - .until_async(lambda: context.callback.message_result) - except Exception: - import ipdb - ipdb.set_trace() + await busypie.wait() \ + .at_most(15) \ + .poll_delay(1) \ + .poll_interval(1) \ + .until_async(lambda: context.callback.message_result) response = context.callback.message_result assert isinstance(response, PNMessageResult) @@ -75,8 +71,8 @@ async def step_impl(context: PNContext): async def step_impl(context: PNContext, wait_time: str): await busypie.wait() \ .at_most(int(wait_time)) \ - .poll_delay(1) \ - .poll_interval(1) \ + .poll_delay(int(wait_time)) \ + .poll_interval(int(wait_time)) \ .until_async(lambda: True) @@ -89,7 +85,8 @@ def parse_log_line(line: str): name = m.group(0).replace('Effect', '').replace('Event', '') name = name.replace('Effect', '').replace('Event', '') name = re.sub(r'([A-Z])', r'_\1', name).upper().lstrip('_') - name = name.replace('HEARTBEAT_JOIN', 'JOIN').replace('HEARTBEAT_WAIT', 'WAIT') + if name.endswith('JOINED') or name.endswith('LEFT') or name.endswith('WAIT'): + name = name.replace('HEARTBEAT_', '') return (line_type, name) normalized_log = [parse_log_line(log_line) for log_line in list(filter( @@ -97,15 +94,11 @@ def parse_log_line(line: str): context.log_stream.getvalue().splitlines() ))] - try: - for index, expected in enumerate(context.table): - logged_type, logged_name = normalized_log[index] - expected_type, expected_name = expected - assert expected_type == logged_type, f'on line {index + 1} => {expected_type} != {logged_type}' - assert expected_name == logged_name, f'on line {index + 1} => {expected_name} != {logged_name}' - except Exception: - import ipdb - ipdb.set_trace() + for index, expected in enumerate(context.table): + logged_type, logged_name = normalized_log[index] + expected_type, expected_name = expected + assert expected_type == logged_type, f'on line {index + 1} => {expected_type} != {logged_type}' + assert expected_name == logged_name, f'on line {index + 1} => {expected_name} != {logged_name}' @then(u'I wait for getting Presence joined events') @@ -113,9 +106,9 @@ def parse_log_line(line: str): async def step_impl(context: PNContext): await busypie.wait() \ .at_most(15) \ - .poll_delay(3) \ + .poll_delay(1) \ .poll_interval(1) \ - .until_async(lambda: True) + .until_async(lambda: context.callback.presence_result) @then(u'I receive an error in my heartbeat response') @@ -124,10 +117,10 @@ async def step_impl(context): pass -@then(u'I leave {channel1} and {channel2} channels with presence') +@then("I leave '{channel1}' and '{channel2}' channels with presence") @async_run_until_complete -async def step_impl(context): - pass +async def step_impl(context, channel1, channel2): + context.pubnub.unsubscribe().channels([channel1, channel2]).execute() @then(u'I don\'t observe any Events and Invocations of the Presence EE')