# active_directory_dao.py -- forked from nishen/nginx-ad-proxy
import os
import re
import ssl
import logging as log
from ldap3 import Tls, Server, Connection, ALL, ALL_ATTRIBUTES, SUBTREE
from ldap3.core.exceptions import LDAPBindError, LDAPException, LDAPStartTLSError
from cachetools import cached, TTLCache
# Root logger: INFO by default, with a timestamped, level-tagged line format.
log.basicConfig(format="[%(asctime)s][%(levelname)s]: %(message)s", level=log.INFO)

# Exporting DEBUG=1 raises the root logger to DEBUG; any other value is ignored.
if os.environ.get("DEBUG") == "1":
    log.getLogger().setLevel(log.DEBUG)

# Directory settings are mandatory: a missing variable raises KeyError as soon
# as this module is imported.
AD_HOST = os.environ["AD_HOST"]
AD_PORT = os.environ["AD_PORT"]  # kept as a string; converted with int() where the server is built
AD_DOMAIN = os.environ["AD_DOMAIN"]  # prefixed to the user name as DOMAIN\user when binding
AD_BASEDN = os.environ["AD_BASEDN"]  # one or more search bases, separated by "|"
class ActiveDirectoryDAO:
    """Authenticate users against Active Directory and authorise them by name or group.

    Connection settings come from the module-level AD_HOST, AD_PORT, AD_DOMAIN
    and AD_BASEDN constants.
    """

    # Captures the first CN value of a lower-cased distinguished name.  The
    # trailing ".*" only affects group(0), which is written to the debug log.
    _CN_PATTERN = re.compile(r"cn=(.*?),.*")

    def __init__(self):
        """Create a DAO; it holds no per-instance state."""
        pass

    def authenticate(self, username, password, auth_users, auth_groups):
        """Bind as the user and check them against the allowed users/groups.

        Args:
            username: sAMAccountName of the user (without the domain prefix).
            password: the user's password.
            auth_users: collection of permitted user names, or None/empty for
                no user restriction.
            auth_groups: collection of permitted group names (CN values), or
                None/empty for no group restriction.

        Returns:
            False when the bind or the directory lookup fails for any reason.
            Otherwise: with both lists configured, True if the user is listed
            OR belongs to a listed group; with one list configured, the result
            of that check; with neither configured, True.
        """
        try:
            ad_groups = self.fetch_ad_groups(username, password)
        except LDAPBindError as bindErr:
            log.error("binding user failed: [%s] (%s)", username, bindErr)
            return False
        except LDAPException as ldapErr:
            log.error("ldap error: [%s]", ldapErr)
            return False
        except Exception as err:
            # Top-level boundary: any failure must deny access, never propagate.
            log.error("general error: [%s]", err)
            return False

        # check_user/check_groups return True for an empty list, so the
        # "both configured" case has to be handled explicitly as an OR.
        if auth_users and auth_groups:
            return self.check_user(username, auth_users) or self.check_groups(ad_groups, auth_groups)
        if auth_groups:
            return self.check_groups(ad_groups, auth_groups)
        if auth_users:
            return self.check_user(username, auth_users)
        return True

    # NOTE(review): the default cachetools key is built from all call
    # arguments, i.e. (self, username, password).  That keeps plaintext
    # passwords in memory as cache keys, and password or group changes are not
    # noticed for up to 30 minutes -- confirm this trade-off is intended.
    @cached(cache=TTLCache(maxsize=1024, ttl=1800))
    def fetch_ad_groups(self, username, password):
        """Bind to the directory as the user and return their group DNs.

        Args:
            username: sAMAccountName of the user (without the domain prefix).
            password: the user's password.

        Returns:
            A list with the ``memberOf`` values of the matching entries found
            under the first base DN that yields a result.

        Raises:
            LDAPBindError: the bind was rejected.
            LDAPStartTLSError: StartTLS could not be negotiated before binding.
            Exception: no base DN is configured, or the user entry was not
                found under any configured base DN.
        """
        # Imported here so the block does not rely on edits to the file header.
        from ldap3 import AUTO_BIND_TLS_BEFORE_BIND
        from ldap3.utils.conv import escape_filter_chars

        # "".split("|") yields [""], so blank entries must be dropped for the
        # "nothing configured" check to be able to fire.
        ad_basedns = [dn.strip() for dn in AD_BASEDN.split("|") if dn.strip()]
        if not ad_basedns:
            raise Exception("no base dns configured")

        log.debug("binding user: [%s]", username)

        # SECURITY NOTE(review): validate=ssl.CERT_NONE disables certificate
        # verification, so the server identity is not checked.  Left unchanged
        # because enabling it requires CA configuration not visible here.
        tls = Tls(validate=ssl.CERT_NONE, version=ssl.PROTOCOL_TLSv1_2)
        s = Server(host=AD_HOST, port=int(AD_PORT), tls=tls, get_info="ALL")

        # auto_bind=True would bind (and send the password) *before* StartTLS;
        # TLS_BEFORE_BIND upgrades the channel first and raises if it cannot.
        c = Connection(
            s,
            user=f"{AD_DOMAIN}\\{username}",
            password=password,
            auto_bind=AUTO_BIND_TLS_BEFORE_BIND,
            version=3,
            authentication='SIMPLE',
            client_strategy='SYNC',
            auto_referrals=True,
            check_names=True,
            read_only=True,
            lazy=False,
            raise_exceptions=False)

        try:
            log.debug("tls status: %s", c.tls_started)

            # fetch user object and extract groups
            log.debug("searching for entry: sAMAccountName=%s", username)
            # Escape the user-supplied name so it cannot alter the filter.
            search_filter = f"(sAMAccountName={escape_filter_chars(username)})"

            user_entries = []
            for ad_basedn in ad_basedns:
                c.search(search_base=ad_basedn,
                         search_filter=search_filter,
                         search_scope=SUBTREE,
                         attributes=["memberOf"])
                # Keep real entries only: search result references carry no
                # "attributes" and must not count as "user found".
                user_entries = [entry for entry in (c.response or [])
                                if entry.get("type") == "searchResEntry"]
                if user_entries:
                    break
        finally:
            # Always release the socket, including when the search fails.
            c.unbind()

        # should not be empty unless the user is not in an active container
        if not user_entries:
            log.warning("no entities in ldap search response")
            raise Exception("no entities in ldap search response")

        # collect ad groups
        ad_groups = []
        for entry in user_entries:
            ad_groups.extend(entry.get("attributes", {}).get("memberOf") or [])

        if log.getLogger().isEnabledFor(log.DEBUG):
            for ad_group in ad_groups:
                log.debug("ad_group: %s", ad_group)

        return ad_groups

    @staticmethod
    def check_user(username, auth_users):
        """Return True if ``username`` is permitted by ``auth_users``.

        An empty or None ``auth_users`` means "no restriction".  The comparison
        is case-insensitive on both sides, matching ``check_groups``.
        """
        if not auth_users:
            return True

        wanted = username.lower()
        return any(wanted == auth_user.lower() for auth_user in auth_users)

    @staticmethod
    def check_groups(ad_groups, auth_groups):
        """Return True if any of the user's groups is listed in ``auth_groups``.

        Args:
            ad_groups: group distinguished names (e.g. ``memberOf`` values).
            auth_groups: permitted group names, compared case-insensitively
                against the first CN component of each DN; None/empty means
                "no restriction".
        """
        if not auth_groups:
            return True

        auth_group_names = {auth_group.lower() for auth_group in auth_groups}

        ad_group_names = []
        for ad_group in ad_groups:
            match = ActiveDirectoryDAO._CN_PATTERN.search(ad_group.lower())
            if match:
                ad_group_names.append(match.group(1))
                log.debug("matched: %s from %s", match.group(1), match.group(0))

        common_groups = auth_group_names.intersection(ad_group_names)
        log.debug("common groups: %s", common_groups)

        return len(common_groups) > 0