Skip to content

Commit

Permalink
Fix verify_attestation.py to accept distinct versions for UI and Sign…
Browse files Browse the repository at this point in the history
…er (#197)

- Version MINOR is now allowed to be distinct between UI and Signer
- verify_attestation.py now outputs the installed version of both apps
- Enhanced message header validation to use a regex to improve maintainability 
- Added unit tests for header validation functions
  • Loading branch information
italo-sampaio authored Sep 6, 2024
1 parent 68f02ca commit d6dc8b6
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 12 deletions.
33 changes: 24 additions & 9 deletions middleware/admin/verify_attestation.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@
import json
import hashlib
import secp256k1 as ec
import re
from .misc import info, head, AdminError
from .utils import is_nonempty_hex_string
from .certificate import HSMCertificate


UI_MESSAGE_HEADER = b"HSM:UI:5.1"
SIGNER_MESSAGE_HEADER = b"HSM:SIGNER:5.1"
UI_MESSAGE_HEADER_REGEX = re.compile(b"^HSM:UI:(5.[0-9])")
SIGNER_MESSAGE_HEADER_REGEX = re.compile(b"^HSM:SIGNER:(5.[0-9])")
UI_DERIVATION_PATH = "m/44'/0'/0'/0/0"
UD_VALUE_LENGTH = 32
PUBKEY_COMPRESSED_LENGTH = 33
Expand All @@ -45,6 +46,14 @@
"dad609"


def match_ui_message_header(ui_message):
return UI_MESSAGE_HEADER_REGEX.match(ui_message)


def match_signer_message_header(signer_message):
return SIGNER_MESSAGE_HEADER_REGEX.match(signer_message)


def do_verify_attestation(options):
head("### -> Verify UI and Signer attestations", fill="#")

Expand Down Expand Up @@ -121,12 +130,14 @@ def do_verify_attestation(options):

ui_message = bytes.fromhex(ui_result[1])
ui_hash = bytes.fromhex(ui_result[2])
mh_len = len(UI_MESSAGE_HEADER)
if ui_message[:mh_len] != UI_MESSAGE_HEADER:
mh_match = match_ui_message_header(ui_message)
if mh_match is None:
raise AdminError(
f"Invalid UI attestation message header: {ui_message[:mh_len].hex()}")
f"Invalid UI attestation message header: {ui_message.hex()}")
mh_len = len(mh_match.group(0))

# Extract UD value, UI public key and signer version from message
# Extract UI version, UD value, UI public key and signer version from message
ui_version = mh_match.group(1)
ud_value = ui_message[mh_len:mh_len + UD_VALUE_LENGTH].hex()
ui_public_key = ui_message[mh_len + UD_VALUE_LENGTH:mh_len + UD_VALUE_LENGTH +
PUBKEY_COMPRESSED_LENGTH].hex()
Expand All @@ -147,6 +158,7 @@ def do_verify_attestation(options):
f"Authorized signer hash: {signer_hash}",
f"Authorized signer iteration: {signer_iteration}",
f"Installed UI hash: {ui_hash.hex()}",
f"Installed UI version: {ui_version.decode()}",
],
fill="-",
)
Expand All @@ -162,23 +174,26 @@ def do_verify_attestation(options):

signer_message = bytes.fromhex(signer_result[1])
signer_hash = bytes.fromhex(signer_result[2])
mh_len = len(SIGNER_MESSAGE_HEADER)
if signer_message[:mh_len] != SIGNER_MESSAGE_HEADER:
mh_match = match_signer_message_header(signer_message)
if mh_match is None:
raise AdminError(
f"Invalid Signer attestation message header: {signer_message[:mh_len].hex()}")
f"Invalid Signer attestation message header: {signer_message.hex()}")

mh_len = len(mh_match.group(0))
if signer_message[mh_len:] != pubkeys_hash:
reported = signer_message[mh_len:].hex()
raise AdminError(
f"Signer attestation public keys hash mismatch: expected {pubkeys_hash.hex()}"
f" but attestation reports {reported}"
)

