From 72492472e49763b02bd856e4c6bf441173cef10c Mon Sep 17 00:00:00 2001 From: Martin Malina Date: Fri, 8 Nov 2024 13:12:06 +0530 Subject: [PATCH] feat(RELEASE-1214): add support for spdx sbom format Build team is moving from cyclonedx to spdx sbom format. To support both formats until the transition is over: * The original upload_rpm_data was copied as upload_rpm_data_cyclonedx to preserve the original behavior for now * upload_rpm_data was modified to work with spdx format Signed-off-by: Martin Malina --- pyxis/test_upload_rpm_data.py | 152 +++++---- pyxis/test_upload_rpm_data_cyclonedx.py | 434 ++++++++++++++++++++++++ pyxis/upload_rpm_data.py | 113 +++--- pyxis/upload_rpm_data_cyclonedx | 1 + pyxis/upload_rpm_data_cyclonedx.py | 321 ++++++++++++++++++ 5 files changed, 881 insertions(+), 140 deletions(-) create mode 100644 pyxis/test_upload_rpm_data_cyclonedx.py create mode 120000 pyxis/upload_rpm_data_cyclonedx create mode 100755 pyxis/upload_rpm_data_cyclonedx.py diff --git a/pyxis/test_upload_rpm_data.py b/pyxis/test_upload_rpm_data.py index d799d0f..36779c9 100644 --- a/pyxis/test_upload_rpm_data.py +++ b/pyxis/test_upload_rpm_data.py @@ -8,8 +8,7 @@ get_image_rpm_data, create_image_rpm_manifest, update_container_content_sets, - load_sbom_components, - check_bom_ref_duplicates, + load_sbom_packages, construct_rpm_items_and_content_sets, ) @@ -18,38 +17,78 @@ SBOM_PATH = "mypath" RPM_MANIFEST_ID = "abcd1234" CONTENT_SETS = ["myrepo1", "myrepo2"] -COMPONENTS = [ +PACKAGES = [ { # all fields - "purl": "pkg:rpm/rhel/pkg1@1-2.el8?arch=x86_64&" - + "upstream=pkg1-1-2.el8.src.rpm&distro=rhel-8.0&repository_id=myrepo1", + "externalRefs": [ + { + "referenceType": "purl", + "referenceLocator": "pkg:rpm/rhel/pkg1@1-2.el8?arch=x86_64&" + + "upstream=pkg1-1-2.el8.src.rpm&distro=rhel-8.0&repository_id=myrepo1", + } + ] }, { # no version, same repository_id - "purl": "pkg:rpm/rhel/pkg2?arch=noarch&upstream=pkg2-1-2.el8.src.rpm&distro=rhel-8.0" - + "&repository_id=myrepo1", + 
"externalRefs": [ + { + "referenceType": "purl", + "referenceLocator": "pkg:rpm/rhel/pkg2?arch=noarch" + + "&upstream=pkg2-1-2.el8.src.rpm&distro=rhel-8.0&repository_id=myrepo1", + } + ] }, { # no architecture, different repository_id - "purl": "pkg:rpm/rhel/pkg3@9-8.el8?upstream=pkg3-9-8.el8.src.rpm&distro=rhel-8.0" - + "&repository_id=myrepo2", + "externalRefs": [ + { + "referenceType": "purl", + "referenceLocator": "pkg:rpm/rhel/pkg3@9-8.el8?upstream=pkg3-9-8.el8.src.rpm" + + "&distro=rhel-8.0&repository_id=myrepo2", + } + ] }, { # no upstream - "purl": "pkg:rpm/rhel/pkg4@1-2.el8?arch=x86_64&distro=rhel-8.0", + "externalRefs": [ + { + "referenceType": "purl", + "referenceLocator": "pkg:rpm/rhel/pkg4@1-2.el8?arch=x86_64&distro=rhel-8.0", + } + ] }, { # with RH publisher - "purl": "pkg:rpm/rhel/pkg5?arch=noarch&upstream=pkg5-1-2.el8.src.rpm&distro=rhel-8.0", - "publisher": "Red Hat, Inc.", + "externalRefs": [ + { + "referenceType": "purl", + "referenceLocator": "pkg:rpm/rhel/pkg5?arch=noarch" + + "&upstream=pkg5-1-2.el8.src.rpm&distro=rhel-8.0", + } + ], + "supplier": "Organization: Red Hat, Inc.", }, { # with other publisher - "purl": "pkg:rpm/rhel/pkg6?arch=noarch&upstream=pkg6-1-2.el8.src.rpm&distro=rhel-8.0", - "publisher": "Blue Shoe, inc.", + "externalRefs": [ + { + "referenceType": "purl", + "referenceLocator": "pkg:rpm/rhel/pkg6?arch=noarch" + + "&upstream=pkg6-1-2.el8.src.rpm&distro=rhel-8.0", + } + ], + "supplier": "Organization: Blue Shoe, inc.", }, { # not an rpm - "purl": "pkg:golang/./staging/src@(devel)#k8s.io/api", - }, - { # no purl - "bom_ref": "ref", + "externalRefs": [ + { + "referenceType": "purl", + "referenceLocator": "pkg:golang/./staging/src@(devel)#k8s.io/api", + } + ] }, + {}, # no purl { # with redhat namespace, but no publisher - "purl": "pkg:rpm/redhat/pkg7@1.2.3-4.el9000?arch=noarch", + "externalRefs": [ + { + "referenceType": "purl", + "referenceLocator": "pkg:rpm/redhat/pkg7@1.2.3-4.el9000?arch=noarch", + } + ] }, ] @@ -135,11 
+174,11 @@ def test_upload_container_rpm_data_with_retry__fails_http_other( @patch("upload_rpm_data.update_container_content_sets") @patch("upload_rpm_data.create_image_rpm_manifest") @patch("upload_rpm_data.construct_rpm_items_and_content_sets") -@patch("upload_rpm_data.load_sbom_components") +@patch("upload_rpm_data.load_sbom_packages") @patch("upload_rpm_data.get_image_rpm_data") def test_upload_container_rpm_data__success( mock_get_image_rpm_data, - mock_load_sbom_components, + mock_load_sbom_packages, mock_construct_rpm_items_and_content_sets, mock_create_image_rpm_manifest, mock_update_container_content_sets, @@ -154,12 +193,12 @@ def test_upload_container_rpm_data__success( "content_sets": None, "rpm_manifest": None, } - mock_load_sbom_components.return_value = COMPONENTS + mock_load_sbom_packages.return_value = PACKAGES mock_construct_rpm_items_and_content_sets.return_value = ([{"name": "pkg"}], ["myrepo1"]) upload_container_rpm_data(GRAPHQL_API, IMAGE_ID, SBOM_PATH) - mock_construct_rpm_items_and_content_sets.assert_called_once_with(COMPONENTS) + mock_construct_rpm_items_and_content_sets.assert_called_once_with(PACKAGES) mock_create_image_rpm_manifest.assert_called_once_with( GRAPHQL_API, IMAGE_ID, @@ -175,11 +214,11 @@ def test_upload_container_rpm_data__success( @patch("upload_rpm_data.update_container_content_sets") @patch("upload_rpm_data.create_image_rpm_manifest") @patch("upload_rpm_data.construct_rpm_items_and_content_sets") -@patch("upload_rpm_data.load_sbom_components") +@patch("upload_rpm_data.load_sbom_packages") @patch("upload_rpm_data.get_image_rpm_data") def test_upload_container_rpm_data__data_already_exists( mock_get_image_rpm_data, - mock_load_sbom_components, + mock_load_sbom_packages, mock_construct_rpm_items_and_content_sets, mock_create_image_rpm_manifest, mock_update_container_content_sets, @@ -193,13 +232,13 @@ def test_upload_container_rpm_data__data_already_exists( "content_sets": CONTENT_SETS, "rpm_manifest": {"_id": 
RPM_MANIFEST_ID}, } - mock_load_sbom_components.return_value = COMPONENTS + mock_load_sbom_packages.return_value = PACKAGES mock_construct_rpm_items_and_content_sets.return_value = ([{"name": "pkg"}], CONTENT_SETS) upload_container_rpm_data(GRAPHQL_API, IMAGE_ID, SBOM_PATH) - mock_load_sbom_components.assert_called_once() - mock_construct_rpm_items_and_content_sets.assert_called_once_with(COMPONENTS) + mock_load_sbom_packages.assert_called_once() + mock_construct_rpm_items_and_content_sets.assert_called_once_with(PACKAGES) mock_create_image_rpm_manifest.assert_not_called() mock_update_container_content_sets.assert_not_called() @@ -293,70 +332,37 @@ def test_update_container_content_sets__error(mock_post): @patch("json.load") -@patch("upload_rpm_data.check_bom_ref_duplicates") @patch("builtins.open") -def test_load_sbom_components__success(mock_open, mock_check_bom_ref_duplicates, mock_load): - fake_components = [1, 2, 3, 4] - mock_load.return_value = {"components": fake_components} +def test_load_sbom_packages__success(mock_open, mock_load): + fake_packages = [1, 2, 3, 4] + mock_load.return_value = {"packages": fake_packages} - loaded_components = load_sbom_components(SBOM_PATH) + loaded_packages = load_sbom_packages(SBOM_PATH) mock_load.assert_called_once_with(mock_open.return_value.__enter__.return_value) - mock_check_bom_ref_duplicates.assert_called_once_with(loaded_components) - assert fake_components == loaded_components + assert fake_packages == loaded_packages @patch("json.load") -@patch("upload_rpm_data.check_bom_ref_duplicates") @patch("builtins.open") -def test_load_sbom_components__no_components_key( - mock_open, mock_check_bom_ref_duplicates, mock_load -): +def test_load_sbom_packages__no_components_key(mock_open, mock_load): mock_load.return_value = {} - loaded_components = load_sbom_components(SBOM_PATH) + loaded_components = load_sbom_packages(SBOM_PATH) mock_load.assert_called_once_with(mock_open.return_value.__enter__.return_value) - 
mock_check_bom_ref_duplicates.assert_called_once_with(loaded_components) assert loaded_components == [] @patch("json.load") -@patch("upload_rpm_data.check_bom_ref_duplicates") @patch("builtins.open") -def test_load_sbom_components__json_load_fails( - mock_open, mock_check_bom_ref_duplicates, mock_load -): +def test_load_sbom_packages__json_load_fails(mock_open, mock_load): mock_load.side_effect = ValueError with pytest.raises(ValueError): - load_sbom_components(SBOM_PATH) + load_sbom_packages(SBOM_PATH) mock_load.assert_called_once_with(mock_open.return_value.__enter__.return_value) - mock_check_bom_ref_duplicates.assert_not_called() - - -def test_check_bom_ref_duplicates__no_duplicates(): - components = [ - {"bom-ref": "a"}, - {"bom-ref": "b"}, - {}, - {"bom-ref": "c"}, - ] - - check_bom_ref_duplicates(components) - - -def test_check_bom_ref_duplicates__duplicates_found(): - components = [ - {"bom-ref": "a"}, - {"bom-ref": "b"}, - {}, - {"bom-ref": "a"}, - ] - - with pytest.raises(ValueError): - check_bom_ref_duplicates(components) def test_construct_rpm_items_and_content_sets__success(): @@ -364,7 +370,7 @@ def test_construct_rpm_items_and_content_sets__success(): and architecture fields are added if present. 
All unique repository_id values are returned.""" - rpms, content_sets = construct_rpm_items_and_content_sets(COMPONENTS) + rpms, content_sets = construct_rpm_items_and_content_sets(PACKAGES) assert rpms == [ { @@ -426,8 +432,8 @@ def test_construct_rpm_items_and_content_sets__success(): assert content_sets == ["myrepo1", "myrepo2"] -def test_construct_rpm_items_and_content_sets__no_components_result_in_empty_list(): - """An empty list of components results in an empty list of rpms and content_sets""" +def test_construct_rpm_items_and_content_sets__no_packages_result_in_empty_list(): + """An empty list of packages results in an empty list of rpms and content_sets""" rpms, content_sets = construct_rpm_items_and_content_sets([]) assert rpms == [] diff --git a/pyxis/test_upload_rpm_data_cyclonedx.py b/pyxis/test_upload_rpm_data_cyclonedx.py new file mode 100644 index 0000000..9d47315 --- /dev/null +++ b/pyxis/test_upload_rpm_data_cyclonedx.py @@ -0,0 +1,434 @@ +from unittest.mock import patch, Mock +from urllib.error import HTTPError +import pytest + +from upload_rpm_data_cyclonedx import ( + upload_container_rpm_data_with_retry, + upload_container_rpm_data, + get_image_rpm_data, + create_image_rpm_manifest, + update_container_content_sets, + load_sbom_components, + check_bom_ref_duplicates, + construct_rpm_items_and_content_sets, +) + +GRAPHQL_API = "myapiurl" +IMAGE_ID = "123456abcd" +SBOM_PATH = "mypath" +RPM_MANIFEST_ID = "abcd1234" +CONTENT_SETS = ["myrepo1", "myrepo2"] +COMPONENTS = [ + { # all fields + "purl": "pkg:rpm/rhel/pkg1@1-2.el8?arch=x86_64&" + + "upstream=pkg1-1-2.el8.src.rpm&distro=rhel-8.0&repository_id=myrepo1", + }, + { # no version, same repository_id + "purl": "pkg:rpm/rhel/pkg2?arch=noarch&upstream=pkg2-1-2.el8.src.rpm&distro=rhel-8.0" + + "&repository_id=myrepo1", + }, + { # no architecture, different repository_id + "purl": "pkg:rpm/rhel/pkg3@9-8.el8?upstream=pkg3-9-8.el8.src.rpm&distro=rhel-8.0" + + "&repository_id=myrepo2", + }, + { # no 
upstream + "purl": "pkg:rpm/rhel/pkg4@1-2.el8?arch=x86_64&distro=rhel-8.0", + }, + { # with RH publisher + "purl": "pkg:rpm/rhel/pkg5?arch=noarch&upstream=pkg5-1-2.el8.src.rpm&distro=rhel-8.0", + "publisher": "Red Hat, Inc.", + }, + { # with other publisher + "purl": "pkg:rpm/rhel/pkg6?arch=noarch&upstream=pkg6-1-2.el8.src.rpm&distro=rhel-8.0", + "publisher": "Blue Shoe, inc.", + }, + { # not an rpm + "purl": "pkg:golang/./staging/src@(devel)#k8s.io/api", + }, + { # no purl + "bom_ref": "ref", + }, + { # with redhat namespace, but no publisher + "purl": "pkg:rpm/redhat/pkg7@1.2.3-4.el9000?arch=noarch", + }, +] + + +@patch("upload_rpm_data_cyclonedx.upload_container_rpm_data") +def test_upload_container_rpm_data_with_retry__success(mock_upload_container_rpm_data): + """upload_container_rpm_data succeeds on first attempt""" + upload_container_rpm_data_with_retry(GRAPHQL_API, IMAGE_ID, SBOM_PATH) + + mock_upload_container_rpm_data.assert_called_once_with(GRAPHQL_API, IMAGE_ID, SBOM_PATH) + + +@patch("upload_rpm_data_cyclonedx.upload_container_rpm_data") +def test_upload_container_rpm_data_with_retry__success_after_one_attempt( + mock_upload_container_rpm_data, +): + """upload_container_rpm_data succeeds after one retry""" + mock_upload_container_rpm_data.side_effect = [RuntimeError("error"), None] + + upload_container_rpm_data_with_retry(GRAPHQL_API, IMAGE_ID, SBOM_PATH, backoff_factor=0) + + assert mock_upload_container_rpm_data.call_count == 2 + + +@patch("upload_rpm_data_cyclonedx.upload_container_rpm_data") +def test_upload_container_rpm_data_with_retry__fails_runtime( + mock_upload_container_rpm_data, +): + """ + upload_container_rpm_data fails constantly with RuntimeError, + so the retry eventually fails + """ + mock_upload_container_rpm_data.side_effect = RuntimeError("error") + + with pytest.raises(RuntimeError): + upload_container_rpm_data_with_retry( + GRAPHQL_API, IMAGE_ID, SBOM_PATH, retries=2, backoff_factor=0 + ) + + assert 
mock_upload_container_rpm_data.call_count == 2 + + +@patch("upload_rpm_data_cyclonedx.upload_container_rpm_data") +def test_upload_container_rpm_data_with_retry__fails_http_504( + mock_upload_container_rpm_data, +): + """ + upload_container_rpm_data fails constantly with HTTPError with code 504, + so the retry eventually fails + """ + mock_upload_container_rpm_data.side_effect = HTTPError( + "http://example.com", 504, "Internal Error", {}, None + ) + + with pytest.raises(HTTPError): + upload_container_rpm_data_with_retry( + GRAPHQL_API, IMAGE_ID, SBOM_PATH, retries=2, backoff_factor=0 + ) + + assert mock_upload_container_rpm_data.call_count == 2 + + +@patch("upload_rpm_data_cyclonedx.upload_container_rpm_data") +def test_upload_container_rpm_data_with_retry__fails_http_other( + mock_upload_container_rpm_data, +): + """ + upload_container_rpm_data fails with HTTPError code other than 504, + so it fails without retry + """ + mock_upload_container_rpm_data.side_effect = HTTPError( + "http://example.com", 404, "Internal Error", {}, None + ) + + with pytest.raises(HTTPError): + upload_container_rpm_data_with_retry( + GRAPHQL_API, IMAGE_ID, SBOM_PATH, retries=2, backoff_factor=0 + ) + + assert mock_upload_container_rpm_data.call_count == 1 + + +@patch("upload_rpm_data_cyclonedx.update_container_content_sets") +@patch("upload_rpm_data_cyclonedx.create_image_rpm_manifest") +@patch("upload_rpm_data_cyclonedx.construct_rpm_items_and_content_sets") +@patch("upload_rpm_data_cyclonedx.load_sbom_components") +@patch("upload_rpm_data_cyclonedx.get_image_rpm_data") +def test_upload_container_rpm_data__success( + mock_get_image_rpm_data, + mock_load_sbom_components, + mock_construct_rpm_items_and_content_sets, + mock_create_image_rpm_manifest, + mock_update_container_content_sets, +): + """ + Basic use case: + RPM Manifest does not exist and is successfully created; + content_sets are updated as well. 
+ """ + mock_get_image_rpm_data.return_value = { + "_id": IMAGE_ID, + "content_sets": None, + "rpm_manifest": None, + } + mock_load_sbom_components.return_value = COMPONENTS + mock_construct_rpm_items_and_content_sets.return_value = ([{"name": "pkg"}], ["myrepo1"]) + + upload_container_rpm_data(GRAPHQL_API, IMAGE_ID, SBOM_PATH) + + mock_construct_rpm_items_and_content_sets.assert_called_once_with(COMPONENTS) + mock_create_image_rpm_manifest.assert_called_once_with( + GRAPHQL_API, + IMAGE_ID, + [{"name": "pkg"}], + ) + mock_update_container_content_sets.assert_called_once_with( + GRAPHQL_API, + IMAGE_ID, + ["myrepo1"], + ) + + +@patch("upload_rpm_data_cyclonedx.update_container_content_sets") +@patch("upload_rpm_data_cyclonedx.create_image_rpm_manifest") +@patch("upload_rpm_data_cyclonedx.construct_rpm_items_and_content_sets") +@patch("upload_rpm_data_cyclonedx.load_sbom_components") +@patch("upload_rpm_data_cyclonedx.get_image_rpm_data") +def test_upload_container_rpm_data__data_already_exists( + mock_get_image_rpm_data, + mock_load_sbom_components, + mock_construct_rpm_items_and_content_sets, + mock_create_image_rpm_manifest, + mock_update_container_content_sets, +): + """ + RPM Manifest and content sets already exist so the function + returns without creating or updating anything + """ + mock_get_image_rpm_data.return_value = { + "_id": IMAGE_ID, + "content_sets": CONTENT_SETS, + "rpm_manifest": {"_id": RPM_MANIFEST_ID}, + } + mock_load_sbom_components.return_value = COMPONENTS + mock_construct_rpm_items_and_content_sets.return_value = ([{"name": "pkg"}], CONTENT_SETS) + + upload_container_rpm_data(GRAPHQL_API, IMAGE_ID, SBOM_PATH) + + mock_load_sbom_components.assert_called_once() + mock_construct_rpm_items_and_content_sets.assert_called_once_with(COMPONENTS) + mock_create_image_rpm_manifest.assert_not_called() + mock_update_container_content_sets.assert_not_called() + + +def generate_pyxis_response(query_name, data=None, error=False): + response_json = { + 
"data": { + query_name: { + "data": data, + "error": None, + } + } + } + if error: + response_json["data"][query_name]["error"] = {"detail": "Major failure!"} + response = Mock() + response.json.return_value = response_json + + return response + + +@patch("pyxis.post") +def test_get_image_rpm_data__success(mock_post): + """The Pyxis query is called and the image data is returned""" + image = { + "_id": IMAGE_ID, + "content_sets": CONTENT_SETS, + "rpm_manifest": { + "_id": RPM_MANIFEST_ID, + }, + } + mock_post.side_effect = [generate_pyxis_response("get_image", image)] + + image = get_image_rpm_data(GRAPHQL_API, IMAGE_ID) + + assert image["rpm_manifest"]["_id"] == RPM_MANIFEST_ID + assert image["content_sets"] == CONTENT_SETS + assert mock_post.call_count == 1 + + +@patch("pyxis.post") +def test_get_image_rpm_data__error(mock_post): + mock_post.return_value = generate_pyxis_response("get_image", error=True) + + with pytest.raises(RuntimeError): + get_image_rpm_data(GRAPHQL_API, IMAGE_ID) + + mock_post.assert_called_once() + + +@patch("pyxis.post") +def test_create_image_rpm_manifest__success(mock_post): + mock_post.return_value = generate_pyxis_response( + "create_image_rpm_manifest", {"_id": RPM_MANIFEST_ID} + ) + + id = create_image_rpm_manifest(GRAPHQL_API, IMAGE_ID, []) + + assert id == RPM_MANIFEST_ID + mock_post.assert_called_once() + + +@patch("pyxis.post") +def test_create_image_rpm_manifest__error(mock_post): + mock_post.return_value = generate_pyxis_response("create_image_rpm_manifest", error=True) + + with pytest.raises(RuntimeError): + create_image_rpm_manifest(GRAPHQL_API, IMAGE_ID, []) + + mock_post.assert_called_once() + + +@patch("pyxis.post") +def test_update_container_content_sets__success(mock_post): + mock_post.return_value = generate_pyxis_response("update_image", {"_id": IMAGE_ID}) + + id = update_container_content_sets(GRAPHQL_API, IMAGE_ID, CONTENT_SETS) + + assert id == IMAGE_ID + mock_post.assert_called_once() + + +@patch("pyxis.post") +def 
test_update_container_content_sets__error(mock_post): + mock_post.return_value = generate_pyxis_response("update_image", error=True) + + with pytest.raises(RuntimeError): + update_container_content_sets(GRAPHQL_API, IMAGE_ID, CONTENT_SETS) + + mock_post.assert_called_once() + + +@patch("json.load") +@patch("upload_rpm_data_cyclonedx.check_bom_ref_duplicates") +@patch("builtins.open") +def test_load_sbom_components__success(mock_open, mock_check_bom_ref_duplicates, mock_load): + fake_components = [1, 2, 3, 4] + mock_load.return_value = {"components": fake_components} + + loaded_components = load_sbom_components(SBOM_PATH) + + mock_load.assert_called_once_with(mock_open.return_value.__enter__.return_value) + mock_check_bom_ref_duplicates.assert_called_once_with(loaded_components) + assert fake_components == loaded_components + + +@patch("json.load") +@patch("upload_rpm_data_cyclonedx.check_bom_ref_duplicates") +@patch("builtins.open") +def test_load_sbom_components__no_components_key( + mock_open, mock_check_bom_ref_duplicates, mock_load +): + mock_load.return_value = {} + + loaded_components = load_sbom_components(SBOM_PATH) + + mock_load.assert_called_once_with(mock_open.return_value.__enter__.return_value) + mock_check_bom_ref_duplicates.assert_called_once_with(loaded_components) + assert loaded_components == [] + + +@patch("json.load") +@patch("upload_rpm_data_cyclonedx.check_bom_ref_duplicates") +@patch("builtins.open") +def test_load_sbom_components__json_load_fails( + mock_open, mock_check_bom_ref_duplicates, mock_load +): + mock_load.side_effect = ValueError + + with pytest.raises(ValueError): + load_sbom_components(SBOM_PATH) + + mock_load.assert_called_once_with(mock_open.return_value.__enter__.return_value) + mock_check_bom_ref_duplicates.assert_not_called() + + +def test_check_bom_ref_duplicates__no_duplicates(): + components = [ + {"bom-ref": "a"}, + {"bom-ref": "b"}, + {}, + {"bom-ref": "c"}, + ] + + check_bom_ref_duplicates(components) + + +def 
test_check_bom_ref_duplicates__duplicates_found(): + components = [ + {"bom-ref": "a"}, + {"bom-ref": "b"}, + {}, + {"bom-ref": "a"}, + ] + + with pytest.raises(ValueError): + check_bom_ref_duplicates(components) + + +def test_construct_rpm_items_and_content_sets__success(): + """Only rpm purls are added, the version, release, + and architecture fields are added if present. + All unique repository_id values are returned.""" + + rpms, content_sets = construct_rpm_items_and_content_sets(COMPONENTS) + + assert rpms == [ + { + "name": "pkg1", + "summary": "pkg1-1-2.el8.x86_64", + "nvra": "pkg1-1-2.el8.x86_64", + "version": "1", + "release": "2.el8", + "architecture": "x86_64", + "srpm_name": "pkg1-1-2.el8.src.rpm", + }, + { + "name": "pkg2", + "summary": "pkg2", + "architecture": "noarch", + "srpm_name": "pkg2-1-2.el8.src.rpm", + }, + { + "name": "pkg3", + "summary": "pkg3-9-8.el8.noarch", + "nvra": "pkg3-9-8.el8.noarch", + "version": "9", + "release": "8.el8", + "architecture": "noarch", + "srpm_name": "pkg3-9-8.el8.src.rpm", + }, + { + "name": "pkg4", + "summary": "pkg4-1-2.el8.x86_64", + "nvra": "pkg4-1-2.el8.x86_64", + "version": "1", + "release": "2.el8", + "architecture": "x86_64", + }, + { + "name": "pkg5", + "gpg": "199e2f91fd431d51", + "summary": "pkg5", + "architecture": "noarch", + "srpm_name": "pkg5-1-2.el8.src.rpm", + }, + { + "name": "pkg6", + "summary": "pkg6", + "architecture": "noarch", + "srpm_name": "pkg6-1-2.el8.src.rpm", + }, + { + "name": "pkg7", + "gpg": "199e2f91fd431d51", + "summary": "pkg7-1.2.3-4.el9000.noarch", + "release": "4.el9000", + "version": "1.2.3", + "architecture": "noarch", + "nvra": "pkg7-1.2.3-4.el9000.noarch", + }, + ] + + assert content_sets == ["myrepo1", "myrepo2"] + + +def test_construct_rpm_items_and_content_sets__no_components_result_in_empty_list(): + """An empty list of components results in an empty list of rpms and content_sets""" + rpms, content_sets = construct_rpm_items_and_content_sets([]) + + assert rpms == [] + 
assert content_sets == [] diff --git a/pyxis/upload_rpm_data.py b/pyxis/upload_rpm_data.py index 6837e1b..1f1e725 100755 --- a/pyxis/upload_rpm_data.py +++ b/pyxis/upload_rpm_data.py @@ -1,7 +1,7 @@ #!/usr/bin/env python3 """Upload rpm manifest to Pyxis -This script will take Pyxis image ID and an sbom cyclonedx file +This script will take Pyxis image ID and an sbom spdx file on the input. It will inspect the sbom for the rpms and then push data into Pyxis. There are two separate items that will be pushed: @@ -64,10 +64,10 @@ def upload_container_rpm_data_with_retry( def upload_container_rpm_data(graphql_api: str, image_id: str, sbom_path: str): """Upload a Container Image RPM Manifest and content sets to Pyxis""" - sbom_components = load_sbom_components(sbom_path) - LOGGER.info(f"Loaded {len(sbom_components)} components from sbom file.") + sbom_packages = load_sbom_packages(sbom_path) + LOGGER.info(f"Loaded {len(sbom_packages)} packages from sbom file.") - rpms, content_sets = construct_rpm_items_and_content_sets(sbom_components) + rpms, content_sets = construct_rpm_items_and_content_sets(sbom_packages) image = get_image_rpm_data(graphql_api, image_id) @@ -198,55 +198,31 @@ def update_container_content_sets(graphql_api: str, image_id: str, content_sets: return data["update_image"]["data"]["_id"] -def load_sbom_components(sbom_path: str) -> list[dict]: - """Open sbom file, load components and return them +def load_sbom_packages(sbom_path: str) -> list[dict]: + """Open sbom file, load packages and return them If unable to open and load the json, raise an exception. - If there are duplicate bom-ref strings in the components, - raise an exception. 
""" try: with open(sbom_path) as f: sbom = json.load(f) - components = sbom["components"] if "components" in sbom else [] + packages = sbom["packages"] if "packages" in sbom else [] except Exception: - LOGGER.error("Unable to load components from sbom file") + LOGGER.error("Unable to load packages from sbom file") raise - check_bom_ref_duplicates(components) - - return components - - -def check_bom_ref_duplicates(components: list[dict]): - """Check if any two components use the same bom-ref string - - bom-ref is not required, but has to be unique for - a given sbom. In most cases it is defined. - Pyxis team suggested we at least check this, - since Pyxis has no checks for component uniqueness. - """ - bom_refs = [c["bom-ref"] for c in components if c.get("bom-ref") is not None] - seen = set() - for x in bom_refs: - if x in seen: - LOGGER.error(f"Duplicate bom-ref detected: {x}") - msg = "Invalid sbom file. bom-ref must to be unique." - LOGGER.error(msg) - raise ValueError(msg) - else: - seen.add(x) + return packages def construct_rpm_items_and_content_sets( - components: list[dict], + packages: list[dict], ) -> tuple[list[dict], list[str]]: - """Create RpmsItems object and content_sets from components for Pyxis. + """Create RpmsItems object and content_sets from packages for Pyxis. This function creates two items: 1. A list of RpmsItems dicts. There will be - one RpmsItems per rpm component. A list is then formed of them + one RpmsItems per rpm package. A list is then formed of them and returned to be used in a containerImageRPMManifest. 2. 
A list of unique content set strings to be saved in the ContainerImage.content_sets @@ -254,37 +230,40 @@ def construct_rpm_items_and_content_sets( """ rpms_items = [] content_sets = set() - for component in components: - if "purl" in component: - purl_dict = PackageURL.from_string(component["purl"]).to_dict() - if purl_dict["type"] == "rpm": - rpm_item = { - "name": purl_dict["name"], - "summary": purl_dict["name"], - "architecture": purl_dict["qualifiers"].get("arch", "noarch"), - } - if purl_dict["version"] is not None: - rpm_item["version"] = purl_dict["version"].split("-")[0] - rpm_item["release"] = purl_dict["version"].split("-")[1] - rpm_item["nvra"] = ( - f"{rpm_item['name']}-{purl_dict['version']}.{rpm_item['architecture']}" - ) - rpm_item["summary"] = rpm_item["nvra"] - if "upstream" in purl_dict["qualifiers"]: - rpm_item["srpm_name"] = purl_dict["qualifiers"]["upstream"] - - # XXX - temporary https://issues.redhat.com/browse/KONFLUX-4292 - # Undo this in https://issues.redhat.com/browse/KONFLUX-4175 - if ( - component.get("publisher") == "Red Hat, Inc." 
- or purl_dict["namespace"] == "redhat" - ): - rpm_item["gpg"] = "199e2f91fd431d51" - - rpms_items.append(rpm_item) - - if "repository_id" in purl_dict["qualifiers"]: - content_sets.add(purl_dict["qualifiers"]["repository_id"]) + for package in packages: + for externalRef in package.get("externalRefs", []): + if externalRef.get("referenceType") != "purl": + continue + purl_dict = PackageURL.from_string(externalRef["referenceLocator"]).to_dict() + if purl_dict["type"] != "rpm": + continue + rpm_item = { + "name": purl_dict["name"], + "summary": purl_dict["name"], + "architecture": purl_dict["qualifiers"].get("arch", "noarch"), + } + if purl_dict["version"] is not None: + rpm_item["version"] = purl_dict["version"].split("-")[0] + rpm_item["release"] = purl_dict["version"].split("-")[1] + rpm_item["nvra"] = ( + f"{rpm_item['name']}-{purl_dict['version']}.{rpm_item['architecture']}" + ) + rpm_item["summary"] = rpm_item["nvra"] + if "upstream" in purl_dict["qualifiers"]: + rpm_item["srpm_name"] = purl_dict["qualifiers"]["upstream"] + + # XXX - temporary https://issues.redhat.com/browse/KONFLUX-4292 + # Undo this in https://issues.redhat.com/browse/KONFLUX-4175 + if ( + package.get("supplier") == "Organization: Red Hat, Inc." 
+ or purl_dict["namespace"] == "redhat" + ): + rpm_item["gpg"] = "199e2f91fd431d51" + + rpms_items.append(rpm_item) + + if "repository_id" in purl_dict["qualifiers"]: + content_sets.add(purl_dict["qualifiers"]["repository_id"]) return rpms_items, sorted(content_sets) diff --git a/pyxis/upload_rpm_data_cyclonedx b/pyxis/upload_rpm_data_cyclonedx new file mode 120000 index 0000000..69026c3 --- /dev/null +++ b/pyxis/upload_rpm_data_cyclonedx @@ -0,0 +1 @@ +upload_rpm_data_cyclonedx.py \ No newline at end of file diff --git a/pyxis/upload_rpm_data_cyclonedx.py b/pyxis/upload_rpm_data_cyclonedx.py new file mode 100755 index 0000000..6837e1b --- /dev/null +++ b/pyxis/upload_rpm_data_cyclonedx.py @@ -0,0 +1,321 @@ +#!/usr/bin/env python3 +"""Upload rpm manifest to Pyxis + +This script will take Pyxis image ID and an sbom cyclonedx file +on the input. It will inspect the sbom for the rpms and then push +data into Pyxis. There are two separate items that will be pushed: + +1. RPM Manifest object +If an RPM Manifest already exists for the container +image, nothing is done as we assume it was already pushed by this +script. + +2. 
content_sets field of ContainerImage object + +Required env vars: +PYXIS_KEY_PATH +PYXIS_CERT_PATH + +Optional env vars: +PYXIS_GRAPHQL_API +""" +import argparse +import json +import logging +import string +import os +from pathlib import Path +import time +from urllib.error import HTTPError +from packageurl import PackageURL + +import pyxis + +LOGGER = logging.getLogger("upload_rpm_data") + + +def upload_container_rpm_data_with_retry( + graphql_api: str, + image_id: str, + sbom_path: str, + retries: int = 3, + backoff_factor: float = 5.0, +): + """Call the upload_container_rpm_data function with retries""" + last_err = RuntimeError() + for attempt in range(retries): + try: + time.sleep(backoff_factor * attempt) + upload_container_rpm_data(graphql_api, image_id, sbom_path) + return + except RuntimeError as e: + LOGGER.warning(f"Attempt {attempt+1} failed.") + last_err = e + except HTTPError as e: + if e.code == 504: + LOGGER.warning(f"Attempt {attempt+1} failed with HTTPError code 504.") + last_err = e + else: + raise e + LOGGER.error("Out of attempts. Raising the error.") + raise last_err + + +def upload_container_rpm_data(graphql_api: str, image_id: str, sbom_path: str): + """Upload a Container Image RPM Manifest and content sets to Pyxis""" + + sbom_components = load_sbom_components(sbom_path) + LOGGER.info(f"Loaded {len(sbom_components)} components from sbom file.") + + rpms, content_sets = construct_rpm_items_and_content_sets(sbom_components) + + image = get_image_rpm_data(graphql_api, image_id) + + if image["rpm_manifest"] is not None and "_id" in image["rpm_manifest"]: + # We assume that if the RPM Manifest already exists, it is accurate as the + # entire object is added in one request. + LOGGER.info("RPM manifest already exists for ContainerImage. 
Skipping...") + rpm_manifest_id = image["rpm_manifest"]["_id"] + else: + rpm_manifest_id = create_image_rpm_manifest(graphql_api, image_id, rpms) + LOGGER.info(f"RPM manifest ID: {rpm_manifest_id}") + + if image["content_sets"] is not None: + LOGGER.info( + f"Content sets for the image are already set, skipping: {image['content_sets']}" + ) + elif not content_sets: + LOGGER.info( + "No content sets found in the sbom, skipping update of " + "ContainerImage.content_sets field in Pyxis" + ) + else: + LOGGER.info(f"Updating ContainerImage.content_sets field in Pyxis to: {content_sets}") + update_container_content_sets(graphql_api, image_id, content_sets) + + +def parse_arguments() -> argparse.Namespace: # pragma: no cover + """Parse CLI arguments + + :return: Namespace of parsed arguments + """ + + parser = argparse.ArgumentParser(description="Upload RPM data to Pyxis via graphql") + + parser.add_argument( + "--pyxis-graphql-api", + default=os.environ.get("PYXIS_GRAPHQL_API", "https://graphql-pyxis.api.redhat.com/"), + help="Pyxis Graphql endpoint.", + ) + parser.add_argument( + "--image-id", + help="Pyxis container image ID. If omitted, sbom filename is used", + ) + parser.add_argument("--sbom-path", help="Path to the sbom file", required=True) + parser.add_argument("--verbose", "-v", action="store_true", help="Verbose output") + parser.add_argument( + "--retry", + "-r", + action="store_true", + help="If set, retry the upload in case it fails", + ) + return parser.parse_args() + + +def get_image_rpm_data(graphql_api: str, image_id: str) -> dict: + """Get the Image's RPM Manifest id and content sets from Pyxis using GraphQL API + + This function uses the get_image graphql query to get the rpm_manifest + id and content sets. + """ + query = """ +query ($id: ObjectIDFilterScalar!) 
{ + get_image(id: $id) { + data { + _id + rpm_manifest { + _id + } + content_sets + } + error { + status + detail + } + } +} + """ + variables = {"id": image_id} + body = {"query": query, "variables": variables} + + data = pyxis.graphql_query(graphql_api, body) + image = data["get_image"]["data"] + + return image + + +def create_image_rpm_manifest(graphql_api: str, image_id: str, rpms: list) -> str: + """Create ContainerImageRPMManifest object in Pyxis using GraphQL API""" + mutation = """ +mutation ($id: String!, $input: ContainerImageRPMManifestInput!) { + create_image_rpm_manifest(id: $id, input: $input) { + data { + _id + } + error { + detail + } + } +} +""" + variables = {"id": "konflux-" + image_id, "input": {"image_id": image_id, "rpms": rpms}} + body = {"query": mutation, "variables": variables} + + data = pyxis.graphql_query(graphql_api, body) + + return data["create_image_rpm_manifest"]["data"]["_id"] + + +def update_container_content_sets(graphql_api: str, image_id: str, content_sets: list): + """Update ContainerImage.content_sets field in Pyxis using GraphQL API""" + mutation = """ +mutation ($id: ObjectIDFilterScalar!, $input: ContainerImageInput!) { + update_image(id: $id, input: $input) { + data { + _id + } + error { + detail + } + } +} +""" + variables = {"id": image_id, "input": {"content_sets": content_sets}} + body = {"query": mutation, "variables": variables} + + data = pyxis.graphql_query(graphql_api, body) + + return data["update_image"]["data"]["_id"] + + +def load_sbom_components(sbom_path: str) -> list[dict]: + """Open sbom file, load components and return them + + If unable to open and load the json, raise an exception. + If there are duplicate bom-ref strings in the components, + raise an exception. 
+ """ + try: + with open(sbom_path) as f: + sbom = json.load(f) + components = sbom["components"] if "components" in sbom else [] + except Exception: + LOGGER.error("Unable to load components from sbom file") + raise + + check_bom_ref_duplicates(components) + + return components + + +def check_bom_ref_duplicates(components: list[dict]): + """Check if any two components use the same bom-ref string + + bom-ref is not required, but has to be unique for + a given sbom. In most cases it is defined. + Pyxis team suggested we at least check this, + since Pyxis has no checks for component uniqueness. + """ + bom_refs = [c["bom-ref"] for c in components if c.get("bom-ref") is not None] + seen = set() + for x in bom_refs: + if x in seen: + LOGGER.error(f"Duplicate bom-ref detected: {x}") + msg = "Invalid sbom file. bom-ref must to be unique." + LOGGER.error(msg) + raise ValueError(msg) + else: + seen.add(x) + + +def construct_rpm_items_and_content_sets( + components: list[dict], +) -> tuple[list[dict], list[str]]: + """Create RpmsItems object and content_sets from components for Pyxis. + + This function creates two items: + + 1. A list of RpmsItems dicts. There will be + one RpmsItems per rpm component. A list is then formed of them + and returned to be used in a containerImageRPMManifest. + + 2. 
A list of unique content set strings to be saved in the ContainerImage.content_sets + field in Pyxis + """ + rpms_items = [] + content_sets = set() + for component in components: + if "purl" in component: + purl_dict = PackageURL.from_string(component["purl"]).to_dict() + if purl_dict["type"] == "rpm": + rpm_item = { + "name": purl_dict["name"], + "summary": purl_dict["name"], + "architecture": purl_dict["qualifiers"].get("arch", "noarch"), + } + if purl_dict["version"] is not None: + rpm_item["version"] = purl_dict["version"].split("-")[0] + rpm_item["release"] = purl_dict["version"].split("-")[1] + rpm_item["nvra"] = ( + f"{rpm_item['name']}-{purl_dict['version']}.{rpm_item['architecture']}" + ) + rpm_item["summary"] = rpm_item["nvra"] + if "upstream" in purl_dict["qualifiers"]: + rpm_item["srpm_name"] = purl_dict["qualifiers"]["upstream"] + + # XXX - temporary https://issues.redhat.com/browse/KONFLUX-4292 + # Undo this in https://issues.redhat.com/browse/KONFLUX-4175 + if ( + component.get("publisher") == "Red Hat, Inc." 
+ or purl_dict["namespace"] == "redhat" + ): + rpm_item["gpg"] = "199e2f91fd431d51" + + rpms_items.append(rpm_item) + + if "repository_id" in purl_dict["qualifiers"]: + content_sets.add(purl_dict["qualifiers"]["repository_id"]) + + return rpms_items, sorted(content_sets) + + +def main(): # pragma: no cover + """Main func""" + args = parse_arguments() + log_level = logging.DEBUG if args.verbose else logging.INFO + pyxis.setup_logger(level=log_level) + + if not os.path.isfile(args.sbom_path): + msg = f"sbom file does not exist: {args.sbom_path}" + LOGGER.error(msg) + raise RuntimeError(msg) + + # Use sbom filename (minus extension) for image_id if not provided + if args.image_id is None: + image_id = Path(args.sbom_path).stem + else: + image_id = args.image_id + if not all(c in string.hexdigits for c in image_id): + raise ValueError(f"image-id is invalid, hexadecimal value is expected: {image_id}") + LOGGER.debug(f"Image ID: {image_id}") + + LOGGER.debug(f"Pyxis GraphQL API: {args.pyxis_graphql_api}") + + if args.retry: + upload_container_rpm_data_with_retry(args.pyxis_graphql_api, image_id, args.sbom_path) + else: + upload_container_rpm_data(args.pyxis_graphql_api, image_id, args.sbom_path) + + +if __name__ == "__main__": # pragma: no cover + main()