Skip to content

Commit

Permalink
Reorganize commands to reduce clutter
Browse files Browse the repository at this point in the history
  • Loading branch information
elonen committed Aug 27, 2024
1 parent 26fa437 commit e66403c
Show file tree
Hide file tree
Showing 7 changed files with 370 additions and 286 deletions.
16 changes: 9 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -139,12 +139,14 @@ You can always force a different authentication type, though:

2. Perform initial setup on an airgapped system. In summary:
- Connect all HSMs and reset to factory defaults
- Distribute a common wrap key with `hsm-secrets hsm make-wrap-key`
- Create a Shamir's Shared Secret admin key with `hsm-secrets hsm admin-sharing-ceremony`
- Distribute a common wrap key with `hsm-secrets hsm backup make-key`
- Create a Shamir's Shared Secret admin key with `hsm-secrets hsm admin sharing-ceremony`
- shares can be optionally password-protected
- Add user YubiKeys with `hsm-secrets user add-yubikey`
- Generate keys and certificates with `hsm-secrets hsm compare --create` and `hsm-secrets x509 create --all`
- Clone master HSM to other devices using `hsm backup` and `hsm restore`
- Generate keys and certificates with `hsm-secrets hsm objects create-missing`
- Apply your logging settings from config to the device with `hsm log apply-settings`
- Check that everything's been created with `hsm compare`
- Clone master HSM to other devices using `hsm backup export` and `hsm backup import`

See [Setup Workflow](doc/setup-workflow.md) for the full process.

Expand All @@ -155,10 +157,10 @@ You can always force a different authentication type, though:
- `hsm-secrets pass rotate` to rotate derived password(s)

4. Rarely, for admin changes, on an airgapped computer:
- Temporarily enable default admin key with `hsm-secrets hsm default-admin-enable` (asks key custodians for SSSS shared secret)
- Temporarily enable default admin key with `hsm-secrets hsm admin default-enable` (asks key custodians for SSSS shared secret)
- Make changes on one of the devices (master)
- Re-clone HSMs with `hsm backup` and `hsm restore`
- Disable the default admin again with `hsm-secrets hsm default-admin-disable`
- Re-clone HSMs with `hsm backup export` and `hsm backup import`
- Disable the default admin again with `hsm-secrets hsm admin default-disable`

YubiHSM2 devices are easy to reset, so you might want to do a test-run or two before an actual production deployment.

Expand Down
440 changes: 232 additions & 208 deletions hsm_secrets/hsm/__init__.py

Large diffs are not rendered by default.

