Skip to content

Commit

Permalink
Presence engine, refactors and everything else including meaning of life
Browse files Browse the repository at this point in the history
  • Loading branch information
seba-aln committed Jan 26, 2024
1 parent be47efe commit 9693ef1
Show file tree
Hide file tree
Showing 13 changed files with 374 additions and 141 deletions.
152 changes: 110 additions & 42 deletions pubnub/event_engine/manage_effects.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@
from pubnub.endpoints.pubsub.subscribe import Subscribe
from pubnub.enums import PNReconnectionPolicy
from pubnub.exceptions import PubNubException
from pubnub.models.consumer.pubsub import PNMessageResult
from pubnub.features import feature_enabled
from pubnub.models.server.subscribe import SubscribeMessage
from pubnub.pubnub import PubNub
from pubnub.event_engine.models import effects, events
from pubnub.models.consumer.common import PNStatus
from pubnub.workers import BaseMessageWorker


class ManagedEffect:
Expand Down Expand Up @@ -73,15 +74,23 @@ def run(self):

async def handshake_async(self, channels, groups, stop_event, timetoken: int = 0):
request = Subscribe(self.pubnub).channels(channels).channel_groups(groups).cancellation_event(stop_event)
if feature_enabled('PN_MAINTAIN_PRESENCE_STATE'):
# request.set_state(self._context.states) # stub for state handling
pass

request.timetoken(0)
handshake = await request.future()
response = await request.future()

if handshake.status.error:
self.logger.warning(f'Handshake failed: {handshake.status.error_data.__dict__}')
handshake_failure = events.HandshakeFailureEvent(handshake.status.error_data, 1, timetoken=timetoken)
if isinstance(response, PubNubException):
self.logger.warning(f'Handshake failed: {str(response)}')
handshake_failure = events.HandshakeFailureEvent(str(response), 1, timetoken=timetoken)
self.event_engine.trigger(handshake_failure)
elif response.status.error:
self.logger.warning(f'Handshake failed: {response.status.error_data.__dict__}')
handshake_failure = events.HandshakeFailureEvent(response.status.error_data, 1, timetoken=timetoken)
self.event_engine.trigger(handshake_failure)
else:
cursor = handshake.result['t']
cursor = response.result['t']
timetoken = timetoken if timetoken > 0 else cursor['t']
region = cursor['r']
handshake_success = events.HandshakeSuccessEvent(timetoken, region)
Expand Down Expand Up @@ -110,18 +119,18 @@ def run(self):
pass

async def receive_messages_async(self, channels, groups, timetoken, region):
subscribe = Subscribe(self.pubnub)
request = Subscribe(self.pubnub)
if channels:
subscribe.channels(channels)
request.channels(channels)
if groups:
subscribe.channel_groups(groups)
request.channel_groups(groups)
if timetoken:
subscribe.timetoken(timetoken)
request.timetoken(timetoken)
if region:
subscribe.region(region)
request.region(region)

subscribe.cancellation_event(self.stop_event)
response = await subscribe.future()
request.cancellation_event(self.stop_event)
response = await request.future()

if response.status is None and response.result is None:
self.logger.warning('Recieve messages failed: Empty response')
Expand Down Expand Up @@ -196,25 +205,34 @@ async def delayed_reconnect_async(self, delay, attempt):
self.stop_event = self.get_new_stop_event()
await asyncio.sleep(delay)

request = Subscribe(self.pubnub) \
.channels(self.effect.channels) \
.channel_groups(self.effect.groups) \
.timetoken(self.get_timetoken()) \
.cancellation_event(self.stop_event)
request = Subscribe(self.pubnub).timetoken(self.get_timetoken()).cancellation_event(self.stop_event)

if self.effect.channels:
request.channels(self.effect.channels)
if self.effect.groups:
request.channel_groups(self.effect.groups)

if self.effect.region:
request.region(self.effect.region)

reconnect = await request.future()
if feature_enabled('PN_MAINTAIN_PRESENCE_STATE'):
# subscribe.set_state(self._context.states) # stub for state handling
pass

response = await request.future()

if isinstance(response, PubNubException):
self.logger.warning(f'Reconnect failed: {str(response)}')
self.failure(str(response), attempt, self.get_timetoken())

if reconnect.status.error:
self.logger.warning(f'Reconnect failed: {reconnect.status.error_data.__dict__}')
self.failure(reconnect.status.error_data, attempt, self.get_timetoken())
elif response.status.error:
self.logger.warning(f'Reconnect failed: {response.status.error_data.__dict__}')
self.failure(response.status.error_data, attempt, self.get_timetoken())
else:
cursor = reconnect.result['t']
cursor = response.result['t']
timetoken = int(self.effect.timetoken) if self.effect.timetoken else cursor['t']
region = cursor['r']
messages = reconnect.result['m']
messages = response.result['m']
self.success(timetoken=timetoken, region=region, messages=messages)

