Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(RHTAPWATCH-686): handle runner exceptions #25

Merged
merged 1 commit into from
Dec 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 110 additions & 5 deletions tests/test_rpm_verifier.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Test rpm_verifier.py end-to-end"""
from pathlib import Path
from subprocess import CalledProcessError
from textwrap import dedent
from unittest.mock import MagicMock, call, create_autospec, sentinel

Expand Down Expand Up @@ -43,7 +44,7 @@
dedent(
"""
Found unsigned RPMs:
image: i1 | unsigned RPMs: my-rpm
i1: my-rpm
"""
).strip(),
False,
Expand All @@ -58,7 +59,7 @@
dedent(
"""
Found unsigned RPMs:
image: i1 | unsigned RPMs: my-rpm, another-rpm
i1: my-rpm, another-rpm
"""
).strip(),
True,
Expand All @@ -73,13 +74,73 @@
dedent(
"""
Found unsigned RPMs:
image: i1 | unsigned RPMs: my-rpm, another-rpm
image: i2 | unsigned RPMs: their-rpm
i1: my-rpm, another-rpm
i2: their-rpm
"""
).strip(),
True,
id="multiple images with unsigned rpms + fail if unsigned",
),
pytest.param(
[
ProcessedImage(
image="i1", unsigned_rpms=[], error="Failed to run command"
),
ProcessedImage(image="i2", unsigned_rpms=["their-rpm"]),
],
True,
dedent(
"""
Found unsigned RPMs:
i2: their-rpm
Encountered errors:
i1: Failed to run command
"""
).strip(),
True,
id="Error in running command + image with unsigned rpms + fail if unsigned",
),
pytest.param(
[
ProcessedImage(
image="i1", unsigned_rpms=[], error="Failed to run first command"
),
ProcessedImage(
image="i2", unsigned_rpms=[], error="Failed to run second command"
),
],
True,
dedent(
"""
Encountered errors:
i1: Failed to run first command
i2: Failed to run second command
"""
).strip(),
True,
id="2 Errors in running command + fail if unsigned",
),
pytest.param(
[
ProcessedImage(
image="i1", unsigned_rpms=[], error="Failed to run first command"
),
ProcessedImage(
image="i2", unsigned_rpms=["my-rpm", "another-rpm"], error=""
),
],
False,
dedent(
"""
Found unsigned RPMs:
i2: my-rpm, another-rpm
Encountered errors:
i1: Failed to run first command
"""
).strip(),
False,
id="Errors in running command + image with unsigned rpms + do not fail if unsigned",
),
],
)
def test_generate_output(
Expand Down Expand Up @@ -220,7 +281,51 @@ def test_call(
)
img = "my-img"
out = instance(img)
assert out == ProcessedImage(image=img, unsigned_rpms=unsigned_rpms)
assert out == ProcessedImage(image=img, unsigned_rpms=unsigned_rpms, error="")

def test_call_db_getter_exception(
self,
mock_db_getter: MagicMock,
mock_rpms_getter: MagicMock,
tmp_path: Path,
) -> None:
"""Test ImageProcessor's callable"""
stderr = "Failed to run command"
mock_db_getter.side_effect = CalledProcessError(
stderr=stderr, returncode=1, cmd=""
)
instance = ImageProcessor(
workdir=tmp_path,
db_getter=mock_db_getter,
rpms_getter=mock_rpms_getter,
)
img = "my-img"
out = instance(img)
mock_db_getter.assert_called_once()
mock_rpms_getter.assert_not_called()
assert out == ProcessedImage(image=img, unsigned_rpms=[], error=stderr)

def test_call_rpm_getter_exception(
self,
mock_db_getter: MagicMock,
mock_rpms_getter: MagicMock,
tmp_path: Path,
) -> None:
"""Test ImageProcessor's callable"""
stderr = "Failed to run command"
mock_rpms_getter.side_effect = CalledProcessError(
stderr=stderr, returncode=1, cmd=""
)
instance = ImageProcessor(
workdir=tmp_path,
db_getter=mock_db_getter,
rpms_getter=mock_rpms_getter,
)
img = "my-img"
out = instance(img)
mock_db_getter.assert_called_once()
mock_rpms_getter.assert_called_once()
assert out == ProcessedImage(image=img, unsigned_rpms=[], error=stderr)


def test_get_rpmdb(tmp_path: Path) -> None:
Expand Down
25 changes: 18 additions & 7 deletions verify_rpms/rpm_verifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from dataclasses import dataclass
from json import JSONDecodeError, loads
from pathlib import Path
from subprocess import run
from subprocess import CalledProcessError, run
from typing import Callable, Iterable

import click
Expand All @@ -20,9 +20,10 @@ class ProcessedImage:

image: str
unsigned_rpms: list[str]
error: str = ""

def __str__(self) -> str:
return f"image: {self. image} | unsigned RPMs: {', '.join(self.unsigned_rpms)}"
return f"{self. image}: {', '.join(self.unsigned_rpms)}"


def get_rpmdb(container_image: str, target_dir: Path, runner: Callable = run) -> Path:
Expand Down Expand Up @@ -82,14 +83,17 @@ def generate_output(
:param fail_unsigned: whether the script should fail in case unsigned rpms found
"""
with_unsigned_rpms = [img for img in processed_images if img.unsigned_rpms]
if not with_unsigned_rpms:
with_error = [img for img in processed_images if img.error]
if not with_unsigned_rpms and not with_error:
output = "No unsigned RPMs found."
fail = False
else:
output = "\n".join([str(img) for img in with_unsigned_rpms])
output = f"Found unsigned RPMs:\n{output}"
output = f"Found unsigned RPMs:\n{output}" if with_unsigned_rpms else ""
error_str = "\n".join([f"{img.image}: {img.error}" for img in with_error])
output = f"{output}\nEncountered errors:\n{error_str}" if error_str else output
fail = fail_unsigned
return fail, output
return fail, output.lstrip()


def parse_image_input(image_input: str) -> list[str]:
Expand Down Expand Up @@ -119,8 +123,15 @@ def __call__(self, img: str) -> ProcessedImage:
with tempfile.TemporaryDirectory(
dir=str(self.workdir), prefix="rpmdb"
) as tmpdir:
rpm_db = self.db_getter(img, Path(tmpdir))
unsigned_rpms = self.rpms_getter(rpm_db)
try:
rpm_db = self.db_getter(img, Path(tmpdir))
unsigned_rpms = self.rpms_getter(rpm_db)
except CalledProcessError as err:
return ProcessedImage(
image=img,
unsigned_rpms=[],
error=err.stderr,
)
return ProcessedImage(
image=img,
unsigned_rpms=unsigned_rpms,
Expand Down