Skip to content

Commit

Permalink
Implement compare-to-config
Browse files Browse the repository at this point in the history
  • Loading branch information
elonen committed Jul 14, 2024
1 parent 579d0ae commit fd2b873
Showing 1 changed file with 87 additions and 8 deletions.
95 changes: 87 additions & 8 deletions hsm_secrets/hsm/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,11 @@ def cmd_hsm(ctx):
2. Set a common wrap key to all devices.
3. Host a Secret Sharing Ceremony to add a super admin key.
4. Add user keys (Yubikey auth) to master device.
5. Sync configuration to master device (generate missing secrets).
6. Clone master device to other ones.
7. Remove default admin key from all devices.
5. Generate and store necessary keys on the master device.
6. Check that all keys are present on master (`compare-to-config`). Iterate if needed.
7. Clone master device to other ones.
8. Double check that all keys are present on all devices (`compare-to-config --alldevs`).
9. Remove default admin key from all devices.
Management workflow:
Expand Down Expand Up @@ -403,12 +405,89 @@ def add_user_auth(ctx: click.Context, label: str, alldevs: bool):

# ---------------

@cmd_hsm.command('sync-with-config')
@click.option('--config-file', type=click.Path(exists=True), required=True, help="Path to the configuration file")
@cmd_hsm.command('compare-to-config')
@click.option('--alldevs', is_flag=True, help="Compare all devices")
@click.option('--use-user-auth', is_flag=True, help="Use user auth key instead of default admin key")
@click.pass_context
def sync_with_config(ctx: click.Context, config_file: str):
"""Synchronize the YubiHSM with the provided configuration file"""
raise NotImplementedError("This command is not yet implemented.")
def compare_to_config(ctx: click.Context, alldevs: bool, use_user_auth: bool):
"""Check that the YubiHSM configuration matches the configuration file (i.e. all keys are present)
Lists all objects by type (auth, wrap, etc.) in the configuration file, and then checks
that they exist in the YubiHSM(s). Shows which objects are missing and which are found.
By default, only checks the master device, using the default admin key.
Override with the options as needed.
"""

conf = ctx.obj['config']
assert isinstance(conf, HSMConfig)

# Util function to find all instances of a certain type in a nested structure
from typing import Type, TypeVar, Generator, Any
T = TypeVar('T')
def find_instances(obj: Any, target_type: Type[T]) -> Generator[T, None, None]:
if isinstance(obj, target_type):
yield obj
elif isinstance(obj, (list, tuple, set)):
for item in obj:
yield from find_instances(item, target_type)
elif isinstance(obj, dict):
for value in obj.values():
yield from find_instances(value, target_type)
elif hasattr(obj, '__dict__'):
for value in vars(obj).values():
yield from find_instances(value, target_type)

from hsm_secrets.config import HSMAsymmetricKey, HSMSymmetricKey, HSMWrapKey, OpaqueObject, HSMHmacKey, HSMAuthKey
config_to_hsm_type = {
HSMAuthKey: yubihsm.objects.AuthenticationKey,
HSMWrapKey: yubihsm.objects.WrapKey,
HSMHmacKey: yubihsm.objects.HmacKey,
HSMSymmetricKey: yubihsm.objects.SymmetricKey,
HSMAsymmetricKey: yubihsm.objects.AsymmetricKey,
OpaqueObject: yubihsm.objects.Opaque,
}
config_items_per_type: dict = {t: list(find_instances(conf, t)) for t in config_to_hsm_type.keys()} # type: ignore

click.echo("")
click.echo("Reading objects from the YubiHSM(s)...")
dev_serials = conf.general.all_devices.keys() if alldevs else [ctx.obj['devserial']]
for serial in dev_serials:

def do_it(conf, ses, serial):
device_objs: list[yubihsm.objects.YhsmObject] = list(ses.list_objects())
click.echo("")
click.echo(f"--- YubiHSM device {serial} ---")
objects_accounted_for = {}
for t, items in config_items_per_type.items():
click.echo(f"{t.__name__}")
for it in items:
found = False
for obj in device_objs:
if obj.id == it.id and isinstance(obj, config_to_hsm_type[t]):
found = True
objects_accounted_for[obj.id] = True
break

checkbox = "✅" if found else "❌"
click.echo(f" {checkbox} '{it.label}' (0x{it.id:04x})")

if len(objects_accounted_for) < len(device_objs):
click.echo("EXTRA OBJECTS (on the device but not in the config)")
for obj in device_objs:
if obj.id not in objects_accounted_for:
info = obj.get_info()
click.echo(f" ❓'{info.label}' (0x{obj.id:04x}) <{obj.__class__.__name__}>")
click.echo("")

if use_user_auth:
with open_hsm_session_with_yubikey(ctx, device_serial=serial) as (conf, ses):
do_it(conf, ses, serial)
else:
with open_hsm_session_with_default_admin(ctx, device_serial=serial) as (conf, ses):
do_it(conf, ses, serial)



# ---------------

Expand Down

0 comments on commit fd2b873

Please sign in to comment.