diff --git a/sssd_test_framework/roles/ad.py b/sssd_test_framework/roles/ad.py index ebd30120..0226d0f4 100644 --- a/sssd_test_framework/roles/ad.py +++ b/sssd_test_framework/roles/ad.py @@ -1753,10 +1753,10 @@ def link( args: list[str] | str | None = None, ) -> GPO: """ - Link the group policy to the a target object inside the directory, a site, domain or an ou. + Link the group policy to the target object inside the directory, a site, domain or an ou. ..Note:: - The New and Set cmdlets are identical. To modify an an existing link, + The New and Set cmdlets are identical. To modify an existing link, change the $op parameter to "Set", i.e. to disable 'Enforced' ou_policy.link("Set", args=["-Enforced No"]) diff --git a/sssd_test_framework/roles/generic.py b/sssd_test_framework/roles/generic.py index b1892f63..104eb882 100644 --- a/sssd_test_framework/roles/generic.py +++ b/sssd_test_framework/roles/generic.py @@ -24,6 +24,7 @@ "GenericAutomount", "GenericAutomountMap", "GenericAutomountKey", + "GenericGPO", ] @@ -288,6 +289,14 @@ def fqn(self, name: str) -> str: def firewall(self) -> Firewall: pass + @property + @abstractmethod + def gpo(self) -> GenericGPO: + """ + Generic GPO management. + """ + pass + class GenericUser(ABC, BaseObject): """ @@ -961,3 +970,66 @@ def dump(self) -> str: @abstractmethod def __str__(self) -> str: pass + + +class GenericGPO(ABC, object): + """ + Generic GPO management. + """ + + @abstractmethod + def get(self, key: str) -> str | None: + """ + Get GPO attribute. + + :param key: GPO key value. + :type key: str + :return: GPO key value. + :rtype: str | None + """ + pass + + @abstractmethod + def delete(self) -> None: + """ + Delete GPO. + """ + pass + + @abstractmethod + def add(self) -> GenericGPO: + """ + Add GPO. + """ + pass + + @abstractmethod + def link(self, + op: str | None = "New", + target: str | None = None, + args: list[str] | str | None = None) -> GenericGPO: + """ + Link GPO. + """ + pass + + @abstractmethod + def unlink(self) -> None: + """ + Unlink GPO. + """ + pass + + @abstractmethod + def permissions(self, target: str, permission_level: str, target_type: str | None = "Group") -> GenericGPO: + """ + Configure GPO permissions. + """ + pass + + @abstractmethod + def policy(self, logon_rights: dict[str, list[GenericUser]], cfg: dict[str, Any] | None = None) -> GenericGPO: + """ + GPO configuration. + """ + pass diff --git a/sssd_test_framework/roles/samba.py b/sssd_test_framework/roles/samba.py index 4333e366..52e3b349 100644 --- a/sssd_test_framework/roles/samba.py +++ b/sssd_test_framework/roles/samba.py @@ -5,6 +5,8 @@ from typing import Any, TypeAlias import ldap.modlist +import configparser +import os.path from pytest_mh.cli import CLIBuilderArgs from pytest_mh.ssh import SSHProcessResult @@ -18,11 +20,13 @@ __all__ = [ "Samba", "SambaObject", + "SambaComputer", "SambaUser", "SambaGroup", "SambaOrganizationalUnit", "SambaAutomount", "SambaSudoRule", + "GPO", ] @@ -225,6 +229,63 @@ def test_example_netgroup(client: Client, samba: Samba): """ return SambaNetgroup(self, name, basedn) + def computer(self, name: str) -> SambaComputer: + """ + Get computer object. + + .. code-block:: python + :caption: Example usage + + @pytest.mark.topology(KnownTopology.AD) + def test_example(client: Client, samba: Samba): + # Create OU + ou = ad.ou("test").add().dn + # Move computer object + ad.computer(client.host.hostname.split(".")[0]).move(ou) + + client.sssd.start() + + :param name: Computer name. + :type name: str + :return: New computer object. + :rtype: ADComputer + """ + return SambaComputer(self, name) + + def gpo(self, name: str) -> GPO: + """ + Get group policy object. + + .. code-block:: python + :caption: Example usage + + @pytest.mark.topology(KnownTopology.AD) + def test_ad__gpo_is_set_to_enforcing(client: Client, samba: Samba): + user = ad.user("user").add() + allow_user = ad.user("allow_user").add() + deny_user = ad.user("deny_user").add() + + ad.gpo("test policy").add().policy( + { + "SeInteractiveLogonRight": [allow_user, ad.group("Domain Admins")], + "SeRemoteInteractiveLogonRight": [allow_user, ad.group("Domain Admins")], + "SeDenyInteractiveLogonRight": [deny_user], + "SeDenyRemoteInteractiveLogonRight": [deny_user], + } + ).link() + + client.sssd.domain["ad_gpo_access_control"] = "enforcing" + client.sssd.start() + + assert client.auth.ssh.password(username="allow_user", password="Secret123") + assert not client.auth.ssh.password(username="user", password="Secret123") + assert not client.auth.ssh.password(username="deny_user", password="Secret123") + + :param name: Name of the GPO. + :type name: str + """ + return GPO(self, name) + def ou(self, name: str, basedn: LDAPObject | str | None = None) -> SambaOrganizationalUnit: """ Get organizational unit object. @@ -313,6 +374,8 @@ def __init__(self, role: Samba, command: str, name: str) -> None: self.__dn: str | None = None + self.__sid: str | None = None + def _exec(self, op: str, args: list[str] | None = None, **kwargs) -> SSHProcessResult: """ Execute samba-tool command. @@ -412,6 +475,18 @@ def dn(self) -> str: self.__dn = obj.pop("dn")[0] return self.__dn + @property + def sid(self) -> str: + """ + Object's security identifier. + """ + if self.__sid is not None: + return self.__sid + + obj = self.get(["objectSid"]) + self.__sid = obj.pop("objectSid")[0] + return self.__sid + class SambaUser(SambaObject): """ @@ -679,6 +754,245 @@ def __get_member_args(self, members: list[SambaUser | SambaGroup]) -> list[str]: return [",".join([x.name for x in members])] +class SambaComputer(SambaObject): + """ + AD computer management. + """ + + def __init__(self, role: Samba, name: str) -> None: + """ + :param role: AD role object. + :type role: AD + :param name: Computer name. + :type name: str + """ + super().__init__(role, "Computer", name) + + def move(self, target: str) -> SambaComputer: + """ + Move a computer object. + :param target: Target path. + :type target: str + + :return: Self. + :rtype: SambaComputer + """ + if self.name.startswith("cn"): + _name = self.name.split(",")[0].split("=")[1] + self._exec("Move", [self.name, target]) + + return self + + +class GPO(BaseObject[SambaHost, Samba]): + """ + Group policy object management. + """ + + def __init__(self, role: Samba, name: str) -> None: + """ + :param name: GPO name, defaults to 'Domain Test Policy' + :type name: str, optional + """ + super().__init__(role) + + self.name: str = name + """Group policy display name.""" + + self.target: str | None = None + """Group policy target.""" + + self._search_base: str = f"cn=policies,cn=system,{self.role.host.naming_context}" + """Group policy search base.""" + + self._dn = self.get("dn") + """Group policy dn.""" + + self._cn = self.get("GPO") + """Group policy cn.""" + + def get(self, key: str) -> str | None: + """ + Get group policy attributes. + + :param key: Attribute to get. + :type key: str + :return: Key value. + :rtype: str + """ + result = [] + if self.name is not None: + for i in self.role.host.ssh.run("samba-tool gpo listall").stdout_lines: + if "GENSEC" not in i: + result.append(i) + + result = attrs_parse(result, [key]) + + for k, v in result.items(): + if k == self.name: + return v[0] + + return None + + def delete(self) -> None: + """ + Delete group policy object. + """ + self.role.host.ssh.run(f"samba-tool gpo del {self._cn}") + + def add(self) -> GPO: + """ + Add a group policy object. + + :return: Group policy object + :rtype: GPO + """ + self.role.host.ssh.run(f"samba-tool gpo create {self.name}") + + self._cn = self.get("GPO") + self._dn = self.get("dn") + + return self + + def link( + self, + target: str | None = "Default-First-Site-Name", + args: list[str] | str | None = None, + ) -> GPO: + """ + Link the group policy to the target object inside the directory, a site, domain or an ou. + + ..Note:: + The New and Set cmdlets are identical. To modify an existing link, + change the $op parameter to "Set", i.e. to disable 'Enforced' + + ou_policy.link("Set", args=["-Enforced No"]) + + :param target: Group policy target, defaults to 'Default-First-Site-Name' + :type target: str, optional + :param args: Additional arguments + :type args: list[str] | None, optional + :return: Group policy object + :rtype: GPO + :TODO: Need to check args and map them to samba args + """ + if args is None: + args = [] + + if isinstance(args, list): + args = " ".join(args) + elif args is None: + args = "" + + self.target = target + + self.role.host.ssh.run(f"samba-tool gpo setlink {self.target} {self._cn}") + # -UAdministrator --enforce --disable + + return self + + def unlink(self) -> GPO: + """ + Unlink the group policy from the target. + + :return: Group policy object + :rtype: GPO + """ + self.role.host.ssh.run(f"samba-tool gpo dellink {self.target} {self._cn}") + + return self + + def permissions(self, level: str, target_type: str | None = "User") -> GPO: + """ + Configure group policy object permissions. + + :param level: Permission level + :type level: str, values should be 'GpoRead | GpoApply | GpoEdit | GpoEditDeleteModifySecurity | None' + :param target_type: Target type, defaults to 'user' + :type target_type: str, optional, values should be 'user | group | computer' + :return: Group policy object + :rtype: GPO + :TODO: Figure out dsacl and what permissions can we set on the GPO object + """ + self.role.host.ssh.run( + #f'Set-GPPermission -Guid "{self._cn}" -PermissionLevel {level} -Type "{target_type}" -Replace $True' + f"samba-tool dsacl" + ) + + return self + + def policy(self, logon_rights: dict[str, list[SambaObject]]) -> GPO: + """ + Group policy configuration. + + This method does the remaining configuration of the group policy. It updates + 'GptTmpl.inf' with security logon right keys with the SIDs of users and groups + objects. The *Remote* keys can be omitted, in which the corresponding keys values + will then be used. + + To add users and groups to the policy, the SID must be used for the values. The + values need to be prefixed with an '*' and use a comma for a de-limiter, i.e. + `*SID1-2-3-4,*SID-5-6-7-8` + + Additionally, gPCMachineExtensionNames need to be updated in the directory so + the GPO is readable to the client. The value is a list of Client Side + Extensions (CSEs), that is an index of what part of the policy is pushed and + processed by the client. + + :param logon_rights: List of logon rights. + :type logon_rights: dict[str, list[SambaObject]] + :return: Group policy object + :rtype: GPO + """ + _path: str = os.path.join("/var/lib/samba/sysvol/samba.test/Policies/", + self._cn, + "\{MACHINE/Microsoft/Windows\ NT/SecEdit/GptTmpl.inf") + + _keys: list[str] = [ + "SeInteractiveLogonRight", + "SeRemoteInteractiveLogonRight", + "SeDenyInteractiveLogonRight", + "SeDenyRemoteInteractiveLogonRight", + ] + + for i in _keys: + if i not in logon_rights.keys() and i == "SeRemoteInteractiveLogonRight": + logon_rights[i] = logon_rights["SeInteractiveLogonRight"] + if i not in logon_rights.keys() and i == "SeDenyRemoteInteractiveLogonRight": + logon_rights[i] = logon_rights["SeDenyInteractiveLogonRight"] + + for i in _keys: + if i not in logon_rights.keys(): + raise KeyError(f"Expected {i} but got {logon_rights.keys()}") + + _logon_rights: dict[str, Any] = {} + for k, v in logon_rights.items(): + sids: list[str] = [] + for j in v: + sids.append(f"*{j.sid}") + _logon_rights = {**_logon_rights, **{k: ",".join(sids)}} + + config = configparser.ConfigParser = configparser.ConfigParser(interpolation=None) + config.add_section("Unicode") + config.set("Unicode", "Unicode", "yes") + config.add_section("Version") + config.set("Version", "signature", "\"$CHICAGO$\"") + config.set("Version", "Revision", "1") + + for k, v in logon_rights.items(): + _value = "" + for i in enumerate(v): + if i != len(v) - 1: + _value = _value + str(i) + ";" + else: + _value = _value + str(i) + config.set("Privilege Rights", _value) + + self.host.fs.write(_path, config) + + return self + + SambaOrganizationalUnit: TypeAlias = LDAPOrganizationalUnit[SambaHost, Samba] SambaAutomount: TypeAlias = LDAPAutomount[SambaHost, Samba] SambaSudoRule: TypeAlias = LDAPSudoRule[SambaHost, Samba, SambaUser, SambaGroup]