From a23c63217709f02c19e5e3d7e00baeb8ffea28e9 Mon Sep 17 00:00:00 2001 From: Jarno Elonen Date: Mon, 16 Sep 2024 20:13:23 +0300 Subject: [PATCH] Implement Windows codesigning (osslsigncode) --- hsm-conf.yml | 3 +- hsm_secrets/codesign/__init__.py | 290 +++++++++++++++++++++++++++++++ hsm_secrets/config.py | 1 + hsm_secrets/main.py | 3 +- hsm_secrets/x509/__init__.py | 12 +- hsm_secrets/x509/cert_checker.py | 18 +- run-tests.sh | 53 ++++++ 7 files changed, 367 insertions(+), 13 deletions(-) create mode 100644 hsm_secrets/codesign/__init__.py diff --git a/hsm-conf.yml b/hsm-conf.yml index e91c38a..bb6dc4a 100644 --- a/hsm-conf.yml +++ b/hsm-conf.yml @@ -25,7 +25,7 @@ macros: country: US state: Calisota locality: Duckburg - organization: "Example Inc." + organization: "ExampleCorp" # Name constraints for TLS intermediate CAs. # Edit to match your domains and IP ranges (or set to null for no constraints). @@ -691,6 +691,7 @@ gpg: # Code signing keys for signing software, firmware, etc. # NO COMMANDS FOR THIS SECTION IMPLEMENTED YET -- KEYS ARE GENERATED FOR FUTURE USE codesign: + default_cert_id: 0x0711 certs: - key: diff --git a/hsm_secrets/codesign/__init__.py b/hsm_secrets/codesign/__init__.py new file mode 100644 index 0000000..36573e0 --- /dev/null +++ b/hsm_secrets/codesign/__init__.py @@ -0,0 +1,290 @@ +from pathlib import Path +import click +from asn1crypto import cms, algos, x509, core, pem # type: ignore [import] +from datetime import datetime, timezone +from cryptography.hazmat.primitives import serialization, hashes +from cryptography.hazmat.primitives.asymmetric import padding, rsa + +from hsm_secrets.config import HSMOpaqueObject +from hsm_secrets.utils import ( + HsmSecretsCtx, + cli_code_info, + cli_info, + open_hsm_session, + pass_common_args, +) +from hsm_secrets.x509.cert_builder import get_issuer_cert_and_key + +from typing import Any, Optional, List, Tuple, Union, cast + +SZ_OID_CTL = '1.3.6.1.4.1.311.10.1' # PKCS #7 ContentType OID for Certificate Trust List (CTL) szOID_CTL +SPC_INDIRECT_DATA_OBJID = '1.3.6.1.4.1.311.2.1.4' # SpcIndirectDataContent OID +SPC_STATEMENT_TYPE_OBJID = '1.3.6.1.4.1.311.2.1.11' # SpcStatementType OID +SPC_INDIVIDUAL_SP_KEY_PURPOSE_OBJID = '1.3.6.1.4.1.311.2.1.21' # Individual Code Signing OID + + +@click.group() +@click.pass_context +def cmd_codesign(ctx: click.Context) -> None: + """Code signing operations""" + ctx.ensure_object(dict) + + +@cmd_codesign.command('sign-osslsigncode-hash') +@pass_common_args +@click.argument('hashfile', type=click.Path(exists=False, dir_okay=False, resolve_path=True, allow_dash=True), default='-', required=True, metavar='') +@click.option('--out', '-o', required=False, type=click.Path(dir_okay=False, resolve_path=True), help="Output filename (default: deduce from input)", default=None) +@click.option('--ca', '-c', type=str, required=False, help="CA ID (hex) or label to sign with. Default: use config", default=None) +def sign_osslsigncode_hash( + ctx: HsmSecretsCtx, + hashfile: str, + out: Optional[str], + ca: Optional[str] +) -> None: + """Sign a Microsoft Authenticode hash from `osslsigncode` + + Usage: + + 1) Generate the hashfile with osslsigncode: `osslsigncode extract-data -h sha256 -in -out ` + + 2) Sign the request with this command + + 3) Embed the signature: `osslsigncode attach-signature -sigin -CAfile -in -out ` + + The `cert.chain` file should contain the full certificate chain, from issuer up to the root CA. + Both input and output are ASN.1 structures. Input can be DER or PEM encoded, output is DER. + """ + # Read and parse the input data + content_info: cms.ContentInfo = _read_input_data(hashfile) + + # Extract the spcIndirectDataContent + spc_indirect_data: core.Sequence = _extract_spc_indirect_data(content_info) + + # Extract the digest algorithm and compute the digest + digest_info_der: bytes = spc_indirect_data[1].dump() + digest_info: algos.DigestInfo = algos.DigestInfo.load(digest_info_der) + digest_algorithm_oid: core.ObjectIdentifier = digest_info['digest_algorithm']['algorithm'] # type: ignore + + digest_value, hash_algorithm = _compute_digest(spc_indirect_data, digest_algorithm_oid) + signing_time: datetime = datetime.now(timezone.utc) + + # Build the signed attributes + signed_attrs: cms.CMSAttributes = _build_signed_attributes(digest_value, signing_time) + + # Connect HSM and sign + ca_cert_id = ctx.conf.find_def(ca or ctx.conf.codesign.default_cert_id, HSMOpaqueObject).id + with open_hsm_session(ctx) as ses: + signer_cert_orig, signer_private_key = get_issuer_cert_and_key(ctx, ses, ca_cert_id) + + ca_der = signer_cert_orig.public_bytes(serialization.Encoding.DER) + signer_cert: x509.Certificate = x509.Certificate.load(ca_der) + + if not isinstance(signer_private_key, rsa.RSAPrivateKey): + raise click.ClickException("Only RSA private keys are supported for code signing for now") + + # Build the SignerInfo structure + signer_info: cms.SignerInfo = _build_signer_info( + signer_cert, + signer_private_key, + signed_attrs, + digest_algorithm_oid, + hash_algorithm, + ) + + # Assemble the SignedData structure + certificates: List[x509.Certificate] = [signer_cert] + signed_data: cms.SignedData = _assemble_signed_data( + spc_indirect_data, + digest_algorithm_oid, + certificates, + signer_info, + ) + + # Wrap in ContentInfo + content_info = cms.ContentInfo({'content_type': 'signed_data', 'content': signed_data}) + + _write_output(content_info, hashfile, out) + + +# ----- Helper functions ----- + + +def _get_hash_algorithm( + digest_algorithm_oid: core.ObjectIdentifier, +) -> hashes.HashAlgorithm: + """Map OID to hash algorithm.""" + oid: str = digest_algorithm_oid.dotted + if oid == '1.3.14.3.2.26': + return hashes.SHA1() + elif oid == '2.16.840.1.101.3.4.2.1': + return hashes.SHA256() + elif oid == '2.16.840.1.101.3.4.2.2': + return hashes.SHA384() + elif oid == '2.16.840.1.101.3.4.2.3': + return hashes.SHA512() + else: + raise ValueError(f"Unsupported digest algorithm OID: {oid}") + + +def _read_input_data(hashfile: str) -> cms.ContentInfo: + """Read and parse the input data.""" + if hashfile == '-': + data: bytes = click.get_binary_stream('stdin').read() + else: + data = Path(hashfile).read_bytes() + + # Detect and handle PEM encoding, or assume DER + if pem.detect(data): + _type_name, _headers, der_bytes = pem.unarmor(data) + else: + der_bytes = data + + return cms.ContentInfo.load(der_bytes) + + +def _extract_spc_indirect_data(content_info: cms.ContentInfo) -> core.Sequence: + """Extract the spcIndirectDataContent from ContentInfo.""" + if content_info['content_type'].native != 'signed_data': + raise click.ClickException("Input data is not a PKCS#7 SignedData structure") + + signed_data: cms.SignedData = cast(cms.SignedData, content_info['content']) + encap_content_info = cast(cms.EncapsulatedContentInfo, signed_data['encap_content_info']) + + if cast(core.ObjectIdentifier, encap_content_info['content_type']).dotted != SPC_INDIRECT_DATA_OBJID: + raise click.ClickException("Encapsulated content is not SPC_INDIRECT_DATA_OBJID") + + spc_indirect_data: core.Sequence = encap_content_info['content'].parsed # type: ignore + if spc_indirect_data is None: + raise click.ClickException("Encapsulated content is missing or cannot be parsed") + + return spc_indirect_data + + +def _compute_digest( + spc_indirect_data: core.Asn1Value, digest_algorithm_oid: core.ObjectIdentifier +) -> Tuple[bytes, hashes.HashAlgorithm]: + """Compute the digest over the DER-encoded data field.""" + assert spc_indirect_data.contents is not None + spc_data_der: bytes = spc_indirect_data.contents + hash_algorithm: hashes.HashAlgorithm = _get_hash_algorithm(digest_algorithm_oid) + digest = hashes.Hash(hash_algorithm) + digest.update(spc_data_der) + digest_value: bytes = digest.finalize() + return digest_value, hash_algorithm + + +def _build_signed_attributes(digest_value: bytes, signing_time: datetime) -> cms.CMSAttributes: + """Build the signed attributes for the SignerInfo.""" + class OIDSequence(core.Sequence): + _fields = [('oid', core.ObjectIdentifier)] + + content_type_attr = cms.CMSAttribute( + { + 'type': 'content_type', + 'values': cms.SetOfContentType([cms.ContentType(SZ_OID_CTL)]), + } + ) + signing_time_attr = cms.CMSAttribute( + { + 'type': 'signing_time', + 'values': cms.SetOfTime([cms.Time({'utc_time': signing_time})]), + } + ) + ms_crypto_attr = cms.CMSAttribute( + { + 'type': SPC_STATEMENT_TYPE_OBJID, + 'values': cms.SetOfAny( + [OIDSequence({'oid': SPC_INDIVIDUAL_SP_KEY_PURPOSE_OBJID})] + ), + } + ) + message_digest_attr = cms.CMSAttribute( + { + 'type': 'message_digest', + 'values': cms.SetOfOctetString([core.OctetString(digest_value)]), + } + ) + signed_attrs = cms.CMSAttributes( + [content_type_attr, signing_time_attr, ms_crypto_attr, message_digest_attr] + ) + return signed_attrs + + +def _build_signer_info( + signer_cert: x509.Certificate, + signer_private_key: rsa.RSAPrivateKey, + signed_attrs: cms.CMSAttributes, + digest_algorithm_oid: core.ObjectIdentifier, + hash_algorithm: hashes.HashAlgorithm, +) -> cms.SignerInfo: + """Build the SignerInfo structure.""" + issuer_and_serial = cms.IssuerAndSerialNumber( + {'issuer': signer_cert.issuer, 'serial_number': signer_cert.serial_number} + ) + + signature: bytes = signer_private_key.sign( + signed_attrs.dump(), + padding.PKCS1v15(), + hash_algorithm + ) + + signer_info = cms.SignerInfo( + { + 'version': 'v1', + 'sid': cms.SignerIdentifier( + {'issuer_and_serial_number': issuer_and_serial} + ), + 'digest_algorithm': { + 'algorithm': digest_algorithm_oid, + 'parameters': core.Null(), + }, + 'signed_attrs': signed_attrs, + 'signature_algorithm': { + 'algorithm': 'rsassa_pkcs1v15', + 'parameters': core.Null(), + }, + 'signature': cms.OctetString(signature), + 'unsigned_attrs': None, + } + ) + return signer_info + + +def _assemble_signed_data( + spc_indirect_data: core.Asn1Value, + digest_algorithm_oid: core.ObjectIdentifier, + certificates: List[x509.Certificate], + signer_info: cms.SignerInfo, +) -> cms.SignedData: + """Assemble the SignedData structure.""" + signed_data = cms.SignedData( + { + 'version': 'v1', + 'digest_algorithms': [ + {'algorithm': digest_algorithm_oid, 'parameters': core.Null()} + ], + 'encap_content_info': { + 'content_type': SPC_INDIRECT_DATA_OBJID, + 'content': spc_indirect_data, + }, + 'certificates': certificates, + 'signer_infos': [signer_info], + } + ) + return signed_data + + +def _write_output(content_info: cms.ContentInfo, hashfile: str, out: Optional[str]) -> None: + """Write the signed data to the output file.""" + if out: + out_path = Path(out) + else: + out_path = Path(hashfile).with_suffix(".signed" + Path(hashfile).suffix) + + out_path.write_bytes(content_info.dump()) + + cli_code_info(f"Signed code signature (ASN1 DER) saved to: `{out_path}`") + cli_info("Attach it to your PE executable with:") + cli_code_info( + f" `osslsigncode attach-signature -sigin '{out_path.name}' -CAfile MYCERT.chain.pem -in MYBIN.exe -out MYBIN.signed.exe`" + ) diff --git a/hsm_secrets/config.py b/hsm_secrets/config.py index 41dca3c..7089e39 100644 --- a/hsm_secrets/config.py +++ b/hsm_secrets/config.py @@ -103,6 +103,7 @@ class GPG(NoExtraBaseModel): keys: List['HSMAsymmetricKey'] class CodeSign(NoExtraBaseModel): + default_cert_id: HSMKeyID certs: List['X509CA'] class SSHTemplateSlots(NoExtraBaseModel): diff --git a/hsm_secrets/main.py b/hsm_secrets/main.py index cae6c68..63fd758 100755 --- a/hsm_secrets/main.py +++ b/hsm_secrets/main.py @@ -2,6 +2,7 @@ import os import click +from hsm_secrets.codesign import cmd_codesign from hsm_secrets.hsm import cmd_hsm from hsm_secrets.log import cmd_log from hsm_secrets.piv import cmd_piv @@ -101,7 +102,6 @@ def cli(ctx: click.Context, config: str|None, quiet: bool, yklabel: str|None, hs def cmd_nop(ctx: HsmSecretsCtx): cli_info("No errors. Exiting.") - cli.add_command(cmd_ssh, "ssh") cli.add_command(cmd_tls, "tls") cli.add_command(cmd_pass, "pass") @@ -111,6 +111,7 @@ def cmd_nop(ctx: HsmSecretsCtx): cli.add_command(cmd_piv, "piv") cli.add_command(cmd_x509, "x509") cli.add_command(cmd_user, "user") +cli.add_command(cmd_codesign, "codesign") register_repl(cli) if __name__ == '__main__': diff --git a/hsm_secrets/x509/__init__.py b/hsm_secrets/x509/__init__.py index fa3ea55..dacc1f4 100644 --- a/hsm_secrets/x509/__init__.py +++ b/hsm_secrets/x509/__init__.py @@ -95,8 +95,8 @@ def get_cert_cmd(ctx: HsmSecretsCtx, all_certs: bool, outdir: str|None, bundle: raise click.ClickException("Error: No certificates selected.") for cd in selected_certs: - cli_info(f"- Fetching PEM for 0x{cd.id:04x}: '{cd.label}'") - cli_info("") + cli_info(f"- Fetching PEM for 0x{cd.id:04x}: '{cd.label}'", err=True) + cli_info("", err=True) with open_hsm_session(ctx) as ses: pem_file = None @@ -207,7 +207,7 @@ def _do_it(ses: HSMSession|None): for cd, issues in cert_issues: if issues: cli_warn(f"\n-- Check results for certificate 0x{cd.id:04x} ({cd.label}) --") - X509RootCACertificateChecker.show_issues(issues) + X509RootCACertificateChecker.show_issues(issues, cd.label) if dry_run: cli_warn("DRY RUN. Would create the following certificates:") @@ -234,8 +234,6 @@ def init_crl(ctx: HsmSecretsCtx, ca: str, out: str, validity: int, this_update: ca_x509_def = find_ca_def(ctx.conf, ca_cert_def.id) assert ca_x509_def, f"CA cert ID not found: 0x{ca_cert_def.id:04x}" - print(f"CA cert: {ca_cert_def.label} (0x{ca_cert_def.id:04x})") - with open_hsm_session(ctx) as ses: ca_cert = ses.get_certificate(ca_cert_def) ca_key = ses.get_private_key(ca_x509_def.key) @@ -251,8 +249,8 @@ def init_crl(ctx: HsmSecretsCtx, ca: str, out: str, validity: int, this_update: crl_pem = crl.public_bytes(encoding=serialization.Encoding.PEM) Path(out).write_bytes(crl_pem) - cli_info(f"Initialized CRL signed by CA 0x{ca_cert_def.id:04x}") - cli_info(f"CRL written to: {out}") + cli_code_info(f"Initialized CRL signed by CA `{ca_cert_def.label}` (0x{ca_cert_def.id:04x})") + cli_code_info(f"CRL written to: `{out}`") # --------------- diff --git a/hsm_secrets/x509/cert_checker.py b/hsm_secrets/x509/cert_checker.py index 747f94c..f540f10 100644 --- a/hsm_secrets/x509/cert_checker.py +++ b/hsm_secrets/x509/cert_checker.py @@ -201,7 +201,7 @@ def _check_specific_subject_common_name_consistency(self, cn_value: str, san: x5 pass @staticmethod - def show_issues(issues: list[tuple[IssueSeverity, str]]): + def show_issues(issues: list[tuple[IssueSeverity, str]], subject: str|None = None): notices = [message for severity, message in issues if severity == IssueSeverity.NOTICE] warnings = [message for severity, message in issues if severity == IssueSeverity.WARNING] errors = [message for severity, message in issues if severity == IssueSeverity.ERROR] @@ -210,7 +210,10 @@ def show_issues(issues: list[tuple[IssueSeverity, str]]): return prn: Any = cli_error if errors else (cli_warn if warnings else cli_info) - prn("Detected issues:") + if subject: + prn(f"Detected issues for {subject}:") + else: + prn("Detected issues:") if notices: cli_info(" - ℹ️ Cert notices:") @@ -230,7 +233,7 @@ def show_issues(issues: list[tuple[IssueSeverity, str]]): def check_and_show_issues(self) -> list[tuple[IssueSeverity, str]]: self.check() - self.show_issues(self.issues) + self.show_issues(self.issues, str(self.certificate.subject)) return self.issues @@ -257,8 +260,15 @@ def _check_path_length_constraint(self, path_length): pass def _check_specific_key_usage(self, key_usage: x509.KeyUsage): - if not key_usage.key_cert_sign: + is_ca = False + try: + bc = self.certificate.extensions.get_extension_for_class(x509.BasicConstraints).value + is_ca = bc.ca + except x509.ExtensionNotFound: + pass + if is_ca and not key_usage.key_cert_sign: self._add_issue("KeyUsage does not include keyCertSign", IssueSeverity.ERROR) + if not key_usage.digital_signature: self._add_issue("KeyUsage does not include digitalSignature", IssueSeverity.WARNING) if key_usage.key_encipherment: diff --git a/run-tests.sh b/run-tests.sh index 7210f89..92dae21 100755 --- a/run-tests.sh +++ b/run-tests.sh @@ -424,6 +424,58 @@ test_ssh_host_certificates() { } +test_codesign_sign_osslsigncode_hash() { + + if ! which osslsigncode > /dev/null; then + echo "osslsigncode not found, skipping test" + return 0 + fi + + setup + + # Create a temporary directory for this test + local test_dir=$(mktemp -d "$TEMPDIR/codesign_test.XXXXXX") + + # Write a m inimal 'tiny.exe' for testing + echo "H4sIAH/x+VYAA/ONmsDAzMDAwALE//8zMOxggAAHBsJgAxDzye/iY9jCeVZxB6PPWcWQjMxihYKi/PSixFyF5MS8vPwShaRUhaLSPIXMPAUX/2CF3PyUVD1eXi4VqBk/dYtu7vWR6YLhWV2FXXvAdAqYDspMzgCJw+wMcGVg8GFkZMjf6+oKE3vAwMzIzcjBwMCE5DgBKFaA+gbEZoL4k4EBQYPlofog0gIQtXAaTg0o0CtJrShhgLob6hcU/zKAvZJAqrlZWhGHKXbcKBiyAAD3yoGLAAQAAA==" | base64 -d | gzip -d > "$test_dir/tiny.exe" + + # Extract hash to be signed from tiny.exe + osslsigncode extract-data -h sha256 -in "$test_dir/tiny.exe" -out "$test_dir/tiny.req" + assert_success + + # Sign the request using the HSM + run_cmd codesign sign-osslsigncode-hash "$test_dir/tiny.req" + assert_success + + # Check if the signed file exists + [ -f "$test_dir/tiny.signed.req" ] || { echo "ERROR: Signed file not created"; return 1; } + + # Get the full certificate chain from HSM + run_cmd x509 cert get --bundle "$test_dir/bundle.pem" cert_codesign-cs1-rsa4096 cert_ca-root-a1-rsa4096 + assert_success + + # Create a CRL + run_cmd x509 crl init --ca cert_ca-root-a1-rsa4096 -o "$test_dir/crl.pem" + assert_success + + # Attach the signature to the executable + local attach_output=$(osslsigncode attach-signature -sigin "$test_dir/tiny.signed.req" -CAfile "$test_dir/bundle.pem" -CRLfile "$test_dir/crl.pem" -in "$test_dir/tiny.exe" -out "$test_dir/tiny.signed.exe") + assert_success + + # Check the output of the attach-signature command + echo "$attach_output" + assert_grep "Signature successfully attached" "$attach_output" + assert_grep "Succeeded" "$attach_output" + + # Verify the signed executable + local verify_output=$(osslsigncode verify -in "$test_dir/tiny.signed.exe" -CAfile "$test_dir/bundle.pem" -CRLfile "$test_dir/crl.pem") + assert_success + echo "$verify_output" + assert_grep "Signature verification: ok" "$verify_output" + + echo "Codesign sign-osslsigncode-hash test passed successfully" +} + test_logging_commands() { local DB_PATH="$TEMPDIR/test_log.db" export HSM_PASSWORD="password123-not-really-set" @@ -531,6 +583,7 @@ run_test test_password_derivation run_test test_wrapped_backup run_test test_ssh_user_certificates run_test test_ssh_host_certificates +run_test test_codesign_sign_osslsigncode_hash run_test test_piv_user_certificate_key_type run_test test_piv_user_certificate_csr run_test test_piv_dc_certificate