Skip to content

Commit

Permalink
Move log storage to sqlite, add log subcommand
Browse files Browse the repository at this point in the history
  • Loading branch information
elonen committed Aug 4, 2024
1 parent 9013e6f commit 14ee148
Show file tree
Hide file tree
Showing 10 changed files with 689 additions and 334 deletions.
7 changes: 6 additions & 1 deletion hsm_secrets/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from pydantic import BaseModel, ConfigDict, HttpUrl, Field, StringConstraints
from typing_extensions import Annotated
from typing import Any, Callable, Iterable, List, Literal, NewType, Optional, Sequence, Union, cast
from yubihsm.defs import CAPABILITY, ALGORITHM # type: ignore [import]
from yubihsm.defs import CAPABILITY, ALGORITHM, COMMAND # type: ignore [import]
import click
import yaml # type: ignore [import]

Expand Down Expand Up @@ -170,6 +170,11 @@ def apply_defaults(self):
if cmd not in self.command_logging:
self.command_logging[cmd] = self.default_command_logging

def lookup_hsm_cmd(cmd: YubiHsm2Command) -> COMMAND:
x = COMMAND._member_map_[cmd.upper().replace("-","_")]
assert isinstance(x, COMMAND), f"Command '{cmd}' not found in the YubiHSM library."
return x

