Skip to content

Commit

Permalink
Presence engine - states and events
Browse files Browse the repository at this point in the history
  • Loading branch information
seba-aln committed Jan 3, 2024
1 parent 4ac2f92 commit 7c6d680
Show file tree
Hide file tree
Showing 9 changed files with 599 additions and 6 deletions.
59 changes: 59 additions & 0 deletions pubnub/event_engine/manage_effects.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import math

from typing import Optional, Union
from pubnub.endpoints.presence.heartbeat import Heartbeat
from pubnub.endpoints.pubsub.subscribe import Subscribe
from pubnub.enums import PNReconnectionPolicy
from pubnub.exceptions import PubNubException
Expand Down Expand Up @@ -313,3 +314,61 @@ def emit_status(self, effect: effects.EmitStatusEffect):
pn_status.category = effect.status
pn_status.error = False
self.pubnub._subscription_manager._listener_manager.announce_status(pn_status)


class ManageHeartbeatEffect(ManagedEffect):
def run(self):
channels = self.effect.channels
groups = self.effect.groups
if hasattr(self.pubnub, 'event_loop'):
self.stop_event = self.get_new_stop_event()

loop: asyncio.AbstractEventLoop = self.pubnub.event_loop
coro = self.heartbeat(channels=channels, groups=groups, stop_event=self.stop_event)
if loop.is_running():
loop.create_task(coro)
else:
loop.run_until_complete(coro)

async def heartbeat(self, channels, groups, stop_event):
request = Heartbeat(self.pubnub).channels(channels).channel_groups(groups).cancellation_event(stop_event)
heartbeat = await request.future()

if heartbeat.status.error:
self.logger.warning(f'Heartbeat failed: {heartbeat.status.error_data.__dict__}')
self.event_engine.trigger(events.HeartbeatFailureEvent(heartbeat.status.error_data, 1))
else:
self.event_engine.trigger(events.HeartbeatSuccessEvent(channels=channels, groups=groups))


class ManageHeartbeatWaitEffect(ManagedEffect):
def __init__(self, pubnub_instance, event_engine_instance,
effect: Union[effects.PNManageableEffect, effects.PNCancelEffect]) -> None:
super().__init__(pubnub_instance, event_engine_instance, effect)
self.heartbeat_interval = pubnub_instance.config.heartbeat_interval

def run(self):
if hasattr(self.pubnub, 'event_loop'):
self.stop_event = self.get_new_stop_event()
loop: asyncio.AbstractEventLoop = self.pubnub.event_loop
coro = self.heartbeat_wait(self.heartbeat_interval, stop_event=self.stop_event)
if loop.is_running():
loop.create_task(coro)
else:
loop.run_until_complete(coro)

async def heartbeat(self, wait_time: int, stop_event):
try:
await asyncio.sleep(wait_time)
self.event_engine.trigger(events.HeartbeatTimesUpEvent(channels=self.effect.channels,
groups=self.effect.groups))
except asyncio.CancelledError:
pass


class ManageHeartbeatLeaveEffect(ManagedEffect):
pass


class ManageHeartbeatDelayedHeartbeatEffect(ManagedEffect):
pass
45 changes: 45 additions & 0 deletions pubnub/event_engine/models/effects.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,48 @@ class EmitStatusEffect(PNEmittableEffect):
def __init__(self, status: Union[None, PNStatusCategory]) -> None:
super().__init__()
self.status = status


"""
Presence Effects
"""


class HeartbeatEffect(PNEffect):
def __init__(self) -> None:
super().__init__()


class HeartbeatWaitEffect(PNEffect):
def __init__(self) -> None:
super().__init__()


class HeartbeatCancelWaitEffect(PNEffect):
def __init__(self) -> None:
super().__init__()


class HeartbeatLeaveEffect(PNEffect):
def __init__(self) -> None:
super().__init__()


class HeartbeatWaitEffect(PNEffect):
def __init__(self) -> None:
super().__init__()


class HeartbeatCancelWaitEffect(PNEffect):
def __init__(self) -> None:
super().__init__()


class HeartbeatDelayedEffect(PNEffect):
def __init__(self) -> None:
super().__init__()


class HeartbeatCancelDelayedEffect(PNEffect):
def __init__(self) -> None:
super().__init__()
44 changes: 44 additions & 0 deletions pubnub/event_engine/models/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,3 +97,47 @@ class DisconnectEvent(PNEvent):

class ReconnectEvent(PNEvent):
pass


"""
Presence Events
"""


class HeartbeatJoinedEvent(PNChannelGroupsEvent):
pass


class HeartbeatReconnectEvent(PNChannelGroupsEvent, PNFailureEvent):
def __init__(self, channels: List[str], groups: List[str], reason: PubNubException, attempt: int,
timetoken: int = 0) -> None:
PNChannelGroupsEvent.__init__(self, channels, groups)
PNFailureEvent.__init__(self, reason, attempt, timetoken)


class HeartbeatLeftAllEvent(PNEvent):
pass


class HeartbeatLeftEvent(PNChannelGroupsEvent):
pass


class HeartbeatDisconnectEvent(PNChannelGroupsEvent):
pass


class HeartbeatSuccessEvent(PNChannelGroupsEvent):
pass


class HeartbeatFailureEvent(PNChannelGroupsEvent, PNFailureEvent):
pass


class HeartbeatTimesUpEvent(PNChannelGroupsEvent, PNFailureEvent):
pass


class HeartbeatGiveUpEvent(PNChannelGroupsEvent, PNFailureEvent):
pass
Loading

0 comments on commit 7c6d680

Please sign in to comment.