-
Notifications
You must be signed in to change notification settings - Fork 0
/
auth0_tokens.py
133 lines (117 loc) · 4.62 KB
/
auth0_tokens.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
"""Extracted from Python Flask API Auth0 integration example
"""
from functools import wraps
import json
from os import environ as env
from urllib.request import urlopen
from dotenv import load_dotenv, find_dotenv
from flask import request, _request_ctx_stack
from jose import jwt
ENV_FILE = find_dotenv()
if ENV_FILE:
load_dotenv(ENV_FILE)
AUTH0_DOMAIN = env.get("AUTH0_DOMAIN")
API_IDENTIFIER = env.get("API_IDENTIFIER")
ALGORITHMS = ["RS256"]
# Format error response and append status code.
class AuthError(Exception):
def __init__(self, error, status_code):
self.error = error
self.status_code = status_code
def get_token_auth_header():
"""Obtains the access token from the Authorization Header
"""
auth = request.headers.get("Authorization", None)
if not auth:
raise AuthError({"success": False,
"message":
"Authorization header is missing"}, 401)
parts = auth.split()
if parts[0].lower() != "bearer":
raise AuthError({"success": False,
"message":
"Invalid header: "
"Authorization header must start with"
" Bearer"}, 401)
elif len(parts) == 1:
raise AuthError({"success": False,
"message": "Invalid header: Token not found"}, 401)
elif len(parts) > 2:
raise AuthError({"success": False,
"message":
"Invalid header: "
"Authorization header must be"
" Bearer token"}, 401)
token = parts[1]
return token
def requires_scope(required_scope):
"""Determines if the required scope is present in the access token
Args:
required_scope (str): The scope required to access the resource
"""
token = get_token_auth_header()
unverified_claims = jwt.get_unverified_claims(token)
if unverified_claims.get("scope"):
token_scopes = unverified_claims["scope"].split()
for token_scope in token_scopes:
if token_scope == required_scope:
return True
return False
def requires_auth(f):
"""Determines if the access token is valid
"""
@wraps(f)
def decorated(*args, **kwargs):
token = get_token_auth_header()
jsonurl = urlopen("https://" + AUTH0_DOMAIN + "/.well-known/jwks.json")
jwks = json.loads(jsonurl.read())
try:
unverified_header = jwt.get_unverified_header(token)
except jwt.JWTError:
raise AuthError({"success": False,
"message":
"Invalid header: "
"Use an RS256 signed JWT Access Token"}, 401)
if unverified_header["alg"] == "HS256":
raise AuthError({"success": False,
"message":
"Invalid header: "
"Use an RS256 signed JWT Access Token"}, 401)
rsa_key = {}
for key in jwks["keys"]:
if key["kid"] == unverified_header["kid"]:
rsa_key = {
"kty": key["kty"],
"kid": key["kid"],
"use": key["use"],
"n": key["n"],
"e": key["e"]
}
if rsa_key:
try:
payload = jwt.decode(
token,
rsa_key,
algorithms=ALGORITHMS,
audience=API_IDENTIFIER,
issuer="https://" + AUTH0_DOMAIN + "/"
)
except jwt.ExpiredSignatureError:
raise AuthError({"success": False,
"message": "Token is expired"}, 401)
except jwt.JWTClaimsError:
raise AuthError({"success": False,
"message":
"Invalid claims,"
" please check the audience and issuer"}, 401)
except Exception:
raise AuthError({"success": False,
"message":
"Invalid header: "
"Unable to parse authentication"
" token."}, 401)
_request_ctx_stack.top.current_user = payload
return f(*args, **kwargs)
raise AuthError({"success": False,
"message": "Invalid header: Unable to find appropriate key"}, 401)
return decorated