diff --git a/README.md b/README.md index 1103a86..54bacf8 100644 --- a/README.md +++ b/README.md @@ -4,26 +4,40 @@ Higher level interactive CLI tool for YubiHSM2 operations, based on a YAML configuration file (see [hsm-conf.yml](hsm-conf.yml)). -The config file approach simplifies planning, setup and daily use while maintaining high security standards. +The config file approach simplifies planning, setup, validity checking and daily use while maintaining high security standards. + +Built mostly on top of Yubico's Python APIs and the Cryptography library. ## Highlights - Centralized configuration in a single YAML file - Automatic key/cert generation based on the config file -- Authenticate HSM operators by YubiKey 5 hardware tokens to avoid credential theft by malware - - Integrated Yubikey HSM auth (yubihsm-auth) slot setup for operators -- Discourage leaking secrets in process listing, local disk or terminal scrollback - - Fully within one process, does not invoke external CLI tools (except in unit tests) + - Sensible default config with comments +- Authenticate HSM operators by YubiKey 5 hardware tokens + - Integrated Yubikey HSM auth (yubihsm-auth) slot management for operators - Integrate daily operations under a single tool: - OpenSSH certificate creation and signing, including hardware token **sk-ed25519** and **sk-ecdsa** keys - X.509 certificate creationg and signing + - Sanity checks / lint for generated certificates by usage - TLS server cert creation - - Windows login certs for Yubikey with PIV + - PIV cert generation (e.g. Windows login with YubiKey) + - Store in YubiKey or save to disk - Password derivation for VMs etc. +- HSM audit logging + - Specify HSM audit policy in config file + - Incrementally fetch and parse log entries from YubiHSM + - from multiple devices (for HA / load balancing) + - store into SQlite database + - convenient "forced logging mode" support (with `log fetch --clear`) + - Show log entries in human-readable + - Verify audit chain integrity + - Export new logs to JSONL, for log server submission - Improved Secret Sharing ceremony vs. YubiHSM setup util (vs. yubihsm-setup) - password protected shares (optional) - better display hygiene - detailed interactive guiding +- Discourage leaking secrets in process listing, local disk or terminal scrollback + - Fully within one process, does not invoke external CLI tools (except in unit tests) ## Practical Examples @@ -77,12 +91,22 @@ openssl crl2pkcs7 -nocrl -certfile ./certs/wiki-server.cer.pem | openssl pkcs7 ``` In this example, the HTTPS server key was generated and written on local disk, for convenience. - For added separation of concerns, it could also have been created on the web server by a webmaster (perhaps with openssl), and the HSM operator would only have signed the CSR (Certificate Signing Request) with `hsm-secrets tls sign wiki-server.csr.pem`. -## Development status +## Development + +**Work in progress**, but usable and useful. -Work-in-progress, but usable and useful. +This rather niche software is being developed to scratch some particular sysops itches, not as a "product". +Even if you don't actually use the tool, I hope this repository shares some knowledge and technical details. +Corrections, improvements and observations are welcome. + +## Unit tests + +Run `make test` to install requirements and run a test suite. + +The tests **do not** use an actual YubiHSM device, but rather a mock +implementation (using `--mock` option) to test the commands with `openssl`, `ssh-keygen` etc. ## Installation and upgrade @@ -175,6 +199,12 @@ YubiHSM2 devices are easy to reset, so you might want to do a test-run or two be Airgapped setup is necessary to prevent supply chain attacks from exfiltrating any generated secrets. You might want to use something like Tails Linux on USB stick, and wipe/destroy the media after setup. +## License + +Released under the MIT license + +Copyright 2024 by Jarno Elonen + ## Disclaimer The software is provided "as is", the license disclaims any warranties and liabilities. Use at your own risk. diff --git a/doc/setup-workflow.md b/doc/setup-workflow.md index 415fc67..d923b4b 100644 --- a/doc/setup-workflow.md +++ b/doc/setup-workflow.md @@ -3,6 +3,8 @@ This is checklist for setting up YubiHSM2 devices using the hsm-secrets tool. The process should be performed in an airgapped environment for maximum security. +**TIP:** *You can try these steps withou actual HSM devices, by using `hsm-secrets --mock mock.pickle` instead of plain `hsm-secrets`* + ## Initial Setup Workflow 1. `[ ]` Connect all YubiHSM2 devices to the airgapped computer. @@ -12,56 +14,61 @@ The process should be performed in an airgapped environment for maximum security 3. `[ ]` Set a common wrap key on all devices: ``` - hsm-secrets hsm make-wrap-key + hsm-secrets hsm backup make-key ``` 4. `[ ]` Host a Secret Sharing Ceremony to add a super admin key: ``` - hsm-secrets hsm admin-sharing-ceremony + hsm-secrets hsm admin sharing-ceremony ``` - Follow the prompts to set up the number of shares and threshold. -5. `[ ]` Add user keys (YubiKey auth) to the master device: +5. `[ ]` Add YubiKey auth user keys to the master device: ``` - hsm-secrets user add-yubikey --label + hsm-secrets user add-yubikey ``` - User will need to connect their YubiKey and follow instructions. - - Repeat for each user key. + - Repeat for each user key + +6. `[ ]` Add service accounts to master device: + ``` + hsm-secrets user add-service --all + ``` -6. `[ ]` Generate keys on the master device: +7. `[ ]` Generate keys and certificates on the master device: ``` - hsm-secrets hsm compare --create + hsm-secrets hsm objects create-missing ``` -7. `[ ]` Create certificates from the generated keys: +8. `[ ]` Apply audit logging settings to devices: ``` - hsm-secrets x509 create --all + hsm-secrets log apply-settings --alldevs ``` -8. `[ ]` Verify all configured objects are present on the master device: +9. `[ ]` Verify all configured objects are present on the master device: ``` hsm-secrets hsm compare ``` -9. `[ ]` Create a (wrapped) backup of the master device: +10. `[ ]` Create a (wrapped) backup of the master device: ``` - hsm-secrets hsm backup + hsm-secrets hsm backup export ``` -10. `[ ]` Restore the backup to other devices (for HA): +11. `[ ]` Restore the backup to other devices (for HA): ``` - hsm-secrets hsm restore + hsm-secrets --hsmserial hsm backup import ``` - Repeat for each additional device. -11. `[ ]` Verify all keys are present on all devices: +12. `[ ]` Verify all keys are present on all devices: ``` hsm-secrets hsm compare --alldevs ``` -12. `[ ]` Remove the default admin key from all devices: +13. `[ ]` Remove the default admin key from all devices: ``` - hsm-secrets hsm default-admin-disable --alldevs + hsm-secrets hsm admin default-disable --alldevs ``` ## Post-Setup Verification diff --git a/hsm-conf.yml b/hsm-conf.yml index bb49ae2..c779030 100644 --- a/hsm-conf.yml +++ b/hsm-conf.yml @@ -1,5 +1,5 @@ # This is a configuration file for the 'hsm-secrets' tool. -# It is used to generate keys and certificates for the YubiHSM 2. +# It is used to generate keys and certificates for YubiHSM 2 devices. # # Most keys are provided in RSA, Ed25519 and NIST EC formats. # Ed25519 is recommended whenever possible, RSA (super slow on YubiHSM) is for compatibility with older systems. @@ -14,8 +14,8 @@ general: } domains: - # These domain numbers separate different types of objects in the YubiHSM 2. - # The names are used by the hsm-secrets tool for clarity, not by the device itself. + # Domain numbers (1-16) separate different types of objects in the YubiHSM 2. + # These text names are used by the hsm-secrets tool for clarity, not by the device itself. x509: 1 tls: 2 nac: 3 @@ -26,6 +26,8 @@ general: password_derivation: 8 encryption: 9 + # Default settings for X.509 certificates. + # These are used for all generated certificates when a specific setting is not provided (i.e. is null). x509_defaults: validity_days: 3650 basic_constraints: @@ -61,7 +63,8 @@ general: - "http://crl2.example.com/all.crl" authority_info_access: null # OCSP is increasingly deprecated due to security issues, in favor of CRLs again -# Subsystem for YubiHSM 2 device admin auth keys. + +# YubiHSM 2 device admin keys and settings admin: default_admin_password: 'password' @@ -131,7 +134,7 @@ admin: get-log-entries: 'off' # This seems to change after fetch (firmware bug?), so don't log it to avoid digest mismatch create-session: 'off' # Not much point, as auth session key is included in every other log entry close-session: 'off' # (--||--) - authenticate-session: 'off' # (--||--) + authenticate-session: 'off' # (--||--) session-message: 'off' # This is a low-level command, spams the log, not very useful to log @@ -211,7 +214,7 @@ service_keys: # Subsystem/domain for root CAs. -# Intermediate CAs in other subsystems (TLS, X.509, ..) are signed by these. +# Intermediate CAs in other subsystems are signed by these. # # usages: Generate a root CA with OpenSSL using these as private keys. # Generate attestation certificates for the root CAs when creating them, and store them @@ -434,7 +437,8 @@ tls: sign_by: 0x0131 # ECP384 Root CA -# NAC (Network Access Control) intermediate keys for 802.1X +# NAC (Network Access Control) intermediate keys for 802.1X EAP-TLS. +# NO COMMANDS FOR THIS SECTION IMPLEMENTED YET -- KEYS ARE GENERATED FOR FUTURE USE nac: intermediate_certs: - @@ -497,9 +501,11 @@ nac: algorithm: opaque-x509-certificate sign_by: 0x0131 +# PIV (Personal Identity Verification) keys for smartcard login piv: - # PIV (Personal Identity Verification) keys for smartcard login default_ca_id: 0x0431 + default_piv_domain: '@example.directory' # AD UPN suffix for Windows, rfc822 suffix for Linux/macOS + intermediate_certs: - key: @@ -636,7 +642,9 @@ ssh: # GPG/OpenPGP keys -# for future use, PKCS#11 support in GPG is not very good atm +# NO COMMANDS FOR THIS SECTION IMPLEMENTED YET -- KEYS ARE GENERATED FOR FUTURE USE +# +# Note: PKCS#11 support in GPG is not very good atm # # RSA keys can be used for both SCA (Sign, Certify, Authenticate) and E (Encrypt) operations in GnuPG, but # it is recommended to use separate keys for these purposes, so make two keys here. Allow both sign & crypt @@ -689,6 +697,7 @@ gpg: # Code signing keys for signing software, firmware, etc. +# NO COMMANDS FOR THIS SECTION IMPLEMENTED YET -- KEYS ARE GENERATED FOR FUTURE USE codesign: intermediate_certs: - @@ -747,6 +756,7 @@ codesign: algorithm: opaque-x509-certificate sign_by: 0x0131 + # For deriving unique passwords from usernames, hostnames, etc. password_derivation: keys: @@ -776,7 +786,8 @@ password_derivation: # For generic encryption of secrets, passwords, etc. -# (For limited and infrequent use, YubiHSM is not very fast) +# Limited and infrequent use, YubiHSM is not very fast +# NO COMMANDS FOR THIS SECTION IMPLEMENTED YET -- KEYS ARE GENERATED FOR FUTURE USE encryption: keys: - label: enc-1 diff --git a/hsm_secrets/config.py b/hsm_secrets/config.py index 3b131f2..dede871 100644 --- a/hsm_secrets/config.py +++ b/hsm_secrets/config.py @@ -84,6 +84,7 @@ class TLS(NoExtraBaseModel): class PIV(NoExtraBaseModel): default_ca_id: HSMKeyID + default_piv_domain: str intermediate_certs: List['X509Cert'] dc_cert_templates: Dict[str, 'X509Info'] # Overrides global defaults user_cert_templates: Dict[str, 'X509Info'] diff --git a/hsm_secrets/piv/__init__.py b/hsm_secrets/piv/__init__.py index f02084d..c782484 100644 --- a/hsm_secrets/piv/__init__.py +++ b/hsm_secrets/piv/__init__.py @@ -1,34 +1,25 @@ -from ipaddress import ip_address import re -import enum -import datetime from typing_extensions import Literal import click from pathlib import Path -from typing import Callable, Union, List, Tuple, BinaryIO, Optional, cast, get_args - -from urllib.parse import urlparse +from typing import cast, get_args from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import rsa, ec from cryptography import x509 -import ykman.device -import ykman.scripting -import ykman -from yubikit.piv import PivSession, SLOT, PIN_POLICY, TOUCH_POLICY -from yubikit.core.smartcard import ApduError, SW -import yubikit.core +from asn1crypto.core import OctetString # type: ignore [import] + import yubikit.piv -from yubikit.hsmauth import HsmAuthSession # type: ignore [import] -from hsm_secrets.config import HSMConfig, HSMKeyID, HSMOpaqueObject, X509Cert, X509NameType -from hsm_secrets.piv.piv_cert_checks import PIVDomainControllerCertificateChecker, PIVUserCertificateChecker -from hsm_secrets.utils import HsmSecretsCtx, cli_info, cli_warn, open_hsm_session, pass_common_args +from hsm_secrets.config import HSMOpaqueObject, X509NameType +from hsm_secrets.piv.piv_cert_checks import PIVDomainControllerCertificateChecker +from hsm_secrets.piv.piv_cert_utils import PivKeyTypeName, make_signed_piv_user_cert +from hsm_secrets.piv.yubikey_piv import import_to_yubikey_piv +from hsm_secrets.utils import HsmSecretsCtx, cli_code_info, cli_info, open_hsm_session, pass_common_args from hsm_secrets.x509.cert_builder import X509CertBuilder from hsm_secrets.x509.def_utils import find_cert_def, merge_x509_info_with_defaults -from hsm_secrets.yubihsm import HSMSession @click.group() @@ -37,139 +28,10 @@ def cmd_piv(ctx: click.Context): """PIV commands (Yubikey Windows login)""" ctx.ensure_object(dict) - -class PivKeyType(enum.Enum): - RSA2048 = "rsa2048" - ECP256 = "ecp256" - ECP384 = "ecp384" - - -@cmd_piv.command('user-cert') -@pass_common_args -@click.option('--user', '-u', required=True, help="User identifier (username for Windows, email for macOS/Linux)") -@click.option('--template', '-t', required=False, help="Template label, default: first template") -@click.option('--subject', '-s', required=False, help="Cert subject (DN), default: from config") -@click.option('--validity', '-v', type=int, help="Validity period in days, default: from config") -@click.option('--key-type', '-k', type=click.Choice(['RSA2048', 'ECP256', 'ECP384']), default='ECP384', help="Key type, default: same as CA") -@click.option('--csr', type=click.Path(exists=True, dir_okay=False, resolve_path=True), help="Path to existing CSR file") -@click.option('--ca', '-c', required=False, help="CA ID (hex) or label, default: from config") -@click.option('--out', '-o', required=False, type=click.Path(exists=False, dir_okay=False, resolve_path=True, allow_dash=False), help="Output filename stem, default: ./-piv[.key/.cer]") -@click.option('--os-type', type=click.Choice(['windows', 'other']), default='windows', help="Target operating system") -@click.option('--san', multiple=True, help="AdditionalSANs, e.g., 'DNS:example.com', 'IP:10.0.0.2', etc.") -def create_piv_cert(ctx: HsmSecretsCtx, user: str, template: str|None, subject: str, validity: int, key_type: Literal['RSA2048', 'ECP256', 'ECP384'], csr: str|None, ca: str, out: str, os_type: Literal["windows", "other"], san: List[str]): - """Create or sign a PIV user certificate - - If a CSR is provided, sign it with a CA certificate. - Otherwise generate a new key pair and signs a certificate for it. - - Example SAN types: - - RFC822:alice@example.com - - UPN:alice@example.com - - DIRECTORY:/C=US/O=Example/CN=example.com - - OID:1.2.3.4.5=myValue - """ - # Set up default output filenames - out = out or f"{_sanitize_username(user)}-piv" - key_file = Path(out).with_suffix('.key.pem') - csr_file = Path(out).with_suffix('.csr.pem') - cer_file = Path(out).with_suffix('.cer.pem') - - # Get template - if template: - if template not in ctx.conf.piv.user_cert_templates: - raise click.ClickException(f"Template '{template}' not found in configuration") - cert_template = ctx.conf.piv.user_cert_templates[template] - else: - # Use first template if not specified - cert_template = next(iter(ctx.conf.piv.user_cert_templates.values())) - assert cert_template, "No user certificate templates found in configuration" - - # Merge template with defaults - x509_info = merge_x509_info_with_defaults(cert_template, ctx.conf) - assert x509_info, "No user certificate templates found in configuration" - assert x509_info.attribs, "No user certificate attributes found in configuration" - - # Override template values with command-line options - if validity: - x509_info.validity_days = validity - - # Generate subject DN if not explicitly provided - if not subject: - subject = f"CN={user}" - if x509_info.attribs: - for k,v in { - 'O': x509_info.attribs.organization, - 'L': x509_info.attribs.locality, - 'ST': x509_info.attribs.state, - 'C': x509_info.attribs.country, - }.items(): - if v: - subject += f",{k}={v}" - - # Handle CSR or key generation - if csr: - with open(csr, 'rb') as f: - csr_obj = x509.load_pem_x509_csr(f.read()) - private_key = None - else: - _, private_key = _generate_piv_key_pair(PivKeyType[key_type]) - csr_obj = None - - # Add explicitly provided SANs - x509_info.subject_alt_name = x509_info.subject_alt_name or x509_info.SubjectAltName() - valid_san_types = get_args(X509NameType) - for san_entry in san: - try: - san_type, san_value = san_entry.split(':', 1) - except ValueError: - raise click.ClickException(f"Invalid SAN: '{san_entry}'. Must be in the form 'type:value', where type is one of: {', '.join(valid_san_types)}") - san_type_lower = san_type.lower() - if san_type_lower not in valid_san_types: - raise click.ClickException(f"Provided '{san_type.lower()}' is not a supported X509NameType. Must be one of: {', '.join(valid_san_types)}") - x509_info.subject_alt_name.names.setdefault(san_type_lower, []).append(san_value) # type: ignore [arg-type] - - # Add UPN or email to SANs based on OS type - if os_type == 'windows': - x509_info.subject_alt_name.names.setdefault('upn', []).append(user) - else: - x509_info.subject_alt_name.names.setdefault('rfc822', []).append(user) - - # Create X509CertBuilder - key_or_csr = private_key or csr_obj - assert key_or_csr - cert_builder = X509CertBuilder(ctx.conf, x509_info, key_or_csr, dn_subject_override=subject) - - # Sign the certificate with CA - ca_id = ca or ctx.conf.piv.default_ca_id - issuer_cert_def = ctx.conf.find_def(ca_id, HSMOpaqueObject) - - with open_hsm_session(ctx) as ses: - issuer_x509_def = find_cert_def(ctx.conf, issuer_cert_def.id) - assert issuer_x509_def, f"CA cert ID not found: 0x{issuer_cert_def.id:04x}" - issuer_cert = ses.get_certificate(issuer_cert_def) - issuer_key = ses.get_private_key(issuer_x509_def.key) - signed_cert = cert_builder.build_and_sign(issuer_cert, issuer_key) - - PIVUserCertificateChecker(signed_cert, os_type).check_and_show_issues() - - # Save files - if private_key: - key_file.write_bytes(private_key.private_bytes( - encoding=serialization.Encoding.PEM, - format=serialization.PrivateFormat.PKCS8, - encryption_algorithm=serialization.NoEncryption() - )) - cli_info(f"Private key saved to: {key_file}") - - csr_obj = cert_builder.generate_csr() - csr_file.write_bytes(csr_obj.public_bytes(serialization.Encoding.PEM)) - cli_info(f"CSR saved to: {csr_file}") - elif csr: - cli_info(f"Using provided CSR: {csr}") - - _save_pem_certificate(signed_cert, cer_file.open('wb')) - cli_info(f"Certificate saved to: {cer_file}") - +@cmd_piv.group('yubikey') +def cmd_piv_yubikey(): + """YubiKey PIV slot management""" + pass @cmd_piv.command('sign-dc-cert') @@ -181,7 +43,7 @@ def create_piv_cert(ctx: HsmSecretsCtx, user: str, template: str|None, subject: @click.option('--san', multiple=True, help="Additional (GeneralName) SANs") @click.option('--hostname', '-h', required=True, help="Hostname (CommonName) for the DC certificate") @click.option('--template', '-t', required=False, help="Template label, default: first template") -def sign_dc_cert(ctx: HsmSecretsCtx, csr: click.File, validity: int, ca: str, out: str|None, san: List[str], hostname: str, template: str|None): +def sign_dc_cert(ctx: HsmSecretsCtx, csr: click.File, validity: int, ca: str, out: str|None, san: list[str], hostname: str, template: str|None): """Sign a DC Kerberos PKINIT certificate for PIV""" csr_path = Path(csr.name) with csr_path.open('rb') as f: @@ -238,26 +100,85 @@ def sign_dc_cert(ctx: HsmSecretsCtx, csr: click.File, validity: int, ca: str, ou PIVDomainControllerCertificateChecker(signed_cert).check_and_show_issues() # Save the signed certificate - _save_pem_certificate(signed_cert, out_path.open('wb')) + with open(out_path, 'wb') as f: + f.write(signed_cert.public_bytes(encoding=serialization.Encoding.PEM)) cli_info(f"Signed certificate saved to: {out_path}") +@cmd_piv.command('user-cert') +@pass_common_args +@click.option('--user', '-u', required=True, help="User identifier (username for Windows, email for macOS/Linux)") +@click.option('--template', '-t', required=False, help="Template label, default: first template") +@click.option('--subject', '-s', required=False, help="Cert subject (DN), default: from config") +@click.option('--validity', '-v', type=int, help="Validity period in days, default: from config") +@click.option('--key-type', '-k', type=click.Choice(['rsa2048', 'ecp256', 'ecp384']), default='ecp384', help="Key type, default: same as CA") +@click.option('--csr', type=click.Path(exists=True, dir_okay=False, resolve_path=True), help="Path to existing CSR file") +@click.option('--ca', '-c', required=False, help="CA ID (hex) or label, default: from config") +@click.option('--out', '-o', required=False, type=click.Path(exists=False, dir_okay=False, resolve_path=True, allow_dash=False), help="Output filename stem, default: ./-piv[.key/.cer]") +@click.option('--os-type', type=click.Choice(['windows', 'other']), default='windows', help="Target operating system") +@click.option('--san', multiple=True, help="AdditionalSANs, e.g., 'DNS:example.com', 'IP:10.0.0.2', etc.") +def save_user_cert(ctx: HsmSecretsCtx, user: str, template: str|None, subject: str, validity: int, key_type: PivKeyTypeName, csr: str|None, ca: str, out: str, os_type: Literal["windows", "other"], san: list[str]): + """Create or sign PIV user certificate, save to files + + If a CSR is provided, sign it with a CA certificate. + Otherwise generate a new key pair and signs a certificate for it. + + Example SAN types: + - RFC822:alice@example.com + - UPN:alice@example.com + - DIRECTORY:/C=US/O=Example/CN=example.com + - OID:1.2.3.4.5=myValue + """ + csr_pem: str|None = None + if csr: + with open(csr, 'rb') as fi: + csr_pem = fi.read().decode() + + private_key, csr_obj, signed_cert = make_signed_piv_user_cert(ctx, user, template, subject, validity, key_type, csr_pem, ca, os_type, san) + _show_piv_cert_summary(signed_cert) + + # Save files + def _sanitize_username(user: str) -> str: + user = re.sub(r'@.*', '', user) # Remove anything after '@' + user = re.sub(r'[^\w]', '_', user) # Replace special characters with underscores + return user or "user" + out = out or f"{_sanitize_username(user)}-piv" + key_file = Path(out).with_suffix('.key.pem') + csr_file = Path(out).with_suffix('.csr.pem') + cer_file = Path(out).with_suffix('.cer.pem') + + if private_key: + key_file.write_bytes(private_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption() + )) + cli_info(f"Private key saved to: {key_file}") + csr_file.write_bytes(csr_obj.public_bytes(serialization.Encoding.PEM)) + cli_info(f"CSR saved to: {csr_file}") -@cmd_piv.command('yubikey-import') + with open(cer_file, 'wb') as fo: + fo.write(signed_cert.public_bytes(encoding=serialization.Encoding.PEM)) + cli_info(f"Certificate saved to: {cer_file}") + + +@cmd_piv_yubikey.command('import') +@pass_common_args @click.argument('cert', required=True, type=click.Path(exists=True, dir_okay=False, resolve_path=True, allow_dash=False)) @click.argument('key', required=True, type=click.Path(exists=True, dir_okay=False, resolve_path=True, allow_dash=False)) @click.option('--slot', '-s', type=click.Choice(['AUTHENTICATION', 'SIGNATURE', 'KEY_MANAGEMENT', 'CARD_AUTH']), default='AUTHENTICATION', help="PIV slot to import to") @click.option('--management-key', '-m', help="PIV management key (hex), default: prompt") -def import_to_yubikey_piv_cmd(cert: click.Path, key: click.Path, slot: str, management_key: str|None): - """Import a certificate and private key to a YubiKey PIV slot""" - # Load certificate and private key - cert_path = Path(str(cert)) - key_path = Path(str(key)) +def import_to_yubikey_piv_cmd(ctx: HsmSecretsCtx, cert: click.Path, key: click.Path, slot: str, management_key: str|None): + """Import cert and key from files to YubiKey PIV slot + If two YubiKeys are connected, the one _without_ HSM auth will be used. + """ + cert_path = Path(str(cert)) with cert_path.open('rb') as f: cert_data = f.read() certificate = x509.load_pem_x509_certificate(cert_data) + key_path = Path(str(key)) with key_path.open('rb') as f: key_data = f.read() private_key = serialization.load_pem_private_key(key_data, password=None) @@ -265,166 +186,88 @@ def import_to_yubikey_piv_cmd(cert: click.Path, key: click.Path, slot: str, mana raise click.ClickException("Unsupported private key type. Only RSA and EC keys are supported for YubiKey PIV.") cli_info("PEM files loaded:") - cli_info(f"- certificate: {cert_path}") - cli_info(f"- private key: {key_path}") + cli_code_info(f" - certificate: `{cert_path.name}`") + cli_code_info(f" - private key: `{key_path.name}`") + + _show_piv_cert_summary(certificate) # Convert slot string to SLOT enum from yubikit.piv import SLOT slot_enum = getattr(SLOT, slot) - _import_to_yubikey_piv( + import_to_yubikey_piv( cert=certificate, private_key=private_key, slot=slot_enum, management_key=bytes.fromhex(management_key) if management_key else None ) +@cmd_piv_yubikey.command('generate') +@pass_common_args +@click.argument('user', required=True) +@click.option('--slot', '-s', type=click.Choice(['AUTHENTICATION', 'SIGNATURE', 'KEY_MANAGEMENT', 'CARD_AUTH']), default='AUTHENTICATION', help="PIV slot to import to") +@click.option('--management-key', '-m', help="PIV management key (hex), default: prompt") +@click.option('--template', '-t', required=False, help="Template label, default: first template") +@click.option('--subject', '-s', required=False, help="Cert subject (DN), default: from config") +@click.option('--validity', '-v', type=int, help="Validity period in days, default: from config") +@click.option('--key-type', '-k', type=click.Choice(['rsa2048', 'ecp256', 'ecp384']), default='ecp384', help="Key type, default: same as CA") +@click.option('--ca', '-c', required=False, help="CA ID (hex) or label, default: from config") +@click.option('--os-type', type=click.Choice(['windows', 'other']), default='windows', help="Target operating system") +@click.option('--san', multiple=True, help="AdditionalSANs, e.g., 'DNS:example.com', 'IP:10.0.0.2', etc.") +def yubikey_gen_user_cert(ctx: HsmSecretsCtx, user: str, slot: str, management_key: str|None, template: str|None, subject: str, validity: int, key_type: PivKeyTypeName, ca: str, os_type: Literal["windows", "other"], san: list[str]): + """Generate a PIV key + cert and store directly in YubiKey + User argument should be a AD username for Windows or email for macOS/Linux. -def _generate_piv_key_pair(key_type: PivKeyType) -> Tuple[Union[rsa.RSAPublicKey, ec.EllipticCurvePublicKey], Union[rsa.RSAPrivateKey, ec.EllipticCurvePrivateKey]]: - private_key: Union[rsa.RSAPrivateKey, ec.EllipticCurvePrivateKey] - if key_type == PivKeyType.RSA2048: - private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048) - elif key_type == PivKeyType.ECP256: - private_key = ec.generate_private_key(ec.SECP256R1()) - elif key_type == PivKeyType.ECP384: - private_key = ec.generate_private_key(ec.SECP384R1()) - else: - raise ValueError(f"Unsupported key type: {key_type}") - public_key = private_key.public_key() - return public_key, private_key - - -def _save_pem_certificate(cert: x509.Certificate, output_file: BinaryIO) -> None: - pem_data = cert.public_bytes(encoding=serialization.Encoding.PEM) - output_file.write(pem_data) - -def _sanitize_username(user: str) -> str: - user = re.sub(r'@.*', '', user) # Remove anything after '@' - user = re.sub(r'[^\w]', '_', user) # Replace special characters with underscores - return user or "user" - - -def _import_to_yubikey_piv( - cert: x509.Certificate, - private_key: Optional[Union[rsa.RSAPrivateKey, ec.EllipticCurvePrivateKey]], - slot: SLOT = SLOT.AUTHENTICATION, - management_key: Optional[bytes] = None -) -> None: + If two YubiKeys are connected, the one _without_ HSM auth will be used for PIV. """ - Import the certificate and private key into a Yubikey PIV slot. + slot_enum: yubikit.piv.SLOT = getattr(yubikit.piv.SLOT, slot) + private_key, _csr_obj, signed_cert = make_signed_piv_user_cert(ctx, user, template, subject, validity, key_type, None, ca, os_type, san) + _show_piv_cert_summary(signed_cert) + import_to_yubikey_piv( + cert = signed_cert, + private_key = private_key, + slot = slot_enum, + management_key = bytes.fromhex(management_key) if management_key else None + ) - :param cert: The X.509 certificate to import - :param private_key: The private key corresponding to the certificate - :param slot: The PIV slot to use (default: Authentication slot) - :param management_key: The management key for the Yubikey PIV application (if None, will prompt) - """ - def _import_op(piv: PivSession, slot: SLOT): - # Check for biometric support - bio_supported = True - try: - piv.get_bio_metadata() - except yubikit.core.NotSupportedError: - bio_supported = False - - # Import key if provided - if private_key: - cli_info(f"Importing private key to slot '{slot.name}' ({slot.value:02x})") - cli_info("- Setting touch requirement 'CACHED': needed if last touched over 15 seconds ago. Touch is a non-standard PIV extension.") - if bio_supported: - cli_info("- Biometric support detected. Enabling MATCH_ONCE for PIN policy.") - piv.put_key(slot, private_key, pin_policy=PIN_POLICY.MATCH_ONCE, touch_policy=TOUCH_POLICY.CACHED) - else: - cli_info("- Setting PIN policy to 'ONCE': PIN is needed once per session.") - piv.put_key(slot, private_key, pin_policy=PIN_POLICY.ONCE, touch_policy=TOUCH_POLICY.CACHED) - else: - md = piv.get_slot_metadata(slot) - if not md.public_key_encoded: - raise click.ClickException(f"Slot '{slot.name}' has not key pair, cannot import certificate without key") - - # Import certificate - piv.put_certificate(slot, cert, compress=True) - cli_info("OK") - - _yubikey_piv_operation(slot, _import_op, management_key) - - - -def _yubikey_piv_operation(slot: SLOT, op_func: Callable[[PivSession, SLOT], None], management_key: Optional[bytes] = None): - try: - hsm_yubikey, piv_yubikey = _scan_for_hsm_and_piv_yubikeys() - piv_yubikey = piv_yubikey or hsm_yubikey - if not piv_yubikey: - raise click.ClickException("No YubiKey found storing PIV credentials") - - if hsm_yubikey and piv_yubikey != hsm_yubikey: - cli_warn(f"Found YubiKey with HSM credentials: {hsm_yubikey} - skipping it for PIV operation.") - cli_info(f"Performing PIV operation on device: {str(piv_yubikey)}") - piv = PivSession(piv_yubikey.smart_card()) - - _yubikey_auth_with_piv_mgm_key(piv, management_key) - op_func(piv, slot) - - except ApduError as e: - if e.sw == SW.AUTH_METHOD_BLOCKED: - click.echo("Error: PIN is blocked") - elif e.sw == SW.INCORRECT_PARAMETERS: - click.echo("Error: Incorrect PIN or management key") - elif e.sw == SW.SECURITY_CONDITION_NOT_SATISFIED: - click.echo("Error: Security condition not satisfied. Ensure you have the correct permissions.") - elif e.sw == SW.COMMAND_NOT_ALLOWED: - click.echo("Error: Command not allowed. The YubiKey may be in a state that doesn't allow this operation.") + +def _show_piv_cert_summary(signed_cert: x509.Certificate): + cli_info(f"PIV certificate summary:") + cli_code_info(f" - Serial: `{signed_cert.serial_number:x}` (❗️store for revocation❗️)") + cli_code_info(f" - Subject: {signed_cert.subject.rfc4514_string()}") + for i, san in enumerate(signed_cert.extensions.get_extension_for_class(x509.SubjectAlternativeName).value): + if isinstance(san, x509.OtherName): + type_str = 'UPN' if san.type_id == x509.ObjectIdentifier('1.3.6.1.4.1.311.20.2.3') else f'OID {san.type_id.dotted_string}' + san_str = f"{type_str}: {OctetString.load(san.value).native.decode().strip()}" + elif isinstance(san, x509.RFC822Name): + san_str = f"RFC822: {san.value}" else: - click.echo(f"Error: {str(e)}") - except Exception as e: - click.echo(f"Error: {str(e)}") + san_str = str(san) + cli_code_info(f" - SAN {i+1}: {san_str}") + cli_code_info(f" - Issuer: {signed_cert.issuer.rfc4514_string()}") +''' +def generate_on_yubikey_piv_cmd(slot: str, key_type: str, management_key: Optional[str], subject: str, validity: int): + """Generate a PIV key on YubiKey and make a certificate for it""" + # Convert slot string to SLOT enum + slot_enum = getattr(SLOT, slot) -def _scan_for_hsm_and_piv_yubikeys() -> Tuple[Optional[ykman.scripting.ScriptingDevice], Optional[ykman.scripting.ScriptingDevice]]: - """ - Scan for YubiKeys for a) HSM auth and b) PIV import. + # Convert key type string to PivKeyType enum + key_type_enum = PivKeyType[key_type] + public_key = _generate_on_yubikey_piv(slot_enum, KEY_TYPE[key_type], bytes.fromhex(management_key) if management_key else None) - If there are multiple YubiKeys, select the one without HSM support for PIV import. - If there's only one, return it for both HSM and PIV. + # Create a dummy certificate + x509_info = X509CertBuilder.get_default_x509_info() + x509_info.validity_days = validity + x509_info.attribs.common_name = subject + x509_info.subject_alt_name = x509_info.SubjectAltName() + cert_builder = X509CertBuilder(HSMConfig(), x509_info, public_key, dn_subject_override=subject) + dummy_cert = cert_builder.build_self_signed() - :return: Tuple of (device for HSM, device for PIV) - """ - hsm_yubikey, piv_yubikey = None, None - for yk_dev, yk_info in ykman.device.list_all_devices(): - yk = ykman.scripting.ScriptingDevice(yk_dev, yk_info) - sc = yk.smart_card() - try: - if HsmAuthSession(connection=sc).list_credentials(): - sc.close() - if hsm_yubikey: - raise click.ClickException("ERROR: Multiple YubiKeys found with HSM credentials. Heuristic is to pick the one without HSM credentials for PIV import, but this is ambiguous in this case.") - hsm_yubikey = yk - continue - except yubikit.core.NotSupportedError: - pass - sc.close() - if piv_yubikey: - raise click.ClickException("ERROR: Multiple YubiKeys found without HSM auth. Can't decide which one to use for PIV import.") - piv_yubikey = yk - if hsm_yubikey: - break - return hsm_yubikey, piv_yubikey - - -def _yubikey_auth_with_piv_mgm_key(piv: PivSession, management_key: Optional[bytes]) -> None: - """ - Authenticate with PIV management key. - :param piv: The PIV session to authenticate - :param management_key: The management key for the YubiKey PIV application - """ - mkm = piv.get_management_key_metadata() - if management_key is None: - if mkm.default_value: - cli_warn("WARNING! Using default management key. Change it immediately after import!") - management_key = yubikit.piv.DEFAULT_MANAGEMENT_KEY - else: - mkey_str = click.prompt("Enter PIV management key (hex)", hide_input=True) - if mkey_str is None: - raise click.ClickException("Management key is required") - management_key = bytes.fromhex(mkey_str) - piv.authenticate(mkm.key_type, management_key) + _import_to_yubikey_piv( + cert=dummy_cert, + private_key=None, + slot=slot_enum, + management +''' diff --git a/hsm_secrets/piv/piv_cert_utils.py b/hsm_secrets/piv/piv_cert_utils.py new file mode 100644 index 0000000..885128e --- /dev/null +++ b/hsm_secrets/piv/piv_cert_utils.py @@ -0,0 +1,135 @@ +from typing_extensions import Literal +import click + +from typing import Union, Optional, get_args + +from cryptography.hazmat.primitives.asymmetric import rsa, ec +from cryptography import x509 + +from hsm_secrets.config import HSMOpaqueObject, X509NameType +from hsm_secrets.piv.piv_cert_checks import PIVUserCertificateChecker +from hsm_secrets.utils import HsmSecretsCtx, open_hsm_session +from hsm_secrets.x509.cert_builder import X509CertBuilder +from hsm_secrets.x509.def_utils import find_cert_def, merge_x509_info_with_defaults + + +PivKeyTypeName = Literal['rsa2048', 'ecp256', 'ecp384'] + + +def make_signed_piv_user_cert( + ctx: HsmSecretsCtx, + user: str, + template: str|None, + subject: str, + validity: int, + key_type: PivKeyTypeName, + csr_pem: str|None, + ca: str, + os_type: Literal["windows", "other"], + san: list[str]) -> tuple[ + Optional[Union[rsa.RSAPrivateKey, ec.EllipticCurvePrivateKey]], + x509.CertificateSigningRequest, + x509.Certificate]: + # Get template + if template: + if template not in ctx.conf.piv.user_cert_templates: + raise click.ClickException(f"Template '{template}' not found in configuration") + cert_template = ctx.conf.piv.user_cert_templates[template] + else: + # Use first template if not specified + cert_template = next(iter(ctx.conf.piv.user_cert_templates.values())) + assert cert_template, "No user certificate templates found in configuration" + + # Merge template with defaults + x509_info = merge_x509_info_with_defaults(cert_template, ctx.conf) + assert x509_info, "No user certificate templates found in configuration" + assert x509_info.attribs, "No user certificate attributes found in configuration" + + # Override template values with command-line options + if validity: + x509_info.validity_days = validity + + # Generate subject DN if not explicitly provided + if not subject: + subject = f"CN={user}" + if x509_info.attribs: + for k,v in { + 'O': x509_info.attribs.organization, + 'L': x509_info.attribs.locality, + 'ST': x509_info.attribs.state, + 'C': x509_info.attribs.country, + }.items(): + if v: + subject += f",{k}={v}" + + # Handle CSR or key generation + if csr_pem: + csr_obj = x509.load_pem_x509_csr(csr_pem.encode()) + private_key = None + else: + _, private_key = _generate_piv_key_pair(key_type) + csr_obj = None + + # Add explicitly provided SANs + x509_info.subject_alt_name = x509_info.subject_alt_name or x509_info.SubjectAltName() + valid_san_types = get_args(X509NameType) + for san_entry in san: + try: + san_type, san_value = san_entry.split(':', 1) + except ValueError: + raise click.ClickException(f"Invalid SAN: '{san_entry}'. Must be in the form 'type:value', where type is one of: {', '.join(valid_san_types)}") + san_type_lower = san_type.lower() + if san_type_lower not in valid_san_types: + raise click.ClickException(f"Provided '{san_type.lower()}' is not a supported X509NameType. Must be one of: {', '.join(valid_san_types)}") + x509_info.subject_alt_name.names.setdefault(san_type_lower, []).append(san_value) # type: ignore [arg-type] + + # Add UPN or email to SANs based on OS type + if '@' in user: + username = user + elif domain := ctx.conf.piv.default_piv_domain.strip(): + username = user + '@' + domain.lstrip('@') + else: + username = user + + if os_type == 'windows': + x509_info.subject_alt_name.names.setdefault('upn', []).append(username) + else: + x509_info.subject_alt_name.names.setdefault('rfc822', []).append(username) + + # Create X509CertBuilder + key_or_csr = private_key or csr_obj + assert key_or_csr + cert_builder = X509CertBuilder(ctx.conf, x509_info, key_or_csr, dn_subject_override=subject) + + csr_obj = cert_builder.generate_csr() if private_key else csr_obj + assert csr_obj # Should be set by now + + # Sign the certificate with CA + ca_id = ca or ctx.conf.piv.default_ca_id + issuer_cert_def = ctx.conf.find_def(ca_id, HSMOpaqueObject) + + with open_hsm_session(ctx) as ses: + issuer_x509_def = find_cert_def(ctx.conf, issuer_cert_def.id) + assert issuer_x509_def, f"CA cert ID not found: 0x{issuer_cert_def.id:04x}" + issuer_cert = ses.get_certificate(issuer_cert_def) + issuer_key = ses.get_private_key(issuer_x509_def.key) + signed_cert = cert_builder.build_and_sign(issuer_cert, issuer_key) + + PIVUserCertificateChecker(signed_cert, os_type).check_and_show_issues() + return private_key, csr_obj, signed_cert + + + +def _generate_piv_key_pair(key_type: PivKeyTypeName) -> tuple[Union[rsa.RSAPublicKey, ec.EllipticCurvePublicKey], Union[rsa.RSAPrivateKey, ec.EllipticCurvePrivateKey]]: + private_key: Union[rsa.RSAPrivateKey, ec.EllipticCurvePrivateKey] + if key_type == 'rsa2048': + private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048) + elif key_type == 'ecp256': + private_key = ec.generate_private_key(ec.SECP256R1()) + elif key_type == 'ecp384': + private_key = ec.generate_private_key(ec.SECP384R1()) + else: + raise ValueError(f"Unsupported key type: {key_type}") + public_key = private_key.public_key() + return public_key, private_key + diff --git a/hsm_secrets/piv/yubikey_piv.py b/hsm_secrets/piv/yubikey_piv.py new file mode 100644 index 0000000..5d6ba97 --- /dev/null +++ b/hsm_secrets/piv/yubikey_piv.py @@ -0,0 +1,123 @@ +from typing import Callable, TypeVar, Union, Optional + +from cryptography.hazmat.primitives.asymmetric import rsa, ec +from cryptography import x509 + +from yubikit.piv import PivSession, SLOT, PIN_POLICY, TOUCH_POLICY +from yubikit.core.smartcard import ApduError, SW +import yubikit.core +import yubikit.piv + +from hsm_secrets.utils import cli_info, cli_warn, scan_local_yubikeys + +import click + + +def import_to_yubikey_piv( + cert: x509.Certificate, + private_key: Optional[Union[rsa.RSAPrivateKey, ec.EllipticCurvePrivateKey]], + slot: SLOT = SLOT.AUTHENTICATION, + management_key: Optional[bytes] = None +) -> None: + """ + Import the certificate and private key into a Yubikey PIV slot. + + :param cert: The X.509 certificate to import + :param private_key: The private key corresponding to the certificate + :param slot: The PIV slot to use (default: Authentication slot) + :param management_key: The management key for the Yubikey PIV application (if None, will prompt) + """ + def _import_key_cert_op(piv: PivSession, slot: SLOT): + if private_key: + cli_info(f"Importing private key to slot '{slot.name.lower()}' ({slot.value:02x})...") + cli_info(" - Setting touch requirement 'CACHED'. Touch is a non-standard Yubico PIV extension.") + if _check_yubikey_bio_support(piv): + cli_info(" - Biometric support detected. Enabling MATCH_ONCE for PIN policy.") + piv.put_key(slot, private_key, pin_policy=PIN_POLICY.MATCH_ONCE, touch_policy=TOUCH_POLICY.CACHED) + else: + cli_info(" - Setting PIN policy to 'ONCE': PIN is needed once per session.") + piv.put_key(slot, private_key, pin_policy=PIN_POLICY.ONCE, touch_policy=TOUCH_POLICY.CACHED) + else: + md = piv.get_slot_metadata(slot) + if not md.public_key_encoded: + raise click.ClickException(f"Slot '{slot.name}' has not key pair, cannot import certificate without key") + + # Import certificate + piv.put_certificate(slot, cert, compress=True) + cli_info("OK") + + _yubikey_piv_operation(slot, _import_key_cert_op, management_key) + + +''' +# TODO: This would be slightly more secure than generating a RAM-stored key on the Python side and importing, +# but it's not a huge difference. It's feasible but would require writing KeyAdapter classes for YubiKey-stored RSA/ECC keys, +# to make CSRs for the HSM-backed CA to sign. + +def _generate_on_yubikey_piv(slot: SLOT, key_type: yubikit.piv.KEY_TYPE, management_key: Optional[bytes] = None) -> Union[rsa.RSAPublicKey, ec.EllipticCurvePublicKey]: + """ + Generate a key pair on YubiKey, returning the public key. + """ + def _generate_keypair_op(piv: PivSession, slot: SLOT) -> Union[rsa.RSAPublicKey, ec.EllipticCurvePublicKey]: + bio_supported = _check_yubikey_bio_support(piv) + pin_policy = PIN_POLICY.MATCH_ONCE if bio_supported else PIN_POLICY.ONCE + cli_info(f"Generating {key_type.name} key pair in slot '{slot.name}' ({slot.value:02x})") + cli_info("- Setting touch requirement 'CACHED': needed if last touched over 15 seconds ago.") + cli_info(f"- Setting PIN policy to '{pin_policy.name}'") + return piv.generate_key(slot, key_type, pin_policy=pin_policy, touch_policy=TOUCH_POLICY.CACHED) + return _yubikey_piv_operation(slot, _generate_keypair_op, management_key) +''' + +T = TypeVar('T') +def _yubikey_piv_operation(slot: SLOT, op_func: Callable[[PivSession, SLOT], T], management_key: Optional[bytes] = None) -> T: + try: + _, piv_yubikey = scan_local_yubikeys(require_one_hsmauth=False, require_one_other=True) + assert piv_yubikey + + cli_info(f"Device for PIV storage: '{str(piv_yubikey)}'") + piv = PivSession(piv_yubikey.smart_card()) + + _yubikey_auth_with_piv_mgm_key(piv, management_key) + return op_func(piv, slot) + + except ApduError as e: + if e.sw == SW.AUTH_METHOD_BLOCKED: + click.echo("Error: PIN is blocked") + elif e.sw == SW.INCORRECT_PARAMETERS: + click.echo("Error: Incorrect PIN or management key") + elif e.sw == SW.SECURITY_CONDITION_NOT_SATISFIED: + click.echo("Error: Security condition not satisfied. Ensure you have the correct permissions.") + elif e.sw == SW.COMMAND_NOT_ALLOWED: + click.echo("Error: Command not allowed. The YubiKey may be in a state that doesn't allow this operation.") + else: + click.echo(f"Error: {str(e)}") + except Exception as e: + click.echo(f"Error: {str(e)}") + raise click.ClickException("Failed to perform PIV operation") + + +def _check_yubikey_bio_support(piv: PivSession) -> bool: + try: + piv.get_bio_metadata() + return True + except yubikit.core.NotSupportedError: + return False + + +def _yubikey_auth_with_piv_mgm_key(piv: PivSession, management_key: Optional[bytes]) -> None: + """ + Authenticate with PIV management key. + :param piv: The PIV session to authenticate + :param management_key: The management key for the YubiKey PIV application. If None, will use default key or prompt user. + """ + mkm = piv.get_management_key_metadata() + if management_key is None: + if mkm.default_value: + cli_warn("WARNING! Using default management key. Change it immediately after import!") + management_key = yubikit.piv.DEFAULT_MANAGEMENT_KEY + else: + mkey_str = click.prompt("Enter PIV management key (hex)", hide_input=True) + if mkey_str is None: + raise click.ClickException("Management key is required") + management_key = bytes.fromhex(mkey_str) + piv.authenticate(mkm.key_type, management_key) diff --git a/hsm_secrets/user/__init__.py b/hsm_secrets/user/__init__.py index 9409601..a97f232 100644 --- a/hsm_secrets/user/__init__.py +++ b/hsm_secrets/user/__init__.py @@ -2,10 +2,11 @@ import secrets import click from hsm_secrets.config import HSMAuthKey, HSMConfig, click_hsm_obj_auto_complete -from hsm_secrets.utils import HSMAuthMethod, HsmSecretsCtx, cli_info, cli_ui_msg, cli_warn, confirm_and_delete_old_yubihsm_object_if_exists, group_by_4, open_hsm_session, pass_common_args, prompt_for_secret, pw_check_fromhex, secure_display_secret +from hsm_secrets.utils import HSMAuthMethod, HsmSecretsCtx, cli_info, cli_ui_msg, cli_warn, confirm_and_delete_old_yubihsm_object_if_exists, group_by_4, open_hsm_session, pass_common_args, prompt_for_secret, pw_check_fromhex, scan_local_yubikeys, secure_display_secret import yubikit.hsmauth import ykman.scripting +import ykman.device import yubihsm.defs, yubihsm.objects # type: ignore [import] @@ -28,7 +29,8 @@ def change_yubikey_mgt(ctx: HsmSecretsCtx): This can also be done with the `yubihsm-auth -a change-mgmkey -k ` command. It's included here for convenience. """ - yubikey = ykman.scripting.single() # Connect to the first Yubikey found, prompt user to insert one if not found + yubikey, _ = scan_local_yubikeys(require_one_hsmauth=True) + assert yubikey auth_ses = yubikit.hsmauth.HsmAuthSession(connection=yubikey.smart_card()) _, old_mgt_key_bin = _ask_yubikey_hsm_mgt_key("Enter the old Management Key", default=True) _change_yubikey_hsm_mgt_key(auth_ses, old_mgt_key_bin, ask_before_change=False) @@ -56,7 +58,11 @@ def add_user_yubikey(ctx: HsmSecretsCtx, label: str, alldevs: bool): user_key_conf = user_key_configs[0] - yubikey = ykman.scripting.single() # Connect to the first Yubikey found, prompt user to insert one if not found + + n_yubikeys = len(ykman.device.list_all_devices()) + if n_yubikeys > 1: + raise click.ClickException(f"Found {n_yubikeys} Yubikeys. Can't decide which one to set HSM auth on.") + yubikey = ykman.scripting.single(prompt = True) # Connect to the first Yubikey found, prompt user to insert one if not found yk_auth_ses = yubikit.hsmauth.HsmAuthSession(connection=yubikey.smart_card()) existing_slots = list(yk_auth_ses.list_credentials()) diff --git a/hsm_secrets/utils.py b/hsm_secrets/utils.py index 040da60..4f27fc1 100644 --- a/hsm_secrets/utils.py +++ b/hsm_secrets/utils.py @@ -1,13 +1,11 @@ from dataclasses import dataclass from enum import Enum import os -from pathlib import Path from textwrap import dedent from typing import Callable, Generator, Optional, Sequence from contextlib import contextmanager -import click - +# YubiHSM 2 from yubihsm import YubiHsm # type: ignore [import] from yubihsm.core import AuthSession # type: ignore [import] from yubihsm.defs import CAPABILITY, ALGORITHM, ERROR, OBJECT # type: ignore [import] @@ -15,7 +13,10 @@ from yubikit.hsmauth import HsmAuthSession # type: ignore [import] from yubihsm.exceptions import YubiHsmDeviceError # type: ignore [import] -from ykman import scripting +# YubiKey +import ykman.device +import ykman.scripting +import ykman import yubikit.core import yubikit.hsmauth as hsmauth @@ -195,15 +196,6 @@ def group_by_4(s: str) -> str: return res -def _list_yubikey_hsm_creds() -> Sequence[hsmauth.Credential]: - """ - List the labels of all YubiKey HSM auth credentials. - """ - yubikey = scripting.single() # Connect to the first YubiKey found - auth_ses = hsmauth.HsmAuthSession(connection=yubikey.smart_card()) - return list(auth_ses.list_credentials()) - - def connect_hsm_and_auth_with_yubikey(config: hscfg.HSMConfig, yubikey_slot_label: str|None, device_serial: str|None, yubikey_password: Optional[str] = None) -> AuthSession: """ Connects to a YubHSM and authenticates a session using the first YubiKey found. @@ -224,7 +216,8 @@ def connect_hsm_and_auth_with_yubikey(config: hscfg.HSMConfig, yubikey_slot_labe if not connector_url: raise ValueError(f"Device serial '{device_serial}' not found in config file.") - yubikey = scripting.single() # Connect to the first YubiKey found + yubikey, _ = scan_local_yubikeys(require_one_hsmauth=True) + assert yubikey hsmauth = HsmAuthSession(yubikey.smart_card()) # Get first Yubikey HSM auth key label from device if not specified @@ -271,6 +264,59 @@ def connect_hsm_and_auth_with_yubikey(config: hscfg.HSMConfig, yubikey_slot_labe exit(1) +def scan_local_yubikeys(require_one_hsmauth = True, require_one_other = False) -> tuple[Optional[ykman.scripting.ScriptingDevice], Optional[ykman.scripting.ScriptingDevice]]: + """ + Scan for YubiKeys for a) HSM auth and b) other uses (e.g., PIV). + This allows modifying other YubiKeys while authenticating on the HSM with a different YubiKey. + + If there's only one YubiKey, it will be returned for both uses. + + :param require_hsm: Require exactly one YubiKey with HSM credentials + :param require_other: Require exactly one YubiKey for other uses + :return: Tuple of (YubiKey with HSM credentials, YubiKey for other uses) + :raises click.ClickException: If requirements are not met, or if multiple YubiKeys are found for a single use + """ + n_devices = len(ykman.device.list_all_devices()) + if n_devices == 0: + if require_one_hsmauth or require_one_other: + raise click.ClickException("No local YubiKey(s) found") + elif n_devices == 1: + # Only one YubiKey => return it for both HSM and other uses + yk_dev, yk_info = ykman.device.list_all_devices()[0] + yk = ykman.scripting.ScriptingDevice(yk_dev, yk_info) + return yk, yk + + # Multiple YubiKeys found, decide which one to use for which purpose + hsm_yubikey, piv_yubikey = None, None + for yk_dev, yk_info in ykman.device.list_all_devices(): + yk = ykman.scripting.ScriptingDevice(yk_dev, yk_info) + sc = yk.smart_card() + try: + if HsmAuthSession(connection=sc).list_credentials(): + if not hsm_yubikey: + hsm_yubikey = yk + sc.close() + continue + elif require_one_hsmauth: + raise click.ClickException("ERROR: Multiple YubiKeys found with HSM credentials. Can't decide which one to use for HSM auth.") + except yubikit.core.NotSupportedError: + pass + + sc.close() + if not piv_yubikey: + piv_yubikey = yk + continue + elif require_one_other: + raise click.ClickException("ERROR: Multiple YubiKeys found for other uses. Can't decide which one to return.") + + if not (hsm_yubikey and piv_yubikey): + raise click.ClickException("ERROR: Initial scan found multiple YubiKeys, but failed to pick one for each use. Bug or unexpected device state change.") + + return hsm_yubikey, piv_yubikey + + + + def verify_hsm_device_info(device_serial, hsm): info = hsm.get_device_info() if int(device_serial) != int(info.serial): @@ -293,7 +339,8 @@ def open_hsm_session( cli_warn("~🤡~ !! SIMULATED (mock) HSM session !! Authentication skipped. ~🤡~") auth_key_id = ctx.conf.admin.default_admin_key.id if auth_method == HSMAuthMethod.PASSWORD: - assert ctx.auth_password_id + if not ctx.auth_password_id: + raise click.UsageError("Auth key ID (user login as) not specified for password auth method.") auth_key_id = ctx.auth_password_id elif auth_method == HSMAuthMethod.YUBIKEY: auth_key_id = ctx.conf.user_keys[0].id # Mock YubiKey auth key with first user key from config diff --git a/run-tests.sh b/run-tests.sh index 3b23561..56c4841 100755 --- a/run-tests.sh +++ b/run-tests.sh @@ -143,7 +143,7 @@ test_tls_certificates() { test_piv_user_certificate_key_type() { setup - local output=$(run_cmd piv user-cert -u test.user@example.com --os-type windows --key-type RSA2048 --san "RFC822:test.user@example.com" --san "DIRECTORY:C=US,O=Organization,CN=test.user" --out $TEMPDIR/testuser-piv-key) + local output=$(run_cmd piv user-cert -u test.user@example.com --os-type windows --key-type rsa2048 --san "RFC822:test.user@example.com" --san "DIRECTORY:C=US,O=Organization,CN=test.user" --out $TEMPDIR/testuser-piv-key) assert_success echo "$output" assert_not_grep "Cert errors" "$output"