Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Presence engine - states and events #178

Merged
merged 16 commits into from
Feb 8, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/run-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ jobs:
mkdir tests/acceptance/encryption/assets/
cp sdk-specifications/features/encryption/assets/* tests/acceptance/encryption/assets/
cp sdk-specifications/features/subscribe/event-engine/happy-path.feature tests/acceptance/subscribe/happy-path.feature
cp sdk-specifications/features/presence/event-engine/presence-engine.feature tests/acceptance/subscribe/presence-engine.feature
sudo pip3 install -r requirements-dev.txt
behave --junit tests/acceptance/pam
Expand Down
61 changes: 61 additions & 0 deletions examples/cli_chat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import argparse
import asyncio

from os import getenv
from pubnub.callbacks import SubscribeCallback
from pubnub.pubnub_asyncio import EventEngineSubscriptionManager, PubNubAsyncio
from pubnub.pnconfiguration import PNConfiguration

parser = argparse.ArgumentParser(description="Chat with others using PubNub")
parser.add_argument("name", help="Your name")
parser.add_argument("channel", help="The channel you want to join")
args = parser.parse_args()


class ExampleCallback(SubscribeCallback):
def message(self, pubnub, message):
print(f"{message.publisher}> {message.message}\n")

def presence(self, pubnub, presence):
print(f"{presence.event} {presence.uuid}\n")

def status(self, pubnub, status):
# print(status.__dict__)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comment?

pass


async def async_input():
print()
await asyncio.sleep(0.1)
return (await asyncio.get_event_loop().run_in_executor(None, input))


async def main():
name = args.name if args.name else input("Enter your name: ")
channel = args.channel if args.channel else input("Enter the channel you want to join: ")

print("Welcome to the chat room. Type 'exit' to leave the chat.")

config = PNConfiguration()
config.subscribe_key = getenv("PN_KEY_SUBSCRIBE")
config.publish_key = getenv("PN_KEY_PUBLISH")
config.uuid = name

pubnub = PubNubAsyncio(config, subscription_manager=EventEngineSubscriptionManager)
pubnub.add_listener(ExampleCallback())

pubnub.subscribe().channels(channel).with_presence().execute()

while True:
message = await async_input()
print("\x1b[2K")
if message == "exit":
print("Goodbye!")
break

await pubnub.publish().channel(channel).message(message).future()


if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
24 changes: 24 additions & 0 deletions pubnub/dtos.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,18 @@ def __init__(self, channels=None, channel_groups=None, presence_enabled=None, ti
self.presence_enabled = presence_enabled
self.timetoken = timetoken

@property
def channels_with_pressence(self):
if not self.presence_enabled:
return self.channels
return self.channels + [ch + '-pnpres' for ch in self.channels]

@property
def groups_with_pressence(self):
if not self.presence_enabled:
return self.channel_groups
return self.channel_groups + [ch + '-pnpres' for ch in self.channel_groups]


class UnsubscribeOperation(object):
def __init__(self, channels=None, channel_groups=None):
Expand All @@ -19,6 +31,18 @@ def __init__(self, channels=None, channel_groups=None):
self.channels = channels
self.channel_groups = channel_groups

def get_subscribed_channels(self, channels, with_presence=False) -> list:
result = [ch for ch in channels if ch not in self.channels and not ch.endswith('-pnpres')]
if not with_presence:
return result
return result + [ch + '-pnpres' for ch in result]

def get_subscribed_channel_groups(self, channel_groups, with_presence=False) -> list:
result = [grp for grp in channel_groups if grp not in self.channel_groups and not grp.endswith('-pnpres')]
if not with_presence:
return result
return result + [grp + '-pnpres' for grp in result]


class StateOperation(object):
def __init__(self, channels=None, channel_groups=None, state=None):
Expand Down
3 changes: 3 additions & 0 deletions pubnub/endpoints/presence/heartbeat.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ def custom_params(self):
if self._state is not None and len(self._state) > 0:
params['state'] = utils.url_write(self._state)

if hasattr(self.pubnub, '_subscription_manager'):
params.update(self.pubnub._subscription_manager.get_custom_params())

return params

def create_response(self, envelope):
Expand Down
3 changes: 3 additions & 0 deletions pubnub/endpoints/presence/leave.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ def custom_params(self):
if len(self._groups) > 0:
params['channel-group'] = utils.join_items(self._groups)

if hasattr(self.pubnub, '_subscription_manager'):
params.update(self.pubnub._subscription_manager.get_custom_params())

return params

def build_path(self):
Expand Down
11 changes: 11 additions & 0 deletions pubnub/endpoints/pubsub/subscribe.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ def __init__(self, pubnub):
self._filter_expression = None
self._timetoken = None
self._with_presence = None
self._state = None

def channels(self, channels):
utils.extend_list(self._channels, channels)
Expand All @@ -44,6 +45,10 @@ def region(self, region):

return self

def state(self, state):
self._state = state
return self

def http_method(self):
return HttpMethod.GET

Expand Down Expand Up @@ -75,6 +80,12 @@ def custom_params(self):
if not self.pubnub.config.heartbeat_default_values:
params['heartbeat'] = self.pubnub.config.presence_timeout

if self._state is not None and len(self._state) > 0:
params['state'] = utils.url_write(self._state)

if hasattr(self.pubnub, '_subscription_manager'):
params.update(self.pubnub._subscription_manager.get_custom_params())

return params

def create_response(self, envelope):
Expand Down
15 changes: 15 additions & 0 deletions pubnub/event_engine/containers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
class PresenceStateContainer:
channel_states: dict

def __init__(self):
self.channel_states = {}

def register_state(self, state: dict, channels: list):
for channel in channels:
self.channel_states[channel] = state

def get_state(self, channels: list):
return {channel: self.channel_states[channel] for channel in channels if channel in self.channel_states}

def get_channels_states(self, channels: list):
return {channel: self.channel_states[channel] for channel in channels if channel in self.channel_states}
44 changes: 22 additions & 22 deletions pubnub/event_engine/dispatcher.py
Original file line number Diff line number Diff line change
@@ -1,42 +1,42 @@
from pubnub.event_engine.models import effects
from pubnub.event_engine import manage_effects
from pubnub.event_engine.models import invocations
from pubnub.event_engine import effects


class Dispatcher:
_pubnub = None
_managed_effects_factory = None
_effects_factory = None

def __init__(self, event_engine) -> None:
self._event_engine = event_engine
self._managed_effects = {}
self._effect_emitter = manage_effects.EmitEffect()
self._effect_emitter = effects.EmitEffect()

def set_pn(self, pubnub_instance):
self._pubnub = pubnub_instance
self._effect_emitter.set_pn(pubnub_instance)

def dispatch_effect(self, effect: effects.PNEffect):
if not self._managed_effects_factory:
self._managed_effects_factory = manage_effects.ManagedEffectFactory(self._pubnub, self._event_engine)
def dispatch_effect(self, invocation: invocations.PNInvocation):
if not self._effects_factory:
Copy link

@jguz-pubnub jguz-pubnub Feb 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mhm, I don't think we should differentiate Effects that emit status (or anything else) from other non-cancellable Effects. They should have the same interface or method to fire them. I know it's not possible to fix it, so let's keep this comment so that others are aware of this difference.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will look at it

self._effects_factory = effects.EffectFactory(self._pubnub, self._event_engine)

if isinstance(effect, effects.PNEmittableEffect):
self.emit_effect(effect)
if isinstance(invocation, invocations.PNEmittableInvocation):
self.emit_effect(invocation)

elif isinstance(effect, effects.PNManageableEffect):
self.dispatch_managed_effect(effect)
elif isinstance(invocation, invocations.PNManageableInvocation):
self.dispatch_managed_effect(invocation)

elif isinstance(effect, effects.PNCancelEffect):
self.dispatch_cancel_effect(effect)
elif isinstance(invocation, invocations.PNCancelInvocation):
self.dispatch_cancel_effect(invocation)

def emit_effect(self, effect: effects.PNEffect):
def emit_effect(self, effect: invocations.PNInvocation):
self._effect_emitter.emit(effect)

def dispatch_managed_effect(self, effect: effects.PNEffect):
managed_effect = self._managed_effects_factory.create(effect)
managed_effect.run()
self._managed_effects[effect.__class__.__name__] = managed_effect
def dispatch_managed_effect(self, invocation: invocations.PNInvocation):
effect = self._effects_factory.create(invocation)
effect.run()
self._managed_effects[invocation.__class__.__name__] = effect

def dispatch_cancel_effect(self, effect: effects.PNEffect):
if effect.cancel_effect in self._managed_effects:
self._managed_effects[effect.cancel_effect].stop()
del self._managed_effects[effect.cancel_effect]
def dispatch_cancel_effect(self, invocation: invocations.PNInvocation):
if invocation.cancel_effect in self._managed_effects:
self._managed_effects[invocation.cancel_effect].stop()
del self._managed_effects[invocation.cancel_effect]
Loading
Loading