forked from datalogics/flask-oauthprovider
-
Notifications
You must be signed in to change notification settings - Fork 0
/
flask_oauthprovider.py
343 lines (272 loc) · 13.8 KB
/
flask_oauthprovider.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
# -*- coding: utf-8 -*-
from oauthlib.oauth1.rfc5849 import Server
from oauthlib.oauth1.rfc5849.signature import collect_parameters
from oauthlib.common import add_params_to_uri, encode_params_utf8
from oauthlib.common import generate_token, urlencode
from flask import Response, request, redirect
from werkzeug.exceptions import Unauthorized, BadRequest
from functools import wraps
from urlparse import urlparse
class OAuthProvider(Server):
"""Provide secure services using OAuth 1 RFC 5849.
OAuthProvider is based on the secure and highly configurable Server
base class of oauthlib.oauth.rfc5849 for OAuth 1 providers. This flask
extension adds a number of convenience methods to act as helpers and a base.
A number of additional methods that will need to be implemented are added
and documented as to how they fit into the whole OAuth workflow. Detailed
descriptions of these methods are provided in respective method __doc__.
Providers will have to implement the following methods:
* register(self)
* save_timestamp_and_nonce(self, client_key, timestamp, nonce,
request_token=None, access_token=None
* authorize(self)
* get_callback(self, request_token)
* save_request_token(self, client_key, request_token, realm=None,
secret=None)
* save_verifier(self, client_key, request_token, verifier)
* save_access_token(self, client_key, request_token, realm=None,
secret=None)
Furthermore 4 default URLs are automatically routed using these properties:
* request_token_url
* access_token_url
* register_url
* authorize_url
Request tokens and access tokens will automatically be generated and
returned to clients. They will be saved using the abstract methods outlined
earlier.
A successful provider implementation will enable views to be easily and
securely protected. Providers will also enjoy fine-grained control over
which clients can access which resources through the use of realms.
Follows are two view functions, the first under the default non-specified
realm and the second under the photos realm.
@app.route("/status_feed")
@provider.require_oauth()
def status_feed(self):
...
@app.route("/photos")
@provider.require_oauth(realm="photos")
def photos(self):
...
"""
# Properties used to configure the application, can safely be overloaded
@property
def request_token_url(self):
return u'/request_token'
@property
def access_token_url(self):
return u'/access_token'
@property
def register_url(self):
return u'/register'
@property
def authorize_url(self):
return u'/authorize'
@property
def secret_length(self):
return 30
# Methods that must be overloaded
def register(self):
"""Client registration.
Defaults to /register URL.
A few common actions during client registration includes:
* Ask the client for an application name and description
* Ask the client for one or several callback URIs
* Allow the client to upload a public RSA key if the RSA signature
method is supported.
Upon registration each client must be provided with a client key. If
the HMAC signature method is used a client secret should also be
be provided.
For your convenience the following methods are provided:
* generate_client_key(self) for the client/consumer key
* generate_client_secret(self) for the client/consumer secret
"""
raise NotImplementedError("Must be implemented by inheriting classes")
def save_timestamp_and_nonce(self, client_key, timestamp, nonce,
request_token=None, access_token=None):
"""All timestamp and nonces must be stored.
It is recommended that they are also connected to at least the client
but preferably also the resource owner/user.
"""
raise NotImplementedError("Must be implemented by inheriting classes")
def authorize(self):
"""Ask the user to authorize access to the client.
Defaults to /authorize URL. Invoked by user (redirected by the client).
This view should only be accessible by authenticated users, redirect
unauthenticated users to a login.
Authorization is commonly done through a form asking the user to
grant or deny access. This form should also include information that
help the user identify which client it is authorizing access to.
Usually by displaying application name and description.
To the authorization URL the client will append the oauth_token parameter
which corresponds to the previously obtained request token. This token
should be validated using self.validate_request_token method.
The request token should be securely kept, preferably in an encrypted
HTTPOnly secure cookie during form submission as it will be needed to
complete the authorization.
Upon user authorization you should use the authorized method to easily
generate and return a verifier code to the client.
def authorize(self):
...
return authorized(request_token)
If the user denied access or if the request token was invalid it is
important to not redirect the user back to the client.
"""
raise NotImplementedError("Must be implemented by inheriting classes")
def get_callback(self, request_token):
"""Return the callback associated with the request token."""
raise NotImplementedError("Must be implemented by inheriting classes")
def save_request_token(self, client_key, request_token, realm=None,
secret=None):
"""Store request tokens.
This method is invoked by the request_token view and all you need to do
is to store the token and its associated realm and token secret.
"""
raise NotImplementedError("Must be implemented by inheriting classes")
def save_verifier(self, request_token, verifier):
"""Store verifier and user associated with a specific request token.
This method is invoked automatically by authorized.
It is VITAL that you relate the user who authorized access with this
verifier and request token or else you will be unable to provide
access to the correct resources later.
Since invocation of this method originates from the user accessing
the authorize view you should be able to extract their ID easily from
the request object.
"""
raise NotImplementedError("Must be implemented by inheriting classes")
def save_access_token(self, client_key, access_token, request_token,
secret=None):
"""Store access tokens.
This method is invoked by the access_token view and there are two
tasks you will need to carry out in addition to storing the token:
1. Retrieve the associated user and realm using request_token
2. Associate the realm and user with the new access token
"""
raise NotImplementedError("Must be implemented by inheriting classes")
# There be dragons beyond this point, tread lightly.
def __init__(self, app):
"""Setup routes and OAuth token methods."""
self.request_token = self.require_oauth(require_resource_owner=False)(self.request_token)
self.access_token = self.require_oauth(require_verifier=True)(self.access_token)
if app is not None:
self.app = app
self.init_app(app)
else:
self.app = None
def init_app(self, app):
"""Setup the 4 default routes."""
app.add_url_rule(self.request_token_url, view_func=self.request_token,
methods=[u'POST'])
app.add_url_rule(self.access_token_url, view_func=self.access_token,
methods=[u'POST'])
app.add_url_rule(self.register_url, view_func=self.register,
methods=[u'GET', u'POST'])
app.add_url_rule(self.authorize_url, view_func=self.authorize,
methods=[u'GET', u'POST'])
def authorized(self, request_token):
"""Create a verifier for an user authorized client"""
verifier = generate_token(length=self.verifier_length[1])
self.save_verifier(request_token, verifier)
response = [
(u'oauth_token', request_token),
(u'oauth_verifier', verifier)
]
callback = self.get_callback(request_token)
return redirect(add_params_to_uri(callback, response))
def request_token(self):
"""Create an OAuth request token for a valid client request.
Defaults to /request_token. Invoked by client applications.
"""
client_key = request.oauth.client_key
realm = request.oauth.realm
# TODO: fallback on default realm?
callback = request.oauth.callback_uri
request_token = generate_token(length=self.request_token_length[1])
token_secret = generate_token(length=self.secret_length)
self.save_request_token(client_key, request_token, callback,
realm=realm, secret=token_secret)
return urlencode([(u'oauth_token', request_token),
(u'oauth_token_secret', token_secret),
(u'oauth_callback_confirmed', u'true')])
def access_token(self):
"""Create an OAuth access token for an authorized client.
Defaults to /access_token. Invoked by client applications.
"""
access_token = generate_token(length=self.access_token_length[1])
token_secret = generate_token(self.secret_length)
client_key = request.oauth.client_key
self.save_access_token(client_key, access_token,
request.oauth.resource_owner_key, secret=token_secret)
return urlencode([(u'oauth_token', access_token),
(u'oauth_token_secret', token_secret)])
def generate_client_key(self):
return generate_token(length=self.client_key_length[1])
def generate_client_secret(self):
return generate_token(length=self.secret_length)
def require_oauth(self, realm=None, require_resource_owner=True,
require_verifier=False, require_realm=False):
"""Mark the view function f as a protected resource"""
def decorator(f):
@wraps(f)
def verify_request(*args, **kwargs):
"""Verify OAuth params before running view function f"""
try:
if request.form:
body = request.form.to_dict()
else:
body = request.data.decode("utf-8")
verify_result = self.verify_request(request.url.decode("utf-8"),
http_method=request.method.decode("utf-8"),
body=body,
headers=request.headers,
require_resource_owner=require_resource_owner,
require_verifier=require_verifier,
require_realm=require_realm or bool(realm),
required_realm=realm)
valid, oauth_request = verify_result
if valid:
request.oauth = self.collect_request_parameters(request)
# Request tokens are only valid when a verifier is too
token = {}
if require_verifier:
token[u'request_token'] = request.oauth.resource_owner_key
else:
token[u'access_token'] = request.oauth.resource_owner_key
# All nonce/timestamp pairs must be stored to prevent
# replay attacks, they may be connected to a specific
# client and token to decrease collision probability.
self.save_timestamp_and_nonce(request.oauth.client_key,
request.oauth.timestamp, request.oauth.nonce,
**token)
# By this point, the request is fully authorized
return f(*args, **kwargs)
else:
# Unauthorized requests should not diclose their cause
raise Unauthorized()
except ValueError as err:
# Caused by missing of or badly formatted parameters
raise BadRequest(err.message)
return verify_request
return decorator
def collect_request_parameters(self, request):
"""Collect parameters in an object for convenient access"""
class OAuthParameters(object):
"""Used as a parameter container since plain object()s can't"""
pass
# Collect parameters
query = urlparse(request.url.decode("utf-8")).query
if request.form:
body = request.form.to_dict()
else:
body = request.data.decode("utf-8")
headers = dict(encode_params_utf8(request.headers.items()))
params = dict(collect_parameters(uri_query=query, body=body, headers=headers))
# Extract params and store for convenient and predictable access
oauth_params = OAuthParameters()
oauth_params.client_key = params.get(u'oauth_consumer_key')
oauth_params.resource_owner_key = params.get(u'oauth_token', None)
oauth_params.nonce = params.get(u'oauth_nonce')
oauth_params.timestamp = params.get(u'oauth_timestamp')
oauth_params.verifier = params.get(u'oauth_verifier', None)
oauth_params.callback_uri = params.get(u'oauth_callback', None)
oauth_params.realm = params.get(u'realm', None)
return oauth_params