Skip to content

Commit

Permalink
Certificate V2 parser (#230)
Browse files Browse the repository at this point in the history
- Added version mapping and parsing logic to certificate version 1
- Added dict based initialisation functions to HSMCertificateV2Element and its subclasses
- Updated sgx attestation gathering certificate generation
- Added and updated unit tests
- Ignoring long line linting in certificate v2 resources file
  • Loading branch information
amendelzon authored Dec 18, 2024
1 parent 5c53967 commit 76b4683
Show file tree
Hide file tree
Showing 9 changed files with 480 additions and 123 deletions.
7 changes: 7 additions & 0 deletions middleware/admin/certificate.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,10 @@
from .certificate_v2 import HSMCertificateV2, HSMCertificateV2ElementSGXQuote, \
HSMCertificateV2ElementSGXAttestationKey, \
HSMCertificateV2ElementX509


# Assign version mapping to the parent class
HSMCertificate.VERSION_MAPPING = {
1: HSMCertificate,
2: HSMCertificateV2,
}
30 changes: 18 additions & 12 deletions middleware/admin/certificate_v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,18 +125,25 @@ class HSMCertificate:
VERSION = 1 # Only supported version
ROOT_ELEMENT = "root"
ELEMENT_BASE_CLASS = HSMCertificateElement
ELEMENT_FACTORY = HSMCertificateElement

@staticmethod
def from_jsonfile(path):
@classmethod
def from_jsonfile(kls, path):
try:
with open(path, "r") as file:
certificate_map = json.loads(file.read())

if type(certificate_map) != dict:
raise ValueError(
"JSON file must contain an object as a top level element")
"Certificate file must contain an object as a top level element")

return HSMCertificate(certificate_map)
version = certificate_map.get("version")
if version not in kls.VERSION_MAPPING:
raise ValueError("Invalid or unsupported HSM certificate "
f"version {version} (supported versions are "
f"{", ".join(kls.VERSION_MAPPING.keys())})")

return kls.VERSION_MAPPING[version](certificate_map)
except (ValueError, json.JSONDecodeError) as e:
raise ValueError('Unable to read HSM certificate from "%s": %s' %
(path, str(e)))
Expand Down Expand Up @@ -190,8 +197,8 @@ def validate_and_get_values(self, raw_root_pubkey_hex):

def add_element(self, element):
if not isinstance(element, self.ELEMENT_BASE_CLASS):
raise ValueError(
f"Expected an HSMCertificateElement but got a {type(element)}")
raise ValueError(f"Expected an {self.ELEMENT_BASE_CLASS.__name__} "
"but got a {type(element)}")
self._elements[element.name] = element

def clear_targets(self):
Expand All @@ -214,11 +221,10 @@ def save_to_jsonfile(self, path):
file.write("%s\n" % json.dumps(self.to_dict(), indent=2))

def _parse(self, certificate_map):
if "version" not in certificate_map or certificate_map["version"] != self.VERSION:
raise ValueError(
"Invalid or unsupported HSM certificate version "
f"(current version is {self.VERSION})"
)
version = certificate_map.get("version")
if version != self.VERSION:
raise ValueError("Invalid or unexpected HSM certificate version "
f"{version} (expected {self.VERSION})")

if "targets" not in certificate_map or type(certificate_map["targets"]) != list:
raise ValueError("Missing or invalid targets")
Expand All @@ -229,7 +235,7 @@ def _parse(self, certificate_map):
raise ValueError("Missing elements")

for item in certificate_map["elements"]:
element = HSMCertificateElement(item)
element = self.ELEMENT_FACTORY(item)
self._elements[item["name"]] = element

# Sanity: check each target has a path to the root authority
Expand Down
160 changes: 130 additions & 30 deletions middleware/admin/certificate_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,76 +20,176 @@
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

import base64
from .certificate_v1 import HSMCertificate
from .utils import is_nonempty_hex_string


class HSMCertificateV2Element:
pass
def __init__(self):
raise RuntimeError("Cannot instantiate an "
"abstract HSMCertificateV2Element")

