Skip to content

Commit

Permalink
Add autocompletion and repl mode
Browse files Browse the repository at this point in the history
  • Loading branch information
elonen committed Jul 29, 2024
1 parent d46d8c2 commit 453bd96
Show file tree
Hide file tree
Showing 9 changed files with 91 additions and 54 deletions.
41 changes: 37 additions & 4 deletions hsm_secrets/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@
from datetime import datetime
import os
import re
import click.shell_completion
from pydantic import BaseModel, ConfigDict, HttpUrl, Field, StringConstraints
from typing_extensions import Annotated
from typing import List, Literal, NewType, Optional, Sequence, Union
from typing import Any, Callable, List, Literal, NewType, Optional, Sequence, Union
from yubihsm.defs import CAPABILITY, ALGORITHM # type: ignore [import]
import click
import yaml
Expand Down Expand Up @@ -50,8 +51,8 @@ def get_domain_bitfield(self, names: set['HSMDomainName']) -> int:
assert 0 <= res <= 0xFFFF, f"Domain bitfield out of range: {res}"
return res

def find_def(self, id_or_label: Union[int, str], enforce_type: Optional[type] = None) -> 'HSMObjBase':
return _find_def_by_id_or_label(self, id_or_label, enforce_type)
def find_def(self, id_or_label: Union[int, str], enforce_type: Optional[type] = None, sub_conf: Any|None = None) -> 'HSMObjBase':
return _find_def_by_id_or_label(sub_conf or self, id_or_label, enforce_type)

@staticmethod
def domain_bitfield_to_nums(bitfield: int) -> set['HSMDomainNum']:
Expand Down Expand Up @@ -330,7 +331,7 @@ def load_hsm_config(file_name: str) -> 'HSMConfig':
return res


def find_config_items_of_class(conf: HSMConfig, cls: type) -> list:
def find_config_items_of_class(conf: Any, cls: type) -> list:
"""
Find all instances of a given class in the configuration object, recursively.
"""
Expand Down Expand Up @@ -409,3 +410,35 @@ def find_all_config_items_per_type(conf: HSMConfig) -> tuple[dict, dict]:
}
config_items_per_type: dict = {t: find_config_items_of_class(conf, t) for t in config_to_hsm_type.keys()} # type: ignore
return config_items_per_type, config_to_hsm_type


def click_hsm_obj_auto_complete(cls: type|None, subsection: str|None = None, ids: bool = True, labels: bool = True) -> Callable:
"""
Make a shell auto completion function for HSM objects (keys, etc.) in the configuration file.
:param cls: The class of the HSM object to complete. If None, all objects are included.
:subsection: The name of the configuration subsection to search in, e.g. 'ssh.root_ca_keys'
:param ids: Include the key IDs in the completion list.
:param labels: Include the key labels in the completion list.
"""
def autocomplete(ctx, args, incomplete) -> list[click.shell_completion.CompletionItem]:
conf: HSMConfig = ctx.obj['config']
if subsection:
for attr in subsection.split('.'):
conf = getattr(conf, attr)

if cls:
items: list[HSMObjBase] = find_config_items_of_class(conf, cls)
else:
all_items: dict[type, list[HSMObjBase]] = find_all_config_items_per_type(conf)[0]
items = []
for key_list in all_items.values():
items.extend(key_list)

res = []
if ids:
res += [click.shell_completion.CompletionItem(f'0x{x.id:04x}', help=f"'{x.label}' ({type(x).__name__})") for x in items if incomplete in f'0x{x.id:04x}']
if labels:
res += [click.shell_completion.CompletionItem(x.label, help=f"0x{x.id:04x} ({type(x).__name__})") for x in items if incomplete in x.label]
return res

return autocomplete
30 changes: 19 additions & 11 deletions hsm_secrets/hsm/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@
from pathlib import Path
import sys
import tarfile
from typing import Sequence
import click
from click.shell_completion import CompletionItem

