Skip to content

Commit

Permalink
🔊 Better exception handling for EntraIDUsers
Browse files Browse the repository at this point in the history
  • Loading branch information
jemrobinson committed May 9, 2024
1 parent 68ccc32 commit 3d1fc47
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 65 deletions.
190 changes: 126 additions & 64 deletions data_safe_haven/administration/users/entra_id_users.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,11 @@
from collections.abc import Sequence
from typing import Any

from data_safe_haven.exceptions import DataSafeHavenMicrosoftGraphError
from data_safe_haven.exceptions import (
DataSafeHavenEntraIDError,
DataSafeHavenError,
DataSafeHavenInputError,
)
from data_safe_haven.external import GraphApi
from data_safe_haven.functions import password
from data_safe_haven.utility import LoggingSingleton
Expand All @@ -25,81 +29,139 @@ def __init__(
self.logger = LoggingSingleton()

def add(self, new_users: Sequence[ResearchUser]) -> None:
"""Add list of users to Entra ID"""
# Get the default domain
default_domain = next(
domain["id"]
for domain in self.graph_api.read_domains()
if domain["isDefault"]
)
for user in new_users:
request_json = {
"accountEnabled": user.account_enabled,
"displayName": user.display_name,
"givenName": user.given_name,
"surname": user.surname,
"mailNickname": user.username,
"passwordProfile": {"password": password(20)},
"userPrincipalName": f"{user.username}@{default_domain}",
}
if user.email_address and user.phone_number:
"""
Add list of users to Entra ID
Raises:
DataSafeHavenEntraIDError if any user could not be created
"""
try:
default_domain = next(
domain["id"]
for domain in self.graph_api.read_domains()
if domain["isDefault"]
)
for user in new_users:
request_json = {
"accountEnabled": user.account_enabled,
"displayName": user.display_name,
"givenName": user.given_name,
"surname": user.surname,
"mailNickname": user.username,
"passwordProfile": {"password": password(20)},
"userPrincipalName": f"{user.username}@{default_domain}",
}
if not user.email_address:
msg = (
f"User '[green]{user.username}[/]' is missing an email address."
)
raise DataSafeHavenInputError(msg)
if not user.phone_number:
msg = f"User '[green]{user.username}[/]' is missing a phone number."
raise DataSafeHavenInputError(msg)
self.graph_api.create_user(
request_json, user.email_address, user.phone_number
)
self.logger.info(
f"Ensured user '[green]{user.preferred_username}[/]' exists in Entra ID"
)
self.logger.info(
f"Ensured user '[green]{user.preferred_username}[/]' exists in Entra ID"
)
except DataSafeHavenError as exc:
msg = f"Unable to add users to Entra ID.\n{exc}"
raise DataSafeHavenEntraIDError(msg) from exc

def list(self) -> Sequence[ResearchUser]:
user_list = self.graph_api.read_users()
return [
ResearchUser(
account_enabled=user_details["accountEnabled"],
email_address=user_details["mail"],
given_name=user_details["givenName"],
phone_number=(
user_details["businessPhones"][0]
if len(user_details["businessPhones"])
else None
),
sam_account_name=(
user_details["onPremisesSamAccountName"]
if user_details["onPremisesSamAccountName"]
else user_details["mailNickname"]
),
surname=user_details["surname"],
user_principal_name=user_details["userPrincipalName"],
)
for user_details in user_list
]
"""
List available Entra ID users
Raises:
DataSafeHavenEntraIDError if users could not be loaded
"""
try:
user_list = self.graph_api.read_users()
return [
ResearchUser(
account_enabled=user_details["accountEnabled"],
email_address=user_details["mail"],
given_name=user_details["givenName"],
phone_number=(
user_details["businessPhones"][0]
if len(user_details["businessPhones"])
else None
),
sam_account_name=(
user_details["onPremisesSamAccountName"]
if user_details["onPremisesSamAccountName"]
else user_details["mailNickname"]
),
surname=user_details["surname"],
user_principal_name=user_details["userPrincipalName"],
)
for user_details in user_list
]
except DataSafeHavenError as exc:
msg = f"Unable list Entra ID users.\n{exc}"
raise DataSafeHavenEntraIDError(msg) from exc

def register(self, sre_name: str, usernames: Sequence[str]) -> None:
"""Add usernames to SRE group"""
group_name = f"Data Safe Haven SRE {sre_name} Users"
for username in usernames:
self.graph_api.add_user_to_group(username, group_name)
"""
Add usernames to SRE group
Raises:
DataSafeHavenEntraIDError if any user could not be added to the group.
"""
try:
group_name = f"Data Safe Haven SRE {sre_name} Users"
for username in usernames:
self.graph_api.add_user_to_group(username, group_name)
except DataSafeHavenError as exc:
msg = f"Unable add users to group '{group_name}'.\n{exc}"
raise DataSafeHavenEntraIDError(msg) from exc

def remove(self, users: Sequence[ResearchUser]) -> None:
"""Remove list of users from Entra ID"""
for user in filter(
lambda existing_user: any(existing_user == user for user in users),
self.list(),
):
try:
"""
Remove list of users from Entra ID
Raises:
DataSafeHavenEntraIDError if any user could not be removed.
"""
try:
for user in filter(
lambda existing_user: any(existing_user == user for user in users),
self.list(),
):
self.graph_api.remove_user(user.username)
self.logger.info(f"Removed '{user.preferred_username}'.")
except DataSafeHavenMicrosoftGraphError:
self.logger.error(f"Unable to remove '{user.preferred_username}'.")
except DataSafeHavenError as exc:
msg = f"Unable to remove users from Entra ID.\n{exc}"
raise DataSafeHavenEntraIDError(msg) from exc

def set(self, users: Sequence[ResearchUser]) -> None:
"""Set Entra ID users to specified list"""
users_to_remove = [user for user in self.list() if user not in users]
self.remove(users_to_remove)
users_to_add = [user for user in users if user not in self.list()]
self.add(users_to_add)
"""
Set Entra ID users to specified list
Raises:
DataSafeHavenEntraIDError if user list could not be set
"""
try:
users_to_remove = [user for user in self.list() if user not in users]
self.remove(users_to_remove)
users_to_add = [user for user in users if user not in self.list()]
self.add(users_to_add)
except DataSafeHavenError as exc:
msg = f"Unable to set desired user list in Entra ID.\n{exc}"
raise DataSafeHavenEntraIDError(msg) from exc

def unregister(self, sre_name: str, usernames: Sequence[str]) -> None:
"""Remove usernames from SRE group"""
group_name = f"Data Safe Haven SRE {sre_name}"
for username in usernames:
self.graph_api.remove_user_from_group(username, group_name)
"""
Remove usernames from SRE group
Raises:
DataSafeHavenEntraIDError if any user could not be added to the group.
"""
try:
group_name = f"Data Safe Haven SRE {sre_name}"
for username in usernames:
self.graph_api.remove_user_from_group(username, group_name)
except DataSafeHavenError as exc:
msg = f"Unable to remove users from group {group_name}.\n{exc}"
raise DataSafeHavenEntraIDError(msg) from exc
4 changes: 4 additions & 0 deletions data_safe_haven/exceptions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ class DataSafeHavenConfigError(DataSafeHavenError):
pass


class DataSafeHavenEntraIDError(DataSafeHavenCloudError):
pass


class DataSafeHavenInputError(DataSafeHavenError):
pass

Expand Down
2 changes: 1 addition & 1 deletion data_safe_haven/external/api/graph_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ def add_user_to_group(
"""Add a user to a group
Raises:
DataSafeHavenMicrosoftGraphError if the token could not be created
DataSafeHavenMicrosoftGraphError if the user could not be added to the group.
"""
try:
user_id = self.get_id_from_username(username)
Expand Down

0 comments on commit 3d1fc47

Please sign in to comment.