Skip to content

Commit

Permalink
Implement log auditing
Browse files Browse the repository at this point in the history
  • Loading branch information
elonen committed Aug 1, 2024
1 parent 1b2e11a commit 9e5ada5
Show file tree
Hide file tree
Showing 7 changed files with 480 additions and 18 deletions.
36 changes: 36 additions & 0 deletions hsm-conf.yml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,42 @@ admin:
capabilities: ["wrap-data", "unwrap-data", "export-wrapped", "import-wrapped", "exportable-under-wrap"]
delegated_capabilities: ['all']

audit:
# Specify logging/audit policies for the devices.
# 'fixed' is like 'on', but cannot be turned off again except by a factory reset
forced_audit: 'off' # If on/fixed, HSM refuses further commands until log is audited when it fills up
default_command_logging: 'on' # Default for commands not listed below
command_logging: # Overrides for specific commands
reset-device: 'fixed'
put-opaque: 'fixed'
put-authentication-key: 'fixed'
put-asymmetric-key: 'fixed'
generate-asymmetric-key: 'fixed'
export-wrapped: 'fixed'
import-wrapped: 'fixed'
put-wrap-key: 'fixed'
set-option: 'fixed'
put-hmac-key: 'fixed'
delete-object: 'fixed'
generate-hmac-key: 'fixed'
generate-wrap-key: 'fixed'
put-template: 'fixed'
change-authentication-key: 'fixed'
put-symmetric-key: 'fixed'
generate-symmetric-key: 'fixed'
echo: 'off'
device-info: 'off'
get-storage-info: 'off'
get-object-info: 'off'
get-option: 'off'
get-pseudo-random: 'off'
blink-device: 'off'
get-log-entries: 'off' # This seems to change after fetch (firmware bug?), so don't log it to avoid digest mismatch
create-session: 'off' # Not much point, as auth session key is included in every other log entry
close-session: 'off' # (--||--)
authentication-session: 'off' # (--||--)
session-message: 'off' # This is a low-level command, spams the log, not very useful to log


# User keys are for general use by human operators.
#
Expand Down
16 changes: 16 additions & 0 deletions hsm_secrets/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from datetime import datetime
import os
import re
import typing
import click.shell_completion
from pydantic import BaseModel, ConfigDict, HttpUrl, Field, StringConstraints
from typing_extensions import Annotated
Expand Down Expand Up @@ -155,6 +156,19 @@ class HSMObjBase(NoExtraBaseModel):
id: HSMKeyID
domains: set[HSMDomainName]

# -- Logging / audit --
YubiHsm2AuditMode = Literal['off', 'on', 'fixed']
YubiHsm2Command = Literal['echo', 'create-session', 'authenticate-session', 'session-message', 'device-info', 'reset-device', 'get-device-public-key', 'close-session', 'get-storage-info', 'put-opaque', 'get-opaque', 'put-authentication-key', 'put-asymmetric-key', 'generate-asymmetric-key', 'sign-pkcs1', 'list-objects', 'decrypt-pkcs1', 'export-wrapped', 'import-wrapped', 'put-wrap-key', 'get-log-entries', 'get-object-info', 'set-option', 'get-option', 'get-pseudo-random', 'put-hmac-key', 'sign-hmac', 'get-public-key', 'sign-pss', 'sign-ecdsa', 'derive-ecdh', 'delete-object', 'decrypt-oaep', 'generate-hmac-key', 'generate-wrap-key', 'verify-hmac', 'sign-ssh-certificate', 'put-template', 'get-template', 'decrypt-otp', 'create-otp-aead', 'randomize-otp-aead', 'rewrap-otp-aead', 'sign-attestation-certificate', 'put-otp-aead-key', 'generate-otp-aead-key', 'set-log-index', 'wrap-data', 'unwrap-data', 'sign-eddsa', 'blink-device', 'change-authentication-key', 'put-symmetric-key', 'generate-symmetric-key', 'decrypt-ecb', 'encrypt-ecb', 'decrypt-cbc', 'encrypt-cbc']
class HSMAuditSettings(NoExtraBaseModel):
forced_audit: YubiHsm2AuditMode
default_command_logging: YubiHsm2AuditMode
command_logging: dict[YubiHsm2Command, YubiHsm2AuditMode]

def apply_defaults(self):
# Fill command_logging with default value for any missing commands
for cmd in typing.get_args(YubiHsm2Command):
if cmd not in self.command_logging:
self.command_logging[cmd] = self.default_command_logging

# -- Asymmetric key models --
AsymmetricAlgorithm = Literal["rsa2048", "rsa3072", "rsa4096", "ecp256", "ecp384", "ecp521", "eck256", "ecbp256", "ecbp384", "ecbp512", "ed25519", "ecp224"]
Expand Down Expand Up @@ -281,6 +295,8 @@ class Admin(NoExtraBaseModel):
default_admin_key: HSMAuthKey
shared_admin_key: HSMAuthKey
wrap_key: HSMWrapKey
audit: HSMAuditSettings


