-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Refactor PIV, implement direct-to-Yubikey cert creation
- Loading branch information
Showing
8 changed files
with
476 additions
and
320 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,135 @@ | ||
from typing_extensions import Literal | ||
import click | ||
|
||
from typing import Union, Optional, get_args | ||
|
||
from cryptography.hazmat.primitives.asymmetric import rsa, ec | ||
from cryptography import x509 | ||
|
||
from hsm_secrets.config import HSMOpaqueObject, X509NameType | ||
from hsm_secrets.piv.piv_cert_checks import PIVUserCertificateChecker | ||
from hsm_secrets.utils import HsmSecretsCtx, open_hsm_session | ||
from hsm_secrets.x509.cert_builder import X509CertBuilder | ||
from hsm_secrets.x509.def_utils import find_cert_def, merge_x509_info_with_defaults | ||
|
||
|
||
PivKeyTypeName = Literal['rsa2048', 'ecp256', 'ecp384'] | ||
|
||
|
||
def make_signed_piv_user_cert( | ||
ctx: HsmSecretsCtx, | ||
user: str, | ||
template: str|None, | ||
subject: str, | ||
validity: int, | ||
key_type: PivKeyTypeName, | ||
csr_pem: str|None, | ||
ca: str, | ||
os_type: Literal["windows", "other"], | ||
san: list[str]) -> tuple[ | ||
Optional[Union[rsa.RSAPrivateKey, ec.EllipticCurvePrivateKey]], | ||
x509.CertificateSigningRequest, | ||
x509.Certificate]: | ||
# Get template | ||
if template: | ||
if template not in ctx.conf.piv.user_cert_templates: | ||
raise click.ClickException(f"Template '{template}' not found in configuration") | ||
cert_template = ctx.conf.piv.user_cert_templates[template] | ||
else: | ||
# Use first template if not specified | ||
cert_template = next(iter(ctx.conf.piv.user_cert_templates.values())) | ||
assert cert_template, "No user certificate templates found in configuration" | ||
|
||
# Merge template with defaults | ||
x509_info = merge_x509_info_with_defaults(cert_template, ctx.conf) | ||
assert x509_info, "No user certificate templates found in configuration" | ||
assert x509_info.attribs, "No user certificate attributes found in configuration" | ||
|
||
# Override template values with command-line options | ||
if validity: | ||
x509_info.validity_days = validity | ||
|
||
# Generate subject DN if not explicitly provided | ||
if not subject: | ||
subject = f"CN={user}" | ||
if x509_info.attribs: | ||
for k,v in { | ||
'O': x509_info.attribs.organization, | ||
'L': x509_info.attribs.locality, | ||
'ST': x509_info.attribs.state, | ||
'C': x509_info.attribs.country, | ||
}.items(): | ||
if v: | ||
subject += f",{k}={v}" | ||
|
||
# Handle CSR or key generation | ||
if csr_pem: | ||
csr_obj = x509.load_pem_x509_csr(csr_pem.encode()) | ||
private_key = None | ||
else: | ||
_, private_key = _generate_piv_key_pair(key_type) | ||
csr_obj = None | ||
|
||
# Add explicitly provided SANs | ||
x509_info.subject_alt_name = x509_info.subject_alt_name or x509_info.SubjectAltName() | ||
valid_san_types = get_args(X509NameType) | ||
for san_entry in san: | ||
try: | ||
san_type, san_value = san_entry.split(':', 1) | ||
except ValueError: | ||
raise click.ClickException(f"Invalid SAN: '{san_entry}'. Must be in the form 'type:value', where type is one of: {', '.join(valid_san_types)}") | ||
san_type_lower = san_type.lower() | ||
if san_type_lower not in valid_san_types: | ||
raise click.ClickException(f"Provided '{san_type.lower()}' is not a supported X509NameType. Must be one of: {', '.join(valid_san_types)}") | ||
x509_info.subject_alt_name.names.setdefault(san_type_lower, []).append(san_value) # type: ignore [arg-type] | ||
|
||
# Add UPN or email to SANs based on OS type | ||
if '@' in user: | ||
username = user | ||
elif domain := ctx.conf.piv.default_piv_domain.strip(): | ||
username = user + '@' + domain.lstrip('@') | ||
else: | ||
username = user | ||
|
||
if os_type == 'windows': | ||
x509_info.subject_alt_name.names.setdefault('upn', []).append(username) | ||
else: | ||
x509_info.subject_alt_name.names.setdefault('rfc822', []).append(username) | ||
|
||
# Create X509CertBuilder | ||
key_or_csr = private_key or csr_obj | ||
assert key_or_csr | ||
cert_builder = X509CertBuilder(ctx.conf, x509_info, key_or_csr, dn_subject_override=subject) | ||
|
||
csr_obj = cert_builder.generate_csr() if private_key else csr_obj | ||
assert csr_obj # Should be set by now | ||
|
||
# Sign the certificate with CA | ||
ca_id = ca or ctx.conf.piv.default_ca_id | ||
issuer_cert_def = ctx.conf.find_def(ca_id, HSMOpaqueObject) | ||
|
||
with open_hsm_session(ctx) as ses: | ||
issuer_x509_def = find_cert_def(ctx.conf, issuer_cert_def.id) | ||
assert issuer_x509_def, f"CA cert ID not found: 0x{issuer_cert_def.id:04x}" | ||
issuer_cert = ses.get_certificate(issuer_cert_def) | ||
issuer_key = ses.get_private_key(issuer_x509_def.key) | ||
signed_cert = cert_builder.build_and_sign(issuer_cert, issuer_key) | ||
|
||
PIVUserCertificateChecker(signed_cert, os_type).check_and_show_issues() | ||
return private_key, csr_obj, signed_cert | ||
|
||
|
||
|
||
def _generate_piv_key_pair(key_type: PivKeyTypeName) -> tuple[Union[rsa.RSAPublicKey, ec.EllipticCurvePublicKey], Union[rsa.RSAPrivateKey, ec.EllipticCurvePrivateKey]]: | ||
private_key: Union[rsa.RSAPrivateKey, ec.EllipticCurvePrivateKey] | ||
if key_type == 'rsa2048': | ||
private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048) | ||
elif key_type == 'ecp256': | ||
private_key = ec.generate_private_key(ec.SECP256R1()) | ||
elif key_type == 'ecp384': | ||
private_key = ec.generate_private_key(ec.SECP384R1()) | ||
else: | ||
raise ValueError(f"Unsupported key type: {key_type}") | ||
public_key = private_key.public_key() | ||
return public_key, private_key | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,123 @@ | ||
from typing import Callable, TypeVar, Union, Optional | ||
|
||
from cryptography.hazmat.primitives.asymmetric import rsa, ec | ||
from cryptography import x509 | ||
|
||
from yubikit.piv import PivSession, SLOT, PIN_POLICY, TOUCH_POLICY | ||
from yubikit.core.smartcard import ApduError, SW | ||
import yubikit.core | ||
import yubikit.piv | ||
|
||
from hsm_secrets.utils import cli_info, cli_warn, scan_local_yubikeys | ||
|
||
import click | ||
|
||
|
||
def import_to_yubikey_piv( | ||
cert: x509.Certificate, | ||
private_key: Optional[Union[rsa.RSAPrivateKey, ec.EllipticCurvePrivateKey]], | ||
slot: SLOT = SLOT.AUTHENTICATION, | ||
management_key: Optional[bytes] = None | ||
) -> None: | ||
""" | ||
Import the certificate and private key into a Yubikey PIV slot. | ||
:param cert: The X.509 certificate to import | ||
:param private_key: The private key corresponding to the certificate | ||
:param slot: The PIV slot to use (default: Authentication slot) | ||
:param management_key: The management key for the Yubikey PIV application (if None, will prompt) | ||
""" | ||
def _import_key_cert_op(piv: PivSession, slot: SLOT): | ||
if private_key: | ||
cli_info(f"Importing private key to slot '{slot.name.lower()}' ({slot.value:02x})...") | ||
cli_info(" - Setting touch requirement 'CACHED'. Touch is a non-standard Yubico PIV extension.") | ||
if _check_yubikey_bio_support(piv): | ||
cli_info(" - Biometric support detected. Enabling MATCH_ONCE for PIN policy.") | ||
piv.put_key(slot, private_key, pin_policy=PIN_POLICY.MATCH_ONCE, touch_policy=TOUCH_POLICY.CACHED) | ||
else: | ||
cli_info(" - Setting PIN policy to 'ONCE': PIN is needed once per session.") | ||
piv.put_key(slot, private_key, pin_policy=PIN_POLICY.ONCE, touch_policy=TOUCH_POLICY.CACHED) | ||
else: | ||
md = piv.get_slot_metadata(slot) | ||
if not md.public_key_encoded: | ||
raise click.ClickException(f"Slot '{slot.name}' has not key pair, cannot import certificate without key") | ||
|
||
# Import certificate | ||
piv.put_certificate(slot, cert, compress=True) | ||
cli_info("OK") | ||
|
||
_yubikey_piv_operation(slot, _import_key_cert_op, management_key) | ||
|
||
|
||
''' | ||
# TODO: This would be slightly more secure than generating a RAM-stored key on the Python side and importing, | ||
# but it's not a huge difference. It's feasible but would require writing KeyAdapter classes for YubiKey-stored RSA/ECC keys, | ||
# to make CSRs for the HSM-backed CA to sign. | ||
def _generate_on_yubikey_piv(slot: SLOT, key_type: yubikit.piv.KEY_TYPE, management_key: Optional[bytes] = None) -> Union[rsa.RSAPublicKey, ec.EllipticCurvePublicKey]: | ||
""" | ||
Generate a key pair on YubiKey, returning the public key. | ||
""" | ||
def _generate_keypair_op(piv: PivSession, slot: SLOT) -> Union[rsa.RSAPublicKey, ec.EllipticCurvePublicKey]: | ||
bio_supported = _check_yubikey_bio_support(piv) | ||
pin_policy = PIN_POLICY.MATCH_ONCE if bio_supported else PIN_POLICY.ONCE | ||
cli_info(f"Generating {key_type.name} key pair in slot '{slot.name}' ({slot.value:02x})") | ||
cli_info("- Setting touch requirement 'CACHED': needed if last touched over 15 seconds ago.") | ||
cli_info(f"- Setting PIN policy to '{pin_policy.name}'") | ||
return piv.generate_key(slot, key_type, pin_policy=pin_policy, touch_policy=TOUCH_POLICY.CACHED) | ||
return _yubikey_piv_operation(slot, _generate_keypair_op, management_key) | ||
''' | ||
|
||
T = TypeVar('T') | ||
def _yubikey_piv_operation(slot: SLOT, op_func: Callable[[PivSession, SLOT], T], management_key: Optional[bytes] = None) -> T: | ||
try: | ||
_, piv_yubikey = scan_local_yubikeys(require_one_hsmauth=False, require_one_other=True) | ||
assert piv_yubikey | ||
|
||
cli_info(f"Device for PIV storage: '{str(piv_yubikey)}'") | ||
piv = PivSession(piv_yubikey.smart_card()) | ||
|
||
_yubikey_auth_with_piv_mgm_key(piv, management_key) | ||
return op_func(piv, slot) | ||
|
||
except ApduError as e: | ||
if e.sw == SW.AUTH_METHOD_BLOCKED: | ||
click.echo("Error: PIN is blocked") | ||
elif e.sw == SW.INCORRECT_PARAMETERS: | ||
click.echo("Error: Incorrect PIN or management key") | ||
elif e.sw == SW.SECURITY_CONDITION_NOT_SATISFIED: | ||
click.echo("Error: Security condition not satisfied. Ensure you have the correct permissions.") | ||
elif e.sw == SW.COMMAND_NOT_ALLOWED: | ||
click.echo("Error: Command not allowed. The YubiKey may be in a state that doesn't allow this operation.") | ||
else: | ||
click.echo(f"Error: {str(e)}") | ||
except Exception as e: | ||
click.echo(f"Error: {str(e)}") | ||
raise click.ClickException("Failed to perform PIV operation") | ||
|
||
|
||
def _check_yubikey_bio_support(piv: PivSession) -> bool: | ||
try: | ||
piv.get_bio_metadata() | ||
return True | ||
except yubikit.core.NotSupportedError: | ||
return False | ||
|
||
|
||
def _yubikey_auth_with_piv_mgm_key(piv: PivSession, management_key: Optional[bytes]) -> None: | ||
""" | ||
Authenticate with PIV management key. | ||
:param piv: The PIV session to authenticate | ||
:param management_key: The management key for the YubiKey PIV application. If None, will use default key or prompt user. | ||
""" | ||
mkm = piv.get_management_key_metadata() | ||
if management_key is None: | ||
if mkm.default_value: | ||
cli_warn("WARNING! Using default management key. Change it immediately after import!") | ||
management_key = yubikit.piv.DEFAULT_MANAGEMENT_KEY | ||
else: | ||
mkey_str = click.prompt("Enter PIV management key (hex)", hide_input=True) | ||
if mkey_str is None: | ||
raise click.ClickException("Management key is required") | ||
management_key = bytes.fromhex(mkey_str) | ||
piv.authenticate(mkm.key_type, management_key) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.