def stop(self):
Expand Down Expand Up @@ -286,12 +304,21 @@ def run(self):

async def heartbeat(self, channels, groups, stop_event):
request = Heartbeat(self.pubnub).channels(channels).channel_groups(groups).cancellation_event(stop_event)
heartbeat = await request.future()

if heartbeat.status.error:
self.logger.warning(f'Heartbeat failed: {heartbeat.status.error_data.__dict__}')
if feature_enabled('PN_MAINTAIN_PRESENCE_STATE'):
# subscribe.set_state(self._context.states) # stub for state handling
pass

response = await request.future()

if isinstance(response, PubNubException):
self.logger.warning(f'Heartbeat failed: {str(response)}')
self.event_engine.trigger(events.HeartbeatFailureEvent(channels=channels, groups=groups,
reason=response.status.error_data, attempt=1))
elif response.status.error:
self.logger.warning(f'Heartbeat failed: {response.status.error_data.__dict__}')
self.event_engine.trigger(events.HeartbeatFailureEvent(channels=channels, groups=groups,
reason=heartbeat.status.error_data, attempt=1))
reason=response.status.error_data, attempt=1))
else:
self.event_engine.trigger(events.HeartbeatSuccessEvent(channels=channels, groups=groups))

Expand All @@ -314,8 +341,7 @@ def run(self):
async def heartbeat_wait(self, wait_time: int, stop_event):
try:
await asyncio.sleep(wait_time)
self.event_engine.trigger(events.HeartbeatTimesUpEvent(channels=self.effect.channels,
groups=self.effect.groups))
self.event_engine.trigger(events.HeartbeatTimesUpEvent())
except asyncio.CancelledError:
pass

Expand All @@ -341,8 +367,55 @@ async def leave(self, channels, groups, stop_event):
self.logger.warning(f'Heartbeat failed: {leave.status.error_data.__dict__}')


class ManagedHeartbeatDelayedHeartbeatEffect(ManagedEffect):
pass
class ManagedHeartbeatDelayedEffect(ManagedEffect):
def __init__(self, pubnub_instance, event_engine_instance,
effect: Union[effects.PNManageableEffect, effects.PNCancelEffect]) -> None:
super().__init__(pubnub_instance, event_engine_instance, effect)
self.reconnection_policy = pubnub_instance.config.reconnect_policy
self.max_retry_attempts = pubnub_instance.config.maximum_reconnection_retries
self.interval = pubnub_instance.config.RECONNECTION_INTERVAL
self.min_backoff = pubnub_instance.config.RECONNECTION_MIN_EXPONENTIAL_BACKOFF
self.max_backoff = pubnub_instance.config.RECONNECTION_MAX_EXPONENTIAL_BACKOFF

def calculate_reconnection_delay(self, attempts):
if self.reconnection_policy is PNReconnectionPolicy.LINEAR:
delay = self.interval

elif self.reconnection_policy is PNReconnectionPolicy.EXPONENTIAL:
delay = int(math.pow(2, attempts - 5 * math.floor((attempts - 1) / 5)) - 1)
return delay

def run(self):
channels = self.effect.channels
groups = self.effect.groups
if hasattr(self.pubnub, 'event_loop'):
self.stop_event = self.get_new_stop_event()

loop: asyncio.AbstractEventLoop = self.pubnub.event_loop
coro = self.heartbeat(channels=channels, groups=groups, attempt=1, stop_event=self.stop_event)
if loop.is_running():
self.task = loop.create_task(coro)
else:
self.task = loop.run_until_complete(coro)

async def heartbeat(self, channels, groups, attempt, stop_event):
request = Heartbeat(self.pubnub).channels(channels).channel_groups(groups).cancellation_event(stop_event)
delay = self.calculate_reconnection_delay(attempt)
await asyncio.sleep(delay)

response = await request.future()
if isinstance(response, PubNubException):
self.logger.warning(f'Heartbeat failed: {str(response)}')
self.event_engine.trigger(events.HeartbeatFailureEvent(channels=channels, groups=groups,
reason=response.status.error_data,
attempt=attempt + 1))
elif response.status.error:
self.logger.warning(f'Heartbeat failed: {response.status.error_data.__dict__}')
self.event_engine.trigger(events.HeartbeatFailureEvent(channels=channels, groups=groups,
reason=response.status.error_data,
attempt=attempt + 1))
else:
self.event_engine.trigger(events.HeartbeatSuccessEvent(channels=channels, groups=groups))


