From e380f33f58c91e2e0a1046a540d031596069b3ee Mon Sep 17 00:00:00 2001 From: Avi Biton Date: Mon, 9 Sep 2024 12:00:07 +0300 Subject: [PATCH] chore(RHTAPWATCH-1275): Remove support for snapshots Reove the support for snapshotsget an input of image. Adjust the output to be of a single image Signed-off-by: Avi Biton --- tests/test_rpm_verifier.py | 347 ++++++++++++++++++++---------------- verify_rpms/rpm_verifier.py | 111 ++++++------ 2 files changed, 250 insertions(+), 208 deletions(-) diff --git a/tests/test_rpm_verifier.py b/tests/test_rpm_verifier.py index deef146..79561a3 100644 --- a/tests/test_rpm_verifier.py +++ b/tests/test_rpm_verifier.py @@ -1,10 +1,10 @@ """Test rpm_verifier.py end-to-end""" from pathlib import Path -from subprocess import CalledProcessError +from subprocess import CalledProcessError, run from textwrap import dedent from typing import Callable -from unittest.mock import MagicMock, call, create_autospec +from unittest.mock import MagicMock, create_autospec import pytest from pytest import MonkeyPatch @@ -15,178 +15,182 @@ ProcessedImage, generate_output, get_rpmdb, + get_rpms_data, get_unsigned_rpms, - parse_image_input, ) +def test_get_rpmdb(tmp_path: Path) -> None: + """Test get_rpmdb""" + image = "my-image" + mock_runner = create_autospec(run) + out = get_rpmdb( + container_image=image, + target_dir=tmp_path, + runner=mock_runner, + ) + mock_runner.assert_called_once() + assert mock_runner.call_args.args[0] == [ + "oc", + "image", + "extract", + "my-image", + "--path", + f"/var/lib/rpm/:{tmp_path}", + ] + assert mock_runner.call_count == 1 + assert out == tmp_path + + @pytest.mark.parametrize( - ("processed_images", "expected_print", "expected_failures_exist"), + ("test_input", "expected"), [ pytest.param( - [ProcessedImage(image="i1", unsigned_rpms=[])], - "No unsigned RPMs found.", - False, - id="No unsigned RPMs + no errors. Do not report failures", - ), - pytest.param( - [ProcessedImage(image="i1", unsigned_rpms=["my-rpm"])], dedent( """ - Found unsigned RPMs: - i1: my-rpm + libssh-config-0.9.6-10.el8_8 RSA/SHA256, Tue 6 May , Key ID 1234567890 + python39-twisted-23.10.0-1.el8ap (none) + libmodulemd-2.13.0-1.el8 RSA/SHA256, Wed 18 Aug , Key ID 1234567890 + gpg-pubkey-d4082792-5b32db75 (none) """ ).strip(), - True, - id="Unsigned RPM + no errors. Report failures", + [ + "libssh-config-0.9.6-10.el8_8 RSA/SHA256, Tue 6 May , Key ID 1234567890", + "python39-twisted-23.10.0-1.el8ap (none)", + "libmodulemd-2.13.0-1.el8 RSA/SHA256, Wed 18 Aug , Key ID 1234567890", + "gpg-pubkey-d4082792-5b32db75 (none)", + ], + id="Mix of signed and unsigned", ), + pytest.param("", [], id="Empty list"), + ], +) +def test_get_rpms_data(test_input: list[str], expected: list[str]) -> None: + """Test get_rpms_data""" + mock_runner = create_autospec(run) + mock_runner.return_value.stdout = test_input + result = get_rpms_data(rpmdb=Path("rpmdb_folder"), runner=mock_runner) + mock_runner.assert_called_once() + assert mock_runner.call_args.args[0] == [ + "rpm", + "-qa", + "--qf", + "%{NAME}-%{VERSION}-%{RELEASE} %{SIGPGP:pgpsig}\n", + "--dbpath", + "rpmdb_folder", + ] + assert result == expected + + +@pytest.mark.parametrize( + ("test_input", "expected"), + [ pytest.param( [ - ProcessedImage(image="i1", unsigned_rpms=["my-rpm", "another-rpm"]), - ProcessedImage(image="i2", unsigned_rpms=[]), + "libssh-config-0.9.6-10.el8_8 RSA/SHA256, Tue 6 May , Key ID 1234567890", + "python39-twisted-23.10.0-1.el8ap (none)", + "libmodulemd-2.13.0-1.el8 RSA/SHA256, Wed 18 Aug , Key ID 1234567890", + "gpg-pubkey-d4082792-5b32db75 (none)", ], - dedent( - """ - Found unsigned RPMs: - i1: my-rpm, another-rpm - """ - ).strip(), - True, - id="An image with multiple unsigned and another without. Report failure", + ["python39-twisted-23.10.0-1.el8ap"], + id="Mix of signed and unsigned", ), pytest.param( [ - ProcessedImage( - image="i1", unsigned_rpms=[], error="Failed to run command" - ), - ProcessedImage(image="i2", unsigned_rpms=["their-rpm"]), + "libssh-config-0.9.6-10.el8_8 RSA/SHA256, Tue 6 May , Key ID 1234567890", + "libmodulemd-2.13.0-1.el8 RSA/SHA256, Wed 18 Aug , Key ID 1234567890", ], - dedent( - """ - Found unsigned RPMs: - i2: their-rpm - Encountered errors: - i1: Failed to run command - """ - ).strip(), - True, - id="Error in running command + image with unsigned RPMs. Report failure", + [], + id="All signed", ), pytest.param( [ - ProcessedImage( - image="i1", unsigned_rpms=[], error="Failed to run first command" - ), - ProcessedImage( - image="i2", unsigned_rpms=[], error="Failed to run second command" - ), + "libssh-config-0.9.6-10.el8_8 (none)", + "python39-twisted-23.10.0-1.el8ap (none)", + "libmodulemd-2.13.0-1.el8 (none)", ], - dedent( - """ - Encountered errors: - i1: Failed to run first command - i2: Failed to run second command - """ - ).strip(), - True, - id="Multiple errors in running command + no unsigned RPMs. Report failure", + [ + "libssh-config-0.9.6-10.el8_8", + "python39-twisted-23.10.0-1.el8ap", + "libmodulemd-2.13.0-1.el8", + ], + id="All unsigned", ), + pytest.param([], [], id="Empty list"), ], ) -def test_generate_output( - processed_images: list[ProcessedImage], - expected_print: str, - expected_failures_exist: bool, -) -> None: - """Test generate output""" - fail_out, print_out = generate_output(processed_images) - assert fail_out == expected_failures_exist - assert print_out == expected_print +def test_get_unsigned_rpms(test_input: list[str], expected: list[str]) -> None: + """Test get_unsigned_rpms""" + result = get_unsigned_rpms(rpms=test_input) + assert result == expected @pytest.mark.parametrize( - "test_input,expected", + ("processed_image", "expected_print", "expected_failures_exist"), [ pytest.param( + ProcessedImage( + image="i1", + unsigned_rpms=[], + ), + "No unsigned RPMs in i1", + False, + id="No unsigned RPMs + no errors. Do not report failures", + ), + pytest.param( + ProcessedImage( + image="i1", + unsigned_rpms=["my-rpm"], + ), dedent( """ - libssh-config-0.9.6-10.el8_8 (none) RSA/SHA256, Tue 6 May , Key ID 1234567890 - python39-twisted-23.10.0-1.el8ap (none) (none) - libmodulemd-2.13.0-1.el8 (none) RSA/SHA256, Wed 18 Aug , Key ID 1234567890 - gpg-pubkey-d4082792-5b32db75 (none) (none) + Found unsigned RPMs in i1: + ['my-rpm'] """ ).strip(), - ["python39-twisted-23.10.0-1.el8ap"], - id="Mix of signed and unsigned", + True, + id="Unsigned RPM + no errors. Report failures", ), pytest.param( + ProcessedImage( + image="i1", + unsigned_rpms=["my-rpm", "another-rpm"], + ), dedent( """ - libssh-config-0.9.6-10.el8_8 (none) RSA/SHA256, Tue 6 May , Key ID 1234567890 - libmodulemd-2.13.0-1.el8 (none) RSA/SHA256, Wed 18 Aug , Key ID 1234567890 - """ + Found unsigned RPMs in i1: + ['my-rpm', 'another-rpm'] + """ ).strip(), - [], - id="All signed", + True, + id="An image with multiple unsigned RPMs Report failure", ), - pytest.param("", [], id="Empty list"), - ], -) -def test_get_unsigned_rpms(test_input: list[str], expected: list[str]) -> None: - """Test get_unsigned_rpms""" - mock_runner = MagicMock() - mock_runner.return_value.stdout = test_input - result = get_unsigned_rpms(rpmdb=Path("rpmdb_folder"), runner=mock_runner) - assert result == expected - - -@pytest.mark.parametrize( - "test_input,expected", - [ pytest.param( + ProcessedImage( + image="i1", + unsigned_rpms=[], + error="Failed to run command", + ), dedent( """ - { - "application": "test", - "components": [ - { - "containerImage": "quay.io/container-image@sha256:123" - }, - { - "containerImage": "quay.io/container-image@sha256:456" - }, - { - "containerImage": "quay.io/container-image@sha256:789" - } - ] - } + Encountered errors in i1: + Failed to run command """ ).strip(), - [ - "quay.io/container-image@sha256:123", - "quay.io/container-image@sha256:456", - "quay.io/container-image@sha256:789", - ], - id="snapshot test", - ), - pytest.param( - "quay.io/container-image@sha256:123", - ["quay.io/container-image@sha256:123"], - id="single container", + True, + id="Error when running command, Report failure", ), ], ) -def test_parse_image_input(test_input: str, expected: list[str]) -> None: - """Test parse_image_input""" - result = parse_image_input(image_input=test_input) - assert result == expected - - -def test_parse_image_input_exception() -> None: - """Test parse_image_input throws exception""" - test_input = '{"apiVersion": "appstudio.redhat.com/v1alpha1"}' - with pytest.raises(KeyError): - parse_image_input(image_input=test_input) +def test_generate_output( + processed_image: ProcessedImage, + expected_print: str, + expected_failures_exist: bool, +) -> None: + """Test generate_output""" + fail_out, print_out = generate_output(processed_image) + assert fail_out == expected_failures_exist + assert print_out == expected_print class TestImageProcessor: @@ -202,25 +206,37 @@ def mock_rpms_getter(self) -> MagicMock: """mocked rpms_getter function""" return MagicMock() + @pytest.fixture() + def mock_unsigned_rpms_getter(self) -> MagicMock: + """mocked unsigned_rpms_getter function""" + return MagicMock() + @pytest.mark.parametrize( - ("unsigned_rpms",), + ("rpms_data", "expected_unsigned"), [ - pytest.param([], id="all signed"), - pytest.param(["my-unsigned-rpm"], id="one unsigned"), + pytest.param([], [], id="No RPMs"), pytest.param( - ["my-unsigned-rpm", "their-unsigned-rpm"], id="multiple unsigned" + [ + "libssh-config-0.9.6-10.el8_8 RSA/SHA256, Tue 6 May , Key ID 1234567890", + "python39-twisted-23.10.0-1.el8ap (none)", + "libmodulemd-2.13.0-1.el8 RSA/SHA256, Wed 18 Aug , Key ID 1234567890", + "gpg-pubkey-d4082792-5b32db75 (none)", + ], + ["python39-twisted-23.10.0-1.el8ap"], + id="one unsigned, two signed", ), ], ) - def test_call( + def test_call( # pylint: disable=too-many-arguments self, mock_db_getter: MagicMock, + expected_unsigned: list[str], mock_rpms_getter: MagicMock, tmp_path: Path, - unsigned_rpms: list[str], + rpms_data: list[str], ) -> None: """Test ImageProcessor's callable""" - mock_rpms_getter.return_value = unsigned_rpms + mock_rpms_getter.return_value = rpms_data instance = ImageProcessor( workdir=tmp_path, db_getter=mock_db_getter, @@ -228,15 +244,20 @@ def test_call( ) img = "my-img" out = instance(img) - assert out == ProcessedImage(image=img, unsigned_rpms=unsigned_rpms, error="") + assert out == ProcessedImage( + image=img, + unsigned_rpms=expected_unsigned, + error="", + ) - def test_call_db_getter_exception( + def test_call_db_getter_exception( # pylint: disable=too-many-arguments self, mock_db_getter: MagicMock, mock_rpms_getter: MagicMock, + mock_unsigned_rpms_getter: MagicMock, tmp_path: Path, ) -> None: - """Test ImageProcessor's callable""" + """Test ImageProcessor exception in db_getter""" stderr = "Failed to run command" mock_db_getter.side_effect = CalledProcessError( stderr=stderr, returncode=1, cmd="" @@ -245,20 +266,23 @@ def test_call_db_getter_exception( workdir=tmp_path, db_getter=mock_db_getter, rpms_getter=mock_rpms_getter, + unsigned_rpms_getter=mock_unsigned_rpms_getter, ) img = "my-img" out = instance(img) mock_db_getter.assert_called_once() mock_rpms_getter.assert_not_called() + mock_unsigned_rpms_getter.assert_not_called() assert out == ProcessedImage(image=img, unsigned_rpms=[], error=stderr) - def test_call_rpm_getter_exception( + def test_call_rpm_getter_exception( # pylint: disable=too-many-arguments self, mock_db_getter: MagicMock, mock_rpms_getter: MagicMock, + mock_unsigned_rpms_getter: MagicMock, tmp_path: Path, ) -> None: - """Test ImageProcessor's callable""" + """Test ImageProcessor exception in rpms_getter""" stderr = "Failed to run command" mock_rpms_getter.side_effect = CalledProcessError( stderr=stderr, returncode=1, cmd="" @@ -267,25 +291,39 @@ def test_call_rpm_getter_exception( workdir=tmp_path, db_getter=mock_db_getter, rpms_getter=mock_rpms_getter, + unsigned_rpms_getter=mock_unsigned_rpms_getter, ) img = "my-img" out = instance(img) mock_db_getter.assert_called_once() mock_rpms_getter.assert_called_once() + mock_unsigned_rpms_getter.assert_not_called() assert out == ProcessedImage(image=img, unsigned_rpms=[], error=stderr) - -def test_get_rpmdb(tmp_path: Path) -> None: - """Test get_rpmdb""" - image = "my-image" - mock_runner = MagicMock() - out = get_rpmdb( - container_image=image, - target_dir=tmp_path, - runner=mock_runner, - ) - assert mock_runner.call_count == 1 - assert out == tmp_path + def test_call_unsigned_rpms_getter_exception( # pylint: disable=too-many-arguments + self, + mock_db_getter: MagicMock, + mock_rpms_getter: MagicMock, + mock_unsigned_rpms_getter: MagicMock, + tmp_path: Path, + ) -> None: + """Test ImageProcessor exception in unsigned_rpms_getter""" + stderr = "Failed to run command" + mock_unsigned_rpms_getter.side_effect = CalledProcessError( + stderr=stderr, returncode=1, cmd="" + ) + instance = ImageProcessor( + workdir=tmp_path, + db_getter=mock_db_getter, + rpms_getter=mock_rpms_getter, + unsigned_rpms_getter=mock_unsigned_rpms_getter, + ) + img = "my-img" + out = instance(img) + mock_db_getter.assert_called_once() + mock_rpms_getter.assert_called_once() + mock_unsigned_rpms_getter.assert_called_once() + assert out == ProcessedImage(image=img, unsigned_rpms=[], error=stderr) class TestMain: @@ -324,7 +362,7 @@ def status_path(self, tmp_path: Path) -> Path: return tmp_path / "status" @pytest.mark.parametrize( - "fail_unsigned, has_errors", + ("fail_unsigned", "has_errors"), [ pytest.param( False, @@ -369,9 +407,9 @@ def test_main( # pylint: disable=too-many-arguments assert status_path.read_text() == "SUCCESS" assert mock_image_processor.return_value.call_count == 1 generate_output_mock.assert_called_once_with( - [mock_image_processor.return_value.return_value] + processed_image=mock_image_processor.return_value.return_value ) - mock_image_processor.return_value.assert_has_calls([call("img1")]) + mock_image_processor.return_value.assert_called_once_with(img="img1") def test_main_fail_on_unsigned_rpm_or_errors( self, @@ -383,7 +421,7 @@ def test_main_fail_on_unsigned_rpm_or_errors( when whe 'fail-unsigned' flag is used and there are unsigned RPMs """ fail_unsigned: bool = True - mock = create_generate_output_mock(with_failures=True) + generate_output_mock = create_generate_output_mock(with_failures=True) with pytest.raises(SystemExit) as err: rpm_verifier.main( # pylint: disable=no-value-for-parameter args=[ @@ -400,4 +438,5 @@ def test_main_fail_on_unsigned_rpm_or_errors( standalone_mode=False, ) assert status_path.read_text() == "ERROR" - assert err.value.code == mock.return_value[1] + generate_output_mock.assert_called_once() + assert err.value.code == generate_output_mock.return_value[1] diff --git a/verify_rpms/rpm_verifier.py b/verify_rpms/rpm_verifier.py index 09ae9c9..122c94c 100644 --- a/verify_rpms/rpm_verifier.py +++ b/verify_rpms/rpm_verifier.py @@ -1,33 +1,41 @@ #!/usr/bin/env python3 """Verify RPMs are signed""" - import sys import tempfile -from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass -from json import JSONDecodeError, loads from pathlib import Path from subprocess import CalledProcessError, run -from typing import Callable, Iterable +from typing import Callable import click @dataclass(frozen=True) class ProcessedImage: - """Unsigned RPMs for a single image""" + """ + A class to hold data about rpms for single image: + Image name, + Unsigned RPMs, + Errors + """ image: str unsigned_rpms: list[str] error: str = "" def __str__(self) -> str: - return f"{self. image}: {', '.join(self.unsigned_rpms)}" + return f"{self.image}: {', '.join(self.unsigned_rpms)}" def get_rpmdb(container_image: str, target_dir: Path, runner: Callable = run) -> Path: - """Extract RPM DB from a given container image reference""" + """ + Extract RPM DB from a given container image reference + :param container_image: the image to extract + :param target_dir: the directory to extract the DB to + :param runner: subprocess.run to run CLI commands + :return: Path of the directory the DB extracted to + """ runner( [ "oc", @@ -44,20 +52,19 @@ def get_rpmdb(container_image: str, target_dir: Path, runner: Callable = run) -> return target_dir -def get_unsigned_rpms(rpmdb: Path, runner: Callable = run) -> list[str]: +def get_rpms_data(rpmdb: Path, runner: Callable = run) -> list[str]: """ - Get all unsigned RPMs from RPM DB path - Filter and return the unsigned RPMs + Get RPMs data from RPM DB :param rpmdb: path to RPM DB folder :param runner: subprocess.run to run CLI commands - :return: list of unsigned RPMs within the folder + :return: list of RPMs with their signature information """ rpm_strs = runner( - [ + [ # pylint: disable=duplicate-code "rpm", "-qa", "--qf", - "%{NAME}-%{VERSION}-%{RELEASE} %{SIGGPG:pgpsig} %{SIGPGP:pgpsig}\n", + "%{NAME}-%{VERSION}-%{RELEASE} %{SIGPGP:pgpsig}\n", "--dbpath", str(rpmdb), ], @@ -65,64 +72,65 @@ def get_unsigned_rpms(rpmdb: Path, runner: Callable = run) -> list[str]: text=True, check=True, ).stdout.splitlines() - return [ + return rpm_strs + + +def get_unsigned_rpms(rpms: list[str]) -> list[str]: + """ + Get all unsigned RPMs from the list of RPMs + Filters out the `gpg-pubkey` and return the unsigned RPMs + :param rpms: list of RPMs + :return: list of unsigned RPMs + """ + unsigned_rpms: list[str] = [ rpm.split()[0] - for rpm in rpm_strs + for rpm in rpms if "Key ID" not in rpm and not rpm.startswith("gpg-pubkey") ] + return unsigned_rpms + -def generate_output(processed_images: Iterable[ProcessedImage]) -> tuple[bool, str]: +def generate_output(processed_image: ProcessedImage) -> tuple[bool, str]: """ Generate the output that should be printed to the user based on the processed images results. - - :param processed_images: each element contains a name and a list of unsigned images - + :param processed_image: ProcessedImage that contains all the rpms information :returns: The status and summary of the image processing. """ - failures = True - with_unsigned_rpms = [img for img in processed_images if img.unsigned_rpms] - with_error = [img for img in processed_images if img.error] - if not with_unsigned_rpms and not with_error: - output = "No unsigned RPMs found." - failures = False - else: - output = "\n".join([str(img) for img in with_unsigned_rpms]) - output = f"Found unsigned RPMs:\n{output}" if with_unsigned_rpms else "" - error_str = "\n".join([f"{img.image}: {img.error}" for img in with_error]) - output = f"{output}\nEncountered errors:\n{error_str}" if error_str else output - return failures, output.lstrip() + failure = True - -def parse_image_input(image_input: str) -> list[str]: - """ - Input is either an image reference or snapshot data in json format. - Try parsing as json and extract the images. - If failing to parse as json, assume it's an image reference as one-element list. - """ - try: - snapshot = loads(s=image_input) - except JSONDecodeError: - return [image_input] - return [component["containerImage"] for component in snapshot["components"]] + if not processed_image.unsigned_rpms and not processed_image.error: + output = f"No unsigned RPMs in {processed_image.image}" + failure = False + else: + output = ( + f"Found unsigned RPMs in {processed_image.image}:\n{processed_image.unsigned_rpms}" + if processed_image.unsigned_rpms + else f"Encountered errors in {processed_image.image}:\n{processed_image.error}" + ) + return failure, output.lstrip() @dataclass(frozen=True) class ImageProcessor: - """Find unsigned RPMs provided an image""" + """ + Populate RPMs data for all images + """ workdir: Path db_getter: Callable[[str, Path], Path] = get_rpmdb - rpms_getter: Callable[[Path], list[str]] = get_unsigned_rpms + rpms_getter: Callable[[Path], list[str]] = get_rpms_data + unsigned_rpms_getter: Callable[[list[str]], list[str]] = get_unsigned_rpms def __call__(self, img: str) -> ProcessedImage: with tempfile.TemporaryDirectory( dir=str(self.workdir), prefix="rpmdb" ) as tmpdir: try: - rpm_db = self.db_getter(img, Path(tmpdir)) - unsigned_rpms = self.rpms_getter(rpm_db) + rpms_db = self.db_getter(img, Path(tmpdir)) + rpms_data = self.rpms_getter(rpms_db) + unsigned_rpms = self.unsigned_rpms_getter(rpms_data) except CalledProcessError as err: return ProcessedImage( image=img, @@ -169,15 +177,10 @@ def main( ) -> None: """Verify RPMs are signed""" - container_images = parse_image_input(img_input) - processor = ImageProcessor(workdir=workdir) - with ThreadPoolExecutor() as executor: - processed_images: Iterable[ProcessedImage] = executor.map( - processor, container_images - ) + processed_image = processor(img=img_input) - failures_occurred, output = generate_output(list(processed_images)) + failures_occurred, output = generate_output(processed_image=processed_image) if failures_occurred: status_path.write_text("ERROR") else: