diff --git a/pubnub/endpoints/pubsub/subscribe.py b/pubnub/endpoints/pubsub/subscribe.py index b27783cc..7209aa42 100644 --- a/pubnub/endpoints/pubsub/subscribe.py +++ b/pubnub/endpoints/pubsub/subscribe.py @@ -18,6 +18,7 @@ def __init__(self, pubnub): self._filter_expression = None self._timetoken = None self._with_presence = None + self._state = None def channels(self, channels): utils.extend_list(self._channels, channels) @@ -44,6 +45,10 @@ def region(self, region): return self + def state(self, state): + self._state = state + return self + def http_method(self): return HttpMethod.GET @@ -75,6 +80,9 @@ def custom_params(self): if not self.pubnub.config.heartbeat_default_values: params['heartbeat'] = self.pubnub.config.presence_timeout + if self._state is not None and len(self._state) > 0: + params['state'] = utils.url_write(self._state) + if hasattr(self.pubnub, '_subscription_manager'): params.update(self.pubnub._subscription_manager.get_custom_params()) diff --git a/pubnub/event_engine/containers.py b/pubnub/event_engine/containers.py new file mode 100644 index 00000000..14a4e9d5 --- /dev/null +++ b/pubnub/event_engine/containers.py @@ -0,0 +1,15 @@ +class PresenceStateContainer: + channel_states: dict + + def __init__(self): + self.channel_states = {} + + def register_state(self, state: dict, channels: list): + for channel in channels: + self.channel_states[channel] = state + + def get_state(self, channels: list): + return {**self.get_channels_states(channels)} + + def get_channels_states(self, channels: list): + return {channel: self.channel_states[channel] for channel in channels if channel in self.channel_states} diff --git a/pubnub/event_engine/effects.py b/pubnub/event_engine/effects.py index 25cd155d..cd190038 100644 --- a/pubnub/event_engine/effects.py +++ b/pubnub/event_engine/effects.py @@ -80,9 +80,10 @@ def run(self): async def handshake_async(self, channels, groups, stop_event, timetoken: int = 0): request = Subscribe(self.pubnub).channels(channels).channel_groups(groups).cancellation_event(stop_event) - if feature_enabled('PN_MAINTAIN_PRESENCE_STATE'): - # request.set_state(self._context.states) # stub for state handling - pass + + if feature_enabled('PN_MAINTAIN_PRESENCE_STATE') and hasattr(self.pubnub, 'state_container'): + state_container = self.pubnub.state_container + request.state(state_container.get_state(channels)) request.timetoken(0) response = await request.future() @@ -197,9 +198,9 @@ async def delayed_reconnect_async(self, delay, attempt): if self.invocation.region: request.region(self.invocation.region) - if feature_enabled('PN_MAINTAIN_PRESENCE_STATE'): - # subscribe.set_state(self._context.states) # stub for state handling - pass + if feature_enabled('PN_MAINTAIN_PRESENCE_STATE') and hasattr(self.pubnub, 'state_container'): + state_container = self.pubnub.state_container + request.state(state_container.get_state(self.invocation.channels)) response = await request.future() @@ -281,9 +282,9 @@ def run(self): async def heartbeat(self, channels, groups, stop_event): request = Heartbeat(self.pubnub).channels(channels).channel_groups(groups).cancellation_event(stop_event) - if feature_enabled('PN_MAINTAIN_PRESENCE_STATE'): - # subscribe.set_state(self._context.states) # stub for state handling - pass + if feature_enabled('PN_MAINTAIN_PRESENCE_STATE') and hasattr(self.pubnub, 'state_container'): + state_container = self.pubnub.state_container + request.state(state_container.get_state(self.invocation.channels)) response = await request.future() diff --git a/pubnub/event_engine/statemachine.py b/pubnub/event_engine/statemachine.py index 8faa8ce1..41c0b327 100644 --- a/pubnub/event_engine/statemachine.py +++ b/pubnub/event_engine/statemachine.py @@ -13,7 +13,7 @@ class StateMachine: _enabled: bool def __init__(self, initial_state: states.PNState, dispatcher_class: Optional[Dispatcher] = None, - name: str = None) -> None: + name: Optional[str] = None) -> None: self._context = states.PNContext() self._current_state = initial_state(self._context) self._listeners = {} diff --git a/pubnub/pubnub_asyncio.py b/pubnub/pubnub_asyncio.py index 4985dbba..9007af14 100644 --- a/pubnub/pubnub_asyncio.py +++ b/pubnub/pubnub_asyncio.py @@ -8,6 +8,7 @@ from asyncio import Event, Queue, Semaphore from yarl import URL +from pubnub.event_engine.containers import PresenceStateContainer from pubnub.event_engine.models import events, states from pubnub.models.consumer.common import PNStatus @@ -557,12 +558,15 @@ class EventEngineSubscriptionManager(SubscriptionManager): loop: asyncio.AbstractEventLoop def __init__(self, pubnub_instance): - self.event_engine = StateMachine(states.UnsubscribedState, name="subscribe") - self.presence_engine = StateMachine(states.HeartbeatInactiveState, name="presence") + self.state_container = PresenceStateContainer() + self.event_engine = StateMachine(states.UnsubscribedState, + name="subscribe") + self.presence_engine = StateMachine(states.HeartbeatInactiveState, + name="presence") self.event_engine.get_dispatcher().set_pn(pubnub_instance) self.presence_engine.get_dispatcher().set_pn(pubnub_instance) self.loop = asyncio.new_event_loop() - + pubnub_instance.state_container = self.state_container super().__init__(pubnub_instance) def stop(self): @@ -594,7 +598,7 @@ def adapt_subscribe_builder(self, subscribe_operation: SubscribeOperation): def adapt_unsubscribe_builder(self, unsubscribe_operation): if not isinstance(unsubscribe_operation, UnsubscribeOperation): raise PubNubException('Invalid Unsubscribe Operation') - event = events.SubscriptionChangedEvent(None, None) + event = events.SubscriptionChangedEvent(['third', 'third-pnpres'], []) self.event_engine.trigger(event) self.presence_engine.trigger(event=events.HeartbeatLeftEvent( channels=unsubscribe_operation.channels, @@ -602,6 +606,12 @@ def adapt_unsubscribe_builder(self, unsubscribe_operation): suppress_leave=self._pubnub.config.suppress_leave_events )) + def adapt_state_builder(self, state_operation): + self.state_container.register_state(state_operation.state, + state_operation.channels, + state_operation.channel_groups) + return super().adapt_state_builder(state_operation) + def get_custom_params(self): return {'ee': 1} diff --git a/tests/acceptance/subscribe/steps/then_steps.py b/tests/acceptance/subscribe/steps/then_steps.py index 77f545e6..26c84c63 100644 --- a/tests/acceptance/subscribe/steps/then_steps.py +++ b/tests/acceptance/subscribe/steps/then_steps.py @@ -1,3 +1,4 @@ +import asyncio import re import busypie @@ -69,11 +70,7 @@ async def step_impl(ctx: PNContext): @then("I wait '{wait_time}' seconds") @async_run_until_complete async def step_impl(ctx: PNContext, wait_time: str): - await busypie.wait() \ - .at_most(int(wait_time)) \ - .poll_delay(int(wait_time)) \ - .poll_interval(int(wait_time)) \ - .until_async(lambda: True) + await asyncio.sleep(int(wait_time)) @then(u'I observe the following Events and Invocations of the Presence EE') diff --git a/tests/functional/event_engine/test_state_container.py b/tests/functional/event_engine/test_state_container.py new file mode 100644 index 00000000..d0b7af7d --- /dev/null +++ b/tests/functional/event_engine/test_state_container.py @@ -0,0 +1,16 @@ +from pubnub.event_engine.containers import PresenceStateContainer + + +def test_set_state(): + container = PresenceStateContainer() + container.register_state(state={'state': 'active'}, channels=['c1', 'c2']) + assert container.get_channels_states(['c1', 'c2']) == {'c1': {'state': 'active'}, 'c2': {'state': 'active'}} + assert container.get_state(['c1']) == {'c1': {'state': 'active'}} + + +def test_set_state_with_overwrite(): + container = PresenceStateContainer() + container.register_state(state={'state': 'active'}, channels=['c1']) + container.register_state(state={'state': 'inactive'}, channels=['c1']) + assert container.get_channels_states(['c1']) == {'c1': {'state': 'inactive'}} + assert container.get_state(['c1', 'c2']) == {'c1': {'state': 'inactive'}}