Skip to content

Commit

Permalink
Add support for multiple HSM devices and direct USB connections
Browse files Browse the repository at this point in the history
  • Loading branch information
elonen committed Jul 12, 2024
1 parent 1fb91c2 commit a809752
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 59 deletions.
11 changes: 7 additions & 4 deletions hsm-conf.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,12 @@
# generated and deleted by the tool itself.

general:
connector_url: http://localhost:12345
master_device: "27600137" # Serial number of the YubiHSM 2 that is cloning source for other devices.
all_devices: {
"27600137": "yhusb://serial=27600137", # For `yubihsm-connector`: http://localhost:12345
"27600136": "yhusb://serial=27600136",
"27600135": "yhusb://serial=27600135",
}

domains:
# These domain numbers separate different types of objects in the YubiHSM 2.
Expand Down Expand Up @@ -50,9 +55,7 @@ general:

# Subsystem for YubiHSM 2 device admin auth keys.
admin:
wrap_key_id_min: 0x0010 # The tool will store wrap keys in this range
wrap_key_id_max: 0x001F

wrap_key_id: 0x000F
default_admin_password: 'password'

default_admin_key:
Expand Down
8 changes: 5 additions & 3 deletions hsm_secrets/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,11 @@ class HSMDomains(NoExtraBaseModel):
encryption: HSMDomainNum



class General(NoExtraBaseModel):
connector_url: HttpUrl
master_device: str # serial number of the master device
all_devices: dict[str, str] # serial number -> connection URL

domains: HSMDomains
x509_defaults: 'X509Info'

Expand Down Expand Up @@ -217,8 +220,7 @@ class X509Cert(NoExtraBaseModel):
# ----------------- Subsystem models -----------------

class Admin(NoExtraBaseModel):
wrap_key_id_min: KeyID
wrap_key_id_max: KeyID
wrap_key_id: KeyID
default_admin_password: str
default_admin_key: HSMAuthKey
shared_admin_key: HSMAuthKey
Expand Down
73 changes: 38 additions & 35 deletions hsm_secrets/hsm/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import click
from hsm_secrets.config import HSMConfig
from hsm_secrets.hsm.secret_sharing_ceremony import cli_reconstruction_ceremony, cli_splitting_ceremony
from hsm_secrets.utils import connect_hsm_and_auth_with_yubikey, open_hsm_session_with_default_admin, open_hsm_session_with_shared_admin, open_hsm_session_with_yubikey, print_yubihsm_object
from hsm_secrets.utils import open_hsm_session_with_default_admin, open_hsm_session_with_shared_admin, open_hsm_session_with_yubikey, print_yubihsm_object
import yubihsm.defs, yubihsm.exceptions, yubihsm.objects

from click import style
Expand Down Expand Up @@ -46,7 +46,7 @@ def list_objects(ctx: click.Context, use_default_admin: bool):

def do_it(conf, ses):
objects = ses.list_objects()
click.echo("YubiHSM Objects:")
click.echo(f"YubiHSM Objects:")
for o in objects:
print_yubihsm_object(o)

Expand All @@ -63,7 +63,7 @@ def do_it(conf, ses):
@click.pass_context
@click.option('--use-backup-secret', is_flag=True, help="Use backup secret instead of shared secret")
def add_insecure_admin_key(ctx: click.Context, use_backup_secret: bool):
"""Re-add the insecure default admin key, using shared or backup secret
"""Re-add insecure default admin key for management operations
Using either a shared secret or a backup secret, (re-)create the default admin key on the YubiHSM.
This is a temporary key that should be removed after the management operations are complete.
Expand Down Expand Up @@ -128,6 +128,36 @@ def do_it(conf, ses):
sys.exit(1)