@classmethod
def from_dict(kls, element_map):
if element_map.get("type") not in kls.TYPE_MAPPING:
raise ValueError("Invalid or missing element type for "
f"element {element_map.get("name")}")

return kls.TYPE_MAPPING[element_map["type"]](element_map)

def _init_with_map(self, element_map):
if "name" not in element_map:
raise ValueError("Missing name for HSM certificate element")

self._name = element_map["name"]

if "signed_by" not in element_map:
raise ValueError("Missing certifier for HSM certificate element")
self._signed_by = element_map["signed_by"]

@property
def name(self):
return self._name

@property
def signed_by(self):
return self._signed_by


class HSMCertificateV2ElementSGXQuote(HSMCertificateV2Element):
def __init__(self, name, message, custom_data, signature, signed_by):
self.name = name
self.message = message
self.custom_data = custom_data
self.signature = signature
self.signed_by = signed_by
def __init__(self, element_map):
self._init_with_map(element_map)

def _init_with_map(self, element_map):
super()._init_with_map(element_map)

if not is_nonempty_hex_string(element_map.get("message")):
raise ValueError(f"Invalid message for HSM certificate element {self.name}")
self._message = bytes.fromhex(element_map["message"])

if not is_nonempty_hex_string(element_map.get("custom_data")):
raise ValueError("Invalid custom data for HSM certificate "
f"element {self.name}")
self._custom_data = bytes.fromhex(element_map["custom_data"])

if not is_nonempty_hex_string(element_map.get("signature")):
raise ValueError("Invalid signature for HSM certificate element {self.name}")
self._signature = bytes.fromhex(element_map["signature"])

@property
def message(self):
return self._message.hex()

@property
def custom_data(self):
return self._custom_data.hex()

@property
def signature(self):
return self._signature.hex()

def to_dict(self):
return {
"name": self.name,
"type": "sgx_quote",
"message": self.message.hex(),
"custom_data": self.custom_data.hex(),
"signature": self.signature.hex(),
"message": self.message,
"custom_data": self.custom_data,
"signature": self.signature,
"signed_by": self.signed_by,
}


class HSMCertificateV2ElementSGXAttestationKey(HSMCertificateV2Element):
def __init__(self, name, message, key, auth_data, signature, signed_by):
self.name = name
self.message = message
self.key = key
self.auth_data = auth_data
self.signature = signature
self.signed_by = signed_by
def __init__(self, element_map):
self._init_with_map(element_map)

def _init_with_map(self, element_map):
super()._init_with_map(element_map)

if not is_nonempty_hex_string(element_map.get("message")):
raise ValueError(f"Invalid message for HSM certificate element {self.name}")
self._message = bytes.fromhex(element_map["message"])

if not is_nonempty_hex_string(element_map.get("key")):
raise ValueError(f"Invalid key for HSM certificate element {self.name}")
self._key = bytes.fromhex(element_map["key"])

if not is_nonempty_hex_string(element_map.get("auth_data")):
raise ValueError(f"Invalid auth data for HSM certificate element {self.name}")
self._auth_data = bytes.fromhex(element_map["auth_data"])

if not is_nonempty_hex_string(element_map.get("signature")):
raise ValueError(f"Invalid signature for HSM certificate element {self.name}")
self._signature = bytes.fromhex(element_map["signature"])

@property
def message(self):
return self._message.hex()

@property
def key(self):
return self._key.hex()

@property
def auth_data(self):
return self._auth_data.hex()

@property
def signature(self):
return self._signature.hex()

def to_dict(self):
return {
"name": self.name,
"type": "sgx_attestation_key",
"message": self.message.hex(),
"key": self.key.hex(),
"auth_data": self.auth_data.hex(),
"signature": self.signature.hex(),
"message": self.message,
"key": self.key,
"auth_data": self.auth_data,
"signature": self.signature,
"signed_by": self.signed_by,
}


class HSMCertificateV2ElementX509(HSMCertificateV2Element):
def __init__(self, name, message, signed_by):
self.name = name
self.message = message
self.signed_by = signed_by
def __init__(self, element_map):
self._init_with_map(element_map)

def _init_with_map(self, element_map):
super()._init_with_map(element_map)