# -- Asymmetric key models --
AsymmetricAlgorithm = Literal["rsa2048", "rsa3072", "rsa4096", "ecp256", "ecp384", "ecp521", "eck256", "ecbp256", "ecbp384", "ecbp512", "ed25519", "ecp224"]
AsymmetricCapabilityName = Literal[
Expand Down
166 changes: 3 additions & 163 deletions hsm_secrets/hsm/__init__.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,13 @@
from dataclasses import astuple
import datetime
from hashlib import sha256
from io import BytesIO
import json
from pathlib import Path
import struct
import sys
import tarfile
from typing import Sequence, cast
from typing import cast
import click
from click.shell_completion import CompletionItem

from hsm_secrets.config import HSMAsymmetricKey, HSMAuditSettings, HSMConfig, YubiHsm2AuditMode, YubiHsm2Command, click_hsm_obj_auto_complete, find_all_config_items_per_type, find_config_items_of_class, parse_keyid
from hsm_secrets.hsm import yhsm_log
from hsm_secrets.config import HSMAsymmetricKey, HSMConfig, click_hsm_obj_auto_complete, find_all_config_items_per_type, parse_keyid
from hsm_secrets.hsm.secret_sharing_ceremony import cli_reconstruction_ceremony, cli_splitting_ceremony
from hsm_secrets.log import _check_and_format_audit_conf_differences
from hsm_secrets.utils import HSMAuthMethod, HsmSecretsCtx, cli_error, cli_info, cli_result, cli_ui_msg, cli_warn, confirm_and_delete_old_yubihsm_object_if_exists, open_hsm_session, open_hsm_session_with_password, pass_common_args, pretty_fmt_yubihsm_object, prompt_for_secret, pw_check_fromhex

import yubihsm.defs, yubihsm.exceptions, yubihsm.objects # type: ignore [import]
Expand Down Expand Up @@ -572,157 +566,3 @@ def restore_hsm(ctx: HsmSecretsCtx, backup_file: str, force: bool):

cli_info("")
cli_info("Restore complete.")


# ---------------

@cmd_hsm.command('apply-audit-settings')
@pass_common_args
@click.option('--alldevs', is_flag=True, help="Set on all devices")
@click.option('--force', is_flag=True, help="Don't ask for confirmation before setting")
def apply_audit_settings(ctx: HsmSecretsCtx, alldevs: bool, force: bool):
"""Apply audit settings from config
Apply the audit/logging settings from configuration file to the YubiHSM(s).
"""
conf_settings = ctx.conf.admin.audit
conf_settings.apply_defaults()

hsm_serials = ctx.conf.general.all_devices.keys() if alldevs else [ctx.hsm_serial]
for serial in hsm_serials:
with open_hsm_session(ctx, HSMAuthMethod.DEFAULT_ADMIN, serial) as ses:
cli_info(f"Checking audit settings on device {serial}...")
cur_settings, _unknown_audits = ses.get_audit_settings()
mismatches_str = _check_and_format_audit_conf_differences(cur_settings, conf_settings)
if not mismatches_str:
cli_info(" └– Already ok. Audit settings match the config file.")
continue
else:
cli_info(" └– Mismatching audit commands (current -> new):")
cli_info(mismatches_str)
if not force and not click.confirm("Do you want to set the audit settings to match the configuration?", default=False, err=True):
cli_info(f" └– Skipping device {serial}.")
else:
# Remove any 'fixed' commands from the configuration before applying.
# The YubiHSM2 command will fail otherwise.
without_fixed: dict[YubiHsm2Command, YubiHsm2AuditMode] = {k:v for k,v in conf_settings.command_logging.items() if v != 'fixed'}
to_apply = HSMAuditSettings(
forced_audit = conf_settings.forced_audit,
default_command_logging = conf_settings.default_command_logging,
command_logging = without_fixed)
ses.set_audit_settings(to_apply)
cli_info(" └– Audit settings applied.")


def _check_and_format_audit_conf_differences(cur_settings: HSMAuditSettings, conf_settings: HSMAuditSettings, raise_if_fixed_change = True) -> str|None:
"""
Check the audit settings in the YubiHSM against the configuration file.
Returns a formatted string with the differences, or None if there are none.
Raises ValueError if a fixed command is set in the device and the configuration wants to change it,
and `raise_if_fixed_change` is True.
"""
mismatches: dict[str, tuple[YubiHsm2AuditMode|None, YubiHsm2AuditMode|None]] = {}

if cur_settings.forced_audit != conf_settings.forced_audit:
mismatches['<FORCED AUDIT>'] = (cur_settings.forced_audit, conf_settings.forced_audit)

for k, new_v in conf_settings.command_logging.items():
cur_v = cur_settings.command_logging.get(cast(YubiHsm2Command, k), None)
if cur_v != new_v:
mismatches[k] = (cur_v, new_v)
if cur_v == 'fixed' and raise_if_fixed_change:
raise ValueError(f"Command '{k}' is set to 'fixed' in the device. Cannot change it without resetting the HSM.")

if not mismatches:
return None
return '\n'.join([f" - {mk.ljust(30)} {cv} -> {nv}" for mk, (cv, nv) in sorted(mismatches.items())])


# ---------------

@cmd_hsm.command('log-fetch')
@pass_common_args
@click.argument('jsonfile', type=click.Path(exists=False, allow_dash=False, dir_okay=False), metavar='<jsonfile>', required=False)
@click.option('--no-verify', is_flag=True, help="Don't verify against previous entry")
@click.option('--clear', '-c', is_flag=True, help="Clear the log entries after fetching")
def log_fetch(ctx: HsmSecretsCtx, jsonfile: str|None, no_verify: bool, clear: bool):
"""Fetch and store log entries from HSM
Retrieve new log entries from device and optionally store them in a JSON file.
If no JSON file is specified, show logs without storing (implies --no-verify).
YubiHSM2 log forms an audit chain where each entry is verified against
the previous one for integrity. The script first checks `jsonfile`,
then <stem>.1.json, if the file is empty or missing, when looking for the last entry.
Always maintain secure backups of JSON files; any missing or tampered files will
break the chain's integrity for future entries.
By default, fetched log entries remain in the HSM unless --clear is used.
This option frees up log space in the HSM after successfully saving the entries
to the JSON file.
For scripting, suppress all non-errors with the global --quiet option.
"""
if not jsonfile:
cli_warn("No JSON file specified. This implies --no-verify.")
no_verify = True

if clear and not jsonfile:
raise click.ClickException("The --clear option requires a file to store the log entries.")

# Read JSON file if it exists
prev_entries = yhsm_log.read_json_files(jsonfile, False) if jsonfile else {}

# Get the last entry for verification
prev = None
if prev_entries:
n = max(prev_entries.keys())
ld_bytes = bytes.fromhex(prev_entries[n]['data'])
prev = yubihsm.core.LogEntry.parse(ld_bytes)

# Get new entries from the HSM
with open_hsm_session(ctx, HSMAuthMethod.PASSWORD) as ses:
new_log_data = ses.get_log_entries()
dev_serial = ses.get_serial()
for e in new_log_data.entries:
if e.number in prev_entries:
cli_info(f"- Skipping already stored entry {e.number}")
assert e.data.hex() == prev_entries[e.number]['data'][:32], f"Data mismatch between already stored entry {e.number} and the one from HSM. At least GET_LOG_ENTRIES is known to cause this, so don't log it."
elif prev and not no_verify:
if not e.validate(prev):
raise click.ClickException(f"Log entry {e.number} FAILED validation against previous entry. Audit chain broken.")
prev = e

new_json_entries = {e.number: yhsm_log.decode_log_entry_to_dict(e, ctx.conf, dev_serial) for e in new_log_data.entries}

# Print the new entries to console
cli_info(json.dumps(new_json_entries, indent=2, sort_keys=True))

# Update (/create) the JSON file
if jsonfile:
yhsm_log.update_json_file(Path(jsonfile), new_json_entries, dev_serial)
if clear:
last_to_free = max(new_json_entries.keys())
ses.free_log_entries(last_to_free)
cli_info(f"HSM updated: log space up to event {last_to_free} freed for reuse.")


@cmd_hsm.command('log-verify-all')
@pass_common_args
@click.argument('jsonfile', type=click.Path(exists=True, allow_dash=False), metavar='<jsonfile>', required=True)
@click.option('--initial-num', '-i', type=int, help="Entry number to treat as first in the chain", default=1)
def log_verify_all(ctx: HsmSecretsCtx, jsonfile: str, initial_num: int):
"""Verify the entire log chain
Read all JSON files and verify the integrity of the entire log chain
starting from entry #1 (or the specified initial number).
Starts reading from the given JSON file and continues with <stem>.1.json
and so on.
This command does not connect to the HSM or fetch new entries.
"""
all_entries = yhsm_log.read_json_files(jsonfile, True)
yhsm_log.verify_log_chain(all_entries, initial_num)
130 changes: 0 additions & 130 deletions hsm_secrets/hsm/yhsm_log.py

This file was deleted.

Loading

0 comments on commit 14ee148

Please sign in to comment.