Skip to content

Commit

Permalink
Merge pull request #16 from yftacherzog/RHTAPWATCH-680-gen-output
Browse files Browse the repository at this point in the history
chore(RHTAPWATCH-680): generate rpm_verifier output
  • Loading branch information
yftacherzog authored Dec 17, 2023
2 parents 1fc72ff + cc44c7e commit 98d05e4
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 8 deletions.
80 changes: 79 additions & 1 deletion tests/test_rpm_verifier.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Test rpm_verifier.py end-to-end"""

from pathlib import Path
from textwrap import dedent
from unittest.mock import MagicMock, call, create_autospec, sentinel

import pytest
Expand All @@ -10,6 +11,83 @@
from verify_rpms.rpm_verifier import ImageProcessor, ProcessedImage, generate_output


@pytest.mark.parametrize(
("processed_images", "fail", "expected_print", "expected_fail"),
[
pytest.param(
[ProcessedImage(image="i1", unsigned_rpms=[])],
False,
"No unsigned RPMs found.",
False,
id="No unsigned RPMs + do not fail if unsigned",
),
pytest.param(
[
ProcessedImage(image="i1", unsigned_rpms=[]),
ProcessedImage(image="i2", unsigned_rpms=[]),
],
True,
"No unsigned RPMs found.",
False,
id="No unsigned RPMs + fail if unsigned",
),
pytest.param(
[ProcessedImage(image="i1", unsigned_rpms=["my-rpm"])],
False,
dedent(
"""
Found unsigned RPMs:
image: i1 | unsigned RPMs: my-rpm
"""
).strip(),
False,
id="1 unsigned + do not fail if unsigned",
),
pytest.param(
[
ProcessedImage(image="i1", unsigned_rpms=["my-rpm", "another-rpm"]),
ProcessedImage(image="i2", unsigned_rpms=[]),
],
True,
dedent(
"""
Found unsigned RPMs:
image: i1 | unsigned RPMs: my-rpm, another-rpm
"""
).strip(),
True,
id="an image with multiple unsigned and another without + fail if unsigned",
),
pytest.param(
[
ProcessedImage(image="i1", unsigned_rpms=["my-rpm", "another-rpm"]),
ProcessedImage(image="i2", unsigned_rpms=["their-rpm"]),
],
True,
dedent(
"""
Found unsigned RPMs:
image: i1 | unsigned RPMs: my-rpm, another-rpm
image: i2 | unsigned RPMs: their-rpm
"""
).strip(),
True,
id="multiple images with unsigned rpms + fail if unsigned",
),
],
)
def test_generate_output(
processed_images: list[ProcessedImage],
fail: bool,
expected_print: str,
expected_fail: bool,
) -> None:
"""Test generate output"""
fail_out, print_out = generate_output(processed_images, fail)
assert fail_out == expected_fail
assert print_out == expected_print


class TestImageProcessor:
"""Test ImageProcessor's callable"""

Expand Down Expand Up @@ -68,7 +146,7 @@ def mock_image_processor(self, monkeypatch: MonkeyPatch) -> MagicMock:
@pytest.fixture()
def mock_generate_output(self, monkeypatch: MonkeyPatch) -> MagicMock:
"""Monkey-patched generate_output"""
mock = create_autospec(generate_output)
mock = create_autospec(generate_output, return_value=(False, ""))
monkeypatch.setattr(rpm_verifier, generate_output.__name__, mock)
return mock

Expand Down
31 changes: 24 additions & 7 deletions verify_rpms/rpm_verifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

"""Verify RPMs are signed"""

import sys
import tempfile
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
Expand All @@ -19,6 +20,9 @@ class ProcessedImage:
image: str
unsigned_rpms: list[str]

def __str__(self) -> str:
return f"image: {self. image} | unsigned RPMs: {', '.join(self.unsigned_rpms)}"


def get_rpmdb(container_image: str, target_dir: Path, runner: Callable = run) -> Path:
"""Extract RPM DB from a given container image reference"""
Expand All @@ -31,14 +35,24 @@ def get_unsigned_rpms(rpmdb: Path, runner: Callable = run) -> list[str]:


def generate_output(
unsigned_rpms: Iterable[ProcessedImage], fail_unsigned: bool
) -> None:
processed_images: Iterable[ProcessedImage], fail_unsigned: bool
) -> tuple[bool, str]:
"""
Given a mapping between images and unsigned RPMs, print out the images
having unsigned RPMs, together with their unsigned RPMs.
Exit with error in case unsigned RPMs found and the relevant flag provided.
Generate the output that should be printed to the user based on the processed images
and decide whether the script should fail or not
:param processed_images: each element contains a name and a list of unsigned images
:param fail_unsigned: whether the script should fail in case unsigned rpms found
"""
raise NotImplementedError()
with_unsigned_rpms = [img for img in processed_images if img.unsigned_rpms]
if not with_unsigned_rpms:
output = "No unsigned RPMs found."
fail = False
else:
output = "\n".join([str(img) for img in with_unsigned_rpms])
output = f"Found unsigned RPMs:\n{output}"
fail = fail_unsigned
return fail, output


def parse_image_input(image_input: str) -> list[str]:
Expand Down Expand Up @@ -103,7 +117,10 @@ def main(img_input: str, fail_unsigned: bool, workdir: Path) -> None:
processor, container_images
)

generate_output(list(processed_images), fail_unsigned)
fail, output = generate_output(list(processed_images), fail_unsigned)
if fail:
sys.exit(output)
print(output)


if __name__ == "__main__":
Expand Down

0 comments on commit 98d05e4

Please sign in to comment.