class ManagedEffectFactory:
Expand All @@ -353,7 +426,7 @@ class ManagedEffectFactory:
effects.ReceiveReconnectEffect.__name__: ManagedReceiveReconnectEffect,
effects.HeartbeatEffect.__name__: ManagedHeartbeatEffect,
effects.HeartbeatWaitEffect.__name__: ManagedHeartbeatWaitEffect,
effects.HeartbeatDelayedEffect.__name__: ManagedHeartbeatDelayedHeartbeatEffect,
effects.HeartbeatDelayedEffect.__name__: ManagedHeartbeatDelayedEffect,
effects.HeartbeatLeaveEffect.__name__: ManagedHeartbeatLeaveEffect,
}

Expand All @@ -369,9 +442,11 @@ def create(self, effect: ManagedEffect):

class EmitEffect:
pubnub: PubNub
message_worker: BaseMessageWorker

def set_pn(self, pubnub: PubNub):
self.pubnub = pubnub
self.message_worker = BaseMessageWorker(pubnub)

def emit(self, effect: effects.PNEmittableEffect):
if isinstance(effect, effects.EmitMessagesEffect):
Expand All @@ -380,17 +455,10 @@ def emit(self, effect: effects.PNEmittableEffect):
self.emit_status(effect)

def emit_message(self, effect: effects.EmitMessagesEffect):
self.message_worker._listener_manager = self.pubnub._subscription_manager._listener_manager
for message in effect.messages:
subscribe_message = SubscribeMessage().from_json(message)
pn_message_result = PNMessageResult(
message=subscribe_message.payload,
subscription=subscribe_message.subscription_match,
channel=subscribe_message.channel,
timetoken=int(message['p']['t']),
user_metadata=subscribe_message.publish_metadata,
publisher=subscribe_message.issuing_client_id
)
self.pubnub._subscription_manager._listener_manager.announce_message(pn_message_result)
self.message_worker._process_incoming_payload(subscribe_message)

def emit_status(self, effect: effects.EmitStatusEffect):
pn_status = PNStatus()
Expand Down
15 changes: 13 additions & 2 deletions pubnub/event_engine/models/effects.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,14 +118,25 @@ class HeartbeatCancelWaitEffect(PNCancelEffect):


class HeartbeatLeaveEffect(PNManageableEffect):
def __init__(self, channels: Union[None, List[str]] = None, groups: Union[None, List[str]] = None) -> None:
def __init__(self, channels: Union[None, List[str]] = None, groups: Union[None, List[str]] = None,
suppress_leave: bool = False) -> None:
super().__init__()
self.channels = channels
self.groups = groups
self.suppress_leave = suppress_leave


class HeartbeatDelayedEffect(PNManageableEffect):
pass
def __init__(self,
channels: Union[None, List[str]] = None,
groups: Union[None, List[str]] = None,
attempts: Union[None, int] = None,
reason: Union[None, PubNubException] = None):
super().__init__()
self.channels = channels
self.groups = groups
self.attempts = attempts
self.reason = reason


class HeartbeatCancelDelayedEffect(PNCancelEffect):
Expand Down
11 changes: 5 additions & 6 deletions pubnub/event_engine/models/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,9 @@ class HeartbeatLeftAllEvent(PNEvent):


class HeartbeatLeftEvent(PNChannelGroupsEvent):
pass
def __init__(self, channels: List[str], groups: List[str], suppress_leave: bool = False) -> None:
PNChannelGroupsEvent.__init__(self, channels, groups)
self.suppress_leave = suppress_leave


class HeartbeatDisconnectEvent(PNChannelGroupsEvent):
Expand All @@ -135,11 +137,8 @@ def __init__(self, channels: List[str], groups: List[str], reason: PubNubExcepti
PNFailureEvent.__init__(self, reason, attempt, timetoken)


class HeartbeatTimesUpEvent(PNChannelGroupsEvent, PNFailureEvent):
def __init__(self, channels: List[str], groups: List[str], reason: PubNubException, attempt: int,
timetoken: int = 0) -> None:
PNChannelGroupsEvent.__init__(self, channels, groups)
PNFailureEvent.__init__(self, reason, attempt, timetoken)
class HeartbeatTimesUpEvent(PNEvent):
pass


class HeartbeatGiveUpEvent(PNChannelGroupsEvent, PNFailureEvent):
Expand Down
Loading

0 comments on commit 9693ef1

Please sign in to comment.