from hsm_secrets.config import HSMAsymmetricKey, HSMConfig, find_all_config_items_per_type, parse_keyid
from hsm_secrets.config import HSMAsymmetricKey, HSMConfig, click_hsm_obj_auto_complete, find_all_config_items_per_type, find_config_items_of_class, parse_keyid
from hsm_secrets.hsm.secret_sharing_ceremony import cli_reconstruction_ceremony, cli_splitting_ceremony
from hsm_secrets.utils import HSMAuthMethod, HsmSecretsCtx, cli_error, cli_info, cli_result, cli_ui_msg, cli_warn, confirm_and_delete_old_yubihsm_object_if_exists, open_hsm_session, open_hsm_session_with_password, pass_common_args, pretty_fmt_yubihsm_object, prompt_for_secret, pw_check_fromhex

import yubihsm.defs, yubihsm.exceptions, yubihsm.objects # type: ignore [import]
from yubihsm.core import AuthSession # type: ignore [import]
from yubihsm.defs import OBJECT # type: ignore [import]

from click import style
Expand Down Expand Up @@ -188,11 +189,12 @@ def default_admin_disable(ctx: HsmSecretsCtx, alldevs: bool, force: bool):
# ---------------

@cmd_hsm.command('admin-sharing-ceremony')
@click.option('--num-shares', type=int, required=True, help="Number of shares to generate")
@click.option('--threshold', type=int, required=True, help="Number of shares required to reconstruct the key")
@click.option('--skip-ceremony', is_flag=True, default=False, help="Skip the secret sharing ceremony, ask for password directly")
@click.option('--num-shares', '-n', type=int, required=True, help="Number of shares to generate")
@click.option('--threshold', '-t', type=int, required=True, help="Number of shares required to reconstruct the key")
@click.option('--with-backup', '-b', is_flag=True, default=False, help="Generate a backup key in addition to the shared key")
@click.option('--skip-ceremony', is_flag=True, default=False, help="Skip ceremony, store secret directly")
@pass_common_args
def make_shared_admin_key(ctx: HsmSecretsCtx, num_shares: int, threshold: int, skip_ceremony: bool):
def make_shared_admin_key(ctx: HsmSecretsCtx, num_shares: int, threshold: int, with_backup: bool, skip_ceremony: bool):
"""Host an admin key Secret Sharing Ceremony
The ceremony is a formal multi-step process where the system generates a new shared admin key
Expand All @@ -202,6 +204,13 @@ def make_shared_admin_key(ctx: HsmSecretsCtx, num_shares: int, threshold: int, s
This is a very heavy process, and should be only done once, on the master YubiHSM.
The resulting key can then be cloned to other devices via key wrapping operations.
A backup key can be generated in addition to the shared key. It's a non-shared
key that will be written down in parts by all custodians. You will be asked to
seal it in an envelope and hand over for secure storage by some "uber custodian".
If `--skip-ceremony` is given, the secret generation and sharing ceremony are skipped and
you are asked to enter the password to store on HSM directly.
"""
swear_you_are_on_airgapped_computer(ctx.quiet)
with open_hsm_session(ctx, HSMAuthMethod.DEFAULT_ADMIN) as ses:
Expand All @@ -210,12 +219,11 @@ def apply_password_fn(new_password: str):
info = ses.auth_key_put_derived(ctx.conf.admin.shared_admin_key, new_password)
cli_info(f"Auth key ID '{hex(info.id)}' ({info.label}) stored in YubiHSM device {ses.get_serial()}")


