From aebcfa94c3ed7562d7b3ac60542c1d656a75a127 Mon Sep 17 00:00:00 2001 From: Jarno Elonen Date: Wed, 28 Aug 2024 00:02:02 +0300 Subject: [PATCH] Implement PIV key/cert import for Yubikey --- hsm_secrets/piv/__init__.py | 272 +++++++++++++++++++++++------------- 1 file changed, 171 insertions(+), 101 deletions(-) diff --git a/hsm_secrets/piv/__init__.py b/hsm_secrets/piv/__init__.py index af152f2..f02084d 100644 --- a/hsm_secrets/piv/__init__.py +++ b/hsm_secrets/piv/__init__.py @@ -6,7 +6,7 @@ import click from pathlib import Path -from typing import Union, List, Tuple, BinaryIO, Optional, cast, get_args +from typing import Callable, Union, List, Tuple, BinaryIO, Optional, cast, get_args from urllib.parse import urlparse @@ -19,6 +19,9 @@ import ykman from yubikit.piv import PivSession, SLOT, PIN_POLICY, TOUCH_POLICY from yubikit.core.smartcard import ApduError, SW +import yubikit.core +import yubikit.piv +from yubikit.hsmauth import HsmAuthSession # type: ignore [import] from hsm_secrets.config import HSMConfig, HSMKeyID, HSMOpaqueObject, X509Cert, X509NameType from hsm_secrets.piv.piv_cert_checks import PIVDomainControllerCertificateChecker, PIVUserCertificateChecker @@ -41,102 +44,23 @@ class PivKeyType(enum.Enum): ECP384 = "ecp384" -def _generate_piv_key_pair(key_type: PivKeyType) -> Tuple[Union[rsa.RSAPublicKey, ec.EllipticCurvePublicKey], Union[rsa.RSAPrivateKey, ec.EllipticCurvePrivateKey]]: - private_key: Union[rsa.RSAPrivateKey, ec.EllipticCurvePrivateKey] - if key_type == PivKeyType.RSA2048: - private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048) - elif key_type == PivKeyType.ECP256: - private_key = ec.generate_private_key(ec.SECP256R1()) - elif key_type == PivKeyType.ECP384: - private_key = ec.generate_private_key(ec.SECP384R1()) - else: - raise ValueError(f"Unsupported key type: {key_type}") - public_key = private_key.public_key() - return public_key, private_key - - -def _save_pem_certificate(cert: x509.Certificate, output_file: BinaryIO) -> None: - pem_data = cert.public_bytes(encoding=serialization.Encoding.PEM) - output_file.write(pem_data) - - -def import_to_yubikey_piv( - cert: x509.Certificate, - private_key: Union[rsa.RSAPrivateKey, ec.EllipticCurvePrivateKey], - slot: SLOT = SLOT.AUTHENTICATION, - pin: Optional[str] = None, - management_key: Optional[bytes] = None -) -> None: - """ - Import the certificate and private key into a Yubikey PIV slot. - - :param cert: The X.509 certificate to import - :param private_key: The private key corresponding to the certificate - :param slot: The PIV slot to use (default: Authentication slot) - :param pin: The PIN for the Yubikey PIV application (if None, will prompt) - :param management_key: The management key for the Yubikey PIV application (if None, will prompt) - """ - try: - yubikey = ykman.scripting.single() # Connect to the first Yubikey found, prompt user to insert one if not found - sc = yubikey.smart_card() - piv = PivSession(sc) - - if pin is None: - pin = click.prompt("Enter PIN", hide_input=True) - if pin is None: - raise click.ClickException("PIN is required") - piv.verify_pin(pin) - - if management_key is None: - mkey_str = click.prompt("Enter management key (hex)", hide_input=True) - if mkey_str is None: - raise click.ClickException("Management key is required") - management_key = bytes.fromhex(mkey_str) - mkm = piv.get_management_key_metadata() - piv.authenticate(mkm.key_type, management_key) - - cli_info(f"Importing private key to slot {slot.name}.") - cli_warn("Setting touch_policy=CACHED. Touch is not in PIV standard, so if this doesn't work, maybe try touch_policy=NEVER.") - piv.put_key(slot, private_key, pin_policy=PIN_POLICY.MATCH_ONCE, touch_policy=TOUCH_POLICY.CACHED) - - # Import certificate - piv.put_certificate(slot, cert, compress=True) - - click.echo(f"OK. Certificate and private key imported to YubiKey slot '{slot.name}' (0x{slot.value:x}).") - - except ApduError as e: - if e.sw == SW.AUTH_METHOD_BLOCKED: - click.echo("Error: PIN is blocked") - elif e.sw == SW.INCORRECT_PARAMETERS: - click.echo("Error: Incorrect PIN or management key") - elif e.sw == SW.SECURITY_CONDITION_NOT_SATISFIED: - click.echo("Error: Security condition not satisfied. Ensure you have the correct permissions.") - elif e.sw == SW.COMMAND_NOT_ALLOWED: - click.echo("Error: Command not allowed. The YubiKey may be in a state that doesn't allow this operation.") - else: - click.echo(f"Error: {str(e)}") - except Exception as e: - click.echo(f"Error: {str(e)}") - - - @cmd_piv.command('user-cert') @pass_common_args @click.option('--user', '-u', required=True, help="User identifier (username for Windows, email for macOS/Linux)") @click.option('--template', '-t', required=False, help="Template label, default: first template") @click.option('--subject', '-s', required=False, help="Cert subject (DN), default: from config") @click.option('--validity', '-v', type=int, help="Validity period in days, default: from config") -@click.option('--key-type', '-k', type=click.Choice(['RSA2048', 'ECP256', 'ECP384']), help="Key type, default: same as CA") +@click.option('--key-type', '-k', type=click.Choice(['RSA2048', 'ECP256', 'ECP384']), default='ECP384', help="Key type, default: same as CA") @click.option('--csr', type=click.Path(exists=True, dir_okay=False, resolve_path=True), help="Path to existing CSR file") @click.option('--ca', '-c', required=False, help="CA ID (hex) or label, default: from config") @click.option('--out', '-o', required=False, type=click.Path(exists=False, dir_okay=False, resolve_path=True, allow_dash=False), help="Output filename stem, default: ./-piv[.key/.cer]") @click.option('--os-type', type=click.Choice(['windows', 'other']), default='windows', help="Target operating system") @click.option('--san', multiple=True, help="AdditionalSANs, e.g., 'DNS:example.com', 'IP:10.0.0.2', etc.") -def create_piv_cert(ctx: HsmSecretsCtx, user: str, template: str|None, subject: str, validity: int, key_type: Literal['RSA2048', 'ECP256', 'ECP384']|None, csr: str|None, ca: str, out: str, os_type: Literal["windows", "other"], san: List[str]): +def create_piv_cert(ctx: HsmSecretsCtx, user: str, template: str|None, subject: str, validity: int, key_type: Literal['RSA2048', 'ECP256', 'ECP384'], csr: str|None, ca: str, out: str, os_type: Literal["windows", "other"], san: List[str]): """Create or sign a PIV user certificate - If a CSR is provided, this command signs the CSR with a CA certificate. - Otherwise it generates a new key pair (key type required) and signs a certificate for it. + If a CSR is provided, sign it with a CA certificate. + Otherwise generate a new key pair and signs a certificate for it. Example SAN types: - RFC822:alice@example.com @@ -145,8 +69,7 @@ def create_piv_cert(ctx: HsmSecretsCtx, user: str, template: str|None, subject: - OID:1.2.3.4.5=myValue """ # Set up default output filenames - if not out: - out = f"{user}-piv" + out = out or f"{_sanitize_username(user)}-piv" key_file = Path(out).with_suffix('.key.pem') csr_file = Path(out).with_suffix('.csr.pem') cer_file = Path(out).with_suffix('.cer.pem') @@ -188,13 +111,9 @@ def create_piv_cert(ctx: HsmSecretsCtx, user: str, template: str|None, subject: with open(csr, 'rb') as f: csr_obj = x509.load_pem_x509_csr(f.read()) private_key = None - elif key_type: - key_type_enum = PivKeyType[key_type] - _, private_key = _generate_piv_key_pair(key_type_enum) - csr_obj = None else: - raise click.ClickException("Either --csr or --key-type must be provided") - + _, private_key = _generate_piv_key_pair(PivKeyType[key_type]) + csr_obj = None # Add explicitly provided SANs x509_info.subject_alt_name = x509_info.subject_alt_name or x509_info.SubjectAltName() @@ -216,7 +135,9 @@ def create_piv_cert(ctx: HsmSecretsCtx, user: str, template: str|None, subject: x509_info.subject_alt_name.names.setdefault('rfc822', []).append(user) # Create X509CertBuilder - cert_builder = X509CertBuilder(ctx.conf, x509_info, private_key or csr_obj, dn_subject_override=subject) + key_or_csr = private_key or csr_obj + assert key_or_csr + cert_builder = X509CertBuilder(ctx.conf, x509_info, key_or_csr, dn_subject_override=subject) # Sign the certificate with CA ca_id = ca or ctx.conf.piv.default_ca_id @@ -326,13 +247,12 @@ def sign_dc_cert(ctx: HsmSecretsCtx, csr: click.File, validity: int, ca: str, ou @click.argument('cert', required=True, type=click.Path(exists=True, dir_okay=False, resolve_path=True, allow_dash=False)) @click.argument('key', required=True, type=click.Path(exists=True, dir_okay=False, resolve_path=True, allow_dash=False)) @click.option('--slot', '-s', type=click.Choice(['AUTHENTICATION', 'SIGNATURE', 'KEY_MANAGEMENT', 'CARD_AUTH']), default='AUTHENTICATION', help="PIV slot to import to") -@click.option('--pin', '-p', help="PIV PIN, default: prompt") @click.option('--management-key', '-m', help="PIV management key (hex), default: prompt") -def import_to_yubikey_piv_cmd(cert, key, slot, pin, management_key): +def import_to_yubikey_piv_cmd(cert: click.Path, key: click.Path, slot: str, management_key: str|None): """Import a certificate and private key to a YubiKey PIV slot""" # Load certificate and private key - cert_path = Path(cert) - key_path = Path(key) + cert_path = Path(str(cert)) + key_path = Path(str(key)) with cert_path.open('rb') as f: cert_data = f.read() @@ -344,17 +264,167 @@ def import_to_yubikey_piv_cmd(cert, key, slot, pin, management_key): if not isinstance(private_key, (rsa.RSAPrivateKey, ec.EllipticCurvePrivateKey)): raise click.ClickException("Unsupported private key type. Only RSA and EC keys are supported for YubiKey PIV.") + cli_info("PEM files loaded:") + cli_info(f"- certificate: {cert_path}") + cli_info(f"- private key: {key_path}") + # Convert slot string to SLOT enum from yubikit.piv import SLOT slot_enum = getattr(SLOT, slot) - # Import to YubiKey - import_to_yubikey_piv( + _import_to_yubikey_piv( cert=certificate, private_key=private_key, slot=slot_enum, - pin=pin, management_key=bytes.fromhex(management_key) if management_key else None ) - cli_info(f"Certificate and private key imported to YubiKey PIV slot {slot}") + + +def _generate_piv_key_pair(key_type: PivKeyType) -> Tuple[Union[rsa.RSAPublicKey, ec.EllipticCurvePublicKey], Union[rsa.RSAPrivateKey, ec.EllipticCurvePrivateKey]]: + private_key: Union[rsa.RSAPrivateKey, ec.EllipticCurvePrivateKey] + if key_type == PivKeyType.RSA2048: + private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048) + elif key_type == PivKeyType.ECP256: + private_key = ec.generate_private_key(ec.SECP256R1()) + elif key_type == PivKeyType.ECP384: + private_key = ec.generate_private_key(ec.SECP384R1()) + else: + raise ValueError(f"Unsupported key type: {key_type}") + public_key = private_key.public_key() + return public_key, private_key + + +def _save_pem_certificate(cert: x509.Certificate, output_file: BinaryIO) -> None: + pem_data = cert.public_bytes(encoding=serialization.Encoding.PEM) + output_file.write(pem_data) + +def _sanitize_username(user: str) -> str: + user = re.sub(r'@.*', '', user) # Remove anything after '@' + user = re.sub(r'[^\w]', '_', user) # Replace special characters with underscores + return user or "user" + + +def _import_to_yubikey_piv( + cert: x509.Certificate, + private_key: Optional[Union[rsa.RSAPrivateKey, ec.EllipticCurvePrivateKey]], + slot: SLOT = SLOT.AUTHENTICATION, + management_key: Optional[bytes] = None +) -> None: + """ + Import the certificate and private key into a Yubikey PIV slot. + + :param cert: The X.509 certificate to import + :param private_key: The private key corresponding to the certificate + :param slot: The PIV slot to use (default: Authentication slot) + :param management_key: The management key for the Yubikey PIV application (if None, will prompt) + """ + def _import_op(piv: PivSession, slot: SLOT): + # Check for biometric support + bio_supported = True + try: + piv.get_bio_metadata() + except yubikit.core.NotSupportedError: + bio_supported = False + + # Import key if provided + if private_key: + cli_info(f"Importing private key to slot '{slot.name}' ({slot.value:02x})") + cli_info("- Setting touch requirement 'CACHED': needed if last touched over 15 seconds ago. Touch is a non-standard PIV extension.") + if bio_supported: + cli_info("- Biometric support detected. Enabling MATCH_ONCE for PIN policy.") + piv.put_key(slot, private_key, pin_policy=PIN_POLICY.MATCH_ONCE, touch_policy=TOUCH_POLICY.CACHED) + else: + cli_info("- Setting PIN policy to 'ONCE': PIN is needed once per session.") + piv.put_key(slot, private_key, pin_policy=PIN_POLICY.ONCE, touch_policy=TOUCH_POLICY.CACHED) + else: + md = piv.get_slot_metadata(slot) + if not md.public_key_encoded: + raise click.ClickException(f"Slot '{slot.name}' has not key pair, cannot import certificate without key") + + # Import certificate + piv.put_certificate(slot, cert, compress=True) + cli_info("OK") + + _yubikey_piv_operation(slot, _import_op, management_key) + + + +def _yubikey_piv_operation(slot: SLOT, op_func: Callable[[PivSession, SLOT], None], management_key: Optional[bytes] = None): + try: + hsm_yubikey, piv_yubikey = _scan_for_hsm_and_piv_yubikeys() + piv_yubikey = piv_yubikey or hsm_yubikey + if not piv_yubikey: + raise click.ClickException("No YubiKey found storing PIV credentials") + + if hsm_yubikey and piv_yubikey != hsm_yubikey: + cli_warn(f"Found YubiKey with HSM credentials: {hsm_yubikey} - skipping it for PIV operation.") + cli_info(f"Performing PIV operation on device: {str(piv_yubikey)}") + piv = PivSession(piv_yubikey.smart_card()) + + _yubikey_auth_with_piv_mgm_key(piv, management_key) + op_func(piv, slot) + + except ApduError as e: + if e.sw == SW.AUTH_METHOD_BLOCKED: + click.echo("Error: PIN is blocked") + elif e.sw == SW.INCORRECT_PARAMETERS: + click.echo("Error: Incorrect PIN or management key") + elif e.sw == SW.SECURITY_CONDITION_NOT_SATISFIED: + click.echo("Error: Security condition not satisfied. Ensure you have the correct permissions.") + elif e.sw == SW.COMMAND_NOT_ALLOWED: + click.echo("Error: Command not allowed. The YubiKey may be in a state that doesn't allow this operation.") + else: + click.echo(f"Error: {str(e)}") + except Exception as e: + click.echo(f"Error: {str(e)}") + + +def _scan_for_hsm_and_piv_yubikeys() -> Tuple[Optional[ykman.scripting.ScriptingDevice], Optional[ykman.scripting.ScriptingDevice]]: + """ + Scan for YubiKeys for a) HSM auth and b) PIV import. + + If there are multiple YubiKeys, select the one without HSM support for PIV import. + If there's only one, return it for both HSM and PIV. + + :return: Tuple of (device for HSM, device for PIV) + """ + hsm_yubikey, piv_yubikey = None, None + for yk_dev, yk_info in ykman.device.list_all_devices(): + yk = ykman.scripting.ScriptingDevice(yk_dev, yk_info) + sc = yk.smart_card() + try: + if HsmAuthSession(connection=sc).list_credentials(): + sc.close() + if hsm_yubikey: + raise click.ClickException("ERROR: Multiple YubiKeys found with HSM credentials. Heuristic is to pick the one without HSM credentials for PIV import, but this is ambiguous in this case.") + hsm_yubikey = yk + continue + except yubikit.core.NotSupportedError: + pass + sc.close() + if piv_yubikey: + raise click.ClickException("ERROR: Multiple YubiKeys found without HSM auth. Can't decide which one to use for PIV import.") + piv_yubikey = yk + if hsm_yubikey: + break + return hsm_yubikey, piv_yubikey + + +def _yubikey_auth_with_piv_mgm_key(piv: PivSession, management_key: Optional[bytes]) -> None: + """ + Authenticate with PIV management key. + :param piv: The PIV session to authenticate + :param management_key: The management key for the YubiKey PIV application + """ + mkm = piv.get_management_key_metadata() + if management_key is None: + if mkm.default_value: + cli_warn("WARNING! Using default management key. Change it immediately after import!") + management_key = yubikit.piv.DEFAULT_MANAGEMENT_KEY + else: + mkey_str = click.prompt("Enter PIV management key (hex)", hide_input=True) + if mkey_str is None: + raise click.ClickException("Management key is required") + management_key = bytes.fromhex(mkey_str) + piv.authenticate(mkm.key_type, management_key)