Skip to content

Commit

Permalink
Refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
elonen committed Jul 23, 2024
1 parent b0d6516 commit 4f9cd01
Show file tree
Hide file tree
Showing 8 changed files with 377 additions and 333 deletions.
239 changes: 108 additions & 131 deletions hsm_secrets/hsm/__init__.py

Large diffs are not rendered by default.

97 changes: 73 additions & 24 deletions hsm_secrets/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,57 +9,106 @@
from hsm_secrets.passwd import cmd_pass
from hsm_secrets.config import HSMConfig, load_hsm_config
from hsm_secrets.user import cmd_user
from hsm_secrets.utils import list_yubikey_hsm_creds
from hsm_secrets.utils import HSMAuthMethod, HsmSecretsCtx, list_yubikey_hsm_creds, pass_common_args
from hsm_secrets.x509 import cmd_x509


# --- Main CLI Entrypoint ---

@click.group(context_settings={'show_default': True, 'help_option_names': ['-h', '--help']})
#@click.option('-d', '--debug', is_flag=True, help="Enable debug mode")
@click.option('-c', '--config', required=True, type=click.Path(), default='hsm-conf.yml', help="Path to configuration file")
@click.option("-y", "--yklabel", required=False, help="Yubikey HSM auth key label")
@click.option("-s", "--devserial", required=False, help="YubiHSM serial number to connect to (default: from config)")
@click.option('-c', '--config', required=False, type=click.Path(), default=None, help="Path to configuration file")
@click.option("-y", "--yklabel", required=False, help="Yubikey HSM auth key label (default: first slot)")
@click.option("-s", "--hsmserial", required=False, help="YubiHSM serial number to connect to (default: get master from config)")
@click.option("--auth-yubikey", required=False, is_flag=True, help="Use Yubikey HSM auth key for HSM login")
@click.option("--auth-default-admin", required=False, is_flag=True, help="Use default auth key for HSM login")
@click.option("--auth-password-id", required=False, type=str, help="Auth key ID (hex) to login with password from env HSM_PASSWORD")
@click.version_option()
@click.pass_context
def cli(ctx: click.Context, config: str, yklabel: str|None, devserial: str|None):
"""Centralized secret management tool for YubiHSM2."""
def cli(ctx: click.Context, config: str|None, yklabel: str|None, hsmserial: str|None,
auth_default_admin: str|None, auth_yubikey: str|None, auth_password_id: str|None):
"""Config file driven secret management tool for YubiHSM2 devices.
yk_label = yklabel
if not yk_label:
Unless --config is specified, configuration file will be searched first
from the environment variable HSM_SECRETS_CONFIG, then from the current
directory, and finally from the user's home directory.
Default HSM login method depends on the command (most use yubikey),
but can be overridden with the --auth-* options:
--auth-yubikey: Use Yubikey HSM auth key for HSM login. If --yklabel
is not not specified, the first hsmauth label on the Yubikey will be used.
--auth-default-admin: Use insecure default auth key (see config).
--auth-password-id <ID>: Use password from environment variable
HSM_PASSWORD with the specified auth key ID (hex).
"""

# Use config file from env var or default locations if not specified
if not config:
env_var = "HSM_SECRETS_CONFIG"
default_paths = ["./hsm-conf.yml", "~/.hsm-conf.yml"]
config = os.getenv(env_var)
if not config:
for alt in default_paths:
if not config and os.path.exists(os.path.expanduser(alt)):
config = alt
if not config:
raise click.UsageError(f"No configuration file found in env or {str(default_paths)}. Please specify a config file with -c/--config or set the {env_var} environment variable.")

conf = load_hsm_config(config)
assert conf.general.master_device, "No master YubiHSM serial specified in config file."

# Get first Yubikey HSM auth key label from device if not specified
yubikey_label = yklabel
if not yubikey_label:
creds = list_yubikey_hsm_creds()
if not creds:
click.echo("Note: No Yubikey HSM credentials found, Yubikey auth will be disabled.", err=True)
yk_label = ""
yubikey_label = ""
else:
yk_label = creds[0].label