@cmd_hsm.command('remove-insecure-admin-key')
@click.pass_context
def remove_insecure_admin_key(ctx: click.Context):
"""Remove the insecure default admin key"""
with open_hsm_session_with_default_admin(ctx) as (conf, ses):
default_key = ses.get_object(conf.admin.default_admin_key.id, yubihsm.defs.OBJECT.AUTHENTICATION_KEY)
assert isinstance(default_key, yubihsm.objects.AuthenticationKey)
try:
_ = default_key.get_info()
default_key.delete()
click.echo("Ok. Default admin key removed.")
except yubihsm.exceptions.YubiHsmDeviceError as e:
if e.code == yubihsm.defs.ERROR.OBJECT_NOT_FOUND:
click.echo("Default admin key not found. Nothing to remove.")
else:
raise e

# Make sure it's really gone
try:
_ = default_key.get_info()
click.echo(click.style("ERROR!!! Insecure admin key still exists. Don't leave the airgapped session before removing it.", fg='red'))
except yubihsm.exceptions.YubiHsmDeviceError as e:
if e.code == yubihsm.defs.ERROR.OBJECT_NOT_FOUND:
pass # Ok, it's gone
else:
click.echo("ERROR!! Unexpected error while checking that the key is removed. PLEASE VERIFY MANUALLY THAT IT'S GONE!")
click.prompt("Press ENTER to continue...", type=str)
raise e


@cmd_hsm.command('add-shared-admin-key')
@click.option('--num-shares', type=int, required=True, help="Number of shares to generate")
@click.option('--threshold', type=int, required=True, help="Number of shares required to reconstruct the key")
Expand Down Expand Up @@ -179,42 +209,15 @@ def apply_password(new_password: str):
sys.exit(1)


@cmd_hsm.command('remove-insecure-admin-key')
@click.pass_context
def remove_insecure_admin_key(ctx: click.Context):
"""Remove the insecure default admin key"""
with open_hsm_session_with_default_admin(ctx) as (conf, ses):
default_key = ses.get_object(conf.admin.default_admin_key.id, yubihsm.defs.OBJECT.AUTHENTICATION_KEY)
assert isinstance(default_key, yubihsm.objects.AuthenticationKey)
try:
_ = default_key.get_info()
default_key.delete()
click.echo("Ok. Default admin key removed.")
except yubihsm.exceptions.YubiHsmDeviceError as e:
if e.code == yubihsm.defs.ERROR.OBJECT_NOT_FOUND:
click.echo("Default admin key not found. Nothing to remove.")
else:
raise e

# Make sure it's really gone
try:
_ = default_key.get_info()
click.echo(click.style("ERROR!!! Insecure admin key still exists. Don't leave the airgapped session before removing it.", fg='red'))
except yubihsm.exceptions.YubiHsmDeviceError as e:
if e.code == yubihsm.defs.ERROR.OBJECT_NOT_FOUND:
pass # Ok, it's gone
else:
click.echo("ERROR!! Unexpected error while checking that the key is removed. PLEASE VERIFY MANUALLY THAT IT'S GONE!")
click.prompt("Press ENTER to continue...", type=str)
raise e


@cmd_hsm.command('add-wrap-key')
@cmd_hsm.command('set-wrap-keys')
@click.option('--key-id', type=int, required=True, help="ID of the wrap key")
@click.pass_context
def add_wrap_key(ctx: click.Context, key_id: int):
"""Add a new wrap key to the YubiHSM"""
raise NotImplementedError("This command is not yet implemented.")
"""Share a new wrap key among YubiHSMs
Generate a new wrap key and set it to the YubiHSMs.
"""


@cmd_hsm.command('sync-with-config')
Expand Down
13 changes: 9 additions & 4 deletions hsm_secrets/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,20 @@
from hsm_secrets.ssh import cmd_ssh
from hsm_secrets.tls import cmd_tls
from hsm_secrets.passwd import cmd_pass
from hsm_secrets.config import load_hsm_config
from hsm_secrets.config import HSMConfig, load_hsm_config
from hsm_secrets.utils import list_yubikey_hsm_creds


# --- Main CLI Entrypoint ---