signer_version = mh_match.group(1)
head(
["Signer verified with public keys:"] + pubkeys_output + [
"",
f"Hash: {signer_message[mh_len:].hex()}",
f"Installed Signer hash: {signer_hash.hex()}",
f"Installed Signer version: {signer_version.decode()}",
],
fill="-",
)
56 changes: 53 additions & 3 deletions middleware/tests/admin/test_verify_attestation.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,20 @@
from unittest.mock import Mock, call, patch, mock_open
from admin.misc import AdminError
from admin.pubkeys import PATHS
from admin.verify_attestation import do_verify_attestation
from admin.verify_attestation import (
do_verify_attestation,
match_ui_message_header,
match_signer_message_header
)
import ecdsa
import hashlib
import logging

logging.disable(logging.CRITICAL)

EXPECTED_UI_DERIVATION_PATH = "m/44'/0'/0'/0/0"
SIGNER_HEADER = b"HSM:SIGNER:5.1"
UI_HEADER = b"HSM:UI:5.1"


@patch("sys.stdout.write")
Expand Down Expand Up @@ -65,14 +71,14 @@ def setUp(self):
)
self.pubkeys_hash = pubkeys_hash.digest()

self.ui_msg = b"HSM:UI:5.1" + \
self.ui_msg = UI_HEADER + \
bytes.fromhex("aa"*32) + \
bytes.fromhex("bb"*33) + \
bytes.fromhex("cc"*32) + \
bytes.fromhex("0123")
self.ui_hash = bytes.fromhex("ee" * 32)

self.signer_msg = b"HSM:SIGNER:5.1" + \
self.signer_msg = SIGNER_HEADER + \
bytes.fromhex(self.pubkeys_hash.hex())
self.signer_hash = bytes.fromhex("ff" * 32)

Expand Down Expand Up @@ -108,6 +114,7 @@ def test_verify_attestation(self,
f"Authorized signer hash: {'cc'*32}",
"Authorized signer iteration: 291",
f"Installed UI hash: {'ee'*32}",
"Installed UI version: 5.1",
],
fill="-",
)
Expand All @@ -118,6 +125,7 @@ def test_verify_attestation(self,
"",
f"Hash: {self.pubkeys_hash.hex()}",
f"Installed Signer hash: {'ff'*32}",
"Installed Signer version: 5.1",
],
fill="-",
)
Expand Down Expand Up @@ -276,3 +284,45 @@ def test_verify_attestation_invalid_signer_att(self,
self.assertEqual([call(self.pubkeys_path, 'r')], file_mock.call_args_list)
self.assertEqual(("Invalid Signer attestation: error validating 'signer'"),
str(e.exception))

def test_match_ui_message_header_valid_header(self, _):
valid_headers = [
UI_HEADER,
b"HSM:UI:5.0",
b"HSM:UI:5.5",
b"HSM:UI:5.9",
]
for header in valid_headers:
ui_message = header + self.ui_msg[len(UI_HEADER):]
self.assertTrue(match_ui_message_header(ui_message))

def test_match_ui_message_header_invalid_header(self, _):
invalid_headers = [
SIGNER_HEADER,
b"HSM:UI:4.0",
b"HSM:UI:5.X",
]
for header in invalid_headers:
ui_message = header + self.ui_msg[len(UI_HEADER):]
self.assertFalse(match_ui_message_header(ui_message))

def test_match_signer_message_header_valid_header(self, _):
valid_headers = [
SIGNER_HEADER,
b"HSM:SIGNER:5.0",
b"HSM:SIGNER:5.5",
b"HSM:SIGNER:5.9",
]
for header in valid_headers:
signer_message = header + self.signer_msg[len(SIGNER_HEADER):]
self.assertTrue(match_signer_message_header(signer_message))

def test_match_signer_message_header_invalid_header(self, _):
invalid_headers = [
UI_HEADER,
b"HSM:SIGNER:4.0",
b"HSM:SIGNER:5.X",
]
for header in invalid_headers:
signer_message = header + self.signer_msg[len(SIGNER_HEADER):]
self.assertFalse(match_signer_message_header(signer_message))

0 comments on commit d6dc8b6

Please sign in to comment.