From 23502f1afe16b03a4bfe4d5f1134ef47fbc5a2f6 Mon Sep 17 00:00:00 2001 From: Avi Biton Date: Mon, 18 Dec 2023 12:36:01 +0200 Subject: [PATCH] chore(RHTAPWATCH-686): handle runner exceptions Except exceptions from runner Add errors to the output Add unit tests Signed-off-by: Avi Biton --- tests/test_rpm_verifier.py | 115 ++++++++++++++++++++++++++++++++++-- verify_rpms/rpm_verifier.py | 25 +++++--- 2 files changed, 128 insertions(+), 12 deletions(-) diff --git a/tests/test_rpm_verifier.py b/tests/test_rpm_verifier.py index 5f91bea..dc6f3c0 100644 --- a/tests/test_rpm_verifier.py +++ b/tests/test_rpm_verifier.py @@ -1,5 +1,6 @@ """Test rpm_verifier.py end-to-end""" from pathlib import Path +from subprocess import CalledProcessError from textwrap import dedent from unittest.mock import MagicMock, call, create_autospec, sentinel @@ -43,7 +44,7 @@ dedent( """ Found unsigned RPMs: - image: i1 | unsigned RPMs: my-rpm + i1: my-rpm """ ).strip(), False, @@ -58,7 +59,7 @@ dedent( """ Found unsigned RPMs: - image: i1 | unsigned RPMs: my-rpm, another-rpm + i1: my-rpm, another-rpm """ ).strip(), True, @@ -73,13 +74,73 @@ dedent( """ Found unsigned RPMs: - image: i1 | unsigned RPMs: my-rpm, another-rpm - image: i2 | unsigned RPMs: their-rpm + i1: my-rpm, another-rpm + i2: their-rpm """ ).strip(), True, id="multiple images with unsigned rpms + fail if unsigned", ), + pytest.param( + [ + ProcessedImage( + image="i1", unsigned_rpms=[], error="Failed to run command" + ), + ProcessedImage(image="i2", unsigned_rpms=["their-rpm"]), + ], + True, + dedent( + """ + Found unsigned RPMs: + i2: their-rpm + Encountered errors: + i1: Failed to run command + """ + ).strip(), + True, + id="Error in running command + image with unsigned rpms + fail if unsigned", + ), + pytest.param( + [ + ProcessedImage( + image="i1", unsigned_rpms=[], error="Failed to run first command" + ), + ProcessedImage( + image="i2", unsigned_rpms=[], error="Failed to run second command" + ), + ], + True, + dedent( + """ + Encountered errors: + i1: Failed to run first command + i2: Failed to run second command + """ + ).strip(), + True, + id="2 Errors in running command + fail if unsigned", + ), + pytest.param( + [ + ProcessedImage( + image="i1", unsigned_rpms=[], error="Failed to run first command" + ), + ProcessedImage( + image="i2", unsigned_rpms=["my-rpm", "another-rpm"], error="" + ), + ], + False, + dedent( + """ + Found unsigned RPMs: + i2: my-rpm, another-rpm + Encountered errors: + i1: Failed to run first command + """ + ).strip(), + False, + id="Errors in running command + image with unsigned rpms + do not fail if unsigned", + ), ], ) def test_generate_output( @@ -220,7 +281,51 @@ def test_call( ) img = "my-img" out = instance(img) - assert out == ProcessedImage(image=img, unsigned_rpms=unsigned_rpms) + assert out == ProcessedImage(image=img, unsigned_rpms=unsigned_rpms, error="") + + def test_call_db_getter_exception( + self, + mock_db_getter: MagicMock, + mock_rpms_getter: MagicMock, + tmp_path: Path, + ) -> None: + """Test ImageProcessor's callable""" + stderr = "Failed to run command" + mock_db_getter.side_effect = CalledProcessError( + stderr=stderr, returncode=1, cmd="" + ) + instance = ImageProcessor( + workdir=tmp_path, + db_getter=mock_db_getter, + rpms_getter=mock_rpms_getter, + ) + img = "my-img" + out = instance(img) + mock_db_getter.assert_called_once() + mock_rpms_getter.assert_not_called() + assert out == ProcessedImage(image=img, unsigned_rpms=[], error=stderr) + + def test_call_rpm_getter_exception( + self, + mock_db_getter: MagicMock, + mock_rpms_getter: MagicMock, + tmp_path: Path, + ) -> None: + """Test ImageProcessor's callable""" + stderr = "Failed to run command" + mock_rpms_getter.side_effect = CalledProcessError( + stderr=stderr, returncode=1, cmd="" + ) + instance = ImageProcessor( + workdir=tmp_path, + db_getter=mock_db_getter, + rpms_getter=mock_rpms_getter, + ) + img = "my-img" + out = instance(img) + mock_db_getter.assert_called_once() + mock_rpms_getter.assert_called_once() + assert out == ProcessedImage(image=img, unsigned_rpms=[], error=stderr) def test_get_rpmdb(tmp_path: Path) -> None: diff --git a/verify_rpms/rpm_verifier.py b/verify_rpms/rpm_verifier.py index 6ff4908..2d4cbf1 100644 --- a/verify_rpms/rpm_verifier.py +++ b/verify_rpms/rpm_verifier.py @@ -8,7 +8,7 @@ from dataclasses import dataclass from json import JSONDecodeError, loads from pathlib import Path -from subprocess import run +from subprocess import CalledProcessError, run from typing import Callable, Iterable import click @@ -20,9 +20,10 @@ class ProcessedImage: image: str unsigned_rpms: list[str] + error: str = "" def __str__(self) -> str: - return f"image: {self. image} | unsigned RPMs: {', '.join(self.unsigned_rpms)}" + return f"{self. image}: {', '.join(self.unsigned_rpms)}" def get_rpmdb(container_image: str, target_dir: Path, runner: Callable = run) -> Path: @@ -82,14 +83,17 @@ def generate_output( :param fail_unsigned: whether the script should fail in case unsigned rpms found """ with_unsigned_rpms = [img for img in processed_images if img.unsigned_rpms] - if not with_unsigned_rpms: + with_error = [img for img in processed_images if img.error] + if not with_unsigned_rpms and not with_error: output = "No unsigned RPMs found." fail = False else: output = "\n".join([str(img) for img in with_unsigned_rpms]) - output = f"Found unsigned RPMs:\n{output}" + output = f"Found unsigned RPMs:\n{output}" if with_unsigned_rpms else "" + error_str = "\n".join([f"{img.image}: {img.error}" for img in with_error]) + output = f"{output}\nEncountered errors:\n{error_str}" if error_str else output fail = fail_unsigned - return fail, output + return fail, output.lstrip() def parse_image_input(image_input: str) -> list[str]: @@ -119,8 +123,15 @@ def __call__(self, img: str) -> ProcessedImage: with tempfile.TemporaryDirectory( dir=str(self.workdir), prefix="rpmdb" ) as tmpdir: - rpm_db = self.db_getter(img, Path(tmpdir)) - unsigned_rpms = self.rpms_getter(rpm_db) + try: + rpm_db = self.db_getter(img, Path(tmpdir)) + unsigned_rpms = self.rpms_getter(rpm_db) + except CalledProcessError as err: + return ProcessedImage( + image=img, + unsigned_rpms=[], + error=err.stderr, + ) return ProcessedImage( image=img, unsigned_rpms=unsigned_rpms,