conf = load_hsm_config(config)
assert conf.general.master_device, "No master YubiHSM serial specified in config file."
yubikey_label = creds[0].label
echo("Yubikey hsmauth label (using first slot): " + click.style(yubikey_label, fg='cyan'))

ctx.obj = {
# 'debug': debug,
'yk_label': yk_label,
'yubikey_label': yubikey_label,
'config': conf,
'devserial': devserial or conf.general.master_device
'hsmserial': hsmserial or conf.general.master_device,
'forced_auth_method': None,
'auth_password_id': int(auth_password_id.replace('0x', ''), 16) if auth_password_id else None,
'auth_password': os.getenv("HSM_PASSWORD", None),
}

if yk_label:
echo("Yubikey hsmauth label: " + click.style(yk_label, fg='cyan'))
echo("")
# Check for forced auth method
if sum([(1 if x else 0) for x in [auth_default_admin, auth_yubikey, auth_password_id]]) > 1:
raise click.UsageError("Only one forced auth method can be specified.")
if auth_default_admin:
ctx.obj['forced_auth_method'] = HSMAuthMethod.DEFAULT_ADMIN
elif auth_yubikey:
ctx.obj['forced_auth_method'] = HSMAuthMethod.YUBIKEY
elif auth_password_id:
ctx.obj['forced_auth_method'] = HSMAuthMethod.PASSWORD
if not ctx.obj['auth_password']:
raise click.UsageError("HSM_PASSWORD environment variable not set for password auth method.")
if not ctx.obj['auth_password_id']:
raise click.UsageError("Auth key ID not specified for password auth method.")

echo("")


@click.command('nop', short_help='Validate config and exit.')
@click.pass_context
def cmd_nop(ctx):
@pass_common_args
def cmd_nop(ctx: HsmSecretsCtx):
echo("No errors. Exiting.")


cli.add_command(cmd_ssh, "ssh")
cli.add_command(cmd_tls, "tls")
cli.add_command(cmd_ssh, "ssh")
cli.add_command(cmd_tls, "tls")
cli.add_command(cmd_pass, "pass")
cli.add_command(cmd_hsm, "hsm")
cli.add_command(cmd_nop, "nop")
cli.add_command(cmd_hsm, "hsm")
cli.add_command(cmd_nop, "nop")
cli.add_command(cmd_x509, "x509")
cli.add_command(cmd_user, "user")

Expand Down
26 changes: 12 additions & 14 deletions hsm_secrets/passwd/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,22 @@
from mnemonic import Mnemonic

from hsm_secrets.config import HSMConfig, PasswordDerivationRule, PwRotationToken, find_config_items_of_class
from hsm_secrets.utils import click_echo_colored_commands, group_by_4, hsm_obj_exists, open_hsm_session_with_yubikey, secure_display_secret
from hsm_secrets.utils import HsmSecretsCtx, click_echo_colored_commands, group_by_4, hsm_obj_exists, open_hsm_session, open_hsm_session_with_yubikey, pass_common_args, secure_display_secret


@click.group()
@click.pass_context
def cmd_pass(ctx):
def cmd_pass(ctx: click.Context):
"""Password derivation"""
ctx.ensure_object(dict)


@cmd_pass.command('get')
@click.pass_context
@pass_common_args
@click.argument('name', required=True, type=str, metavar='<name>')
@click.option('--prev', '-p', required=False, type=int, help="Previous password index (default: 0)", default=0)
@click.option('--rule', '-r', required=False, type=str, help="Derivation rule to use (default: read from config)", default=None)
def get_password(ctx: click.Context, name: str, prev: int, rule: str|None):
def get_password(ctx: HsmSecretsCtx, name: str, prev: int, rule: str|None):
"""Get password for given name
Shows current password by default, or previous password if --prev is specified. For example,
Expand All @@ -36,15 +36,14 @@ def get_password(ctx: click.Context, name: str, prev: int, rule: str|None):
Before the first rotation, nonce is an empty string.
"""
conf: HSMConfig = ctx.obj['config']
rule_id = rule or str(conf.password_derivation.default_rule)
rule_id = rule or str(ctx.conf.password_derivation.default_rule)

