diff --git a/middleware/admin/verify_attestation.py b/middleware/admin/verify_attestation.py index e22caf10..3d398267 100644 --- a/middleware/admin/verify_attestation.py +++ b/middleware/admin/verify_attestation.py @@ -23,13 +23,14 @@ import json import hashlib import secp256k1 as ec +import re from .misc import info, head, AdminError from .utils import is_nonempty_hex_string from .certificate import HSMCertificate -UI_MESSAGE_HEADER = b"HSM:UI:5.1" -SIGNER_MESSAGE_HEADER = b"HSM:SIGNER:5.1" +UI_MESSAGE_HEADER_REGEX = re.compile(b"^HSM:UI:(5.[0-9])") +SIGNER_MESSAGE_HEADER_REGEX = re.compile(b"^HSM:SIGNER:(5.[0-9])") UI_DERIVATION_PATH = "m/44'/0'/0'/0/0" UD_VALUE_LENGTH = 32 PUBKEY_COMPRESSED_LENGTH = 33 @@ -45,6 +46,14 @@ "dad609" +def match_ui_message_header(ui_message): + return UI_MESSAGE_HEADER_REGEX.match(ui_message) + + +def match_signer_message_header(signer_message): + return SIGNER_MESSAGE_HEADER_REGEX.match(signer_message) + + def do_verify_attestation(options): head("### -> Verify UI and Signer attestations", fill="#") @@ -121,12 +130,14 @@ def do_verify_attestation(options): ui_message = bytes.fromhex(ui_result[1]) ui_hash = bytes.fromhex(ui_result[2]) - mh_len = len(UI_MESSAGE_HEADER) - if ui_message[:mh_len] != UI_MESSAGE_HEADER: + mh_match = match_ui_message_header(ui_message) + if mh_match is None: raise AdminError( - f"Invalid UI attestation message header: {ui_message[:mh_len].hex()}") + f"Invalid UI attestation message header: {ui_message.hex()}") + mh_len = len(mh_match.group(0)) - # Extract UD value, UI public key and signer version from message + # Extract UI version, UD value, UI public key and signer version from message + ui_version = mh_match.group(1) ud_value = ui_message[mh_len:mh_len + UD_VALUE_LENGTH].hex() ui_public_key = ui_message[mh_len + UD_VALUE_LENGTH:mh_len + UD_VALUE_LENGTH + PUBKEY_COMPRESSED_LENGTH].hex() @@ -147,6 +158,7 @@ def do_verify_attestation(options): f"Authorized signer hash: {signer_hash}", f"Authorized signer iteration: {signer_iteration}", f"Installed UI hash: {ui_hash.hex()}", + f"Installed UI version: {ui_version.decode()}", ], fill="-", ) @@ -162,11 +174,12 @@ def do_verify_attestation(options): signer_message = bytes.fromhex(signer_result[1]) signer_hash = bytes.fromhex(signer_result[2]) - mh_len = len(SIGNER_MESSAGE_HEADER) - if signer_message[:mh_len] != SIGNER_MESSAGE_HEADER: + mh_match = match_signer_message_header(signer_message) + if mh_match is None: raise AdminError( - f"Invalid Signer attestation message header: {signer_message[:mh_len].hex()}") + f"Invalid Signer attestation message header: {signer_message.hex()}") + mh_len = len(mh_match.group(0)) if signer_message[mh_len:] != pubkeys_hash: reported = signer_message[mh_len:].hex() raise AdminError( @@ -174,11 +187,13 @@ def do_verify_attestation(options): f" but attestation reports {reported}" ) + signer_version = mh_match.group(1) head( ["Signer verified with public keys:"] + pubkeys_output + [ "", f"Hash: {signer_message[mh_len:].hex()}", f"Installed Signer hash: {signer_hash.hex()}", + f"Installed Signer version: {signer_version.decode()}", ], fill="-", ) diff --git a/middleware/tests/admin/test_verify_attestation.py b/middleware/tests/admin/test_verify_attestation.py index 4da01987..b3b0286f 100644 --- a/middleware/tests/admin/test_verify_attestation.py +++ b/middleware/tests/admin/test_verify_attestation.py @@ -25,7 +25,11 @@ from unittest.mock import Mock, call, patch, mock_open from admin.misc import AdminError from admin.pubkeys import PATHS -from admin.verify_attestation import do_verify_attestation +from admin.verify_attestation import ( + do_verify_attestation, + match_ui_message_header, + match_signer_message_header +) import ecdsa import hashlib import logging @@ -33,6 +37,8 @@ logging.disable(logging.CRITICAL) EXPECTED_UI_DERIVATION_PATH = "m/44'/0'/0'/0/0" +SIGNER_HEADER = b"HSM:SIGNER:5.1" +UI_HEADER = b"HSM:UI:5.1" @patch("sys.stdout.write") @@ -65,14 +71,14 @@ def setUp(self): ) self.pubkeys_hash = pubkeys_hash.digest() - self.ui_msg = b"HSM:UI:5.1" + \ + self.ui_msg = UI_HEADER + \ bytes.fromhex("aa"*32) + \ bytes.fromhex("bb"*33) + \ bytes.fromhex("cc"*32) + \ bytes.fromhex("0123") self.ui_hash = bytes.fromhex("ee" * 32) - self.signer_msg = b"HSM:SIGNER:5.1" + \ + self.signer_msg = SIGNER_HEADER + \ bytes.fromhex(self.pubkeys_hash.hex()) self.signer_hash = bytes.fromhex("ff" * 32) @@ -108,6 +114,7 @@ def test_verify_attestation(self, f"Authorized signer hash: {'cc'*32}", "Authorized signer iteration: 291", f"Installed UI hash: {'ee'*32}", + "Installed UI version: 5.1", ], fill="-", ) @@ -118,6 +125,7 @@ def test_verify_attestation(self, "", f"Hash: {self.pubkeys_hash.hex()}", f"Installed Signer hash: {'ff'*32}", + "Installed Signer version: 5.1", ], fill="-", ) @@ -276,3 +284,45 @@ def test_verify_attestation_invalid_signer_att(self, self.assertEqual([call(self.pubkeys_path, 'r')], file_mock.call_args_list) self.assertEqual(("Invalid Signer attestation: error validating 'signer'"), str(e.exception)) + + def test_match_ui_message_header_valid_header(self, _): + valid_headers = [ + UI_HEADER, + b"HSM:UI:5.0", + b"HSM:UI:5.5", + b"HSM:UI:5.9", + ] + for header in valid_headers: + ui_message = header + self.ui_msg[len(UI_HEADER):] + self.assertTrue(match_ui_message_header(ui_message)) + + def test_match_ui_message_header_invalid_header(self, _): + invalid_headers = [ + SIGNER_HEADER, + b"HSM:UI:4.0", + b"HSM:UI:5.X", + ] + for header in invalid_headers: + ui_message = header + self.ui_msg[len(UI_HEADER):] + self.assertFalse(match_ui_message_header(ui_message)) + + def test_match_signer_message_header_valid_header(self, _): + valid_headers = [ + SIGNER_HEADER, + b"HSM:SIGNER:5.0", + b"HSM:SIGNER:5.5", + b"HSM:SIGNER:5.9", + ] + for header in valid_headers: + signer_message = header + self.signer_msg[len(SIGNER_HEADER):] + self.assertTrue(match_signer_message_header(signer_message)) + + def test_match_signer_message_header_invalid_header(self, _): + invalid_headers = [ + UI_HEADER, + b"HSM:SIGNER:4.0", + b"HSM:SIGNER:5.X", + ] + for header in invalid_headers: + signer_message = header + self.signer_msg[len(SIGNER_HEADER):] + self.assertFalse(match_signer_message_header(signer_message))