Skip to content

Commit

Permalink
Presence engine - states and events
Browse files Browse the repository at this point in the history
  • Loading branch information
seba-aln committed Jan 3, 2024
1 parent 4ac2f92 commit 4e9eb3e
Show file tree
Hide file tree
Showing 9 changed files with 587 additions and 6 deletions.
60 changes: 60 additions & 0 deletions pubnub/event_engine/manage_effects.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import math

from typing import Optional, Union
from pubnub.endpoints.presence.heartbeat import Heartbeat
from pubnub.endpoints.pubsub.subscribe import Subscribe
from pubnub.enums import PNReconnectionPolicy
from pubnub.exceptions import PubNubException
Expand Down Expand Up @@ -265,12 +266,71 @@ def get_timetoken(self):
return int(self.effect.timetoken)


class ManagedHeartbeatEffect(ManagedEffect):
def run(self):
channels = self.effect.channels
groups = self.effect.groups
if hasattr(self.pubnub, 'event_loop'):
self.stop_event = self.get_new_stop_event()

loop: asyncio.AbstractEventLoop = self.pubnub.event_loop
coro = self.heartbeat(channels=channels, groups=groups, stop_event=self.stop_event)
if loop.is_running():
loop.create_task(coro)
else:
loop.run_until_complete(coro)

async def heartbeat(self, channels, groups, stop_event):
request = Heartbeat(self.pubnub).channels(channels).channel_groups(groups).cancellation_event(stop_event)
heartbeat = await request.future()

if heartbeat.status.error:
self.logger.warning(f'Heartbeat failed: {heartbeat.status.error_data.__dict__}')
self.event_engine.trigger(events.HeartbeatFailureEvent(heartbeat.status.error_data, 1))
else:
self.event_engine.trigger(events.HeartbeatSuccessEvent(channels=channels, groups=groups))


class ManagedHeartbeatWaitEffect(ManagedEffect):
def __init__(self, pubnub_instance, event_engine_instance,
effect: Union[effects.PNManageableEffect, effects.PNCancelEffect]) -> None:
super().__init__(pubnub_instance, event_engine_instance, effect)
self.heartbeat_interval = pubnub_instance.config.heartbeat_interval

def run(self):
if hasattr(self.pubnub, 'event_loop'):
self.stop_event = self.get_new_stop_event()
loop: asyncio.AbstractEventLoop = self.pubnub.event_loop
coro = self.heartbeat_wait(self.heartbeat_interval, stop_event=self.stop_event)
if loop.is_running():
loop.create_task(coro)
else:
loop.run_until_complete(coro)

async def heartbeat(self, wait_time: int, stop_event):
try:
await asyncio.sleep(wait_time)
self.event_engine.trigger(events.HeartbeatTimesUpEvent(channels=self.effect.channels,
groups=self.effect.groups))
except asyncio.CancelledError:
pass


class ManagedHeartbeatLeaveEffect(ManagedEffect):
pass


class ManagedHeartbeatDelayedHeartbeatEffect(ManagedEffect):
pass


class ManagedEffectFactory:
_managed_effects = {
effects.HandshakeEffect.__name__: ManageHandshakeEffect,
effects.ReceiveMessagesEffect.__name__: ManagedReceiveMessagesEffect,
effects.HandshakeReconnectEffect.__name__: ManagedHandshakeReconnectEffect,
effects.ReceiveReconnectEffect.__name__: ManagedReceiveReconnectEffect,
effects.HeartbeatEffect.__name__: ManagedHeartbeatEffect,
}

def __init__(self, pubnub_instance, event_engine_instance) -> None:
Expand Down
35 changes: 35 additions & 0 deletions pubnub/event_engine/models/effects.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,38 @@ class EmitStatusEffect(PNEmittableEffect):
def __init__(self, status: Union[None, PNStatusCategory]) -> None:
super().__init__()
self.status = status


"""
Presence Effects
"""


class HeartbeatEffect(PNManageableEffect):
def __init__(self, channels: Union[None, List[str]] = None, groups: Union[None, List[str]] = None) -> None:
super().__init__()
self.channels = channels
self.groups = groups


class HeartbeatWaitEffect(PNManageableEffect):
def __init__(self, time) -> None:
super().__init__()


class HeartbeatCancelWaitEffect(PNCancelEffect):
cancel_effect = HeartbeatWaitEffect.__name__


class HeartbeatLeaveEffect(PNManageableEffect):
def __init__(self) -> None:
super().__init__()


class HeartbeatDelayedEffect(PNManageableEffect):
def __init__(self) -> None:
super().__init__()


class HeartbeatCancelDelayedEffect(PNCancelEffect):
cancel_effect = HeartbeatDelayedEffect.__name__
50 changes: 50 additions & 0 deletions pubnub/event_engine/models/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,3 +97,53 @@ class DisconnectEvent(PNEvent):

class ReconnectEvent(PNEvent):
pass


"""
Presence Events
"""


class HeartbeatJoinedEvent(PNChannelGroupsEvent):
pass


class HeartbeatReconnectEvent(PNEvent):
pass


class HeartbeatLeftAllEvent(PNEvent):
pass


class HeartbeatLeftEvent(PNChannelGroupsEvent):
pass


class HeartbeatDisconnectEvent(PNChannelGroupsEvent):
pass


class HeartbeatSuccessEvent(PNChannelGroupsEvent):
pass


class HeartbeatFailureEvent(PNChannelGroupsEvent, PNFailureEvent):
def __init__(self, channels: List[str], groups: List[str], reason: PubNubException, attempt: int,
timetoken: int = 0) -> None:
PNChannelGroupsEvent.__init__(self, channels, groups)
PNFailureEvent.__init__(self, reason, attempt, timetoken)


class HeartbeatTimesUpEvent(PNChannelGroupsEvent, PNFailureEvent):
def __init__(self, channels: List[str], groups: List[str], reason: PubNubException, attempt: int,
timetoken: int = 0) -> None:
PNChannelGroupsEvent.__init__(self, channels, groups)
PNFailureEvent.__init__(self, reason, attempt, timetoken)


class HeartbeatGiveUpEvent(PNChannelGroupsEvent, PNFailureEvent):
def __init__(self, channels: List[str], groups: List[str], reason: PubNubException, attempt: int,
timetoken: int = 0) -> None:
PNChannelGroupsEvent.__init__(self, channels, groups)
PNFailureEvent.__init__(self, reason, attempt, timetoken)
Loading

0 comments on commit 4e9eb3e

Please sign in to comment.