From 3d1fc4768e3091aa4de2d71e7c31673b0f2bf717 Mon Sep 17 00:00:00 2001 From: James Robinson Date: Thu, 9 May 2024 11:47:13 +0100 Subject: [PATCH] :loud_sound: Better exception handling for EntraIDUsers --- .../administration/users/entra_id_users.py | 190 ++++++++++++------ data_safe_haven/exceptions/__init__.py | 4 + data_safe_haven/external/api/graph_api.py | 2 +- 3 files changed, 131 insertions(+), 65 deletions(-) diff --git a/data_safe_haven/administration/users/entra_id_users.py b/data_safe_haven/administration/users/entra_id_users.py index 870f5844ec..7a99de4d5a 100644 --- a/data_safe_haven/administration/users/entra_id_users.py +++ b/data_safe_haven/administration/users/entra_id_users.py @@ -3,7 +3,11 @@ from collections.abc import Sequence from typing import Any -from data_safe_haven.exceptions import DataSafeHavenMicrosoftGraphError +from data_safe_haven.exceptions import ( + DataSafeHavenEntraIDError, + DataSafeHavenError, + DataSafeHavenInputError, +) from data_safe_haven.external import GraphApi from data_safe_haven.functions import password from data_safe_haven.utility import LoggingSingleton @@ -25,81 +29,139 @@ def __init__( self.logger = LoggingSingleton() def add(self, new_users: Sequence[ResearchUser]) -> None: - """Add list of users to Entra ID""" - # Get the default domain - default_domain = next( - domain["id"] - for domain in self.graph_api.read_domains() - if domain["isDefault"] - ) - for user in new_users: - request_json = { - "accountEnabled": user.account_enabled, - "displayName": user.display_name, - "givenName": user.given_name, - "surname": user.surname, - "mailNickname": user.username, - "passwordProfile": {"password": password(20)}, - "userPrincipalName": f"{user.username}@{default_domain}", - } - if user.email_address and user.phone_number: + """ + Add list of users to Entra ID + + Raises: + DataSafeHavenEntraIDError if any user could not be created + """ + try: + default_domain = next( + domain["id"] + for domain in self.graph_api.read_domains() + if domain["isDefault"] + ) + for user in new_users: + request_json = { + "accountEnabled": user.account_enabled, + "displayName": user.display_name, + "givenName": user.given_name, + "surname": user.surname, + "mailNickname": user.username, + "passwordProfile": {"password": password(20)}, + "userPrincipalName": f"{user.username}@{default_domain}", + } + if not user.email_address: + msg = ( + f"User '[green]{user.username}[/]' is missing an email address." + ) + raise DataSafeHavenInputError(msg) + if not user.phone_number: + msg = f"User '[green]{user.username}[/]' is missing a phone number." + raise DataSafeHavenInputError(msg) self.graph_api.create_user( request_json, user.email_address, user.phone_number ) - self.logger.info( - f"Ensured user '[green]{user.preferred_username}[/]' exists in Entra ID" - ) + self.logger.info( + f"Ensured user '[green]{user.preferred_username}[/]' exists in Entra ID" + ) + except DataSafeHavenError as exc: + msg = f"Unable to add users to Entra ID.\n{exc}" + raise DataSafeHavenEntraIDError(msg) from exc def list(self) -> Sequence[ResearchUser]: - user_list = self.graph_api.read_users() - return [ - ResearchUser( - account_enabled=user_details["accountEnabled"], - email_address=user_details["mail"], - given_name=user_details["givenName"], - phone_number=( - user_details["businessPhones"][0] - if len(user_details["businessPhones"]) - else None - ), - sam_account_name=( - user_details["onPremisesSamAccountName"] - if user_details["onPremisesSamAccountName"] - else user_details["mailNickname"] - ), - surname=user_details["surname"], - user_principal_name=user_details["userPrincipalName"], - ) - for user_details in user_list - ] + """ + List available Entra ID users + + Raises: + DataSafeHavenEntraIDError if users could not be loaded + """ + try: + user_list = self.graph_api.read_users() + return [ + ResearchUser( + account_enabled=user_details["accountEnabled"], + email_address=user_details["mail"], + given_name=user_details["givenName"], + phone_number=( + user_details["businessPhones"][0] + if len(user_details["businessPhones"]) + else None + ), + sam_account_name=( + user_details["onPremisesSamAccountName"] + if user_details["onPremisesSamAccountName"] + else user_details["mailNickname"] + ), + surname=user_details["surname"], + user_principal_name=user_details["userPrincipalName"], + ) + for user_details in user_list + ] + except DataSafeHavenError as exc: + msg = f"Unable list Entra ID users.\n{exc}" + raise DataSafeHavenEntraIDError(msg) from exc def register(self, sre_name: str, usernames: Sequence[str]) -> None: - """Add usernames to SRE group""" - group_name = f"Data Safe Haven SRE {sre_name} Users" - for username in usernames: - self.graph_api.add_user_to_group(username, group_name) + """ + Add usernames to SRE group + + Raises: + DataSafeHavenEntraIDError if any user could not be added to the group. + """ + try: + group_name = f"Data Safe Haven SRE {sre_name} Users" + for username in usernames: + self.graph_api.add_user_to_group(username, group_name) + except DataSafeHavenError as exc: + msg = f"Unable add users to group '{group_name}'.\n{exc}" + raise DataSafeHavenEntraIDError(msg) from exc def remove(self, users: Sequence[ResearchUser]) -> None: - """Remove list of users from Entra ID""" - for user in filter( - lambda existing_user: any(existing_user == user for user in users), - self.list(), - ): - try: + """ + Remove list of users from Entra ID + + Raises: + DataSafeHavenEntraIDError if any user could not be removed. + """ + try: + for user in filter( + lambda existing_user: any(existing_user == user for user in users), + self.list(), + ): self.graph_api.remove_user(user.username) self.logger.info(f"Removed '{user.preferred_username}'.") - except DataSafeHavenMicrosoftGraphError: - self.logger.error(f"Unable to remove '{user.preferred_username}'.") + except DataSafeHavenError as exc: + msg = f"Unable to remove users from Entra ID.\n{exc}" + raise DataSafeHavenEntraIDError(msg) from exc def set(self, users: Sequence[ResearchUser]) -> None: - """Set Entra ID users to specified list""" - users_to_remove = [user for user in self.list() if user not in users] - self.remove(users_to_remove) - users_to_add = [user for user in users if user not in self.list()] - self.add(users_to_add) + """ + Set Entra ID users to specified list + + Raises: + DataSafeHavenEntraIDError if user list could not be set + """ + try: + users_to_remove = [user for user in self.list() if user not in users] + self.remove(users_to_remove) + users_to_add = [user for user in users if user not in self.list()] + self.add(users_to_add) + except DataSafeHavenError as exc: + msg = f"Unable to set desired user list in Entra ID.\n{exc}" + raise DataSafeHavenEntraIDError(msg) from exc def unregister(self, sre_name: str, usernames: Sequence[str]) -> None: - """Remove usernames from SRE group""" - group_name = f"Data Safe Haven SRE {sre_name}" - for username in usernames: - self.graph_api.remove_user_from_group(username, group_name) + """ + Remove usernames from SRE group + + Raises: + DataSafeHavenEntraIDError if any user could not be added to the group. + """ + try: + group_name = f"Data Safe Haven SRE {sre_name}" + for username in usernames: + self.graph_api.remove_user_from_group(username, group_name) + except DataSafeHavenError as exc: + msg = f"Unable to remove users from group {group_name}.\n{exc}" + raise DataSafeHavenEntraIDError(msg) from exc diff --git a/data_safe_haven/exceptions/__init__.py b/data_safe_haven/exceptions/__init__.py index 2a519ec604..3ef0bbcac8 100644 --- a/data_safe_haven/exceptions/__init__.py +++ b/data_safe_haven/exceptions/__init__.py @@ -10,6 +10,10 @@ class DataSafeHavenConfigError(DataSafeHavenError): pass +class DataSafeHavenEntraIDError(DataSafeHavenCloudError): + pass + + class DataSafeHavenInputError(DataSafeHavenError): pass diff --git a/data_safe_haven/external/api/graph_api.py b/data_safe_haven/external/api/graph_api.py index 968118be1a..b55677a99c 100644 --- a/data_safe_haven/external/api/graph_api.py +++ b/data_safe_haven/external/api/graph_api.py @@ -137,7 +137,7 @@ def add_user_to_group( """Add a user to a group Raises: - DataSafeHavenMicrosoftGraphError if the token could not be created + DataSafeHavenMicrosoftGraphError if the user could not be added to the group. """ try: user_id = self.get_id_from_username(username)