Skip to content

Commit

Permalink
Add support for subscription state
Browse files Browse the repository at this point in the history
  • Loading branch information
seba-aln committed Feb 5, 2024
1 parent 1103ba8 commit ce317a6
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 19 deletions.
8 changes: 8 additions & 0 deletions pubnub/endpoints/pubsub/subscribe.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ def __init__(self, pubnub):
self._filter_expression = None
self._timetoken = None
self._with_presence = None
self._state = None

def channels(self, channels):
utils.extend_list(self._channels, channels)
Expand All @@ -44,6 +45,10 @@ def region(self, region):

return self

def state(self, state):
self._state = state
return self

def http_method(self):
return HttpMethod.GET

Expand Down Expand Up @@ -75,6 +80,9 @@ def custom_params(self):
if not self.pubnub.config.heartbeat_default_values:
params['heartbeat'] = self.pubnub.config.presence_timeout

if self._state is not None and len(self._state) > 0:
params['state'] = utils.url_write(self._state)

if hasattr(self.pubnub, '_subscription_manager'):
params.update(self.pubnub._subscription_manager.get_custom_params())

Expand Down
15 changes: 15 additions & 0 deletions pubnub/event_engine/containers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
class PresenceStateContainer:
channel_states: dict

def __init__(self):
self.channel_states = {}

def register_state(self, state: dict, channels: list):
for channel in channels:
self.channel_states[channel] = state

def get_state(self, channels: list):
return {**self.get_channels_states(channels)}

def get_channels_states(self, channels: list):
return {channel: self.channel_states[channel] for channel in channels if channel in self.channel_states}
19 changes: 10 additions & 9 deletions pubnub/event_engine/effects.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,10 @@ def run(self):

async def handshake_async(self, channels, groups, stop_event, timetoken: int = 0):
request = Subscribe(self.pubnub).channels(channels).channel_groups(groups).cancellation_event(stop_event)
if feature_enabled('PN_MAINTAIN_PRESENCE_STATE'):
# request.set_state(self._context.states) # stub for state handling
pass

if feature_enabled('PN_MAINTAIN_PRESENCE_STATE') and hasattr(self.pubnub, 'state_container'):
state_container = self.pubnub.state_container
request.state(state_container.get_state(channels))

request.timetoken(0)
response = await request.future()
Expand Down Expand Up @@ -197,9 +198,9 @@ async def delayed_reconnect_async(self, delay, attempt):
if self.invocation.region:
request.region(self.invocation.region)

if feature_enabled('PN_MAINTAIN_PRESENCE_STATE'):
# subscribe.set_state(self._context.states) # stub for state handling
pass
if feature_enabled('PN_MAINTAIN_PRESENCE_STATE') and hasattr(self.pubnub, 'state_container'):
state_container = self.pubnub.state_container
request.state(state_container.get_state(self.invocation.channels))

response = await request.future()

Expand Down Expand Up @@ -281,9 +282,9 @@ def run(self):
async def heartbeat(self, channels, groups, stop_event):
request = Heartbeat(self.pubnub).channels(channels).channel_groups(groups).cancellation_event(stop_event)

if feature_enabled('PN_MAINTAIN_PRESENCE_STATE'):
# subscribe.set_state(self._context.states) # stub for state handling
pass
if feature_enabled('PN_MAINTAIN_PRESENCE_STATE') and hasattr(self.pubnub, 'state_container'):
state_container = self.pubnub.state_container
request.state(state_container.get_state(self.invocation.channels))

response = await request.future()

Expand Down
2 changes: 1 addition & 1 deletion pubnub/event_engine/statemachine.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ class StateMachine:
_enabled: bool

def __init__(self, initial_state: states.PNState, dispatcher_class: Optional[Dispatcher] = None,
name: str = None) -> None:
name: Optional[str] = None) -> None:
self._context = states.PNContext()
self._current_state = initial_state(self._context)
self._listeners = {}
Expand Down
18 changes: 14 additions & 4 deletions pubnub/pubnub_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from asyncio import Event, Queue, Semaphore
from yarl import URL
from pubnub.event_engine.containers import PresenceStateContainer
from pubnub.event_engine.models import events, states

from pubnub.models.consumer.common import PNStatus
Expand Down Expand Up @@ -557,12 +558,15 @@ class EventEngineSubscriptionManager(SubscriptionManager):
loop: asyncio.AbstractEventLoop

def __init__(self, pubnub_instance):
self.event_engine = StateMachine(states.UnsubscribedState, name="subscribe")
self.presence_engine = StateMachine(states.HeartbeatInactiveState, name="presence")
self.state_container = PresenceStateContainer()
self.event_engine = StateMachine(states.UnsubscribedState,
name="subscribe")
self.presence_engine = StateMachine(states.HeartbeatInactiveState,
name="presence")
self.event_engine.get_dispatcher().set_pn(pubnub_instance)
self.presence_engine.get_dispatcher().set_pn(pubnub_instance)
self.loop = asyncio.new_event_loop()

pubnub_instance.state_container = self.state_container
super().__init__(pubnub_instance)

def stop(self):
Expand Down Expand Up @@ -594,14 +598,20 @@ def adapt_subscribe_builder(self, subscribe_operation: SubscribeOperation):
def adapt_unsubscribe_builder(self, unsubscribe_operation):
if not isinstance(unsubscribe_operation, UnsubscribeOperation):
raise PubNubException('Invalid Unsubscribe Operation')
event = events.SubscriptionChangedEvent(None, None)
event = events.SubscriptionChangedEvent(['third', 'third-pnpres'], [])
self.event_engine.trigger(event)
self.presence_engine.trigger(event=events.HeartbeatLeftEvent(
channels=unsubscribe_operation.channels,
groups=unsubscribe_operation.channel_groups,
suppress_leave=self._pubnub.config.suppress_leave_events
))

def adapt_state_builder(self, state_operation):
self.state_container.register_state(state_operation.state,
state_operation.channels,
state_operation.channel_groups)
return super().adapt_state_builder(state_operation)

def get_custom_params(self):
return {'ee': 1}

Expand Down
7 changes: 2 additions & 5 deletions tests/acceptance/subscribe/steps/then_steps.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import re
import busypie

Expand Down Expand Up @@ -69,11 +70,7 @@ async def step_impl(ctx: PNContext):
@then("I wait '{wait_time}' seconds")
@async_run_until_complete
async def step_impl(ctx: PNContext, wait_time: str):
await busypie.wait() \
.at_most(int(wait_time)) \
.poll_delay(int(wait_time)) \
.poll_interval(int(wait_time)) \
.until_async(lambda: True)
await asyncio.sleep(int(wait_time))


@then(u'I observe the following Events and Invocations of the Presence EE')
Expand Down
16 changes: 16 additions & 0 deletions tests/functional/event_engine/test_state_container.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from pubnub.event_engine.containers import PresenceStateContainer


def test_set_state():
container = PresenceStateContainer()
container.register_state(state={'state': 'active'}, channels=['c1', 'c2'])
assert container.get_channels_states(['c1', 'c2']) == {'c1': {'state': 'active'}, 'c2': {'state': 'active'}}
assert container.get_state(['c1']) == {'c1': {'state': 'active'}}


def test_set_state_with_overwrite():
container = PresenceStateContainer()
container.register_state(state={'state': 'active'}, channels=['c1'])
container.register_state(state={'state': 'inactive'}, channels=['c1'])
assert container.get_channels_states(['c1']) == {'c1': {'state': 'inactive'}}
assert container.get_state(['c1', 'c2']) == {'c1': {'state': 'inactive'}}

0 comments on commit ce317a6

Please sign in to comment.