diff --git a/hsm-conf.yml b/hsm-conf.yml index bb49ae2..3c211cf 100644 --- a/hsm-conf.yml +++ b/hsm-conf.yml @@ -497,9 +497,11 @@ nac: algorithm: opaque-x509-certificate sign_by: 0x0131 +# PIV (Personal Identity Verification) keys for smartcard login piv: - # PIV (Personal Identity Verification) keys for smartcard login default_ca_id: 0x0431 + default_piv_domain: '@example.directory' # AD UPN suffix for Windows, rfc822 suffix for Linux/macOS + intermediate_certs: - key: diff --git a/hsm_secrets/config.py b/hsm_secrets/config.py index 3b131f2..dede871 100644 --- a/hsm_secrets/config.py +++ b/hsm_secrets/config.py @@ -84,6 +84,7 @@ class TLS(NoExtraBaseModel): class PIV(NoExtraBaseModel): default_ca_id: HSMKeyID + default_piv_domain: str intermediate_certs: List['X509Cert'] dc_cert_templates: Dict[str, 'X509Info'] # Overrides global defaults user_cert_templates: Dict[str, 'X509Info'] diff --git a/hsm_secrets/piv/__init__.py b/hsm_secrets/piv/__init__.py index f02084d..c782484 100644 --- a/hsm_secrets/piv/__init__.py +++ b/hsm_secrets/piv/__init__.py @@ -1,34 +1,25 @@ -from ipaddress import ip_address import re -import enum -import datetime from typing_extensions import Literal import click from pathlib import Path -from typing import Callable, Union, List, Tuple, BinaryIO, Optional, cast, get_args - -from urllib.parse import urlparse +from typing import cast, get_args from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import rsa, ec from cryptography import x509 -import ykman.device -import ykman.scripting -import ykman -from yubikit.piv import PivSession, SLOT, PIN_POLICY, TOUCH_POLICY -from yubikit.core.smartcard import ApduError, SW -import yubikit.core +from asn1crypto.core import OctetString # type: ignore [import] + import yubikit.piv -from yubikit.hsmauth import HsmAuthSession # type: ignore [import] -from hsm_secrets.config import HSMConfig, HSMKeyID, HSMOpaqueObject, X509Cert, X509NameType -from hsm_secrets.piv.piv_cert_checks import PIVDomainControllerCertificateChecker, PIVUserCertificateChecker -from hsm_secrets.utils import HsmSecretsCtx, cli_info, cli_warn, open_hsm_session, pass_common_args +from hsm_secrets.config import HSMOpaqueObject, X509NameType +from hsm_secrets.piv.piv_cert_checks import PIVDomainControllerCertificateChecker +from hsm_secrets.piv.piv_cert_utils import PivKeyTypeName, make_signed_piv_user_cert +from hsm_secrets.piv.yubikey_piv import import_to_yubikey_piv +from hsm_secrets.utils import HsmSecretsCtx, cli_code_info, cli_info, open_hsm_session, pass_common_args from hsm_secrets.x509.cert_builder import X509CertBuilder from hsm_secrets.x509.def_utils import find_cert_def, merge_x509_info_with_defaults -from hsm_secrets.yubihsm import HSMSession @click.group() @@ -37,139 +28,10 @@ def cmd_piv(ctx: click.Context): """PIV commands (Yubikey Windows login)""" ctx.ensure_object(dict) - -class PivKeyType(enum.Enum): - RSA2048 = "rsa2048" - ECP256 = "ecp256" - ECP384 = "ecp384" - - -@cmd_piv.command('user-cert') -@pass_common_args -@click.option('--user', '-u', required=True, help="User identifier (username for Windows, email for macOS/Linux)") -@click.option('--template', '-t', required=False, help="Template label, default: first template") -@click.option('--subject', '-s', required=False, help="Cert subject (DN), default: from config") -@click.option('--validity', '-v', type=int, help="Validity period in days, default: from config") -@click.option('--key-type', '-k', type=click.Choice(['RSA2048', 'ECP256', 'ECP384']), default='ECP384', help="Key type, default: same as CA") -@click.option('--csr', type=click.Path(exists=True, dir_okay=False, resolve_path=True), help="Path to existing CSR file") -@click.option('--ca', '-c', required=False, help="CA ID (hex) or label, default: from config") -@click.option('--out', '-o', required=False, type=click.Path(exists=False, dir_okay=False, resolve_path=True, allow_dash=False), help="Output filename stem, default: ./-piv[.key/.cer]") -@click.option('--os-type', type=click.Choice(['windows', 'other']), default='windows', help="Target operating system") -@click.option('--san', multiple=True, help="AdditionalSANs, e.g., 'DNS:example.com', 'IP:10.0.0.2', etc.") -def create_piv_cert(ctx: HsmSecretsCtx, user: str, template: str|None, subject: str, validity: int, key_type: Literal['RSA2048', 'ECP256', 'ECP384'], csr: str|None, ca: str, out: str, os_type: Literal["windows", "other"], san: List[str]): - """Create or sign a PIV user certificate - - If a CSR is provided, sign it with a CA certificate. - Otherwise generate a new key pair and signs a certificate for it. - - Example SAN types: - - RFC822:alice@example.com - - UPN:alice@example.com - - DIRECTORY:/C=US/O=Example/CN=example.com - - OID:1.2.3.4.5=myValue - """ - # Set up default output filenames - out = out or f"{_sanitize_username(user)}-piv" - key_file = Path(out).with_suffix('.key.pem') - csr_file = Path(out).with_suffix('.csr.pem') - cer_file = Path(out).with_suffix('.cer.pem') - - # Get template - if template: - if template not in ctx.conf.piv.user_cert_templates: - raise click.ClickException(f"Template '{template}' not found in configuration") - cert_template = ctx.conf.piv.user_cert_templates[template] - else: - # Use first template if not specified - cert_template = next(iter(ctx.conf.piv.user_cert_templates.values())) - assert cert_template, "No user certificate templates found in configuration" - - # Merge template with defaults - x509_info = merge_x509_info_with_defaults(cert_template, ctx.conf) - assert x509_info, "No user certificate templates found in configuration" - assert x509_info.attribs, "No user certificate attributes found in configuration" - - # Override template values with command-line options - if validity: - x509_info.validity_days = validity - - # Generate subject DN if not explicitly provided - if not subject: - subject = f"CN={user}" - if x509_info.attribs: - for k,v in { - 'O': x509_info.attribs.organization, - 'L': x509_info.attribs.locality, - 'ST': x509_info.attribs.state, - 'C': x509_info.attribs.country, - }.items(): - if v: - subject += f",{k}={v}" - - # Handle CSR or key generation - if csr: - with open(csr, 'rb') as f: - csr_obj = x509.load_pem_x509_csr(f.read()) - private_key = None - else: - _, private_key = _generate_piv_key_pair(PivKeyType[key_type]) - csr_obj = None - - # Add explicitly provided SANs - x509_info.subject_alt_name = x509_info.subject_alt_name or x509_info.SubjectAltName() - valid_san_types = get_args(X509NameType) - for san_entry in san: - try: - san_type, san_value = san_entry.split(':', 1) - except ValueError: - raise click.ClickException(f"Invalid SAN: '{san_entry}'. Must be in the form 'type:value', where type is one of: {', '.join(valid_san_types)}") - san_type_lower = san_type.lower() - if san_type_lower not in valid_san_types: - raise click.ClickException(f"Provided '{san_type.lower()}' is not a supported X509NameType. Must be one of: {', '.join(valid_san_types)}") - x509_info.subject_alt_name.names.setdefault(san_type_lower, []).append(san_value) # type: ignore [arg-type] - - # Add UPN or email to SANs based on OS type - if os_type == 'windows': - x509_info.subject_alt_name.names.setdefault('upn', []).append(user) - else: - x509_info.subject_alt_name.names.setdefault('rfc822', []).append(user) - - # Create X509CertBuilder - key_or_csr = private_key or csr_obj - assert key_or_csr - cert_builder = X509CertBuilder(ctx.conf, x509_info, key_or_csr, dn_subject_override=subject) - - # Sign the certificate with CA - ca_id = ca or ctx.conf.piv.default_ca_id - issuer_cert_def = ctx.conf.find_def(ca_id, HSMOpaqueObject) - - with open_hsm_session(ctx) as ses: - issuer_x509_def = find_cert_def(ctx.conf, issuer_cert_def.id) - assert issuer_x509_def, f"CA cert ID not found: 0x{issuer_cert_def.id:04x}" - issuer_cert = ses.get_certificate(issuer_cert_def) - issuer_key = ses.get_private_key(issuer_x509_def.key) - signed_cert = cert_builder.build_and_sign(issuer_cert, issuer_key) - - PIVUserCertificateChecker(signed_cert, os_type).check_and_show_issues() - - # Save files - if private_key: - key_file.write_bytes(private_key.private_bytes( - encoding=serialization.Encoding.PEM, - format=serialization.PrivateFormat.PKCS8, - encryption_algorithm=serialization.NoEncryption() - )) - cli_info(f"Private key saved to: {key_file}") - - csr_obj = cert_builder.generate_csr() - csr_file.write_bytes(csr_obj.public_bytes(serialization.Encoding.PEM)) - cli_info(f"CSR saved to: {csr_file}") - elif csr: - cli_info(f"Using provided CSR: {csr}") - - _save_pem_certificate(signed_cert, cer_file.open('wb')) - cli_info(f"Certificate saved to: {cer_file}") - +@cmd_piv.group('yubikey') +def cmd_piv_yubikey(): + """YubiKey PIV slot management""" + pass @cmd_piv.command('sign-dc-cert') @@ -181,7 +43,7 @@ def create_piv_cert(ctx: HsmSecretsCtx, user: str, template: str|None, subject: @click.option('--san', multiple=True, help="Additional (GeneralName) SANs") @click.option('--hostname', '-h', required=True, help="Hostname (CommonName) for the DC certificate") @click.option('--template', '-t', required=False, help="Template label, default: first template") -def sign_dc_cert(ctx: HsmSecretsCtx, csr: click.File, validity: int, ca: str, out: str|None, san: List[str], hostname: str, template: str|None): +def sign_dc_cert(ctx: HsmSecretsCtx, csr: click.File, validity: int, ca: str, out: str|None, san: list[str], hostname: str, template: str|None): """Sign a DC Kerberos PKINIT certificate for PIV""" csr_path = Path(csr.name) with csr_path.open('rb') as f: @@ -238,26 +100,85 @@ def sign_dc_cert(ctx: HsmSecretsCtx, csr: click.File, validity: int, ca: str, ou PIVDomainControllerCertificateChecker(signed_cert).check_and_show_issues() # Save the signed certificate - _save_pem_certificate(signed_cert, out_path.open('wb')) + with open(out_path, 'wb') as f: + f.write(signed_cert.public_bytes(encoding=serialization.Encoding.PEM)) cli_info(f"Signed certificate saved to: {out_path}") +@cmd_piv.command('user-cert') +@pass_common_args +@click.option('--user', '-u', required=True, help="User identifier (username for Windows, email for macOS/Linux)") +@click.option('--template', '-t', required=False, help="Template label, default: first template") +@click.option('--subject', '-s', required=False, help="Cert subject (DN), default: from config") +@click.option('--validity', '-v', type=int, help="Validity period in days, default: from config") +@click.option('--key-type', '-k', type=click.Choice(['rsa2048', 'ecp256', 'ecp384']), default='ecp384', help="Key type, default: same as CA") +@click.option('--csr', type=click.Path(exists=True, dir_okay=False, resolve_path=True), help="Path to existing CSR file") +@click.option('--ca', '-c', required=False, help="CA ID (hex) or label, default: from config") +@click.option('--out', '-o', required=False, type=click.Path(exists=False, dir_okay=False, resolve_path=True, allow_dash=False), help="Output filename stem, default: ./-piv[.key/.cer]") +@click.option('--os-type', type=click.Choice(['windows', 'other']), default='windows', help="Target operating system") +@click.option('--san', multiple=True, help="AdditionalSANs, e.g., 'DNS:example.com', 'IP:10.0.0.2', etc.") +def save_user_cert(ctx: HsmSecretsCtx, user: str, template: str|None, subject: str, validity: int, key_type: PivKeyTypeName, csr: str|None, ca: str, out: str, os_type: Literal["windows", "other"], san: list[str]): + """Create or sign PIV user certificate, save to files + + If a CSR is provided, sign it with a CA certificate. + Otherwise generate a new key pair and signs a certificate for it. + + Example SAN types: + - RFC822:alice@example.com + - UPN:alice@example.com + - DIRECTORY:/C=US/O=Example/CN=example.com + - OID:1.2.3.4.5=myValue + """ + csr_pem: str|None = None + if csr: + with open(csr, 'rb') as fi: + csr_pem = fi.read().decode() + + private_key, csr_obj, signed_cert = make_signed_piv_user_cert(ctx, user, template, subject, validity, key_type, csr_pem, ca, os_type, san) + _show_piv_cert_summary(signed_cert) + + # Save files + def _sanitize_username(user: str) -> str: + user = re.sub(r'@.*', '', user) # Remove anything after '@' + user = re.sub(r'[^\w]', '_', user) # Replace special characters with underscores + return user or "user" + out = out or f"{_sanitize_username(user)}-piv" + key_file = Path(out).with_suffix('.key.pem') + csr_file = Path(out).with_suffix('.csr.pem') + cer_file = Path(out).with_suffix('.cer.pem') + + if private_key: + key_file.write_bytes(private_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption() + )) + cli_info(f"Private key saved to: {key_file}") + csr_file.write_bytes(csr_obj.public_bytes(serialization.Encoding.PEM)) + cli_info(f"CSR saved to: {csr_file}") -@cmd_piv.command('yubikey-import') + with open(cer_file, 'wb') as fo: + fo.write(signed_cert.public_bytes(encoding=serialization.Encoding.PEM)) + cli_info(f"Certificate saved to: {cer_file}") + + +@cmd_piv_yubikey.command('import') +@pass_common_args @click.argument('cert', required=True, type=click.Path(exists=True, dir_okay=False, resolve_path=True, allow_dash=False)) @click.argument('key', required=True, type=click.Path(exists=True, dir_okay=False, resolve_path=True, allow_dash=False)) @click.option('--slot', '-s', type=click.Choice(['AUTHENTICATION', 'SIGNATURE', 'KEY_MANAGEMENT', 'CARD_AUTH']), default='AUTHENTICATION', help="PIV slot to import to") @click.option('--management-key', '-m', help="PIV management key (hex), default: prompt") -def import_to_yubikey_piv_cmd(cert: click.Path, key: click.Path, slot: str, management_key: str|None): - """Import a certificate and private key to a YubiKey PIV slot""" - # Load certificate and private key - cert_path = Path(str(cert)) - key_path = Path(str(key)) +def import_to_yubikey_piv_cmd(ctx: HsmSecretsCtx, cert: click.Path, key: click.Path, slot: str, management_key: str|None): + """Import cert and key from files to YubiKey PIV slot + If two YubiKeys are connected, the one _without_ HSM auth will be used. + """ + cert_path = Path(str(cert)) with cert_path.open('rb') as f: cert_data = f.read() certificate = x509.load_pem_x509_certificate(cert_data) + key_path = Path(str(key)) with key_path.open('rb') as f: key_data = f.read() private_key = serialization.load_pem_private_key(key_data, password=None) @@ -265,166 +186,88 @@ def import_to_yubikey_piv_cmd(cert: click.Path, key: click.Path, slot: str, mana raise click.ClickException("Unsupported private key type. Only RSA and EC keys are supported for YubiKey PIV.") cli_info("PEM files loaded:") - cli_info(f"- certificate: {cert_path}") - cli_info(f"- private key: {key_path}") + cli_code_info(f" - certificate: `{cert_path.name}`") + cli_code_info(f" - private key: `{key_path.name}`") + + _show_piv_cert_summary(certificate) # Convert slot string to SLOT enum from yubikit.piv import SLOT slot_enum = getattr(SLOT, slot) - _import_to_yubikey_piv( + import_to_yubikey_piv( cert=certificate, private_key=private_key, slot=slot_enum, management_key=bytes.fromhex(management_key) if management_key else None ) +@cmd_piv_yubikey.command('generate') +@pass_common_args +@click.argument('user', required=True) +@click.option('--slot', '-s', type=click.Choice(['AUTHENTICATION', 'SIGNATURE', 'KEY_MANAGEMENT', 'CARD_AUTH']), default='AUTHENTICATION', help="PIV slot to import to") +@click.option('--management-key', '-m', help="PIV management key (hex), default: prompt") +@click.option('--template', '-t', required=False, help="Template label, default: first template") +@click.option('--subject', '-s', required=False, help="Cert subject (DN), default: from config") +@click.option('--validity', '-v', type=int, help="Validity period in days, default: from config") +@click.option('--key-type', '-k', type=click.Choice(['rsa2048', 'ecp256', 'ecp384']), default='ecp384', help="Key type, default: same as CA") +@click.option('--ca', '-c', required=False, help="CA ID (hex) or label, default: from config") +@click.option('--os-type', type=click.Choice(['windows', 'other']), default='windows', help="Target operating system") +@click.option('--san', multiple=True, help="AdditionalSANs, e.g., 'DNS:example.com', 'IP:10.0.0.2', etc.") +def yubikey_gen_user_cert(ctx: HsmSecretsCtx, user: str, slot: str, management_key: str|None, template: str|None, subject: str, validity: int, key_type: PivKeyTypeName, ca: str, os_type: Literal["windows", "other"], san: list[str]): + """Generate a PIV key + cert and store directly in YubiKey + User argument should be a AD username for Windows or email for macOS/Linux. -def _generate_piv_key_pair(key_type: PivKeyType) -> Tuple[Union[rsa.RSAPublicKey, ec.EllipticCurvePublicKey], Union[rsa.RSAPrivateKey, ec.EllipticCurvePrivateKey]]: - private_key: Union[rsa.RSAPrivateKey, ec.EllipticCurvePrivateKey] - if key_type == PivKeyType.RSA2048: - private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048) - elif key_type == PivKeyType.ECP256: - private_key = ec.generate_private_key(ec.SECP256R1()) - elif key_type == PivKeyType.ECP384: - private_key = ec.generate_private_key(ec.SECP384R1()) - else: - raise ValueError(f"Unsupported key type: {key_type}") - public_key = private_key.public_key() - return public_key, private_key - - -def _save_pem_certificate(cert: x509.Certificate, output_file: BinaryIO) -> None: - pem_data = cert.public_bytes(encoding=serialization.Encoding.PEM) - output_file.write(pem_data) - -def _sanitize_username(user: str) -> str: - user = re.sub(r'@.*', '', user) # Remove anything after '@' - user = re.sub(r'[^\w]', '_', user) # Replace special characters with underscores - return user or "user" - - -def _import_to_yubikey_piv( - cert: x509.Certificate, - private_key: Optional[Union[rsa.RSAPrivateKey, ec.EllipticCurvePrivateKey]], - slot: SLOT = SLOT.AUTHENTICATION, - management_key: Optional[bytes] = None -) -> None: + If two YubiKeys are connected, the one _without_ HSM auth will be used for PIV. """ - Import the certificate and private key into a Yubikey PIV slot. + slot_enum: yubikit.piv.SLOT = getattr(yubikit.piv.SLOT, slot) + private_key, _csr_obj, signed_cert = make_signed_piv_user_cert(ctx, user, template, subject, validity, key_type, None, ca, os_type, san) + _show_piv_cert_summary(signed_cert) + import_to_yubikey_piv( + cert = signed_cert, + private_key = private_key, + slot = slot_enum, + management_key = bytes.fromhex(management_key) if management_key else None + ) - :param cert: The X.509 certificate to import - :param private_key: The private key corresponding to the certificate - :param slot: The PIV slot to use (default: Authentication slot) - :param management_key: The management key for the Yubikey PIV application (if None, will prompt) - """ - def _import_op(piv: PivSession, slot: SLOT): - # Check for biometric support - bio_supported = True - try: - piv.get_bio_metadata() - except yubikit.core.NotSupportedError: - bio_supported = False - - # Import key if provided - if private_key: - cli_info(f"Importing private key to slot '{slot.name}' ({slot.value:02x})") - cli_info("- Setting touch requirement 'CACHED': needed if last touched over 15 seconds ago. Touch is a non-standard PIV extension.") - if bio_supported: - cli_info("- Biometric support detected. Enabling MATCH_ONCE for PIN policy.") - piv.put_key(slot, private_key, pin_policy=PIN_POLICY.MATCH_ONCE, touch_policy=TOUCH_POLICY.CACHED) - else: - cli_info("- Setting PIN policy to 'ONCE': PIN is needed once per session.") - piv.put_key(slot, private_key, pin_policy=PIN_POLICY.ONCE, touch_policy=TOUCH_POLICY.CACHED) - else: - md = piv.get_slot_metadata(slot) - if not md.public_key_encoded: - raise click.ClickException(f"Slot '{slot.name}' has not key pair, cannot import certificate without key") - - # Import certificate - piv.put_certificate(slot, cert, compress=True) - cli_info("OK") - - _yubikey_piv_operation(slot, _import_op, management_key) - - - -def _yubikey_piv_operation(slot: SLOT, op_func: Callable[[PivSession, SLOT], None], management_key: Optional[bytes] = None): - try: - hsm_yubikey, piv_yubikey = _scan_for_hsm_and_piv_yubikeys() - piv_yubikey = piv_yubikey or hsm_yubikey - if not piv_yubikey: - raise click.ClickException("No YubiKey found storing PIV credentials") - - if hsm_yubikey and piv_yubikey != hsm_yubikey: - cli_warn(f"Found YubiKey with HSM credentials: {hsm_yubikey} - skipping it for PIV operation.") - cli_info(f"Performing PIV operation on device: {str(piv_yubikey)}") - piv = PivSession(piv_yubikey.smart_card()) - - _yubikey_auth_with_piv_mgm_key(piv, management_key) - op_func(piv, slot) - - except ApduError as e: - if e.sw == SW.AUTH_METHOD_BLOCKED: - click.echo("Error: PIN is blocked") - elif e.sw == SW.INCORRECT_PARAMETERS: - click.echo("Error: Incorrect PIN or management key") - elif e.sw == SW.SECURITY_CONDITION_NOT_SATISFIED: - click.echo("Error: Security condition not satisfied. Ensure you have the correct permissions.") - elif e.sw == SW.COMMAND_NOT_ALLOWED: - click.echo("Error: Command not allowed. The YubiKey may be in a state that doesn't allow this operation.") + +def _show_piv_cert_summary(signed_cert: x509.Certificate): + cli_info(f"PIV certificate summary:") + cli_code_info(f" - Serial: `{signed_cert.serial_number:x}` (❗️store for revocation❗️)") + cli_code_info(f" - Subject: {signed_cert.subject.rfc4514_string()}") + for i, san in enumerate(signed_cert.extensions.get_extension_for_class(x509.SubjectAlternativeName).value): + if isinstance(san, x509.OtherName): + type_str = 'UPN' if san.type_id == x509.ObjectIdentifier('1.3.6.1.4.1.311.20.2.3') else f'OID {san.type_id.dotted_string}' + san_str = f"{type_str}: {OctetString.load(san.value).native.decode().strip()}" + elif isinstance(san, x509.RFC822Name): + san_str = f"RFC822: {san.value}" else: - click.echo(f"Error: {str(e)}") - except Exception as e: - click.echo(f"Error: {str(e)}") + san_str = str(san) + cli_code_info(f" - SAN {i+1}: {san_str}") + cli_code_info(f" - Issuer: {signed_cert.issuer.rfc4514_string()}") +''' +def generate_on_yubikey_piv_cmd(slot: str, key_type: str, management_key: Optional[str], subject: str, validity: int): + """Generate a PIV key on YubiKey and make a certificate for it""" + # Convert slot string to SLOT enum + slot_enum = getattr(SLOT, slot) -def _scan_for_hsm_and_piv_yubikeys() -> Tuple[Optional[ykman.scripting.ScriptingDevice], Optional[ykman.scripting.ScriptingDevice]]: - """ - Scan for YubiKeys for a) HSM auth and b) PIV import. + # Convert key type string to PivKeyType enum + key_type_enum = PivKeyType[key_type] + public_key = _generate_on_yubikey_piv(slot_enum, KEY_TYPE[key_type], bytes.fromhex(management_key) if management_key else None) - If there are multiple YubiKeys, select the one without HSM support for PIV import. - If there's only one, return it for both HSM and PIV. + # Create a dummy certificate + x509_info = X509CertBuilder.get_default_x509_info() + x509_info.validity_days = validity + x509_info.attribs.common_name = subject + x509_info.subject_alt_name = x509_info.SubjectAltName() + cert_builder = X509CertBuilder(HSMConfig(), x509_info, public_key, dn_subject_override=subject) + dummy_cert = cert_builder.build_self_signed() - :return: Tuple of (device for HSM, device for PIV) - """ - hsm_yubikey, piv_yubikey = None, None - for yk_dev, yk_info in ykman.device.list_all_devices(): - yk = ykman.scripting.ScriptingDevice(yk_dev, yk_info) - sc = yk.smart_card() - try: - if HsmAuthSession(connection=sc).list_credentials(): - sc.close() - if hsm_yubikey: - raise click.ClickException("ERROR: Multiple YubiKeys found with HSM credentials. Heuristic is to pick the one without HSM credentials for PIV import, but this is ambiguous in this case.") - hsm_yubikey = yk - continue - except yubikit.core.NotSupportedError: - pass - sc.close() - if piv_yubikey: - raise click.ClickException("ERROR: Multiple YubiKeys found without HSM auth. Can't decide which one to use for PIV import.") - piv_yubikey = yk - if hsm_yubikey: - break - return hsm_yubikey, piv_yubikey - - -def _yubikey_auth_with_piv_mgm_key(piv: PivSession, management_key: Optional[bytes]) -> None: - """ - Authenticate with PIV management key. - :param piv: The PIV session to authenticate - :param management_key: The management key for the YubiKey PIV application - """ - mkm = piv.get_management_key_metadata() - if management_key is None: - if mkm.default_value: - cli_warn("WARNING! Using default management key. Change it immediately after import!") - management_key = yubikit.piv.DEFAULT_MANAGEMENT_KEY - else: - mkey_str = click.prompt("Enter PIV management key (hex)", hide_input=True) - if mkey_str is None: - raise click.ClickException("Management key is required") - management_key = bytes.fromhex(mkey_str) - piv.authenticate(mkm.key_type, management_key) + _import_to_yubikey_piv( + cert=dummy_cert, + private_key=None, + slot=slot_enum, + management +''' diff --git a/hsm_secrets/piv/piv_cert_utils.py b/hsm_secrets/piv/piv_cert_utils.py new file mode 100644 index 0000000..885128e --- /dev/null +++ b/hsm_secrets/piv/piv_cert_utils.py @@ -0,0 +1,135 @@ +from typing_extensions import Literal +import click + +from typing import Union, Optional, get_args + +from cryptography.hazmat.primitives.asymmetric import rsa, ec +from cryptography import x509 + +from hsm_secrets.config import HSMOpaqueObject, X509NameType +from hsm_secrets.piv.piv_cert_checks import PIVUserCertificateChecker +from hsm_secrets.utils import HsmSecretsCtx, open_hsm_session +from hsm_secrets.x509.cert_builder import X509CertBuilder +from hsm_secrets.x509.def_utils import find_cert_def, merge_x509_info_with_defaults + + +PivKeyTypeName = Literal['rsa2048', 'ecp256', 'ecp384'] + + +def make_signed_piv_user_cert( + ctx: HsmSecretsCtx, + user: str, + template: str|None, + subject: str, + validity: int, + key_type: PivKeyTypeName, + csr_pem: str|None, + ca: str, + os_type: Literal["windows", "other"], + san: list[str]) -> tuple[ + Optional[Union[rsa.RSAPrivateKey, ec.EllipticCurvePrivateKey]], + x509.CertificateSigningRequest, + x509.Certificate]: + # Get template + if template: + if template not in ctx.conf.piv.user_cert_templates: + raise click.ClickException(f"Template '{template}' not found in configuration") + cert_template = ctx.conf.piv.user_cert_templates[template] + else: + # Use first template if not specified + cert_template = next(iter(ctx.conf.piv.user_cert_templates.values())) + assert cert_template, "No user certificate templates found in configuration" + + # Merge template with defaults + x509_info = merge_x509_info_with_defaults(cert_template, ctx.conf) + assert x509_info, "No user certificate templates found in configuration" + assert x509_info.attribs, "No user certificate attributes found in configuration" + + # Override template values with command-line options + if validity: + x509_info.validity_days = validity + + # Generate subject DN if not explicitly provided + if not subject: + subject = f"CN={user}" + if x509_info.attribs: + for k,v in { + 'O': x509_info.attribs.organization, + 'L': x509_info.attribs.locality, + 'ST': x509_info.attribs.state, + 'C': x509_info.attribs.country, + }.items(): + if v: + subject += f",{k}={v}" + + # Handle CSR or key generation + if csr_pem: + csr_obj = x509.load_pem_x509_csr(csr_pem.encode()) + private_key = None + else: + _, private_key = _generate_piv_key_pair(key_type) + csr_obj = None + + # Add explicitly provided SANs + x509_info.subject_alt_name = x509_info.subject_alt_name or x509_info.SubjectAltName() + valid_san_types = get_args(X509NameType) + for san_entry in san: + try: + san_type, san_value = san_entry.split(':', 1) + except ValueError: + raise click.ClickException(f"Invalid SAN: '{san_entry}'. Must be in the form 'type:value', where type is one of: {', '.join(valid_san_types)}") + san_type_lower = san_type.lower() + if san_type_lower not in valid_san_types: + raise click.ClickException(f"Provided '{san_type.lower()}' is not a supported X509NameType. Must be one of: {', '.join(valid_san_types)}") + x509_info.subject_alt_name.names.setdefault(san_type_lower, []).append(san_value) # type: ignore [arg-type] + + # Add UPN or email to SANs based on OS type + if '@' in user: + username = user + elif domain := ctx.conf.piv.default_piv_domain.strip(): + username = user + '@' + domain.lstrip('@') + else: + username = user + + if os_type == 'windows': + x509_info.subject_alt_name.names.setdefault('upn', []).append(username) + else: + x509_info.subject_alt_name.names.setdefault('rfc822', []).append(username) + + # Create X509CertBuilder + key_or_csr = private_key or csr_obj + assert key_or_csr + cert_builder = X509CertBuilder(ctx.conf, x509_info, key_or_csr, dn_subject_override=subject) + + csr_obj = cert_builder.generate_csr() if private_key else csr_obj + assert csr_obj # Should be set by now + + # Sign the certificate with CA + ca_id = ca or ctx.conf.piv.default_ca_id + issuer_cert_def = ctx.conf.find_def(ca_id, HSMOpaqueObject) + + with open_hsm_session(ctx) as ses: + issuer_x509_def = find_cert_def(ctx.conf, issuer_cert_def.id) + assert issuer_x509_def, f"CA cert ID not found: 0x{issuer_cert_def.id:04x}" + issuer_cert = ses.get_certificate(issuer_cert_def) + issuer_key = ses.get_private_key(issuer_x509_def.key) + signed_cert = cert_builder.build_and_sign(issuer_cert, issuer_key) + + PIVUserCertificateChecker(signed_cert, os_type).check_and_show_issues() + return private_key, csr_obj, signed_cert + + + +def _generate_piv_key_pair(key_type: PivKeyTypeName) -> tuple[Union[rsa.RSAPublicKey, ec.EllipticCurvePublicKey], Union[rsa.RSAPrivateKey, ec.EllipticCurvePrivateKey]]: + private_key: Union[rsa.RSAPrivateKey, ec.EllipticCurvePrivateKey] + if key_type == 'rsa2048': + private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048) + elif key_type == 'ecp256': + private_key = ec.generate_private_key(ec.SECP256R1()) + elif key_type == 'ecp384': + private_key = ec.generate_private_key(ec.SECP384R1()) + else: + raise ValueError(f"Unsupported key type: {key_type}") + public_key = private_key.public_key() + return public_key, private_key + diff --git a/hsm_secrets/piv/yubikey_piv.py b/hsm_secrets/piv/yubikey_piv.py new file mode 100644 index 0000000..5d6ba97 --- /dev/null +++ b/hsm_secrets/piv/yubikey_piv.py @@ -0,0 +1,123 @@ +from typing import Callable, TypeVar, Union, Optional + +from cryptography.hazmat.primitives.asymmetric import rsa, ec +from cryptography import x509 + +from yubikit.piv import PivSession, SLOT, PIN_POLICY, TOUCH_POLICY +from yubikit.core.smartcard import ApduError, SW +import yubikit.core +import yubikit.piv + +from hsm_secrets.utils import cli_info, cli_warn, scan_local_yubikeys + +import click + + +def import_to_yubikey_piv( + cert: x509.Certificate, + private_key: Optional[Union[rsa.RSAPrivateKey, ec.EllipticCurvePrivateKey]], + slot: SLOT = SLOT.AUTHENTICATION, + management_key: Optional[bytes] = None +) -> None: + """ + Import the certificate and private key into a Yubikey PIV slot. + + :param cert: The X.509 certificate to import + :param private_key: The private key corresponding to the certificate + :param slot: The PIV slot to use (default: Authentication slot) + :param management_key: The management key for the Yubikey PIV application (if None, will prompt) + """ + def _import_key_cert_op(piv: PivSession, slot: SLOT): + if private_key: + cli_info(f"Importing private key to slot '{slot.name.lower()}' ({slot.value:02x})...") + cli_info(" - Setting touch requirement 'CACHED'. Touch is a non-standard Yubico PIV extension.") + if _check_yubikey_bio_support(piv): + cli_info(" - Biometric support detected. Enabling MATCH_ONCE for PIN policy.") + piv.put_key(slot, private_key, pin_policy=PIN_POLICY.MATCH_ONCE, touch_policy=TOUCH_POLICY.CACHED) + else: + cli_info(" - Setting PIN policy to 'ONCE': PIN is needed once per session.") + piv.put_key(slot, private_key, pin_policy=PIN_POLICY.ONCE, touch_policy=TOUCH_POLICY.CACHED) + else: + md = piv.get_slot_metadata(slot) + if not md.public_key_encoded: + raise click.ClickException(f"Slot '{slot.name}' has not key pair, cannot import certificate without key") + + # Import certificate + piv.put_certificate(slot, cert, compress=True) + cli_info("OK") + + _yubikey_piv_operation(slot, _import_key_cert_op, management_key) + + +''' +# TODO: This would be slightly more secure than generating a RAM-stored key on the Python side and importing, +# but it's not a huge difference. It's feasible but would require writing KeyAdapter classes for YubiKey-stored RSA/ECC keys, +# to make CSRs for the HSM-backed CA to sign. + +def _generate_on_yubikey_piv(slot: SLOT, key_type: yubikit.piv.KEY_TYPE, management_key: Optional[bytes] = None) -> Union[rsa.RSAPublicKey, ec.EllipticCurvePublicKey]: + """ + Generate a key pair on YubiKey, returning the public key. + """ + def _generate_keypair_op(piv: PivSession, slot: SLOT) -> Union[rsa.RSAPublicKey, ec.EllipticCurvePublicKey]: + bio_supported = _check_yubikey_bio_support(piv) + pin_policy = PIN_POLICY.MATCH_ONCE if bio_supported else PIN_POLICY.ONCE + cli_info(f"Generating {key_type.name} key pair in slot '{slot.name}' ({slot.value:02x})") + cli_info("- Setting touch requirement 'CACHED': needed if last touched over 15 seconds ago.") + cli_info(f"- Setting PIN policy to '{pin_policy.name}'") + return piv.generate_key(slot, key_type, pin_policy=pin_policy, touch_policy=TOUCH_POLICY.CACHED) + return _yubikey_piv_operation(slot, _generate_keypair_op, management_key) +''' + +T = TypeVar('T') +def _yubikey_piv_operation(slot: SLOT, op_func: Callable[[PivSession, SLOT], T], management_key: Optional[bytes] = None) -> T: + try: + _, piv_yubikey = scan_local_yubikeys(require_one_hsmauth=False, require_one_other=True) + assert piv_yubikey + + cli_info(f"Device for PIV storage: '{str(piv_yubikey)}'") + piv = PivSession(piv_yubikey.smart_card()) + + _yubikey_auth_with_piv_mgm_key(piv, management_key) + return op_func(piv, slot) + + except ApduError as e: + if e.sw == SW.AUTH_METHOD_BLOCKED: + click.echo("Error: PIN is blocked") + elif e.sw == SW.INCORRECT_PARAMETERS: + click.echo("Error: Incorrect PIN or management key") + elif e.sw == SW.SECURITY_CONDITION_NOT_SATISFIED: + click.echo("Error: Security condition not satisfied. Ensure you have the correct permissions.") + elif e.sw == SW.COMMAND_NOT_ALLOWED: + click.echo("Error: Command not allowed. The YubiKey may be in a state that doesn't allow this operation.") + else: + click.echo(f"Error: {str(e)}") + except Exception as e: + click.echo(f"Error: {str(e)}") + raise click.ClickException("Failed to perform PIV operation") + + +def _check_yubikey_bio_support(piv: PivSession) -> bool: + try: + piv.get_bio_metadata() + return True + except yubikit.core.NotSupportedError: + return False + + +def _yubikey_auth_with_piv_mgm_key(piv: PivSession, management_key: Optional[bytes]) -> None: + """ + Authenticate with PIV management key. + :param piv: The PIV session to authenticate + :param management_key: The management key for the YubiKey PIV application. If None, will use default key or prompt user. + """ + mkm = piv.get_management_key_metadata() + if management_key is None: + if mkm.default_value: + cli_warn("WARNING! Using default management key. Change it immediately after import!") + management_key = yubikit.piv.DEFAULT_MANAGEMENT_KEY + else: + mkey_str = click.prompt("Enter PIV management key (hex)", hide_input=True) + if mkey_str is None: + raise click.ClickException("Management key is required") + management_key = bytes.fromhex(mkey_str) + piv.authenticate(mkm.key_type, management_key) diff --git a/hsm_secrets/user/__init__.py b/hsm_secrets/user/__init__.py index 9409601..a97f232 100644 --- a/hsm_secrets/user/__init__.py +++ b/hsm_secrets/user/__init__.py @@ -2,10 +2,11 @@ import secrets import click from hsm_secrets.config import HSMAuthKey, HSMConfig, click_hsm_obj_auto_complete -from hsm_secrets.utils import HSMAuthMethod, HsmSecretsCtx, cli_info, cli_ui_msg, cli_warn, confirm_and_delete_old_yubihsm_object_if_exists, group_by_4, open_hsm_session, pass_common_args, prompt_for_secret, pw_check_fromhex, secure_display_secret +from hsm_secrets.utils import HSMAuthMethod, HsmSecretsCtx, cli_info, cli_ui_msg, cli_warn, confirm_and_delete_old_yubihsm_object_if_exists, group_by_4, open_hsm_session, pass_common_args, prompt_for_secret, pw_check_fromhex, scan_local_yubikeys, secure_display_secret import yubikit.hsmauth import ykman.scripting +import ykman.device import yubihsm.defs, yubihsm.objects # type: ignore [import] @@ -28,7 +29,8 @@ def change_yubikey_mgt(ctx: HsmSecretsCtx): This can also be done with the `yubihsm-auth -a change-mgmkey -k ` command. It's included here for convenience. """ - yubikey = ykman.scripting.single() # Connect to the first Yubikey found, prompt user to insert one if not found + yubikey, _ = scan_local_yubikeys(require_one_hsmauth=True) + assert yubikey auth_ses = yubikit.hsmauth.HsmAuthSession(connection=yubikey.smart_card()) _, old_mgt_key_bin = _ask_yubikey_hsm_mgt_key("Enter the old Management Key", default=True) _change_yubikey_hsm_mgt_key(auth_ses, old_mgt_key_bin, ask_before_change=False) @@ -56,7 +58,11 @@ def add_user_yubikey(ctx: HsmSecretsCtx, label: str, alldevs: bool): user_key_conf = user_key_configs[0] - yubikey = ykman.scripting.single() # Connect to the first Yubikey found, prompt user to insert one if not found + + n_yubikeys = len(ykman.device.list_all_devices()) + if n_yubikeys > 1: + raise click.ClickException(f"Found {n_yubikeys} Yubikeys. Can't decide which one to set HSM auth on.") + yubikey = ykman.scripting.single(prompt = True) # Connect to the first Yubikey found, prompt user to insert one if not found yk_auth_ses = yubikit.hsmauth.HsmAuthSession(connection=yubikey.smart_card()) existing_slots = list(yk_auth_ses.list_credentials()) diff --git a/hsm_secrets/utils.py b/hsm_secrets/utils.py index 040da60..0dd17d6 100644 --- a/hsm_secrets/utils.py +++ b/hsm_secrets/utils.py @@ -1,13 +1,11 @@ from dataclasses import dataclass from enum import Enum import os -from pathlib import Path from textwrap import dedent from typing import Callable, Generator, Optional, Sequence from contextlib import contextmanager -import click - +# YubiHSM 2 from yubihsm import YubiHsm # type: ignore [import] from yubihsm.core import AuthSession # type: ignore [import] from yubihsm.defs import CAPABILITY, ALGORITHM, ERROR, OBJECT # type: ignore [import] @@ -15,7 +13,10 @@ from yubikit.hsmauth import HsmAuthSession # type: ignore [import] from yubihsm.exceptions import YubiHsmDeviceError # type: ignore [import] -from ykman import scripting +# YubiKey +import ykman.device +import ykman.scripting +import ykman import yubikit.core import yubikit.hsmauth as hsmauth @@ -195,15 +196,6 @@ def group_by_4(s: str) -> str: return res -def _list_yubikey_hsm_creds() -> Sequence[hsmauth.Credential]: - """ - List the labels of all YubiKey HSM auth credentials. - """ - yubikey = scripting.single() # Connect to the first YubiKey found - auth_ses = hsmauth.HsmAuthSession(connection=yubikey.smart_card()) - return list(auth_ses.list_credentials()) - - def connect_hsm_and_auth_with_yubikey(config: hscfg.HSMConfig, yubikey_slot_label: str|None, device_serial: str|None, yubikey_password: Optional[str] = None) -> AuthSession: """ Connects to a YubHSM and authenticates a session using the first YubiKey found. @@ -224,7 +216,8 @@ def connect_hsm_and_auth_with_yubikey(config: hscfg.HSMConfig, yubikey_slot_labe if not connector_url: raise ValueError(f"Device serial '{device_serial}' not found in config file.") - yubikey = scripting.single() # Connect to the first YubiKey found + yubikey, _ = scan_local_yubikeys(require_one_hsmauth=True) + assert yubikey hsmauth = HsmAuthSession(yubikey.smart_card()) # Get first Yubikey HSM auth key label from device if not specified @@ -271,6 +264,59 @@ def connect_hsm_and_auth_with_yubikey(config: hscfg.HSMConfig, yubikey_slot_labe exit(1) +def scan_local_yubikeys(require_one_hsmauth = True, require_one_other = False) -> tuple[Optional[ykman.scripting.ScriptingDevice], Optional[ykman.scripting.ScriptingDevice]]: + """ + Scan for YubiKeys for a) HSM auth and b) other uses (e.g., PIV). + This allows modifying other YubiKeys while authenticating on the HSM with a different YubiKey. + + If there's only one YubiKey, it will be returned for both uses. + + :param require_hsm: Require exactly one YubiKey with HSM credentials + :param require_other: Require exactly one YubiKey for other uses + :return: Tuple of (YubiKey with HSM credentials, YubiKey for other uses) + :raises click.ClickException: If requirements are not met, or if multiple YubiKeys are found for a single use + """ + n_devices = len(ykman.device.list_all_devices()) + if n_devices == 0: + if require_one_hsmauth or require_one_other: + raise click.ClickException("No local YubiKey(s) found") + elif n_devices == 1: + # Only one YubiKey => return it for both HSM and other uses + yk_dev, yk_info = ykman.device.list_all_devices()[0] + yk = ykman.scripting.ScriptingDevice(yk_dev, yk_info) + return yk, yk + + # Multiple YubiKeys found, decide which one to use for which purpose + hsm_yubikey, piv_yubikey = None, None + for yk_dev, yk_info in ykman.device.list_all_devices(): + yk = ykman.scripting.ScriptingDevice(yk_dev, yk_info) + sc = yk.smart_card() + try: + if HsmAuthSession(connection=sc).list_credentials(): + if not hsm_yubikey: + hsm_yubikey = yk + sc.close() + continue + elif require_one_hsmauth: + raise click.ClickException("ERROR: Multiple YubiKeys found with HSM credentials. Can't decide which one to use for HSM auth.") + except yubikit.core.NotSupportedError: + pass + + sc.close() + if not piv_yubikey: + piv_yubikey = yk + continue + elif require_one_other: + raise click.ClickException("ERROR: Multiple YubiKeys found for other uses. Can't decide which one to return.") + + if not (hsm_yubikey and piv_yubikey): + raise click.ClickException("ERROR: Initial scan found multiple YubiKeys, but failed to pick one for each use. Bug or unexpected device state change.") + + return hsm_yubikey, piv_yubikey + + + + def verify_hsm_device_info(device_serial, hsm): info = hsm.get_device_info() if int(device_serial) != int(info.serial): diff --git a/run-tests.sh b/run-tests.sh index 3b23561..56c4841 100755 --- a/run-tests.sh +++ b/run-tests.sh @@ -143,7 +143,7 @@ test_tls_certificates() { test_piv_user_certificate_key_type() { setup - local output=$(run_cmd piv user-cert -u test.user@example.com --os-type windows --key-type RSA2048 --san "RFC822:test.user@example.com" --san "DIRECTORY:C=US,O=Organization,CN=test.user" --out $TEMPDIR/testuser-piv-key) + local output=$(run_cmd piv user-cert -u test.user@example.com --os-type windows --key-type rsa2048 --san "RFC822:test.user@example.com" --san "DIRECTORY:C=US,O=Organization,CN=test.user" --out $TEMPDIR/testuser-piv-key) assert_success echo "$output" assert_not_grep "Cert errors" "$output"