class X509(NoExtraBaseModel):
root_certs: List[X509Cert]
Expand Down
182 changes: 180 additions & 2 deletions hsm_secrets/hsm/__init__.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
from dataclasses import astuple
import datetime
from hashlib import sha256
from io import BytesIO
import json
from pathlib import Path
import struct
import sys
import tarfile
from typing import Sequence
from typing import Sequence, cast
import click
from click.shell_completion import CompletionItem

from hsm_secrets.config import HSMAsymmetricKey, HSMConfig, click_hsm_obj_auto_complete, find_all_config_items_per_type, find_config_items_of_class, parse_keyid
from hsm_secrets.config import HSMAsymmetricKey, HSMAuditSettings, HSMConfig, YubiHsm2AuditMode, YubiHsm2Command, click_hsm_obj_auto_complete, find_all_config_items_per_type, find_config_items_of_class, parse_keyid
from hsm_secrets.hsm import yhsm_log
from hsm_secrets.hsm.secret_sharing_ceremony import cli_reconstruction_ceremony, cli_splitting_ceremony
from hsm_secrets.utils import HSMAuthMethod, HsmSecretsCtx, cli_error, cli_info, cli_result, cli_ui_msg, cli_warn, confirm_and_delete_old_yubihsm_object_if_exists, open_hsm_session, open_hsm_session_with_password, pass_common_args, pretty_fmt_yubihsm_object, prompt_for_secret, pw_check_fromhex

Expand Down Expand Up @@ -408,6 +413,25 @@ def compare_config(ctx: HsmSecretsCtx, alldevs: bool, create: bool):
cli_info("")
cli_info(f"KEY CREATION REPORT: Created {n_created} objects, skipped {n_skipped} objects. Run the command again without --create to verify status.")


# Check logging / audit settings

cur_audit_settings, unknown_audit_commands = ses.get_audit_settings()
if unknown_audit_commands:
cli_result("UNKNOWN AUDIT LOG COMMANDS ON DEVICE (not necessarily a problem)")
for k, v in unknown_audit_commands.items():
cli_result(f" ??? {k} = {v}")

new_audit_setting = ctx.conf.admin.audit
ctx.conf.admin.audit.apply_defaults()

audit_settings_diff = _check_and_format_audit_conf_differences(cur_audit_settings, new_audit_setting, raise_if_fixed_change=False)
if audit_settings_diff:
cli_result("MISMATCHING AUDIT LOG SETTINGS (device -> config)")
cli_result(audit_settings_diff)
if create:
cli_warn("Use 'hsm apply-audit-settings' to change audit settings to match the configuration.")

cli_info("")


Expand Down Expand Up @@ -548,3 +572,157 @@ def restore_hsm(ctx: HsmSecretsCtx, backup_file: str, force: bool):

cli_info("")
cli_info("Restore complete.")


# ---------------

@cmd_hsm.command('apply-audit-settings')
@pass_common_args
@click.option('--alldevs', is_flag=True, help="Set on all devices")
@click.option('--force', is_flag=True, help="Don't ask for confirmation before setting")
def apply_audit_settings(ctx: HsmSecretsCtx, alldevs: bool, force: bool):
"""Apply audit settings from config
Apply the audit/logging settings from configuration file to the YubiHSM(s).
"""
conf_settings = ctx.conf.admin.audit
conf_settings.apply_defaults()

hsm_serials = ctx.conf.general.all_devices.keys() if alldevs else [ctx.hsm_serial]
for serial in hsm_serials:
with open_hsm_session(ctx, HSMAuthMethod.DEFAULT_ADMIN, serial) as ses:
cli_info(f"Checking audit settings on device {serial}...")
cur_settings, _unknown_audits = ses.get_audit_settings()
mismatches_str = _check_and_format_audit_conf_differences(cur_settings, conf_settings)
if not mismatches_str:
cli_info(" └– Already ok. Audit settings match the config file.")
continue
else:
cli_info(" └– Mismatching audit commands (current -> new):")
cli_info(mismatches_str)
if not force and not click.confirm("Do you want to set the audit settings to match the configuration?", default=False, err=True):
cli_info(f" └– Skipping device {serial}.")
else:
# Remove any 'fixed' commands from the configuration before applying.
# The YubiHSM2 command will fail otherwise.
without_fixed: dict[YubiHsm2Command, YubiHsm2AuditMode] = {k:v for k,v in conf_settings.command_logging.items() if v != 'fixed'}
to_apply = HSMAuditSettings(
forced_audit = conf_settings.forced_audit,
default_command_logging = conf_settings.default_command_logging,
command_logging = without_fixed)
ses.set_audit_settings(to_apply)
cli_info(" └– Audit settings applied.")


