forked from facebookincubator/jupyterhub_fb_authenticator
-
Notifications
You must be signed in to change notification settings - Fork 1
/
business_authenticator.py
117 lines (99 loc) · 4.12 KB
/
business_authenticator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
"""
Custom Jupyterhub Authenticator to use Facebook OAuth with business manager check.
"""
import json
import os
import urllib
from tornado.web import HTTPError
from .authenticator import FBAuthenticator
class FBBusinessAuthenticator(FBAuthenticator):
BUSINESS_ID = os.environ.get("BUSINESS_ID")
PAGE_THRESHOLD = 100
async def authorize(self, access_token, user_id):
# check if the user has business management permission
if not await self._check_permission(access_token, "business_management"):
self.log.warning(
"User %s doesn't have business management permission", user_id
)
raise HTTPError(
403, "Your access token doesn't have the required permission"
)
self.log.info("User %s passed business management permission check", user_id)
# check if the user is in the business
if not await self._check_in_business(access_token):
self.log.warning(
"User %s is not in the business %s", user_id, self.BUSINESS_ID
)
raise HTTPError(403, "Your are not in the business yet")
self.log.info("User %s passed business check", user_id)
return {
"name": user_id,
"auth_state": {
"access_token": access_token,
"fb_user": {"username": user_id},
},
}
async def _check_permission(self, access_token, permission):
"""
Return true if the user has the given permission, false if not.
Throw a HTTP 500 error otherwise.
"""
try:
url = f"{FBAuthenticator.FB_GRAPH_EP}/me/permissions/?permission={permission}&access_token={access_token}"
with urllib.request.urlopen(url) as response:
body = response.read()
permission = json.loads(body).get("data")
return permission and permission[0]["status"] == "granted"
except Exception:
raise HTTPError(500, "Failed to check permission")
async def _check_in_business(self, access_token):
"""
Return true if the user is in the given business, false if not.
Throw a HTTP 500 error otherwise.
"""
try:
url = f"{FBAuthenticator.FB_GRAPH_EP}/me/business_users?access_token={access_token}"
with urllib.request.urlopen(url) as response:
body = response.read()
body_json = json.loads(body)
return await self._check_in_page(body_json, 1)
except Exception:
raise HTTPError(500, "Authorization failed")
async def _check_in_page(self, body_json, current_page):
"""
Return false if the current page is larger thatn the threshold.
Return true if the user is in the given page.
Then recursively check the next page if it exists, return false if not.
Throw a HTTP 500 error otherwise.
"""
if current_page > self.PAGE_THRESHOLD:
return False
if self._has_business(body_json["data"]):
return True
paging = body_json["paging"]
if "next" not in paging:
return False
try:
next_page_url = paging["next"]
with urllib.request.urlopen(next_page_url) as response:
body = response.read()
return await self._check_in_page(json.loads(body), current_page + 1)
except Exception:
raise HTTPError(500, "Authorization failed")
def _has_business(self, data):
"""
Given the data of one page of business users, check if the user is in the business.
Return true if the user is in the business, false otherwise.
"""
return any(
"business" in entry
and "id" in entry["business"]
and entry["business"]["id"] == self.BUSINESS_ID
for entry in data
)