Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Event engine/compatibility #185

Merged
merged 7 commits into from
Apr 10, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/run-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ jobs:
cp sdk-specifications/features/encryption/cryptor-module.feature tests/acceptance/encryption
mkdir tests/acceptance/encryption/assets/
cp sdk-specifications/features/encryption/assets/* tests/acceptance/encryption/assets/
cp sdk-specifications/features/subscribe/event-engine/happy-path.feature tests/acceptance/subscribe/happy-path.feature
cp sdk-specifications/features/presence/event-engine/presence-engine.feature tests/acceptance/subscribe/presence-engine.feature
cp sdk-specifications/features/subscribe/event-engine/happy-path_Legacy.feature tests/acceptance/subscribe/happy-path_Legacy.feature
cp sdk-specifications/features/presence/event-engine/presence-engine_Legacy.feature tests/acceptance/subscribe/presence-engine_Legacy.feature

sudo pip3 install -r requirements-dev.txt
behave --junit tests/acceptance/pam
Expand Down
13 changes: 7 additions & 6 deletions pubnub/event_engine/effects.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,11 @@ def get_new_stop_event(self):
return event

def calculate_reconnection_delay(self, attempts):
if self.reconnection_policy is PNReconnectionPolicy.LINEAR:
if self.reconnection_policy is PNReconnectionPolicy.EXPONENTIAL:
delay = int(math.pow(2, attempts - 5 * math.floor((attempts - 1) / 5)) - 1)
seba-aln marked this conversation as resolved.
Show resolved Hide resolved
else:
delay = self.interval

elif self.reconnection_policy is PNReconnectionPolicy.EXPONENTIAL:
delay = int(math.pow(2, attempts - 5 * math.floor((attempts - 1) / 5)) - 1)
return delay


Expand All @@ -88,9 +88,9 @@ async def handshake_async(self, channels, groups, stop_event, timetoken: int = 0
request.timetoken(0)
response = await request.future()

if isinstance(response, PubNubException):
if isinstance(response, Exception):
self.logger.warning(f'Handshake failed: {str(response)}')
handshake_failure = events.HandshakeFailureEvent(str(response), 1, timetoken=timetoken)
handshake_failure = events.HandshakeFailureEvent(response, 1, timetoken=timetoken)
self.event_engine.trigger(handshake_failure)
elif response.status.error:
self.logger.warning(f'Handshake failed: {response.status.error_data.__dict__}')
Expand Down Expand Up @@ -292,7 +292,7 @@ async def heartbeat(self, channels, groups, stop_event):
self.logger.warning(f'Heartbeat failed: {str(response)}')
self.event_engine.trigger(events.HeartbeatFailureEvent(channels=channels, groups=groups,
reason=response.status.error_data, attempt=1))
elif response.status.error:
elif response.status and response.status.error:
self.logger.warning(f'Heartbeat failed: {response.status.error_data.__dict__}')
self.event_engine.trigger(events.HeartbeatFailureEvent(channels=channels, groups=groups,
reason=response.status.error_data, attempt=1))
Expand Down Expand Up @@ -427,5 +427,6 @@ def emit_message(self, invocation: invocations.EmitMessagesInvocation):
def emit_status(self, invocation: invocations.EmitStatusInvocation):
pn_status = PNStatus()
pn_status.category = invocation.status
pn_status.operation = invocation.operation
pn_status.error = False
self.pubnub._subscription_manager._listener_manager.announce_status(pn_status)
7 changes: 6 additions & 1 deletion pubnub/event_engine/models/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ class ReconnectEvent(PNEvent):
pass


class UnsubscribeAllEvent(PNEvent):
pass


"""
Presence Events
"""
Expand All @@ -116,7 +120,8 @@ class HeartbeatReconnectEvent(PNEvent):


class HeartbeatLeftAllEvent(PNEvent):
pass
def __init__(self, suppress_leave: bool = False) -> None:
self.suppress_leave = suppress_leave


class HeartbeatLeftEvent(PNChannelGroupsEvent):
Expand Down
5 changes: 3 additions & 2 deletions pubnub/event_engine/models/invocations.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from typing import List, Union
from pubnub.exceptions import PubNubException
from pubnub.enums import PNStatusCategory
from pubnub.enums import PNOperationType, PNStatusCategory


class PNInvocation:
Expand Down Expand Up @@ -90,9 +90,10 @@ def __init__(self, messages: Union[None, List[str]]) -> None:


class EmitStatusInvocation(PNEmittableInvocation):
def __init__(self, status: Union[None, PNStatusCategory]) -> None:
def __init__(self, status: Union[None, PNStatusCategory], operation: Union[None, PNOperationType] = None) -> None:
super().__init__()
self.status = status
self.operation = operation


"""
Expand Down
147 changes: 123 additions & 24 deletions pubnub/event_engine/models/states.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from pubnub.enums import PNStatusCategory
from pubnub.enums import PNOperationType, PNStatusCategory
from pubnub.event_engine.models import invocations
from pubnub.event_engine.models.invocations import PNInvocation
from pubnub.event_engine.models import events
Expand Down Expand Up @@ -99,6 +99,7 @@ def __init__(self, context: PNContext) -> None:
events.HandshakeSuccessEvent.__name__: self.handshaking_success,
events.SubscriptionRestoredEvent.__name__: self.subscription_restored,
events.SubscriptionChangedEvent.__name__: self.subscription_changed,
events.UnsubscribeAllEvent.__name__: self.unsubscribe_all,
}

def on_enter(self, context: Union[None, PNContext]):
Expand Down Expand Up @@ -171,6 +172,21 @@ def handshaking_success(self, event: events.HandshakeSuccessEvent, context: PNCo
invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNConnectedCategory)
)

def unsubscribe_all(self, event: events.UnsubscribeAllEvent, context: PNContext) -> PNTransition:
self._context.update(context)
self._context.timetoken = 0
self._context.region = None
self._context.attempt = 0
self._context.channels = []
self._context.groups = []

return PNTransition(
state=UnsubscribedState,
context=self._context,
invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNAcknowledgmentCategory,
operation=PNOperationType.PNUnsubscribeOperation)
)


class HandshakeReconnectingState(PNState):
def __init__(self, context: PNContext) -> None:
Expand Down Expand Up @@ -231,11 +247,18 @@ def give_up(self, event: events.HandshakeReconnectGiveupEvent, context: PNContex
self._context.update(context)
self._context.attempt = event.attempt
self._context.reason = event.reason
status_invocation = None

if isinstance(event, Exception) and 'status' in event.reason:
status_invocation = invocations.EmitStatusInvocation(status=event.reason.status.category,
operation=PNOperationType.PNUnsubscribeOperation)
else:
status_invocation = invocations.EmitStatusInvocation(PNStatusCategory.PNDisconnectedCategory)

return PNTransition(
state=HandshakeFailedState,
context=self._context,
invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNDisconnectedCategory)
invocation=status_invocation
)

def subscription_restored(self, event: events.SubscriptionRestoredEvent, context: PNContext) -> PNTransition:
Expand Down Expand Up @@ -270,6 +293,7 @@ def __init__(self, context: PNContext) -> None:
events.SubscriptionChangedEvent.__name__: self.subscription_changed,
events.ReconnectEvent.__name__: self.reconnect,
events.SubscriptionRestoredEvent.__name__: self.subscription_restored,
events.UnsubscribeAllEvent.__name__: self.unsubscribe_all,
}

def subscription_changed(self, event: events.SubscriptionChangedEvent, context: PNContext) -> PNTransition:
Expand Down Expand Up @@ -305,14 +329,30 @@ def subscription_restored(self, event: events.SubscriptionRestoredEvent, context
context=self._context
)

def unsubscribe_all(self, event: events.UnsubscribeAllEvent, context: PNContext) -> PNTransition:
self._context.update(context)
seba-aln marked this conversation as resolved.
Show resolved Hide resolved
self._context.timetoken = 0
self._context.region = None
self._context.attempt = 0
self._context.channels = []
self._context.groups = []

return PNTransition(
state=UnsubscribedState,
context=self._context,
invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNAcknowledgmentCategory,
operation=PNOperationType.PNUnsubscribeOperation)
)


class HandshakeStoppedState(PNState):
def __init__(self, context: PNContext) -> None:
super().__init__(context)
self._context.attempt = 0

self._transitions = {
events.ReconnectEvent.__name__: self.reconnect
events.ReconnectEvent.__name__: self.reconnect,
events.UnsubscribeAllEvent.__name__: self.unsubscribe_all,
}

def reconnect(self, event: events.ReconnectEvent, context: PNContext) -> PNTransition:
Expand All @@ -323,6 +363,21 @@ def reconnect(self, event: events.ReconnectEvent, context: PNContext) -> PNTrans
context=self._context
)

def unsubscribe_all(self, event: events.UnsubscribeAllEvent, context: PNContext) -> PNTransition:
self._context.update(context)
self._context.timetoken = 0
self._context.region = None
self._context.attempt = 0
self._context.channels = []
self._context.groups = []

return PNTransition(
state=UnsubscribedState,
context=self._context,
invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNAcknowledgmentCategory,
operation=PNOperationType.PNUnsubscribeOperation)
)


class ReceivingState(PNState):
def __init__(self, context: PNContext) -> None:
Expand All @@ -336,6 +391,7 @@ def __init__(self, context: PNContext) -> None:
events.ReceiveFailureEvent.__name__: self.receiving_failure,
events.DisconnectEvent.__name__: self.disconnect,
events.ReconnectEvent.__name__: self.reconnect,
events.UnsubscribeAllEvent.__name__: self.unsubscribe_all,
}

def on_enter(self, context: Union[None, PNContext]):
Expand Down Expand Up @@ -410,6 +466,21 @@ def reconnect(self, event: events.ReconnectEvent, context: PNContext) -> PNTrans
context=self._context
)

def unsubscribe_all(self, event: events.UnsubscribeAllEvent, context: PNContext) -> PNTransition:
self._context.update(context)
self._context.timetoken = 0
self._context.region = None
self._context.attempt = 0
self._context.channels = []
self._context.groups = []

return PNTransition(
state=UnsubscribedState,
context=self._context,
invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNAcknowledgmentCategory,
operation=PNOperationType.PNUnsubscribeOperation)
)


class ReceiveReconnectingState(PNState):
def __init__(self, context: PNContext) -> None:
Expand Down Expand Up @@ -511,6 +582,7 @@ def __init__(self, context: PNContext) -> None:
events.SubscriptionChangedEvent.__name__: self.subscription_changed,
events.SubscriptionRestoredEvent.__name__: self.subscription_restored,
events.ReconnectEvent.__name__: self.reconnect,
events.UnsubscribeAllEvent.__name__: self.unsubscribe_all,
}

def reconnect_retry(self, event: events.ReceiveReconnectRetryEvent, context: PNContext) -> PNTransition:
Expand Down Expand Up @@ -554,14 +626,30 @@ def subscription_restored(self, event: events.SubscriptionRestoredEvent, context
context=self._context
)

def unsubscribe_all(self, event: events.UnsubscribeAllEvent, context: PNContext) -> PNTransition:
self._context.update(context)
self._context.timetoken = 0
self._context.region = None
self._context.attempt = 0
self._context.channels = []
self._context.groups = []

return PNTransition(
state=UnsubscribedState,
context=self._context,
invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNAcknowledgmentCategory,
operation=PNOperationType.PNUnsubscribeOperation)
)


class ReceiveStoppedState(PNState):
def __init__(self, context: PNContext) -> None:
super().__init__(context)
self._context.attempt = 0

self._transitions = {
events.ReconnectEvent.__name__: self.reconnect
events.ReconnectEvent.__name__: self.reconnect,
events.UnsubscribeAllEvent.__name__: self.unsubscribe_all,
}

def reconnect(self, event: events.ReconnectEvent, context: PNContext) -> PNTransition:
Expand All @@ -572,6 +660,21 @@ def reconnect(self, event: events.ReconnectEvent, context: PNContext) -> PNTrans
context=self._context
)

def unsubscribe_all(self, event: events.UnsubscribeAllEvent, context: PNContext) -> PNTransition:
self._context.update(context)
self._context.timetoken = 0
self._context.region = None
self._context.attempt = 0
self._context.channels = []
self._context.groups = []

return PNTransition(
state=UnsubscribedState,
context=self._context,
invocation=invocations.EmitStatusInvocation(PNStatusCategory.PNAcknowledgmentCategory,
operation=PNOperationType.PNUnsubscribeOperation)
)


"""
Presence states
Expand Down Expand Up @@ -711,13 +814,12 @@ def disconnect(self, event: events.HeartbeatDisconnectEvent, context: PNContext)

def left_all(self, event: events.HeartbeatLeftAllEvent, context: PNContext) -> PNTransition:
self._context.update(context)
self._context.channels = []
self._context.groups = []

invocation = None
if not event.suppress_leave:
invocation = invocations.HeartbeatLeaveInvocation(channels=event.channels,
groups=event.groups)
invocation = invocations.HeartbeatLeaveInvocation(channels=self._context.channels,
groups=self._context.groups)
self._context.channels = []
self._context.groups = []

return PNTransition(
state=HeartbeatInactiveState,
Expand Down Expand Up @@ -769,13 +871,12 @@ def disconnect(self, event: events.HeartbeatDisconnectEvent, context: PNContext)

def left_all(self, event: events.HeartbeatLeftAllEvent, context: PNContext) -> PNTransition:
self._context.update(context)
self._context.channels = []
self._context.groups = []

invocation = None
if not event.suppress_leave:
invocation = invocations.HeartbeatLeaveInvocation(channels=event.channels,
groups=event.groups)
invocation = invocations.HeartbeatLeaveInvocation(channels=self._context.channels,
groups=self._context.groups)
self._context.channels = []
self._context.groups = []

return PNTransition(
state=HeartbeatInactiveState,
Expand Down Expand Up @@ -857,13 +958,12 @@ def disconnect(self, event: events.HeartbeatDisconnectEvent, context: PNContext)

def left_all(self, event: events.HeartbeatLeftAllEvent, context: PNContext) -> PNTransition:
self._context.update(context)
self._context.channels = []
self._context.groups = []

invocation = None
if not event.suppress_leave:
invocation = invocations.HeartbeatLeaveInvocation(channels=event.channels,
groups=event.groups)
invocation = invocations.HeartbeatLeaveInvocation(channels=self._context.channels,
groups=self._context.groups)
self._context.channels = []
self._context.groups = []

return PNTransition(
state=HeartbeatInactiveState,
Expand Down Expand Up @@ -1005,13 +1105,12 @@ def disconnect(self, event: events.HeartbeatDisconnectEvent, context: PNContext)

def left_all(self, event: events.HeartbeatLeftAllEvent, context: PNContext) -> PNTransition:
self._context.update(context)
self._context.channels = []
self._context.groups = []

invocation = None
if not event.suppress_leave:
invocation = invocations.HeartbeatLeaveInvocation(channels=event.channels,
groups=event.groups)
invocation = invocations.HeartbeatLeaveInvocation(channels=self._context.channels,
groups=self._context.groups)
self._context.channels = []
self._context.groups = []

return PNTransition(
state=HeartbeatInactiveState,
Expand Down
2 changes: 1 addition & 1 deletion pubnub/event_engine/statemachine.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def trigger(self, event: events.PNEvent) -> states.PNTransition:

def dispatch_effects(self):
for invocation in self._invocations:
self.logger.debug(f'Dispatching {invocation.__class__.__name__} {id(invocation)}')
self.logger.debug(f'Dispatching {invocation.__class__.__name__} {invocation.__dict__} {id(invocation)}')
self._dispatcher.dispatch_effect(invocation)

self._invocations.clear()
Expand Down
Loading
Loading