From 42827acf91abb3ef09eb413637bbc384ddd5bf12 Mon Sep 17 00:00:00 2001 From: Timo Pollmeier Date: Fri, 13 Dec 2024 09:04:20 +0100 Subject: [PATCH] Add: Tests for CPE match string API --- tests/nvd/__init__.py | 40 +++ tests/nvd/cpe_match/__init__.py | 4 + tests/nvd/cpe_match/test_api.py | 490 +++++++++++++++++++++++++++++ tests/nvd/models/test_cpe_match.py | 112 +++++++ 4 files changed, 646 insertions(+) create mode 100644 tests/nvd/cpe_match/__init__.py create mode 100644 tests/nvd/cpe_match/test_api.py create mode 100644 tests/nvd/models/test_cpe_match.py diff --git a/tests/nvd/__init__.py b/tests/nvd/__init__.py index 8f3fc2090..010ac4297 100644 --- a/tests/nvd/__init__.py +++ b/tests/nvd/__init__.py @@ -55,6 +55,46 @@ def get_cpe_data(update: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: return data +def get_cpe_match_data( + update: Optional[Dict[str, Any]] = None +) -> Dict[str, Any]: + data = { + "cpe_last_modified": "2019-07-22T16:37:38.133", + "created": "2019-06-17T09:16:33.960", + "criteria": "cpe:2.3:a:sun:jre:*:update3:*:*:*:*:*:*", + "last_modified": "2019-06-17T09:16:44.000", + "match_criteria_id": "EAB2C9C2-F685-450B-9980-553966FC3B63", + "matches": [ + { + "cpe_name": "cpe:2.3:a:sun:jre:1.3.0:update3:*:*:*:*:*:*", + "cpe_name_id": "2D284534-DA21-43D5-9D89-07F19AE400EA", + }, + { + "cpe_name": "cpe:2.3:a:sun:jre:1.4.1:update3:*:*:*:*:*:*", + "cpe_name_id": "CE55E1DF-8EA2-41EA-9C51-1BAE728CA094", + }, + { + "cpe_name": "cpe:2.3:a:sun:jre:1.4.2:update3:*:*:*:*:*:*", + "cpe_name_id": "A09C4E47-6548-40C5-8458-5C07C3292C86", + }, + { + "cpe_name": "cpe:2.3:a:sun:jre:1.5.0:update3:*:*:*:*:*:*", + "cpe_name_id": "C484A93A-2677-4501-A6E0-E4ADFFFB549E", + }, + { + "cpe_name": "cpe:2.3:a:sun:jre:1.6.0:update3:*:*:*:*:*:*", + "cpe_name_id": "C518A954-369E-453E-8E17-2AF639150115", + }, + ], + "status": "Active", + "version_end_including": "1.6.0", + } + + if update: + data.update(update) + return data + + def get_cve_change_data( data: Optional[Dict[str, Any]] = None ) -> Dict[str, Any]: diff --git a/tests/nvd/cpe_match/__init__.py b/tests/nvd/cpe_match/__init__.py new file mode 100644 index 000000000..fce2b5df1 --- /dev/null +++ b/tests/nvd/cpe_match/__init__.py @@ -0,0 +1,4 @@ +# SPDX-FileCopyrightText: 2024 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later +# diff --git a/tests/nvd/cpe_match/test_api.py b/tests/nvd/cpe_match/test_api.py new file mode 100644 index 000000000..1f4353196 --- /dev/null +++ b/tests/nvd/cpe_match/test_api.py @@ -0,0 +1,490 @@ +# SPDX-FileCopyrightText: 2024 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later +# + +# pylint: disable=line-too-long, arguments-differ, redefined-builtin +# ruff: noqa: E501 + +from datetime import datetime +from typing import Any, Optional +from unittest.mock import MagicMock, patch +from uuid import UUID, uuid4 + +from httpx import AsyncClient, Response + +from pontos.errors import PontosError +from pontos.nvd.api import now +from pontos.nvd.cpe_match.api import MAX_CPE_MATCHES_PER_PAGE, CPEMatchApi +from tests import AsyncMock, IsolatedAsyncioTestCase, aiter, anext +from tests.nvd import get_cpe_match_data + + +def uuid_replace_str(uuid: UUID, iteration: int, number: int) -> str: + id_str = str(uuid).rsplit("-", 2) + return f"{id_str[0]}-{iteration:04}-{number:012}" + + +def uuid_replace(uuid: UUID, iteration: int, number: int) -> UUID: + return UUID(uuid_replace_str(uuid, iteration, number)) + + +def generate_cpe_name(iteration: int, number: int) -> str: + return f"cpe:2.3:a:acme:test-app:1.{iteration-1}.{number-1}:*:*:*:*:*:*:*" + + +def create_cpe_match_response( + match_criteria_id: UUID, + cpe_name_id: UUID, + *, + update: Optional[dict[str, Any]] = None, + results: int = 1, + iteration: int = 1, +) -> MagicMock: + match_strings = [ + { + "match_string": get_cpe_match_data( + { + "match_criteria_id": f"{uuid_replace_str(match_criteria_id, iteration, i)}", + "criteria": generate_cpe_name(iteration, i), + "matches": [ + { + "cpe_name": generate_cpe_name(iteration, i), + "cpe_name_id": f"{uuid_replace_str(cpe_name_id, iteration, i)}", + } + ] + } + ) + } + for i in range(1, results + 1) + ] + + data = { + "match_strings": match_strings, + "results_per_page": results, + } + if update: + data.update(update) + + response = MagicMock(spec=Response) + response.json.return_value = data + return response + +def create_cpe_match_responses( + match_criteria_id: UUID, cpe_name_id: UUID, responses: int = 2, results_per_response: int = 1 +) -> list[MagicMock]: + return [ + create_cpe_match_response( + match_criteria_id=match_criteria_id, + cpe_name_id=cpe_name_id, + update={"total_results": responses * results_per_response}, + results=results_per_response, + iteration=i, + ) + for i in range(1, responses + 1) + ] + + +class CPEMatchApiTestCase(IsolatedAsyncioTestCase): + @patch("pontos.nvd.api.time.monotonic", autospec=True) + @patch("pontos.nvd.api.AsyncClient", spec=AsyncClient) + def setUp(self, async_client: MagicMock, monotonic_mock: MagicMock) -> None: + self.http_client = AsyncMock() + async_client.return_value = self.http_client + monotonic_mock.return_value = 0 + self.api = CPEMatchApi() + + async def test_no_match_criteria_id(self): + with self.assertRaises(PontosError): + await self.api.cpe_match(None) + + async def test_no_match_strings(self): + data = { + "match_strings": [], + "results_per_page": 1, + } + response = MagicMock(spec=Response) + response.json.return_value = data + self.http_client.get.return_value = response + + with self.assertRaises(PontosError): + await self.api.cpe_match("DOES-NOT-EXIST") + + async def test_cpe_match(self): + match_criteria_id = uuid_replace(uuid4(), 1, 1) + cpe_name_id = uuid_replace(uuid4(), 1, 1) + + self.http_client.get.return_value = create_cpe_match_response(match_criteria_id, cpe_name_id) + + cpe_match_string = await self.api.cpe_match(match_criteria_id) + + self.http_client.get.assert_awaited_once_with( + "https://services.nvd.nist.gov/rest/json/cpematch/2.0", + headers={}, + params={"matchCriteriaId": str(match_criteria_id)}, + ) + + self.assertEqual( + match_criteria_id, + cpe_match_string.match_criteria_id, + ) + self.assertEqual( + generate_cpe_name(1,1), + cpe_match_string.criteria, + ) + self.assertEqual( + cpe_name_id, + cpe_match_string.matches[0].cpe_name_id, + ) + self.assertEqual( + generate_cpe_name(1,1), + cpe_match_string.matches[0].cpe_name, + ) + + @patch("pontos.nvd.cpe_match.api.now", spec=now) + async def test_cpe_matches_last_modified_start_date(self, now_mock: MagicMock): + match_criteria_id = uuid4() + cpe_name_id = uuid4() + + now_mock.return_value = datetime(2019, 8, 30) + self.http_client.get.side_effect = create_cpe_match_responses(match_criteria_id, cpe_name_id) + + it = aiter( + self.api.cpe_matches(last_modified_start_date=datetime(2019, 6, 1)) + ) + cpe_match = await anext(it) + + self.assertEqual(uuid_replace(match_criteria_id, 1, 1), cpe_match.match_criteria_id) + self.http_client.get.assert_awaited_once_with( + "https://services.nvd.nist.gov/rest/json/cpematch/2.0", + headers={}, + params={ + "startIndex": 0, + "lastModStartDate": "2019-06-01T00:00:00", + "lastModEndDate": "2019-08-30T00:00:00", + "resultsPerPage": MAX_CPE_MATCHES_PER_PAGE, + }, + ) + + self.http_client.get.reset_mock() + + cpe_match = await anext(it) + + self.assertEqual(uuid_replace(match_criteria_id, 2, 1), cpe_match.match_criteria_id) + self.http_client.get.assert_awaited_once_with( + "https://services.nvd.nist.gov/rest/json/cpematch/2.0", + headers={}, + params={ + "startIndex": 1, + "lastModStartDate": "2019-06-01T00:00:00", + "lastModEndDate": "2019-08-30T00:00:00", + "resultsPerPage": 1, + }, + ) + + with self.assertRaises(StopAsyncIteration): + cpe_match = await anext(it) + + @patch("pontos.nvd.cpe_match.api.now", spec=now) + async def test_cpe_matches_last_modified_end_date(self, now_mock: MagicMock): + match_criteria_id = uuid4() + cpe_name_id = uuid4() + + now_mock.return_value = datetime(2019, 8, 30) + self.http_client.get.side_effect = create_cpe_match_responses(match_criteria_id, cpe_name_id) + + it = aiter( + self.api.cpe_matches(last_modified_end_date=datetime(2019, 8, 1)) + ) + cpe_match = await anext(it) + + self.assertEqual(uuid_replace(match_criteria_id, 1, 1), cpe_match.match_criteria_id) + self.http_client.get.assert_awaited_once_with( + "https://services.nvd.nist.gov/rest/json/cpematch/2.0", + headers={}, + params={ + "startIndex": 0, + "lastModEndDate": "2019-08-01T00:00:00", + "resultsPerPage": MAX_CPE_MATCHES_PER_PAGE, + }, + ) + + self.http_client.get.reset_mock() + + cpe_match = await anext(it) + + self.assertEqual(uuid_replace(match_criteria_id, 2, 1), cpe_match.match_criteria_id) + self.http_client.get.assert_awaited_once_with( + "https://services.nvd.nist.gov/rest/json/cpematch/2.0", + headers={}, + params={ + "startIndex": 1, + "lastModEndDate": "2019-08-01T00:00:00", + "resultsPerPage": 1, + }, + ) + + with self.assertRaises(StopAsyncIteration): + cpe_match = await anext(it) + + async def test_cpe_matches_cve_id(self): + match_criteria_id = uuid4() + cpe_name_id = uuid4() + + self.http_client.get.side_effect = create_cpe_match_responses(match_criteria_id, cpe_name_id) + + it = aiter( + self.api.cpe_matches(cve_id="CVE-2010-3574") + ) + cpe_match = await anext(it) + + self.assertEqual(uuid_replace(match_criteria_id, 1, 1), cpe_match.match_criteria_id) + self.http_client.get.assert_awaited_once_with( + "https://services.nvd.nist.gov/rest/json/cpematch/2.0", + headers={}, + params={ + "startIndex": 0, + "cveId": "CVE-2010-3574", + "resultsPerPage": MAX_CPE_MATCHES_PER_PAGE, + }, + ) + + self.http_client.get.reset_mock() + + cpe_match = await anext(it) + + self.assertEqual(uuid_replace(match_criteria_id, 2, 1), cpe_match.match_criteria_id) + self.http_client.get.assert_awaited_once_with( + "https://services.nvd.nist.gov/rest/json/cpematch/2.0", + headers={}, + params={ + "startIndex": 1, + "cveId": "CVE-2010-3574", + "resultsPerPage": 1, + }, + ) + + with self.assertRaises(StopAsyncIteration): + cpe_match = await anext(it) + + async def test_cpe_matches_match_string_search(self): + match_criteria_id = uuid4() + cpe_name_id = uuid4() + + self.http_client.get.side_effect = create_cpe_match_responses(match_criteria_id, cpe_name_id) + + it = aiter( + self.api.cpe_matches(match_string_search="cpe:2.3:a:sun:jre:*:*:*:*:*:*:*:*") + ) + cpe_match = await anext(it) + + self.assertEqual(uuid_replace(match_criteria_id, 1, 1), cpe_match.match_criteria_id) + self.http_client.get.assert_awaited_once_with( + "https://services.nvd.nist.gov/rest/json/cpematch/2.0", + headers={}, + params={ + "startIndex": 0, + "matchStringSearch": "cpe:2.3:a:sun:jre:*:*:*:*:*:*:*:*", + "resultsPerPage": MAX_CPE_MATCHES_PER_PAGE, + }, + ) + + self.http_client.get.reset_mock() + + cpe_match = await anext(it) + + self.assertEqual(uuid_replace(match_criteria_id, 2, 1), cpe_match.match_criteria_id) + self.http_client.get.assert_awaited_once_with( + "https://services.nvd.nist.gov/rest/json/cpematch/2.0", + headers={}, + params={ + "startIndex": 1, + "matchStringSearch": "cpe:2.3:a:sun:jre:*:*:*:*:*:*:*:*", + "resultsPerPage": 1, + }, + ) + + with self.assertRaises(StopAsyncIteration): + cpe_match = await anext(it) + + async def test_cpe_matches_match_string_search(self): + match_criteria_id = uuid4() + cpe_name_id = uuid4() + + self.http_client.get.side_effect = create_cpe_match_responses(match_criteria_id, cpe_name_id) + + it = aiter( + self.api.cpe_matches(match_string_search="cpe:2.3:a:sun:jre:*:*:*:*:*:*:*:*") + ) + cpe_match = await anext(it) + + self.assertEqual(uuid_replace(match_criteria_id, 1, 1), cpe_match.match_criteria_id) + self.http_client.get.assert_awaited_once_with( + "https://services.nvd.nist.gov/rest/json/cpematch/2.0", + headers={}, + params={ + "startIndex": 0, + "matchStringSearch": "cpe:2.3:a:sun:jre:*:*:*:*:*:*:*:*", + "resultsPerPage": MAX_CPE_MATCHES_PER_PAGE, + }, + ) + + self.http_client.get.reset_mock() + + cpe_match = await anext(it) + + self.assertEqual(uuid_replace(match_criteria_id, 2, 1), cpe_match.match_criteria_id) + self.http_client.get.assert_awaited_once_with( + "https://services.nvd.nist.gov/rest/json/cpematch/2.0", + headers={}, + params={ + "startIndex": 1, + "matchStringSearch": "cpe:2.3:a:sun:jre:*:*:*:*:*:*:*:*", + "resultsPerPage": 1, + }, + ) + + with self.assertRaises(StopAsyncIteration): + cpe_match = await anext(it) + + async def test_cpe_matches_request_results(self): + match_criteria_id = uuid4() + cpe_name_id = uuid4() + + self.http_client.get.side_effect = create_cpe_match_responses( + match_criteria_id=match_criteria_id, + cpe_name_id=cpe_name_id, + results_per_response=2, + ) + + it = aiter(self.api.cpe_matches(request_results=10)) + cpe_match = await anext(it) + + self.assertEqual(uuid_replace(match_criteria_id, 1, 1), cpe_match.match_criteria_id) + self.http_client.get.assert_awaited_once_with( + "https://services.nvd.nist.gov/rest/json/cpematch/2.0", + headers={}, + params={ + "startIndex": 0, + "resultsPerPage": 10, + }, + ) + + self.http_client.get.reset_mock() + cpe_match = await anext(it) + self.assertEqual(uuid_replace(match_criteria_id, 1, 2), cpe_match.match_criteria_id) + self.http_client.get.assert_not_called() + + self.http_client.get.reset_mock() + + cpe_match = await anext(it) + + self.assertEqual(uuid_replace(match_criteria_id, 2, 1), cpe_match.match_criteria_id) + self.http_client.get.assert_awaited_once_with( + "https://services.nvd.nist.gov/rest/json/cpematch/2.0", + headers={}, + params={ + "startIndex": 2, + "resultsPerPage": 2, + }, + ) + + self.http_client.get.reset_mock() + cpe_match = await anext(it) + self.assertEqual(uuid_replace(match_criteria_id, 2, 2), cpe_match.match_criteria_id) + self.http_client.get.assert_not_called() + + with self.assertRaises(Exception): + cpe_match = await anext(it) + + async def test_context_manager(self): + async with self.api: + pass + + self.http_client.__aenter__.assert_awaited_once() + self.http_client.__aexit__.assert_awaited_once() + + +class CPEMatchApiWithTokenTestCase(IsolatedAsyncioTestCase): + @patch("pontos.nvd.api.time.monotonic", autospec=True) + @patch("pontos.nvd.api.AsyncClient", spec=AsyncClient) + def setUp(self, async_client: MagicMock, monotonic_mock: MagicMock) -> None: + self.http_client = AsyncMock() + async_client.return_value = self.http_client + monotonic_mock.return_value = 0 + self.api = CPEMatchApi(token="token123") + + async def test_cpe_matches_request_results_with_token(self): + match_criteria_id = uuid4() + cpe_name_id = uuid4() + + self.http_client.get.side_effect = create_cpe_match_responses( + match_criteria_id=match_criteria_id, + cpe_name_id=cpe_name_id, + results_per_response=2, + ) + + it = aiter(self.api.cpe_matches(request_results=10)) + cpe_match = await anext(it) + + self.assertEqual(uuid_replace(match_criteria_id, 1, 1), cpe_match.match_criteria_id) + self.http_client.get.assert_awaited_once_with( + "https://services.nvd.nist.gov/rest/json/cpematch/2.0", + headers={"apiKey": "token123"}, + params={ + "startIndex": 0, + "resultsPerPage": 10, + }, + ) + + self.http_client.get.reset_mock() + cpe_match = await anext(it) + self.assertEqual(uuid_replace(match_criteria_id, 1, 2), cpe_match.match_criteria_id) + self.http_client.get.assert_not_called() + + self.http_client.get.reset_mock() + + cpe_match = await anext(it) + + self.assertEqual(uuid_replace(match_criteria_id, 2, 1), cpe_match.match_criteria_id) + self.http_client.get.assert_awaited_once_with( + "https://services.nvd.nist.gov/rest/json/cpematch/2.0", + headers={"apiKey": "token123"}, + params={ + "startIndex": 2, + "resultsPerPage": 2, + }, + ) + + self.http_client.get.reset_mock() + cpe_match = await anext(it) + self.assertEqual(uuid_replace(match_criteria_id, 2, 2), cpe_match.match_criteria_id) + self.http_client.get.assert_not_called() + + with self.assertRaises(Exception): + cpe_match = await anext(it) + + @patch("pontos.nvd.api.time.monotonic", autospec=True) + @patch("pontos.nvd.api.asyncio.sleep", autospec=True) + async def test_rate_limit_with_token( + self, + sleep_mock: MagicMock, + monotonic_mock: MagicMock, + ): + match_criteria_id = uuid4() + cpe_name_id = uuid4() + self.http_client.get.side_effect = create_cpe_match_responses(match_criteria_id, cpe_name_id,8) + monotonic_mock.side_effect = [10, 11] + + it = aiter(self.api.cpe_matches()) + await anext(it) + await anext(it) + await anext(it) + await anext(it) + await anext(it) + + sleep_mock.assert_not_called() + + await anext(it) + + sleep_mock.assert_not_called() diff --git a/tests/nvd/models/test_cpe_match.py b/tests/nvd/models/test_cpe_match.py new file mode 100644 index 000000000..6d5b64f6b --- /dev/null +++ b/tests/nvd/models/test_cpe_match.py @@ -0,0 +1,112 @@ +# SPDX-FileCopyrightText: 2024 Greenbone AG +# +# SPDX-License-Identifier: GPL-3.0-or-later + +# pylint: disable=line-too-long +# ruff: noqa: E501 + +import unittest +from datetime import datetime, timezone +from uuid import UUID + +from pontos.nvd.models.cpe_match_string import CPEMatchString, CPEMatch +from tests.nvd import get_cpe_match_data + + +class CPEMatchTestCase(unittest.TestCase): + + def test_required_only(self): + """ + Test the required attributes of a CPEMatchString + """ + data = get_cpe_match_data() + data.__delitem__("matches") + data.__delitem__("version_end_including") + + cpe_match_string = CPEMatchString.from_dict(data) + + self.assertEqual( + UUID("EAB2C9C2-F685-450B-9980-553966FC3B63"), + cpe_match_string.match_criteria_id, + ) + self.assertEqual( + "cpe:2.3:a:sun:jre:*:update3:*:*:*:*:*:*", + cpe_match_string.criteria, + ) + self.assertEqual( + "Active", + cpe_match_string.status, + ) + self.assertEqual( + datetime(2019, 7, 22, 16, 37, 38, 133000, tzinfo=timezone.utc), + cpe_match_string.cpe_last_modified, + ) + self.assertEqual( + datetime(2019, 6, 17, 9, 16, 33, 960000, tzinfo=timezone.utc), + cpe_match_string.created, + ) + self.assertEqual( + datetime(2019, 6, 17, 9, 16, 44, 0, tzinfo=timezone.utc), + cpe_match_string.last_modified, + ) + + self.assertEqual([], cpe_match_string.matches) + + self.assertIsNone(cpe_match_string.version_start_excluding) + self.assertIsNone(cpe_match_string.version_end_excluding) + self.assertIsNone(cpe_match_string.version_start_including) + self.assertIsNone(cpe_match_string.version_end_including) + + def test_matches(self): + """ + Test the matches list of a CPEMatchString + """ + cpe_match_string = CPEMatchString.from_dict(get_cpe_match_data()) + + self.assertEqual(5, len(cpe_match_string.matches)) + + self.assertEqual( + CPEMatch( + "cpe:2.3:a:sun:jre:1.3.0:update3:*:*:*:*:*:*", + UUID("2d284534-da21-43d5-9d89-07f19ae400ea") + ), + cpe_match_string.matches[0] + ) + + self.assertEqual( + CPEMatch( + "cpe:2.3:a:sun:jre:1.6.0:update3:*:*:*:*:*:*", + UUID("c518a954-369e-453e-8e17-2af639150115") + ), + cpe_match_string.matches[-1] + ) + + def test_including_version_limits(self): + """ + Test the including version limits of a CPEMatchString + """ + data = get_cpe_match_data({"version_start_including": "1.3.0"}) + cpe_match_string = CPEMatchString.from_dict(data) + + self.assertEqual("1.3.0", cpe_match_string.version_start_including) + self.assertEqual("1.6.0", cpe_match_string.version_end_including) + self.assertIsNone(cpe_match_string.version_start_excluding) + self.assertIsNone(cpe_match_string.version_end_excluding) + + def test_excluding_version_limits(self): + """ + Test the excluding version limits of a CPEMatchString + """ + data = get_cpe_match_data( + { + "version_start_excluding": "1.2.0", + "version_end_excluding": "1.7.0", + } + ) + data.__delitem__("version_end_including") + cpe_match_string = CPEMatchString.from_dict(data) + + self.assertEqual("1.2.0", cpe_match_string.version_start_excluding) + self.assertEqual("1.7.0", cpe_match_string.version_end_excluding) + self.assertIsNone(cpe_match_string.version_start_including) + self.assertIsNone(cpe_match_string.version_end_including)