diff --git a/hsm-conf.yml b/hsm-conf.yml index 8b9ef19..2f002ac 100644 --- a/hsm-conf.yml +++ b/hsm-conf.yml @@ -82,6 +82,42 @@ admin: capabilities: ["wrap-data", "unwrap-data", "export-wrapped", "import-wrapped", "exportable-under-wrap"] delegated_capabilities: ['all'] + audit: + # Specify logging/audit policies for the devices. + # 'fixed' is like 'on', but cannot be turned off again except by a factory reset + forced_audit: 'off' # If on/fixed, HSM refuses further commands until log is audited when it fills up + default_command_logging: 'on' # Default for commands not listed below + command_logging: # Overrides for specific commands + reset-device: 'fixed' + put-opaque: 'fixed' + put-authentication-key: 'fixed' + put-asymmetric-key: 'fixed' + generate-asymmetric-key: 'fixed' + export-wrapped: 'fixed' + import-wrapped: 'fixed' + put-wrap-key: 'fixed' + set-option: 'fixed' + put-hmac-key: 'fixed' + delete-object: 'fixed' + generate-hmac-key: 'fixed' + generate-wrap-key: 'fixed' + put-template: 'fixed' + change-authentication-key: 'fixed' + put-symmetric-key: 'fixed' + generate-symmetric-key: 'fixed' + echo: 'off' + device-info: 'off' + get-storage-info: 'off' + get-object-info: 'off' + get-option: 'off' + get-pseudo-random: 'off' + blink-device: 'off' + get-log-entries: 'off' # This seems to change after fetch (firmware bug?), so don't log it to avoid digest mismatch + create-session: 'off' # Not much point, as auth session key is included in every other log entry + close-session: 'off' # (--||--) + authenticate-session: 'off' # (--||--) + session-message: 'off' # This is a low-level command, spams the log, not very useful to log + # User keys are for general use by human operators. # diff --git a/hsm_secrets/config.py b/hsm_secrets/config.py index 66b5694..370c89c 100644 --- a/hsm_secrets/config.py +++ b/hsm_secrets/config.py @@ -4,6 +4,7 @@ from datetime import datetime import os import re +import typing import click.shell_completion from pydantic import BaseModel, ConfigDict, HttpUrl, Field, StringConstraints from typing_extensions import Annotated @@ -155,6 +156,19 @@ class HSMObjBase(NoExtraBaseModel): id: HSMKeyID domains: set[HSMDomainName] +# -- Logging / audit -- +YubiHsm2AuditMode = Literal['off', 'on', 'fixed'] +YubiHsm2Command = Literal['echo', 'create-session', 'authenticate-session', 'session-message', 'device-info', 'reset-device', 'get-device-public-key', 'close-session', 'get-storage-info', 'put-opaque', 'get-opaque', 'put-authentication-key', 'put-asymmetric-key', 'generate-asymmetric-key', 'sign-pkcs1', 'list-objects', 'decrypt-pkcs1', 'export-wrapped', 'import-wrapped', 'put-wrap-key', 'get-log-entries', 'get-object-info', 'set-option', 'get-option', 'get-pseudo-random', 'put-hmac-key', 'sign-hmac', 'get-public-key', 'sign-pss', 'sign-ecdsa', 'derive-ecdh', 'delete-object', 'decrypt-oaep', 'generate-hmac-key', 'generate-wrap-key', 'verify-hmac', 'sign-ssh-certificate', 'put-template', 'get-template', 'decrypt-otp', 'create-otp-aead', 'randomize-otp-aead', 'rewrap-otp-aead', 'sign-attestation-certificate', 'put-otp-aead-key', 'generate-otp-aead-key', 'set-log-index', 'wrap-data', 'unwrap-data', 'sign-eddsa', 'blink-device', 'change-authentication-key', 'put-symmetric-key', 'generate-symmetric-key', 'decrypt-ecb', 'encrypt-ecb', 'decrypt-cbc', 'encrypt-cbc'] +class HSMAuditSettings(NoExtraBaseModel): + forced_audit: YubiHsm2AuditMode + default_command_logging: YubiHsm2AuditMode + command_logging: dict[YubiHsm2Command, YubiHsm2AuditMode] + + def apply_defaults(self): + # Fill command_logging with default value for any missing commands + for cmd in typing.get_args(YubiHsm2Command): + if cmd not in self.command_logging: + self.command_logging[cmd] = self.default_command_logging # -- Asymmetric key models -- AsymmetricAlgorithm = Literal["rsa2048", "rsa3072", "rsa4096", "ecp256", "ecp384", "ecp521", "eck256", "ecbp256", "ecbp384", "ecbp512", "ed25519", "ecp224"] @@ -281,6 +295,8 @@ class Admin(NoExtraBaseModel): default_admin_key: HSMAuthKey shared_admin_key: HSMAuthKey wrap_key: HSMWrapKey + audit: HSMAuditSettings + class X509(NoExtraBaseModel): root_certs: List[X509Cert] diff --git a/hsm_secrets/hsm/__init__.py b/hsm_secrets/hsm/__init__.py index ae4fa58..2ae9a69 100644 --- a/hsm_secrets/hsm/__init__.py +++ b/hsm_secrets/hsm/__init__.py @@ -1,13 +1,18 @@ +from dataclasses import astuple import datetime +from hashlib import sha256 from io import BytesIO +import json from pathlib import Path +import struct import sys import tarfile -from typing import Sequence +from typing import Sequence, cast import click from click.shell_completion import CompletionItem -from hsm_secrets.config import HSMAsymmetricKey, HSMConfig, click_hsm_obj_auto_complete, find_all_config_items_per_type, find_config_items_of_class, parse_keyid +from hsm_secrets.config import HSMAsymmetricKey, HSMAuditSettings, HSMConfig, YubiHsm2AuditMode, YubiHsm2Command, click_hsm_obj_auto_complete, find_all_config_items_per_type, find_config_items_of_class, parse_keyid +from hsm_secrets.hsm import yhsm_log from hsm_secrets.hsm.secret_sharing_ceremony import cli_reconstruction_ceremony, cli_splitting_ceremony from hsm_secrets.utils import HSMAuthMethod, HsmSecretsCtx, cli_error, cli_info, cli_result, cli_ui_msg, cli_warn, confirm_and_delete_old_yubihsm_object_if_exists, open_hsm_session, open_hsm_session_with_password, pass_common_args, pretty_fmt_yubihsm_object, prompt_for_secret, pw_check_fromhex @@ -408,6 +413,25 @@ def compare_config(ctx: HsmSecretsCtx, alldevs: bool, create: bool): cli_info("") cli_info(f"KEY CREATION REPORT: Created {n_created} objects, skipped {n_skipped} objects. Run the command again without --create to verify status.") + + # Check logging / audit settings + + cur_audit_settings, unknown_audit_commands = ses.get_audit_settings() + if unknown_audit_commands: + cli_result("UNKNOWN AUDIT LOG COMMANDS ON DEVICE (not necessarily a problem)") + for k, v in unknown_audit_commands.items(): + cli_result(f" ??? {k} = {v}") + + new_audit_setting = ctx.conf.admin.audit + ctx.conf.admin.audit.apply_defaults() + + audit_settings_diff = _check_and_format_audit_conf_differences(cur_audit_settings, new_audit_setting, raise_if_fixed_change=False) + if audit_settings_diff: + cli_result("MISMATCHING AUDIT LOG SETTINGS (device -> config)") + cli_result(audit_settings_diff) + if create: + cli_warn("Use 'hsm apply-audit-settings' to change audit settings to match the configuration.") + cli_info("") @@ -548,3 +572,157 @@ def restore_hsm(ctx: HsmSecretsCtx, backup_file: str, force: bool): cli_info("") cli_info("Restore complete.") + + +# --------------- + +@cmd_hsm.command('apply-audit-settings') +@pass_common_args +@click.option('--alldevs', is_flag=True, help="Set on all devices") +@click.option('--force', is_flag=True, help="Don't ask for confirmation before setting") +def apply_audit_settings(ctx: HsmSecretsCtx, alldevs: bool, force: bool): + """Apply audit settings from config + + Apply the audit/logging settings from configuration file to the YubiHSM(s). + """ + conf_settings = ctx.conf.admin.audit + conf_settings.apply_defaults() + + hsm_serials = ctx.conf.general.all_devices.keys() if alldevs else [ctx.hsm_serial] + for serial in hsm_serials: + with open_hsm_session(ctx, HSMAuthMethod.DEFAULT_ADMIN, serial) as ses: + cli_info(f"Checking audit settings on device {serial}...") + cur_settings, _unknown_audits = ses.get_audit_settings() + mismatches_str = _check_and_format_audit_conf_differences(cur_settings, conf_settings) + if not mismatches_str: + cli_info(" └– Already ok. Audit settings match the config file.") + continue + else: + cli_info(" └– Mismatching audit commands (current -> new):") + cli_info(mismatches_str) + if not force and not click.confirm("Do you want to set the audit settings to match the configuration?", default=False, err=True): + cli_info(f" └– Skipping device {serial}.") + else: + # Remove any 'fixed' commands from the configuration before applying. + # The YubiHSM2 command will fail otherwise. + without_fixed: dict[YubiHsm2Command, YubiHsm2AuditMode] = {k:v for k,v in conf_settings.command_logging.items() if v != 'fixed'} + to_apply = HSMAuditSettings( + forced_audit = conf_settings.forced_audit, + default_command_logging = conf_settings.default_command_logging, + command_logging = without_fixed) + ses.set_audit_settings(to_apply) + cli_info(" └– Audit settings applied.") + + +def _check_and_format_audit_conf_differences(cur_settings: HSMAuditSettings, conf_settings: HSMAuditSettings, raise_if_fixed_change = True) -> str|None: + """ + Check the audit settings in the YubiHSM against the configuration file. + Returns a formatted string with the differences, or None if there are none. + + Raises ValueError if a fixed command is set in the device and the configuration wants to change it, + and `raise_if_fixed_change` is True. + """ + mismatches: dict[str, tuple[YubiHsm2AuditMode|None, YubiHsm2AuditMode|None]] = {} + + if cur_settings.forced_audit != conf_settings.forced_audit: + mismatches[''] = (cur_settings.forced_audit, conf_settings.forced_audit) + + for k, new_v in conf_settings.command_logging.items(): + cur_v = cur_settings.command_logging.get(cast(YubiHsm2Command, k), None) + if cur_v != new_v: + mismatches[k] = (cur_v, new_v) + if cur_v == 'fixed' and raise_if_fixed_change: + raise ValueError(f"Command '{k}' is set to 'fixed' in the device. Cannot change it without resetting the HSM.") + + if not mismatches: + return None + return '\n'.join([f" - {mk.ljust(30)} {cv} -> {nv}" for mk, (cv, nv) in sorted(mismatches.items())]) + + +# --------------- + +@cmd_hsm.command('log-fetch') +@pass_common_args +@click.argument('jsonfile', type=click.Path(exists=False, allow_dash=False, dir_okay=False), metavar='', required=False) +@click.option('--no-verify', is_flag=True, help="Don't verify against previous entry") +@click.option('--clear', '-c', is_flag=True, help="Clear the log entries after fetching") +def log_fetch(ctx: HsmSecretsCtx, jsonfile: str|None, no_verify: bool, clear: bool): + """Fetch and store log entries from HSM + + Retrieve new log entries from device and optionally store them in a JSON file. + If no JSON file is specified, show logs without storing (implies --no-verify). + + YubiHSM2 log forms an audit chain where each entry is verified against + the previous one for integrity. The script first checks `jsonfile`, + then .1.json, if the file is empty or missing, when looking for the last entry. + + Always maintain secure backups of JSON files; any missing or tampered files will + break the chain's integrity for future entries. + + By default, fetched log entries remain in the HSM unless --clear is used. + This option frees up log space in the HSM after successfully saving the entries + to the JSON file. + + For scripting, suppress all non-errors with the global --quiet option. + """ + if not jsonfile: + cli_warn("No JSON file specified. This implies --no-verify.") + no_verify = True + + if clear and not jsonfile: + raise click.ClickException("The --clear option requires a file to store the log entries.") + + # Read JSON file if it exists + prev_entries = yhsm_log.read_json_files(jsonfile, False) if jsonfile else {} + + # Get the last entry for verification + prev = None + if prev_entries: + n = max(prev_entries.keys()) + ld_bytes = bytes.fromhex(prev_entries[n]['data']) + prev = yubihsm.core.LogEntry.parse(ld_bytes) + + # Get new entries from the HSM + with open_hsm_session(ctx, HSMAuthMethod.PASSWORD) as ses: + new_log_data = ses.get_log_entries() + dev_serial = ses.get_serial() + for e in new_log_data.entries: + if e.number in prev_entries: + cli_info(f"- Skipping already stored entry {e.number}") + assert e.data.hex() == prev_entries[e.number]['data'][:32], f"Data mismatch between already stored entry {e.number} and the one from HSM. At least GET_LOG_ENTRIES is known to cause this, so don't log it." + elif prev and not no_verify: + if not e.validate(prev): + raise click.ClickException(f"Log entry {e.number} FAILED validation against previous entry. Audit chain broken.") + prev = e + + new_json_entries = {e.number: yhsm_log.decode_log_entry_to_dict(e, ctx.conf, dev_serial) for e in new_log_data.entries} + + # Print the new entries to console + cli_info(json.dumps(new_json_entries, indent=2, sort_keys=True)) + + # Update (/create) the JSON file + if jsonfile: + yhsm_log.update_json_file(Path(jsonfile), new_json_entries, dev_serial) + if clear: + last_to_free = max(new_json_entries.keys()) + ses.free_log_entries(last_to_free) + cli_info(f"HSM updated: log space up to event {last_to_free} freed for reuse.") + + +@cmd_hsm.command('log-verify-all') +@pass_common_args +@click.argument('jsonfile', type=click.Path(exists=True, allow_dash=False), metavar='', required=True) +@click.option('--initial-num', '-i', type=int, help="Entry number to treat as first in the chain", default=1) +def log_verify_all(ctx: HsmSecretsCtx, jsonfile: str, initial_num: int): + """Verify the entire log chain + + Read all JSON files and verify the integrity of the entire log chain + starting from entry #1 (or the specified initial number). + + Starts reading from the given JSON file and continues with .1.json + and so on. + + This command does not connect to the HSM or fetch new entries. + """ + all_entries = yhsm_log.read_json_files(jsonfile, True) + yhsm_log.verify_log_chain(all_entries, initial_num) diff --git a/hsm_secrets/hsm/yhsm_log.py b/hsm_secrets/hsm/yhsm_log.py new file mode 100644 index 0000000..851bd16 --- /dev/null +++ b/hsm_secrets/hsm/yhsm_log.py @@ -0,0 +1,130 @@ +from dataclasses import astuple +import json +import struct +import datetime +from pathlib import Path +import click +import yubihsm # type: ignore [import] +from typing import Dict, List, Tuple +from filelock import FileLock, Timeout + +from hsm_secrets.config import HSMConfig +from hsm_secrets.utils import cli_info +from hsm_secrets.yubihsm import HSMSession + + +def read_json_files(jsonfile: str, need_all: bool) -> dict[int, dict]: + """Read log entries from JSON files. + Given `jsonfile` is the latest file, and the function checks for + older files with the same name but with a suffix '.1.json', '.2.json', etc. + + If `need_all` is not set, the function stops reading files as soon as it finds + a file at least one entry. + """ + prev_entries = {} + file_i = 0 + while True: + p = Path(jsonfile) if file_i == 0 else Path(jsonfile).with_suffix(f".{file_i}.json") + if file_i > 0 and not p.exists(): + break + if p.exists(): + cli_info(f"Reading old entries from '{p}'...") + with p.open('r') as fh: + data = json.load(fh) + for k, v in data.items(): + assert isinstance(int(k), int), "Entry number must be an integer." + assert isinstance(v, dict), "Entry data must be a dictionary." + if k in prev_entries: + raise ValueError(f"Duplicate entry number '{k}' in file {p}") + assert isinstance(v.get('data'), str), "Missing or invalid 'data' field in entry." + assert isinstance(v.get('fetch_time'), str), "Missing or invalid 'fetch_time' field in entry." + assert isinstance(v.get('hsm_serial'), int), "Missing or invalid 'hsm_serial' field in entry." + prev_entries[int(k)] = v + if len(prev_entries) > 0 and not need_all: + break + file_i += 1 + return prev_entries + + + +def decode_log_entry_to_dict(entry: yubihsm.core.LogEntry, conf: HSMConfig, hsm_serial: int) -> dict: + """ + Convert a log entry to a JSON-serializable dictionary. + """ + def find_info(id: int) -> str: + try: + if id in [0, 0xffff]: + return f'0x{id:04x}: -' + kd = conf.find_def(id) + return f"0x{id:04x}: '{kd.label}' ({kd.__class__.__name__})" + except KeyError: + return f"0x{id:04x} (UNKNOWN)" + + return { + "data": (struct.pack(entry.FORMAT, *astuple(entry))).hex(), + "hsm_serial": hsm_serial, + "fetch_time": datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'), + "_info": { + "cmd": f'{entry.command.name} ({entry.command.value})', + "len": entry.length, + "ses_key": find_info(entry.session_key), + "tgt_key": find_info(entry.target_key), + "2nd_key": find_info(entry.second_key), + "result": entry.result, + "tick": entry.tick, + } + } + + +def verify_log_chain(prev_entries: dict, initial_num: int) -> yubihsm.core.LogEntry|None: + """ + Verify the log chain from the initial number to the last entry in the JSON file. + :return: The last log entry in the chain. + :raises: click.ClickException if the chain is broken. + """ + if initial_num not in prev_entries: + raise click.ClickException(f"Initial entry {initial_num} not found in the JSON file. Audit chain broken.") + n = initial_num + prev = None + while n in prev_entries: + ld_bytes = bytes.fromhex(prev_entries[n]['data']) + ld = yubihsm.core.LogEntry.parse(ld_bytes) + if prev: + if prev_entries[n]['hsm_serial'] != prev_entries[n-1]['hsm_serial']: + raise click.ClickException(f"Log entry {n} has different HSM serial than previous entry. Audit chain broken.") + if not ld.validate(prev): + raise click.ClickException(f"Log entry {n} FAILED validation against previous entry. Audit chain broken.") + prev = ld + n += 1 + cli_info(f"Ok, previously stored entries from {initial_num} to {n-1} verified successfully.") + return prev + + +def update_json_file(jsonfile: Path, new_json_entries: dict[int, dict], dev_serial: int): + """ + Merge new log entries into the JSON file. + """ + lockfile = f"{jsonfile}.lock" + file_entries = {} + try: + with FileLock(lockfile, timeout=30): + if Path(jsonfile).exists(): + with Path(jsonfile).open('r') as fh: + file_entries = json.load(fh) + + if any(e['hsm_serial'] != dev_serial for e in file_entries.values()): + raise click.ClickException("The JSON file contains entries from a different HSM serial. Cannot mix entries.") + + for jn, jdict in new_json_entries.items(): + assert jn not in file_entries, f"Duplicate entry number {jn} in JSON file. Should have been caught earlier." + file_entries[str(jn)] = jdict + + with Path(jsonfile).open('w') as fh: + json.dump(file_entries, fh, indent=2, sort_keys=True) + cli_info(f"New entries added to '{jsonfile}'") + + Path(lockfile).unlink() + + except Timeout as e: + cli_info(f"Failed to acquire file lock '{lockfile}': {e}") + raise click.ClickException("Failed to acquire lock on the JSON file. Please try again later.") diff --git a/hsm_secrets/utils.py b/hsm_secrets/utils.py index 3da41d8..30f6332 100644 --- a/hsm_secrets/utils.py +++ b/hsm_secrets/utils.py @@ -305,7 +305,10 @@ def open_hsm_session( ctxman = open_hsm_session_with_default_admin(ctx, device_serial) elif auth_method == HSMAuthMethod.PASSWORD: assert device_serial, "HSM device serial not provided nor inferred. Cannot use shared secret auth." - assert ctx.auth_password and ctx.auth_password_id + if not ctx.auth_password_id: + raise click.UsageError("Auth key ID (user login as) not specified for password auth method.") + if not ctx.auth_password: + raise click.UsageError("HSM_PASSWORD environment variable not set for password auth method.") ctxman = open_hsm_session_with_password(ctx, ctx.auth_password_id, ctx.auth_password, device_serial) else: raise ValueError(f"Unknown auth method: {auth_method}") diff --git a/hsm_secrets/yubihsm.py b/hsm_secrets/yubihsm.py index 3cd3689..7340e18 100644 --- a/hsm_secrets/yubihsm.py +++ b/hsm_secrets/yubihsm.py @@ -4,11 +4,12 @@ from typing import Sequence, cast import pickle import os +import typing import click -from yubihsm.defs import CAPABILITY, ALGORITHM, ERROR, OBJECT, ORIGIN # type: ignore [import] +from yubihsm.defs import CAPABILITY, ALGORITHM, ERROR, OBJECT, ORIGIN, COMMAND, AUDIT # type: ignore [import] from yubihsm.objects import AsymmetricKey, HmacKey, SymmetricKey, WrapKey, YhsmObject, AuthenticationKey, Opaque # type: ignore [import] -from yubihsm.core import AuthSession # type: ignore [import] +from yubihsm.core import AuthSession, LogData, LogEntry # type: ignore [import] from yubihsm.exceptions import YubiHsmDeviceError # type: ignore [import] from yubihsm.objects import ObjectInfo @@ -26,7 +27,7 @@ import cryptography.hazmat.primitives.asymmetric.padding as haz_asym_padding import cryptography.x509 as haz_x509 -from hsm_secrets.config import HSMAsymmetricKey, HSMAuthKey, HSMConfig, HSMHmacKey, HSMKeyID, HSMObjBase, HSMOpaqueObject, HSMSymmetricKey, HSMWrapKey, NoExtraBaseModel +from hsm_secrets.config import HSMAsymmetricKey, HSMAuditSettings, HSMAuthKey, HSMConfig, HSMHmacKey, HSMKeyID, HSMObjBase, HSMOpaqueObject, HSMSymmetricKey, HSMWrapKey, NoExtraBaseModel, YubiHsm2AuditMode, YubiHsm2Command from hsm_secrets.key_adapters import PrivateKeyOrAdapter, make_private_key_adapter """ @@ -286,8 +287,53 @@ def get_public_key(self, keydef: HSMAsymmetricKey) -> haz_rsa.RSAPublicKey | haz """ pass + @abstractmethod + def get_log_entries(self, previous_entry: LogEntry | None = None) -> LogData: + """ + Get the log entries from the HSM. + + NOTE! If `previous_entry` is given, it must be the exactly previous entry to + the first one on the device! It's used for validation, NOT pagination by the + underlying Yubico library. + """ + pass + + @abstractmethod + def free_log_entries(self, up_until_num: int) -> None: + """ + Free log entries up until (and including) a given number (id), to make space for new ones. + + :param up_until_num: Log entry number (id) to free up until + """ + pass + + @abstractmethod + def get_audit_settings(self) -> tuple[HSMAuditSettings, dict[str, YubiHsm2AuditMode]]: + """ + Get the audit settings from the HSM. + First return value is the settings known in the config definition, second lists + any unknown ones read from the device. + """ + pass + + @abstractmethod + def set_audit_settings(self, settings: HSMAuditSettings) -> None: + """ + Set the audit settings on the HSM. + """ + pass + # --------- Real YubiHSM2 --------- +_conf_class_to_yhs_object_type = { + HSMAuthKey: OBJECT.AUTHENTICATION_KEY, + HSMWrapKey: OBJECT.WRAP_KEY, + HSMSymmetricKey: OBJECT.SYMMETRIC_KEY, + HSMAsymmetricKey: OBJECT.ASYMMETRIC_KEY, + HSMHmacKey: OBJECT.HMAC_KEY, + HSMOpaqueObject: OBJECT.OPAQUE +} + class RealHSMSession(HSMSession): """ Implementation of the HSM session for a real YubiHSM2 device. @@ -301,9 +347,9 @@ def __init__(self, conf: HSMConfig, session: AuthSession, dev_serial: int): :param session: Authenticated session with the YubiHSM2 :param dev_serial: Device serial number """ - self.dev_serial = dev_serial - self.backend = session - self.conf = conf + self.dev_serial: int = dev_serial + self.backend: AuthSession = session + self.conf: HSMConfig = conf def get_serial(self) -> HSMKeyID: return self.dev_serial @@ -472,6 +518,42 @@ def get_public_key(self, keydef: HSMAsymmetricKey) -> haz_rsa.RSAPublicKey | haz assert isinstance(asym_key, AsymmetricKey) return asym_key.get_public_key() + def get_log_entries(self, previous_entry: LogEntry | None = None) -> LogData: + return self.backend.get_log_entries(previous_entry) + + def free_log_entries(self, up_until_num: int) -> None: + self.backend.set_log_index(up_until_num) + + def get_audit_settings(self) -> tuple[HSMAuditSettings, dict[str, YubiHsm2AuditMode]]: + def tristate(val: AUDIT) -> YubiHsm2AuditMode: + return 'off' if val == AUDIT.OFF else 'on' if val == AUDIT.ON else 'fixed' + + uknown_res: dict[str, YubiHsm2AuditMode] = {} + known_res = HSMAuditSettings( + forced_audit=tristate(self.backend.get_force_audit()), + default_command_logging='off', + command_logging = {}) + + std_enum_vals = {int(x[1].value) for x in COMMAND._member_map_.items()} + conf_cmd_literals = typing.get_args(YubiHsm2Command) + + for cmd, a in self.backend.get_command_audit().items(): + cmd_name = f'{cmd.name.lower().replace("_","-")}' + if cmd.value in std_enum_vals and cmd_name in conf_cmd_literals: + known_res.command_logging[cast(YubiHsm2Command, cmd_name)] = tristate(a) + else: + cmd_name = f'0x{cmd.value:02x}-{cmd.name}' + uknown_res[cmd_name] = tristate(a) + + return known_res, uknown_res + + def set_audit_settings(self, settings: HSMAuditSettings) -> None: + tristate: dict[YubiHsm2AuditMode, AUDIT] = {'off': AUDIT.OFF, 'on': AUDIT.ON, 'fixed': AUDIT.FIXED} + self.backend.set_force_audit(tristate[settings.forced_audit]) + audit_mapping: dict[COMMAND, AUDIT] = {COMMAND._member_map_[k.upper().replace("-","_")].value: tristate[v] for k,v in settings.command_logging.items()} + self.backend.set_command_audit(audit_mapping) + + # --------- Mock YubiHSM2 --------- @@ -519,10 +601,22 @@ def save_mock_hsms(path: str): class MockHSMDevice: serial: int objects: dict[tuple[HSMKeyID, OBJECT], 'MockYhsmObject'] = {} + log_entries:list[LogEntry] = [] def __init__(self, serial: int, objects: dict): self.serial = serial self.objects = objects + now = int(datetime.datetime.now().timestamp()*1000) + self.start_ts = now + initial_digest = b"0123456789abcdef" + + # FORMAT: ClassVar[str] = "!HBHHHHBL16s" + # LENGTH: ClassVar[int] = struct.calcsize(FORMAT) + #reset_entry_bytes = b"\xff"*32 + #reset_entry = LogEntry.parse(reset_entry_bytes) + #reset_entry.tick = now - self.start_ts + self.log_entries.append(LogEntry(0, cast(COMMAND, 0xff), 0xff, 0xff, 0xff, 0xff, 0xff, self.start_ts-now, initial_digest)) + self.log_entries.append(LogEntry(1, cast(COMMAND, 0), 0, 0, 0, 0, 0, self.start_ts-now, initial_digest)) def get_mock_object(self, key: HSMKeyID, type: OBJECT) -> 'MockYhsmObject': if (key, type) not in self.objects: @@ -542,15 +636,6 @@ def del_mock_object(self, key: HSMKeyID, type: OBJECT) -> None: del self.objects[(key, type)] -_conf_class_to_yhs_object_type = { - HSMAuthKey: OBJECT.AUTHENTICATION_KEY, - HSMWrapKey: OBJECT.WRAP_KEY, - HSMSymmetricKey: OBJECT.SYMMETRIC_KEY, - HSMAsymmetricKey: OBJECT.ASYMMETRIC_KEY, - HSMHmacKey: OBJECT.HMAC_KEY, - HSMOpaqueObject: OBJECT.OPAQUE -} - class MockYhsmObject: """ Mock version of the YhsmObject class (returned by list_objects() among others). @@ -624,6 +709,7 @@ def __init__(self, dev_serial: int): global _g_mock_hsms self.backend = _g_mock_hsms[dev_serial] self.dev_serial = dev_serial + self.audit_settings = HSMAuditSettings(forced_audit='off', default_command_logging='off', command_logging={}) def get_serial(self) -> int: return self.dev_serial @@ -783,3 +869,15 @@ def get_public_key(self, keydef: HSMAsymmetricKey) -> haz_rsa.RSAPublicKey | haz asym_key = haz_ser.load_pem_private_key(asym_pem, password=None) assert isinstance(asym_key, (haz_rsa.RSAPrivateKey, haz_ec.EllipticCurvePrivateKey, haz_ed25519.Ed25519PrivateKey)) return asym_key.public_key() + + def get_log_entries(self, previous_entry: LogEntry | None = None) -> LogData: + return LogData(0, 0, self.backend.log_entries) + + def free_log_entries(self, up_until_num: int) -> None: + self.backend.log_entries = [entry for entry in self.backend.log_entries if entry.number > up_until_num] + + def get_audit_settings(self) -> tuple[HSMAuditSettings, dict[str, YubiHsm2AuditMode]]: + return self.audit_settings, {} + + def set_audit_settings(self, settings: HSMAuditSettings) -> None: + self.audit_settings = settings diff --git a/requirements.txt b/requirements.txt index cf988a3..d77f52c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -17,3 +17,4 @@ pycryptodome yubihsm-ssh-tool @ git+https://github.com/YubicoLabs/yubihsm-ssh-tool@9cec861 uni-curses +filelock