forked from facebookincubator/jupyterhub_fb_authenticator
-
Notifications
You must be signed in to change notification settings - Fork 1
/
authenticator.py
121 lines (92 loc) · 3.62 KB
/
authenticator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
#!/usr/bin/env python3
# Copyright (c) Facebook, Inc. and its affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
"""
Custom Jupyterhub Authenticator to use Facebook OAuth.
"""
import json
import os
import urllib
from jupyterhub.auth import LocalAuthenticator
from oauthenticator.oauth2 import OAuthenticator, OAuthLoginHandler
from tornado import gen
from tornado.auth import OAuth2Mixin
from tornado.httpclient import AsyncHTTPClient, HTTPRequest
from tornado.httputil import url_concat
from tornado.web import HTTPError
from traitlets import Bool, Dict, Unicode
class GenericEnvMixin(OAuth2Mixin):
_OAUTH_AUTHORIZE_URL = os.environ.get("OAUTH2_AUTHORIZE_URL", "")
_OAUTH_ACCESS_TOKEN_URL = os.environ.get("OAUTH2_TOKEN_URL", "")
class FBLoginHandler(OAuthLoginHandler, GenericEnvMixin):
pass
class FBAuthenticator(OAuthenticator):
FB_GRAPH_EP = "https://graph.facebook.com"
login_service = Unicode("Facebook", config=True)
login_handler = FBLoginHandler
token_url = Unicode(
os.environ.get("OAUTH2_TOKEN_URL", ""),
config=True,
help="Access token endpoint URL",
)
extra_params = Dict(help="Extra parameters for first POST request").tag(config=True)
tls_verify = Bool(
os.environ.get("OAUTH2_TLS_VERIFY", "True").lower() in {"true", "1"},
config=True,
help="Disable TLS verification on http request",
)
async def authenticate(self, handler, data=None):
code = handler.get_argument("code")
# get app user id
params = {
"client_id": self.client_id,
"redirect_uri": self.get_callback_url(handler),
"client_secret": self.client_secret,
"code": code,
"grant_type": "authorization_code",
}
params.update(self.extra_params)
if self.token_url:
url = self.token_url
else:
raise ValueError("Please set the OAUTH2_TOKEN_URL environment variable")
url = url_concat(url, params)
resp_json = await self._http_get(url)
access_token = resp_json["access_token"]
user_id = await self._get_user_id(access_token)
return await self.authorize(access_token, user_id)
async def _http_get(self, url):
http_client = AsyncHTTPClient()
headers = {"Accept": "application/json", "User-Agent": "JupyterHub"}
req = HTTPRequest(
url, method="GET", headers=headers, validate_cert=self.tls_verify
)
resp = await http_client.fetch(req)
return json.loads(resp.body.decode("utf8", "replace"))
async def _get_user_id(self, access_token):
"""
Return user id of the given user.
Throw a HTTP 500 error otherwise.
"""
try:
url = f"{FBAuthenticator.FB_GRAPH_EP}/me?fields=id&access_token={access_token}"
with urllib.request.urlopen(url) as response:
body = response.read()
user = json.loads(body)
return user["id"]
except Exception:
raise HTTPError(500, "Failed to get user ID")
async def authorize(self, access_token, user_id):
raise NotImplementedError()
@gen.coroutine
def pre_spawn_start(self, user, spawner):
auth_state = yield user.get_auth_state()
if not auth_state:
return
spawner.environment["FB_TOKEN"] = auth_state["access_token"]
class LocalFBAuthenticator(LocalAuthenticator, FBAuthenticator):
"""A version that mixes in local system user creation"""
pass