Skip to content

Commit

Permalink
Merge pull request #13 from yftacherzog/adjust-verifier-inputs
Browse files Browse the repository at this point in the history
chore: adjust rpm_verifier inputs
  • Loading branch information
yftacherzog authored Dec 14, 2023
2 parents c95f89c + 035aa59 commit 445b226
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 15 deletions.
15 changes: 7 additions & 8 deletions tests/test_rpm_verifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,17 @@ def test_main(
"""Test call to rpm_verifier.py main function"""
rpm_verifier.main( # pylint: disable=no-value-for-parameter
args=[
"--container-image",
"--input",
"img1",
"--container-image",
"img2",
"--fail-unsigned",
"true",
"--workdir",
"some/path",
],
obj={},
standalone_mode=False,
)

assert mock_image_processor.return_value.call_count == 2
mock_image_processor.return_value.assert_has_calls([call("img1"), call("img2")])
mock_generate_output.assert_called_once_with(
[sentinel.output, sentinel.output], True
)
assert mock_image_processor.return_value.call_count == 1
mock_image_processor.return_value.assert_has_calls([call("img1")])
mock_generate_output.assert_called_once_with([sentinel.output], True)
38 changes: 31 additions & 7 deletions verify_rpms/rpm_verifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,29 @@ def generate_output(
raise NotImplementedError()


def parse_image_input(image_input: str) -> list[str]:
"""
Input is either an image reference or snapshot data in json format.
Try parsing as json and extract the images.
If failing to parse as json, assume it's an image reference as one-element list.
"""
# Currently assuming a single image. To be replaced by deciding whether this is an
# image or a snapshot json, and parsing accordingly
return [image_input]


@dataclass(frozen=True)
class ImageProcessor:
"""Find unsigned RPMs provided an image"""

workdir: Path
db_getter: Callable[[str, Path], Path] = get_rpmdb
rpms_getter: Callable[[Path], list[str]] = get_unsigned_rpms

def __call__(self, img: str) -> ProcessedImage:
with tempfile.TemporaryDirectory(prefix="tmprpmdb") as tmpdir:
with tempfile.TemporaryDirectory(
dir=str(self.workdir), prefix="rpmdb"
) as tmpdir:
rpm_db = self.db_getter(img, Path(tmpdir))
unsigned_rpms = self.rpms_getter(rpm_db)
return ProcessedImage(
Expand All @@ -60,23 +74,33 @@ def __call__(self, img: str) -> ProcessedImage:

@click.command()
@click.option(
"--container-image",
"--input",
"img_input",
help="Reference to container image",
type=str,
multiple=True,
required=True,
)
@click.option(
"--fail-unsigned",
help="Exit with failure if unsigned RPMs found",
type=bool,
is_flag=True,
required=True,
)
def main(container_image: list[str], fail_unsigned: bool) -> None:
@click.option(
"--workdir",
help="Path in which temporary directories will be created",
type=click.Path(path_type=Path),
required=True,
)
def main(img_input: str, fail_unsigned: bool, workdir: Path) -> None:
"""Verify RPMs are signed"""
processor = ImageProcessor()

container_images = parse_image_input(img_input)

processor = ImageProcessor(workdir=workdir)
with ThreadPoolExecutor() as executor:
processed_images: Iterable[ProcessedImage] = executor.map(
processor, container_image
processor, container_images
)

generate_output(list(processed_images), fail_unsigned)
Expand Down

0 comments on commit 445b226

Please sign in to comment.