-
Notifications
You must be signed in to change notification settings - Fork 9
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add a skeleton for the verifier module to allow multiple people to work on it later. Signed-off-by: Yftach Herzog <[email protected]>
- Loading branch information
1 parent
5e9a7c7
commit 4973a70
Showing
6 changed files
with
147 additions
and
2 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
"""Test rpm_verifier.py end-to-end""" | ||
|
||
from unittest.mock import MagicMock, call, create_autospec, sentinel | ||
|
||
import pytest | ||
from pytest import MonkeyPatch | ||
|
||
from verify_rpms import rpm_verifier | ||
from verify_rpms.rpm_verifier import ImageProcessor, generate_output | ||
|
||
|
||
class TestMain: | ||
"""Test call to rpm_verifier.py main function""" | ||
|
||
@pytest.fixture() | ||
def mock_image_processor(self, monkeypatch: MonkeyPatch) -> MagicMock: | ||
"""Monkey-patched ImageProcessor""" | ||
mock: MagicMock = create_autospec( | ||
ImageProcessor, | ||
return_value=MagicMock(return_value=sentinel.output), | ||
) | ||
monkeypatch.setattr(rpm_verifier, ImageProcessor.__name__, mock) | ||
return mock | ||
|
||
@pytest.fixture() | ||
def mock_generate_output(self, monkeypatch: MonkeyPatch) -> MagicMock: | ||
"""Monkey-patched db_getter""" | ||
mock = create_autospec(generate_output) | ||
monkeypatch.setattr(rpm_verifier, generate_output.__name__, mock) | ||
return mock | ||
|
||
def test_main( | ||
self, | ||
mock_generate_output: MagicMock, | ||
mock_image_processor: MagicMock, | ||
) -> None: | ||
"""Test call to rpm_verifier.py main function""" | ||
rpm_verifier.main( # pylint: disable=no-value-for-parameter | ||
args=[ | ||
"--container-image", | ||
"img1", | ||
"--container-image", | ||
"img2", | ||
"--fail-unsigned", | ||
], | ||
obj={}, | ||
standalone_mode=False, | ||
) | ||
|
||
assert mock_image_processor.return_value.call_count == 2 | ||
mock_image_processor.return_value.assert_has_calls([call("img1"), call("img2")]) | ||
mock_generate_output.assert_called_once_with( | ||
[sentinel.output, sentinel.output], True | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
File renamed without changes.
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,91 @@ | ||
#!/usr/bin/env python3 | ||
|
||
"""Verify RPMs are signed""" | ||
|
||
import tempfile | ||
from concurrent.futures import ThreadPoolExecutor | ||
from dataclasses import dataclass | ||
from pathlib import Path | ||
from typing import Callable, Iterable | ||
|
||
import click | ||
|
||
|
||
@dataclass(frozen=True) | ||
class ProcessedImage: | ||
"""Unsigned RPMs for a single image""" | ||
|
||
image: str | ||
unsigned_rpms: list[str] | ||
|
||
|
||
def get_rpmdb(container_image: str, target_dir: Path) -> Path: | ||
"""Extract RPM DB from a given container image reference""" | ||
raise NotImplementedError() | ||
|
||
|
||
def get_rpms(rpmdb: Path) -> list[str]: | ||
"""Get RPMs from RPM DB path""" | ||
raise NotImplementedError() | ||
|
||
|
||
def is_rpm_signed(rpm: str) -> bool: | ||
"""Check if a given RPM is singed or not""" | ||
raise NotImplementedError() | ||
|
||
|
||
def generate_output( | ||
unsigned_rpms: Iterable[ProcessedImage], fail_unsigned: bool | ||
) -> None: | ||
""" | ||
Given a mapping between images and unsigned RPMs, print out the images | ||
having unsigned RPMs, together with their unsigned RPMs. | ||
Exit with error in case unsigned RPMs found and the relevant flag provided. | ||
""" | ||
raise NotImplementedError() | ||
|
||
|
||
@dataclass(frozen=True) | ||
class ImageProcessor: | ||
"""Find unsigned RPMs provided an image""" | ||
|
||
db_getter: Callable[[str, Path], Path] = get_rpmdb | ||
rpms_getter: Callable[[Path], list[str]] = get_rpms | ||
signed_verifier: Callable[[str], bool] = is_rpm_signed | ||
|
||
def __call__(self, img: str) -> ProcessedImage: | ||
with tempfile.TemporaryDirectory(prefix="tmprpmdb") as tmpdir: | ||
rpm_db = self.db_getter(img, Path(tmpdir)) | ||
rpms = self.rpms_getter(rpm_db) | ||
return ProcessedImage( | ||
image=img, | ||
unsigned_rpms=[rpm for rpm in rpms if not self.signed_verifier(rpm)], | ||
) | ||
|
||
|
||
@click.command() | ||
@click.option( | ||
"--container-image", | ||
help="Reference to container image", | ||
type=str, | ||
multiple=True, | ||
) | ||
@click.option( | ||
"--fail-unsigned", | ||
help="exit with failure if unsigned RPMs found", | ||
type=bool, | ||
is_flag=True, | ||
) | ||
def main(container_image: list[str], fail_unsigned: bool) -> None: | ||
"""Verify RPMs are signed""" | ||
processor = ImageProcessor() | ||
with ThreadPoolExecutor() as executor: | ||
processed_images: Iterable[ProcessedImage] = executor.map( | ||
processor, container_image | ||
) | ||
|
||
generate_output(list(processed_images), fail_unsigned) | ||
|
||
|
||
if __name__ == "__main__": | ||
main() # pylint: disable=no-value-for-parameter |