From 790781df4da22ebc33dd2f460d3c1d05d023dd50 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Ricks?= Date: Fri, 17 Nov 2023 09:30:40 +0100 Subject: [PATCH] Change: Use a UUID for cpe_name_id at NIST CPE API and models cpe_name_id values are actually UUID therefore they should be parsed as a UUID instance. This allows for better serialization (as bytes) if necessary. --- pontos/nvd/cpe/api.py | 5 +- pontos/nvd/models/cpe.py | 7 ++- tests/nvd/cpe/test_api.py | 108 +++++++++++++++++++++++------------ tests/nvd/models/test_cpe.py | 6 +- 4 files changed, 81 insertions(+), 45 deletions(-) diff --git a/pontos/nvd/cpe/api.py b/pontos/nvd/cpe/api.py index de982eb7b..072b3b07d 100644 --- a/pontos/nvd/cpe/api.py +++ b/pontos/nvd/cpe/api.py @@ -26,6 +26,7 @@ Type, Union, ) +from uuid import UUID from httpx import Timeout @@ -88,7 +89,7 @@ def __init__( rate_limit=rate_limit, ) - async def cpe(self, cpe_name_id: str) -> CPE: + async def cpe(self, cpe_name_id: Union[str, UUID]) -> CPE: """ Query for a CPE matching the CPE UUID. @@ -114,7 +115,7 @@ async def cpe(self, cpe_name_id: str) -> CPE: if not cpe_name_id: raise PontosError("Missing CPE Name ID.") - response = await self._get(params={"cpeNameId": cpe_name_id}) + response = await self._get(params={"cpeNameId": str(cpe_name_id)}) response.raise_for_status() data = response.json(object_hook=convert_camel_case) products = data["products"] diff --git a/pontos/nvd/models/cpe.py b/pontos/nvd/models/cpe.py index 4cad14c1d..33027adc3 100644 --- a/pontos/nvd/models/cpe.py +++ b/pontos/nvd/models/cpe.py @@ -18,6 +18,7 @@ from dataclasses import dataclass, field from datetime import datetime from typing import List, Optional +from uuid import UUID from pontos.models import Model, StrEnum @@ -90,7 +91,7 @@ class DeprecatedBy(Model): """ cpe_name: Optional[str] = None - cpe_name_id: Optional[str] = None + cpe_name_id: Optional[UUID] = None @dataclass @@ -100,7 +101,7 @@ class CPE(Model): Attributes: cpe_name: The name of the CPE - cpe_name_id: ID of the CPE + cpe_name_id: UUID of the CPE deprecated: True if the CPE is deprecated last_modified: Last modification date of the CPE created: Creation date of the CPE @@ -111,7 +112,7 @@ class CPE(Model): """ cpe_name: str - cpe_name_id: str + cpe_name_id: UUID deprecated: bool last_modified: datetime created: datetime diff --git a/tests/nvd/cpe/test_api.py b/tests/nvd/cpe/test_api.py index 82dfcaf93..07a7ad51b 100644 --- a/tests/nvd/cpe/test_api.py +++ b/tests/nvd/cpe/test_api.py @@ -19,8 +19,10 @@ # ruff: noqa: E501 from datetime import datetime -from typing import Any, Dict, List, Optional +from itertools import repeat +from typing import Any, Optional from unittest.mock import MagicMock, patch +from uuid import UUID, uuid4 from httpx import AsyncClient, Response @@ -31,17 +33,37 @@ from tests.nvd import get_cpe_data +def uuid_replace_str(uuid: UUID, iteration: int, number: int) -> str: + id_str = str(uuid).rsplit("-", 2) + nn = "".join([str(j) for j in repeat(number, 4)]) + ii = "".join([str(j) for j in repeat(iteration, 12)]) + return f"{id_str[0]}-{nn}-{ii}" + + +def uuid_replace(uuid: UUID, iteration: int, number: int) -> UUID: + return UUID(uuid_replace_str(uuid, iteration, number)) + + def create_cpe_response( - cpe_name_id: str, + cpe_name_id: UUID, *, - update: Optional[Dict[str, Any]] = None, + update: Optional[dict[str, Any]] = None, results: int = 1, + iteration: int = 1, ) -> MagicMock: + products = [ + { + "cpe": get_cpe_data( + { + "cpe_name_id": f"{uuid_replace_str(cpe_name_id, iteration, i)}" + } + ) + } + for i in range(1, results + 1) + ] + data = { - "products": [ - {"cpe": get_cpe_data({"cpe_name_id": f"{cpe_name_id}-{i}"})} - for i in range(1, results + 1) - ], + "products": products, "results_per_page": results, } if update: @@ -53,13 +75,14 @@ def create_cpe_response( def create_cpes_responses( - responses: int = 2, results_per_response: int = 1 -) -> List[MagicMock]: + cpe_name_id: UUID, responses: int = 2, results_per_response: int = 1 +) -> list[MagicMock]: return [ create_cpe_response( - cpe_name_id=f"CPE-{i}", + cpe_name_id=cpe_name_id, update={"total_results": responses * results_per_response}, results=results_per_response, + iteration=i, ) for i in range(1, responses + 1) ] @@ -91,21 +114,22 @@ async def test_no_cpe(self): await self.api.cpe("CPE-1") async def test_cpe(self): - self.http_client.get.return_value = create_cpe_response("CPE-1") + uuid = uuid4() + self.http_client.get.return_value = create_cpe_response(uuid) - cpe = await self.api.cpe("CPE-1-1") + cpe = await self.api.cpe(uuid) self.http_client.get.assert_awaited_once_with( "https://services.nvd.nist.gov/rest/json/cpes/2.0", headers={}, - params={"cpeNameId": "CPE-1-1"}, + params={"cpeNameId": str(uuid)}, ) self.assertEqual( cpe.cpe_name, "cpe:2.3:o:microsoft:windows_10_22h2:-:*:*:*:*:*:arm64:*", ) - self.assertEqual(cpe.cpe_name_id, "CPE-1-1") + self.assertEqual(cpe.cpe_name_id, uuid_replace(uuid, 1, 1)) self.assertFalse(cpe.deprecated) self.assertEqual( cpe.last_modified, datetime(2022, 12, 9, 18, 15, 16, 973000) @@ -123,7 +147,8 @@ async def test_rate_limit( sleep_mock: MagicMock, monotonic_mock: MagicMock, ): - self.http_client.get.side_effect = create_cpes_responses(8) + uuid = uuid4() + self.http_client.get.side_effect = create_cpes_responses(uuid, 8) monotonic_mock.side_effect = [10, 11] it = aiter(self.api.cpes()) @@ -141,15 +166,16 @@ async def test_rate_limit( @patch("pontos.nvd.cpe.api.now", spec=now) async def test_cves_last_modified_start_date(self, now_mock: MagicMock): + uuid = uuid4() now_mock.return_value = datetime(2022, 12, 31) - self.http_client.get.side_effect = create_cpes_responses() + self.http_client.get.side_effect = create_cpes_responses(uuid) it = aiter( self.api.cpes(last_modified_start_date=datetime(2022, 12, 1)) ) cve = await anext(it) - self.assertEqual(cve.cpe_name_id, "CPE-1-1") + self.assertEqual(cve.cpe_name_id, uuid_replace(uuid, 1, 1)) self.http_client.get.assert_awaited_once_with( "https://services.nvd.nist.gov/rest/json/cpes/2.0", headers={}, @@ -165,7 +191,7 @@ async def test_cves_last_modified_start_date(self, now_mock: MagicMock): cve = await anext(it) - self.assertEqual(cve.cpe_name_id, "CPE-2-1") + self.assertEqual(cve.cpe_name_id, uuid_replace(uuid, 2, 1)) self.http_client.get.assert_awaited_once_with( "https://services.nvd.nist.gov/rest/json/cpes/2.0", headers={}, @@ -181,7 +207,8 @@ async def test_cves_last_modified_start_date(self, now_mock: MagicMock): cve = await anext(it) async def test_cves_last_modified_end_date(self): - self.http_client.get.side_effect = create_cpes_responses() + uuid = uuid4() + self.http_client.get.side_effect = create_cpes_responses(uuid) it = aiter( self.api.cpes( @@ -191,7 +218,7 @@ async def test_cves_last_modified_end_date(self): ) cve = await anext(it) - self.assertEqual(cve.cpe_name_id, "CPE-1-1") + self.assertEqual(cve.cpe_name_id, uuid_replace(uuid, 1, 1)) self.http_client.get.assert_awaited_once_with( "https://services.nvd.nist.gov/rest/json/cpes/2.0", headers={}, @@ -207,7 +234,7 @@ async def test_cves_last_modified_end_date(self): cve = await anext(it) - self.assertEqual(cve.cpe_name_id, "CPE-2-1") + self.assertEqual(cve.cpe_name_id, uuid_replace(uuid, 2, 1)) self.http_client.get.assert_awaited_once_with( "https://services.nvd.nist.gov/rest/json/cpes/2.0", headers={}, @@ -223,12 +250,13 @@ async def test_cves_last_modified_end_date(self): cve = await anext(it) async def test_cpes_keywords(self): - self.http_client.get.side_effect = create_cpes_responses() + uuid = uuid4() + self.http_client.get.side_effect = create_cpes_responses(uuid) it = aiter(self.api.cpes(keywords=["Mac OS X", "kernel"])) cve = await anext(it) - self.assertEqual(cve.cpe_name_id, "CPE-1-1") + self.assertEqual(cve.cpe_name_id, uuid_replace(uuid, 1, 1)) self.http_client.get.assert_awaited_once_with( "https://services.nvd.nist.gov/rest/json/cpes/2.0", headers={}, @@ -244,7 +272,7 @@ async def test_cpes_keywords(self): cve = await anext(it) - self.assertEqual(cve.cpe_name_id, "CPE-2-1") + self.assertEqual(cve.cpe_name_id, uuid_replace(uuid, 2, 1)) self.http_client.get.assert_awaited_once_with( "https://services.nvd.nist.gov/rest/json/cpes/2.0", headers={}, @@ -260,12 +288,13 @@ async def test_cpes_keywords(self): cve = await anext(it) async def test_cpes_keyword(self): - self.http_client.get.side_effect = create_cpes_responses() + uuid = uuid4() + self.http_client.get.side_effect = create_cpes_responses(uuid) it = aiter(self.api.cpes(keywords="macOS")) cve = await anext(it) - self.assertEqual(cve.cpe_name_id, "CPE-1-1") + self.assertEqual(cve.cpe_name_id, uuid_replace(uuid, 1, 1)) self.http_client.get.assert_awaited_once_with( "https://services.nvd.nist.gov/rest/json/cpes/2.0", headers={}, @@ -280,7 +309,7 @@ async def test_cpes_keyword(self): cve = await anext(it) - self.assertEqual(cve.cpe_name_id, "CPE-2-1") + self.assertEqual(cve.cpe_name_id, uuid_replace(uuid, 2, 1)) self.http_client.get.assert_awaited_once_with( "https://services.nvd.nist.gov/rest/json/cpes/2.0", headers={}, @@ -295,7 +324,8 @@ async def test_cpes_keyword(self): cve = await anext(it) async def test_cpes_cpe_match_string(self): - self.http_client.get.side_effect = create_cpes_responses() + uuid = uuid4() + self.http_client.get.side_effect = create_cpes_responses(uuid) it = aiter( self.api.cpes( @@ -304,7 +334,7 @@ async def test_cpes_cpe_match_string(self): ) cve = await anext(it) - self.assertEqual(cve.cpe_name_id, "CPE-1-1") + self.assertEqual(cve.cpe_name_id, uuid_replace(uuid, 1, 1)) self.http_client.get.assert_awaited_once_with( "https://services.nvd.nist.gov/rest/json/cpes/2.0", headers={}, @@ -319,7 +349,7 @@ async def test_cpes_cpe_match_string(self): cve = await anext(it) - self.assertEqual(cve.cpe_name_id, "CPE-2-1") + self.assertEqual(cve.cpe_name_id, uuid_replace(uuid, 2, 1)) self.http_client.get.assert_awaited_once_with( "https://services.nvd.nist.gov/rest/json/cpes/2.0", headers={}, @@ -334,7 +364,8 @@ async def test_cpes_cpe_match_string(self): cve = await anext(it) async def test_cpes_match_criteria_id(self): - self.http_client.get.side_effect = create_cpes_responses() + uuid = uuid4() + self.http_client.get.side_effect = create_cpes_responses(uuid) it = aiter( self.api.cpes( @@ -343,7 +374,7 @@ async def test_cpes_match_criteria_id(self): ) cve = await anext(it) - self.assertEqual(cve.cpe_name_id, "CPE-1-1") + self.assertEqual(cve.cpe_name_id, uuid_replace(uuid, 1, 1)) self.http_client.get.assert_awaited_once_with( "https://services.nvd.nist.gov/rest/json/cpes/2.0", headers={}, @@ -358,7 +389,7 @@ async def test_cpes_match_criteria_id(self): cve = await anext(it) - self.assertEqual(cve.cpe_name_id, "CPE-2-1") + self.assertEqual(cve.cpe_name_id, uuid_replace(uuid, 2, 1)) self.http_client.get.assert_awaited_once_with( "https://services.nvd.nist.gov/rest/json/cpes/2.0", headers={}, @@ -373,8 +404,9 @@ async def test_cpes_match_criteria_id(self): cve = await anext(it) async def test_cpes_request_results(self): + uuid = uuid4() self.http_client.get.side_effect = create_cpes_responses( - results_per_response=2 + uuid, results_per_response=2 ) it = aiter(self.api.cpes(request_results=10)) @@ -388,11 +420,11 @@ async def test_cpes_request_results(self): "resultsPerPage": 10, }, ) - self.assertEqual(cve.cpe_name_id, "CPE-1-1") + self.assertEqual(cve.cpe_name_id, uuid_replace(uuid, 1, 1)) self.http_client.get.reset_mock() cve = await anext(it) - self.assertEqual(cve.cpe_name_id, "CPE-1-2") + self.assertEqual(cve.cpe_name_id, uuid_replace(uuid, 1, 2)) self.http_client.get.assert_not_called() self.http_client.get.reset_mock() @@ -405,11 +437,11 @@ async def test_cpes_request_results(self): "resultsPerPage": 2, }, ) - self.assertEqual(cve.cpe_name_id, "CPE-2-1") + self.assertEqual(cve.cpe_name_id, uuid_replace(uuid, 2, 1)) self.http_client.get.reset_mock() cve = await anext(it) - self.assertEqual(cve.cpe_name_id, "CPE-2-2") + self.assertEqual(cve.cpe_name_id, uuid_replace(uuid, 2, 2)) self.http_client.get.assert_not_called() self.http_client.get.reset_mock() diff --git a/tests/nvd/models/test_cpe.py b/tests/nvd/models/test_cpe.py index 050c8728e..1bb7cb758 100644 --- a/tests/nvd/models/test_cpe.py +++ b/tests/nvd/models/test_cpe.py @@ -18,6 +18,7 @@ import unittest from datetime import datetime +from uuid import UUID from pontos.nvd.models.cpe import CPE, ReferenceType from tests.nvd import get_cpe_data @@ -32,7 +33,7 @@ def test_required_only(self): "cpe:2.3:o:microsoft:windows_10_22h2:-:*:*:*:*:*:arm64:*", ) self.assertEqual( - cpe.cpe_name_id, "9BAECDB2-614D-4E9C-9936-190C30246F03" + cpe.cpe_name_id, UUID("9BAECDB2-614D-4E9C-9936-190C30246F03") ) self.assertFalse(cpe.deprecated) self.assertEqual( @@ -111,5 +112,6 @@ def test_deprecated(self): "cpe:2.3:o:microsoft:windows_10_22h2:-:*:*:*:*:*:x64:*", ) self.assertEqual( - deprecated_by.cpe_name_id, "A09335E2-B42F-4820-B487-57A4BF0CEE98" + deprecated_by.cpe_name_id, + UUID("A09335E2-B42F-4820-B487-57A4BF0CEE98"), )