try:
self._message = base64.b64decode(element_map.get("message"))
except Exception:
raise ValueError(f"Invalid message for HSM certificate element {self.name}")

@property
def message(self):
return base64.b64encode(self._message).decode("ASCII")

def to_dict(self):
return {
"name": self.name,
"type": "x509_pem",
"message": self.message.decode('ASCII'),
"message": self.message,
"signed_by": self.signed_by,
}


# Element type mappings
HSMCertificateV2Element.TYPE_MAPPING = {
"sgx_quote": HSMCertificateV2ElementSGXQuote,
"sgx_attestation_key": HSMCertificateV2ElementSGXAttestationKey,
"x509_pem": HSMCertificateV2ElementX509,
}


class HSMCertificateV2(HSMCertificate):
VERSION = 2
ROOT_ELEMENT = "sgx_root"
ELEMENT_BASE_CLASS = HSMCertificateV2Element
ELEMENT_FACTORY = HSMCertificateV2Element.from_dict

def validate_and_get_values(self, raw_root_pubkey_hex):
# TODO
pass

def _parse(self, certificate_map):
# TODO
pass
50 changes: 25 additions & 25 deletions middleware/admin/sgx_attestation.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,34 +98,34 @@ def do_attestation(options):
att_cert = HSMCertificateV2()

att_cert.add_element(
HSMCertificateV2ElementSGXQuote(
name="quote",
message=envelope.quote.get_raw_data(),
custom_data=envelope.custom_message,
signature=quote_signature,
signed_by="attestation",
))
HSMCertificateV2ElementSGXQuote({
"name": "quote",
"message": envelope.quote.get_raw_data().hex(),
"custom_data": envelope.custom_message.hex(),
"signature": quote_signature.hex(),
"signed_by": "attestation",
}))
att_cert.add_element(
HSMCertificateV2ElementSGXAttestationKey(
name="attestation",
message=envelope.quote_auth_data.qe_report_body.get_raw_data(),
key=att_key.to_string("uncompressed"),
auth_data=envelope.qe_auth_data.data,
signature=qe_rb_signature,
signed_by="quoting_enclave",
))
HSMCertificateV2ElementSGXAttestationKey({
"name": "attestation",
"message": envelope.quote_auth_data.qe_report_body.get_raw_data().hex(),
"key": att_key.to_string("uncompressed").hex(),
"auth_data": envelope.qe_auth_data.data.hex(),
"signature": qe_rb_signature.hex(),
"signed_by": "quoting_enclave",
}))
att_cert.add_element(
HSMCertificateV2ElementX509(
name="quoting_enclave",
message=envelope.qe_cert_data.certs[0],
signed_by="platform_ca",
))
HSMCertificateV2ElementX509({
"name": "quoting_enclave",
"message": envelope.qe_cert_data.certs[0],
"signed_by": "platform_ca",
}))
att_cert.add_element(
HSMCertificateV2ElementX509(
name="platform_ca",
message=envelope.qe_cert_data.certs[1],
signed_by="sgx_root",
))
HSMCertificateV2ElementX509({
"name": "platform_ca",
"message": envelope.qe_cert_data.certs[1],
"signed_by": "sgx_root",
}))

att_cert.add_target("quote")
att_cert.save_to_jsonfile(options.output_file_path)
Expand Down
4 changes: 2 additions & 2 deletions middleware/tests/admin/test_certificate_v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

from unittest import TestCase
from unittest.mock import call, patch, mock_open
from admin.certificate_v1 import HSMCertificate, HSMCertificateElement
from admin.certificate import HSMCertificate, HSMCertificateElement


class TestHSMCertificate(TestCase):
Expand Down Expand Up @@ -155,7 +155,7 @@ def test_create_certificate_missing_elements(self):
"targets": ["attestation", "device"]
})

@patch('admin.certificate_v1.HSMCertificateElement')
@patch('admin.certificate_v1.HSMCertificate.ELEMENT_FACTORY')
def test_create_certificate_invalid_element(self, certElementMock):
certElementMock.side_effect = ValueError()
with self.assertRaises(ValueError):
Expand Down
Loading

0 comments on commit 76b4683

Please sign in to comment.