if prev < 0:
raise click.ClickException(f"Invalid previous password index: {prev}")

rule_def, key_id = _find_rule_and_key(conf, rule_id)
rule_def, key_id = _find_rule_and_key(ctx.conf, rule_id)

with open_hsm_session_with_yubikey(ctx) as (conf, ses):
with open_hsm_session(ctx) as ses:
obj = ses.get_object(key_id, yubihsm.defs.OBJECT.HMAC_KEY)
assert isinstance(obj, HmacKey)
if not hsm_obj_exists(obj):
Expand Down Expand Up @@ -80,26 +79,25 @@ def get_password(ctx: click.Context, name: str, prev: int, rule: str|None):


@cmd_pass.command('rotate')
@click.pass_context
@pass_common_args
@click.argument('name', required=False, nargs=-1, type=str, metavar='[name] ...', default=None)
@click.option('--rule', '-r', required=False, type=str, help="Derivation rule to use (default: read from config)", default=None)
@click.option('--all', '-a', required=False, is_flag=True, help="Rotate all passwords")
def rotate_password(ctx: click.Context, name: list[str]|None, rule: str|None, all: bool):
def rotate_password(ctx: HsmSecretsCtx, name: list[str]|None, rule: str|None, all: bool):
"""Rotate password(s) for given name(s)
Rotates the password for the given name(s) or all names if --all is specified.
"""
conf: HSMConfig = ctx.obj['config']
rule_id = rule or str(conf.password_derivation.default_rule)
rule_id = rule or str(ctx.conf.password_derivation.default_rule)

if all and name:
raise click.ClickException("Cannot specify both --all and specific names.")
if not all and not name:
raise click.ClickException("Must specify either --all or at least one name.")

_, key_id = _find_rule_and_key(conf, rule_id)
_, key_id = _find_rule_and_key(ctx.conf, rule_id)

with open_hsm_session_with_yubikey(ctx) as (conf, ses):
with open_hsm_session(ctx) as ses:
obj = ses.get_object(key_id, yubihsm.defs.OBJECT.HMAC_KEY)
assert isinstance(obj, HmacKey)
if not hsm_obj_exists(obj):
Expand Down
37 changes: 15 additions & 22 deletions hsm_secrets/ssh/__init__.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,15 @@
from base64 import b64encode
from math import floor
from pathlib import Path
import struct
from textwrap import dedent
import time
from typing import Sequence
import click

from hsm_secrets.config import HSMConfig
from hsm_secrets.ssh.openssh.ssh_certificate import ExtensionLabelType
from hsm_secrets.ssh.ssh_utils import create_request, create_template
from hsm_secrets.utils import click_echo_colored_commands, encode_algorithm, encode_capabilities, hsm_generate_asymmetric_key, open_hsm_session_with_yubikey
from yubihsm.objects import YhsmObject, AsymmetricKey, Template
from cryptography.hazmat.primitives import (_serialization, serialization)
import yubihsm.defs
from hsm_secrets.utils import HsmSecretsCtx, click_echo_colored_commands, open_hsm_session, pass_common_args
from cryptography.hazmat.primitives import _serialization

import yubihsm.defs # type: ignore [import]
from yubihsm.objects import AsymmetricKey # type: ignore [import]


@click.group()
Expand All @@ -24,14 +20,12 @@ def cmd_ssh(ctx: click.Context):