@click.group(context_settings={'show_default': True})
@click.group(context_settings={'show_default': True, 'help_option_names': ['-h', '--help']})
@click.option('-d', '--debug', is_flag=True, help="Enable debug mode")
@click.option('-c', '--config', required=True, type=click.Path(), default='hsm-conf.yml', help="Path to configuration file")
@click.option("-y", "--yklabel", required=False, help="Yubikey HSM auth key label")
@click.option("-s", "--devserial", required=False, help="YubiHSM serial number to connect to (default: from config)")
@click.version_option()
@click.pass_context
def cli(ctx: click.Context, debug: bool, config: str, yklabel: str|None):
def cli(ctx: click.Context, debug: bool, config: str, yklabel: str|None, devserial: str|None):
"""HSM secret management tool with HSM integration."""

yk_label = yklabel
Expand All @@ -29,10 +30,14 @@ def cli(ctx: click.Context, debug: bool, config: str, yklabel: str|None):
raise click.ClickException("No Yubikey HSM credentials found.")
yk_label = creds[0].label

conf = load_hsm_config(config)
assert conf.general.master_device, "No master YubiHSM serial specified in config file."

ctx.obj = {
'debug': debug,
'yk_label': yk_label,
'config': load_hsm_config(config)
'config': conf,
'devserial': devserial or conf.general.master_device
}

echo("Yubikey hsmauth label: " + click.style(yk_label, fg='cyan'))
Expand Down
6 changes: 3 additions & 3 deletions hsm_secrets/ssh/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import click

from hsm_secrets.ssh.ssh_utils import create_request, create_template
from hsm_secrets.utils import connect_hsm_and_auth_with_yubikey, create_asymmetric_keys_on_hsm, encode_algorithm, encode_capabilities, open_hsm_session_with_yubikey
from hsm_secrets.utils import create_asymmetric_keys_on_hsm, encode_algorithm, encode_capabilities, open_hsm_session_with_yubikey
from yubihsm.objects import YhsmObject, AsymmetricKey, Template
from cryptography.hazmat.primitives import (_serialization, serialization)
import yubihsm.defs
Expand All @@ -26,7 +26,7 @@ def cmd_ssh(ctx: click.Context):
def new_root_ca(ctx: click.Context, validity: int):
"""Create a new SSH Root CA"""

