Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix parsing of search response #294

Merged
merged 8 commits into from
Nov 5, 2024
94 changes: 55 additions & 39 deletions ldapauthenticator/ldapauthenticator.py
Original file line number Diff line number Diff line change
Expand Up @@ -455,38 +455,49 @@ def resolve_username(self, username_supplied_by_user):
search_filter=search_filter,
attributes=[self.lookup_dn_user_dn_attribute],
)
response = conn.response
if len(response) == 0:
self.log.warning(
f"Failed to lookup a DN for username '{username_supplied_by_user}'"
)
return (None, None)
if len(response) > 1:
self.log.warning(
f"Failed to lookup a unique DN for username '{username_supplied_by_user}'"
)

# identify unique search response entry
n_entries = len(conn.entries)
if n_entries == 0:
self.log.warning(f"No response looking up '{username_supplied_by_user}'")
return (None, None)
if "attributes" not in response[0].keys():
self.log.warning(
f"Failed to lookup attribute '{self.lookup_dn_user_dn_attribute}' for username '{username_supplied_by_user}'"
if n_entries > 1:
self.log.error(
f"Looking up '{username_supplied_by_user}' gave multiple entries, "
f"expected 0 or 1 search response entries but received {n_entries}. "
"Is lookup_dn_search_filter and user_attribute configured to get a "
"unique match?"
)
return (None, None)
entry = conn.entries[0]

userdn = response[0]["dn"]
username = response[0]["attributes"][self.lookup_dn_user_dn_attribute]
if isinstance(username, list):
if len(username) == 0:
return (None, None)
elif len(username) == 1:
username = username[0]
# identify unique attribute value within the entry
attribute_values = entry.entry_attributes_as_dict.get(
self.lookup_dn_user_dn_attribute
)
if not attribute_values:
if attribute_values is None:
self.log.error(
f"No attribute '{self.lookup_dn_user_dn_attribute}' found. "
"Is lookup_dn_user_dn_attribute configured correctly?"
)
else:
self.log.error(
f"A lookup of the username '{username_supplied_by_user}' returned a list "
f"of entries for the attribute '{self.lookup_dn_user_dn_attribute}': "
f"({', '.join(username)})"
f"No attribute values for '{self.lookup_dn_user_dn_attribute}'. "
"Is lookup_dn_user_dn_attribute configured correctly?"
)
return None, None
return (None, None)
if len(attribute_values) > 1:
self.log.error(
f"Attribute '{self.lookup_dn_user_dn_attribute}' had multiple values, "
f"expected one attribute value but it had {len(attribute_values)} "
f"({';'.join(attribute_values)}). "
"Is lookup_dn_user_dn_attribute configured correctly?"
)
return None, None

userdn = entry.entry_dn
username = attribute_values[0]
return (username, userdn)

def get_connection(self, userdn, password):
Expand Down Expand Up @@ -539,17 +550,24 @@ def get_connection(self, userdn, password):
return conn

def get_user_attributes(self, conn, userdn):
attrs = {}
if self.auth_state_attributes:
found = conn.search(
conn.search(
search_base=userdn,
search_scope=ldap3.SUBTREE,
search_filter="(objectClass=*)",
attributes=self.auth_state_attributes,
)
if found:
attrs = conn.entries[0].entry_attributes_as_dict
return attrs

# identify unique search response entry
n_entries = len(conn.entries)
if n_entries == 1:
return conn.entries[0].entry_attributes_as_dict
self.log.error(
f"Expected 1 but got {n_entries} search response entries for DN '{userdn}' "
"when looking up attributes configured via auth_state_attributes. The user's "
"auth state will not include any attributes."
)
return {}

async def authenticate(self, handler, data):
"""
Expand Down Expand Up @@ -584,6 +602,9 @@ async def authenticate(self, handler, data):
if self.lookup_dn:
resolved_username, resolved_dn = self.resolve_username(login_username)
if not resolved_dn:
self.log.warning(
"username:%s Login denied for failed lookup", login_username
)
return None
if not bind_dn_template:
bind_dn_template = [resolved_dn]
Expand Down Expand Up @@ -632,18 +653,13 @@ async def authenticate(self, handler, data):
),
attributes=self.attributes,
)
n_users = len(conn.response)
if n_users == 0:
self.log.warning(
"Configured search_filter found no user associated with "
f"userattr='{self.user_attribute}' and username='{resolved_username}'"
)
return None
if n_users > 1:
n_entries = len(conn.entries)
if n_entries != 1:
self.log.warning(
"Configured search_filter found multiple users associated with "
f"Login of '{login_username}' denied. Configured search_filter "
f"found {n_entries} users associated with "
f"userattr='{self.user_attribute}' and username='{resolved_username}', "
"a unique match is required."
"and a unique match is required."
)
return None

Expand Down