@cmd_ssh.command('get-ca')
@click.pass_context
@pass_common_args
@click.option('--all', '-a', 'get_all', is_flag=True, help="Get all certificates")
@click.argument('cert_ids', nargs=-1, type=str, metavar='<id>...')
def get_ca(ctx: click.Context, get_all: bool, cert_ids: Sequence[str]):
def get_ca(ctx: HsmSecretsCtx, get_all: bool, cert_ids: Sequence[str]):
"""Get the public keys of the SSH CA keys"""
conf: HSMConfig = ctx.obj['config']

all_ids = set([str(ca.id) for ca in conf.ssh.root_ca_keys])
all_ids = set([str(ca.id) for ca in ctx.conf.ssh.root_ca_keys])
selected_ids = all_ids if get_all else set(cert_ids)

if not selected_ids:
Expand All @@ -40,13 +34,13 @@ def get_ca(ctx: click.Context, get_all: bool, cert_ids: Sequence[str]):

if len(selected_ids - all_ids) > 0:
raise ValueError(f"Unknown CA key IDs: {selected_ids - all_ids}")
selected_keys = [ca for ca in conf.ssh.root_ca_keys if str(ca.id) in selected_ids]
selected_keys = [ca for ca in ctx.conf.ssh.root_ca_keys if str(ca.id) in selected_ids]

if not selected_keys:
click.echo("No CA keys selected")
return

with open_hsm_session_with_yubikey(ctx) as (conf, ses):
with open_hsm_session(ctx) as ses:
for key in selected_keys:
obj = ses.get_object(key.id, yubihsm.defs.OBJECT.ASYMMETRIC_KEY)
assert isinstance(obj, AsymmetricKey)
Expand All @@ -63,8 +57,8 @@ def get_ca(ctx: click.Context, get_all: bool, cert_ids: Sequence[str]):
@click.option('--principals', '-p', required=False, help="Comma-separated list of principals", default='')
@click.option('--extentions', '-e', help="Comma-separated list of SSH extensions", default='permit-X11-forwarding,permit-agent-forwarding,permit-port-forwarding,permit-pty,permit-user-rc')
@click.argument('keyfile', type=click.Path(exists=False, dir_okay=False, resolve_path=True, allow_dash=True), default='-')
@click.pass_context
def sign_key(ctx: click.Context, out: str, ca: str|None, username: str|None, certid: str|None, validity: int, principals: str, extentions: str, keyfile: str):
@pass_common_args
def sign_key(ctx: HsmSecretsCtx, out: str, ca: str|None, username: str|None, certid: str|None, validity: int, principals: str, extentions: str, keyfile: str):
"""Make and sign an SSH user certificate
[keyfile]: file containing the public key to sign (default: stdin)
Expand All @@ -82,10 +76,9 @@ def sign_key(ctx: click.Context, out: str, ca: str|None, username: str|None, cer
from hsm_secrets.ssh.openssh.ssh_certificate import cert_for_ssh_pub_id, str_to_extension
from hsm_secrets.key_adapters import make_private_key_adapter

conf: HSMConfig = ctx.obj['config']
ca_key_id = int(ca.replace('0x',''), 16) if ca else conf.ssh.default_ca
ca_key_id = int(ca.replace('0x',''), 16) if ca else ctx.conf.ssh.default_ca

ca_def = [c for c in conf.ssh.root_ca_keys if c.id == ca_key_id]
ca_def = [c for c in ctx.conf.ssh.root_ca_keys if c.id == ca_key_id]
if not ca_def:
raise ValueError(f"CA key 0x{ca_key_id:04x} not found in config")

Expand Down Expand Up @@ -137,7 +130,7 @@ def sign_key(ctx: click.Context, out: str, ca: str|None, username: str|None, cer
path = str(p)

# Sign & write out
with open_hsm_session_with_yubikey(ctx) as (conf, ses):
with open_hsm_session(ctx) as ses:
obj = ses.get_object(ca_key_id, yubihsm.defs.OBJECT.ASYMMETRIC_KEY)
assert isinstance(obj, AsymmetricKey)

Expand Down
Loading

0 comments on commit 4f9cd01

Please sign in to comment.