def _check_and_format_audit_conf_differences(cur_settings: HSMAuditSettings, conf_settings: HSMAuditSettings, raise_if_fixed_change = True) -> str|None:
"""
Check the audit settings in the YubiHSM against the configuration file.
Returns a formatted string with the differences, or None if there are none.
Raises ValueError if a fixed command is set in the device and the configuration wants to change it,
and `raise_if_fixed_change` is True.
"""
mismatches: dict[str, tuple[YubiHsm2AuditMode|None, YubiHsm2AuditMode|None]] = {}

if cur_settings.forced_audit != conf_settings.forced_audit:
mismatches['<FORCED AUDIT>'] = (cur_settings.forced_audit, conf_settings.forced_audit)

for k, new_v in conf_settings.command_logging.items():
cur_v = cur_settings.command_logging.get(cast(YubiHsm2Command, k), None)
if cur_v != new_v:
mismatches[k] = (cur_v, new_v)
if cur_v == 'fixed' and raise_if_fixed_change:
raise ValueError(f"Command '{k}' is set to 'fixed' in the device. Cannot change it without resetting the HSM.")

if not mismatches:
return None
return '\n'.join([f" - {mk.ljust(30)} {cv} -> {nv}" for mk, (cv, nv) in sorted(mismatches.items())])


# ---------------

@cmd_hsm.command('log-fetch')
@pass_common_args
@click.argument('jsonfile', type=click.Path(exists=False, allow_dash=False, dir_okay=False), metavar='<jsonfile>', required=False)
@click.option('--no-verify', is_flag=True, help="Don't verify against previous entry")
@click.option('--clear', '-c', is_flag=True, help="Clear the log entries after fetching")
def log_fetch(ctx: HsmSecretsCtx, jsonfile: str|None, no_verify: bool, clear: bool):
"""Fetch and store log entries from HSM
Retrieve new log entries from device and optionally store them in a JSON file.
If no JSON file is specified, show logs without storing (implies --no-verify).
YubiHSM2 log forms an audit chain where each entry is verified against
the previous one for integrity. The script first checks `jsonfile`,
then <stem>.1.json, if the file is empty or missing, when looking for the last entry.
Always maintain secure backups of JSON files; any missing or tampered files will
break the chain's integrity for future entries.
By default, fetched log entries remain in the HSM unless --clear is used.
This option frees up log space in the HSM after successfully saving the entries
to the JSON file.
For scripting, suppress all non-errors with the global --quiet option.
"""
if not jsonfile:
cli_warn("No JSON file specified. This implies --no-verify.")
no_verify = True

if clear and not jsonfile:
raise click.ClickException("The --clear option requires a file to store the log entries.")

# Read JSON file if it exists
prev_entries = yhsm_log.read_json_files(jsonfile, False) if jsonfile else {}

# Get the last entry for verification
prev = None
if prev_entries:
n = max(prev_entries.keys())
ld_bytes = bytes.fromhex(prev_entries[n]['data'])
prev = yubihsm.core.LogEntry.parse(ld_bytes)

# Get new entries from the HSM
with open_hsm_session(ctx, HSMAuthMethod.PASSWORD) as ses:
new_log_data = ses.get_log_entries()
dev_serial = ses.get_serial()
for e in new_log_data.entries:
if e.number in prev_entries:
cli_info(f"- Skipping already stored entry {e.number}")
assert e.data.hex() == prev_entries[e.number]['data'][:32], f"Data mismatch between already stored entry {e.number} and the one from HSM. At least GET_LOG_ENTRIES is known to cause this, so don't log it."
elif prev and not no_verify:
if not e.validate(prev):
raise click.ClickException(f"Log entry {e.number} FAILED validation against previous entry. Audit chain broken.")
prev = e

new_json_entries = {e.number: yhsm_log.decode_log_entry_to_dict(e, ctx.conf, dev_serial) for e in new_log_data.entries}

# Print the new entries to console
cli_info(json.dumps(new_json_entries, indent=2, sort_keys=True))

# Update (/create) the JSON file
if jsonfile:
yhsm_log.update_json_file(Path(jsonfile), new_json_entries, dev_serial)
if clear:
last_to_free = max(new_json_entries.keys())
ses.free_log_entries(last_to_free)
cli_info(f"HSM updated: log space up to event {last_to_free} freed for reuse.")


@cmd_hsm.command('log-verify-all')
@pass_common_args
@click.argument('jsonfile', type=click.Path(exists=True, allow_dash=False), metavar='<jsonfile>', required=True)
@click.option('--initial-num', '-i', type=int, help="Entry number to treat as first in the chain", default=1)
def log_verify_all(ctx: HsmSecretsCtx, jsonfile: str, initial_num: int):
"""Verify the entire log chain
Read all JSON files and verify the integrity of the entire log chain
starting from entry #1 (or the specified initial number).
Starts reading from the given JSON file and continues with <stem>.1.json
and so on.
This command does not connect to the HSM or fetch new entries.
"""
all_entries = yhsm_log.read_json_files(jsonfile, True)
yhsm_log.verify_log_chain(all_entries, initial_num)
Loading

0 comments on commit 9e5ada5

Please sign in to comment.