diff --git a/hsm_secrets/config.py b/hsm_secrets/config.py index 428aab2..97f3680 100644 --- a/hsm_secrets/config.py +++ b/hsm_secrets/config.py @@ -4,9 +4,10 @@ from datetime import datetime import os import re +import click.shell_completion from pydantic import BaseModel, ConfigDict, HttpUrl, Field, StringConstraints from typing_extensions import Annotated -from typing import List, Literal, NewType, Optional, Sequence, Union +from typing import Any, Callable, List, Literal, NewType, Optional, Sequence, Union from yubihsm.defs import CAPABILITY, ALGORITHM # type: ignore [import] import click import yaml @@ -50,8 +51,8 @@ def get_domain_bitfield(self, names: set['HSMDomainName']) -> int: assert 0 <= res <= 0xFFFF, f"Domain bitfield out of range: {res}" return res - def find_def(self, id_or_label: Union[int, str], enforce_type: Optional[type] = None) -> 'HSMObjBase': - return _find_def_by_id_or_label(self, id_or_label, enforce_type) + def find_def(self, id_or_label: Union[int, str], enforce_type: Optional[type] = None, sub_conf: Any|None = None) -> 'HSMObjBase': + return _find_def_by_id_or_label(sub_conf or self, id_or_label, enforce_type) @staticmethod def domain_bitfield_to_nums(bitfield: int) -> set['HSMDomainNum']: @@ -330,7 +331,7 @@ def load_hsm_config(file_name: str) -> 'HSMConfig': return res -def find_config_items_of_class(conf: HSMConfig, cls: type) -> list: +def find_config_items_of_class(conf: Any, cls: type) -> list: """ Find all instances of a given class in the configuration object, recursively. """ @@ -409,3 +410,35 @@ def find_all_config_items_per_type(conf: HSMConfig) -> tuple[dict, dict]: } config_items_per_type: dict = {t: find_config_items_of_class(conf, t) for t in config_to_hsm_type.keys()} # type: ignore return config_items_per_type, config_to_hsm_type + + +def click_hsm_obj_auto_complete(cls: type|None, subsection: str|None = None, ids: bool = True, labels: bool = True) -> Callable: + """ + Make a shell auto completion function for HSM objects (keys, etc.) in the configuration file. + :param cls: The class of the HSM object to complete. If None, all objects are included. + :subsection: The name of the configuration subsection to search in, e.g. 'ssh.root_ca_keys' + :param ids: Include the key IDs in the completion list. + :param labels: Include the key labels in the completion list. + """ + def autocomplete(ctx, args, incomplete) -> list[click.shell_completion.CompletionItem]: + conf: HSMConfig = ctx.obj['config'] + if subsection: + for attr in subsection.split('.'): + conf = getattr(conf, attr) + + if cls: + items: list[HSMObjBase] = find_config_items_of_class(conf, cls) + else: + all_items: dict[type, list[HSMObjBase]] = find_all_config_items_per_type(conf)[0] + items = [] + for key_list in all_items.values(): + items.extend(key_list) + + res = [] + if ids: + res += [click.shell_completion.CompletionItem(f'0x{x.id:04x}', help=f"'{x.label}' ({type(x).__name__})") for x in items if incomplete in f'0x{x.id:04x}'] + if labels: + res += [click.shell_completion.CompletionItem(x.label, help=f"0x{x.id:04x} ({type(x).__name__})") for x in items if incomplete in x.label] + return res + + return autocomplete diff --git a/hsm_secrets/hsm/__init__.py b/hsm_secrets/hsm/__init__.py index 859635b..a7e9ed2 100644 --- a/hsm_secrets/hsm/__init__.py +++ b/hsm_secrets/hsm/__init__.py @@ -3,14 +3,15 @@ from pathlib import Path import sys import tarfile +from typing import Sequence import click +from click.shell_completion import CompletionItem -from hsm_secrets.config import HSMAsymmetricKey, HSMConfig, find_all_config_items_per_type, parse_keyid +from hsm_secrets.config import HSMAsymmetricKey, HSMConfig, click_hsm_obj_auto_complete, find_all_config_items_per_type, find_config_items_of_class, parse_keyid from hsm_secrets.hsm.secret_sharing_ceremony import cli_reconstruction_ceremony, cli_splitting_ceremony from hsm_secrets.utils import HSMAuthMethod, HsmSecretsCtx, cli_error, cli_info, cli_result, cli_ui_msg, cli_warn, confirm_and_delete_old_yubihsm_object_if_exists, open_hsm_session, open_hsm_session_with_password, pass_common_args, pretty_fmt_yubihsm_object, prompt_for_secret, pw_check_fromhex import yubihsm.defs, yubihsm.exceptions, yubihsm.objects # type: ignore [import] -from yubihsm.core import AuthSession # type: ignore [import] from yubihsm.defs import OBJECT # type: ignore [import] from click import style @@ -188,11 +189,12 @@ def default_admin_disable(ctx: HsmSecretsCtx, alldevs: bool, force: bool): # --------------- @cmd_hsm.command('admin-sharing-ceremony') -@click.option('--num-shares', type=int, required=True, help="Number of shares to generate") -@click.option('--threshold', type=int, required=True, help="Number of shares required to reconstruct the key") -@click.option('--skip-ceremony', is_flag=True, default=False, help="Skip the secret sharing ceremony, ask for password directly") +@click.option('--num-shares', '-n', type=int, required=True, help="Number of shares to generate") +@click.option('--threshold', '-t', type=int, required=True, help="Number of shares required to reconstruct the key") +@click.option('--with-backup', '-b', is_flag=True, default=False, help="Generate a backup key in addition to the shared key") +@click.option('--skip-ceremony', is_flag=True, default=False, help="Skip ceremony, store secret directly") @pass_common_args -def make_shared_admin_key(ctx: HsmSecretsCtx, num_shares: int, threshold: int, skip_ceremony: bool): +def make_shared_admin_key(ctx: HsmSecretsCtx, num_shares: int, threshold: int, with_backup: bool, skip_ceremony: bool): """Host an admin key Secret Sharing Ceremony The ceremony is a formal multi-step process where the system generates a new shared admin key @@ -202,6 +204,13 @@ def make_shared_admin_key(ctx: HsmSecretsCtx, num_shares: int, threshold: int, s This is a very heavy process, and should be only done once, on the master YubiHSM. The resulting key can then be cloned to other devices via key wrapping operations. + + A backup key can be generated in addition to the shared key. It's a non-shared + key that will be written down in parts by all custodians. You will be asked to + seal it in an envelope and hand over for secure storage by some "uber custodian". + + If `--skip-ceremony` is given, the secret generation and sharing ceremony are skipped and + you are asked to enter the password to store on HSM directly. """ swear_you_are_on_airgapped_computer(ctx.quiet) with open_hsm_session(ctx, HSMAuthMethod.DEFAULT_ADMIN) as ses: @@ -210,12 +219,11 @@ def apply_password_fn(new_password: str): info = ses.auth_key_put_derived(ctx.conf.admin.shared_admin_key, new_password) cli_info(f"Auth key ID '{hex(info.id)}' ({info.label}) stored in YubiHSM device {ses.get_serial()}") - if skip_ceremony: apply_password_fn(prompt_for_secret("Enter the (new) shared admin password to store", confirm=True)) else: secret = ses.get_pseudo_random(256//8) - cli_splitting_ceremony(threshold, num_shares, apply_password_fn, pre_secret=secret) + cli_splitting_ceremony(threshold, num_shares, apply_password_fn, with_backup_key=with_backup, pre_secret=secret) cli_info("OK. Shared admin key added successfully.") @@ -258,7 +266,7 @@ def make_wrap_key(ctx: HsmSecretsCtx): # --------------- @cmd_hsm.command('delete') -@click.argument('obj_ids', nargs=-1, type=str, metavar=' ...') +@click.argument('obj_ids', nargs=-1, type=str, metavar=' ...', shell_complete=click_hsm_obj_auto_complete(None)) @click.option('--alldevs', is_flag=True, help="Delete on all devices") @click.option('--force', is_flag=True, help="Force deletion without confirmation (use with caution)") @pass_common_args @@ -407,7 +415,7 @@ def compare_config(ctx: HsmSecretsCtx, alldevs: bool, create: bool): @cmd_hsm.command('attest') @pass_common_args -@click.argument('key_id', required=True, type=str, metavar='') +@click.argument('key_id', required=True, type=str, metavar='', shell_complete=click_hsm_obj_auto_complete(HSMAsymmetricKey)) @click.option('--out', '-o', type=click.File('w', encoding='utf8'), help='Output file (default: stdout)', default=click.get_text_stream('stdout')) def attest_key(ctx: HsmSecretsCtx, key_id: str, out: click.File): """Attest an asymmetric key in the YubiHSM @@ -484,7 +492,7 @@ def backup_hsm(ctx: HsmSecretsCtx, out: click.File|None): @cmd_hsm.command('restore') @pass_common_args -@click.argument('backup_file', type=click.Path(exists=True, allow_dash=False), required=True, metavar='') +@click.argument('backup_file', type=click.Path(exists=True, allow_dash=False, dir_okay=False), required=True, metavar='') @click.option('--force', is_flag=True, help="Don't ask for confirmation before restoring") def restore_hsm(ctx: HsmSecretsCtx, backup_file: str, force: bool): """Restore a .tar.gz backup to HSM diff --git a/hsm_secrets/hsm/secret_sharing_ceremony.py b/hsm_secrets/hsm/secret_sharing_ceremony.py index e223080..8ed5f7f 100644 --- a/hsm_secrets/hsm/secret_sharing_ceremony.py +++ b/hsm_secrets/hsm/secret_sharing_ceremony.py @@ -72,12 +72,15 @@ def cli_splitting_ceremony( cli_ui_msg("") # Get custodian names and passwords - custodian_names = {} + custodian_names: dict[int, str] = {} custodian_passwords = {} click.clear() for i in range(1, num_shares + 1): name = click.prompt(f"Enter the name of custodian #{i}", err=True).strip() or f"#{i}" + if name in custodian_names.values(): + raise click.UsageError(f"Name '{name}' is already in use. Please enter a unique name.") + custodian_names[i] = name if click.confirm(f"Password-protect share?", abort=False, err=True): pw = click.prompt("Custodian " + click.style(f"'{name}'", fg='green') + ", enter the password", hide_input=True, err=True).strip() diff --git a/hsm_secrets/main.py b/hsm_secrets/main.py index 2b85cd8..57ca8c4 100755 --- a/hsm_secrets/main.py +++ b/hsm_secrets/main.py @@ -9,6 +9,7 @@ from hsm_secrets.user import cmd_user from hsm_secrets.utils import HSMAuthMethod, HsmSecretsCtx, cli_warn, list_yubikey_hsm_creds, pass_common_args, cli_info from hsm_secrets.x509 import cmd_x509 +from click_repl import register_repl # type: ignore # --- Main CLI Entrypoint --- @@ -118,6 +119,7 @@ def cmd_nop(ctx: HsmSecretsCtx): cli.add_command(cmd_nop, "nop") cli.add_command(cmd_x509, "x509") cli.add_command(cmd_user, "user") +register_repl(cli) if __name__ == '__main__': cli() diff --git a/hsm_secrets/ssh/__init__.py b/hsm_secrets/ssh/__init__.py index 2b52582..4eee3e1 100644 --- a/hsm_secrets/ssh/__init__.py +++ b/hsm_secrets/ssh/__init__.py @@ -1,18 +1,14 @@ from pathlib import Path from textwrap import dedent import time -from typing import Sequence +from typing import Sequence, cast import click -from hsm_secrets.config import HSMAsymmetricKey, HSMConfig +from hsm_secrets.config import HSMAsymmetricKey, HSMKeyID, click_hsm_obj_auto_complete from hsm_secrets.utils import HsmSecretsCtx, cli_code_info, cli_result, cli_warn, open_hsm_session, pass_common_args from cryptography.hazmat.primitives import _serialization from cryptography.hazmat.primitives.serialization import ssh -import yubihsm.defs # type: ignore [import] -from yubihsm.objects import AsymmetricKey # type: ignore [import] - - @click.group() @click.pass_context def cmd_ssh(ctx: click.Context): @@ -23,24 +19,19 @@ def cmd_ssh(ctx: click.Context): @cmd_ssh.command('get-ca') @pass_common_args @click.option('--all', '-a', 'get_all', is_flag=True, help="Get all certificates") -@click.argument('cert_ids', nargs=-1, type=str, metavar='...') -def get_ca(ctx: HsmSecretsCtx, get_all: bool, cert_ids: Sequence[str]): +@click.argument('certs', nargs=-1, type=str, metavar='...', shell_complete=click_hsm_obj_auto_complete(HSMAsymmetricKey, 'ssh.root_ca_keys')) +def get_ca(ctx: HsmSecretsCtx, get_all: bool, certs: Sequence[str]): """Get the public keys of the SSH CA keys""" - all_ids = set([str(ca.id) for ca in ctx.conf.ssh.root_ca_keys]) - selected_ids = all_ids if get_all else set(cert_ids) - - if not selected_ids: - raise click.BadArgumentUsage("ERROR: specify at least one CA key ID, or use --all") - - if len(selected_ids - all_ids) > 0: - raise click.ClickException(f"Unknown CA key IDs: {selected_ids - all_ids}") - selected_keys = [ca for ca in ctx.conf.ssh.root_ca_keys if str(ca.id) in selected_ids] - + if get_all: + selected_keys = ctx.conf.ssh.root_ca_keys + else: + selected_keys = [cast(HSMAsymmetricKey, ctx.conf.find_def(s, HSMAsymmetricKey, ctx.conf.ssh.root_ca_keys)) for s in certs] if not selected_keys: - raise click.ClickException("No CA keys selected") + raise click.BadArgumentUsage("ERROR: No keys to get") with open_hsm_session(ctx) as ses: for key in selected_keys: + assert isinstance(key, HSMAsymmetricKey) pubkey = ses.get_public_key(key).public_bytes(encoding=_serialization.Encoding.OpenSSH, format=_serialization.PublicFormat.OpenSSH).decode('ascii') cli_result(f"{pubkey} {key.label}") diff --git a/hsm_secrets/user/__init__.py b/hsm_secrets/user/__init__.py index 3870ef1..5489804 100644 --- a/hsm_secrets/user/__init__.py +++ b/hsm_secrets/user/__init__.py @@ -1,7 +1,7 @@ import re import secrets import click -from hsm_secrets.config import HSMAuthKey +from hsm_secrets.config import HSMAuthKey, HSMConfig, click_hsm_obj_auto_complete from hsm_secrets.utils import HSMAuthMethod, HsmSecretsCtx, cli_info, cli_ui_msg, cli_warn, confirm_and_delete_old_yubihsm_object_if_exists, group_by_4, open_hsm_session, pass_common_args, prompt_for_secret, pw_check_fromhex, secure_display_secret import yubikit.hsmauth @@ -37,7 +37,7 @@ def change_yubikey_mgt(ctx: HsmSecretsCtx): @cmd_user.command('add-yubikey') @pass_common_args -@click.argument('label', type=str, metavar='