diff --git a/hsm_secrets/config.py b/hsm_secrets/config.py index e8043c6..5b40e4a 100644 --- a/hsm_secrets/config.py +++ b/hsm_secrets/config.py @@ -3,6 +3,7 @@ from dataclasses import dataclass from datetime import datetime import os +import re from pydantic import BaseModel, ConfigDict, HttpUrl, Field, StringConstraints from typing_extensions import Annotated from typing import List, Literal, NewType, Optional, Sequence, Union @@ -49,6 +50,9 @@ def get_domain_bitfield(self, names: set['HSMDomainName']) -> int: assert 0 <= res <= 0xFFFF, f"Domain bitfield out of range: {res}" return res + def find_def(self, id_or_label: Union[int, str], enforce_type: Optional[type] = None) -> 'HSMDefBase': + return _find_def_by_id_or_label(self, id_or_label, enforce_type) + @staticmethod def domain_bitfield_to_nums(bitfield: int) -> set['HSMDomainNum']: return {i+1 for i in range(16) if bitfield & (1 << i)} @@ -123,7 +127,7 @@ class General(NoExtraBaseModel): x509_defaults: 'X509Info' -class HSMKeyBase(NoExtraBaseModel): +class HSMDefBase(NoExtraBaseModel): model_config = ConfigDict(extra="forbid") label: KeyLabel id: KeyID @@ -136,14 +140,14 @@ class HSMKeyBase(NoExtraBaseModel): "none", "sign-pkcs", "sign-pss", "sign-ecdsa", "sign-eddsa", "decrypt-pkcs", "decrypt-oaep", "derive-ecdh", "exportable-under-wrap", "sign-ssh-certificate", "sign-attestation-certificate" ] -class HSMAsymmetricKey(HSMKeyBase): +class HSMAsymmetricKey(HSMDefBase): capabilities: set[AsymmetricCapabilityName] algorithm: AsymmetricAlgorithm # -- Symmetric key models -- SymmetricAlgorithm = Literal["aes128", "aes192", "aes256"] SymmetricCapabilityName = Literal["none", "encrypt-ecb", "decrypt-ecb", "encrypt-cbc", "decrypt-cbc", "exportable-under-wrap"] -class HSMSymmetricKey(HSMKeyBase): +class HSMSymmetricKey(HSMDefBase): capabilities: set[SymmetricCapabilityName] algorithm: SymmetricAlgorithm @@ -159,7 +163,7 @@ class HSMSymmetricKey(HSMKeyBase): "put-opaque", "put-otp-aead-key", "put-template", "put-wrap-key", "randomize-otp-aead", "reset-device", "rewrap-from-otp-aead-key", "rewrap-to-otp-aead-key", "set-option", "sign-attestation-certificate", "sign-ecdsa", "sign-eddsa", "sign-hmac", "sign-pkcs", "sign-pss", "sign-ssh-certificate", "unwrap-data", "verify-hmac", "wrap-data"] -class HSMWrapKey(HSMKeyBase): +class HSMWrapKey(HSMDefBase): capabilities: set[WrapCapabilityName] delegated_capabilities: set[WrapDelegateCapabilityName] algorithm: WrapAlgorithm @@ -167,7 +171,7 @@ class HSMWrapKey(HSMKeyBase): # -- HMAC key models -- HmacAlgorithm = Literal["hmac-sha1", "hmac-sha256", "hmac-sha384", "hmac-sha512"] HmacCapabilityName = Literal["none", "sign-hmac", "verify-hmac", "exportable-under-wrap"] -class HSMHmacKey(HSMKeyBase): +class HSMHmacKey(HSMDefBase): capabilities: set[HmacCapabilityName] algorithm: HmacAlgorithm @@ -194,13 +198,13 @@ class HSMHmacKey(HSMKeyBase): "sign-eddsa", "sign-hmac", "sign-pkcs", "sign-pss", "sign-ssh-certificate", "unwrap-data", "verify-hmac", "wrap-data", "decrypt-ecb", "encrypt-ecb", "decrypt-cbc", "encrypt-cbc", ] -class HSMAuthKey(HSMKeyBase): +class HSMAuthKey(HSMDefBase): capabilities: set[AuthKeyCapabilityName] delegated_capabilities: set[AuthKeyDelegatedCapabilityName] # -- Opaque object models -- OpaqueObjectAlgorithm = Literal["opaque-data", "opaque-x509-certificate"] -class OpaqueObject(HSMKeyBase): +class HSMOpaqueObject(HSMDefBase): algorithm: OpaqueObjectAlgorithm sign_by: Optional[KeyID] # ID of the key to sign the object with (if applicable) @@ -244,7 +248,7 @@ class X509Info(NoExtraBaseModel): class X509Cert(NoExtraBaseModel): key: HSMAsymmetricKey x509_info: Optional[X509Info] = Field(default=None) # If None, use the default values from the global configuration (applies to sub-fields, too) - signed_certs: List[OpaqueObject] = Field(default_factory=list) # Storage for signed certificates + signed_certs: List[HSMOpaqueObject] = Field(default_factory=list) # Storage for signed certificates # ----- Subsystem models ----- @@ -348,6 +352,44 @@ def find_instances(obj: Any, target_type: Type[T]) -> Generator[T, None, None]: return list(find_instances(conf, cls)) +def parse_keyid(key_id: str) -> int: + """ + Parse a key ID from a string in the format '0x1234'. + :raises ValueError: If the key ID is not a hexadecimal number with the '0x' prefix. + """ + if not key_id.startswith('0x'): + raise ValueError(f"Key ID '{key_id}' must be a hexadecimal number with the '0x' prefix.") + return int(key_id.replace('0x',''), 16) + + +def _find_def_by_id_or_label(conf: HSMConfig, id_or_label: Union[int, str], enforce_type: Optional[type] = None) -> HSMDefBase: + """ + Find the configuration object for a given key ID or label. + :raises KeyError: If the key is not found in the configuration file. + """ + # Check and parse the id/label + id = None + if isinstance(id_or_label, str): + if re.match(r'^0x[0-9a-fA-F]+$', id_or_label.strip()): + id = parse_keyid(id_or_label) + elif id_or_label.isdigit(): + raise ValueError(f"Key ID ('{id_or_label}') must be a hexadecimal number with the '0x' prefix.") + elif isinstance(id_or_label, int): + id = id_or_label + if id <= 0 or id >= 0xFFFF: + raise ValueError(f"Key ID '{id}' is out of range (16 bit unsigned integer).") + + # Search by ID or label + for t in [HSMAsymmetricKey, HSMSymmetricKey, HSMWrapKey, HSMHmacKey, HSMAuthKey, HSMOpaqueObject]: + for key in find_config_items_of_class(conf, t): + if (id and key.id == id) or key.label == id_or_label: + if enforce_type and not isinstance(key, enforce_type): + raise ValueError(f"Key '{id_or_label}' is not of the expected type '{enforce_type.__name__}'.") + return key + + raise KeyError(f"Key with ID or label '{id_or_label}' not found in the configuration file.") + + def find_all_config_items_per_type(conf: HSMConfig) -> tuple[dict, dict]: """ @@ -356,14 +398,14 @@ def find_all_config_items_per_type(conf: HSMConfig) -> tuple[dict, dict]: """ import yubihsm.objects # type: ignore [import] - from hsm_secrets.config import HSMAsymmetricKey, HSMSymmetricKey, HSMWrapKey, OpaqueObject, HSMHmacKey, HSMAuthKey + from hsm_secrets.config import HSMAsymmetricKey, HSMSymmetricKey, HSMWrapKey, HSMOpaqueObject, HSMHmacKey, HSMAuthKey config_to_hsm_type = { HSMAuthKey: yubihsm.objects.AuthenticationKey, HSMWrapKey: yubihsm.objects.WrapKey, HSMHmacKey: yubihsm.objects.HmacKey, HSMSymmetricKey: yubihsm.objects.SymmetricKey, HSMAsymmetricKey: yubihsm.objects.AsymmetricKey, - OpaqueObject: yubihsm.objects.Opaque, + HSMOpaqueObject: yubihsm.objects.Opaque, } config_items_per_type: dict = {t: find_config_items_of_class(conf, t) for t in config_to_hsm_type.keys()} # type: ignore return config_items_per_type, config_to_hsm_type diff --git a/hsm_secrets/hsm/__init__.py b/hsm_secrets/hsm/__init__.py index bcff65e..0c513e6 100644 --- a/hsm_secrets/hsm/__init__.py +++ b/hsm_secrets/hsm/__init__.py @@ -5,7 +5,7 @@ import tarfile import click -from hsm_secrets.config import HSMConfig, find_all_config_items_per_type +from hsm_secrets.config import HSMAsymmetricKey, HSMConfig, find_all_config_items_per_type, parse_keyid from hsm_secrets.hsm.secret_sharing_ceremony import cli_reconstruction_ceremony, cli_splitting_ceremony from hsm_secrets.utils import HSMAuthMethod, HsmSecretsCtx, cli_error, cli_info, cli_result, cli_ui_msg, cli_warn, hsm_generate_asymmetric_key, hsm_generate_hmac_key, hsm_generate_symmetric_key, hsm_obj_exists, hsm_put_derived_auth_key, hsm_put_wrap_key, open_hsm_session, open_hsm_session_with_password, pass_common_args, pretty_fmt_yubihsm_object, prompt_for_secret, pw_check_fromhex @@ -249,15 +249,15 @@ def make_wrap_key(ctx: HsmSecretsCtx): # --------------- @cmd_hsm.command('delete-object') -@click.argument('cert_ids', nargs=-1, type=str, metavar='...') +@click.argument('obj_ids', nargs=-1, type=str, metavar=' ...') @click.option('--alldevs', is_flag=True, help="Delete on all devices") @click.option('--force', is_flag=True, help="Force deletion without confirmation (use with caution)") @pass_common_args -def delete_object(ctx: HsmSecretsCtx, cert_ids: tuple, alldevs: bool, force: bool): - """Delete an object from the YubiHSM +def delete_object(ctx: HsmSecretsCtx, obj_ids: tuple, alldevs: bool, force: bool): + """Delete object(s) from the YubiHSM - Deletes an object with the given ID from the YubiHSM. - YubiHSM2 identifies objects by type in addition to ID, so the command + Deletes an object(s) with the given ID or label from the YubiHSM. + YubiHSM2 can have the same id for different types of objects, so this command asks you to confirm the type of the object before deleting it. With `--force` ALL objects with the given ID will be deleted @@ -266,13 +266,17 @@ def delete_object(ctx: HsmSecretsCtx, cert_ids: tuple, alldevs: bool, force: boo hsm_serials = ctx.conf.general.all_devices.keys() if alldevs else [ctx.hsm_serial] for serial in hsm_serials: with open_hsm_session(ctx, HSMAuthMethod.DEFAULT_ADMIN, serial) as ses: - not_found = set(cert_ids) - for id in cert_ids: - id_int = int(id.replace('0x', ''), 16) + not_found = set(obj_ids) + for id_or_label in obj_ids: + try: + id_int = ctx.conf.find_def(id_or_label).id + except KeyError: + cli_warn(f"Object '{id_or_label}' not found in the configuration file. Assuming it's raw ID on the device.") + id_int = parse_keyid(id_or_label) objects = ses.list_objects() for o in objects: if o.id == id_int: - not_found.remove(id) + not_found.remove(id_or_label) if not force: cli_ui_msg("Object found:") cli_ui_msg(pretty_fmt_yubihsm_object(o)) @@ -331,8 +335,8 @@ def compare_config(ctx: HsmSecretsCtx, alldevs: bool, create: bool): if create: need_create = obj is None if need_create: - from hsm_secrets.config import HSMAsymmetricKey, HSMSymmetricKey, HSMWrapKey, OpaqueObject, HSMHmacKey, HSMAuthKey - unsupported_types = (HSMWrapKey, HSMAuthKey, OpaqueObject) + from hsm_secrets.config import HSMAsymmetricKey, HSMSymmetricKey, HSMWrapKey, HSMOpaqueObject, HSMHmacKey, HSMAuthKey + unsupported_types = (HSMWrapKey, HSMAuthKey, HSMOpaqueObject) gear_emoji = click.style("⚙️", fg='cyan') @@ -374,22 +378,22 @@ def compare_config(ctx: HsmSecretsCtx, alldevs: bool, create: bool): @cmd_hsm.command('attest-key') @pass_common_args -@click.argument('cert_id', required=True, type=str, metavar='') +@click.argument('obj_id', required=True, type=str, metavar='') @click.option('--out', '-o', type=click.File('w', encoding='utf8'), help='Output file (default: stdout)', default=click.get_text_stream('stdout')) -def attest_key(ctx: HsmSecretsCtx, cert_id: str, out: click.File): +def attest_key(ctx: HsmSecretsCtx, obj_id: str, out: click.File): """Attest an asymmetric key in the YubiHSM Create an a key attestation certificate, signed by the Yubico attestation key, for the given key ID (in hex). """ from cryptography.hazmat.primitives.serialization import Encoding + id = ctx.conf.find_def(obj_id, HSMAsymmetricKey).id - id = int(cert_id.replace('0x', ''), 16) with open_hsm_session(ctx, HSMAuthMethod.DEFAULT_ADMIN, ctx.hsm_serial) as ses: key = ses.get_object(id, yubihsm.defs.OBJECT.ASYMMETRIC_KEY) assert isinstance(key, yubihsm.objects.AsymmetricKey) if not hsm_obj_exists(key): - raise click.ClickException(f"Key with ID 0x{id:04x} not found in the YubiHSM.") + raise click.ClickException(f"Asymmetric key 0x{id:04x} not found in the YubiHSM.") cert = key.attest() pem = cert.public_bytes(Encoding.PEM).decode('UTF-8') out.write(pem) # type: ignore @@ -493,7 +497,7 @@ def restore_hsm(ctx: HsmSecretsCtx, backup_file: str, force: bool): name = tarinfo.name assert name.endswith('.bin'), f"Unexpected file extension in tar archive: '{name}'" assert name.count('--') == 2, f"Unexpected file name format in tar archive: '{name}'" - obj_id = int(name.split('--')[1].replace('0x', ''), 16) + obj_id = parse_keyid(name.split('--')[1]) obj_type = name.split('--')[0] cli_info(f"- Importing object from '{tarinfo.name}'...") diff --git a/hsm_secrets/main.py b/hsm_secrets/main.py index fab7c93..52b12ac 100644 --- a/hsm_secrets/main.py +++ b/hsm_secrets/main.py @@ -5,7 +5,7 @@ from hsm_secrets.ssh import cmd_ssh from hsm_secrets.tls import cmd_tls from hsm_secrets.passwd import cmd_pass -from hsm_secrets.config import HSMConfig, load_hsm_config +from hsm_secrets.config import HSMAuthKey, load_hsm_config from hsm_secrets.user import cmd_user from hsm_secrets.utils import HSMAuthMethod, HsmSecretsCtx, cli_warn, list_yubikey_hsm_creds, pass_common_args, cli_info from hsm_secrets.x509 import cmd_x509 @@ -40,8 +40,8 @@ def cli(ctx: click.Context, config: str|None, quiet: bool, yklabel: str|None, hs --auth-default-admin: Use insecure default auth key (see config). - --auth-password-id : Use password from environment variable - HSM_PASSWORD with the specified auth key ID (hex). + --auth-password-id : Use password from environment variable + HSM_PASSWORD with the specified auth key ID (hex) or label. """ ctx.obj = {'quiet': quiet} # early setup for cli_info and other utils to work @@ -82,7 +82,7 @@ def cli(ctx: click.Context, config: str|None, quiet: bool, yklabel: str|None, hs 'quiet': quiet, 'hsmserial': hsmserial or conf.general.master_device, 'forced_auth_method': None, - 'auth_password_id': int(auth_password_id.replace('0x', ''), 16) if auth_password_id else None, + 'auth_password_id': conf.find_def(auth_password_id, HSMAuthKey).id if auth_password_id else None, 'auth_password': os.getenv("HSM_PASSWORD", None), } diff --git a/hsm_secrets/ssh/__init__.py b/hsm_secrets/ssh/__init__.py index c0d4e46..8d7680e 100644 --- a/hsm_secrets/ssh/__init__.py +++ b/hsm_secrets/ssh/__init__.py @@ -4,7 +4,7 @@ from typing import Sequence import click -from hsm_secrets.config import HSMConfig +from hsm_secrets.config import HSMAsymmetricKey, HSMConfig from hsm_secrets.utils import HsmSecretsCtx, cli_code_info, cli_result, cli_warn, open_hsm_session, pass_common_args from cryptography.hazmat.primitives import _serialization @@ -48,7 +48,7 @@ def get_ca(ctx: HsmSecretsCtx, get_all: bool, cert_ids: Sequence[str]): @cmd_ssh.command('sign-key') @click.option('--out', '-o', type=click.Path(exists=False, dir_okay=False, resolve_path=True, allow_dash=True), help="Output file (default: deduce from input)", default=None) -@click.option('--ca', '-c', required=False, help="CA key ID (hex) to sign with. Default: read from config", default=None) +@click.option('--ca', '-c', required=False, help="CA key ID (hex) or label to sign with. Default: read from config", default=None) @click.option('--username', '-u', required=False, help="Key owner's name (for auditing)", default=None) @click.option('--certid', '-n', required=False, help="Explicit certificate ID (default: auto-generated)", default=None) @click.option('--validity', '-t', required=False, default=365*24*60*60, help="Validity period in seconds (default: 1 year)") @@ -74,7 +74,7 @@ def sign_key(ctx: HsmSecretsCtx, out: str, ca: str|None, username: str|None, cer from hsm_secrets.ssh.openssh.ssh_certificate import cert_for_ssh_pub_id, str_to_extension from hsm_secrets.key_adapters import make_private_key_adapter - ca_key_id = int(ca.replace('0x',''), 16) if ca else ctx.conf.ssh.default_ca + ca_key_id = ctx.conf.find_def(ca, HSMAsymmetricKey).id if ca else ctx.conf.ssh.default_ca ca_def = [c for c in ctx.conf.ssh.root_ca_keys if c.id == ca_key_id] if not ca_def: diff --git a/hsm_secrets/tls/__init__.py b/hsm_secrets/tls/__init__.py index 362ff82..c2c2d3f 100644 --- a/hsm_secrets/tls/__init__.py +++ b/hsm_secrets/tls/__init__.py @@ -11,7 +11,7 @@ import yubihsm.defs # type: ignore [import] import yubihsm.objects # type: ignore [import] -from hsm_secrets.config import X509CertAttribs, X509Info +from hsm_secrets.config import HSMOpaqueObject, X509CertAttribs, X509Info from hsm_secrets.key_adapters import PrivateKey, make_private_key_adapter from hsm_secrets.utils import HsmSecretsCtx, cli_code_info, cli_info, cli_ui_msg, cli_warn, hsm_obj_exists, open_hsm_session, open_hsm_session_with_yubikey, pass_common_args from hsm_secrets.x509.cert_builder import X509CertBuilder @@ -31,7 +31,7 @@ def cmd_tls(ctx: click.Context): @click.option('--san-ip', '-i', multiple=True, help="IP SAN (Subject Alternative Name)") @click.option('--validity', '-v', default=365, help="Validity period in days") @click.option('--keyfmt', '-f', type=click.Choice(['rsa4096', 'ed25519', 'ecp256', 'ecp384']), default='ecp384', help="Key format") -@click.option('--sign-crt', '-s', type=str, required=False, help="CA ID (hex) to sign with, or 'self'. Default: use config", default=None) +@click.option('--sign-crt', '-s', type=str, required=False, help="CA ID (hex) or label to sign with, or 'self'. Default: use config", default=None) def new_http_server_cert(ctx: HsmSecretsCtx, out: click.Path, common_name: str, san_dns: list[str], san_ip: list[str], validity: int, keyfmt: str, sign_crt: str): """Create a TLS server certificate + key @@ -48,7 +48,7 @@ def new_http_server_cert(ctx: HsmSecretsCtx, out: click.Path, common_name: str, issuer_x509_def = None issuer_cert_id = -1 if (sign_crt or '').strip().lower() != 'self': - issuer_cert_id = int(sign_crt.replace('0x',''), 16) if sign_crt else ctx.conf.tls.default_ca_id + issuer_cert_id = ctx.conf.find_def(sign_crt, HSMOpaqueObject).id if sign_crt else ctx.conf.tls.default_ca_id issuer_x509_def = find_cert_def(ctx.conf, issuer_cert_id) assert issuer_x509_def, f"CA cert ID not found: 0x{issuer_cert_id:04x}" @@ -141,7 +141,7 @@ def new_http_server_cert(ctx: HsmSecretsCtx, out: click.Path, common_name: str, @pass_common_args @click.argument('csr', type=click.Path(exists=False, dir_okay=False, resolve_path=True, allow_dash=True), default='-', required=True, metavar='') @click.option('--out', '-o', required=False, type=click.Path(exists=False, dir_okay=False, resolve_path=True), help="Output filename (default: deduce from input)", default=None) -@click.option('--ca', '-c', type=str, required=False, help="CA ID (hex) to sign with. Default: use config", default=None) +@click.option('--ca', '-c', type=str, required=False, help="CA ID (hex) or label to sign with. Default: use config", default=None) @click.option('--validity', '-v', default=365, help="Validity period in days") def sign_csr(ctx: HsmSecretsCtx, csr: click.Path, out: click.Path|None, ca: str|None, validity: int): """Sign a CSR with a CA key @@ -160,7 +160,7 @@ def sign_csr(ctx: HsmSecretsCtx, csr: click.Path, out: click.Path|None, ca: str| csr_obj = cryptography.x509.load_pem_x509_csr(csr_data) # Find the issuer CA definition - issuer_cert_id = int(ca.replace('0x',''), 16) if ca else ctx.conf.tls.default_ca_id + issuer_cert_id = ctx.conf.find_def(ca, HSMOpaqueObject).id if ca else ctx.conf.tls.default_ca_id issuer_x509_def = find_cert_def(ctx.conf, issuer_cert_id) assert issuer_x509_def, f"CA cert ID not found: 0x{issuer_cert_id:04x}" diff --git a/hsm_secrets/user/__init__.py b/hsm_secrets/user/__init__.py index 4397fa0..3433f36 100644 --- a/hsm_secrets/user/__init__.py +++ b/hsm_secrets/user/__init__.py @@ -1,6 +1,7 @@ import re import secrets import click +from hsm_secrets.config import HSMAuthKey from hsm_secrets.utils import HSMAuthMethod, HsmSecretsCtx, cli_info, cli_ui_msg, cli_warn, confirm_and_delete_old_yubihsm_object_if_exists, group_by_4, hsm_put_derived_auth_key, hsm_put_symmetric_auth_key, open_hsm_session, pass_common_args, prompt_for_secret, pw_check_fromhex, secure_display_secret import yubikit.hsmauth @@ -122,10 +123,10 @@ def add_user_yubikey(ctx: HsmSecretsCtx, label: str, alldevs: bool): @cmd_user.command('add-service-account') @pass_common_args -@click.argument('cert_ids', nargs=-1, type=str, metavar='...') +@click.argument('obj_ids', nargs=-1, type=str, metavar='...') @click.option('--all', '-a', 'all_accts', is_flag=True, help="Add all configured service users") @click.option('--askpw', is_flag=True, help="Ask for password(s) instead of generating") -def add_service_account(ctx: HsmSecretsCtx, cert_ids: tuple[str], all_accts: bool, askpw: bool): +def add_service_account(ctx: HsmSecretsCtx, obj_ids: tuple[str], all_accts: bool, askpw: bool): """Add a service user(s) to master device Cert IDs are 16-bit hex values (e.g. '0x12af' or '12af'). @@ -135,11 +136,11 @@ def add_service_account(ctx: HsmSecretsCtx, cert_ids: tuple[str], all_accts: boo The command will generate (and show) passwords by default. Use the --askpw to be prompted for passwords instead. """ - if not all_accts and not cert_ids: + if not all_accts and not obj_ids: raise click.ClickException("No service users specified for addition.") - id_strings = [str(x.id) for x in ctx.conf.service_keys] if all_accts else cert_ids - ids = [int(id.replace("0x", ""), 16) for id in id_strings] + id_strings = [str(x.id) for x in ctx.conf.service_keys] if all_accts else obj_ids + ids = [ctx.conf.find_def(id, HSMAuthKey).id for id in id_strings] if not ids: raise click.ClickException("No service account ids specified.") diff --git a/hsm_secrets/x509/__init__.py b/hsm_secrets/x509/__init__.py index e018647..2a6159c 100644 --- a/hsm_secrets/x509/__init__.py +++ b/hsm_secrets/x509/__init__.py @@ -1,5 +1,6 @@ from pathlib import Path from textwrap import indent +from typing import cast from cryptography import x509 from yubihsm.core import AuthSession # type: ignore [import] @@ -7,7 +8,7 @@ import yubihsm.defs # type: ignore [import] from cryptography.hazmat.primitives import serialization -from hsm_secrets.config import HSMConfig, KeyID, OpaqueObject, X509Cert, find_config_items_of_class +from hsm_secrets.config import HSMConfig, KeyID, HSMOpaqueObject, X509Cert, find_config_items_of_class from hsm_secrets.utils import HSMAuthMethod, HsmSecretsCtx, cli_result, cli_warn, confirm_and_delete_old_yubihsm_object_if_exists, hsm_obj_exists, open_hsm_session, cli_code_info, pass_common_args, cli_info @@ -52,46 +53,38 @@ def create_cert_cmd(ctx: HsmSecretsCtx, all_certs: bool, dry_run: bool, cert_ids @click.option('--all', '-a', 'all_certs', is_flag=True, help="Get all certificates") @click.option('--outdir', '-o', type=click.Path(), required=False, help="Write PEMs into files here") @click.option('--bundle', '-b', type=click.Path(), required=False, help="Write a single PEM bundle file") -@click.argument('cert_ids', nargs=-1, type=str, metavar='...') +@click.argument('cert_ids', nargs=-1, type=str, metavar='...') def get_cert_cmd(ctx: HsmSecretsCtx, all_certs: bool, outdir: str|None, bundle: str|None, cert_ids: tuple): """Get certificate(s) from the HSM - You can specify multiple IDs to get multiple certificates, + You can specify multiple IDs/labels to get multiple certificates, or use the --all flag to get all certificates defined in the config. Specify --bundle to get a single PEM file with all selected certificates. """ if outdir and bundle: raise click.ClickException("Error: --outdir and --bundle options are mutually exclusive.") - cert_def_for_id = {c.id: c for c in find_config_items_of_class(ctx.conf, OpaqueObject)} - all_cert_ids = [f"0x{c.id:04x}" for c in cert_def_for_id.values()] - - selected_ids = all_cert_ids if all_certs else list(cert_ids) - if len(set(selected_ids) - set(all_cert_ids)) > 0: - raise click.ClickException(f"Unknown certificate ID(s): {set(selected_ids) - set(all_cert_ids)}.\nValid IDs are: {all_cert_ids}") - if not selected_ids: - raise click.ClickException("No certificates specified for retrieval.") + all_cert_defs: list[HSMOpaqueObject] = find_config_items_of_class(ctx.conf, HSMOpaqueObject) + selected_certs = all_cert_defs if all_certs else [cast(HSMOpaqueObject, ctx.conf.find_def(id, HSMOpaqueObject)) for id in cert_ids] - for cd in [cert_def_for_id[int(id.replace("0x", ""), 16)] for id in selected_ids]: + for cd in selected_certs: cli_info(f"- Fetching PEM for 0x{cd.id:04x}: '{cd.label}'") cli_info("") with open_hsm_session(ctx) as ses: - for cert_id in selected_ids: - cert_id_int = int(cert_id.replace("0x", ""), 16) - obj = ses.get_object(cert_id_int, yubihsm.defs.OBJECT.OPAQUE) + for cd in selected_certs: + obj = ses.get_object(cd.id, yubihsm.defs.OBJECT.OPAQUE) assert isinstance(obj, yubihsm.objects.Opaque) - cert_def = cert_def_for_id[cert_id_int] pem = obj.get_certificate().public_bytes(encoding=serialization.Encoding.PEM).decode() if outdir: - pem_file = Path(outdir) / f"{cert_def.label}.pem" + pem_file = Path(outdir) / f"{cd.label}.pem" pem_file.write_text(pem.strip() + "\n") - cli_info(f"Wrote 0x{cert_id_int:04x} to {pem_file}") + cli_info(f"Wrote 0x{cd.id:04x} to {pem_file}") elif bundle: pem_file = Path(bundle) with open(pem_file, "a") as f: f.write(pem.strip() + "\n") - cli_info(f"Appended 0x{cert_id_int:04x} to {pem_file}") + cli_info(f"Appended 0x{cd.id:04x} to {pem_file}") else: cli_result(pem) @@ -102,9 +95,10 @@ def get_cert_cmd(ctx: HsmSecretsCtx, all_certs: bool, outdir: str|None, bundle: def create_certs_impl(ctx: HsmSecretsCtx, all_certs: bool, dry_run: bool, cert_ids: tuple): """ Create certificates on a YubiHSM2, based on the configuration file and CLI arguments. + Performs a topological sort of the certificates to ensure that any dependencies are created first. """ # Enumerate all certificate definitions in the config - scid_to_opq_def: dict[KeyID, OpaqueObject] = {} + scid_to_opq_def: dict[KeyID, HSMOpaqueObject] = {} scid_to_x509_def: dict[KeyID, X509Cert] = {} for x in find_config_items_of_class(ctx.conf, X509Cert): @@ -113,24 +107,11 @@ def create_certs_impl(ctx: HsmSecretsCtx, all_certs: bool, dry_run: bool, cert_i scid_to_opq_def[opq.id] = opq scid_to_x509_def[opq.id] = x - def _selected_defs() -> list[OpaqueObject]: - # Based on cli arguments, select the certificates to create - selected: list[OpaqueObject] = [] - if all_certs: - selected = list(scid_to_opq_def.values()) - else: - try: - cert_ids_int = [int(id.replace("0x", ""), 16) for id in cert_ids] - selected = [scid_to_opq_def[id] for id in cert_ids_int if id in scid_to_opq_def] - missing = [f"0x{id:04x}" for id in (set(cert_ids_int) - set(scid_to_opq_def.keys()))] - if missing: - raise click.ClickException(f"Error: Certificate ID(s) not found: {missing}") - except ValueError: - raise click.ClickException("Invalid certificate ID(s) specified. Must be in hex format (e.g. 0x1234).") - return selected - def _do_it(ses: AuthSession|None): - creation_order = topological_sort_x509_cert_defs( _selected_defs()) + selected_defs = list(scid_to_opq_def.values()) if all_certs \ + else [cast(HSMOpaqueObject, ctx.conf.find_def(id, HSMOpaqueObject)) for id in cert_ids] + + creation_order = topological_sort_x509_cert_defs(selected_defs) id_to_cert_obj: dict[KeyID, x509.Certificate] = {} # Create the certificates in topological order diff --git a/hsm_secrets/x509/def_utils.py b/hsm_secrets/x509/def_utils.py index dc52cd5..b1dab6e 100644 --- a/hsm_secrets/x509/def_utils.py +++ b/hsm_secrets/x509/def_utils.py @@ -2,7 +2,7 @@ from copy import deepcopy from typing import Dict, List, Optional -from hsm_secrets.config import HSMConfig, KeyID, OpaqueObject, X509Cert, X509Info, find_config_items_of_class +from hsm_secrets.config import HSMConfig, KeyID, HSMOpaqueObject, X509Cert, X509Info, find_config_items_of_class """ Utility functions for working with certificate definitions from the HSMConfig object. @@ -86,7 +86,7 @@ def pretty_x509_info(x509_info: X509Info) -> str: return res -def topological_sort_x509_cert_defs(cert_defs: List[OpaqueObject]) -> list[OpaqueObject]: +def topological_sort_x509_cert_defs(cert_defs: List[HSMOpaqueObject]) -> list[HSMOpaqueObject]: """ Sort a list of certificate definitions topologically based on their signing dependencies, such that the root CA certs come first, followed by intermediate certs, and finally leaf certs. @@ -100,11 +100,11 @@ def topological_sort_x509_cert_defs(cert_defs: List[OpaqueObject]) -> list[Opaqu signer_to_signees[cd.sign_by].append(cd.id) # Step 2: Perform a topological sort with loop detection - sorted_certs: List[OpaqueObject] = [] + sorted_certs: List[HSMOpaqueObject] = [] visited: set[KeyID] = set() in_path: set[KeyID] = set() - def dfs(c: OpaqueObject): + def dfs(c: HSMOpaqueObject): if c.id in in_path: raise Exception(f"Issuer/signing loop detected involving certificate id 0x{c.id:04x}") if c.id not in visited: