Skip to content

Commit

Permalink
Fix leaving channels
Browse files Browse the repository at this point in the history
  • Loading branch information
seba-aln committed Feb 7, 2024
1 parent ce317a6 commit 0b66f04
Show file tree
Hide file tree
Showing 7 changed files with 79 additions and 35 deletions.
16 changes: 14 additions & 2 deletions pubnub/dtos.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ def __init__(self, channels=None, channel_groups=None, presence_enabled=None, ti
def channels_with_pressence(self):
if not self.presence_enabled:
return self.channels
return [*self.channels] + [ch + '-pnpres' for ch in self.channels]
return self.channels + [ch + '-pnpres' for ch in self.channels]

@property
def groups_with_pressence(self):
if not self.presence_enabled:
return self.channel_groups
return [*self.channel_groups] + [ch + '-pnpres' for ch in self.channel_groups]
return self.channel_groups + [ch + '-pnpres' for ch in self.channel_groups]


class UnsubscribeOperation(object):
Expand All @@ -31,6 +31,18 @@ def __init__(self, channels=None, channel_groups=None):
self.channels = channels
self.channel_groups = channel_groups

def get_subscribed_channels(self, channels, with_presence=False) -> list:
result = [ch for ch in channels if ch not in self.channels and not ch.endswith('-pnpres')]
if not with_presence:
return result
return result + [ch + '-pnpres' for ch in result]

def get_subscribed_channel_groups(self, channel_groups, with_presence=False) -> list:
result = [grp for grp in channel_groups if grp not in self.channel_groups and not grp.endswith('-pnpres')]
if not with_presence:
return result
return result + [grp + '-pnpres' for grp in result]


class StateOperation(object):
def __init__(self, channels=None, channel_groups=None, state=None):
Expand Down
2 changes: 1 addition & 1 deletion pubnub/event_engine/containers.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ def register_state(self, state: dict, channels: list):
self.channel_states[channel] = state

def get_state(self, channels: list):
return {**self.get_channels_states(channels)}
return {channel: self.channel_states[channel] for channel in channels if channel in self.channel_states}

def get_channels_states(self, channels: list):
return {channel: self.channel_states[channel] for channel in channels if channel in self.channel_states}
2 changes: 1 addition & 1 deletion pubnub/event_engine/effects.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ def run(self):
def run_async(self, coro):
loop: asyncio.AbstractEventLoop = self.pubnub.event_loop
if loop.is_running():
self.task = loop.create_task(coro)
self.task = loop.create_task(coro, name=self.__class__.__name__)
else:
self.task = loop.run_until_complete(coro)

Expand Down
7 changes: 5 additions & 2 deletions pubnub/event_engine/models/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,17 @@ def __init__(self, channels: List[str], groups: List[str]) -> None:


class SubscriptionChangedEvent(PNChannelGroupsEvent):
def __init__(self, channels: List[str], groups: List[str]) -> None:
def __init__(self, channels: List[str], groups: List[str], with_presence: Optional[bool] = None) -> None:
PNChannelGroupsEvent.__init__(self, channels, groups)
self.with_presence = with_presence


class SubscriptionRestoredEvent(PNCursorEvent, PNChannelGroupsEvent):
def __init__(self, timetoken: str, channels: List[str], groups: List[str], region: Optional[int] = None) -> None:
def __init__(self, timetoken: str, channels: List[str], groups: List[str], region: Optional[int] = None,
with_presence: Optional[bool] = None) -> None:
PNCursorEvent.__init__(self, timetoken, region)
PNChannelGroupsEvent.__init__(self, channels, groups)
self.with_presence = with_presence


class HandshakeSuccessEvent(PNCursorEvent):
Expand Down
65 changes: 40 additions & 25 deletions pubnub/event_engine/models/states.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class PNContext(dict):
timetoken: str
attempt: int
reason: PubNubException
with_presence: bool = False

def update(self, context):
super().update(context.__dict__)
Expand Down Expand Up @@ -67,6 +68,7 @@ def subscription_changed(self, event: events.SubscriptionChangedEvent, context:
self._context.update(context)
self._context.channels = event.channels
self._context.groups = event.groups
self._context.with_presence = event.with_presence
self._context.timetoken = 0

return PNTransition(
Expand All @@ -78,6 +80,7 @@ def subscription_restored(self, event: events.SubscriptionRestoredEvent, context
self._context.update(context)
self._context.channels = event.channels
self._context.groups = event.groups
self._context.with_presence = event.with_presence
self._context.timetoken = event.timetoken
self._context.region = event.region

Expand Down Expand Up @@ -113,6 +116,7 @@ def subscription_changed(self, event: events.SubscriptionChangedEvent, context:
self._context.update(context)
self._context.channels = event.channels
self._context.groups = event.groups
self._context.with_presence = event.with_presence
self._context.timetoken = 0

return PNTransition(
Expand All @@ -124,6 +128,7 @@ def subscription_restored(self, event: events.SubscriptionRestoredEvent, context
self._context.update(context)
self._context.channels = event.channels
self._context.groups = event.groups
self._context.with_presence = event.with_presence
self._context.region = event.region
if self._context.timetoken == 0:
self._context.timetoken = event.timetoken
Expand Down Expand Up @@ -204,6 +209,7 @@ def subscription_changed(self, event: events.SubscriptionChangedEvent, context:
self._context.update(context)
self._context.channels = event.channels
self._context.groups = event.groups
self._context.with_presence = event.with_presence
self._context.timetoken = 0

return PNTransition(
Expand Down Expand Up @@ -236,6 +242,7 @@ def subscription_restored(self, event: events.SubscriptionRestoredEvent, context
self._context.update(context)
self._context.channels = event.channels
self._context.groups = event.groups
self._context.with_presence = event.with_presence
self._context.timetoken = event.timetoken
self._context.region = event.region

Expand Down Expand Up @@ -269,6 +276,7 @@ def subscription_changed(self, event: events.SubscriptionChangedEvent, context:
self._context.update(context)
self._context.channels = event.channels
self._context.groups = event.groups
self._context.with_presence = event.with_presence
self._context.timetoken = 0

return PNTransition(
Expand All @@ -288,6 +296,7 @@ def subscription_restored(self, event: events.SubscriptionRestoredEvent, context
self._context.update(context)
self._context.channels = event.channels
self._context.groups = event.groups
self._context.with_presence = event.with_presence
self._context.timetoken = event.timetoken
self._context.region = event.region

Expand Down Expand Up @@ -342,7 +351,8 @@ def subscription_changed(self, event: events.SubscriptionChangedEvent, context:
self._context.update(context)
self._context.channels = event.channels
self._context.groups = event.groups
self._context.timetoken = 0
self._context.with_presence = event.with_presence
# self._context.timetoken = 0 # why we don't reset timetoken here?

return PNTransition(
state=self.__class__,
Expand All @@ -353,6 +363,7 @@ def subscription_restored(self, event: events.SubscriptionRestoredEvent, context
self._context.update(context)
self._context.channels = event.channels
self._context.groups = event.groups
self._context.with_presence = event.with_presence
self._context.timetoken = event.timetoken
self._context.region = event.region

Expand Down Expand Up @@ -440,6 +451,7 @@ def subscription_changed(self, event: events.SubscriptionChangedEvent, context:
self._context.update(context)
self._context.channels = event.channels
self._context.groups = event.groups
self._context.with_presence = event.with_presence
self._context.timetoken = 0

return PNTransition(
Expand Down Expand Up @@ -481,6 +493,7 @@ def subscription_restored(self, event: events.SubscriptionRestoredEvent, context
self._context.update(context)
self._context.channels = event.channels
self._context.groups = event.groups
self._context.with_presence = event.with_presence
self._context.timetoken = event.timetoken
self._context.region = event.region

Expand Down Expand Up @@ -512,6 +525,7 @@ def subscription_changed(self, event: events.SubscriptionChangedEvent, context:
self._context.update(context)
self._context.channels = event.channels
self._context.groups = event.groups
self._context.with_presence = event.with_presence
self._context.timetoken = 0

return PNTransition(
Expand All @@ -531,6 +545,7 @@ def subscription_restored(self, event: events.SubscriptionRestoredEvent, context
self._context.update(context)
self._context.channels = event.channels
self._context.groups = event.groups
self._context.with_presence = event.with_presence
self._context.timetoken = event.timetoken
self._context.region = event.region

Expand Down Expand Up @@ -663,8 +678,8 @@ def left(self, event: events.HeartbeatLeftEvent, context: PNContext) -> PNTransi

invocation = None
if not event.suppress_leave:
invocation = invocations.HeartbeatLeaveInvocation(channels=self._context.channels,
groups=self._context.groups)
invocation = invocations.HeartbeatLeaveInvocation(channels=event.channels,
groups=event.groups)

return PNTransition(
state=HeartbeatingState,
Expand All @@ -685,8 +700,8 @@ def disconnect(self, event: events.HeartbeatDisconnectEvent, context: PNContext)

invocation = None
if not event.suppress_leave:
invocation = invocations.HeartbeatLeaveInvocation(channels=self._context.channels,
groups=self._context.groups)
invocation = invocations.HeartbeatLeaveInvocation(channels=event.channels,
groups=event.groups)

return PNTransition(
state=HeartbeatStoppedState,
Expand All @@ -701,8 +716,8 @@ def left_all(self, event: events.HeartbeatLeftAllEvent, context: PNContext) -> P

invocation = None
if not event.suppress_leave:
invocation = invocations.HeartbeatLeaveInvocation(channels=self._context.channels,
groups=self._context.groups)
invocation = invocations.HeartbeatLeaveInvocation(channels=event.channels,
groups=event.groups)

return PNTransition(
state=HeartbeatInactiveState,
Expand Down Expand Up @@ -743,8 +758,8 @@ def disconnect(self, event: events.HeartbeatDisconnectEvent, context: PNContext)

invocation = None
if not event.suppress_leave:
invocation = invocations.HeartbeatLeaveInvocation(channels=self._context.channels,
groups=self._context.groups)
invocation = invocations.HeartbeatLeaveInvocation(channels=event.channels,
groups=event.groups)

return PNTransition(
state=HeartbeatStoppedState,
Expand All @@ -759,8 +774,8 @@ def left_all(self, event: events.HeartbeatLeftAllEvent, context: PNContext) -> P

invocation = None
if not event.suppress_leave:
invocation = invocations.HeartbeatLeaveInvocation(channels=self._context.channels,
groups=self._context.groups)
invocation = invocations.HeartbeatLeaveInvocation(channels=event.channels,
groups=event.groups)

return PNTransition(
state=HeartbeatInactiveState,
Expand All @@ -786,8 +801,8 @@ def left(self, event: events.HeartbeatLeftEvent, context: PNContext) -> PNTransi

invocation = None
if not event.suppress_leave:
invocation = invocations.HeartbeatLeaveInvocation(channels=self._context.channels,
groups=self._context.groups)
invocation = invocations.HeartbeatLeaveInvocation(channels=event.channels,
groups=event.groups)

return PNTransition(
state=HeartbeatingState,
Expand Down Expand Up @@ -831,8 +846,8 @@ def disconnect(self, event: events.HeartbeatDisconnectEvent, context: PNContext)

invocation = None
if not event.suppress_leave:
invocation = invocations.HeartbeatLeaveInvocation(channels=self._context.channels,
groups=self._context.groups)
invocation = invocations.HeartbeatLeaveInvocation(channels=event.channels,
groups=event.groups)

return PNTransition(
state=HeartbeatStoppedState,
Expand All @@ -847,8 +862,8 @@ def left_all(self, event: events.HeartbeatLeftAllEvent, context: PNContext) -> P

invocation = None
if not event.suppress_leave:
invocation = invocations.HeartbeatLeaveInvocation(channels=self._context.channels,
groups=self._context.groups)
invocation = invocations.HeartbeatLeaveInvocation(channels=event.channels,
groups=event.groups)

return PNTransition(
state=HeartbeatInactiveState,
Expand All @@ -874,8 +889,8 @@ def left(self, event: events.HeartbeatLeftEvent, context: PNContext) -> PNTransi

invocation = None
if not event.suppress_leave:
invocation = invocations.HeartbeatLeaveInvocation(channels=self._context.channels,
groups=self._context.groups)
invocation = invocations.HeartbeatLeaveInvocation(channels=event.channels,
groups=event.groups)

return PNTransition(
state=HeartbeatingState,
Expand Down Expand Up @@ -946,8 +961,8 @@ def left(self, event: events.HeartbeatLeftEvent, context: PNContext) -> PNTransi

invocation = None
if not event.suppress_leave:
invocation = invocations.HeartbeatLeaveInvocation(channels=self._context.channels,
groups=self._context.groups)
invocation = invocations.HeartbeatLeaveInvocation(channels=event.channels,
groups=event.groups)

return PNTransition(
state=HeartbeatingState,
Expand Down Expand Up @@ -979,8 +994,8 @@ def disconnect(self, event: events.HeartbeatDisconnectEvent, context: PNContext)

invocation = None
if not event.suppress_leave:
invocation = invocations.HeartbeatLeaveInvocation(channels=self._context.channels,
groups=self._context.groups)
invocation = invocations.HeartbeatLeaveInvocation(channels=event.channels,
groups=event.groups)

return PNTransition(
state=HeartbeatStoppedState,
Expand All @@ -995,8 +1010,8 @@ def left_all(self, event: events.HeartbeatLeftAllEvent, context: PNContext) -> P

invocation = None
if not event.suppress_leave:
invocation = invocations.HeartbeatLeaveInvocation(channels=self._context.channels,
groups=self._context.groups)
invocation = invocations.HeartbeatLeaveInvocation(channels=event.channels,
groups=event.groups)

return PNTransition(
state=HeartbeatInactiveState,
Expand Down
3 changes: 3 additions & 0 deletions pubnub/event_engine/statemachine.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ def trigger(self, event: events.PNEvent) -> states.PNTransition:
self.logger.debug(f'Current State: {self.get_state_name()}')
self.logger.debug(f'Triggered event: {event.__class__.__name__}({event.__dict__}) on {self.get_state_name()}')

if (isinstance(event, events.SubscriptionChangedEvent)):
print(f"SubscriptionChangedEvent in : {self.get_state_name()}")

if not self._enabled:
self.logger.error('EventEngine is not enabled')
return False
Expand Down
19 changes: 15 additions & 4 deletions pubnub/pubnub_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -581,12 +581,14 @@ def adapt_subscribe_builder(self, subscribe_operation: SubscribeOperation):
subscription_event = events.SubscriptionRestoredEvent(
channels=subscribe_operation.channels_with_pressence,
groups=subscribe_operation.groups_with_pressence,
timetoken=subscribe_operation.timetoken
timetoken=subscribe_operation.timetoken,
with_presence=subscribe_operation.presence_enabled
)
else:
subscription_event = events.SubscriptionChangedEvent(
channels=subscribe_operation.channels_with_pressence,
groups=subscribe_operation.groups_with_pressence
groups=subscribe_operation.groups_with_pressence,
with_presence=subscribe_operation.presence_enabled
)
self.event_engine.trigger(subscription_event)
if self._pubnub.config._heartbeat_interval > 0:
Expand All @@ -598,8 +600,17 @@ def adapt_subscribe_builder(self, subscribe_operation: SubscribeOperation):
def adapt_unsubscribe_builder(self, unsubscribe_operation):
if not isinstance(unsubscribe_operation, UnsubscribeOperation):
raise PubNubException('Invalid Unsubscribe Operation')
event = events.SubscriptionChangedEvent(['third', 'third-pnpres'], [])
self.event_engine.trigger(event)

channels = unsubscribe_operation.get_subscribed_channels(
self.event_engine.get_context().channels,
self.event_engine.get_context().with_presence)

groups = unsubscribe_operation.get_subscribed_channel_groups(
self.event_engine.get_context().groups,
self.event_engine.get_context().with_presence)

self.event_engine.trigger(events.SubscriptionChangedEvent(channels=channels, groups=groups))

self.presence_engine.trigger(event=events.HeartbeatLeftEvent(
channels=unsubscribe_operation.channels,
groups=unsubscribe_operation.channel_groups,
Expand Down

0 comments on commit 0b66f04

Please sign in to comment.