if skip_ceremony:
apply_password_fn(prompt_for_secret("Enter the (new) shared admin password to store", confirm=True))
else:
secret = ses.get_pseudo_random(256//8)
cli_splitting_ceremony(threshold, num_shares, apply_password_fn, pre_secret=secret)
cli_splitting_ceremony(threshold, num_shares, apply_password_fn, with_backup_key=with_backup, pre_secret=secret)

cli_info("OK. Shared admin key added successfully.")

Expand Down Expand Up @@ -258,7 +266,7 @@ def make_wrap_key(ctx: HsmSecretsCtx):
# ---------------

@cmd_hsm.command('delete')
@click.argument('obj_ids', nargs=-1, type=str, metavar='<id|label> ...')
@click.argument('obj_ids', nargs=-1, type=str, metavar='<id|label> ...', shell_complete=click_hsm_obj_auto_complete(None))
@click.option('--alldevs', is_flag=True, help="Delete on all devices")
@click.option('--force', is_flag=True, help="Force deletion without confirmation (use with caution)")
@pass_common_args
Expand Down Expand Up @@ -407,7 +415,7 @@ def compare_config(ctx: HsmSecretsCtx, alldevs: bool, create: bool):

@cmd_hsm.command('attest')
@pass_common_args
@click.argument('key_id', required=True, type=str, metavar='<id|label>')
@click.argument('key_id', required=True, type=str, metavar='<id|label>', shell_complete=click_hsm_obj_auto_complete(HSMAsymmetricKey))
@click.option('--out', '-o', type=click.File('w', encoding='utf8'), help='Output file (default: stdout)', default=click.get_text_stream('stdout'))
def attest_key(ctx: HsmSecretsCtx, key_id: str, out: click.File):
"""Attest an asymmetric key in the YubiHSM
Expand Down Expand Up @@ -484,7 +492,7 @@ def backup_hsm(ctx: HsmSecretsCtx, out: click.File|None):

@cmd_hsm.command('restore')
@pass_common_args
@click.argument('backup_file', type=click.Path(exists=True, allow_dash=False), required=True, metavar='<backup_file>')
@click.argument('backup_file', type=click.Path(exists=True, allow_dash=False, dir_okay=False), required=True, metavar='<backup_file>')
@click.option('--force', is_flag=True, help="Don't ask for confirmation before restoring")
def restore_hsm(ctx: HsmSecretsCtx, backup_file: str, force: bool):
"""Restore a .tar.gz backup to HSM
Expand Down
5 changes: 4 additions & 1 deletion hsm_secrets/hsm/secret_sharing_ceremony.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,15 @@ def cli_splitting_ceremony(
cli_ui_msg("")

# Get custodian names and passwords
custodian_names = {}
custodian_names: dict[int, str] = {}
custodian_passwords = {}

click.clear()
for i in range(1, num_shares + 1):
name = click.prompt(f"Enter the name of custodian #{i}", err=True).strip() or f"#{i}"
if name in custodian_names.values():
raise click.UsageError(f"Name '{name}' is already in use. Please enter a unique name.")

custodian_names[i] = name
if click.confirm(f"Password-protect share?", abort=False, err=True):
pw = click.prompt("Custodian " + click.style(f"'{name}'", fg='green') + ", enter the password", hide_input=True, err=True).strip()
Expand Down
2 changes: 2 additions & 0 deletions hsm_secrets/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from hsm_secrets.user import cmd_user
from hsm_secrets.utils import HSMAuthMethod, HsmSecretsCtx, cli_warn, list_yubikey_hsm_creds, pass_common_args, cli_info
from hsm_secrets.x509 import cmd_x509
from click_repl import register_repl # type: ignore


# --- Main CLI Entrypoint ---
Expand Down Expand Up @@ -118,6 +119,7 @@ def cmd_nop(ctx: HsmSecretsCtx):
cli.add_command(cmd_nop, "nop")
cli.add_command(cmd_x509, "x509")
cli.add_command(cmd_user, "user")
register_repl(cli)

if __name__ == '__main__':
cli()
29 changes: 10 additions & 19 deletions hsm_secrets/ssh/__init__.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,14 @@
from pathlib import Path
from textwrap import dedent
import time
from typing import Sequence
from typing import Sequence, cast
import click

from hsm_secrets.config import HSMAsymmetricKey, HSMConfig
from hsm_secrets.config import HSMAsymmetricKey, HSMKeyID, click_hsm_obj_auto_complete
from hsm_secrets.utils import HsmSecretsCtx, cli_code_info, cli_result, cli_warn, open_hsm_session, pass_common_args
from cryptography.hazmat.primitives import _serialization
from cryptography.hazmat.primitives.serialization import ssh

import yubihsm.defs # type: ignore [import]
from yubihsm.objects import AsymmetricKey # type: ignore [import]


@click.group()
@click.pass_context
def cmd_ssh(ctx: click.Context):
Expand All @@ -23,24 +19,19 @@ def cmd_ssh(ctx: click.Context):
@cmd_ssh.command('get-ca')
@pass_common_args
@click.option('--all', '-a', 'get_all', is_flag=True, help="Get all certificates")
@click.argument('cert_ids', nargs=-1, type=str, metavar='<id>...')
def get_ca(ctx: HsmSecretsCtx, get_all: bool, cert_ids: Sequence[str]):
@click.argument('certs', nargs=-1, type=str, metavar='<id|label>...', shell_complete=click_hsm_obj_auto_complete(HSMAsymmetricKey, 'ssh.root_ca_keys'))
def get_ca(ctx: HsmSecretsCtx, get_all: bool, certs: Sequence[str]):
"""Get the public keys of the SSH CA keys"""
all_ids = set([str(ca.id) for ca in ctx.conf.ssh.root_ca_keys])
selected_ids = all_ids if get_all else set(cert_ids)

if not selected_ids:
raise click.BadArgumentUsage("ERROR: specify at least one CA key ID, or use --all")

if len(selected_ids - all_ids) > 0:
raise click.ClickException(f"Unknown CA key IDs: {selected_ids - all_ids}")
selected_keys = [ca for ca in ctx.conf.ssh.root_ca_keys if str(ca.id) in selected_ids]

if get_all:
selected_keys = ctx.conf.ssh.root_ca_keys
else:
selected_keys = [cast(HSMAsymmetricKey, ctx.conf.find_def(s, HSMAsymmetricKey, ctx.conf.ssh.root_ca_keys)) for s in certs]
if not selected_keys:
raise click.ClickException("No CA keys selected")
raise click.BadArgumentUsage("ERROR: No keys to get")

with open_hsm_session(ctx) as ses:
for key in selected_keys:
assert isinstance(key, HSMAsymmetricKey)
pubkey = ses.get_public_key(key).public_bytes(encoding=_serialization.Encoding.OpenSSH, format=_serialization.PublicFormat.OpenSSH).decode('ascii')
cli_result(f"{pubkey} {key.label}")

Expand Down
6 changes: 3 additions & 3 deletions hsm_secrets/user/__init__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import re
import secrets
import click
from hsm_secrets.config import HSMAuthKey
from hsm_secrets.config import HSMAuthKey, HSMConfig, click_hsm_obj_auto_complete
from hsm_secrets.utils import HSMAuthMethod, HsmSecretsCtx, cli_info, cli_ui_msg, cli_warn, confirm_and_delete_old_yubihsm_object_if_exists, group_by_4, open_hsm_session, pass_common_args, prompt_for_secret, pw_check_fromhex, secure_display_secret

import yubikit.hsmauth
Expand Down Expand Up @@ -37,7 +37,7 @@ def change_yubikey_mgt(ctx: HsmSecretsCtx):

@cmd_user.command('add-yubikey')
@pass_common_args
@click.argument('label', type=str, metavar='<label>')
@click.argument('label', type=str, metavar='<label>', shell_complete=click_hsm_obj_auto_complete(HSMAuthKey, 'user_keys', ids=False))
@click.option('--alldevs', is_flag=True, help="Add to all devices")
def add_user_yubikey(ctx: HsmSecretsCtx, label: str, alldevs: bool):
"""Register Yubikey auth for a user
Expand Down Expand Up @@ -129,7 +129,7 @@ def add_user_yubikey(ctx: HsmSecretsCtx, label: str, alldevs: bool):

@cmd_user.command('add-service')
@pass_common_args
@click.argument('obj_ids', nargs=-1, type=str, metavar='<id|label>...')
@click.argument('obj_ids', nargs=-1, type=str, metavar='<id|label>...', shell_complete=click_hsm_obj_auto_complete(HSMAuthKey, 'service_keys'))
@click.option('--all', '-a', 'all_accts', is_flag=True, help="Add all configured service users")
@click.option('--askpw', is_flag=True, help="Ask for password(s) instead of generating")
def add_service(ctx: HsmSecretsCtx, obj_ids: tuple[str], all_accts: bool, askpw: bool):
Expand Down
10 changes: 7 additions & 3 deletions hsm_secrets/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import yubikit.hsmauth as hsmauth

import hsm_secrets.config as hscfg
import unicurses as curses
import unicurses as curses # type: ignore [import]
import click

from functools import wraps
Expand Down Expand Up @@ -425,7 +425,7 @@ def secure_display_secret(secret_to_show: str, wipe_char='x'):
"""
secret = secret_to_show + " "
def do_it(stdscr):
from unicurses import getmaxyx, clear, box, mvwaddstr, wrefresh
from unicurses import getmaxyx, clear, wclear, box, mvwaddstr, wrefresh, delwin, refresh
clear()

# Create a new window
Expand All @@ -442,9 +442,13 @@ def do_it(stdscr):
click.pause("") # Wait for ENTER key

# Overwrite the secret with wipe_char
clear()
wclear(win)
box(win)
mvwaddstr(win, 1, 2, wipe_char * len(secret))
wrefresh(win)

clear()
refresh()
delwin(win)

curses.wrapper(do_it)
20 changes: 8 additions & 12 deletions hsm_secrets/x509/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import yubihsm.defs # type: ignore [import]

from cryptography.hazmat.primitives import serialization
from hsm_secrets.config import HSMConfig, HSMKeyID, HSMOpaqueObject, X509Cert, find_config_items_of_class
from hsm_secrets.config import HSMConfig, HSMKeyID, HSMOpaqueObject, X509Cert, click_hsm_obj_auto_complete, find_config_items_of_class

from hsm_secrets.utils import HSMAuthMethod, HsmSecretsCtx, cli_result, cli_warn, confirm_and_delete_old_yubihsm_object_if_exists, open_hsm_session, cli_code_info, pass_common_args, cli_info

Expand All @@ -33,29 +33,25 @@ def cmd_x509(ctx: click.Context):
@pass_common_args
@click.option('--all', '-a', 'all_certs', is_flag=True, help="Create all certificates")
@click.option("--dry-run", "-n", is_flag=True, help="Dry run (do not create certificates)")
@click.argument('cert_ids', nargs=-1, type=str, metavar='<id>...')
def create_cert_cmd(ctx: HsmSecretsCtx, all_certs: bool, dry_run: bool, cert_ids: tuple):
@click.argument('certs', nargs=-1, type=str, metavar='<id|label>...', shell_complete=click_hsm_obj_auto_complete(HSMOpaqueObject))
def create_cert_cmd(ctx: HsmSecretsCtx, all_certs: bool, dry_run: bool, certs: tuple):
"""Create certificate(s) on the HSM
ID is a 16-bit hex value (e.g. '0x12af' or '12af').
You can specify multiple IDs to create multiple certificates,
or use the --all flag to create all certificates defined in the config.
Specified certificates will be created in topological order, so that
any dependencies are created first.
"""
if not all_certs and not cert_ids:
if not all_certs and not certs:
raise click.ClickException("Error: No certificates specified for creation.")
create_certs_impl(ctx, all_certs, dry_run, cert_ids)
create_certs_impl(ctx, all_certs, dry_run, certs)


@cmd_x509.command('get')
@pass_common_args
@click.option('--all', '-a', 'all_certs', is_flag=True, help="Get all certificates")
@click.option('--outdir', '-o', type=click.Path(), required=False, help="Write PEMs into files here")
@click.option('--bundle', '-b', type=click.Path(), required=False, help="Write a single PEM bundle file")
@click.argument('cert_ids', nargs=-1, type=str, metavar='<id|label>...')
def get_cert_cmd(ctx: HsmSecretsCtx, all_certs: bool, outdir: str|None, bundle: str|None, cert_ids: tuple):
@click.argument('certs', nargs=-1, type=str, metavar='<id|label>...', shell_complete=click_hsm_obj_auto_complete(HSMOpaqueObject))
def get_cert_cmd(ctx: HsmSecretsCtx, all_certs: bool, outdir: str|None, bundle: str|None, certs: tuple):
"""Get certificate(s) from the HSM
You can specify multiple IDs/labels to get multiple certificates,
Expand All @@ -66,7 +62,7 @@ def get_cert_cmd(ctx: HsmSecretsCtx, all_certs: bool, outdir: str|None, bundle:
raise click.ClickException("Error: --outdir and --bundle options are mutually exclusive.")

all_cert_defs: list[HSMOpaqueObject] = find_config_items_of_class(ctx.conf, HSMOpaqueObject)
selected_certs = all_cert_defs if all_certs else [cast(HSMOpaqueObject, ctx.conf.find_def(id, HSMOpaqueObject)) for id in cert_ids]
selected_certs = all_cert_defs if all_certs else [cast(HSMOpaqueObject, ctx.conf.find_def(id, HSMOpaqueObject)) for id in certs]
if not selected_certs:
raise click.ClickException("Error: No certificates selected.")

Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ yubihsm[http,usb]
yubikey-manager

click
types-click
click-repl

pydantic

Expand Down

0 comments on commit 453bd96

Please sign in to comment.