diff --git a/hsm_secrets/hsm/__init__.py b/hsm_secrets/hsm/__init__.py index c59d7a7..eed7ffd 100644 --- a/hsm_secrets/hsm/__init__.py +++ b/hsm_secrets/hsm/__init__.py @@ -4,11 +4,13 @@ import sys import tarfile import click + from hsm_secrets.config import HSMConfig, find_all_config_items_per_type from hsm_secrets.hsm.secret_sharing_ceremony import cli_reconstruction_ceremony, cli_splitting_ceremony -from hsm_secrets.utils import hsm_generate_asymmetric_key, hsm_generate_hmac_key, hsm_generate_symmetric_key, hsm_obj_exists, hsm_put_derived_auth_key, hsm_put_wrap_key, open_hsm_session_with_default_admin, open_hsm_session_with_shared_admin, open_hsm_session_with_yubikey, print_yubihsm_object, prompt_for_secret, pw_check_fromhex, secure_display_secret -import yubihsm.defs, yubihsm.exceptions, yubihsm.objects -from yubihsm.core import AuthSession +from hsm_secrets.utils import HSMAuthMethod, HsmSecretsCtx, hsm_generate_asymmetric_key, hsm_generate_hmac_key, hsm_generate_symmetric_key, hsm_obj_exists, hsm_put_derived_auth_key, hsm_put_wrap_key, open_hsm_session, open_hsm_session_with_password, pass_common_args, print_yubihsm_object, prompt_for_secret, pw_check_fromhex + +import yubihsm.defs, yubihsm.exceptions, yubihsm.objects # type: ignore [import] +from yubihsm.core import AuthSession # type: ignore [import] from click import style @@ -38,7 +40,7 @@ def swear_you_are_on_airgapped_computer(): @click.group() @click.pass_context -def cmd_hsm(ctx): +def cmd_hsm(ctx: click.Context): """YubiHSM2 device management commands These commands generally require a group of HSM custodians working together @@ -72,36 +74,26 @@ def cmd_hsm(ctx): # --------------- @cmd_hsm.command('list-objects') -@click.pass_context -@click.option('--use-default-admin', is_flag=True, help="Use the default admin key (instead of Yubikey)") +@pass_common_args @click.option('--alldevs', is_flag=True, help="List objects on all devices") -def list_objects(ctx: click.Context, use_default_admin: bool, alldevs: bool): +def list_objects(ctx: HsmSecretsCtx, alldevs: bool): """List objects in the YubiHSM""" - - def do_it(conf, ses, serial): - objects = ses.list_objects() - click.echo(f"YubiHSM Objects on device {serial}:") - for o in objects: + hsm_serials = ctx.conf.general.all_devices.keys() if alldevs else [ctx.hsm_serial] + for serial in hsm_serials: + with open_hsm_session(ctx, device_serial=serial) as ses: + click.echo(f"YubiHSM Objects on device {serial}:") + for o in ses.list_objects(): + click.echo("") + print_yubihsm_object(o) click.echo("") - print_yubihsm_object(o) - click.echo("") - - dev_serials = ctx.obj['config'].general.all_devices.keys() if alldevs else [ctx.obj['devserial']] - for serial in dev_serials: - if use_default_admin: - with open_hsm_session_with_default_admin(ctx, device_serial=serial) as (conf, ses): - do_it(conf, ses, serial) - else: - with open_hsm_session_with_yubikey(ctx, device_serial=serial) as (conf, ses): - do_it(conf, ses, serial) # --------------- @cmd_hsm.command('insecure-admin-key-enable') -@click.pass_context +@pass_common_args @click.option('--use-backup-secret', is_flag=True, help="Use backup secret instead of shared secret") @click.option('--alldevs', is_flag=True, help="Add on all devices") -def insecure_admin_key_enable(ctx: click.Context, use_backup_secret: bool, alldevs: bool): +def insecure_admin_key_enable(ctx: HsmSecretsCtx, use_backup_secret: bool, alldevs: bool): """Re-add insecure default admin key to HSM Using either a shared secret or a backup secret, (re-)create the default admin key on the YubiHSM(s). @@ -114,56 +106,64 @@ def do_it(conf: HSMConfig, ses: AuthSession, serial: str): click.echo(f"OK. Default insecure admin key (0x{obj.id:04x}: '{conf.admin.default_admin_password}') added successfully.") click.echo("!!! DON'T FORGET TO REMOVE IT after you're done with the management operations.") - # Obtain the shared (or backup) password + # This command exceptionally uses shared or backup secret to authenticate, unless explicitly forced password = None - try: - if use_backup_secret: - click.echo("Using backup secret to authenticate (instead of shared secret).") - is_hex = click.prompt("Is the backup secret hex-encoded (instead of a direct password) [y/n]?", type=bool) - - password = prompt_for_secret("Backup secret", check_fn=(pw_check_fromhex if is_hex else None)) - if is_hex: - click.echo("Interpreting backup secret as hex-encoded UTF-8 string.") - password = bytes.fromhex(password).decode('UTF-8') - else: - click.echo("Using shared secret to authenticate.") - password = cli_reconstruction_ceremony().decode('UTF-8') - except UnicodeDecodeError: - click.echo("Failed to decode password as UTF-8.") - raise - - dev_serials = ctx.obj['config'].general.all_devices.keys() if alldevs else [ctx.obj['devserial']] - for serial in dev_serials: + if not ctx.forced_auth_method: + # Obtain the shared (or backup) password try: - with open_hsm_session_with_shared_admin(ctx, password, device_serial=serial ) as (conf, ses): - do_it(conf, ses, serial) + if use_backup_secret: + click.echo("Using backup secret to authenticate (instead of shared secret).") + is_hex = click.prompt("Is the backup secret hex-encoded (instead of a direct password) [y/n]?", type=bool) + + password = prompt_for_secret("Backup secret", check_fn=(pw_check_fromhex if is_hex else None)) + if is_hex: + click.echo("Interpreting backup secret as hex-encoded UTF-8 string.") + password = bytes.fromhex(password).decode('UTF-8') + else: + click.echo("Using shared secret to authenticate.") + password = cli_reconstruction_ceremony().decode('UTF-8') + except UnicodeDecodeError: + click.echo("Failed to decode password as UTF-8.") + raise + + hsm_serials = ctx.conf.general.all_devices.keys() if alldevs else [ctx.hsm_serial] + for serial in hsm_serials: + try: + if not ctx.forced_auth_method: + assert password is not None + shared_key_id = ctx.conf.admin.shared_admin_key.id + with open_hsm_session_with_password(ctx, shared_key_id, password, device_serial=serial ) as ses: + do_it(ctx.conf, ses, serial) + else: + with open_hsm_session(ctx, device_serial=serial) as ses: + do_it(ctx.conf, ses, serial) except yubihsm.exceptions.YubiHsmAuthenticationError as e: click.echo("ERROR: Failed to authenticate with the provided password.") sys.exit(1) + # --------------- @cmd_hsm.command('insecure-admin-key-disable') -@click.pass_context +@pass_common_args @click.option('--alldevs', is_flag=True, help="Remove on all devices") @click.option('--force', is_flag=True, help="Force removal even if no other admin key exists") -def insecure_admin_key_disable(ctx: click.Context, alldevs: bool, force: bool): +def insecure_admin_key_disable(ctx: HsmSecretsCtx, alldevs: bool, force: bool): """Remove insecure default admin key from the YubiHSM(s) Last step in the management workflow. Remove the default admin key from the YubiHSM(s). The command first checks that a shared admin key exists on the device(s) before removing the default one. """ - dev_serials = ctx.obj['config'].general.all_devices.keys() if alldevs else [ctx.obj['devserial']] - for serial in dev_serials: - with open_hsm_session_with_default_admin(ctx, device_serial=serial) as (conf, ses): - - default_key = ses.get_object(conf.admin.default_admin_key.id, yubihsm.defs.OBJECT.AUTHENTICATION_KEY) + hsm_serials = ctx.conf.general.all_devices.keys() if alldevs else [ctx.hsm_serial] + for serial in hsm_serials: + with open_hsm_session(ctx, HSMAuthMethod.DEFAULT_ADMIN, serial) as ses: + default_key = ses.get_object(ctx.conf.admin.default_admin_key.id, yubihsm.defs.OBJECT.AUTHENTICATION_KEY) assert isinstance(default_key, yubihsm.objects.AuthenticationKey) if hsm_obj_exists(default_key): # Check that shared admin key exists before removing the default one if not force: - shared_key = ses.get_object(conf.admin.shared_admin_key.id, yubihsm.defs.OBJECT.AUTHENTICATION_KEY) + shared_key = ses.get_object(ctx.conf.admin.shared_admin_key.id, yubihsm.defs.OBJECT.AUTHENTICATION_KEY) assert isinstance(shared_key, yubihsm.objects.AuthenticationKey) if not hsm_obj_exists(shared_key): click.echo(f"ERROR: Shared admin key not found on device {serial}. You could lose access to the device, so refusing the operation (use --force to override).") @@ -192,8 +192,8 @@ def insecure_admin_key_disable(ctx: click.Context, alldevs: bool, force: bool): @click.option('--num-shares', type=int, required=True, help="Number of shares to generate") @click.option('--threshold', type=int, required=True, help="Number of shares required to reconstruct the key") @click.option('--skip-ceremony', is_flag=True, default=False, help="Skip the secret sharing ceremony, ask for password directly") -@click.pass_context -def make_shared_admin_key(ctx: click.Context, num_shares: int, threshold: int, skip_ceremony: bool): +@pass_common_args +def make_shared_admin_key(ctx: HsmSecretsCtx, num_shares: int, threshold: int, skip_ceremony: bool): """Host an admin key Secret Sharing Ceremony The ceremony is a formal multi-step process where the system generates a new shared admin key @@ -206,21 +206,19 @@ def make_shared_admin_key(ctx: click.Context, num_shares: int, threshold: int, s """ swear_you_are_on_airgapped_computer() - def do_it(conf, ses, serial): - def apply_password_fn(new_password: str): - hsm_put_derived_auth_key(ses, serial, conf, conf.admin.shared_admin_key, new_password) + try: + with open_hsm_session(ctx, HSMAuthMethod.DEFAULT_ADMIN) as ses: + def apply_password_fn(new_password: str): + hsm_put_derived_auth_key(ses, ctx.hsm_serial, ctx.conf, ctx.conf.admin.shared_admin_key, new_password) - if skip_ceremony: - apply_password_fn(prompt_for_secret("Enter the (new) shared admin password to store", confirm=True)) - else: - secret = ses.get_pseudo_random(256//8) - cli_splitting_ceremony(num_shares, threshold, apply_password_fn, pre_secret=secret) + if skip_ceremony: + apply_password_fn(prompt_for_secret("Enter the (new) shared admin password to store", confirm=True)) + else: + secret = ses.get_pseudo_random(256//8) + cli_splitting_ceremony(num_shares, threshold, apply_password_fn, pre_secret=secret) - click.echo("OK. Shared admin key added successfully.") + click.echo("OK. Shared admin key added successfully.") - try: - with open_hsm_session_with_default_admin(ctx) as (conf, ses): - do_it(conf, ses, ctx.obj['devserial']) except yubihsm.exceptions.YubiHsmAuthenticationError as e: click.echo("ERROR: Failed to authenticate with the default admin key.") sys.exit(1) @@ -228,33 +226,32 @@ def apply_password_fn(new_password: str): # --------------- @cmd_hsm.command('make-common-wrap-key') -@click.pass_context -def make_wrap_key(ctx: click.Context): +@pass_common_args +def make_wrap_key(ctx: HsmSecretsCtx): """Set a new wrap key to all YubiHSMs Generate a new wrap key and set it to all configured YubiHSMs. It is used to export/import keys securely between the devices. This requires all the devices in config file to be connected and reachable. """ + hsm_serials = ctx.conf.general.all_devices.keys() + assert len(hsm_serials) > 0, "No devices found in the configuration file." + swear_you_are_on_airgapped_computer() - dev_serials = [] - secret = None - with open_hsm_session_with_default_admin(ctx) as (conf, ses): - dev_serials = conf.general.all_devices.keys() - assert len(dev_serials) > 0, "No devices found in the configuration file." + with open_hsm_session(ctx, HSMAuthMethod.DEFAULT_ADMIN) as ses: click.echo("Generating secret on master device...") secret = ses.get_pseudo_random(256//8) click.echo("Secret generated. Distributing it to all devices...") click.echo("") - for serial in dev_serials: - with open_hsm_session_with_default_admin(ctx, device_serial=serial) as (conf, ses): - hsm_put_wrap_key(ses, serial, conf, conf.admin.wrap_key, secret) + for serial in hsm_serials: + with open_hsm_session(ctx, HSMAuthMethod.DEFAULT_ADMIN) as ses: + hsm_put_wrap_key(ses, serial, ctx.conf, ctx.conf.admin.wrap_key, secret) del secret - click.echo(f"OK. Common wrap key added to all devices (serials: {', '.join(dev_serials)}).") + click.echo(f"OK. Common wrap key added to all devices (serials: {', '.join(hsm_serials)}).") # --------------- @@ -262,8 +259,8 @@ def make_wrap_key(ctx: click.Context): @click.argument('cert_ids', nargs=-1, type=str, metavar='...') @click.option('--alldevs', is_flag=True, help="Delete on all devices") @click.option('--force', is_flag=True, help="Force deletion without confirmation (use with caution)") -@click.pass_context -def delete_object(ctx: click.Context, cert_ids: tuple, alldevs: bool, force: bool): +@pass_common_args +def delete_object(ctx: HsmSecretsCtx, cert_ids: tuple, alldevs: bool, force: bool): """Delete an object from the YubiHSM Deletes an object with the given ID from the YubiHSM. @@ -273,9 +270,9 @@ def delete_object(ctx: click.Context, cert_ids: tuple, alldevs: bool, force: boo With `--force` ALL objects with the given ID will be deleted without confirmation, regardless of their type. """ - dev_serials = ctx.obj['config'].general.all_devices.keys() if alldevs else [ctx.obj['devserial']] - for serial in dev_serials: - with open_hsm_session_with_default_admin(ctx, device_serial=serial) as (conf, ses): + hsm_serials = ctx.conf.general.all_devices.keys() if alldevs else [ctx.hsm_serial] + for serial in hsm_serials: + with open_hsm_session(ctx, HSMAuthMethod.DEFAULT_ADMIN, serial) as ses: for id in cert_ids: id_int = int(id.replace('0x', ''), 16) objects = ses.list_objects() @@ -292,10 +289,9 @@ def delete_object(ctx: click.Context, cert_ids: tuple, alldevs: bool, force: boo @cmd_hsm.command('compare-config') @click.option('--alldevs', is_flag=True, help="Compare all devices") -@click.option('--user-auth', is_flag=True, help="Auth with user key instead of default admin key") @click.option('--create', is_flag=True, help="Create missing keys in the YubiHSM") -@click.pass_context -def compare_config(ctx: click.Context, alldevs: bool, user_auth: bool, create: bool): +@pass_common_args +def compare_config(ctx: HsmSecretsCtx, alldevs: bool, create: bool): """Compare config file with device contents Lists all objects by type (auth, wrap, etc.) in the configuration file, and then checks @@ -307,21 +303,17 @@ def compare_config(ctx: click.Context, alldevs: bool, user_auth: bool, create: b If `--create` is given, missing keys will be created in the YubiHSM. It only supports one device at a time, and requires the default admin key. """ + if create and alldevs: + raise click.ClickException("The --create option only supports one device at a time, and requires the default admin key.") - if create: - if alldevs or user_auth: - raise click.ClickException("The --create option only supports one device at a time, and requires the default admin key.") - - conf = ctx.obj['config'] - assert isinstance(conf, HSMConfig) - config_items_per_type, config_to_hsm_type = find_all_config_items_per_type(conf) + assert isinstance(ctx.conf, HSMConfig) + config_items_per_type, config_to_hsm_type = find_all_config_items_per_type(ctx.conf) click.echo("") click.echo("Reading objects from the YubiHSM(s)...") - dev_serials = conf.general.all_devices.keys() if alldevs else [ctx.obj['devserial']] - for serial in dev_serials: - - def do_it(_: HSMConfig, ses: AuthSession): + hsm_serials = ctx.conf.general.all_devices.keys() if alldevs else [ctx.hsm_serial] + for serial in hsm_serials: + with open_hsm_session(ctx, HSMAuthMethod.DEFAULT_ADMIN, serial) as ses: device_objs = list(ses.list_objects()) click.echo("") click.echo(f"--- YubiHSM device {serial} ---") @@ -353,15 +345,15 @@ def do_it(_: HSMConfig, ses: AuthSession): n_skipped += 1 elif isinstance(it, HSMAsymmetricKey): click.echo(f" └-> {gear_emoji} Creating...") - hsm_generate_asymmetric_key(ses, serial, conf, it) + hsm_generate_asymmetric_key(ses, serial, ctx.conf, it) n_created += 1 elif isinstance(it, HSMSymmetricKey): click.echo(f" └-> {gear_emoji} Creating...") - hsm_generate_symmetric_key(ses, serial, conf, it) + hsm_generate_symmetric_key(ses, serial, ctx.conf, it) n_created += 1 elif isinstance(it, HSMHmacKey): click.echo(f" └-> {gear_emoji} Creating...") - hsm_generate_hmac_key(ses, serial, conf, it) + hsm_generate_hmac_key(ses, serial, ctx.conf, it) n_created += 1 else: click.echo(click.style(f" └-> Unsupported object type: {it.__class__.__name__}. This is a bug. SKIPPING.", fg='red')) @@ -380,21 +372,14 @@ def do_it(_: HSMConfig, ses: AuthSession): click.echo("") - if user_auth: - with open_hsm_session_with_yubikey(ctx, device_serial=serial) as (conf, ses): - do_it(conf, ses) - else: - with open_hsm_session_with_default_admin(ctx, device_serial=serial) as (conf, ses): - do_it(conf, ses) - # --------------- @cmd_hsm.command('attest-key') -@click.pass_context +@pass_common_args @click.argument('cert_id', required=True, type=str, metavar='') @click.option('--out', '-o', type=click.File('w', encoding='utf8'), help='Output file (default: stdout)', default=click.get_text_stream('stdout')) -def attest_key(ctx: click.Context, cert_id: str, out: click.File): +def attest_key(ctx: HsmSecretsCtx, cert_id: str, out: click.File): """Attest an asymmetric key in the YubiHSM Create an a key attestation certificate, signed by the @@ -403,7 +388,7 @@ def attest_key(ctx: click.Context, cert_id: str, out: click.File): from cryptography.hazmat.primitives.serialization import Encoding id = int(cert_id.replace('0x', ''), 16) - with open_hsm_session_with_default_admin(ctx) as (conf, ses): + with open_hsm_session(ctx, HSMAuthMethod.DEFAULT_ADMIN, ctx.hsm_serial) as ses: key = ses.get_object(id, yubihsm.defs.OBJECT.ASYMMETRIC_KEY) assert isinstance(key, yubihsm.objects.AsymmetricKey) if not hsm_obj_exists(key): @@ -416,26 +401,22 @@ def attest_key(ctx: click.Context, cert_id: str, out: click.File): # --------------- @cmd_hsm.command('backup-hsm') -@click.pass_context +@pass_common_args @click.option('--out', '-o', type=click.Path(exists=False, allow_dash=False), required=False, help='Output file', default=None) -def backup_hsm(ctx: click.Context, out: click.File|None): +def backup_hsm(ctx: HsmSecretsCtx, out: click.File|None): """Make a .tar.gz backup of HSM Exports all objects under wrap from the YubiHSM and saves them to a .tar.gz file. The file can be used to restore the objects to the same or another YubiHSM device that has the same wrap key. """ - conf = ctx.obj['config'] - assert isinstance(conf, HSMConfig) - click.echo("") - serial = ctx.obj['devserial'] - click.echo(f"Reading objects from YubiHSM device {serial}...") + click.echo(f"Reading objects from YubiHSM device {ctx.hsm_serial}...") # Open the output file fh = None if out is None: - p = Path(f"yubihsm2-device-{serial}-wrapped-backup.tar.gz") + p = Path(f"yubihsm2-device-{ctx.hsm_serial}-wrapped-backup.tar.gz") if p.exists(): click.confirm(f"File '{p}' already exists. Overwrite?", abort=True) fh = p.open('wb') @@ -445,9 +426,9 @@ def backup_hsm(ctx: click.Context, out: click.File|None): tar = tarfile.open(fileobj=fh, mode='w:gz') skipped = 0 - with open_hsm_session_with_default_admin(ctx, device_serial=serial) as (conf, ses): + with open_hsm_session(ctx, HSMAuthMethod.DEFAULT_ADMIN, ctx.hsm_serial) as ses: - wrap_key = ses.get_object(conf.admin.wrap_key.id, yubihsm.defs.OBJECT.WRAP_KEY) + wrap_key = ses.get_object(ctx.conf.admin.wrap_key.id, yubihsm.defs.OBJECT.WRAP_KEY) assert isinstance(wrap_key, yubihsm.objects.WrapKey) if not hsm_obj_exists(wrap_key): raise click.ClickException("Configured wrap key not found in the YubiHSM.") @@ -484,10 +465,10 @@ def backup_hsm(ctx: click.Context, out: click.File|None): @cmd_hsm.command('restore-hsm') -@click.pass_context +@pass_common_args @click.argument('backup_file', type=click.Path(exists=True, allow_dash=False), required=True, metavar='') @click.option('--force', is_flag=True, help="Don't ask for confirmation before restoring") -def restore_hsm(ctx: click.Context, backup_file: str, force: bool): +def restore_hsm(ctx: HsmSecretsCtx, backup_file: str, force: bool): """Restore a .tar.gz backup to HSM Imports all objects from a .tar.gz backup file to the YubiHSM. @@ -496,19 +477,15 @@ def restore_hsm(ctx: click.Context, backup_file: str, force: bool): The same wrap key must be present in the YubiHSM to restore the objects as they were exported with. """ - conf = ctx.obj['config'] - assert isinstance(conf, HSMConfig) - click.echo("") - serial = ctx.obj['devserial'] if not force: - click.confirm(f"WARNING: This will overwrite existing objects in the YubiHSM device {serial}. Continue?", abort=True) - if serial == conf.general.master_device: + click.confirm(f"WARNING: This will overwrite existing objects in the YubiHSM device {ctx.hsm_serial}. Continue?", abort=True) + if ctx.hsm_serial == ctx.conf.general.master_device: click.confirm("This is the configured master device. Are you ABSOLUTELY sure you want to continue?", abort=True) - with open_hsm_session_with_default_admin(ctx, device_serial=serial) as (conf, ses): + with open_hsm_session(ctx, HSMAuthMethod.DEFAULT_ADMIN) as ses: - wrap_key = ses.get_object(conf.admin.wrap_key.id, yubihsm.defs.OBJECT.WRAP_KEY) + wrap_key = ses.get_object(ctx.conf.admin.wrap_key.id, yubihsm.defs.OBJECT.WRAP_KEY) assert isinstance(wrap_key, yubihsm.objects.WrapKey) if not hsm_obj_exists(wrap_key): raise click.ClickException("Configured wrap key not found in the YubiHSM.") diff --git a/hsm_secrets/main.py b/hsm_secrets/main.py index 9e9e023..d8aaba0 100644 --- a/hsm_secrets/main.py +++ b/hsm_secrets/main.py @@ -9,7 +9,7 @@ from hsm_secrets.passwd import cmd_pass from hsm_secrets.config import HSMConfig, load_hsm_config from hsm_secrets.user import cmd_user -from hsm_secrets.utils import list_yubikey_hsm_creds +from hsm_secrets.utils import HSMAuthMethod, HsmSecretsCtx, list_yubikey_hsm_creds, pass_common_args from hsm_secrets.x509 import cmd_x509 @@ -17,49 +17,98 @@ @click.group(context_settings={'show_default': True, 'help_option_names': ['-h', '--help']}) #@click.option('-d', '--debug', is_flag=True, help="Enable debug mode") -@click.option('-c', '--config', required=True, type=click.Path(), default='hsm-conf.yml', help="Path to configuration file") -@click.option("-y", "--yklabel", required=False, help="Yubikey HSM auth key label") -@click.option("-s", "--devserial", required=False, help="YubiHSM serial number to connect to (default: from config)") +@click.option('-c', '--config', required=False, type=click.Path(), default=None, help="Path to configuration file") +@click.option("-y", "--yklabel", required=False, help="Yubikey HSM auth key label (default: first slot)") +@click.option("-s", "--hsmserial", required=False, help="YubiHSM serial number to connect to (default: get master from config)") +@click.option("--auth-yubikey", required=False, is_flag=True, help="Use Yubikey HSM auth key for HSM login") +@click.option("--auth-default-admin", required=False, is_flag=True, help="Use default auth key for HSM login") +@click.option("--auth-password-id", required=False, type=str, help="Auth key ID (hex) to login with password from env HSM_PASSWORD") @click.version_option() @click.pass_context -def cli(ctx: click.Context, config: str, yklabel: str|None, devserial: str|None): - """Centralized secret management tool for YubiHSM2.""" +def cli(ctx: click.Context, config: str|None, yklabel: str|None, hsmserial: str|None, + auth_default_admin: str|None, auth_yubikey: str|None, auth_password_id: str|None): + """Config file driven secret management tool for YubiHSM2 devices. - yk_label = yklabel - if not yk_label: + Unless --config is specified, configuration file will be searched first + from the environment variable HSM_SECRETS_CONFIG, then from the current + directory, and finally from the user's home directory. + + Default HSM login method depends on the command (most use yubikey), + but can be overridden with the --auth-* options: + + --auth-yubikey: Use Yubikey HSM auth key for HSM login. If --yklabel + is not not specified, the first hsmauth label on the Yubikey will be used. + + --auth-default-admin: Use insecure default auth key (see config). + + --auth-password-id : Use password from environment variable + HSM_PASSWORD with the specified auth key ID (hex). + """ + + # Use config file from env var or default locations if not specified + if not config: + env_var = "HSM_SECRETS_CONFIG" + default_paths = ["./hsm-conf.yml", "~/.hsm-conf.yml"] + config = os.getenv(env_var) + if not config: + for alt in default_paths: + if not config and os.path.exists(os.path.expanduser(alt)): + config = alt + if not config: + raise click.UsageError(f"No configuration file found in env or {str(default_paths)}. Please specify a config file with -c/--config or set the {env_var} environment variable.") + + conf = load_hsm_config(config) + assert conf.general.master_device, "No master YubiHSM serial specified in config file." + + # Get first Yubikey HSM auth key label from device if not specified + yubikey_label = yklabel + if not yubikey_label: creds = list_yubikey_hsm_creds() if not creds: click.echo("Note: No Yubikey HSM credentials found, Yubikey auth will be disabled.", err=True) - yk_label = "" + yubikey_label = "" else: - yk_label = creds[0].label - - conf = load_hsm_config(config) - assert conf.general.master_device, "No master YubiHSM serial specified in config file." + yubikey_label = creds[0].label + echo("Yubikey hsmauth label (using first slot): " + click.style(yubikey_label, fg='cyan')) ctx.obj = { # 'debug': debug, - 'yk_label': yk_label, + 'yubikey_label': yubikey_label, 'config': conf, - 'devserial': devserial or conf.general.master_device + 'hsmserial': hsmserial or conf.general.master_device, + 'forced_auth_method': None, + 'auth_password_id': int(auth_password_id.replace('0x', ''), 16) if auth_password_id else None, + 'auth_password': os.getenv("HSM_PASSWORD", None), } - if yk_label: - echo("Yubikey hsmauth label: " + click.style(yk_label, fg='cyan')) - echo("") + # Check for forced auth method + if sum([(1 if x else 0) for x in [auth_default_admin, auth_yubikey, auth_password_id]]) > 1: + raise click.UsageError("Only one forced auth method can be specified.") + if auth_default_admin: + ctx.obj['forced_auth_method'] = HSMAuthMethod.DEFAULT_ADMIN + elif auth_yubikey: + ctx.obj['forced_auth_method'] = HSMAuthMethod.YUBIKEY + elif auth_password_id: + ctx.obj['forced_auth_method'] = HSMAuthMethod.PASSWORD + if not ctx.obj['auth_password']: + raise click.UsageError("HSM_PASSWORD environment variable not set for password auth method.") + if not ctx.obj['auth_password_id']: + raise click.UsageError("Auth key ID not specified for password auth method.") + + echo("") @click.command('nop', short_help='Validate config and exit.') -@click.pass_context -def cmd_nop(ctx): +@pass_common_args +def cmd_nop(ctx: HsmSecretsCtx): echo("No errors. Exiting.") -cli.add_command(cmd_ssh, "ssh") -cli.add_command(cmd_tls, "tls") +cli.add_command(cmd_ssh, "ssh") +cli.add_command(cmd_tls, "tls") cli.add_command(cmd_pass, "pass") -cli.add_command(cmd_hsm, "hsm") -cli.add_command(cmd_nop, "nop") +cli.add_command(cmd_hsm, "hsm") +cli.add_command(cmd_nop, "nop") cli.add_command(cmd_x509, "x509") cli.add_command(cmd_user, "user") diff --git a/hsm_secrets/passwd/__init__.py b/hsm_secrets/passwd/__init__.py index dbb32a9..215b48e 100644 --- a/hsm_secrets/passwd/__init__.py +++ b/hsm_secrets/passwd/__init__.py @@ -7,22 +7,22 @@ from mnemonic import Mnemonic from hsm_secrets.config import HSMConfig, PasswordDerivationRule, PwRotationToken, find_config_items_of_class -from hsm_secrets.utils import click_echo_colored_commands, group_by_4, hsm_obj_exists, open_hsm_session_with_yubikey, secure_display_secret +from hsm_secrets.utils import HsmSecretsCtx, click_echo_colored_commands, group_by_4, hsm_obj_exists, open_hsm_session, open_hsm_session_with_yubikey, pass_common_args, secure_display_secret @click.group() @click.pass_context -def cmd_pass(ctx): +def cmd_pass(ctx: click.Context): """Password derivation""" ctx.ensure_object(dict) @cmd_pass.command('get') -@click.pass_context +@pass_common_args @click.argument('name', required=True, type=str, metavar='') @click.option('--prev', '-p', required=False, type=int, help="Previous password index (default: 0)", default=0) @click.option('--rule', '-r', required=False, type=str, help="Derivation rule to use (default: read from config)", default=None) -def get_password(ctx: click.Context, name: str, prev: int, rule: str|None): +def get_password(ctx: HsmSecretsCtx, name: str, prev: int, rule: str|None): """Get password for given name Shows current password by default, or previous password if --prev is specified. For example, @@ -36,15 +36,14 @@ def get_password(ctx: click.Context, name: str, prev: int, rule: str|None): Before the first rotation, nonce is an empty string. """ - conf: HSMConfig = ctx.obj['config'] - rule_id = rule or str(conf.password_derivation.default_rule) + rule_id = rule or str(ctx.conf.password_derivation.default_rule) if prev < 0: raise click.ClickException(f"Invalid previous password index: {prev}") - rule_def, key_id = _find_rule_and_key(conf, rule_id) + rule_def, key_id = _find_rule_and_key(ctx.conf, rule_id) - with open_hsm_session_with_yubikey(ctx) as (conf, ses): + with open_hsm_session(ctx) as ses: obj = ses.get_object(key_id, yubihsm.defs.OBJECT.HMAC_KEY) assert isinstance(obj, HmacKey) if not hsm_obj_exists(obj): @@ -80,26 +79,25 @@ def get_password(ctx: click.Context, name: str, prev: int, rule: str|None): @cmd_pass.command('rotate') -@click.pass_context +@pass_common_args @click.argument('name', required=False, nargs=-1, type=str, metavar='[name] ...', default=None) @click.option('--rule', '-r', required=False, type=str, help="Derivation rule to use (default: read from config)", default=None) @click.option('--all', '-a', required=False, is_flag=True, help="Rotate all passwords") -def rotate_password(ctx: click.Context, name: list[str]|None, rule: str|None, all: bool): +def rotate_password(ctx: HsmSecretsCtx, name: list[str]|None, rule: str|None, all: bool): """Rotate password(s) for given name(s) Rotates the password for the given name(s) or all names if --all is specified. """ - conf: HSMConfig = ctx.obj['config'] - rule_id = rule or str(conf.password_derivation.default_rule) + rule_id = rule or str(ctx.conf.password_derivation.default_rule) if all and name: raise click.ClickException("Cannot specify both --all and specific names.") if not all and not name: raise click.ClickException("Must specify either --all or at least one name.") - _, key_id = _find_rule_and_key(conf, rule_id) + _, key_id = _find_rule_and_key(ctx.conf, rule_id) - with open_hsm_session_with_yubikey(ctx) as (conf, ses): + with open_hsm_session(ctx) as ses: obj = ses.get_object(key_id, yubihsm.defs.OBJECT.HMAC_KEY) assert isinstance(obj, HmacKey) if not hsm_obj_exists(obj): diff --git a/hsm_secrets/ssh/__init__.py b/hsm_secrets/ssh/__init__.py index f759c7b..e5e9b1f 100644 --- a/hsm_secrets/ssh/__init__.py +++ b/hsm_secrets/ssh/__init__.py @@ -1,19 +1,15 @@ -from base64 import b64encode -from math import floor from pathlib import Path -import struct from textwrap import dedent import time from typing import Sequence import click from hsm_secrets.config import HSMConfig -from hsm_secrets.ssh.openssh.ssh_certificate import ExtensionLabelType -from hsm_secrets.ssh.ssh_utils import create_request, create_template -from hsm_secrets.utils import click_echo_colored_commands, encode_algorithm, encode_capabilities, hsm_generate_asymmetric_key, open_hsm_session_with_yubikey -from yubihsm.objects import YhsmObject, AsymmetricKey, Template -from cryptography.hazmat.primitives import (_serialization, serialization) -import yubihsm.defs +from hsm_secrets.utils import HsmSecretsCtx, click_echo_colored_commands, open_hsm_session, pass_common_args +from cryptography.hazmat.primitives import _serialization + +import yubihsm.defs # type: ignore [import] +from yubihsm.objects import AsymmetricKey # type: ignore [import] @click.group() @@ -24,14 +20,12 @@ def cmd_ssh(ctx: click.Context): @cmd_ssh.command('get-ca') -@click.pass_context +@pass_common_args @click.option('--all', '-a', 'get_all', is_flag=True, help="Get all certificates") @click.argument('cert_ids', nargs=-1, type=str, metavar='...') -def get_ca(ctx: click.Context, get_all: bool, cert_ids: Sequence[str]): +def get_ca(ctx: HsmSecretsCtx, get_all: bool, cert_ids: Sequence[str]): """Get the public keys of the SSH CA keys""" - conf: HSMConfig = ctx.obj['config'] - - all_ids = set([str(ca.id) for ca in conf.ssh.root_ca_keys]) + all_ids = set([str(ca.id) for ca in ctx.conf.ssh.root_ca_keys]) selected_ids = all_ids if get_all else set(cert_ids) if not selected_ids: @@ -40,13 +34,13 @@ def get_ca(ctx: click.Context, get_all: bool, cert_ids: Sequence[str]): if len(selected_ids - all_ids) > 0: raise ValueError(f"Unknown CA key IDs: {selected_ids - all_ids}") - selected_keys = [ca for ca in conf.ssh.root_ca_keys if str(ca.id) in selected_ids] + selected_keys = [ca for ca in ctx.conf.ssh.root_ca_keys if str(ca.id) in selected_ids] if not selected_keys: click.echo("No CA keys selected") return - with open_hsm_session_with_yubikey(ctx) as (conf, ses): + with open_hsm_session(ctx) as ses: for key in selected_keys: obj = ses.get_object(key.id, yubihsm.defs.OBJECT.ASYMMETRIC_KEY) assert isinstance(obj, AsymmetricKey) @@ -63,8 +57,8 @@ def get_ca(ctx: click.Context, get_all: bool, cert_ids: Sequence[str]): @click.option('--principals', '-p', required=False, help="Comma-separated list of principals", default='') @click.option('--extentions', '-e', help="Comma-separated list of SSH extensions", default='permit-X11-forwarding,permit-agent-forwarding,permit-port-forwarding,permit-pty,permit-user-rc') @click.argument('keyfile', type=click.Path(exists=False, dir_okay=False, resolve_path=True, allow_dash=True), default='-') -@click.pass_context -def sign_key(ctx: click.Context, out: str, ca: str|None, username: str|None, certid: str|None, validity: int, principals: str, extentions: str, keyfile: str): +@pass_common_args +def sign_key(ctx: HsmSecretsCtx, out: str, ca: str|None, username: str|None, certid: str|None, validity: int, principals: str, extentions: str, keyfile: str): """Make and sign an SSH user certificate [keyfile]: file containing the public key to sign (default: stdin) @@ -82,10 +76,9 @@ def sign_key(ctx: click.Context, out: str, ca: str|None, username: str|None, cer from hsm_secrets.ssh.openssh.ssh_certificate import cert_for_ssh_pub_id, str_to_extension from hsm_secrets.key_adapters import make_private_key_adapter - conf: HSMConfig = ctx.obj['config'] - ca_key_id = int(ca.replace('0x',''), 16) if ca else conf.ssh.default_ca + ca_key_id = int(ca.replace('0x',''), 16) if ca else ctx.conf.ssh.default_ca - ca_def = [c for c in conf.ssh.root_ca_keys if c.id == ca_key_id] + ca_def = [c for c in ctx.conf.ssh.root_ca_keys if c.id == ca_key_id] if not ca_def: raise ValueError(f"CA key 0x{ca_key_id:04x} not found in config") @@ -137,7 +130,7 @@ def sign_key(ctx: click.Context, out: str, ca: str|None, username: str|None, cer path = str(p) # Sign & write out - with open_hsm_session_with_yubikey(ctx) as (conf, ses): + with open_hsm_session(ctx) as ses: obj = ses.get_object(ca_key_id, yubihsm.defs.OBJECT.ASYMMETRIC_KEY) assert isinstance(obj, AsymmetricKey) diff --git a/hsm_secrets/tls/__init__.py b/hsm_secrets/tls/__init__.py index 6e17949..2333ebc 100644 --- a/hsm_secrets/tls/__init__.py +++ b/hsm_secrets/tls/__init__.py @@ -7,24 +7,24 @@ from cryptography.hazmat.primitives import hashes import cryptography.x509 -import yubihsm -import yubihsm.defs -import yubihsm.objects +import yubihsm # type: ignore [import] +import yubihsm.defs # type: ignore [import] +import yubihsm.objects # type: ignore [import] -from hsm_secrets.config import HSMConfig, X509Cert, X509CertAttribs, X509Info +from hsm_secrets.config import X509CertAttribs, X509Info from hsm_secrets.key_adapters import PrivateKey, make_private_key_adapter -from hsm_secrets.utils import click_echo_colored_commands, hsm_obj_exists, open_hsm_session_with_yubikey +from hsm_secrets.utils import HsmSecretsCtx, click_echo_colored_commands, hsm_obj_exists, open_hsm_session, open_hsm_session_with_yubikey, pass_common_args from hsm_secrets.x509.cert_builder import X509CertBuilder from hsm_secrets.x509.def_utils import find_cert_def, merge_x509_info_with_defaults @click.group() @click.pass_context -def cmd_tls(ctx): +def cmd_tls(ctx: click.Context): """TLS certificate commands""" ctx.ensure_object(dict) @cmd_tls.command('make-server-cert') -@click.pass_context +@pass_common_args @click.option('--out', '-o', required=True, type=click.Path(exists=False, dir_okay=False, resolve_path=True), help="Output filename") @click.option('--common-name', '-c', required=True, help="CN, e.g. public DNS name") @click.option('--san-dns', '-d', multiple=True, help="DNS SAN (Subject Alternative Name)") @@ -32,7 +32,7 @@ def cmd_tls(ctx): @click.option('--validity', '-v', default=365, help="Validity period in days") @click.option('--keyfmt', '-f', type=click.Choice(['rsa4096', 'ed25519', 'ecp256', 'ecp384']), default='ecp384', help="Key format") @click.option('--sign-crt', '-s', type=str, required=False, help="CA ID (hex) to sign with, or 'self'. Default: use config", default=None) -def new_http_server_cert(ctx: click.Context, out: click.Path, common_name: str, san_dns: list[str], san_ip: list[str], validity: int, keyfmt: str, sign_crt: str): +def new_http_server_cert(ctx: HsmSecretsCtx, out: click.Path, common_name: str, san_dns: list[str], san_ip: list[str], validity: int, keyfmt: str, sign_crt: str): """Create a TLS server certificate + key Create a new TLS server certificate for the given CN and (optional) SANs. @@ -44,14 +44,12 @@ def new_http_server_cert(ctx: click.Context, out: click.Path, common_name: str, The --out option is used as a base filename, and the key, csr, and cert files written with the extensions '.key.pem', '.csr.pem', and '.cer.pem' respectively. """ - conf: HSMConfig = ctx.obj['config'] - # Find the issuer CA definition issuer_x509_def = None issuer_cert_id = -1 if (sign_crt or '').strip().lower() != 'self': - issuer_cert_id = int(sign_crt.replace('0x',''), 16) if sign_crt else conf.tls.default_ca_id - issuer_x509_def = find_cert_def(conf, issuer_cert_id) + issuer_cert_id = int(sign_crt.replace('0x',''), 16) if sign_crt else ctx.conf.tls.default_ca_id + issuer_x509_def = find_cert_def(ctx.conf, issuer_cert_id) assert issuer_x509_def, f"CA cert ID not found: 0x{issuer_cert_id:04x}" info = X509Info() @@ -67,7 +65,7 @@ def new_http_server_cert(ctx: click.Context, out: click.Path, common_name: str, for n in san_ip or []: info.attribs.subject_alt_names['ip'].append(n) - merged_info = merge_x509_info_with_defaults(info, conf) + merged_info = merge_x509_info_with_defaults(info, ctx.conf) merged_info.path_len = None merged_info.ca = False @@ -97,11 +95,11 @@ def new_http_server_cert(ctx: click.Context, out: click.Path, common_name: str, if chain_file.exists(): click.confirm(f"Chain file {chain_file} already exists. Overwrite?", abort=True) - builder = X509CertBuilder(conf, merged_info, priv_key) + builder = X509CertBuilder(ctx.conf, merged_info, priv_key) issuer_cert = None if issuer_x509_def: assert issuer_cert_id >= 0 - with open_hsm_session_with_yubikey(ctx) as (conf, ses): + with open_hsm_session(ctx) as ses: ca_cert_obj = ses.get_object(issuer_cert_id, yubihsm.defs.OBJECT.OPAQUE) assert isinstance(ca_cert_obj, yubihsm.objects.Opaque) @@ -144,19 +142,17 @@ def new_http_server_cert(ctx: click.Context, out: click.Path, common_name: str, # ----- Sign CSR ----- @cmd_tls.command('sign-csr') -@click.pass_context +@pass_common_args @click.argument('csr', type=click.Path(exists=False, dir_okay=False, resolve_path=True, allow_dash=True), default='-', required=True, metavar='') @click.option('--out', '-o', required=False, type=click.Path(exists=False, dir_okay=False, resolve_path=True), help="Output filename (default: deduce from input)", default=None) @click.option('--ca', '-c', type=str, required=False, help="CA ID (hex) to sign with. Default: use config", default=None) @click.option('--validity', '-v', default=365, help="Validity period in days") -def sign_csr(ctx: click.Context, csr: click.Path, out: click.Path|None, ca: str|None, validity: int): +def sign_csr(ctx: HsmSecretsCtx, csr: click.Path, out: click.Path|None, ca: str|None, validity: int): """Sign a CSR with a CA key Sign a Certificate Signing Request (CSR) with a CA key from the HSM. The output is a signed certificate in PEM format. """ - conf: HSMConfig = ctx.obj['config'] - if csr == '-': click.echo("Reading CSR from stdin...") csr_path = Path('-') @@ -168,8 +164,8 @@ def sign_csr(ctx: click.Context, csr: click.Path, out: click.Path|None, ca: str| csr_obj = cryptography.x509.load_pem_x509_csr(csr_data) # Find the issuer CA definition - issuer_cert_id = int(ca.replace('0x',''), 16) if ca else conf.tls.default_ca_id - issuer_x509_def = find_cert_def(conf, issuer_cert_id) + issuer_cert_id = int(ca.replace('0x',''), 16) if ca else ctx.conf.tls.default_ca_id + issuer_x509_def = find_cert_def(ctx.conf, issuer_cert_id) assert issuer_x509_def, f"CA cert ID not found: 0x{issuer_cert_id:04x}" if out: @@ -179,7 +175,7 @@ def sign_csr(ctx: click.Context, csr: click.Path, out: click.Path|None, ca: str| if out_path.exists(): click.confirm(f"Output file '{out_path}' already exists. Overwrite?", abort=True) - with open_hsm_session_with_yubikey(ctx) as (conf, ses): + with open_hsm_session(ctx) as ses: ca_cert_obj = ses.get_object(issuer_cert_id, yubihsm.defs.OBJECT.OPAQUE) assert isinstance(ca_cert_obj, yubihsm.objects.Opaque) assert hsm_obj_exists(ca_cert_obj), f"CA cert ID not found on HSM: 0x{issuer_cert_id:04x}" diff --git a/hsm_secrets/user/__init__.py b/hsm_secrets/user/__init__.py index 6faa093..6a687bd 100644 --- a/hsm_secrets/user/__init__.py +++ b/hsm_secrets/user/__init__.py @@ -1,26 +1,24 @@ -import base64 import re import secrets import click -from hsm_secrets.config import HSMConfig -from hsm_secrets.utils import confirm_and_delete_old_yubihsm_object_if_exists, group_by_4, hsm_obj_exists, hsm_put_derived_auth_key, hsm_put_symmetric_auth_key, open_hsm_session_with_default_admin, open_hsm_session_with_yubikey, prompt_for_secret, pw_check_fromhex, secure_display_secret +from hsm_secrets.utils import HSMAuthMethod, HsmSecretsCtx, confirm_and_delete_old_yubihsm_object_if_exists, group_by_4, hsm_put_derived_auth_key, hsm_put_symmetric_auth_key, open_hsm_session, pass_common_args, prompt_for_secret, pw_check_fromhex, secure_display_secret import yubikit.hsmauth import ykman.scripting -import yubihsm.defs, yubihsm.objects +import yubihsm.defs, yubihsm.objects # type: ignore [import] @click.group() @click.pass_context -def cmd_user(ctx): +def cmd_user(ctx: click.Context): """HSM user management commands""" ctx.ensure_object(dict) # --------------- @cmd_user.command('change-yubikey-mgt') -@click.pass_context -def change_yubikey_mgt(ctx: click.Context): +@pass_common_args +def change_yubikey_mgt(ctx: HsmSecretsCtx): """Change hsmauth mgt key on a Yubikey Set a new Management Key (aka. Admin Access Code) for currently connected @@ -37,17 +35,16 @@ def change_yubikey_mgt(ctx: click.Context): # --------------- @cmd_user.command('add-user-yubikey') -@click.pass_context +@pass_common_args @click.option('--label', required=True, help="Label of the Yubikey hsmauth slot / HSM key label") @click.option('--alldevs', is_flag=True, help="Add to all devices") -def add_user_yubikey(ctx: click.Context, label: str, alldevs: bool): +def add_user_yubikey(ctx: HsmSecretsCtx, label: str, alldevs: bool): """Register Yubikey auth for a user Generate a new password-protected public auth key, and store it in the YubiHSM(s) as a user key. The same label will be used on both the Yubikey and the YubiHSM. """ - conf: HSMConfig = ctx.obj['config'] - user_key_configs = [uk for uk in conf.user_keys if uk.label == label] + user_key_configs = [uk for uk in ctx.conf.user_keys if uk.label == label] if not user_key_configs: raise click.ClickException(f"User key with label '{label}' not found in the configuration file.") elif len(user_key_configs) > 1: @@ -89,7 +86,7 @@ def add_user_yubikey(ctx: click.Context, label: str, alldevs: bool): click.echo("Generating symmetric key for the slot...") key_enc, key_mac = None, None - with open_hsm_session_with_default_admin(ctx) as (conf, ses): + with open_hsm_session(ctx, HSMAuthMethod.DEFAULT_ADMIN) as ses: key_enc = ses.get_pseudo_random(128//8) key_mac = ses.get_pseudo_random(128//8) @@ -105,12 +102,12 @@ def add_user_yubikey(ctx: click.Context, label: str, alldevs: bool): click.echo(f"Auth key added to the Yubikey (serial {yubikey.info.serial}) hsmauth slot '{cred.label}' (type: {repr(cred.algorithm)})") # Store it in the YubiHSMs - dev_serials = conf.general.all_devices.keys() if alldevs else [ctx.obj['devserial']] - for serial in dev_serials: - with open_hsm_session_with_default_admin(ctx, device_serial=serial) as (conf, ses): - hsm_put_symmetric_auth_key(ses, serial, conf, user_key_conf, key_enc, key_mac) + hsm_serials = ctx.conf.general.all_devices.keys() if alldevs else [ctx.hsm_serial] + for serial in hsm_serials: + with open_hsm_session(ctx, HSMAuthMethod.DEFAULT_ADMIN, serial) as ses: + hsm_put_symmetric_auth_key(ses, serial, ctx.conf, user_key_conf, key_enc, key_mac) - click.echo("OK. User key added" + (f" to all devices (serials: {', '.join(dev_serials)})" if alldevs else "") + ".") + click.echo("OK. User key added" + (f" to all devices (serials: {', '.join(hsm_serials)})" if alldevs else "") + ".") click.echo("") click.echo("TIP! Test with the `list-objects` command to check that Yubikey hsmauth method works correctly.") @@ -127,11 +124,11 @@ def add_user_yubikey(ctx: click.Context, label: str, alldevs: bool): # --------------- @cmd_user.command('add-service-account') -@click.pass_context +@pass_common_args @click.argument('cert_ids', nargs=-1, type=str, metavar='...') @click.option('--all', '-a', 'all_accts', is_flag=True, help="Add all configured service users") @click.option('--askpw', is_flag=True, help="Ask for password(s) instead of generating") -def add_service_account(ctx: click.Context, cert_ids: tuple[str], all_accts: bool, askpw: bool): +def add_service_account(ctx: HsmSecretsCtx, cert_ids: tuple[str], all_accts: bool, askpw: bool): """Add a service user(s) to master device Cert IDs are 16-bit hex values (e.g. '0x12af' or '12af'). @@ -141,32 +138,29 @@ def add_service_account(ctx: click.Context, cert_ids: tuple[str], all_accts: boo The command will generate (and show) passwords by default. Use the --askpw to be prompted for passwords instead. """ - conf: HSMConfig = ctx.obj['config'] - dev_serial = ctx.obj['devserial'] - if not all_accts and not cert_ids: raise click.ClickException("No service users specified for addition.") - id_strings = [str(x.id) for x in conf.service_keys] if all_accts else cert_ids + id_strings = [str(x.id) for x in ctx.conf.service_keys] if all_accts else cert_ids ids = [int(id.replace("0x", ""), 16) for id in id_strings] if not ids: raise click.ClickException("No service account ids specified.") - acct_defs = [x for x in conf.service_keys if x.id in ids] + acct_defs = [x for x in ctx.conf.service_keys if x.id in ids] if len(acct_defs) != len(ids): unknown_ids = [f'0x{i:04x}' for i in (set(ids) - set([x.id for x in acct_defs]))] raise click.ClickException(f"Service user ID(s) {', '.join(unknown_ids)} not found in the configuration file.") for ad in acct_defs: - with open_hsm_session_with_default_admin(ctx) as (conf, ses): + with open_hsm_session(ctx, HSMAuthMethod.DEFAULT_ADMIN) as ses: obj = ses.get_object(ad.id, yubihsm.defs.OBJECT.AUTHENTICATION_KEY) assert isinstance(obj, yubihsm.objects.AuthenticationKey) - if not confirm_and_delete_old_yubihsm_object_if_exists(dev_serial, obj, abort=False): + if not confirm_and_delete_old_yubihsm_object_if_exists(ctx.hsm_serial, obj, abort=False): click.echo(f"Skipping service user '{ad.label}' (ID: 0x{ad.id:04x})...") continue - click.echo(f"Adding service user '{ad.label}' (ID: 0x{ad.id:04x}) to device {dev_serial}...") + click.echo(f"Adding service user '{ad.label}' (ID: 0x{ad.id:04x}) to device {ctx.hsm_serial}...") if askpw: pw = prompt_for_secret(f"Enter password for service user '{ad.label}'", confirm=True) else: @@ -181,7 +175,7 @@ def add_service_account(ctx: click.Context, cert_ids: tuple[str], all_accts: boo continue else: break - hsm_put_derived_auth_key(ses, dev_serial, conf, ad, pw) + hsm_put_derived_auth_key(ses, ctx.hsm_serial, ctx.conf, ad, pw) # --------------- diff --git a/hsm_secrets/utils.py b/hsm_secrets/utils.py index dd6d457..dd0cfe3 100644 --- a/hsm_secrets/utils.py +++ b/hsm_secrets/utils.py @@ -1,23 +1,60 @@ +from dataclasses import dataclass +from enum import Enum import os -from typing import Callable, Optional, Sequence -from contextlib import contextmanager +from typing import Callable, Generator, Optional, Sequence +from contextlib import _GeneratorContextManager, contextmanager from click import echo import click -from yubihsm import YubiHsm # type: ignore -from yubihsm.core import AuthSession -from yubihsm.defs import CAPABILITY, ALGORITHM, ERROR, OBJECT -from yubihsm.objects import AsymmetricKey, HmacKey, SymmetricKey, WrapKey, YhsmObject, AuthenticationKey +from yubihsm import YubiHsm # type: ignore [import] +from yubihsm.core import AuthSession # type: ignore [import] +from yubihsm.defs import CAPABILITY, ALGORITHM, ERROR, OBJECT # type: ignore [import] +from yubihsm.objects import AsymmetricKey, HmacKey, SymmetricKey, WrapKey, YhsmObject, AuthenticationKey # type: ignore [import] +from yubikit.hsmauth import HsmAuthSession # type: ignore [import] +from yubihsm.exceptions import YubiHsmDeviceError # type: ignore [import] -from yubikit.hsmauth import HsmAuthSession #, DEFAULT_MANAGEMENT_KEY -from yubihsm.exceptions import YubiHsmDeviceError from ykman import scripting import yubikit.core import yubikit.hsmauth as hsmauth import hsm_secrets.config as hscfg import curses +import click + +from functools import wraps + +class HSMAuthMethod(Enum): + YUBIKEY = 1 + DEFAULT_ADMIN = 2 + PASSWORD = 3 + + +@dataclass +class HsmSecretsCtx: + click_ctx: click.Context + conf: hscfg.HSMConfig + hsm_serial: str + yubikey_label: str + forced_auth_method: Optional[HSMAuthMethod] = None + auth_password: Optional[str] = None + auth_password_id: Optional[int] = None + + +def pass_common_args(f): + @wraps(f) + def wrapper(*args, **kwargs): + click_ctx = click.get_current_context() + ctx = HsmSecretsCtx( + click_ctx = click_ctx, + conf = click_ctx.obj['config'], + hsm_serial = click_ctx.obj.get('hsmserial'), + yubikey_label = click_ctx.obj.get('yubikey_label'), + forced_auth_method = click_ctx.obj.get('forced_auth_method'), + auth_password = click_ctx.obj.get('auth_password'), + auth_password_id = click_ctx.obj.get('auth_password_id')) + return f(ctx, *args, **kwargs) + return wrapper def pw_check_fromhex(pw: str) -> str|None: @@ -144,42 +181,59 @@ def verify_hsm_device_info(device_serial, hsm): if int(device_serial) != int(info.serial): raise ValueError(f"Device serial mismatch! Connected='{hsm.serial}', Expected='{device_serial}'") +@contextmanager +def open_hsm_session( + ctx: HsmSecretsCtx, + default_auth_method: HSMAuthMethod = HSMAuthMethod.YUBIKEY, + device_serial: str | None = None) -> Generator[AuthSession, None, None]: + """ + Open a session to the HSM using forced or given default auth method. + This is an auto-selecting wrapper for the specific session context managers. + """ + auth_method = ctx.forced_auth_method or default_auth_method + device_serial = device_serial or ctx.hsm_serial + + if auth_method == HSMAuthMethod.YUBIKEY: + ctxman = open_hsm_session_with_yubikey(ctx, device_serial) + elif auth_method == HSMAuthMethod.DEFAULT_ADMIN: + ctxman = open_hsm_session_with_default_admin(ctx, device_serial) + elif auth_method == HSMAuthMethod.PASSWORD: + assert device_serial, "HSM device serial not provided nor inferred. Cannot use shared secret auth." + assert ctx.auth_password and ctx.auth_password_id + ctxman = open_hsm_session_with_password(ctx, ctx.auth_password_id, ctx.auth_password, device_serial) + else: + raise ValueError(f"Unknown auth method: {auth_method}") + with ctxman as session: + yield session + @contextmanager -def open_hsm_session_with_yubikey(ctx: click.Context, device_serial: str|None = None): +def open_hsm_session_with_yubikey(ctx: HsmSecretsCtx, device_serial: str|None = None) -> Generator[AuthSession, None, None]: """ Open a session to the HSM using the first YubiKey found, and authenticate with the YubiKey HSM auth label. - - Args: - ctx (click.Context): The Click context object """ - conf: hscfg.HSMConfig = ctx.obj['config'] - device_serial = device_serial or ctx.obj['devserial'] + device_serial = device_serial or ctx.hsm_serial passwd = os.environ.get('YUBIKEY_PASSWORD', None) if passwd: echo("Using YubiKey password from environment variable.") - session = connect_hsm_and_auth_with_yubikey(conf, ctx.obj['yk_label'], device_serial, passwd) + session = connect_hsm_and_auth_with_yubikey(ctx.conf, ctx.yubikey_label, device_serial, passwd) try: - yield conf, session + yield session finally: session.close() @contextmanager -def open_hsm_session_with_default_admin(ctx: click.Context, device_serial: str|None = None): +def open_hsm_session_with_default_admin(ctx: HsmSecretsCtx, device_serial: str|None = None) -> Generator[AuthSession, None, None]: """ Open a session to the HSM using the first YubiKey found, and authenticate with the YubiKey HSM auth label. - - Args: - ctx (click.Context): The Click context object """ - conf: hscfg.HSMConfig = ctx.obj['config'] - device_serial = device_serial or ctx.obj['devserial'] + device_serial = device_serial or ctx.hsm_serial assert device_serial, "HSM device serial not provided nor inferred." click.echo(click.style(f"Using insecure default admin key to auth on YubiHSM2 {device_serial}.", fg='magenta')) - connector_url = conf.general.all_devices.get(device_serial) + connector_url = ctx.conf.general.all_devices.get(device_serial) if not connector_url: raise ValueError(f"Device serial '{device_serial}' not found in config file.") @@ -189,16 +243,16 @@ def open_hsm_session_with_default_admin(ctx: click.Context, device_serial: str|N session = None try: - session = hsm.create_session_derived(conf.admin.default_admin_key.id, conf.admin.default_admin_password) + session = hsm.create_session_derived(ctx.conf.admin.default_admin_key.id, ctx.conf.admin.default_admin_password) click.echo(click.style(f"HSM session {session.sid} started.", fg='magenta')) except YubiHsmDeviceError as e: if e.code == ERROR.OBJECT_NOT_FOUND: - echo(click.style(f"Default admin key '0x{conf.admin.default_admin_key.id:04x}' not found. Aborting.", fg='red')) + echo(click.style(f"Default admin key '0x{ctx.conf.admin.default_admin_key.id:04x}' not found. Aborting.", fg='red')) exit(1) raise try: - yield conf, session + yield session finally: click.echo(click.style(f"Closing HSM session {session.sid}.", fg='magenta')) session.close() @@ -206,29 +260,24 @@ def open_hsm_session_with_default_admin(ctx: click.Context, device_serial: str|N @contextmanager -def open_hsm_session_with_shared_admin(ctx: click.Context, password: str, device_serial: str|None = None): +def open_hsm_session_with_password(ctx: HsmSecretsCtx, auth_key_id: int, password: str, device_serial: str|None = None) -> Generator[AuthSession, None, None]: """ - Open a session to the HSM using a share admin password (either reconstructed from shares or from backup). - Args: - ctx (click.Context): The Click context object + Open a session to the HSM using a password-derived auth key. """ - conf: hscfg.HSMConfig = ctx.obj['config'] - - device_serial = device_serial or ctx.obj['devserial'] + device_serial = device_serial or ctx.hsm_serial assert device_serial, "HSM device serial not provided nor inferred." - connector_url = conf.general.all_devices.get(device_serial) + connector_url = ctx.conf.general.all_devices.get(device_serial) if not connector_url: raise ValueError(f"Device serial '{device_serial}' not found in config file.") hsm = YubiHsm.connect(connector_url) verify_hsm_device_info(device_serial, hsm) - key = conf.admin.shared_admin_key.id - click.echo(f"Using shared admin key ID 0x{key:04x}") - session = hsm.create_session_derived(key, password) + click.echo(f"Using password login with key ID 0x{auth_key_id:04x}") + session = hsm.create_session_derived(auth_key_id, password) try: - yield conf, session + yield session finally: session.close() hsm.close() @@ -241,13 +290,13 @@ def encode_algorithm(name_literal: str|hscfg.AsymmetricAlgorithm) -> ALGORITHM: return hscfg.HSMConfig.algorithm_from_name(name_literal) # type: ignore -def hsm_put_wrap_key(ses: AuthSession, dev_serial: str, conf: hscfg.HSMConfig, key_def: hscfg.HSMWrapKey, key: bytes) -> WrapKey: +def hsm_put_wrap_key(ses: AuthSession, hsm_serial: str, conf: hscfg.HSMConfig, key_def: hscfg.HSMWrapKey, key: bytes) -> WrapKey: """ Put a (symmetric) wrap key into the HSM. """ wrap_key = ses.get_object(key_def.id, OBJECT.WRAP_KEY) assert isinstance(wrap_key, WrapKey) - confirm_and_delete_old_yubihsm_object_if_exists(dev_serial, wrap_key) + confirm_and_delete_old_yubihsm_object_if_exists(hsm_serial, wrap_key) res = wrap_key.put( session = ses, object_id = key_def.id, @@ -257,17 +306,17 @@ def hsm_put_wrap_key(ses: AuthSession, dev_serial: str, conf: hscfg.HSMConfig, k capabilities = conf.capability_from_names(set(key_def.capabilities)), delegated_capabilities = conf.capability_from_names(set(key_def.delegated_capabilities)), key = key) - click.echo(f"Wrap key ID '{hex(res.id)}' stored in YubiHSM device {dev_serial}") + click.echo(f"Wrap key ID '{hex(res.id)}' stored in YubiHSM device {hsm_serial}") return res -def hsm_put_derived_auth_key(ses: AuthSession, dev_serial: str, conf: hscfg.HSMConfig, key_def: hscfg.HSMAuthKey, password: str) -> AuthenticationKey: +def hsm_put_derived_auth_key(ses: AuthSession, hsm_serial: str, conf: hscfg.HSMConfig, key_def: hscfg.HSMAuthKey, password: str) -> AuthenticationKey: """ Put a password-derived authentication key into the HSM. """ auth_key = ses.get_object(key_def.id, OBJECT.AUTHENTICATION_KEY) assert isinstance(auth_key, AuthenticationKey) - confirm_and_delete_old_yubihsm_object_if_exists(dev_serial, auth_key) + confirm_and_delete_old_yubihsm_object_if_exists(hsm_serial, auth_key) res = auth_key.put_derived( session = ses, object_id = key_def.id, @@ -276,17 +325,17 @@ def hsm_put_derived_auth_key(ses: AuthSession, dev_serial: str, conf: hscfg.HSMC capabilities = conf.capability_from_names(key_def.capabilities), delegated_capabilities = conf.capability_from_names(key_def.delegated_capabilities), password = password) - click.echo(f"Auth key ID '{hex(res.id)}' ({key_def.label}) stored in YubiHSM device {dev_serial}") + click.echo(f"Auth key ID '{hex(res.id)}' ({key_def.label}) stored in YubiHSM device {hsm_serial}") return res -def hsm_put_symmetric_auth_key(ses: AuthSession, dev_serial: str, conf: hscfg.HSMConfig, key_def: hscfg.HSMAuthKey, key_enc: bytes, key_mac: bytes) -> AuthenticationKey: +def hsm_put_symmetric_auth_key(ses: AuthSession, hsm_serial: str, conf: hscfg.HSMConfig, key_def: hscfg.HSMAuthKey, key_enc: bytes, key_mac: bytes) -> AuthenticationKey: """ Put a symmetric authentication key into the HSM. """ auth_key = ses.get_object(key_def.id, OBJECT.AUTHENTICATION_KEY) assert isinstance(auth_key, AuthenticationKey) - confirm_and_delete_old_yubihsm_object_if_exists(dev_serial, auth_key) + confirm_and_delete_old_yubihsm_object_if_exists(hsm_serial, auth_key) res = auth_key.put( session = ses, object_id = key_def.id, @@ -296,17 +345,17 @@ def hsm_put_symmetric_auth_key(ses: AuthSession, dev_serial: str, conf: hscfg.HS delegated_capabilities = conf.capability_from_names(key_def.delegated_capabilities), key_enc = key_enc, key_mac = key_mac) - click.echo(f"Auth key ID '{hex(res.id)}' ({key_def.label}) stored in YubiHSM device {dev_serial}") + click.echo(f"Auth key ID '{hex(res.id)}' ({key_def.label}) stored in YubiHSM device {hsm_serial}") return res -def hsm_generate_symmetric_key(ses: AuthSession, dev_serial: str, conf: hscfg.HSMConfig, key_def: hscfg.HSMSymmetricKey) -> SymmetricKey: +def hsm_generate_symmetric_key(ses: AuthSession, hsm_serial: str, conf: hscfg.HSMConfig, key_def: hscfg.HSMSymmetricKey) -> SymmetricKey: """ Generate a symmetric key on the HSM. """ sym_key = ses.get_object(key_def.id, OBJECT.SYMMETRIC_KEY) assert isinstance(sym_key, SymmetricKey) - confirm_and_delete_old_yubihsm_object_if_exists(dev_serial, sym_key) + confirm_and_delete_old_yubihsm_object_if_exists(hsm_serial, sym_key) click.echo(f"Generating symmetric key, type '{key_def.algorithm}'...") res = sym_key.generate( session = ses, @@ -315,17 +364,17 @@ def hsm_generate_symmetric_key(ses: AuthSession, dev_serial: str, conf: hscfg.HS domains = conf.get_domain_bitfield(key_def.domains), capabilities = conf.capability_from_names(set(key_def.capabilities)), algorithm = conf.algorithm_from_name(key_def.algorithm)) - click.echo(f"Symmetric key ID '{hex(res.id)}' ({key_def.label}) generated in YubiHSM device {dev_serial}") + click.echo(f"Symmetric key ID '{hex(res.id)}' ({key_def.label}) generated in YubiHSM device {hsm_serial}") return res -def hsm_generate_asymmetric_key(ses: AuthSession, dev_serial: str, conf: hscfg.HSMConfig, key_def: hscfg.HSMAsymmetricKey) -> AsymmetricKey: +def hsm_generate_asymmetric_key(ses: AuthSession, hsm_serial: str, conf: hscfg.HSMConfig, key_def: hscfg.HSMAsymmetricKey) -> AsymmetricKey: """ Generate an asymmetric key on the HSM. """ asym_key = ses.get_object(key_def.id, OBJECT.ASYMMETRIC_KEY) assert isinstance(asym_key, AsymmetricKey) - confirm_and_delete_old_yubihsm_object_if_exists(dev_serial, asym_key) + confirm_and_delete_old_yubihsm_object_if_exists(hsm_serial, asym_key) click.echo(f"Generating asymmetric key, type '{key_def.algorithm}'...") if 'rsa' in key_def.algorithm.lower(): click.echo(" Note! RSA key generation is very slow. Please wait. The YubiHSM2 should be blinking while it works.") @@ -337,17 +386,17 @@ def hsm_generate_asymmetric_key(ses: AuthSession, dev_serial: str, conf: hscfg.H domains = conf.get_domain_bitfield(key_def.domains), capabilities = conf.capability_from_names(set(key_def.capabilities)), algorithm = conf.algorithm_from_name(key_def.algorithm)) - click.echo(f"Symmetric key ID '{hex(res.id)}' ({key_def.label}) stored in YubiHSM device {dev_serial}") + click.echo(f"Symmetric key ID '{hex(res.id)}' ({key_def.label}) stored in YubiHSM device {hsm_serial}") return res -def hsm_generate_hmac_key(ses: AuthSession, dev_serial: str, conf: hscfg.HSMConfig, key_def: hscfg.HSMHmacKey) -> HmacKey: +def hsm_generate_hmac_key(ses: AuthSession, hsm_serial: str, conf: hscfg.HSMConfig, key_def: hscfg.HSMHmacKey) -> HmacKey: """ Generate an HMAC key on the HSM. """ hmac_key = ses.get_object(key_def.id, OBJECT.HMAC_KEY) assert isinstance(hmac_key, HmacKey) - confirm_and_delete_old_yubihsm_object_if_exists(dev_serial, hmac_key) + confirm_and_delete_old_yubihsm_object_if_exists(hsm_serial, hmac_key) click.echo(f"Generating HMAC key, type '{key_def.algorithm}'...") res = hmac_key.generate( session = ses, @@ -356,7 +405,7 @@ def hsm_generate_hmac_key(ses: AuthSession, dev_serial: str, conf: hscfg.HSMConf domains = conf.get_domain_bitfield(key_def.domains), capabilities = conf.capability_from_names(set(key_def.capabilities)), algorithm = conf.algorithm_from_name(key_def.algorithm)) - click.echo(f"HMAC key ID '{hex(res.id)}' ({key_def.label}) stored in YubiHSM device {dev_serial}") + click.echo(f"HMAC key ID '{hex(res.id)}' ({key_def.label}) stored in YubiHSM device {hsm_serial}") return res diff --git a/hsm_secrets/x509/__init__.py b/hsm_secrets/x509/__init__.py index daa17cd..dee91f4 100644 --- a/hsm_secrets/x509/__init__.py +++ b/hsm_secrets/x509/__init__.py @@ -1,14 +1,14 @@ from pathlib import Path from cryptography import x509 -from yubihsm.core import AuthSession + +from yubihsm.core import AuthSession # type: ignore [import] +import yubihsm.objects # type: ignore [import] +import yubihsm.defs # type: ignore [import] from cryptography.hazmat.primitives import serialization from hsm_secrets.config import HSMConfig, KeyID, OpaqueObject, X509Cert, find_config_items_of_class -import yubihsm.objects -import yubihsm.defs - -from hsm_secrets.utils import confirm_and_delete_old_yubihsm_object_if_exists, hsm_obj_exists, open_hsm_session_with_default_admin, open_hsm_session_with_yubikey, click_echo_colored_commands +from hsm_secrets.utils import HSMAuthMethod, HsmSecretsCtx, confirm_and_delete_old_yubihsm_object_if_exists, hsm_obj_exists, open_hsm_session, click_echo_colored_commands, pass_common_args from hsm_secrets.x509.cert_builder import X509CertBuilder from hsm_secrets.x509.def_utils import display_x509_info, merge_x509_info_with_defaults, topological_sort_x509_cert_defs @@ -20,18 +20,18 @@ @click.group() @click.pass_context -def cmd_x509(ctx): +def cmd_x509(ctx: click.Context): """Genral X.509 Certificate Management""" ctx.ensure_object(dict) # --------------- @cmd_x509.command('create-cert') -@click.pass_context +@pass_common_args @click.option('--all', '-a', 'all_certs', is_flag=True, help="Create all certificates") @click.option("--dry-run", "-n", is_flag=True, help="Dry run (do not create certificates)") @click.argument('cert_ids', nargs=-1, type=str, metavar='...') -def create_cert_cmd(ctx: click.Context, all_certs: bool, dry_run: bool, cert_ids: tuple): +def create_cert_cmd(ctx: HsmSecretsCtx, all_certs: bool, dry_run: bool, cert_ids: tuple): """Create certificate(s) on the HSM ID is a 16-bit hex value (e.g. '0x12af' or '12af'). @@ -48,25 +48,22 @@ def create_cert_cmd(ctx: click.Context, all_certs: bool, dry_run: bool, cert_ids @cmd_x509.command('get-cert') -@click.pass_context +@pass_common_args @click.option('--all', '-a', 'all_certs', is_flag=True, help="Get all certificates") @click.option('--outdir', '-o', type=click.Path(), required=False, help="Write PEMs into files here") @click.option('--bundle', '-b', type=click.Path(), required=False, help="Write a single PEM bundle file") -@click.option('--use-default-admin', is_flag=True, help="Use the default admin key (instead of Yubikey)") @click.argument('cert_ids', nargs=-1, type=str, metavar='...') -def get_cert_cmd(ctx: click.Context, all_certs: bool, outdir: str|None, bundle: str|None, cert_ids: tuple, use_default_admin: bool): +def get_cert_cmd(ctx: HsmSecretsCtx, all_certs: bool, outdir: str|None, bundle: str|None, cert_ids: tuple): """Get certificate(s) from the HSM You can specify multiple IDs to get multiple certificates, or use the --all flag to get all certificates defined in the config. Specify --bundle to get a single PEM file with all selected certificates. """ - conf = ctx.obj['config'] - if outdir and bundle: raise click.ClickException("Error: --outdir and --bundle options are mutually exclusive.") - cert_def_for_id = {c.id: c for c in find_config_items_of_class(conf, OpaqueObject)} + cert_def_for_id = {c.id: c for c in find_config_items_of_class(ctx.conf, OpaqueObject)} all_cert_ids = [f"0x{c.id:04x}" for c in cert_def_for_id.values()] selected_ids = all_cert_ids if all_certs else list(cert_ids) @@ -79,7 +76,7 @@ def get_cert_cmd(ctx: click.Context, all_certs: bool, outdir: str|None, bundle: click.echo(f"- Fetching PEM for 0x{cd.id:04x}: '{cd.label}'") click.echo() - def do_it(conf: HSMConfig, ses: AuthSession): + with open_hsm_session(ctx) as ses: for cert_id in selected_ids: cert_id_int = int(cert_id.replace("0x", ""), 16) obj = ses.get_object(cert_id_int, yubihsm.defs.OBJECT.OPAQUE) @@ -101,27 +98,18 @@ def do_it(conf: HSMConfig, ses: AuthSession): click_echo_colored_commands("To view certificate details, use:\n`openssl crl2pkcs7 -nocrl -certfile | openssl pkcs7 -print_certs | openssl x509 -text -noout`") - if use_default_admin: - with open_hsm_session_with_default_admin(ctx) as (conf, ses): - do_it(conf, ses) - else: - with open_hsm_session_with_yubikey(ctx) as (conf, ses): - do_it(conf, ses) # --------------- -def create_certs_impl(ctx: click.Context, all_certs: bool, dry_run: bool, cert_ids: tuple): +def create_certs_impl(ctx: HsmSecretsCtx, all_certs: bool, dry_run: bool, cert_ids: tuple): """ Create certificates on a YubiHSM2, based on the configuration file and CLI arguments. """ - conf = ctx.obj['config'] - dev_serial = ctx.obj['devserial'] - # Enumerate all certificate definitions in the config scid_to_opq_def: dict[KeyID, OpaqueObject] = {} scid_to_x509_def: dict[KeyID, X509Cert] = {} - for x in find_config_items_of_class(conf, X509Cert): + for x in find_config_items_of_class(ctx.conf, X509Cert): assert isinstance(x, X509Cert) for opq in x.signed_certs: scid_to_opq_def[opq.id] = opq @@ -143,13 +131,13 @@ def _selected_defs() -> list[OpaqueObject]: raise click.ClickException("Invalid certificate ID(s) specified. Must be in hex format (e.g. 0x1234).") return selected - def _do_it(conf: HSMConfig, ses: AuthSession|None): + def _do_it(ses: AuthSession|None): creation_order = topological_sort_x509_cert_defs( _selected_defs()) id_to_cert_obj: dict[KeyID, x509.Certificate] = {} # Create the certificates in topological order for cd in creation_order: - x509_info = merge_x509_info_with_defaults(scid_to_x509_def[cd.id].x509_info, conf) + x509_info = merge_x509_info_with_defaults(scid_to_x509_def[cd.id].x509_info, ctx.conf) issuer = scid_to_opq_def[cd.sign_by] if cd.sign_by and cd.sign_by != cd.id else None click.echo(f"Creating 0x{cd.id:04x}: '{cd.label}' ({f"signed by: '{issuer.label}'" if issuer else 'self-signed'})") click.echo(" " + display_x509_info(x509_info).replace("\n", "\n ")) @@ -185,7 +173,7 @@ def _do_it(conf: HSMConfig, ses: AuthSession|None): # Create and sign the certificate assert x509_def.x509_info, "X.509 certificate definition is missing x509_info" - builder = X509CertBuilder(conf, x509_def.x509_info, key) + builder = X509CertBuilder(ctx.conf, x509_def.x509_info, key) if issuer_cert: assert issuer_key id_to_cert_obj[cd.id] = builder.generate_cross_signed_intermediate_cert([issuer_cert], [issuer_key])[0] @@ -198,20 +186,20 @@ def _do_it(conf: HSMConfig, ses: AuthSession|None): assert isinstance(ses, AuthSession) hsm_obj = ses.get_object(cd.id, yubihsm.defs.OBJECT.OPAQUE) assert isinstance(hsm_obj, yubihsm.objects.Opaque) - if confirm_and_delete_old_yubihsm_object_if_exists(dev_serial, hsm_obj, abort=False): + if confirm_and_delete_old_yubihsm_object_if_exists(ctx.hsm_serial, hsm_obj, abort=False): hsm_obj.put_certificate( session = ses, object_id = cd.id, label = cd.label, - domains = conf.get_domain_bitfield(cd.domains), - capabilities = conf.capability_from_names({'exportable-under-wrap'}), + domains = ctx.conf.get_domain_bitfield(cd.domains), + capabilities = ctx.conf.capability_from_names({'exportable-under-wrap'}), certificate = id_to_cert_obj[cd.id]) - click.echo(f"Certificate 0x{cd.id:04x} created and stored in YubiHSM (serial {dev_serial}).") + click.echo(f"Certificate 0x{cd.id:04x} created and stored in YubiHSM (serial {ctx.hsm_serial}).") if dry_run: click.echo(click.style("DRY RUN. Would create the following certificates:", fg='yellow')) - _do_it(conf, None) + _do_it(None) click.echo(click.style("End of dry run. NOTHING WAS ACTUALLY DONE.", fg='yellow')) else: - with open_hsm_session_with_default_admin(ctx) as (conf, ses): - _do_it(conf, ses) + with open_hsm_session(ctx, HSMAuthMethod.DEFAULT_ADMIN) as ses: + _do_it(ses)