From 813ef5f8431f556dc769d5c138fea1a11fb1aa07 Mon Sep 17 00:00:00 2001 From: Jarno Elonen Date: Sat, 3 Aug 2024 18:55:25 +0300 Subject: [PATCH] Move log storage to sqlite, add log subcommand --- hsm_secrets/config.py | 7 +- hsm_secrets/hsm/__init__.py | 166 +------------------------- hsm_secrets/hsm/yhsm_log.py | 130 --------------------- hsm_secrets/log/__init__.py | 226 ++++++++++++++++++++++++++++++++++++ hsm_secrets/log/log_db.py | 149 ++++++++++++++++++++++++ hsm_secrets/log/yhsm_log.py | 96 +++++++++++++++ hsm_secrets/main.py | 2 + hsm_secrets/utils.py | 10 +- hsm_secrets/yubihsm.py | 106 ++++++++++++----- run-tests.sh | 119 ++++++++++++++++++- 10 files changed, 683 insertions(+), 328 deletions(-) delete mode 100644 hsm_secrets/hsm/yhsm_log.py create mode 100644 hsm_secrets/log/__init__.py create mode 100644 hsm_secrets/log/log_db.py create mode 100644 hsm_secrets/log/yhsm_log.py diff --git a/hsm_secrets/config.py b/hsm_secrets/config.py index 370c89c..2a7273d 100644 --- a/hsm_secrets/config.py +++ b/hsm_secrets/config.py @@ -9,7 +9,7 @@ from pydantic import BaseModel, ConfigDict, HttpUrl, Field, StringConstraints from typing_extensions import Annotated from typing import Any, Callable, Iterable, List, Literal, NewType, Optional, Sequence, Union, cast -from yubihsm.defs import CAPABILITY, ALGORITHM # type: ignore [import] +from yubihsm.defs import CAPABILITY, ALGORITHM, COMMAND # type: ignore [import] import click import yaml # type: ignore [import] @@ -170,6 +170,11 @@ def apply_defaults(self): if cmd not in self.command_logging: self.command_logging[cmd] = self.default_command_logging +def lookup_hsm_cmd(cmd: YubiHsm2Command) -> COMMAND: + x = COMMAND._member_map_[cmd.upper().replace("-","_")] + assert isinstance(x, COMMAND), f"Command '{cmd}' not found in the YubiHSM library." + return x + # -- Asymmetric key models -- AsymmetricAlgorithm = Literal["rsa2048", "rsa3072", "rsa4096", "ecp256", "ecp384", "ecp521", "eck256", "ecbp256", "ecbp384", "ecbp512", "ed25519", "ecp224"] AsymmetricCapabilityName = Literal[ diff --git a/hsm_secrets/hsm/__init__.py b/hsm_secrets/hsm/__init__.py index 2ae9a69..49bbf86 100644 --- a/hsm_secrets/hsm/__init__.py +++ b/hsm_secrets/hsm/__init__.py @@ -1,19 +1,13 @@ -from dataclasses import astuple import datetime -from hashlib import sha256 from io import BytesIO -import json from pathlib import Path -import struct -import sys import tarfile -from typing import Sequence, cast +from typing import cast import click -from click.shell_completion import CompletionItem -from hsm_secrets.config import HSMAsymmetricKey, HSMAuditSettings, HSMConfig, YubiHsm2AuditMode, YubiHsm2Command, click_hsm_obj_auto_complete, find_all_config_items_per_type, find_config_items_of_class, parse_keyid -from hsm_secrets.hsm import yhsm_log +from hsm_secrets.config import HSMAsymmetricKey, HSMConfig, click_hsm_obj_auto_complete, find_all_config_items_per_type, parse_keyid from hsm_secrets.hsm.secret_sharing_ceremony import cli_reconstruction_ceremony, cli_splitting_ceremony +from hsm_secrets.log import _check_and_format_audit_conf_differences from hsm_secrets.utils import HSMAuthMethod, HsmSecretsCtx, cli_error, cli_info, cli_result, cli_ui_msg, cli_warn, confirm_and_delete_old_yubihsm_object_if_exists, open_hsm_session, open_hsm_session_with_password, pass_common_args, pretty_fmt_yubihsm_object, prompt_for_secret, pw_check_fromhex import yubihsm.defs, yubihsm.exceptions, yubihsm.objects # type: ignore [import] @@ -572,157 +566,3 @@ def restore_hsm(ctx: HsmSecretsCtx, backup_file: str, force: bool): cli_info("") cli_info("Restore complete.") - - -# --------------- - -@cmd_hsm.command('apply-audit-settings') -@pass_common_args -@click.option('--alldevs', is_flag=True, help="Set on all devices") -@click.option('--force', is_flag=True, help="Don't ask for confirmation before setting") -def apply_audit_settings(ctx: HsmSecretsCtx, alldevs: bool, force: bool): - """Apply audit settings from config - - Apply the audit/logging settings from configuration file to the YubiHSM(s). - """ - conf_settings = ctx.conf.admin.audit - conf_settings.apply_defaults() - - hsm_serials = ctx.conf.general.all_devices.keys() if alldevs else [ctx.hsm_serial] - for serial in hsm_serials: - with open_hsm_session(ctx, HSMAuthMethod.DEFAULT_ADMIN, serial) as ses: - cli_info(f"Checking audit settings on device {serial}...") - cur_settings, _unknown_audits = ses.get_audit_settings() - mismatches_str = _check_and_format_audit_conf_differences(cur_settings, conf_settings) - if not mismatches_str: - cli_info(" └– Already ok. Audit settings match the config file.") - continue - else: - cli_info(" └– Mismatching audit commands (current -> new):") - cli_info(mismatches_str) - if not force and not click.confirm("Do you want to set the audit settings to match the configuration?", default=False, err=True): - cli_info(f" └– Skipping device {serial}.") - else: - # Remove any 'fixed' commands from the configuration before applying. - # The YubiHSM2 command will fail otherwise. - without_fixed: dict[YubiHsm2Command, YubiHsm2AuditMode] = {k:v for k,v in conf_settings.command_logging.items() if v != 'fixed'} - to_apply = HSMAuditSettings( - forced_audit = conf_settings.forced_audit, - default_command_logging = conf_settings.default_command_logging, - command_logging = without_fixed) - ses.set_audit_settings(to_apply) - cli_info(" └– Audit settings applied.") - - -def _check_and_format_audit_conf_differences(cur_settings: HSMAuditSettings, conf_settings: HSMAuditSettings, raise_if_fixed_change = True) -> str|None: - """ - Check the audit settings in the YubiHSM against the configuration file. - Returns a formatted string with the differences, or None if there are none. - - Raises ValueError if a fixed command is set in the device and the configuration wants to change it, - and `raise_if_fixed_change` is True. - """ - mismatches: dict[str, tuple[YubiHsm2AuditMode|None, YubiHsm2AuditMode|None]] = {} - - if cur_settings.forced_audit != conf_settings.forced_audit: - mismatches[''] = (cur_settings.forced_audit, conf_settings.forced_audit) - - for k, new_v in conf_settings.command_logging.items(): - cur_v = cur_settings.command_logging.get(cast(YubiHsm2Command, k), None) - if cur_v != new_v: - mismatches[k] = (cur_v, new_v) - if cur_v == 'fixed' and raise_if_fixed_change: - raise ValueError(f"Command '{k}' is set to 'fixed' in the device. Cannot change it without resetting the HSM.") - - if not mismatches: - return None - return '\n'.join([f" - {mk.ljust(30)} {cv} -> {nv}" for mk, (cv, nv) in sorted(mismatches.items())]) - - -# --------------- - -@cmd_hsm.command('log-fetch') -@pass_common_args -@click.argument('jsonfile', type=click.Path(exists=False, allow_dash=False, dir_okay=False), metavar='', required=False) -@click.option('--no-verify', is_flag=True, help="Don't verify against previous entry") -@click.option('--clear', '-c', is_flag=True, help="Clear the log entries after fetching") -def log_fetch(ctx: HsmSecretsCtx, jsonfile: str|None, no_verify: bool, clear: bool): - """Fetch and store log entries from HSM - - Retrieve new log entries from device and optionally store them in a JSON file. - If no JSON file is specified, show logs without storing (implies --no-verify). - - YubiHSM2 log forms an audit chain where each entry is verified against - the previous one for integrity. The script first checks `jsonfile`, - then .1.json, if the file is empty or missing, when looking for the last entry. - - Always maintain secure backups of JSON files; any missing or tampered files will - break the chain's integrity for future entries. - - By default, fetched log entries remain in the HSM unless --clear is used. - This option frees up log space in the HSM after successfully saving the entries - to the JSON file. - - For scripting, suppress all non-errors with the global --quiet option. - """ - if not jsonfile: - cli_warn("No JSON file specified. This implies --no-verify.") - no_verify = True - - if clear and not jsonfile: - raise click.ClickException("The --clear option requires a file to store the log entries.") - - # Read JSON file if it exists - prev_entries = yhsm_log.read_json_files(jsonfile, False) if jsonfile else {} - - # Get the last entry for verification - prev = None - if prev_entries: - n = max(prev_entries.keys()) - ld_bytes = bytes.fromhex(prev_entries[n]['data']) - prev = yubihsm.core.LogEntry.parse(ld_bytes) - - # Get new entries from the HSM - with open_hsm_session(ctx, HSMAuthMethod.PASSWORD) as ses: - new_log_data = ses.get_log_entries() - dev_serial = ses.get_serial() - for e in new_log_data.entries: - if e.number in prev_entries: - cli_info(f"- Skipping already stored entry {e.number}") - assert e.data.hex() == prev_entries[e.number]['data'][:32], f"Data mismatch between already stored entry {e.number} and the one from HSM. At least GET_LOG_ENTRIES is known to cause this, so don't log it." - elif prev and not no_verify: - if not e.validate(prev): - raise click.ClickException(f"Log entry {e.number} FAILED validation against previous entry. Audit chain broken.") - prev = e - - new_json_entries = {e.number: yhsm_log.decode_log_entry_to_dict(e, ctx.conf, dev_serial) for e in new_log_data.entries} - - # Print the new entries to console - cli_info(json.dumps(new_json_entries, indent=2, sort_keys=True)) - - # Update (/create) the JSON file - if jsonfile: - yhsm_log.update_json_file(Path(jsonfile), new_json_entries, dev_serial) - if clear: - last_to_free = max(new_json_entries.keys()) - ses.free_log_entries(last_to_free) - cli_info(f"HSM updated: log space up to event {last_to_free} freed for reuse.") - - -@cmd_hsm.command('log-verify-all') -@pass_common_args -@click.argument('jsonfile', type=click.Path(exists=True, allow_dash=False), metavar='', required=True) -@click.option('--initial-num', '-i', type=int, help="Entry number to treat as first in the chain", default=1) -def log_verify_all(ctx: HsmSecretsCtx, jsonfile: str, initial_num: int): - """Verify the entire log chain - - Read all JSON files and verify the integrity of the entire log chain - starting from entry #1 (or the specified initial number). - - Starts reading from the given JSON file and continues with .1.json - and so on. - - This command does not connect to the HSM or fetch new entries. - """ - all_entries = yhsm_log.read_json_files(jsonfile, True) - yhsm_log.verify_log_chain(all_entries, initial_num) diff --git a/hsm_secrets/hsm/yhsm_log.py b/hsm_secrets/hsm/yhsm_log.py deleted file mode 100644 index 851bd16..0000000 --- a/hsm_secrets/hsm/yhsm_log.py +++ /dev/null @@ -1,130 +0,0 @@ -from dataclasses import astuple -import json -import struct -import datetime -from pathlib import Path -import click -import yubihsm # type: ignore [import] -from typing import Dict, List, Tuple -from filelock import FileLock, Timeout - -from hsm_secrets.config import HSMConfig -from hsm_secrets.utils import cli_info -from hsm_secrets.yubihsm import HSMSession - - -def read_json_files(jsonfile: str, need_all: bool) -> dict[int, dict]: - """Read log entries from JSON files. - Given `jsonfile` is the latest file, and the function checks for - older files with the same name but with a suffix '.1.json', '.2.json', etc. - - If `need_all` is not set, the function stops reading files as soon as it finds - a file at least one entry. - """ - prev_entries = {} - file_i = 0 - while True: - p = Path(jsonfile) if file_i == 0 else Path(jsonfile).with_suffix(f".{file_i}.json") - if file_i > 0 and not p.exists(): - break - if p.exists(): - cli_info(f"Reading old entries from '{p}'...") - with p.open('r') as fh: - data = json.load(fh) - for k, v in data.items(): - assert isinstance(int(k), int), "Entry number must be an integer." - assert isinstance(v, dict), "Entry data must be a dictionary." - if k in prev_entries: - raise ValueError(f"Duplicate entry number '{k}' in file {p}") - assert isinstance(v.get('data'), str), "Missing or invalid 'data' field in entry." - assert isinstance(v.get('fetch_time'), str), "Missing or invalid 'fetch_time' field in entry." - assert isinstance(v.get('hsm_serial'), int), "Missing or invalid 'hsm_serial' field in entry." - prev_entries[int(k)] = v - if len(prev_entries) > 0 and not need_all: - break - file_i += 1 - return prev_entries - - - -def decode_log_entry_to_dict(entry: yubihsm.core.LogEntry, conf: HSMConfig, hsm_serial: int) -> dict: - """ - Convert a log entry to a JSON-serializable dictionary. - """ - def find_info(id: int) -> str: - try: - if id in [0, 0xffff]: - return f'0x{id:04x}: -' - kd = conf.find_def(id) - return f"0x{id:04x}: '{kd.label}' ({kd.__class__.__name__})" - except KeyError: - return f"0x{id:04x} (UNKNOWN)" - - return { - "data": (struct.pack(entry.FORMAT, *astuple(entry))).hex(), - "hsm_serial": hsm_serial, - "fetch_time": datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), - "_info": { - "cmd": f'{entry.command.name} ({entry.command.value})', - "len": entry.length, - "ses_key": find_info(entry.session_key), - "tgt_key": find_info(entry.target_key), - "2nd_key": find_info(entry.second_key), - "result": entry.result, - "tick": entry.tick, - } - } - - -def verify_log_chain(prev_entries: dict, initial_num: int) -> yubihsm.core.LogEntry|None: - """ - Verify the log chain from the initial number to the last entry in the JSON file. - :return: The last log entry in the chain. - :raises: click.ClickException if the chain is broken. - """ - if initial_num not in prev_entries: - raise click.ClickException(f"Initial entry {initial_num} not found in the JSON file. Audit chain broken.") - n = initial_num - prev = None - while n in prev_entries: - ld_bytes = bytes.fromhex(prev_entries[n]['data']) - ld = yubihsm.core.LogEntry.parse(ld_bytes) - if prev: - if prev_entries[n]['hsm_serial'] != prev_entries[n-1]['hsm_serial']: - raise click.ClickException(f"Log entry {n} has different HSM serial than previous entry. Audit chain broken.") - if not ld.validate(prev): - raise click.ClickException(f"Log entry {n} FAILED validation against previous entry. Audit chain broken.") - prev = ld - n += 1 - cli_info(f"Ok, previously stored entries from {initial_num} to {n-1} verified successfully.") - return prev - - -def update_json_file(jsonfile: Path, new_json_entries: dict[int, dict], dev_serial: int): - """ - Merge new log entries into the JSON file. - """ - lockfile = f"{jsonfile}.lock" - file_entries = {} - try: - with FileLock(lockfile, timeout=30): - if Path(jsonfile).exists(): - with Path(jsonfile).open('r') as fh: - file_entries = json.load(fh) - - if any(e['hsm_serial'] != dev_serial for e in file_entries.values()): - raise click.ClickException("The JSON file contains entries from a different HSM serial. Cannot mix entries.") - - for jn, jdict in new_json_entries.items(): - assert jn not in file_entries, f"Duplicate entry number {jn} in JSON file. Should have been caught earlier." - file_entries[str(jn)] = jdict - - with Path(jsonfile).open('w') as fh: - json.dump(file_entries, fh, indent=2, sort_keys=True) - cli_info(f"New entries added to '{jsonfile}'") - - Path(lockfile).unlink() - - except Timeout as e: - cli_info(f"Failed to acquire file lock '{lockfile}': {e}") - raise click.ClickException("Failed to acquire lock on the JSON file. Please try again later.") diff --git a/hsm_secrets/log/__init__.py b/hsm_secrets/log/__init__.py new file mode 100644 index 0000000..fb27a76 --- /dev/null +++ b/hsm_secrets/log/__init__.py @@ -0,0 +1,226 @@ +import datetime +import sqlite3 +from typing import cast +import click + +from hsm_secrets.config import HSMAuditSettings, HSMConfig, YubiHsm2AuditMode, YubiHsm2Command +from hsm_secrets.log import log_db, yhsm_log +from hsm_secrets.utils import HSMAuthMethod, HsmSecretsCtx, cli_error, cli_info, cli_result, open_hsm_session, pass_common_args + + +@click.group() +@click.pass_context +def cmd_log(ctx: click.Context): + """YubiHSM2 log / audit commands""" + ctx.ensure_object(dict) + + +@cmd_log.command('apply-settings') +@pass_common_args +@click.option('--alldevs', is_flag=True, help="Set on all devices") +@click.option('--force', is_flag=True, help="Don't ask for confirmation before setting") +def apply_audit_settings(ctx: HsmSecretsCtx, alldevs: bool, force: bool): + """Apply log settings from config to HSM + + Apply the audit/logging settings from configuration file to the YubiHSM(s). + """ + conf_settings = ctx.conf.admin.audit + conf_settings.apply_defaults() + + hsm_serials = ctx.conf.general.all_devices.keys() if alldevs else [ctx.hsm_serial] + for serial in hsm_serials: + with open_hsm_session(ctx, HSMAuthMethod.DEFAULT_ADMIN, serial) as ses: + cli_info(f"Checking audit settings on device {serial}...") + cur_settings, _unknown_audits = ses.get_audit_settings() + mismatches_str = _check_and_format_audit_conf_differences(cur_settings, conf_settings) + if not mismatches_str: + cli_info(" └– Already ok. Audit settings match the config file.") + continue + else: + cli_info(" └– Mismatching audit commands (current -> new):") + cli_info(mismatches_str) + if not force and not click.confirm("Do you want to set the audit settings to match the configuration?", default=False, err=True): + cli_info(f" └– Skipping device {serial}.") + else: + # Remove any 'fixed' commands from the current settings before applying. + # The YubiHSM2 command will fail otherwise. + without_fixed: dict[YubiHsm2Command, YubiHsm2AuditMode] = {k:v for k,v in conf_settings.command_logging.items() if cur_settings.command_logging.get(k) != 'fixed'} + to_apply = HSMAuditSettings( + forced_audit = conf_settings.forced_audit, + default_command_logging = conf_settings.default_command_logging, + command_logging = without_fixed) + ses.set_audit_settings(to_apply) + cli_info(" └– Audit settings applied.") + + +def _check_and_format_audit_conf_differences(cur_settings: HSMAuditSettings, conf_settings: HSMAuditSettings, raise_if_fixed_change = True) -> str|None: + """ + Check the audit settings in the YubiHSM against the configuration file. + Returns a formatted string with the differences, or None if there are none. + + Raises ValueError if a fixed command is set in the device and the configuration wants to change it, + and `raise_if_fixed_change` is True. + """ + mismatches: dict[str, tuple[YubiHsm2AuditMode|None, YubiHsm2AuditMode|None]] = {} + + if cur_settings.forced_audit != conf_settings.forced_audit: + mismatches[''] = (cur_settings.forced_audit, conf_settings.forced_audit) + + for k, new_v in conf_settings.command_logging.items(): + cur_v = cur_settings.command_logging.get(cast(YubiHsm2Command, k), None) + if cur_v != new_v: + mismatches[k] = (cur_v, new_v) + if cur_v == 'fixed' and raise_if_fixed_change: + raise ValueError(f"Command '{k}' is set to 'fixed' in the device. Cannot change it without resetting the HSM.") + + if not mismatches: + return None + return '\n'.join([f" - {mk.ljust(30)} {cv} -> {nv}" for mk, (cv, nv) in sorted(mismatches.items())]) + + +# --------------- + + +@cmd_log.command('fetch') +@pass_common_args +@click.argument('db_path', type=click.Path(exists=False), required=False) +@click.option('--clear', '-c', is_flag=True, help="Clear the log entries after fetching") +@click.option('--no-verify', '-n', is_flag=True, help="Ignore log integrity verification failures") +def log_fetch(ctx: HsmSecretsCtx, db_path: str, clear: bool, no_verify: bool): + """ + Fetch log entries from HSM and store in SQLite DB + + This command retrieves new log entries from the YubiHSM device and stores them in the specified SQLite database. + If the database doesn't exist, it will be created. + + Each new entry is verified against previous one to ensure log integrity. Failure aborts the process. + + If --clear is specified, log entries will be cleared from the HSM after they are successfully verified and stored. + """ + config: HSMConfig = ctx.conf + with open_hsm_session(ctx, HSMAuthMethod.PASSWORD) as session: + new_log_data = session.get_log_entries() + hsm_serial = session.get_serial() + + if not db_path: + cli_info("No database path specified. Using in-memory database.") + if clear: + raise click.ClickException("Refusing to --clear log entries from device without a persistent database.") + db_path = ':memory:' + + with sqlite3.connect(db_path) as conn: + log_db.init_db(conn) + conn.row_factory = sqlite3.Row + + new, skipped = 0, 0 + for entry in new_log_data.entries: + try: + if not log_db.insert_log_entry(conn, hsm_serial, entry, datetime.datetime.now(), no_verify, lambda id: yhsm_log.find_info(id, config)): + cli_info(f"- Log entry {entry.number} already in DB, skipping.") + skipped += 1 + continue + if e := log_db.get_last_log_entry(conn, hsm_serial): + cli_info(yhsm_log.summarize_log_entry(e)) + new += 1 + except ValueError as e: + raise click.ClickException(f"Error inserting entry {entry.number}: {str(e)}") + + cli_info(f"\nFetched {new+skipped} entries. Stored {new} in '{db_path}', skipped {skipped} pre-existing.") + + if clear: + last_entry = log_db.get_last_log_entry(conn, hsm_serial) + if last_entry: + session.free_log_entries(last_entry["entry_number"]) + cli_info(f"Cleared log entries up to {last_entry['entry_number']}") + else: + cli_info("No entries to clear") + + +@cmd_log.command('review') +@pass_common_args +@click.argument('db_path', type=click.Path(exists=True), required=True) +@click.option('--start-num', '-s', type=int, help="Start entry number", required=False) +@click.option('--end-num', '-e', type=int, help="End entry number", required=False) +@click.option('--start-id', '-S', type=int, help="Start row ID", required=False) +@click.option('--end-id', '-E', type=int, help="End row ID", required=False) +@click.option('--jsonl', is_flag=True, help="In JSONL format, not summary") +def log_review(ctx: HsmSecretsCtx, db_path: str, start_num: int|None, end_num: int|None, start_id: int|None, end_id: int|None, jsonl: bool): + """ + Review log entries stored in DB + + This command retrieves log entries from the specified SQLite database and displays them in a human-readable format. + + YubiHSM log entry numbers wrap around at 2^16, so use the row ID to specify a range that crosses the wrap-around point. + """ + with sqlite3.connect(db_path) as conn: + conn.row_factory = sqlite3.Row + for e in log_db.get_log_entries(conn, int(ctx.hsm_serial)): + if (start_num and e['entry_number'] < start_num) or (end_num and e['entry_number'] > end_num) or \ + (start_id and e['id'] < start_id) or (end_id and e['id'] > end_id): + continue + if jsonl: + cli_result(yhsm_log.export_to_jsonl(e, pretty=False, with_summary=False)) + else: + cli_result(yhsm_log.summarize_log_entry(e)) + + +@cmd_log.command('verify-all') +@pass_common_args +@click.argument('db_path', type=click.Path(exists=True)) +@click.option('--initial-num', '-i', type=int, help="Entry number to treat as first in the chain", default=1) +def log_verify_all(ctx: HsmSecretsCtx, db_path: str, initial_num: int): + """ + Verify the entire (previously stored) log chain + + This command checks the integrity of the log chain stored in the database for the specified HSM serial number. + It verifies that each log entry correctly validates against the previous one, ensuring the chain hasn't been tampered with. + + : Path to the SQLite database file containing the log entries. + """ + serial = int(ctx.hsm_serial) + with sqlite3.connect(db_path) as conn: + conn.row_factory = sqlite3.Row + entries = log_db.get_log_entries(conn, serial) + + try: + yhsm_log.verify_log_chain(entries, initial_num) + cli_info("Log chain verified successfully") + except ValueError as e: + cli_info(f"Log chain verification failed: {str(e)}") + + +@cmd_log.command('export') +@pass_common_args +@click.argument('db_path', type=click.Path(exists=True)) +@click.option('--out', '-o', type=click.File('a', lazy=True), default='-', help="Output file ('-' for stdout)") +@click.option('--restart', '-r', is_flag=True, help="Start exporting from the beginning") +@click.option('--no-summary', is_flag=True, help="Don't include human-readable summary in JSONL output") +def log_export_jsonl(ctx: HsmSecretsCtx, db_path: str, out, restart: bool, no_summary: bool): + """ + Export log entries to JSONL format for a specific HSM. + + The command keeps track of the last exported entry and only exports new entries in subsequent runs. + This makes it suitable for incremental exports to a log aggregator. + """ + serial = int(ctx.hsm_serial) + with sqlite3.connect(db_path) as conn: + if restart: + log_db.update_last_exported_id(conn, serial, 0) + + count, last_exported_id = 0, None + + with out as fh: + conn.row_factory = sqlite3.Row + for e in log_db.get_non_exported_log_entries(conn, serial): + l = yhsm_log.export_to_jsonl(e, pretty=False, with_summary=not no_summary) + fh.write(l + '\n') + last_exported_id = e['id'] + count += 1 + + if last_exported_id: + log_db.update_last_exported_id(conn, serial, last_exported_id) + + if count: + cli_info(f"Exported {count} new entries from database {db_path} to {out.name}") + else: + cli_info("No new entries to export") diff --git a/hsm_secrets/log/log_db.py b/hsm_secrets/log/log_db.py new file mode 100644 index 0000000..5ab342a --- /dev/null +++ b/hsm_secrets/log/log_db.py @@ -0,0 +1,149 @@ +import sqlite3 +from typing import Generator, List, Optional, Callable +import datetime +import yubihsm # type: ignore [import] + +def init_db(conn: sqlite3.Connection) -> None: + """Initialize the database with the required schema.""" + conn.executescript(""" + CREATE TABLE IF NOT EXISTS log_entries ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + entry_number INTEGER NOT NULL, + hsm_serial INTEGER NOT NULL, + raw_entry BLOB NOT NULL, + fetch_time DATETIME NOT NULL, + command INTEGER NOT NULL, + command_desc TEXT, + length INTEGER NOT NULL, + session_key INTEGER NOT NULL, + session_key_desc TEXT, + target_key INTEGER NOT NULL, + target_key_desc TEXT, + second_key INTEGER NOT NULL, + second_key_desc TEXT, + result INTEGER NOT NULL, + tick INTEGER NOT NULL, + UNIQUE(hsm_serial, id), + UNIQUE(hsm_serial, raw_entry) + ); + + CREATE INDEX IF NOT EXISTS idx_hsm_serial_entry_number ON log_entries(hsm_serial, entry_number); + + CREATE TABLE IF NOT EXISTS export_tracking ( + hsm_serial INTEGER PRIMARY KEY, + last_exported_id INTEGER NOT NULL + ); + """) + + +def previous_entry_number(current: int) -> int: + """Calculate the previous entry number, handling wraparound.""" + return (current - 1) & 0xFFFF + +def insert_log_entry(conn: sqlite3.Connection, hsm_serial: int, new_entry: yubihsm.core.LogEntry, + fetch_time: datetime.datetime, no_verify: bool, find_info_func: Callable[[int], str|None]) -> bool: + """ + Insert a new log entry. + + YubiHSM log entry numbering wraps around at 0xFFFF, so we can't rely on the entry number alone to + determine the previous entry in the chain => test multiple candidates for more robustness. + + If no_verify is set, the new entry is inserted without verifying it against the previous entry. + This could result in a broken audit chain, so think twice before using this option. + """ + cursor = conn.cursor() + + if not no_verify: + # Get all entries with entry_number one less than the new entry + previous_entry_number_value = previous_entry_number(new_entry.number) + cursor.execute("SELECT id, raw_entry FROM log_entries WHERE hsm_serial = ? AND (entry_number = ? OR entry_number = ?)", (hsm_serial, new_entry.number, previous_entry_number_value)) + candidates = cursor.fetchall() + + # Pick the first entry that forms a valid chain with the new entry + valid_previous_entry = None + for _, raw_entry in candidates: + if raw_entry == (new_entry.data + new_entry.digest): + return False # Entry already exists + previous_entry = yubihsm.core.LogEntry.parse(raw_entry) + if new_entry.validate(previous_entry): + valid_previous_entry = previous_entry # Don't break yet, the "new" entry might already exist but haven't been seen yet + + if valid_previous_entry is None and candidates: + raise ValueError("New entry doesn't validate against any previous entry candidates") + + # Insert the new log entry + cursor.execute(""" + INSERT INTO log_entries (entry_number, hsm_serial, raw_entry, fetch_time, command, + command_desc, length, session_key, session_key_desc, target_key, + target_key_desc, second_key, second_key_desc, result, tick) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, (new_entry.number, + hsm_serial, + (new_entry.data + new_entry.digest), + fetch_time, + new_entry.command.value, new_entry.command.name, + new_entry.length, + new_entry.session_key, find_info_func(new_entry.session_key), + new_entry.target_key, find_info_func(new_entry.target_key), + new_entry.second_key, find_info_func(new_entry.second_key), + new_entry.result, + new_entry.tick)) + + return True + + +def get_log_entries(conn: sqlite3.Connection, hsm_serial: int) -> Generator[sqlite3.Row, None, None]: + """Retrieve all log entries for a given HSM serial.""" + cursor = conn.cursor() + for c in cursor.execute(""" + SELECT id, entry_number, hsm_serial, raw_entry, fetch_time, command, command_desc, + length, session_key, session_key_desc, target_key, target_key_desc, + second_key, second_key_desc, result, tick + FROM log_entries + WHERE hsm_serial = ? + ORDER BY id ASC + """, (hsm_serial,)): + yield c + + +def get_last_log_entry(conn: sqlite3.Connection, hsm_serial: int) -> Optional[sqlite3.Row]: + """Retrieve the last log entry for a given HSM serial.""" + cursor = conn.cursor() + cursor.execute(""" + SELECT id, entry_number, hsm_serial, raw_entry, fetch_time, command, command_desc, + length, session_key, session_key_desc, target_key, target_key_desc, + second_key, second_key_desc, result, tick + FROM log_entries + WHERE hsm_serial = ? + ORDER BY id DESC + LIMIT 1 + """, (hsm_serial,)) + return cursor.fetchone() + + +def get_non_exported_log_entries(conn: sqlite3.Connection, hsm_serial: int) -> Generator[sqlite3.Row, None, None]: + """Retrieve new log entries for a given HSM serial since the last export.""" + cursor = conn.cursor() + + # Get the last exported ID + cursor.execute("SELECT last_exported_id FROM export_tracking WHERE hsm_serial = ?", (hsm_serial,)) + result = cursor.fetchone() + last_exported_id = result[0] if result else 0 + + # Fetch new entries + for row in cursor.execute(""" + SELECT id, entry_number, hsm_serial, raw_entry, fetch_time, command, command_desc, + length, session_key, session_key_desc, target_key, target_key_desc, + second_key, second_key_desc, result, tick + FROM log_entries + WHERE hsm_serial = ? AND id > ? + ORDER BY id ASC + """, (hsm_serial, last_exported_id)): + yield row + + + +def update_last_exported_id(conn: sqlite3.Connection, hsm_serial: int, last_exported_id: int) -> None: + """Update the last exported ID for a given HSM serial.""" + conn.execute("INSERT OR REPLACE INTO export_tracking (hsm_serial, last_exported_id) VALUES (?, ?)", (hsm_serial, last_exported_id)) + conn.commit() diff --git a/hsm_secrets/log/yhsm_log.py b/hsm_secrets/log/yhsm_log.py new file mode 100644 index 0000000..d60d47f --- /dev/null +++ b/hsm_secrets/log/yhsm_log.py @@ -0,0 +1,96 @@ +import datetime +import json +import sqlite3 +from typing import Generator, Iterable, List, Sequence +import yubihsm # type: ignore [import] +from hsm_secrets.config import HSMConfig + +def find_info(id: int, conf: HSMConfig) -> str|None: + """Find info for a given ID using HSMConfig.""" + try: + if id in [0, 0xffff]: + return None + kd = conf.find_def(id) + return kd.label + except KeyError: + return "(UNKNOWN)" + + +def export_to_jsonl(entry: sqlite3.Row, pretty: bool, with_summary: bool) -> str: + """ + Export log entries to JSONL format. + Use `pretty` to show to user, otherwise outputs a single line. + """ + json_entry = { + "id": entry['id'], + "entry_number": entry['entry_number'], + "hsm_serial": entry['hsm_serial'], + "raw_entry": entry['raw_entry'].hex(), + "fetch_time": entry['fetch_time'], + "command": entry['command'], + "command_desc": entry['command_desc'], + "length": entry['length'], + "session_key": entry['session_key'], + "session_key_desc": entry['session_key_desc'], + "target_key": entry['target_key'], + "target_key_desc": entry['target_key_desc'], + "second_key": entry['second_key'], + "second_key_desc": entry['second_key_desc'], + "result": entry['result'], + "tick": entry['tick'] + } + if with_summary: + json_entry["summary"] = summarize_log_entry(entry) + return json.dumps(json_entry, indent=4 if pretty else None) + + +def summarize_log_entry(e: sqlite3.Row) -> str: + """Summarize a log entry in human-readable form.""" + + def obj_info(id, desc, default): + return f"'{desc}'" if desc else f"'{default}' (0x{id:04x})" + + txt = f'{e["entry_number"]:05}: ' + obj_info(e['command'], e['command_desc'], 'UNKNOWN COMMAND') + + objs = [] + if e['target_key'] not in (0, 0xffff, None): + objs.append(obj_info(e['target_key'], e['target_key_desc'], 'UNKNOWN KEY')) + if e['second_key'] not in (0, 0xffff, None): + objs.append(obj_info(e['second_key'], e['second_key_desc'], 'UNKNOWN KEY')) + if objs: + txt += ' on ' + ' and '.join(objs) + + if e['session_key'] not in (0, 0xffff, None): + txt += ' by ' + obj_info(e['session_key'], e['session_key_desc'], 'UNKNOWN KEY') + + date = datetime.datetime.fromisoformat(e["fetch_time"]).strftime("%Y-%m-%d") + txt += f' (log fetch {date}, row id {e["id"]})' + + return txt + + +def verify_log_chain(entries: Iterable[sqlite3.Row], chain_start: int|None) -> None: + """ + Verify the log chain from the initial number to the last entry. + If `chain_start` is given, it will be compared to the first entry number. + """ + first_num = None + prev: yubihsm.core.LogEntry|None = None + for entry in entries: + if not prev: + first_num = entry["entry_number"] + if chain_start is not None and first_num != chain_start: + raise ValueError(f"Initial number {chain_start} does not match the first entry number {entry['entry_number']}") + + cur = yubihsm.core.LogEntry.parse(entry["raw_entry"]) + if prev and not cur.validate(prev): + raise ValueError(f"Log entry {entry['entry_number']} failed validation against previous entry") + + prev = cur + + if prev: + print(f"OK. Log chain verified from entry {first_num} to {prev.number}") + elif first_num: + print(f"Ok. Log only contains one entry {first_num}, so trivially valid") + else: + print("Log chain is empty") diff --git a/hsm_secrets/main.py b/hsm_secrets/main.py index e1a7222..768da06 100755 --- a/hsm_secrets/main.py +++ b/hsm_secrets/main.py @@ -2,6 +2,7 @@ import click from hsm_secrets.hsm import cmd_hsm +from hsm_secrets.log import cmd_log from hsm_secrets.ssh import cmd_ssh from hsm_secrets.tls import cmd_tls from hsm_secrets.passwd import cmd_pass @@ -103,6 +104,7 @@ def cmd_nop(ctx: HsmSecretsCtx): cli.add_command(cmd_tls, "tls") cli.add_command(cmd_pass, "pass") cli.add_command(cmd_hsm, "hsm") +cli.add_command(cmd_log, "log") cli.add_command(cmd_nop, "nop") cli.add_command(cmd_x509, "x509") cli.add_command(cmd_user, "user") diff --git a/hsm_secrets/utils.py b/hsm_secrets/utils.py index 30f6332..f802688 100644 --- a/hsm_secrets/utils.py +++ b/hsm_secrets/utils.py @@ -291,9 +291,15 @@ def open_hsm_session( # Mock HSM session for testing if ctx.mock_file: cli_warn("~🤡~ !! SIMULATED (mock) HSM session !! Authentication skipped. ~🤡~") - open_mock_hsms(ctx.mock_file, int(device_serial), ctx.conf) + auth_key_id = ctx.conf.admin.default_admin_key.id + if auth_method == HSMAuthMethod.PASSWORD: + assert ctx.auth_password_id + auth_key_id = ctx.auth_password_id + elif auth_method == HSMAuthMethod.YUBIKEY: + auth_key_id = ctx.conf.user_keys[0].id # Mock YubiKey auth key with first user key from config + open_mock_hsms(ctx.mock_file, int(device_serial), ctx.conf, auth_key_id) try: - yield MockHSMSession(int(device_serial)) + yield MockHSMSession(int(device_serial), auth_key_id) finally: save_mock_hsms(ctx.mock_file) return diff --git a/hsm_secrets/yubihsm.py b/hsm_secrets/yubihsm.py index 7340e18..2f4859f 100644 --- a/hsm_secrets/yubihsm.py +++ b/hsm_secrets/yubihsm.py @@ -1,6 +1,7 @@ from abc import ABC, abstractmethod from dataclasses import dataclass import datetime +from hashlib import sha256 from typing import Sequence, cast import pickle import os @@ -27,7 +28,7 @@ import cryptography.hazmat.primitives.asymmetric.padding as haz_asym_padding import cryptography.x509 as haz_x509 -from hsm_secrets.config import HSMAsymmetricKey, HSMAuditSettings, HSMAuthKey, HSMConfig, HSMHmacKey, HSMKeyID, HSMObjBase, HSMOpaqueObject, HSMSymmetricKey, HSMWrapKey, NoExtraBaseModel, YubiHsm2AuditMode, YubiHsm2Command +from hsm_secrets.config import HSMAsymmetricKey, HSMAuditSettings, HSMAuthKey, HSMConfig, HSMHmacKey, HSMKeyID, HSMObjBase, HSMOpaqueObject, HSMSymmetricKey, HSMWrapKey, NoExtraBaseModel, YubiHsm2AuditMode, YubiHsm2Command, lookup_hsm_cmd from hsm_secrets.key_adapters import PrivateKeyOrAdapter, make_private_key_adapter """ @@ -526,7 +527,7 @@ def free_log_entries(self, up_until_num: int) -> None: def get_audit_settings(self) -> tuple[HSMAuditSettings, dict[str, YubiHsm2AuditMode]]: def tristate(val: AUDIT) -> YubiHsm2AuditMode: - return 'off' if val == AUDIT.OFF else 'on' if val == AUDIT.ON else 'fixed' + return 'off' if val == AUDIT.OFF else ('on' if val == AUDIT.ON else 'fixed') uknown_res: dict[str, YubiHsm2AuditMode] = {} known_res = HSMAuditSettings( @@ -547,10 +548,11 @@ def tristate(val: AUDIT) -> YubiHsm2AuditMode: return known_res, uknown_res + def set_audit_settings(self, settings: HSMAuditSettings) -> None: tristate: dict[YubiHsm2AuditMode, AUDIT] = {'off': AUDIT.OFF, 'on': AUDIT.ON, 'fixed': AUDIT.FIXED} self.backend.set_force_audit(tristate[settings.forced_audit]) - audit_mapping: dict[COMMAND, AUDIT] = {COMMAND._member_map_[k.upper().replace("-","_")].value: tristate[v] for k,v in settings.command_logging.items()} + audit_mapping: dict[COMMAND, AUDIT] = {lookup_hsm_cmd(k): tristate[v] for k,v in settings.command_logging.items()} self.backend.set_command_audit(audit_mapping) @@ -560,7 +562,7 @@ def set_audit_settings(self, settings: HSMAuditSettings) -> None: _g_mock_hsms: dict[int, 'MockHSMDevice'] = {} _g_conf: HSMConfig|None -def open_mock_hsms(path: str, serial: int, conf: HSMConfig): +def open_mock_hsms(path: str, serial: int, conf: HSMConfig, auth_key_id: HSMKeyID): """ Open mock HSM devices from a pickle file and/or create a new mock HSM device with the given serial. @@ -579,7 +581,7 @@ def open_mock_hsms(path: str, serial: int, conf: HSMConfig): click.echo(click.style(f"~🤡~ Created new mock YubiHSM with serial {serial} ~🤡~", fg='yellow'), err=True) # Store the default admin key in the device, like on a fresh YubiHSM2 - ses = MockHSMSession(serial) + ses = MockHSMSession(serial, auth_key_id) ses.auth_key_put_derived( keydef = _g_conf.admin.default_admin_key, password = _g_conf.admin.default_admin_password) @@ -600,23 +602,48 @@ def save_mock_hsms(path: str): class MockHSMDevice: serial: int - objects: dict[tuple[HSMKeyID, OBJECT], 'MockYhsmObject'] = {} - log_entries:list[LogEntry] = [] + objects: dict[tuple[HSMKeyID, OBJECT], 'MockYhsmObject'] + log_entries:list[LogEntry] + prev_entry: LogEntry def __init__(self, serial: int, objects: dict): self.serial = serial self.objects = objects - now = int(datetime.datetime.now().timestamp()*1000) - self.start_ts = now - initial_digest = b"0123456789abcdef" - - # FORMAT: ClassVar[str] = "!HBHHHHBL16s" - # LENGTH: ClassVar[int] = struct.calcsize(FORMAT) - #reset_entry_bytes = b"\xff"*32 - #reset_entry = LogEntry.parse(reset_entry_bytes) - #reset_entry.tick = now - self.start_ts - self.log_entries.append(LogEntry(0, cast(COMMAND, 0xff), 0xff, 0xff, 0xff, 0xff, 0xff, self.start_ts-now, initial_digest)) - self.log_entries.append(LogEntry(1, cast(COMMAND, 0), 0, 0, 0, 0, 0, self.start_ts-now, initial_digest)) + self.audit_settings = HSMAuditSettings(forced_audit='off', default_command_logging='off', command_logging={}) + # Inject example initial log entries from an actual YubiHSM2 + self.log_entries = [LogEntry.parse(bytes.fromhex(e)) for e in ( + '0001ffffffffffffffffffffffffffffcf87d1b7256b135b12ca27ec1365e50e', + '0002000000ffff000000000000000000fc215fbee7154f4d061d80806250f678')] + self.prev_entry = self.log_entries[-1] + + def add_log(self, cmd_name: YubiHsm2Command, target_key: HSMKeyID|None, second_key: HSMKeyID|None): + # Emulate the YubiHSM2 logging + assert _g_conf + session_key = _g_conf.admin.default_admin_key.id # TODO: emulate other auth keys on the mock device? + default_logging = self.audit_settings.default_command_logging + + if self.audit_settings.command_logging.get(cmd_name, default_logging) != 'off': + + if len(self.log_entries) >= 62 and self.audit_settings.forced_audit != 'off': + if cmd_name not in ('authenticate-session', 'reset-device', 'close-session', 'create-session', 'set-log-index', 'get-log-entries'): + raise YubiHsmDeviceError(ERROR.LOG_FULL) + + e = LogEntry( + number = (self.prev_entry.number + 1) & 0xffff, + command = lookup_hsm_cmd(cmd_name), length = 123, + session_key = session_key or 0, + target_key = target_key or 0, + second_key = second_key or 0, + result = 42, tick = self.prev_entry.tick + 7, digest = b'') + new_digest = sha256(e.data + self.prev_entry.digest).digest()[:16] + + res = LogEntry(e.number, e.command, e.length, + e.session_key, e.target_key, e.second_key, + e.result, e.tick, new_digest) + + self.log_entries.append(res) + self.prev_entry = res + def get_mock_object(self, key: HSMKeyID, type: OBJECT) -> 'MockYhsmObject': if (key, type) not in self.objects: @@ -676,6 +703,10 @@ def get_info(self) -> ObjectInfo: if deleg_caps := getattr(self.mock_obj, "delegated_capabilities", None): yhsm_deleg_caps = _g_conf.capability_from_names(set(deleg_caps)) + global _g_mock_hsms + device = _g_mock_hsms[self.mock_device_serial] + device.add_log('get-object-info', self.mock_obj.id, None) + return ObjectInfo( id = self.mock_obj.id, object_type = self.object_type, @@ -695,7 +726,6 @@ def delete(self) -> None: assert self.mock_device_serial in _g_mock_hsms, f"Mock device not found: {self.mock_device_serial}" device = _g_mock_hsms[self.mock_device_serial] device.del_mock_object(key[0], key[1]) - def __repr__(self): return "{0.__class__.__name__}(id={0.id})".format(self) @@ -704,12 +734,11 @@ class MockHSMSession(HSMSession): """ Implementation of the HSM session for a mock HSM device. """ - - def __init__(self, dev_serial: int): + def __init__(self, dev_serial: int, auth_key_id: HSMKeyID): global _g_mock_hsms self.backend = _g_mock_hsms[dev_serial] self.dev_serial = dev_serial - self.audit_settings = HSMAuditSettings(forced_audit='off', default_command_logging='off', command_logging={}) + self.auth_key = auth_key_id def get_serial(self) -> int: return self.dev_serial @@ -733,6 +762,7 @@ def object_exists(self, objdef: HSMObjBase) -> ObjectInfo | None: def object_exists_raw(self, id: HSMKeyID, type: OBJECT) -> ObjectInfo | None: try: + self.backend.add_log('get-object-info', id, None) return self.backend.get_mock_object(id, type).get_info() except YubiHsmDeviceError as e: if e.code == ERROR.OBJECT_NOT_FOUND: @@ -742,6 +772,7 @@ def object_exists_raw(self, id: HSMKeyID, type: OBJECT) -> ObjectInfo | None: def put_wrap_key(self, keydef: HSMWrapKey, secret: bytes) -> ObjectInfo: obj = MockYhsmObject(self.backend.serial, keydef, secret) self.backend.objects[(keydef.id, OBJECT.WRAP_KEY)] = obj + self.backend.add_log('put-wrap-key', keydef.id, None) return obj.get_info() def attest_asym_key(self, key_id: HSMKeyID) -> haz_x509.Certificate: @@ -765,6 +796,8 @@ def attest_asym_key(self, key_id: HSMKeyID) -> haz_x509.Certificate: digital_signature=True, content_commitment=False, key_encipherment=True, data_encipherment=False, key_agreement=False, key_cert_sign=False, crl_sign=False, encipher_only=False, decipher_only=False ), critical=True) + + self.backend.add_log('sign-attestation-certificate', key_id, None) return builder.sign(issuer_key, haz_hashes.SHA256()) def export_wrapped(self, wrap_key: HSMWrapKey, obj_id: HSMKeyID, obj_type: OBJECT) -> bytes: @@ -778,6 +811,7 @@ def export_wrapped(self, wrap_key: HSMWrapKey, obj_id: HSMKeyID, obj_type: OBJEC encryptor = cipher.encryptor() enc_blob = encryptor.update(export_blob) + encryptor.finalize() tag = encryptor.tag + self.backend.add_log('export-wrapped', wrap_key.id, obj_id) return pickle.dumps((enc_blob, tag)) def import_wrapped(self, wrap_key: HSMWrapKey, data: bytes) -> ObjectInfo: @@ -788,24 +822,28 @@ def import_wrapped(self, wrap_key: HSMWrapKey, data: bytes) -> ObjectInfo: obj: MockYhsmObject = pickle.loads(export_blob) assert isinstance(obj.mock_obj, HSM_KEY_TYPES) self.backend.put_mock_object(obj) + self.backend.add_log('import-wrapped', wrap_key.id, obj.id) return obj.get_info() def auth_key_put_derived(self, keydef: HSMAuthKey, password: str) -> ObjectInfo: data = f"derived:{password}".encode() obj = MockYhsmObject(self.backend.serial, keydef, data) self.backend.objects[(keydef.id, OBJECT.AUTHENTICATION_KEY)] = obj + self.backend.add_log('put-authentication-key', keydef.id, None) return obj.get_info() def auth_key_put(self, keydef: HSMAuthKey, key_enc: bytes, key_mac: bytes) -> ObjectInfo: data = f"key_enc:{key_enc.hex()},key_mac:{key_mac.hex()}".encode() obj = MockYhsmObject(self.backend.serial, keydef, data) self.backend.objects[(keydef.id, OBJECT.AUTHENTICATION_KEY)] = obj + self.backend.add_log('put-authentication-key', keydef.id, None) return obj.get_info() def sym_key_generate(self, keydef: HSMSymmetricKey) -> ObjectInfo: data = {"key_enc": self.get_pseudo_random(256//8), "key_mac": self.get_pseudo_random(256//8)} obj = MockYhsmObject(self.backend.serial, keydef, pickle.dumps(data)) self.backend.objects[(keydef.id, OBJECT.SYMMETRIC_KEY)] = obj + self.backend.add_log('generate-symmetric-key', keydef.id, None) return obj.get_info() def asym_key_generate(self, keydef: HSMAsymmetricKey) -> ObjectInfo: @@ -822,18 +860,22 @@ def asym_key_generate(self, keydef: HSMAsymmetricKey) -> ObjectInfo: priv_pem = priv_key.private_bytes(haz_ser.Encoding.PEM, haz_ser.PrivateFormat.PKCS8, haz_ser.NoEncryption()) obj = MockYhsmObject(self.backend.serial, keydef, priv_pem) self.backend.objects[(keydef.id, OBJECT.ASYMMETRIC_KEY)] = obj + self.backend.add_log('generate-asymmetric-key', keydef.id, None) return obj.get_info() def hmac_key_generate(self, keydef: HSMHmacKey) -> ObjectInfo: data = self.get_pseudo_random(256//8) obj = MockYhsmObject(self.backend.serial, keydef, data) self.backend.put_mock_object(obj) + self.backend.add_log('generate-hmac-key', keydef.id, None) return obj.get_info() def get_pseudo_random(self, length: int) -> bytes: + self.backend.add_log('get-pseudo-random', None, None) return (b'0123' * length)[:length] # Mock: use deterministic data for tests def list_objects(self) -> Sequence[MockYhsmObject]: + self.backend.add_log('list-objects', None, None) return list(self.backend.objects.values()) def delete_object(self, objdef: HSMObjBase) -> None: @@ -842,20 +884,24 @@ def delete_object(self, objdef: HSMObjBase) -> None: self.delete_object_raw(objdef.id, obj_type) def delete_object_raw(self, id: HSMKeyID, type: OBJECT) -> None: + self.backend.add_log('delete-object', id, None) self.backend.del_mock_object(id, type) def sign_hmac(self, keydef: HSMHmacKey, data: bytes) -> bytes: hmac_key = self.backend.objects[(keydef.id, OBJECT.HMAC_KEY)].data hmac = haz_hmac.HMAC(hmac_key, haz_hashes.SHA256()) hmac.update(data) + self.backend.add_log('sign-hmac', keydef.id, None) return hmac.finalize() def get_certificate(self, keydef: HSMOpaqueObject) -> haz_x509.Certificate: + self.backend.add_log('get-opaque', keydef.id, None) return haz_x509.load_pem_x509_certificate(self.backend.objects[(keydef.id, OBJECT.OPAQUE)].data) def put_certificate(self, keydef: HSMOpaqueObject, certificate: haz_x509.Certificate) -> ObjectInfo: obj = MockYhsmObject(self.backend.serial, keydef, certificate.public_bytes(encoding=haz_ser.Encoding.PEM)) self.backend.objects[(keydef.id, OBJECT.OPAQUE)] = obj + self.backend.add_log('put-opaque', keydef.id, None) return obj.get_info() def get_private_key(self, keydef: HSMAsymmetricKey) -> PrivateKeyOrAdapter: @@ -868,16 +914,24 @@ def get_public_key(self, keydef: HSMAsymmetricKey) -> haz_rsa.RSAPublicKey | haz asym_pem = self.backend.objects[(keydef.id, OBJECT.ASYMMETRIC_KEY)].data asym_key = haz_ser.load_pem_private_key(asym_pem, password=None) assert isinstance(asym_key, (haz_rsa.RSAPrivateKey, haz_ec.EllipticCurvePrivateKey, haz_ed25519.Ed25519PrivateKey)) + self.backend.add_log('get-public-key', keydef.id, None) return asym_key.public_key() def get_log_entries(self, previous_entry: LogEntry | None = None) -> LogData: - return LogData(0, 0, self.backend.log_entries) + res = LogData(0, 0, self.backend.log_entries) + self.backend.add_log('get-log-entries', None, None) + return res def free_log_entries(self, up_until_num: int) -> None: - self.backend.log_entries = [entry for entry in self.backend.log_entries if entry.number > up_until_num] + self.backend.log_entries = [e for e in self.backend.log_entries if e.number > up_until_num] + self.backend.add_log('set-log-index', None, None) def get_audit_settings(self) -> tuple[HSMAuditSettings, dict[str, YubiHsm2AuditMode]]: - return self.audit_settings, {} + self.backend.audit_settings.apply_defaults() + return self.backend.audit_settings, {} # No unknown audit settings on the mock device def set_audit_settings(self, settings: HSMAuditSettings) -> None: - self.audit_settings = settings + self.backend.add_log('set-option', None, None) + self.backend.audit_settings = settings + self.backend.audit_settings.apply_defaults() + diff --git a/run-tests.sh b/run-tests.sh index c232815..8b2f06a 100755 --- a/run-tests.sh +++ b/run-tests.sh @@ -33,20 +33,31 @@ run_cmd() { assert_success() { if [ $? -ne 0 ]; then echo "ERROR: Expected success, but command failed" - return 1 + exit 1 fi } assert_grep() { if ! grep -q "$1" <<< "$2"; then echo "ERROR: Expected output to contain '$1'" - return 1 + exit 1 + fi +} + +assert_not_grep() { + if grep -q "$1" <<< "$2"; then + echo "ERROR: Expected output not to contain '$1'" + exit 1 fi } setup() { run_cmd -q hsm compare --create + assert_success + run_cmd x509 create -a + assert_success + # `add-service` command is interactive => use `expect` to provide input expect << EOF $EXPECT_PREAMBLE @@ -60,7 +71,10 @@ setup() { } $EXPECT_POSTAMBLE EOF + assert_success + run_cmd -q hsm make-wrap-key + assert_success } # ------------------ test cases ------------------------- @@ -72,7 +86,31 @@ test_fresh_device() { test_create_all() { setup + + # Run simplified secret sharing command + expect << EOF + $EXPECT_PREAMBLE + spawn sh -c "$CMD hsm admin-sharing-ceremony --skip-ceremony -n 3 -t 2 2>&1" + expect { + "airgapped" { sleep 0.1; send "y\r"; exp_continue } + "admin password" { sleep 0.1; send "passw123\r"; exp_continue } + "again" { sleep 0.1; send "passw123\r"; exp_continue } + timeout { handle_timeout } + eof {} + } + $EXPECT_POSTAMBLE +EOF + assert_success + local count=$(run_cmd -q hsm compare | grep -c '\[x\]') + assert_success + [ "$count" -eq 36 ] || { echo "Expected 36 objects, but found $count"; return 1; } + + # Remove default admin key + run_cmd hsm default-admin-disable + assert_success + local count=$(run_cmd -q hsm compare | grep -c '\[x\]') + assert_success [ "$count" -eq 35 ] || { echo "Expected 35 objects, but found $count"; return 1; } } @@ -86,10 +124,11 @@ test_tls_certificates() { local output=$(openssl crl2pkcs7 -nocrl -certfile $TEMPDIR/www-example-com.cer.pem | openssl pkcs7 -print_certs | openssl x509 -text -noout) assert_success - assert_grep 'Subject:.*CN=www.example.com.*L=Duckburg.*ST=Calisota.*C=US' "$output" - assert_grep 'DNS:www.example.org' "$output" - assert_grep 'IP Address:192.168.0.1' "$output" - assert_grep 'IP Address:FD12:123' "$output" + echo "$output" + assert_grep 'Subject.*CN.*=.*www.example.com.*L.*=.*Duckburg.*ST.*=.*Calisota.*C.*=.*US' "$output" + assert_grep 'DNS.*www.example.org' "$output" + assert_grep 'IP Address.*192.168.0.1' "$output" + assert_grep 'IP Address.*FD12:123' "$output" assert_grep 'Public.*4096' "$output" assert_grep 'Signature.*ecdsa' "$output" @@ -101,13 +140,16 @@ test_tls_certificates() { test_password_derivation() { setup local output=$(run_cmd -q pass get www.example.com) + assert_success assert_grep 'dignity.proud.material.upset.elegant.finish' "$output" local nonce=$(run_cmd -q pass rotate www.example.com | grep nonce) + assert_success sed -E "s|^( *)\-.*name_hmac.*nonce.*ts.*$|\1${nonce}|" < $TEMPDIR/hsm-conf.yml > $TEMPDIR/rotated-conf.yml mv $TEMPDIR/rotated-conf.yml $TEMPDIR/hsm-conf.yml output=$(run_cmd -q pass get www.example.com) + assert_success ! grep -q 'dignity.proud.material.upset.elegant.finish' <<< "$output" || { echo "ERROR: password not rotated"; return 1; } } @@ -120,10 +162,14 @@ test_wrapped_backup() { tar tvfz $TEMPDIR/backup.tgz | grep -q 'OPAQUE' || { echo "ERROR: No certificates found in backup"; return 1; } run_cmd -q hsm delete --force 0x0210 + assert_success run_cmd -q hsm compare | grep -q '[ ].*ca-root-key-rsa' || { echo "ERROR: Key not deleted"; return 1; } + assert_success run_cmd -q hsm restore --force $TEMPDIR/backup.tgz + assert_success run_cmd -q hsm compare | grep -q '[x].*ca-root-key-rsa' || { echo "ERROR: Key not restored"; return 1; } + assert_success } test_ssh_user_certificates() { @@ -167,6 +213,66 @@ test_ssh_host_certificates() { } +test_logging_commands() { + local DB_PATH="$TEMPDIR/test_log.db" + export HSM_PASSWORD="password123-not-really-set" + + # Test first fetch + run_cmd --auth-password-id='log-audit' log fetch "$DB_PATH" + assert_success + [ -f "$DB_PATH" ] || { echo "ERROR: Log database not created"; return 1; } + + # Apply audit settings + local apply_output=$(run_cmd log apply-settings --force) + assert_success + assert_grep "settings applied" "$apply_output" + + setup # Create some objects + + # Fetch again twice to log SET_LOG_INDEX due to --clear + run_cmd --auth-password-id='log-audit' log fetch "$DB_PATH" --clear + assert_success + run_cmd --auth-password-id='log-audit' log fetch "$DB_PATH" + assert_success + + # Test log review + local review_output=$(run_cmd log review "$DB_PATH") + assert_success + echo "$review_output" + assert_grep "SET_LOG_INDEX" "$review_output" + assert_grep "DEFAULT AUTHKEY" "$review_output" + assert_grep "GENERATE_ASYMMETRIC_KEY" "$review_output" + assert_grep "PUT_OPAQUE" "$review_output" + + # Test log verify-all + run_cmd log verify-all "$DB_PATH" + assert_success + + # Test log export + local export_file="$TEMPDIR/log_export.jsonl" + run_cmd log export "$DB_PATH" --out "$export_file" + assert_success + [ -f "$export_file" ] || { echo "ERROR: Log export file not created"; return 1; } + local export_content=$(cat "$export_file") + assert_grep "GENERATE_ASYMMETRIC_KEY" "$export_content" + + # Make an arbitrary test operation and verify that it is logged + assert_not_grep 'SIGN_HMAC' "$export_file" + run_cmd -q pass get wiki.example.com + assert_success + + run_cmd --auth-password-id 'log-audit' log fetch "$DB_PATH" -c + assert_success + run_cmd log verify-all "$DB_PATH" + assert_success + + local export_content=$(run_cmd log export "$DB_PATH") + assert_success + echo "$export_content" + assert_grep "SIGN_HMAC" "$export_content" + + echo "All logging tests passed" +} # ------------------------------------------------------ @@ -203,5 +309,6 @@ run_test test_password_derivation run_test test_wrapped_backup run_test test_ssh_user_certificates run_test test_ssh_host_certificates +run_test test_logging_commands echo "All tests passed successfully!"