From 462f1cbbf9641b08e92320be7e4ac8b93c7dd8d7 Mon Sep 17 00:00:00 2001 From: Sebastian Molenda Date: Wed, 7 Feb 2024 22:36:20 +0100 Subject: [PATCH] Fix leaving channels --- .github/workflows/run-tests.yml | 1 + pubnub/dtos.py | 16 ++++++- pubnub/event_engine/containers.py | 2 +- pubnub/event_engine/effects.py | 2 +- pubnub/event_engine/models/events.py | 7 ++- pubnub/event_engine/models/states.py | 65 +++++++++++++++++----------- pubnub/event_engine/statemachine.py | 3 ++ pubnub/pubnub_asyncio.py | 19 ++++++-- 8 files changed, 80 insertions(+), 35 deletions(-) diff --git a/.github/workflows/run-tests.yml b/.github/workflows/run-tests.yml index bb6f8cda..5567fda7 100644 --- a/.github/workflows/run-tests.yml +++ b/.github/workflows/run-tests.yml @@ -78,6 +78,7 @@ jobs: mkdir tests/acceptance/encryption/assets/ cp sdk-specifications/features/encryption/assets/* tests/acceptance/encryption/assets/ cp sdk-specifications/features/subscribe/event-engine/happy-path.feature tests/acceptance/subscribe/happy-path.feature + cp sdk-specifications/features/presence/event-engine/presence-engine.feature tests/acceptance/subscribe/presence-engine.feature sudo pip3 install -r requirements-dev.txt behave --junit tests/acceptance/pam diff --git a/pubnub/dtos.py b/pubnub/dtos.py index 94991444..047714a0 100644 --- a/pubnub/dtos.py +++ b/pubnub/dtos.py @@ -14,13 +14,13 @@ def __init__(self, channels=None, channel_groups=None, presence_enabled=None, ti def channels_with_pressence(self): if not self.presence_enabled: return self.channels - return [*self.channels] + [ch + '-pnpres' for ch in self.channels] + return self.channels + [ch + '-pnpres' for ch in self.channels] @property def groups_with_pressence(self): if not self.presence_enabled: return self.channel_groups - return [*self.channel_groups] + [ch + '-pnpres' for ch in self.channel_groups] + return self.channel_groups + [ch + '-pnpres' for ch in self.channel_groups] class UnsubscribeOperation(object): @@ -31,6 +31,18 @@ def __init__(self, channels=None, channel_groups=None): self.channels = channels self.channel_groups = channel_groups + def get_subscribed_channels(self, channels, with_presence=False) -> list: + result = [ch for ch in channels if ch not in self.channels and not ch.endswith('-pnpres')] + if not with_presence: + return result + return result + [ch + '-pnpres' for ch in result] + + def get_subscribed_channel_groups(self, channel_groups, with_presence=False) -> list: + result = [grp for grp in channel_groups if grp not in self.channel_groups and not grp.endswith('-pnpres')] + if not with_presence: + return result + return result + [grp + '-pnpres' for grp in result] + class StateOperation(object): def __init__(self, channels=None, channel_groups=None, state=None): diff --git a/pubnub/event_engine/containers.py b/pubnub/event_engine/containers.py index 14a4e9d5..7f53708c 100644 --- a/pubnub/event_engine/containers.py +++ b/pubnub/event_engine/containers.py @@ -9,7 +9,7 @@ def register_state(self, state: dict, channels: list): self.channel_states[channel] = state def get_state(self, channels: list): - return {**self.get_channels_states(channels)} + return {channel: self.channel_states[channel] for channel in channels if channel in self.channel_states} def get_channels_states(self, channels: list): return {channel: self.channel_states[channel] for channel in channels if channel in self.channel_states} diff --git a/pubnub/event_engine/effects.py b/pubnub/event_engine/effects.py index cd190038..a6a7ce43 100644 --- a/pubnub/event_engine/effects.py +++ b/pubnub/event_engine/effects.py @@ -41,7 +41,7 @@ def run(self): def run_async(self, coro): loop: asyncio.AbstractEventLoop = self.pubnub.event_loop if loop.is_running(): - self.task = loop.create_task(coro) + self.task = loop.create_task(coro, name=self.__class__.__name__) else: self.task = loop.run_until_complete(coro) diff --git a/pubnub/event_engine/models/events.py b/pubnub/event_engine/models/events.py index e3dfeac0..6b926337 100644 --- a/pubnub/event_engine/models/events.py +++ b/pubnub/event_engine/models/events.py @@ -28,14 +28,17 @@ def __init__(self, channels: List[str], groups: List[str]) -> None: class SubscriptionChangedEvent(PNChannelGroupsEvent): - def __init__(self, channels: List[str], groups: List[str]) -> None: + def __init__(self, channels: List[str], groups: List[str], with_presence: Optional[bool] = None) -> None: PNChannelGroupsEvent.__init__(self, channels, groups) + self.with_presence = with_presence class SubscriptionRestoredEvent(PNCursorEvent, PNChannelGroupsEvent): - def __init__(self, timetoken: str, channels: List[str], groups: List[str], region: Optional[int] = None) -> None: + def __init__(self, timetoken: str, channels: List[str], groups: List[str], region: Optional[int] = None, + with_presence: Optional[bool] = None) -> None: PNCursorEvent.__init__(self, timetoken, region) PNChannelGroupsEvent.__init__(self, channels, groups) + self.with_presence = with_presence class HandshakeSuccessEvent(PNCursorEvent): diff --git a/pubnub/event_engine/models/states.py b/pubnub/event_engine/models/states.py index b54f0572..72acdfcd 100644 --- a/pubnub/event_engine/models/states.py +++ b/pubnub/event_engine/models/states.py @@ -13,6 +13,7 @@ class PNContext(dict): timetoken: str attempt: int reason: PubNubException + with_presence: bool = False def update(self, context): super().update(context.__dict__) @@ -67,6 +68,7 @@ def subscription_changed(self, event: events.SubscriptionChangedEvent, context: self._context.update(context) self._context.channels = event.channels self._context.groups = event.groups + self._context.with_presence = event.with_presence self._context.timetoken = 0 return PNTransition( @@ -78,6 +80,7 @@ def subscription_restored(self, event: events.SubscriptionRestoredEvent, context self._context.update(context) self._context.channels = event.channels self._context.groups = event.groups + self._context.with_presence = event.with_presence self._context.timetoken = event.timetoken self._context.region = event.region @@ -113,6 +116,7 @@ def subscription_changed(self, event: events.SubscriptionChangedEvent, context: self._context.update(context) self._context.channels = event.channels self._context.groups = event.groups + self._context.with_presence = event.with_presence self._context.timetoken = 0 return PNTransition( @@ -124,6 +128,7 @@ def subscription_restored(self, event: events.SubscriptionRestoredEvent, context self._context.update(context) self._context.channels = event.channels self._context.groups = event.groups + self._context.with_presence = event.with_presence self._context.region = event.region if self._context.timetoken == 0: self._context.timetoken = event.timetoken @@ -204,6 +209,7 @@ def subscription_changed(self, event: events.SubscriptionChangedEvent, context: self._context.update(context) self._context.channels = event.channels self._context.groups = event.groups + self._context.with_presence = event.with_presence self._context.timetoken = 0 return PNTransition( @@ -236,6 +242,7 @@ def subscription_restored(self, event: events.SubscriptionRestoredEvent, context self._context.update(context) self._context.channels = event.channels self._context.groups = event.groups + self._context.with_presence = event.with_presence self._context.timetoken = event.timetoken self._context.region = event.region @@ -269,6 +276,7 @@ def subscription_changed(self, event: events.SubscriptionChangedEvent, context: self._context.update(context) self._context.channels = event.channels self._context.groups = event.groups + self._context.with_presence = event.with_presence self._context.timetoken = 0 return PNTransition( @@ -288,6 +296,7 @@ def subscription_restored(self, event: events.SubscriptionRestoredEvent, context self._context.update(context) self._context.channels = event.channels self._context.groups = event.groups + self._context.with_presence = event.with_presence self._context.timetoken = event.timetoken self._context.region = event.region @@ -342,7 +351,8 @@ def subscription_changed(self, event: events.SubscriptionChangedEvent, context: self._context.update(context) self._context.channels = event.channels self._context.groups = event.groups - self._context.timetoken = 0 + self._context.with_presence = event.with_presence + # self._context.timetoken = 0 # why we don't reset timetoken here? return PNTransition( state=self.__class__, @@ -353,6 +363,7 @@ def subscription_restored(self, event: events.SubscriptionRestoredEvent, context self._context.update(context) self._context.channels = event.channels self._context.groups = event.groups + self._context.with_presence = event.with_presence self._context.timetoken = event.timetoken self._context.region = event.region @@ -440,6 +451,7 @@ def subscription_changed(self, event: events.SubscriptionChangedEvent, context: self._context.update(context) self._context.channels = event.channels self._context.groups = event.groups + self._context.with_presence = event.with_presence self._context.timetoken = 0 return PNTransition( @@ -481,6 +493,7 @@ def subscription_restored(self, event: events.SubscriptionRestoredEvent, context self._context.update(context) self._context.channels = event.channels self._context.groups = event.groups + self._context.with_presence = event.with_presence self._context.timetoken = event.timetoken self._context.region = event.region @@ -512,6 +525,7 @@ def subscription_changed(self, event: events.SubscriptionChangedEvent, context: self._context.update(context) self._context.channels = event.channels self._context.groups = event.groups + self._context.with_presence = event.with_presence self._context.timetoken = 0 return PNTransition( @@ -531,6 +545,7 @@ def subscription_restored(self, event: events.SubscriptionRestoredEvent, context self._context.update(context) self._context.channels = event.channels self._context.groups = event.groups + self._context.with_presence = event.with_presence self._context.timetoken = event.timetoken self._context.region = event.region @@ -663,8 +678,8 @@ def left(self, event: events.HeartbeatLeftEvent, context: PNContext) -> PNTransi invocation = None if not event.suppress_leave: - invocation = invocations.HeartbeatLeaveInvocation(channels=self._context.channels, - groups=self._context.groups) + invocation = invocations.HeartbeatLeaveInvocation(channels=event.channels, + groups=event.groups) return PNTransition( state=HeartbeatingState, @@ -685,8 +700,8 @@ def disconnect(self, event: events.HeartbeatDisconnectEvent, context: PNContext) invocation = None if not event.suppress_leave: - invocation = invocations.HeartbeatLeaveInvocation(channels=self._context.channels, - groups=self._context.groups) + invocation = invocations.HeartbeatLeaveInvocation(channels=event.channels, + groups=event.groups) return PNTransition( state=HeartbeatStoppedState, @@ -701,8 +716,8 @@ def left_all(self, event: events.HeartbeatLeftAllEvent, context: PNContext) -> P invocation = None if not event.suppress_leave: - invocation = invocations.HeartbeatLeaveInvocation(channels=self._context.channels, - groups=self._context.groups) + invocation = invocations.HeartbeatLeaveInvocation(channels=event.channels, + groups=event.groups) return PNTransition( state=HeartbeatInactiveState, @@ -743,8 +758,8 @@ def disconnect(self, event: events.HeartbeatDisconnectEvent, context: PNContext) invocation = None if not event.suppress_leave: - invocation = invocations.HeartbeatLeaveInvocation(channels=self._context.channels, - groups=self._context.groups) + invocation = invocations.HeartbeatLeaveInvocation(channels=event.channels, + groups=event.groups) return PNTransition( state=HeartbeatStoppedState, @@ -759,8 +774,8 @@ def left_all(self, event: events.HeartbeatLeftAllEvent, context: PNContext) -> P invocation = None if not event.suppress_leave: - invocation = invocations.HeartbeatLeaveInvocation(channels=self._context.channels, - groups=self._context.groups) + invocation = invocations.HeartbeatLeaveInvocation(channels=event.channels, + groups=event.groups) return PNTransition( state=HeartbeatInactiveState, @@ -786,8 +801,8 @@ def left(self, event: events.HeartbeatLeftEvent, context: PNContext) -> PNTransi invocation = None if not event.suppress_leave: - invocation = invocations.HeartbeatLeaveInvocation(channels=self._context.channels, - groups=self._context.groups) + invocation = invocations.HeartbeatLeaveInvocation(channels=event.channels, + groups=event.groups) return PNTransition( state=HeartbeatingState, @@ -831,8 +846,8 @@ def disconnect(self, event: events.HeartbeatDisconnectEvent, context: PNContext) invocation = None if not event.suppress_leave: - invocation = invocations.HeartbeatLeaveInvocation(channels=self._context.channels, - groups=self._context.groups) + invocation = invocations.HeartbeatLeaveInvocation(channels=event.channels, + groups=event.groups) return PNTransition( state=HeartbeatStoppedState, @@ -847,8 +862,8 @@ def left_all(self, event: events.HeartbeatLeftAllEvent, context: PNContext) -> P invocation = None if not event.suppress_leave: - invocation = invocations.HeartbeatLeaveInvocation(channels=self._context.channels, - groups=self._context.groups) + invocation = invocations.HeartbeatLeaveInvocation(channels=event.channels, + groups=event.groups) return PNTransition( state=HeartbeatInactiveState, @@ -874,8 +889,8 @@ def left(self, event: events.HeartbeatLeftEvent, context: PNContext) -> PNTransi invocation = None if not event.suppress_leave: - invocation = invocations.HeartbeatLeaveInvocation(channels=self._context.channels, - groups=self._context.groups) + invocation = invocations.HeartbeatLeaveInvocation(channels=event.channels, + groups=event.groups) return PNTransition( state=HeartbeatingState, @@ -946,8 +961,8 @@ def left(self, event: events.HeartbeatLeftEvent, context: PNContext) -> PNTransi invocation = None if not event.suppress_leave: - invocation = invocations.HeartbeatLeaveInvocation(channels=self._context.channels, - groups=self._context.groups) + invocation = invocations.HeartbeatLeaveInvocation(channels=event.channels, + groups=event.groups) return PNTransition( state=HeartbeatingState, @@ -979,8 +994,8 @@ def disconnect(self, event: events.HeartbeatDisconnectEvent, context: PNContext) invocation = None if not event.suppress_leave: - invocation = invocations.HeartbeatLeaveInvocation(channels=self._context.channels, - groups=self._context.groups) + invocation = invocations.HeartbeatLeaveInvocation(channels=event.channels, + groups=event.groups) return PNTransition( state=HeartbeatStoppedState, @@ -995,8 +1010,8 @@ def left_all(self, event: events.HeartbeatLeftAllEvent, context: PNContext) -> P invocation = None if not event.suppress_leave: - invocation = invocations.HeartbeatLeaveInvocation(channels=self._context.channels, - groups=self._context.groups) + invocation = invocations.HeartbeatLeaveInvocation(channels=event.channels, + groups=event.groups) return PNTransition( state=HeartbeatInactiveState, diff --git a/pubnub/event_engine/statemachine.py b/pubnub/event_engine/statemachine.py index 41c0b327..555b2239 100644 --- a/pubnub/event_engine/statemachine.py +++ b/pubnub/event_engine/statemachine.py @@ -42,6 +42,9 @@ def trigger(self, event: events.PNEvent) -> states.PNTransition: self.logger.debug(f'Current State: {self.get_state_name()}') self.logger.debug(f'Triggered event: {event.__class__.__name__}({event.__dict__}) on {self.get_state_name()}') + if (isinstance(event, events.SubscriptionChangedEvent)): + print(f"SubscriptionChangedEvent in : {self.get_state_name()}") + if not self._enabled: self.logger.error('EventEngine is not enabled') return False diff --git a/pubnub/pubnub_asyncio.py b/pubnub/pubnub_asyncio.py index 9007af14..d47eb40c 100644 --- a/pubnub/pubnub_asyncio.py +++ b/pubnub/pubnub_asyncio.py @@ -581,12 +581,14 @@ def adapt_subscribe_builder(self, subscribe_operation: SubscribeOperation): subscription_event = events.SubscriptionRestoredEvent( channels=subscribe_operation.channels_with_pressence, groups=subscribe_operation.groups_with_pressence, - timetoken=subscribe_operation.timetoken + timetoken=subscribe_operation.timetoken, + with_presence=subscribe_operation.presence_enabled ) else: subscription_event = events.SubscriptionChangedEvent( channels=subscribe_operation.channels_with_pressence, - groups=subscribe_operation.groups_with_pressence + groups=subscribe_operation.groups_with_pressence, + with_presence=subscribe_operation.presence_enabled ) self.event_engine.trigger(subscription_event) if self._pubnub.config._heartbeat_interval > 0: @@ -598,8 +600,17 @@ def adapt_subscribe_builder(self, subscribe_operation: SubscribeOperation): def adapt_unsubscribe_builder(self, unsubscribe_operation): if not isinstance(unsubscribe_operation, UnsubscribeOperation): raise PubNubException('Invalid Unsubscribe Operation') - event = events.SubscriptionChangedEvent(['third', 'third-pnpres'], []) - self.event_engine.trigger(event) + + channels = unsubscribe_operation.get_subscribed_channels( + self.event_engine.get_context().channels, + self.event_engine.get_context().with_presence) + + groups = unsubscribe_operation.get_subscribed_channel_groups( + self.event_engine.get_context().groups, + self.event_engine.get_context().with_presence) + + self.event_engine.trigger(events.SubscriptionChangedEvent(channels=channels, groups=groups)) + self.presence_engine.trigger(event=events.HeartbeatLeftEvent( channels=unsubscribe_operation.channels, groups=unsubscribe_operation.channel_groups,