53 changes: 33 additions & 20 deletions hsm_secrets/piv/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,15 +126,17 @@ def import_to_yubikey_piv(
@click.option('--template', '-t', required=False, help="Template label, default: first template")
@click.option('--subject', '-s', required=False, help="Cert subject (DN), default: from config")
@click.option('--validity', '-v', type=int, help="Validity period in days, default: from config")
@click.option('--key-type', '-k', type=click.Choice(['RSA2048', 'ECP256', 'ECP384']), default='RSA2048', help="Key type, default: same as CA")
@click.option('--key-type', '-k', type=click.Choice(['RSA2048', 'ECP256', 'ECP384']), help="Key type, default: same as CA")
@click.option('--csr', type=click.Path(exists=True, dir_okay=False, resolve_path=True), help="Path to existing CSR file")
@click.option('--ca', '-c', required=False, help="CA ID (hex) or label, default: from config")
@click.option('--out', '-o', required=False, type=click.Path(exists=False, dir_okay=False, resolve_path=True, allow_dash=False), help="Output filename stem, default: ./<user>-piv[.key/.cer]")
@click.option('--os-type', type=click.Choice(['windows', 'other']), default='windows', help="Target operating system")
@click.option('--san', multiple=True, help="AdditionalSANs, e.g., 'DNS:example.com', 'IP:10.0.0.2', etc.")
def create_piv_cert(ctx: HsmSecretsCtx, user: str, template: str|None, subject: str, validity: int, key_type: str, ca: str, out: str, os_type: Literal["windows", "other"], san: List[str]):
"""Create a PIV user certificate
def create_piv_cert(ctx: HsmSecretsCtx, user: str, template: str|None, subject: str, validity: int, key_type: Literal['RSA2048', 'ECP256', 'ECP384']|None, csr: str|None, ca: str, out: str, os_type: Literal["windows", "other"], san: List[str]):
"""Create or sign a PIV user certificate
This command generates a new PIV user certificate and key pair, and signs it with a CA certificate.
If a CSR is provided, this command signs the CSR with a CA certificate.
Otherwise it generates a new key pair (key type required) and signs a certificate for it.
Example SAN types:
- RFC822:[email protected]
Expand Down Expand Up @@ -181,9 +183,18 @@ def create_piv_cert(ctx: HsmSecretsCtx, user: str, template: str|None, subject:
if v:
subject += f",{k}={v}"

# Generate key pair
key_type_enum = PivKeyType[key_type]
_public_key, private_key = _generate_piv_key_pair(key_type_enum)
# Handle CSR or key generation
if csr:
with open(csr, 'rb') as f:
csr_obj = x509.load_pem_x509_csr(f.read())
private_key = None
elif key_type:
key_type_enum = PivKeyType[key_type]
_, private_key = _generate_piv_key_pair(key_type_enum)
csr_obj = None
else:
raise click.ClickException("Either --csr or --key-type must be provided")


# Add explicitly provided SANs
x509_info.subject_alt_name = x509_info.subject_alt_name or x509_info.SubjectAltName()
Expand All @@ -205,7 +216,7 @@ def create_piv_cert(ctx: HsmSecretsCtx, user: str, template: str|None, subject:
x509_info.subject_alt_name.names.setdefault('rfc822', []).append(user)

# Create X509CertBuilder
cert_builder = X509CertBuilder(ctx.conf, x509_info, private_key, dn_subject_override=subject)
cert_builder = X509CertBuilder(ctx.conf, x509_info, private_key or csr_obj, dn_subject_override=subject)

# Sign the certificate with CA
ca_id = ca or ctx.conf.piv.default_ca_id
Expand All @@ -220,20 +231,22 @@ def create_piv_cert(ctx: HsmSecretsCtx, user: str, template: str|None, subject:

PIVUserCertificateChecker(signed_cert, os_type).check_and_show_issues()

# Save files
key_file.write_bytes(private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
))

csr = cert_builder.generate_csr()
csr_file.write_bytes(csr.public_bytes(serialization.Encoding.PEM))
# Save files
if private_key:
key_file.write_bytes(private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
))
cli_info(f"Private key saved to: {key_file}")

csr_obj = cert_builder.generate_csr()
csr_file.write_bytes(csr_obj.public_bytes(serialization.Encoding.PEM))
cli_info(f"CSR saved to: {csr_file}")
elif csr:
cli_info(f"Using provided CSR: {csr}")

_save_pem_certificate(signed_cert, cer_file.open('wb'))

cli_info(f"Private key saved to: {key_file}")
cli_info(f"CSR saved to: {csr_file}")
cli_info(f"Certificate saved to: {cer_file}")


Expand Down
4 changes: 2 additions & 2 deletions hsm_secrets/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -424,13 +424,13 @@ def confirm_and_delete_old_yubihsm_object_if_exists(ses: HSMSession, obj_id: hsc
:param serial: The serial number of the YubiHSM device
:param hsm_key_obj: The object to check for
:param abort: Whether to abort (raise) if the user does not want to delete the object
:return: True if the object doesn't exist or was deleted, False if the user chose not to delete it
:return: True if the object doesn't exist or was deleted, False if user chose not to delete it
"""
if info := ses.object_exists_raw(obj_id, object_type):
cli_ui_msg(f"Object 0x{obj_id:04x} already exists on YubiHSM device:")
cli_ui_msg(pretty_fmt_yubihsm_object(info))
cli_info("")
if click.confirm("Replace the existing key?", default=False, abort=abort, err=True):
if click.confirm("Replace the existing object?", default=False, abort=abort, err=True):
ses.delete_object_raw(obj_id, object_type)
else:
return False
Expand Down
40 changes: 29 additions & 11 deletions hsm_secrets/x509/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from copy import deepcopy
import click
import datetime
from pathlib import Path
Expand Down Expand Up @@ -68,7 +69,7 @@ def create_cert_cmd(ctx: HsmSecretsCtx, all_certs: bool, dry_run: bool, certs: t
"""
if not all_certs and not certs:
raise click.ClickException("Error: No certificates specified for creation.")
create_certs_impl(ctx, all_certs, dry_run, certs)
x509_create_certs(ctx, all_certs, dry_run, certs)

# ---------------

Expand Down Expand Up @@ -116,7 +117,7 @@ def get_cert_cmd(ctx: HsmSecretsCtx, all_certs: bool, outdir: str|None, bundle:

# ---------------

def create_certs_impl(ctx: HsmSecretsCtx, all_certs: bool, dry_run: bool, cert_ids: tuple):
def x509_create_certs(ctx: HsmSecretsCtx, all_certs: bool, dry_run: bool, cert_ids: tuple, skip_existing: bool = False):
"""
Create certificates on a YubiHSM2, based on the configuration file and CLI arguments.
Performs a topological sort of the certificates to ensure that any dependencies are created first.
Expand All @@ -137,14 +138,21 @@ def _do_it(ses: HSMSession|None):

creation_order = topological_sort_x509_cert_defs(selected_defs)
id_to_cert_obj: dict[HSMKeyID, x509.Certificate] = {}
existing_cert_ids = set()
cert_issues: list[tuple[HSMOpaqueObject, list]] = []

# Create the certificates in topological order
for cd in creation_order:
if skip_existing:
if ses and ses.object_exists(cd):
existing_cert_ids.add(cd.id)
continue

x509_info = merge_x509_info_with_defaults(scid_to_x509_def[cd.id].x509_info, ctx.conf)
issuer = scid_to_opq_def[cd.sign_by] if cd.sign_by and cd.sign_by != cd.id else None
signer = f"signed by: '{issuer.label}'" if issuer else 'self-signed'

cli_info(f"Creating 0x{cd.id:04x}: '{cd.label}' ({signer})")
cli_info(f"\nCreating 0x{cd.id:04x}: '{cd.label}' ({signer})")
cli_info(indent(pretty_x509_info(x509_info), " "))

if not dry_run:
Expand All @@ -157,13 +165,14 @@ def _do_it(ses: HSMSession|None):
issuer_cert = id_to_cert_obj.get(cd.sign_by)
if not issuer_cert:
# Issuer cert was not created on this run, try to load it from the HSM
if not ses.object_exists(cd):
raise click.ClickException(f"ERROR: Certificate 0x{cd.sign_by:04x} not found in HSM. Create it first, to sign 0x{cd.id:04x}.")
issuer_cert = ses.get_certificate(cd)
issuer_def = scid_to_opq_def[cd.sign_by]
if not ses.object_exists(issuer_def):
raise click.ClickException(f"ERROR: Certificate 0x{cd.sign_by:04x} not found in HSM. Create it first to sign 0x{cd.id:04x}.")
issuer_cert = ses.get_certificate(issuer_def)

sign_key_def = scid_to_x509_def[cd.sign_by].key
if not ses.object_exists(sign_key_def):
raise click.ClickException(f"ERROR: Key 0x{sign_key_def.id:04x} not found in HSM. Create it first, to sign 0x{cd.id:04x}.")
raise click.ClickException(f"ERROR: Key 0x{sign_key_def.id:04x} not found in HSM. Create it first to sign 0x{cd.id:04x}.")
issuer_key = ses.get_private_key(sign_key_def)

# Create and sign the certificate
Expand All @@ -174,20 +183,29 @@ def _do_it(ses: HSMSession|None):
assert issuer_key
id_to_cert_obj[cd.id] = builder.build_and_sign(issuer_cert, issuer_key)
# NOTE: We'll assume all signed certs on HSM are CA -- fix this if storing leaf certs for some reason
X509IntermediateCACertificateChecker(id_to_cert_obj[cd.id]).check_and_show_issues()
issues = X509IntermediateCACertificateChecker(id_to_cert_obj[cd.id]).check_and_show_issues()
cert_issues.append((cd, issues))
else:
id_to_cert_obj[cd.id] = builder.generate_and_self_sign()
cli_info(f"Self-signed certificate created; assuming it's a root CA for checks...")
X509RootCACertificateChecker(id_to_cert_obj[cd.id]).check_and_show_issues()

issues = X509RootCACertificateChecker(id_to_cert_obj[cd.id]).check_and_show_issues()
cert_issues.append((cd, issues))

# Put the certificates into the HSM
for cd in creation_order:
if skip_existing and cd.id in existing_cert_ids:
continue
if not dry_run:
assert isinstance(ses, HSMSession)
if confirm_and_delete_old_yubihsm_object_if_exists(ses, cd.id, yubihsm.defs.OBJECT.OPAQUE, abort=False):
ses.put_certificate(cd, id_to_cert_obj[cd.id])
cli_info(f"Certificate 0x{cd.id:04x} created and stored in YubiHSM (serial {ctx.hsm_serial}).")
cli_info(f"Certificate 0x{cd.id:04x} stored in YubiHSM {ctx.hsm_serial}.")

# Show any issues found during certificate creation
for cd, issues in cert_issues:
if issues:
cli_warn(f"\n-- Check results for certificate 0x{cd.id:04x} ({cd.label}) --")
X509RootCACertificateChecker.show_issues(issues)

if dry_run:
cli_warn("DRY RUN. Would create the following certificates:")
Expand Down
38 changes: 19 additions & 19 deletions hsm_secrets/x509/cert_checker.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from typing import Any, Callable
from cryptography import x509
from cryptography.hazmat.primitives.asymmetric import rsa, ec, ed25519, ed448
from cryptography.x509.oid import ExtendedKeyUsageOID, NameOID
Expand Down Expand Up @@ -88,8 +89,6 @@ def _check_key_type_and_size(self):
if isinstance(public_key, rsa.RSAPublicKey):
if public_key.key_size < 2048:
self._add_issue(f"RSA key size ({public_key.key_size}) is less than 2048 bits", IssueSeverity.ERROR)
elif public_key.key_size < 3072:
self._add_issue(f"RSA key size ({public_key.key_size}) is less than 3072 bits", IssueSeverity.NOTICE)
elif isinstance(public_key, ec.EllipticCurvePublicKey):
if public_key.curve.key_size < 256:
self._add_issue(f"EC key size ({public_key.curve.key_size}) is less than 256 bits", IssueSeverity.ERROR)
Expand Down Expand Up @@ -188,16 +187,17 @@ def _check_specific_subject_common_name_consistency(self, cn_value: str, san: x5
# To be implemented by subclasses
pass

def show_issues(self):
notices = [message for severity, message in self.issues if severity == IssueSeverity.NOTICE]
warnings = [message for severity, message in self.issues if severity == IssueSeverity.WARNING]
errors = [message for severity, message in self.issues if severity == IssueSeverity.ERROR]
@staticmethod
def show_issues(issues: list[tuple[IssueSeverity, str]]):
notices = [message for severity, message in issues if severity == IssueSeverity.NOTICE]
warnings = [message for severity, message in issues if severity == IssueSeverity.WARNING]
errors = [message for severity, message in issues if severity == IssueSeverity.ERROR]

if not (notices or warnings or errors):
return

prn = cli_error if errors else (cli_warn if warnings else cli_info)
prn("\nDetected issues:")
prn: Any = cli_error if errors else (cli_warn if warnings else cli_info)
prn("Detected issues:")

if notices:
cli_info(" - ℹ️ Cert notices:")
Expand All @@ -215,9 +215,10 @@ def show_issues(self):
cli_error(f" - {msg}")


def check_and_show_issues(self):
def check_and_show_issues(self) -> list[tuple[IssueSeverity, str]]:
self.check()
self.show_issues()
self.show_issues(self.issues)
return self.issues


# ------
Expand Down Expand Up @@ -251,12 +252,13 @@ def _check_specific_key_usage(self, key_usage: x509.KeyUsage):
self._add_issue("KeyUsage includes keyEncipherment, which is not typically needed for CA certificates", IssueSeverity.WARNING)

def _check_name_constraints(self):
try:
nc = self.certificate.extensions.get_extension_for_class(x509.NameConstraints)
if not nc.critical:
self._add_issue("NameConstraints extension should be marked critical", IssueSeverity.WARNING)
except x509.ExtensionNotFound:
self._add_issue("NameConstraints extension not found", IssueSeverity.NOTICE)
#try:
# nc = self.certificate.extensions.get_extension_for_class(x509.NameConstraints)
# if not nc.critical:
# self._add_issue("NameConstraints extension should be marked critical", IssueSeverity.WARNING)
#except x509.ExtensionNotFound:
# self._add_issue("NameConstraints extension not found", IssueSeverity.NOTICE)
pass

def _check_policy_extensions(self):
try:
Expand All @@ -283,8 +285,6 @@ def _check_key_type_and_size(self):
if isinstance(public_key, rsa.RSAPublicKey):
if public_key.key_size < 2048:
self._add_issue(f"RSA key size ({public_key.key_size}) is less than 2048 bits", IssueSeverity.ERROR)
elif public_key.key_size < 3072:
self._add_issue(f"RSA key size ({public_key.key_size}) is less than 3072 bits", IssueSeverity.NOTICE)
elif isinstance(public_key, ec.EllipticCurvePublicKey):
if public_key.curve.key_size < 256:
self._add_issue(f"EC key size ({public_key.curve.key_size}) is less than 256 bits", IssueSeverity.ERROR)
Expand Down Expand Up @@ -354,4 +354,4 @@ def _check_path_length_constraint(self, path_length):
if path_length is None:
self._add_issue("No path length constraint set.", IssueSeverity.NOTICE)
elif path_length > 0:
self._add_issue(f"Path length constraint is set to {path_length}. Ensure this aligns with your intended PKI structure.", IssueSeverity.NOTICE)
self._add_issue(f"Path length constraint is set to {path_length}. Ensure you need this intermediate CA to sign more intermediates.", IssueSeverity.NOTICE)
Loading

0 comments on commit e66403c

Please sign in to comment.