Skip to content

Commit

Permalink
chore: add verifier skeleton
Browse files Browse the repository at this point in the history
Add a skeleton for the verifier module to allow multiple people to
work on it later.

Signed-off-by: Yftach Herzog <[email protected]>
  • Loading branch information
yftacherzog committed Dec 13, 2023
1 parent 5e9a7c7 commit 1d306ec
Show file tree
Hide file tree
Showing 6 changed files with 141 additions and 2 deletions.
2 changes: 1 addition & 1 deletion format.sh
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/bin/bash -ex

main() {
local pkgs=("tests" "generate_compose")
local pkgs=("tests" "generate_compose" "verify_rpms")
pipenv run isort --profile black "${pkgs[@]}"
pipenv run black "${pkgs[@]}"
}
Expand Down
54 changes: 54 additions & 0 deletions tests/test_rpm_verifier.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
"""Test rpm_verifier.py end-to-end"""

from unittest.mock import MagicMock, call, create_autospec, sentinel

import pytest
from pytest import MonkeyPatch

from verify_rpms import rpm_verifier
from verify_rpms.rpm_verifier import ImageProcessor, generate_output


class TestMain:
"""Test call to rpm_verifier.py main function"""

@pytest.fixture()
def mock_image_processor(self, monkeypatch: MonkeyPatch) -> MagicMock:
"""Monkey-patched ImageProcessor"""
mock: MagicMock = create_autospec(
ImageProcessor,
return_value=MagicMock(return_value=sentinel.output),
)
monkeypatch.setattr(rpm_verifier, ImageProcessor.__name__, mock)
return mock

@pytest.fixture()
def mock_generate_output(self, monkeypatch: MonkeyPatch) -> MagicMock:
"""Monkey-patched generate_output"""
mock = create_autospec(generate_output)
monkeypatch.setattr(rpm_verifier, generate_output.__name__, mock)
return mock

def test_main(
self,
mock_generate_output: MagicMock,
mock_image_processor: MagicMock,
) -> None:
"""Test call to rpm_verifier.py main function"""
rpm_verifier.main( # pylint: disable=no-value-for-parameter
args=[
"--container-image",
"img1",
"--container-image",
"img2",
"--fail-unsigned",
],
obj={},
standalone_mode=False,
)

assert mock_image_processor.return_value.call_count == 2
mock_image_processor.return_value.assert_has_calls([call("img1"), call("img2")])
mock_generate_output.assert_called_once_with(
[sentinel.output, sentinel.output], True
)
2 changes: 1 addition & 1 deletion tests/test_static_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from subprocess import run
from typing import Final

PKGS: Final[list[str]] = ["tests", "generate_compose"]
PKGS: Final[list[str]] = ["tests", "generate_compose", "verify_rpms"]


def test_mypy() -> None:
Expand Down
File renamed without changes.
Empty file added verify_rpms/__init__.py
Empty file.
85 changes: 85 additions & 0 deletions verify_rpms/rpm_verifier.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
#!/usr/bin/env python3

"""Verify RPMs are signed"""

import tempfile
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from pathlib import Path
from typing import Callable, Iterable

import click


@dataclass(frozen=True)
class ProcessedImage:
"""Unsigned RPMs for a single image"""

image: str
unsigned_rpms: list[str]


def get_rpmdb(container_image: str, target_dir: Path) -> Path:
"""Extract RPM DB from a given container image reference"""
raise NotImplementedError()


def get_unsigned_rpms(rpmdb: Path) -> list[str]:
"""Get unsigned RPMs from RPM DB path"""
raise NotImplementedError()


def generate_output(
unsigned_rpms: Iterable[ProcessedImage], fail_unsigned: bool
) -> None:
"""
Given a mapping between images and unsigned RPMs, print out the images
having unsigned RPMs, together with their unsigned RPMs.
Exit with error in case unsigned RPMs found and the relevant flag provided.
"""
raise NotImplementedError()


@dataclass(frozen=True)
class ImageProcessor:
"""Find unsigned RPMs provided an image"""

db_getter: Callable[[str, Path], Path] = get_rpmdb
rpms_getter: Callable[[Path], list[str]] = get_unsigned_rpms

def __call__(self, img: str) -> ProcessedImage:
with tempfile.TemporaryDirectory(prefix="tmprpmdb") as tmpdir:
rpm_db = self.db_getter(img, Path(tmpdir))
unsigned_rpms = self.rpms_getter(rpm_db)
return ProcessedImage(
image=img,
unsigned_rpms=unsigned_rpms,
)


@click.command()
@click.option(
"--container-image",
help="Reference to container image",
type=str,
multiple=True,
)
@click.option(
"--fail-unsigned",
help="Exit with failure if unsigned RPMs found",
type=bool,
is_flag=True,
)
def main(container_image: list[str], fail_unsigned: bool) -> None:
"""Verify RPMs are signed"""
processor = ImageProcessor()
with ThreadPoolExecutor() as executor:
processed_images: Iterable[ProcessedImage] = executor.map(
processor, container_image
)

generate_output(list(processed_images), fail_unsigned)


if __name__ == "__main__":
main() # pylint: disable=no-value-for-parameter

0 comments on commit 1d306ec

Please sign in to comment.