with open_hsm_session_with_yubikey(ctx, "full-admin", "ssh-mgt") as (conf, ses):
with open_hsm_session_with_yubikey(ctx) as (conf, ses):
root_keys = create_asymmetric_keys_on_hsm(ses, conf, conf.ssh.root_ca_keys)
pubkeys = [
key.get_public_key().public_bytes(encoding=_serialization.Encoding.OpenSSH, format=_serialization.PublicFormat.OpenSSH)
Expand All @@ -43,7 +43,7 @@ def new_root_ca(ctx: click.Context, validity: int):
def get_root_ca_pubkeys(ctx: click.Context, outdir: str):
"""Write SSH Root CA .pub files"""
outdir = outdir.rstrip('/')
with open_hsm_session_with_yubikey(ctx, "full-admin", "ssh-mgt") as (conf, ses):
with open_hsm_session_with_yubikey(ctx) as (conf, ses):
for key in conf.ssh.root_ca_keys:
obj = ses.get_object(key.id, yubihsm.defs.OBJECT.ASYMMETRIC_KEY)
assert isinstance(obj, AsymmetricKey)
Expand Down
34 changes: 25 additions & 9 deletions hsm_secrets/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def list_yubikey_hsm_creds() -> Sequence[hsmauth.Credential]:
return list(hsm.list_credentials())


def connect_hsm_and_auth_with_yubikey(config: hscfg.HSMConfig, yubikey_slot_label: str, yubikey_password: Optional[str] = None) -> AuthSession:
def connect_hsm_and_auth_with_yubikey(config: hscfg.HSMConfig, yubikey_slot_label: str, device_serial: str|None, yubikey_password: Optional[str] = None) -> AuthSession:
"""
Connects to a YubHSM and authenticates a session using the first YubiKey found.
YubiHSM auth key ID is read from the config file by label (arg yubikey_slot_label).
Expand All @@ -42,15 +42,21 @@ def connect_hsm_and_auth_with_yubikey(config: hscfg.HSMConfig, yubikey_slot_labe
username (str): The username for the key labels.
config (Config): The configuration object containing the connector URL and user.
yubikey_slot_label (str): The label of the YubiKey slot to use for authenticating with the HSM.
device_serial (str): Serial number of the YubiHSM device to connect to.
yubikey_password (Optional[str]): The password for the YubiKey HSM slot. If None, the user is asked for the password.
Returns:
HsmAuthSession: The authenticated HSM session.
"""
try:
assert device_serial, "HSM device serial not provided nor inferred."
connector_url = config.general.all_devices.get(device_serial)
if not connector_url:
raise ValueError(f"Device serial '{device_serial}' not found in config file.")

yubikey = scripting.single() # Connect to the first YubiKey found
hsmauth = HsmAuthSession(yubikey.smart_card())
hsm = YubiHsm.connect(str(config.general.connector_url))
hsm = YubiHsm.connect(connector_url)

auth_key_id = config.find_auth_key(yubikey_slot_label).id
click.echo(f"Using YubiHSM auth key ID '{hex(auth_key_id)}' authed with local YubiKey slot '{yubikey_slot_label}'")
Expand Down Expand Up @@ -83,25 +89,27 @@ def connect_hsm_and_auth_with_yubikey(config: hscfg.HSMConfig, yubikey_slot_labe


@contextmanager
def open_hsm_session_with_yubikey(ctx: click.Context):
def open_hsm_session_with_yubikey(ctx: click.Context, device_serial: str|None = None):
"""
Open a session to the HSM using the first YubiKey found, and authenticate with the YubiKey HSM auth label.
Args:
ctx (click.Context): The Click context object
"""
conf: hscfg.HSMConfig = ctx.obj['config']
device_serial = device_serial or ctx.obj['devserial']
passwd = os.environ.get('YUBIKEY_PASSWORD', None)
if passwd:
echo("Using YubiKey password from environment variable.")
session = connect_hsm_and_auth_with_yubikey(conf, ctx.obj['yk_label'], passwd)
session = connect_hsm_and_auth_with_yubikey(conf, ctx.obj['yk_label'], device_serial, passwd)
try:
yield conf, session
finally:
session.close()


@contextmanager
def open_hsm_session_with_default_admin(ctx: click.Context):
def open_hsm_session_with_default_admin(ctx: click.Context, device_serial: str|None = None):
"""
Open a session to the HSM using the first YubiKey found, and authenticate with the YubiKey HSM auth label.
Expand All @@ -111,7 +119,11 @@ def open_hsm_session_with_default_admin(ctx: click.Context):
echo("Using insecure default admin key to authenticate.")

conf: hscfg.HSMConfig = ctx.obj['config']
hsm = YubiHsm.connect(str(conf.general.connector_url))
connector_url = conf.general.all_devices.get(device_serial or ctx.obj['devserial'])
if not connector_url:
raise ValueError(f"Device serial '{device_serial}' not found in config file.")

hsm = YubiHsm.connect(connector_url)
session = None

try:
Expand All @@ -128,16 +140,20 @@ def open_hsm_session_with_default_admin(ctx: click.Context):
session.close()



@contextmanager
def open_hsm_session_with_shared_admin(ctx: click.Context, password: str):
def open_hsm_session_with_shared_admin(ctx: click.Context, password: str, device_serial: str|None = None):
"""
Open a session to the HSM using a share admin password (either reconstructed from shares or from backup).
Args:
ctx (click.Context): The Click context object
"""
conf: hscfg.HSMConfig = ctx.obj['config']
hsm = YubiHsm.connect(str(conf.general.connector_url))

connector_url = conf.general.all_devices.get(device_serial or ctx.obj['devserial'])
if not connector_url:
raise ValueError(f"Device serial '{device_serial}' not found in config file.")

hsm = YubiHsm.connect(connector_url)
key = conf.admin.shared_admin_key.id
click.echo(f"Using shared admin key ID 0x{key:04x}")
session = hsm.create_session_derived(key, password)
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
yubihsm[http]
yubihsm[http,usb]
yubikey-manager

click
Expand Down

0 comments on commit a809752

Please sign in to comment.