This repository has been archived by the owner on May 27, 2020. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
component_subscription_manager.py
130 lines (94 loc) · 4.42 KB
/
component_subscription_manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
import asyncio
from autobahn.asyncio.wamp import ApplicationSession, ApplicationRunner
from autobahn.wamp.types import RegisterOptions
from autobahn.wamp import auth
from config import config
import aioredis
import requests
import json
class SubscriptionManagerComponent:
    """Launcher for the subscription manager WAMP session."""

    @classmethod
    def run(cls):
        """Print a startup banner, then run the WAMP session against Crossbar.

        The router host, port and realm are all read from
        ``config["crossbar"]``.
        """
        print(f"Starting {cls.__name__}...")

        router_url = f"ws://{config['crossbar']['host']}:{config['crossbar']['port']}"
        realm_name = config["crossbar"]["realm"]

        app_runner = ApplicationRunner(url=router_url, realm=realm_name)
        app_runner.run(SubscriptionManagerWAMPComponent)
class SubscriptionManagerWAMPComponent(ApplicationSession):
    """WAMP session that keeps Twitch stream webhooks subscribed.

    After joining the realm the session:

    1. registers the ``<realm>.subscribe`` and ``<realm>.unsubscribe``
       procedures,
    2. unsubscribes every channel that has a ``SUBSCRIPTION`` key in Redis but
       is not listed in ``config["channels"]``,
    3. loops forever, subscribing every configured channel that has no
       ``SUBSCRIPTION`` key in Redis, then sleeping before the next pass.
    """

    # Seconds to wait for a Twitch HTTP call before giving up; without a
    # timeout a stalled connection would never return.
    HTTP_TIMEOUT_SECONDS = 10

    # Pause between two passes over the configured channels.
    RESUBSCRIBE_INTERVAL_SECONDS = 300

    def __init__(self, c=None):
        super().__init__(c)

    def onConnect(self):
        """Join the configured realm, offering WAMP-CRA with the configured user."""
        self.join(
            config["crossbar"]["realm"],
            ["wampcra"],
            config["crossbar"]["auth"]["username"],
        )

    def onDisconnect(self):
        """Report that the connection to the router is gone."""
        print("Disconnected from Crossbar!")

    def onChallenge(self, challenge):
        """Answer the router's challenge by signing it with the configured password.

        Returns:
            The signature computed by ``auth.compute_wcs``, decoded to ``str``.
        """
        secret = config["crossbar"]["auth"]["password"]
        signature = auth.compute_wcs(
            secret.encode("utf8"),
            challenge.extra["challenge"].encode("utf8"),
        )
        return signature.decode("ascii")

    async def onJoin(self, details):
        """Register the RPC procedures and run the subscription maintenance loop.

        This coroutine never returns: after the one-off cleanup of stale
        subscriptions it keeps re-checking the configured channels.
        """
        self.redis_client = await self._initialize_redis_client()

        # Errors raised inside these handlers are deliberately NOT swallowed so
        # that the remote caller is told the call failed.
        async def subscribe(channel):
            await self.register_webhook(channel, action="subscribe")

        async def unsubscribe(channel):
            await self.register_webhook(channel, action="unsubscribe")

        realm = config["crossbar"]["realm"]
        await self.register(
            subscribe,
            f"{realm}.subscribe",
            options=RegisterOptions(invoke="roundrobin"),
        )
        await self.register(
            unsubscribe,
            f"{realm}.unsubscribe",
            options=RegisterOptions(invoke="roundrobin"),
        )

        channels = config["channels"]
        await self._unsubscribe_stale_channels(channels)

        while True:
            await self._subscribe_missing_channels(channels)
            await asyncio.sleep(self.RESUBSCRIBE_INTERVAL_SECONDS)

    async def _unsubscribe_stale_channels(self, channels):
        """Unsubscribe channels that have a Redis subscription key but are not in *channels*."""
        key_pattern = f"{config['redis']['prefix']}:SUBSCRIPTION:*"
        for channel_key in await self.redis_client.keys(key_pattern):
            # The channel name is the last ':'-separated segment of the key.
            channel = channel_key.decode("utf-8").split(":")[-1]
            if channel not in channels:
                await self._try_register_webhook(channel, action="unsubscribe")

    async def _subscribe_missing_channels(self, channels):
        """Subscribe every channel in *channels* that has no Redis subscription key."""
        for channel in channels:
            redis_key = f"{config['redis']['prefix']}:SUBSCRIPTION:{channel}"
            if await self.redis_client.exists(redis_key) != 1:
                await self._try_register_webhook(channel, action="subscribe")

    async def _try_register_webhook(self, channel, action):
        """Call ``register_webhook`` and report, rather than propagate, a failure.

        Used by the background maintenance passes so that one failing channel
        does not terminate ``onJoin`` and stop maintenance for all the others.
        """
        try:
            await self.register_webhook(channel, action=action)
        except asyncio.CancelledError:
            # Cancellation must always propagate so the task can shut down.
            raise
        except Exception as error:
            print(f"Failed to {action} webhook for channel {channel!r}: {error!r}")

    async def _run_blocking(self, func, *args, **kwargs):
        """Run the blocking callable *func* in the loop's default executor.

        ``requests`` is synchronous; calling it directly from a coroutine would
        stall the event loop for the duration of the HTTP round trip.
        """
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(None, lambda: func(*args, **kwargs))

    async def register_webhook(self, channel, action="subscribe"):
        """Send a subscribe/unsubscribe request for *channel* to the Twitch webhook hub.

        Args:
            channel: Twitch login name of the channel.
            action: ``"subscribe"`` or ``"unsubscribe"``; any other value makes
                the call a no-op.

        Returns:
            Always ``None``. A hub response with status 400 or above is
            reported with ``print``.

        Raises:
            Whatever ``fetch_channel_id`` or ``requests.post`` raise.
        """
        if action not in ("subscribe", "unsubscribe"):
            return None

        channel_id = await self.fetch_channel_id(channel)

        payload = {
            "hub.callback": f"{config['webhook_callback_url']}/{channel_id}",
            "hub.mode": action,
            "hub.topic": f"https://api.twitch.tv/helix/streams?user_id={channel_id}",
            "hub.lease_seconds": 864000,  # 10 days
            "hub.secret": config["webhook_secret"],
        }

        response = await self._run_blocking(
            requests.post,
            "https://api.twitch.tv/helix/webhooks/hub",
            data=json.dumps(payload),
            headers={
                "Client-ID": config["credentials"]["twitch"]["client_id"],
                "Content-Type": "application/json",
            },
            timeout=self.HTTP_TIMEOUT_SECONDS,
        )

        # Surface rejected requests instead of silently dropping them.
        if response.status_code >= 400:
            print(
                f"Twitch rejected {action} for channel {channel!r}: "
                f"HTTP {response.status_code} {response.text}"
            )

        return None

    async def fetch_channel_id(self, channel):
        """Return the Twitch user id for the login name *channel*.

        The id is read from Redis when cached; otherwise it is requested from
        the Twitch users endpoint and both the name -> id and id -> name
        mappings are stored in Redis.

        Raises:
            KeyError: the Twitch response has no ``"data"`` field.
            IndexError: the Twitch response lists no user for *channel*.
        """
        prefix = config["redis"]["prefix"]
        id_key = f"{prefix}:CHANNEL_ID:{channel}"

        cached_id = await self.redis_client.get(id_key)
        if cached_id is not None:
            return cached_id.decode("utf-8")

        # ``params`` lets requests escape the login name; it may come from a
        # remote RPC caller and must not be spliced into the URL verbatim.
        response = await self._run_blocking(
            requests.get,
            "https://api.twitch.tv/helix/users",
            params={"login": channel},
            headers={"Client-ID": config["credentials"]["twitch"]["client_id"]},
            timeout=self.HTTP_TIMEOUT_SECONDS,
        )
        channel_id = response.json()["data"][0]["id"]

        await self.redis_client.set(id_key, channel_id)
        await self.redis_client.set(f"{prefix}:CHANNEL_NAME:{channel_id}", channel)

        return channel_id

    async def _initialize_redis_client(self):
        """Open a Redis connection to the host/port given in ``config["redis"]``."""
        return await aioredis.create_redis(
            (config["redis"]["host"], config["redis"]["port"]),
            loop=asyncio.get_event_loop()
        )
# Start the component only when this file is executed as a script.
if __name__ == "__main__":
    SubscriptionManagerComponent.run()