From c7c619d6bff011d27a1338ad1186422c7a9c487f Mon Sep 17 00:00:00 2001 From: Jakub Jelen Date: Tue, 4 Jul 2023 16:14:04 +0200 Subject: [PATCH] Move code handling GPG keys to separate library This decouples gpg keys handling and some code duplication from the MissingGpgKeysInhibitor actor to separate library that will be usable from more actors. The new actor TrustedGpgKeysScanner actor is crated, which handles reading the source RPM DB and trusted keys directory and produces a new model describing what keys are supposed to be trusted on the target system. This also removes the code duplication for detecting the --no-gpgcheck and for defining the directory where to look for the gpg keys. Signed-off-by: Jakub Jelen --- .../actors/missinggpgkeysinhibitor/actor.py | 4 +- .../libraries/missinggpgkey.py | 153 ++-------------- .../tests/component_test_missinggpgkey.py | 105 ++++------- .../tests/unit_test_missinggpgkey.py | 168 +----------------- .../libraries/userspacegen.py | 21 +-- .../actors/trustedgpgkeysscanner/actor.py | 21 +++ .../libraries/trustedgpgkeys.py | 41 +++++ .../tests/test_trustedgpgkeys.py | 74 ++++++++ repos/system_upgrade/common/libraries/gpg.py | 112 ++++++++++++ .../common/libraries/tests/test_gpg.py | 123 +++++++++++++ .../common/models/trustedgpgkeys.py | 19 ++ 11 files changed, 442 insertions(+), 399 deletions(-) create mode 100644 repos/system_upgrade/common/actors/trustedgpgkeysscanner/actor.py create mode 100644 repos/system_upgrade/common/actors/trustedgpgkeysscanner/libraries/trustedgpgkeys.py create mode 100644 repos/system_upgrade/common/actors/trustedgpgkeysscanner/tests/test_trustedgpgkeys.py create mode 100644 repos/system_upgrade/common/libraries/gpg.py create mode 100644 repos/system_upgrade/common/libraries/tests/test_gpg.py create mode 100644 repos/system_upgrade/common/models/trustedgpgkeys.py diff --git a/repos/system_upgrade/common/actors/missinggpgkeysinhibitor/actor.py b/repos/system_upgrade/common/actors/missinggpgkeysinhibitor/actor.py index 6f836a5b66..faa96452da 100644 --- a/repos/system_upgrade/common/actors/missinggpgkeysinhibitor/actor.py +++ b/repos/system_upgrade/common/actors/missinggpgkeysinhibitor/actor.py @@ -2,9 +2,9 @@ from leapp.libraries.actor import missinggpgkey from leapp.models import ( DNFWorkaround, - InstalledRPM, TargetUserSpaceInfo, TMPTargetRepositoriesFacts, + TrustedGpgKeys, UsedTargetRepositories ) from leapp.reporting import Report @@ -28,7 +28,7 @@ class MissingGpgKeysInhibitor(Actor): name = 'missing_gpg_keys_inhibitor' consumes = ( - InstalledRPM, + TrustedGpgKeys, TMPTargetRepositoriesFacts, TargetUserSpaceInfo, UsedTargetRepositories, diff --git a/repos/system_upgrade/common/actors/missinggpgkeysinhibitor/libraries/missinggpgkey.py b/repos/system_upgrade/common/actors/missinggpgkeysinhibitor/libraries/missinggpgkey.py index 1880986dcf..8f6f5a30d3 100644 --- a/repos/system_upgrade/common/actors/missinggpgkeysinhibitor/libraries/missinggpgkey.py +++ b/repos/system_upgrade/common/actors/missinggpgkeysinhibitor/libraries/missinggpgkey.py @@ -8,113 +8,21 @@ from leapp import reporting from leapp.exceptions import StopActorExecutionError -from leapp.libraries.common import config -from leapp.libraries.common.config.version import get_source_major_version, get_target_major_version -from leapp.libraries.stdlib import api, run +from leapp.libraries.common.config.version import get_target_major_version +from leapp.libraries.common.gpg import get_path_to_gpg_certs, read_gpg_fp_from_file, the_nogpgcheck_option_used +from leapp.libraries.stdlib import api from leapp.models import ( DNFWorkaround, - InstalledRPM, TargetUserSpaceInfo, TMPTargetRepositoriesFacts, + TrustedGpgKeys, UsedTargetRepositories ) from leapp.utils.deprecation import suppress_deprecation -GPG_CERTS_FOLDER = 'rpm-gpg' FMT_LIST_SEPARATOR = '\n - ' -def _gpg_show_keys(key_path): - """ - Show keys in given file in version-agnostic manner - - This runs gpg --show-keys (EL8) or gpg --with-fingerprints (EL7) - to verify the given file exists, is readable and contains valid - OpenPGP key data, which is printed in parsable format (--with-colons). - """ - try: - cmd = ['gpg2'] - # RHEL7 gnupg requires different switches to get the same output - if get_source_major_version() == '7': - cmd.append('--with-fingerprint') - else: - cmd.append('--show-keys') - cmd += ['--with-colons', key_path] - # TODO: discussed, most likely the checked=False will be dropped - # and error will be handled in other functions - return run(cmd, split=True, checked=False) - except OSError as err: - # NOTE: this is hypothetic; gnupg2 has to be installed on RHEL 7+ - error = 'Failed to read fingerprint from GPG key {}: {}'.format(key_path, str(err)) - api.current_logger().error(error) - return {} - - -def _parse_fp_from_gpg(output): - """ - Parse the output of gpg --show-keys --with-colons. - - Return list of 8 characters fingerprints per each gpgkey for the given - output from stdlib.run() or None if some error occurred. Either the - command return non-zero exit code, the file does not exists, its not - readable or does not contain any openpgp data. - """ - if not output or output['exit_code']: - return [] - - # we are interested in the lines of the output starting with "pub:" - # the colons are used for separating the fields in output like this - # pub:-:4096:1:999F7CBF38AB71F4:1612983048:::-:::escESC::::::23::0: - # ^--------------^ this is the fingerprint we need - # ^------^ but RPM version is just the last 8 chars lowercase - # Also multiple gpg keys can be stored in the file, so go through all "pub" - # lines - gpg_fps = [] - for line in output['stdout']: - if not line or not line.startswith('pub:'): - continue - parts = line.split(':') - if len(parts) >= 4 and len(parts[4]) == 16: - gpg_fps.append(parts[4][8:].lower()) - else: - api.current_logger().warning( - 'Cannot parse the gpg2 output. Line: "{}"' - .format(line) - ) - - return gpg_fps - - -def _read_gpg_fp_from_file(key_path): - """ - Returns the list of public key fingerprints from the given file - - Logs warning in case no OpenPGP data found in the given file or it is not - readable for some reason. - """ - res = _gpg_show_keys(key_path) - fp = _parse_fp_from_gpg(res) - if not fp: - error = 'Unable to read OpenPGP keys from {}: {}'.format(key_path, res['stderr']) - api.current_logger().error(error) - return fp - - -def _get_path_to_gpg_certs(): - """ - Get path to the directory with trusted target gpg keys in leapp tree - """ - # XXX This is copy&paste from TargetUserspaceCreator actor. - # Potential changes need to happen in both places to keep them in sync. - target_major_version = get_target_major_version() - target_product_type = config.get_product_type('target') - certs_dir = target_major_version - # only beta is special in regards to the GPG signing keys - if target_product_type == 'beta': - certs_dir = '{}beta'.format(target_major_version) - return os.path.join(api.get_common_folder_path(GPG_CERTS_FOLDER), certs_dir) - - def _expand_vars(path): """ Expand variables like $releasever and $basearch to the target system version @@ -152,38 +60,6 @@ def _get_abs_file_path(target_userspace, file_url): return os.path.join('/', file_path) -def _pubkeys_from_rpms(installed_rpms): - """ - Return the list of fingerprints of GPG keys in RPM DB - - This function returns short 8 characters fingerprints of trusted GPG keys - "installed" in the source OS RPM database. These look like normal packages - named "gpg-pubkey" and the fingerprint is present in the version field. - """ - return [pkg.version for pkg in installed_rpms.items if pkg.name == 'gpg-pubkey'] - - -def _get_pubkeys(installed_rpms): - """ - Get pubkeys from installed rpms and the trusted directory - """ - pubkeys = _pubkeys_from_rpms(installed_rpms) - certs_path = _get_path_to_gpg_certs() - for certname in os.listdir(certs_path): - key_file = os.path.join(certs_path, certname) - fps = _read_gpg_fp_from_file(key_file) - if fps: - pubkeys += fps - # TODO: what about else: ? - # The warning is now logged in _read_gpg_fp_from_file. We can raise - # the priority of the message or convert it to report though. - return pubkeys - - -def _the_nogpgcheck_option_used(): - return config.get_env('LEAPP_NOGPGCHECK', False) == '1' - - def _consume_data(): try: used_target_repos = next(api.consume(UsedTargetRepositories)).repos @@ -199,10 +75,10 @@ def _consume_data(): 'Could not check for valid GPG keys', details={'details': 'No TMPTargetRepositoriesFacts facts'} ) try: - installed_rpms = next(api.consume(InstalledRPM)) + trusted_gpg_keys = next(api.consume(TrustedGpgKeys)) except StopIteration: raise StopActorExecutionError( - 'Could not check for valid GPG keys', details={'details': 'No InstalledRPM facts'} + 'Could not check for valid GPG keys', details={'details': 'No TrustedGpgKeys facts'} ) try: target_userspace = next(api.consume(TargetUserSpaceInfo)) @@ -211,7 +87,7 @@ def _consume_data(): 'Could not check for valid GPG keys', details={'details': 'No TargetUserSpaceInfo facts'} ) - return used_target_repos, target_repos, installed_rpms, target_userspace + return used_target_repos, target_repos, trusted_gpg_keys, target_userspace def _get_repo_gpgkey_urls(repo): @@ -274,7 +150,7 @@ def _report(title, summary, keys, inhibitor=False): ' prior the upgrade.' ' If you want to proceed the in-place upgrade without checking any RPM' ' signatures, execute leapp with the `--nogpgcheck` option.' - .format(_get_path_to_gpg_certs()) + .format(get_path_to_gpg_certs()) ) groups = [reporting.Groups.REPOSITORY] if inhibitor: @@ -306,7 +182,7 @@ def _report_missing_keys(keys): summary = ( 'Some of the target repositories require GPG keys that are not installed' ' in the current RPM DB or are not stored in the {trust_dir} directory.' - .format(trust_dir=_get_path_to_gpg_certs()) + .format(trust_dir=get_path_to_gpg_certs()) ) _report('Detected unknown GPG keys for target system repositories', summary, keys, True) @@ -383,7 +259,7 @@ def register_dnfworkaround(): api.produce(DNFWorkaround( display_name='import trusted gpg keys to RPM DB', script_path=api.current_actor().get_common_tool_path('importrpmgpgkeys'), - script_args=[_get_path_to_gpg_certs()], + script_args=[get_path_to_gpg_certs()], )) @@ -396,11 +272,11 @@ def process(): them from model TMPTargetRepositoriesFacts. """ # when the user decided to ignore gpg signatures on the packages, we can ignore these checks altogether - if _the_nogpgcheck_option_used(): + if the_nogpgcheck_option_used(): api.current_logger().warning('The --nogpgcheck option is used: skipping all related checks.') return - used_target_repos, target_repos, installed_rpms, target_userspace = _consume_data() + used_target_repos, target_repos, trusted_gpg_keys, target_userspace = _consume_data() target_repo_id_to_repositories_facts_map = { repo.repoid: repo @@ -415,8 +291,7 @@ def process(): invalid_keys = list() repos_missing_keys = list() - # These are used only for getting the installed gpg-pubkey "packages" - pubkeys = _get_pubkeys(installed_rpms) + pubkeys = [key.fingerprint for key in trusted_gpg_keys.items] processed_gpgkey_urls = set() tmpdir = None for repoid in used_target_repos: @@ -454,7 +329,7 @@ def process(): api.current_logger().error( 'Skipping unknown protocol for gpgkey {}'.format(gpgkey_url)) continue - fps = _read_gpg_fp_from_file(key_file) + fps = read_gpg_fp_from_file(key_file) if not fps: invalid_keys.append(gpgkey_url) api.current_logger().warning( diff --git a/repos/system_upgrade/common/actors/missinggpgkeysinhibitor/tests/component_test_missinggpgkey.py b/repos/system_upgrade/common/actors/missinggpgkeysinhibitor/tests/component_test_missinggpgkey.py index 7da13cec9c..2754cb5f27 100644 --- a/repos/system_upgrade/common/actors/missinggpgkeysinhibitor/tests/component_test_missinggpgkey.py +++ b/repos/system_upgrade/common/actors/missinggpgkeysinhibitor/tests/component_test_missinggpgkey.py @@ -3,12 +3,13 @@ from leapp import reporting from leapp.exceptions import StopActorExecutionError -from leapp.libraries.actor.missinggpgkey import _pubkeys_from_rpms, process +from leapp.libraries.actor.missinggpgkey import process +from leapp.libraries.common.gpg import get_pubkeys_from_rpms from leapp.libraries.common.testutils import create_report_mocked, CurrentActorMocked, logger_mocked, produce_mocked from leapp.libraries.stdlib import api from leapp.models import ( DNFWorkaround, - InstalledRPM, + GpgKey, Report, RepositoriesFacts, RepositoryData, @@ -16,6 +17,7 @@ RPM, TargetUserSpaceInfo, TMPTargetRepositoriesFacts, + TrustedGpgKeys, UsedTargetRepositories, UsedTargetRepository ) @@ -26,59 +28,21 @@ # whole process as I was initially advised not to use these component tests. -def _get_test_installedrpm_no_my_key(): +def _get_test_gpgkeys_missing(): """ - Valid RPM packages missing the key we are looking for (epel9) + Return list of Trusted GPG keys without the epel9 key we look for """ return [ - RPM( - name='rpm', - version='4.16.1.3', - release='17.el9', - epoch='0', - packager='Red Hat, Inc. ', - arch='x86_64', - pgpsig='RSA/SHA256, Mon 08 Aug 2022 09:10:15 AM UTC, Key ID 199e2f91fd431d51', - repository='BaseOS', - ), - RPM( - name='gpg-pubkey', - version='fd431d51', - release='4ae0493b', - epoch='0', - packager='Red Hat, Inc. (release key 2) ', - arch='noarch', - pgpsig='' - ), - RPM( - name='gpg-pubkey', - version='5a6340b3', - release='6229229e', - epoch='0', - packager='Red Hat, Inc. (auxiliary key 3) ', - arch='noarch', - pgpsig='' - ), + GpgKey(fingerprint='fd431d51', rpmdb=True), + GpgKey(fingerprint='5a6340b3', rpmdb=True), ] -def _get_test_installedrpm(): +def _get_test_gpgkeys(): """ - All test RPMS packages + Return all the Trusted GPG keys for a test """ - return InstalledRPM( - items=[ - RPM( - name='gpg-pubkey', - version='3228467c', - release='613798eb', - epoch='0', - packager='Fedora (epel9) ', - arch='noarch', - pgpsig='' - ), - ] + _get_test_installedrpm_no_my_key(), - ) + return TrustedGpgKeys(items=[GpgKey(fingerprint='3228467c', rpmdb=True)] + _get_test_gpgkeys_missing()) def _get_test_targuserspaceinfo(path='/'): @@ -189,7 +153,7 @@ def test_perform_nogpgcheck(monkeypatch): monkeypatch.setattr(api, 'current_actor', CurrentActorMocked( envars={'LEAPP_NOGPGCHECK': '1'}, msgs=[ - _get_test_installedrpm(), + _get_test_gpgkeys(), _get_test_usedtargetrepositories(), _get_test_tmptargetrepositoriesfacts(), ], @@ -206,13 +170,13 @@ def test_perform_nogpgcheck(monkeypatch): @pytest.mark.parametrize('msgs', [ [], - [_get_test_installedrpm], + [_get_test_gpgkeys], [_get_test_usedtargetrepositories], [_get_test_tmptargetrepositoriesfacts], # These are just incomplete lists of required facts - [_get_test_installedrpm(), _get_test_usedtargetrepositories()], + [_get_test_gpgkeys(), _get_test_usedtargetrepositories()], [_get_test_usedtargetrepositories(), _get_test_tmptargetrepositoriesfacts()], - [_get_test_installedrpm(), _get_test_tmptargetrepositoriesfacts()], + [_get_test_gpgkeys(), _get_test_tmptargetrepositoriesfacts()], ]) def test_perform_missing_facts(monkeypatch, msgs): """ @@ -238,7 +202,7 @@ def test_perform_missing_facts(monkeypatch, msgs): @suppress_deprecation(TMPTargetRepositoriesFacts) def _get_test_tmptargetrepositoriesfacts_partial(): return [ - _get_test_installedrpm(), + _get_test_gpgkeys(), _get_test_usedtargetrepositories(), TMPTargetRepositoriesFacts( repositories=[ @@ -298,7 +262,7 @@ def _get_pubkeys_mocked(installed_rpms): """ This skips getting fps from files in container for simplification """ - return _pubkeys_from_rpms(installed_rpms) + return get_pubkeys_from_rpms(installed_rpms) def test_perform_missing_some_repo_facts(monkeypatch): @@ -314,7 +278,7 @@ def test_perform_missing_some_repo_facts(monkeypatch): monkeypatch.setattr(api, 'produce', produce_mocked()) monkeypatch.setattr(api, 'current_logger', logger_mocked()) monkeypatch.setattr(reporting, 'create_report', create_report_mocked()) - monkeypatch.setattr('leapp.libraries.actor.missinggpgkey._gpg_show_keys', _gpg_show_keys_mocked) + monkeypatch.setattr('leapp.libraries.common.gpg._gpg_show_keys', _gpg_show_keys_mocked) with pytest.raises(StopActorExecutionError): process() @@ -326,7 +290,7 @@ def test_perform_missing_some_repo_facts(monkeypatch): def _get_test_tmptargetrepositoriesfacts_https_unused(): return [ _get_test_targuserspaceinfo(), - _get_test_installedrpm(), + _get_test_gpgkeys(), _get_test_usedtargetrepositories(), TMPTargetRepositoriesFacts( repositories=[ @@ -362,8 +326,7 @@ def test_perform_https_gpgkey_unused(monkeypatch): monkeypatch.setattr(api, 'produce', produce_mocked()) monkeypatch.setattr(api, 'current_logger', logger_mocked()) monkeypatch.setattr(reporting, 'create_report', create_report_mocked()) - monkeypatch.setattr('leapp.libraries.actor.missinggpgkey._gpg_show_keys', _gpg_show_keys_mocked) - monkeypatch.setattr('leapp.libraries.actor.missinggpgkey._get_pubkeys', _get_pubkeys_mocked) + monkeypatch.setattr('leapp.libraries.common.gpg._gpg_show_keys', _gpg_show_keys_mocked) process() assert not api.current_logger.warnmsg @@ -376,7 +339,7 @@ def test_perform_https_gpgkey_unused(monkeypatch): def get_test_tmptargetrepositoriesfacts_https(): return ( _get_test_targuserspaceinfo(), - _get_test_installedrpm(), + _get_test_gpgkeys(), UsedTargetRepositories( repos=_get_test_usedtargetrepositories_list() + [ UsedTargetRepository( @@ -409,7 +372,7 @@ def get_test_tmptargetrepositoriesfacts_https(): def get_test_tmptargetrepositoriesfacts_ftp(): return ( _get_test_targuserspaceinfo(), - _get_test_installedrpm(), + _get_test_gpgkeys(), UsedTargetRepositories( repos=_get_test_usedtargetrepositories_list() + [ UsedTargetRepository( @@ -454,8 +417,7 @@ def test_perform_https_gpgkey(monkeypatch): monkeypatch.setattr(api, 'produce', produce_mocked()) monkeypatch.setattr(api, 'current_logger', logger_mocked()) monkeypatch.setattr(reporting, 'create_report', create_report_mocked()) - monkeypatch.setattr('leapp.libraries.actor.missinggpgkey._gpg_show_keys', _gpg_show_keys_mocked) - monkeypatch.setattr('leapp.libraries.actor.missinggpgkey._get_pubkeys', _get_pubkeys_mocked) + monkeypatch.setattr('leapp.libraries.common.gpg._gpg_show_keys', _gpg_show_keys_mocked) monkeypatch.setattr('six.moves.urllib.request.urlretrieve', _urlretrive_mocked) process() @@ -482,8 +444,7 @@ def test_perform_https_gpgkey_urlerror(monkeypatch): monkeypatch.setattr(api, 'produce', produce_mocked()) monkeypatch.setattr(api, 'current_logger', logger_mocked()) monkeypatch.setattr(reporting, 'create_report', create_report_mocked()) - monkeypatch.setattr('leapp.libraries.actor.missinggpgkey._gpg_show_keys', _gpg_show_keys_mocked) - monkeypatch.setattr('leapp.libraries.actor.missinggpgkey._get_pubkeys', _get_pubkeys_mocked) + monkeypatch.setattr('leapp.libraries.common.gpg._gpg_show_keys', _gpg_show_keys_mocked) monkeypatch.setattr('six.moves.urllib.request.urlretrieve', _urlretrive_mocked_urlerror) process() @@ -508,8 +469,7 @@ def test_perform_ftp_gpgkey(monkeypatch): monkeypatch.setattr(api, 'produce', produce_mocked()) monkeypatch.setattr(api, 'current_logger', logger_mocked()) monkeypatch.setattr(reporting, 'create_report', create_report_mocked()) - monkeypatch.setattr('leapp.libraries.actor.missinggpgkey._gpg_show_keys', _gpg_show_keys_mocked) - monkeypatch.setattr('leapp.libraries.actor.missinggpgkey._get_pubkeys', _get_pubkeys_mocked) + monkeypatch.setattr('leapp.libraries.common.gpg._gpg_show_keys', _gpg_show_keys_mocked) process() assert len(api.current_logger.errmsg) == 1 @@ -525,7 +485,7 @@ def test_perform_ftp_gpgkey(monkeypatch): def get_test_data_missing_key(): return [ _get_test_targuserspaceinfo(), - InstalledRPM(items=_get_test_installedrpm_no_my_key()), + TrustedGpgKeys(items=_get_test_gpgkeys_missing()), _get_test_usedtargetrepositories(), _get_test_tmptargetrepositoriesfacts(), ] @@ -543,8 +503,7 @@ def test_perform_report(monkeypatch): monkeypatch.setattr(api, 'produce', produce_mocked()) monkeypatch.setattr(api, 'current_logger', logger_mocked()) monkeypatch.setattr(reporting, 'create_report', create_report_mocked()) - monkeypatch.setattr('leapp.libraries.actor.missinggpgkey._gpg_show_keys', _gpg_show_keys_mocked) - monkeypatch.setattr('leapp.libraries.actor.missinggpgkey._get_pubkeys', _get_pubkeys_mocked) + monkeypatch.setattr('leapp.libraries.common.gpg._gpg_show_keys', _gpg_show_keys_mocked) process() assert not api.current_logger.warnmsg @@ -559,7 +518,7 @@ def test_perform_report(monkeypatch): def get_test_data_no_gpg_data(): return [ _get_test_targuserspaceinfo(), - _get_test_installedrpm(), + _get_test_gpgkeys(), _get_test_usedtargetrepositories(), _get_test_tmptargetrepositoriesfacts(), ] @@ -593,8 +552,7 @@ def test_perform_invalid_key(monkeypatch): monkeypatch.setattr(api, 'produce', produce_mocked()) monkeypatch.setattr(api, 'current_logger', logger_mocked()) monkeypatch.setattr(reporting, 'create_report', create_report_mocked()) - monkeypatch.setattr('leapp.libraries.actor.missinggpgkey._gpg_show_keys', _gpg_show_keys_mocked_my_empty) - monkeypatch.setattr('leapp.libraries.actor.missinggpgkey._get_pubkeys', _get_pubkeys_mocked) + monkeypatch.setattr('leapp.libraries.common.gpg._gpg_show_keys', _gpg_show_keys_mocked_my_empty) process() assert len(api.current_logger.warnmsg) == 1 @@ -610,7 +568,7 @@ def test_perform_invalid_key(monkeypatch): def get_test_data_gpgcheck_without_gpgkey(): return [ _get_test_targuserspaceinfo(), - _get_test_installedrpm(), + _get_test_gpgkeys(), UsedTargetRepositories( repos=_get_test_usedtargetrepositories_list() + [ UsedTargetRepository( @@ -651,8 +609,7 @@ def test_perform_gpgcheck_without_gpgkey(monkeypatch): monkeypatch.setattr(api, 'produce', produce_mocked()) monkeypatch.setattr(api, 'current_logger', logger_mocked()) monkeypatch.setattr(reporting, 'create_report', create_report_mocked()) - monkeypatch.setattr('leapp.libraries.actor.missinggpgkey._gpg_show_keys', _gpg_show_keys_mocked) - monkeypatch.setattr('leapp.libraries.actor.missinggpgkey._get_pubkeys', _get_pubkeys_mocked) + monkeypatch.setattr('leapp.libraries.common.gpg._gpg_show_keys', _gpg_show_keys_mocked) process() assert len(api.current_logger.warnmsg) == 1 diff --git a/repos/system_upgrade/common/actors/missinggpgkeysinhibitor/tests/unit_test_missinggpgkey.py b/repos/system_upgrade/common/actors/missinggpgkeysinhibitor/tests/unit_test_missinggpgkey.py index 68e4cdfe08..8cd0053174 100644 --- a/repos/system_upgrade/common/actors/missinggpgkeysinhibitor/tests/unit_test_missinggpgkey.py +++ b/repos/system_upgrade/common/actors/missinggpgkeysinhibitor/tests/unit_test_missinggpgkey.py @@ -6,134 +6,12 @@ import distro import pytest -from leapp.libraries.actor.missinggpgkey import ( - _expand_vars, - _get_abs_file_path, - _get_path_to_gpg_certs, - _get_pubkeys, - _get_repo_gpgkey_urls, - _gpg_show_keys, - _parse_fp_from_gpg, - _pubkeys_from_rpms -) +from leapp.libraries.actor.missinggpgkey import _expand_vars, _get_abs_file_path, _get_repo_gpgkey_urls from leapp.libraries.common.testutils import CurrentActorMocked from leapp.libraries.stdlib import api from leapp.models import InstalledRPM, RepositoryData, RPM, TargetUserSpaceInfo -def is_rhel7(): - return int(distro.major_version()) < 8 - - -def test_gpg_show_keys(current_actor_context, monkeypatch): - src = '7.9' if is_rhel7() else '8.6' - current_actor = CurrentActorMocked(src_ver=src) - monkeypatch.setattr(api, 'current_actor', current_actor) - - # python2 compatibility :/ - dirpath = tempfile.mkdtemp() - - # using GNUPGHOME env should avoid gnupg modifying the system - os.environ['GNUPGHOME'] = dirpath - - try: - # non-existing file - non_existent_path = os.path.join(dirpath, 'nonexistent') - res = _gpg_show_keys(non_existent_path) - if is_rhel7(): - err_msg = "gpg: can't open `{}'".format(non_existent_path) - else: - err_msg = "gpg: can't open '{}': No such file or directory\n".format(non_existent_path) - assert not res['stdout'] - assert err_msg in res['stderr'] - assert res['exit_code'] == 2 - - fp = _parse_fp_from_gpg(res) - assert fp == [] - - # no gpg data found - no_key_path = os.path.join(dirpath, "no_key") - with open(no_key_path, "w") as f: - f.write('test') - - res = _gpg_show_keys(no_key_path) - if is_rhel7(): - err_msg = ('gpg: no valid OpenPGP data found.\n' - 'gpg: processing message failed: Unknown system error\n') - else: - err_msg = 'gpg: no valid OpenPGP data found.\n' - assert not res['stdout'] - assert res['stderr'] == err_msg - assert res['exit_code'] == 2 - - fp = _parse_fp_from_gpg(res) - assert fp == [] - - # with some test data now -- rhel9 release key - # rhel9_key_path = os.path.join(api.get_common_folder_path('rpm-gpg'), '9') - cur_dir = os.path.dirname(os.path.abspath(__file__)) - rhel9_key_path = os.path.join(cur_dir, '..', '..', '..', 'files', 'rpm-gpg', '9', - 'RPM-GPG-KEY-redhat-release') - res = _gpg_show_keys(rhel9_key_path) - finally: - shutil.rmtree(dirpath) - - if is_rhel7(): - assert len(res['stdout']) == 4 - assert res['stdout'][0] == ('pub:-:4096:1:199E2F91FD431D51:1256212795:::-:' - 'Red Hat, Inc. (release key 2) :') - assert res['stdout'][1] == 'fpr:::::::::567E347AD0044ADE55BA8A5F199E2F91FD431D51:' - assert res['stdout'][2] == ('pub:-:4096:1:5054E4A45A6340B3:1646863006:::-:' - 'Red Hat, Inc. (auxiliary key 3) :') - assert res['stdout'][3] == 'fpr:::::::::7E4624258C406535D56D6F135054E4A45A6340B3:' - else: - assert len(res['stdout']) == 6 - assert res['stdout'][0] == 'pub:-:4096:1:199E2F91FD431D51:1256212795:::-:::scSC::::::23::0:' - assert res['stdout'][1] == 'fpr:::::::::567E347AD0044ADE55BA8A5F199E2F91FD431D51:' - assert res['stdout'][2] == ('uid:-::::1256212795::DC1CAEC7997B3575101BB0FCAAC6191792660D8F::' - 'Red Hat, Inc. (release key 2) ::::::::::0:') - assert res['stdout'][3] == 'pub:-:4096:1:5054E4A45A6340B3:1646863006:::-:::scSC::::::23::0:' - assert res['stdout'][4] == 'fpr:::::::::7E4624258C406535D56D6F135054E4A45A6340B3:' - assert res['stdout'][5] == ('uid:-::::1646863006::DA7F68E3872D6E7BDCE05225E7EB5F3ACDD9699F::' - 'Red Hat, Inc. (auxiliary key 3) ::::::::::0:') - - err = '{}/trustdb.gpg: trustdb created'.format(dirpath) - assert err in res['stderr'] - assert res['exit_code'] == 0 - - # now, parse the output too - fp = _parse_fp_from_gpg(res) - assert fp == ['fd431d51', '5a6340b3'] - - -@pytest.mark.parametrize('res, exp', [ - ({'exit_code': 2, 'stdout': '', 'stderr': ''}, []), - ({'exit_code': 2, 'stdout': '', 'stderr': 'bash: gpg2: command not found...'}, []), - ({'exit_code': 0, 'stdout': 'Some other output', 'stderr': ''}, []), - ({'exit_code': 0, 'stdout': ['Some other output', 'other line'], 'stderr': ''}, []), - ({'exit_code': 0, 'stdout': ['pub:-:4096:1:199E2F91FD431D:'], 'stderr': ''}, []), - ({'exit_code': 0, 'stdout': ['pub:-:4096:1:5054E4A45A6340B3:1..'], 'stderr': ''}, ['5a6340b3']), -]) -def test_parse_fp_from_gpg(res, exp): - fp = _parse_fp_from_gpg(res) - assert fp == exp - - -@pytest.mark.parametrize('target, product_type, exp', [ - ('8.6', 'beta', '../../files/rpm-gpg/8beta'), - ('8.8', 'htb', '../../files/rpm-gpg/8'), - ('9.0', 'beta', '../../files/rpm-gpg/9beta'), - ('9.2', 'ga', '../../files/rpm-gpg/9'), -]) -def test_get_path_to_gpg_certs(current_actor_context, monkeypatch, target, product_type, exp): - current_actor = CurrentActorMocked(dst_ver=target, - envars={'LEAPP_DEVEL_TARGET_PRODUCT_TYPE': product_type}) - monkeypatch.setattr(api, 'current_actor', current_actor) - - p = _get_path_to_gpg_certs() - assert p == exp - - @pytest.mark.parametrize('data, exp', [ ('bare string', 'bare string'), ('with dollar$$$', 'with dollar$$$'), @@ -148,50 +26,6 @@ def test_expand_vars(monkeypatch, data, exp): assert res == exp -def _get_test_installed_rmps(): - return InstalledRPM( - items=[ - RPM(name='gpg-pubkey', - version='9570ff31', - release='5e3006fb', - epoch='0', - packager='Fedora (33) ', - arch='noarch', - pgpsig=''), - RPM(name='rpm', - version='4.17.1', - release='3.fc35', - epoch='0', - packager='Fedora Project', - arch='x86_64', - pgpsig='RSA/SHA256, Tue 02 Aug 2022 03:12:43 PM CEST, Key ID db4639719867c58f'), - ], - ) - - -def test_pubkeys_from_rpms(): - installed_rpm = _get_test_installed_rmps() - assert _pubkeys_from_rpms(installed_rpm) == ['9570ff31'] - - -# @pytest.mark.parametrize('target, product_type, exp', [ -# ('8.6', 'beta', ['F21541EB']), -# ('8.8', 'htb', ['FD431D51', 'D4082792']), # ga -# ('9.0', 'beta', ['F21541EB']), -# ('9.2', 'ga', ['FD431D51', '5A6340B3']), -# ]) -# Def test_get_pubkeys(current_actor_context, monkeypatch, target, product_type, exp): -# current_actor = CurrentActorMocked(dst_ver=target, -# envars={'LEAPP_DEVEL_TARGET_PRODUCT_TYPE': product_type}) -# monkeypatch.setattr(api, 'current_actor', current_actor) -# installed_rpm = _get_test_installed_rmps() -# -# p = _get_pubkeys(installed_rpm) -# assert '9570ff31' in p -# for x in exp: -# assert x in p - - @pytest.mark.parametrize('repo, exp', [ (RepositoryData(repoid='dummy', name='name'), None), (RepositoryData(repoid='dummy', name='name', additional_fields='{}'), None), diff --git a/repos/system_upgrade/common/actors/targetuserspacecreator/libraries/userspacegen.py b/repos/system_upgrade/common/actors/targetuserspacecreator/libraries/userspacegen.py index 9dfa0f14e0..725da3c52c 100644 --- a/repos/system_upgrade/common/actors/targetuserspacecreator/libraries/userspacegen.py +++ b/repos/system_upgrade/common/actors/targetuserspacecreator/libraries/userspacegen.py @@ -9,6 +9,7 @@ from leapp.libraries.common import dnfplugin, mounting, overlaygen, repofileutils, rhsm, rhui, utils from leapp.libraries.common.config import get_env, get_product_type from leapp.libraries.common.config.version import get_target_major_version +from leapp.libraries.common.gpg import get_path_to_gpg_certs, the_nogpgcheck_option_used from leapp.libraries.stdlib import api, CalledProcessError, config, run from leapp.models import RequiredTargetUserspacePackages # deprecated from leapp.models import TMPTargetRepositoriesFacts # deprecated all the time @@ -54,7 +55,6 @@ # Issue: #486 PROD_CERTS_FOLDER = 'prod-certs' -GPG_CERTS_FOLDER = 'rpm-gpg' PERSISTENT_PACKAGE_CACHE_DIR = '/var/lib/leapp/persistent_package_cache' DEDICATED_LEAPP_PART_URL = 'https://access.redhat.com/solutions/7011704' @@ -143,21 +143,8 @@ def _backup_to_persistent_package_cache(userspace_dir): shutil.move(src_cache, PERSISTENT_PACKAGE_CACHE_DIR) -def _the_nogpgcheck_option_used(): - return get_env('LEAPP_NOGPGCHECK', False) == '1' - - -def _get_path_to_gpg_certs(target_major_version): - target_product_type = get_product_type('target') - certs_dir = target_major_version - # only beta is special in regards to the GPG signing keys - if target_product_type == 'beta': - certs_dir = '{}beta'.format(target_major_version) - return os.path.join(api.get_common_folder_path(GPG_CERTS_FOLDER), certs_dir) - - def _import_gpg_keys(context, install_root_dir, target_major_version): - certs_path = _get_path_to_gpg_certs(target_major_version) + certs_path = get_path_to_gpg_certs(target_major_version) # Import the RHEL X+1 GPG key to be able to verify the installation of initial packages try: # Import also any other keys provided by the customer in the same directory @@ -234,13 +221,13 @@ def prepare_target_userspace(context, userspace_dir, enabled_repos, packages): install_root_dir = '/el{}target'.format(target_major_version) with mounting.BindMount(source=userspace_dir, target=os.path.join(context.base_dir, install_root_dir.lstrip('/'))): _restore_persistent_package_cache(userspace_dir) - if not _the_nogpgcheck_option_used(): + if not the_nogpgcheck_option_used(): _import_gpg_keys(context, install_root_dir, target_major_version) repos_opt = [['--enablerepo', repo] for repo in enabled_repos] repos_opt = list(itertools.chain(*repos_opt)) cmd = ['dnf', 'install', '-y'] - if _the_nogpgcheck_option_used(): + if the_nogpgcheck_option_used(): cmd.append('--nogpgcheck') cmd += [ '--setopt=module_platform_id=platform:el{}'.format(target_major_version), diff --git a/repos/system_upgrade/common/actors/trustedgpgkeysscanner/actor.py b/repos/system_upgrade/common/actors/trustedgpgkeysscanner/actor.py new file mode 100644 index 0000000000..46e8f9ec84 --- /dev/null +++ b/repos/system_upgrade/common/actors/trustedgpgkeysscanner/actor.py @@ -0,0 +1,21 @@ +from leapp.actors import Actor +from leapp.libraries.actor import trustedgpgkeys +from leapp.models import InstalledRPM, TrustedGpgKeys +from leapp.tags import FactsPhaseTag, IPUWorkflowTag + + +class TrustedGpgKeysScanner(Actor): + """ + Scan for trusted GPG keys. + + These include keys readily available in the source RPM DB, keys for N+1 + Red Hat release and custom keys stored in the trusted directory. + """ + + name = 'trusted_gpg_keys_scanner' + consumes = (InstalledRPM,) + produces = (TrustedGpgKeys,) + tags = (IPUWorkflowTag, FactsPhaseTag) + + def process(self): + trustedgpgkeys.process() diff --git a/repos/system_upgrade/common/actors/trustedgpgkeysscanner/libraries/trustedgpgkeys.py b/repos/system_upgrade/common/actors/trustedgpgkeysscanner/libraries/trustedgpgkeys.py new file mode 100644 index 0000000000..ee1e248371 --- /dev/null +++ b/repos/system_upgrade/common/actors/trustedgpgkeysscanner/libraries/trustedgpgkeys.py @@ -0,0 +1,41 @@ +import os + +from leapp.exceptions import StopActorExecutionError +from leapp.libraries.common.gpg import get_path_to_gpg_certs, get_pubkeys_from_rpms, read_gpg_fp_from_file +from leapp.libraries.stdlib import api +from leapp.models import GpgKey, InstalledRPM, TrustedGpgKeys + + +def _get_pubkeys(installed_rpms): + """ + Get pubkeys from installed rpms and the trusted directory + """ + pubkeys = get_pubkeys_from_rpms(installed_rpms) + db_pubkeys = [key.fingerprint for key in pubkeys] + certs_path = get_path_to_gpg_certs() + for certname in os.listdir(certs_path): + key_file = os.path.join(certs_path, certname) + fps = read_gpg_fp_from_file(key_file) + for fp in fps: + if fp not in db_pubkeys: + pubkeys += GpgKey(fingerprint=fp, rpmdb=False, filename=key_file) + db_pubkeys += fp + # TODO: what about else: ? + # The warning is now logged in read_gpg_fp_from_file. We can raise + # the priority of the message or convert it to report though. + return pubkeys + + +def process(): + """ + Process keys in RPM DB and the ones in trusted directory to produce a list of trusted keys + """ + + try: + installed_rpms = next(api.consume(InstalledRPM)) + except StopIteration: + raise StopActorExecutionError( + 'Could not check for valid GPG keys', details={'details': 'No InstalledRPM facts'} + ) + pubkeys = _get_pubkeys(installed_rpms) + api.produce(TrustedGpgKeys(items=pubkeys)) diff --git a/repos/system_upgrade/common/actors/trustedgpgkeysscanner/tests/test_trustedgpgkeys.py b/repos/system_upgrade/common/actors/trustedgpgkeysscanner/tests/test_trustedgpgkeys.py new file mode 100644 index 0000000000..32cba52222 --- /dev/null +++ b/repos/system_upgrade/common/actors/trustedgpgkeysscanner/tests/test_trustedgpgkeys.py @@ -0,0 +1,74 @@ +from leapp import reporting +from leapp.libraries.actor.trustedgpgkeys import process +from leapp.libraries.common.gpg import get_pubkeys_from_rpms +from leapp.libraries.common.testutils import create_report_mocked, CurrentActorMocked, logger_mocked, produce_mocked +from leapp.libraries.stdlib import api +from leapp.models import GpgKey, InstalledRPM, RPM, TrustedGpgKeys + +# @pytest.mark.parametrize('target, product_type, exp', [ +# ('8.6', 'beta', ['F21541EB']), +# ('8.8', 'htb', ['FD431D51', 'D4082792']), # ga +# ('9.0', 'beta', ['F21541EB']), +# ('9.2', 'ga', ['FD431D51', '5A6340B3']), +# ]) +# Def test_get_pubkeys(current_actor_context, monkeypatch, target, product_type, exp): +# current_actor = CurrentActorMocked(dst_ver=target, +# envars={'LEAPP_DEVEL_TARGET_PRODUCT_TYPE': product_type}) +# monkeypatch.setattr(api, 'current_actor', current_actor) +# installed_rpm = _get_test_installed_rmps() +# +# p = _get_pubkeys(installed_rpm) +# assert '9570ff31' in p +# for x in exp: +# assert x in p + + +def _get_test_installed_rmps(): + return InstalledRPM( + items=[ + RPM(name='gpg-pubkey', + version='9570ff31', + release='5e3006fb', + epoch='0', + packager='Fedora (33) ', + arch='noarch', + pgpsig=''), + RPM(name='rpm', + version='4.17.1', + release='3.fc35', + epoch='0', + packager='Fedora Project', + arch='x86_64', + pgpsig='RSA/SHA256, Tue 02 Aug 2022 03:12:43 PM CEST, Key ID db4639719867c58f'), + ], + ) + + +def test_pubkeys_from_rpms(): + installed_rpm = _get_test_installed_rmps() + assert get_pubkeys_from_rpms(installed_rpm) == [GpgKey(fingerprint='9570ff31', rpmdb=True)] + + +def _get_pubkeys_mocked(installed_rpms): + """ + This skips getting fps from files in container for simplification + """ + return get_pubkeys_from_rpms(installed_rpms) + + +def test_process(monkeypatch): + """ + Executes the "main" function + """ + monkeypatch.setattr(api, 'current_actor', CurrentActorMocked( + msgs=[_get_test_installed_rmps()]) + ) + monkeypatch.setattr(api, 'produce', produce_mocked()) + monkeypatch.setattr(api, 'current_logger', logger_mocked()) + monkeypatch.setattr(reporting, 'create_report', create_report_mocked()) + monkeypatch.setattr('leapp.libraries.actor.trustedgpgkeys._get_pubkeys', _get_pubkeys_mocked) + + process() + assert api.produce.called == 1 + assert isinstance(api.produce.model_instances[0], TrustedGpgKeys) + assert reporting.create_report.called == 0 diff --git a/repos/system_upgrade/common/libraries/gpg.py b/repos/system_upgrade/common/libraries/gpg.py new file mode 100644 index 0000000000..cdf43bbc21 --- /dev/null +++ b/repos/system_upgrade/common/libraries/gpg.py @@ -0,0 +1,112 @@ +import os + +from leapp.libraries.common import config +from leapp.libraries.common.config.version import get_source_major_version, get_target_major_version +from leapp.libraries.stdlib import api, run +from leapp.models import GpgKey + +GPG_CERTS_FOLDER = 'rpm-gpg' + + +def get_pubkeys_from_rpms(installed_rpms): + """ + Return the list of fingerprints of GPG keys in RPM DB + + This function returns short 8 characters fingerprints of trusted GPG keys + "installed" in the source OS RPM database. These look like normal packages + named "gpg-pubkey" and the fingerprint is present in the version field. + """ + return [GpgKey(fingerprint=pkg.version, rpmdb=True) for pkg in installed_rpms.items if pkg.name == 'gpg-pubkey'] + + +def _gpg_show_keys(key_path): + """ + Show keys in given file in version-agnostic manner + + This runs gpg --show-keys (EL8) or gpg --with-fingerprints (EL7) + to verify the given file exists, is readable and contains valid + OpenPGP key data, which is printed in parsable format (--with-colons). + """ + try: + cmd = ['gpg2'] + # RHEL7 gnupg requires different switches to get the same output + if get_source_major_version() == '7': + cmd.append('--with-fingerprint') + else: + cmd.append('--show-keys') + cmd += ['--with-colons', key_path] + # TODO: discussed, most likely the checked=False will be dropped + # and error will be handled in other functions + return run(cmd, split=True, checked=False) + except OSError as err: + # NOTE: this is hypothetic; gnupg2 has to be installed on RHEL 7+ + error = 'Failed to read fingerprint from GPG key {}: {}'.format(key_path, str(err)) + api.current_logger().error(error) + return {} + + +def _parse_fp_from_gpg(output): + """ + Parse the output of gpg --show-keys --with-colons. + + Return list of 8 characters fingerprints per each gpgkey for the given + output from stdlib.run() or None if some error occurred. Either the + command return non-zero exit code, the file does not exists, its not + readable or does not contain any openpgp data. + """ + if not output or output['exit_code']: + return [] + + # we are interested in the lines of the output starting with "pub:" + # the colons are used for separating the fields in output like this + # pub:-:4096:1:999F7CBF38AB71F4:1612983048:::-:::escESC::::::23::0: + # ^--------------^ this is the fingerprint we need + # ^------^ but RPM version is just the last 8 chars lowercase + # Also multiple gpg keys can be stored in the file, so go through all "pub" + # lines + gpg_fps = [] + for line in output['stdout']: + if not line or not line.startswith('pub:'): + continue + parts = line.split(':') + if len(parts) >= 4 and len(parts[4]) == 16: + gpg_fps.append(parts[4][8:].lower()) + else: + api.current_logger().warning( + 'Cannot parse the gpg2 output. Line: "{}"' + .format(line) + ) + + return gpg_fps + + +def read_gpg_fp_from_file(key_path): + """ + Returns the list of public key fingerprints from the given file + + Logs warning in case no OpenPGP data found in the given file or it is not + readable for some reason. + """ + res = _gpg_show_keys(key_path) + fp = _parse_fp_from_gpg(res) + if not fp: + error = 'Unable to read OpenPGP keys from {}: {}'.format(key_path, res['stderr']) + api.current_logger().error(error) + return fp + + +def get_path_to_gpg_certs(): + """ + Get path to the directory with trusted target gpg keys in leapp tree + """ + target_major_version = get_target_major_version() + target_product_type = config.get_product_type('target') + certs_dir = target_major_version + # only beta is special in regards to the GPG signing keys + if target_product_type == 'beta': + certs_dir = '{}beta'.format(target_major_version) + return os.path.join(api.get_common_folder_path(GPG_CERTS_FOLDER), certs_dir) + + +def the_nogpgcheck_option_used(): + return config.get_env('LEAPP_NOGPGCHECK', False) == '1' diff --git a/repos/system_upgrade/common/libraries/tests/test_gpg.py b/repos/system_upgrade/common/libraries/tests/test_gpg.py new file mode 100644 index 0000000000..c84d3fd3db --- /dev/null +++ b/repos/system_upgrade/common/libraries/tests/test_gpg.py @@ -0,0 +1,123 @@ +import os +import shutil +import tempfile + +import distro +import pytest + +from leapp.libraries.common.gpg import _gpg_show_keys, _parse_fp_from_gpg, get_path_to_gpg_certs +from leapp.libraries.common.testutils import CurrentActorMocked +from leapp.libraries.stdlib import api + + +@pytest.mark.parametrize('target, product_type, exp', [ + ('8.6', 'beta', '../../files/rpm-gpg/8beta'), + ('8.8', 'htb', '../../files/rpm-gpg/8'), + ('9.0', 'beta', '../../files/rpm-gpg/9beta'), + ('9.2', 'ga', '../../files/rpm-gpg/9'), +]) +def test_get_path_to_gpg_certs(monkeypatch, target, product_type, exp): + current_actor = CurrentActorMocked(dst_ver=target, + envars={'LEAPP_DEVEL_TARGET_PRODUCT_TYPE': product_type}) + monkeypatch.setattr(api, 'current_actor', current_actor) + + p = get_path_to_gpg_certs() + assert p == exp + + +def is_rhel7(): + return int(distro.major_version()) < 8 + + +def test_gpg_show_keys(loaded_leapp_repository, monkeypatch): + src = '7.9' if is_rhel7() else '8.6' + current_actor = CurrentActorMocked(src_ver=src) + monkeypatch.setattr(api, 'current_actor', current_actor) + + # python2 compatibility :/ + dirpath = tempfile.mkdtemp() + + # using GNUPGHOME env should avoid gnupg modifying the system + os.environ['GNUPGHOME'] = dirpath + + try: + # non-existing file + non_existent_path = os.path.join(dirpath, 'nonexistent') + res = _gpg_show_keys(non_existent_path) + if is_rhel7(): + err_msg = "gpg: can't open `{}'".format(non_existent_path) + else: + err_msg = "gpg: can't open '{}': No such file or directory\n".format(non_existent_path) + assert not res['stdout'] + assert err_msg in res['stderr'] + assert res['exit_code'] == 2 + + fp = _parse_fp_from_gpg(res) + assert fp == [] + + # no gpg data found + no_key_path = os.path.join(dirpath, "no_key") + with open(no_key_path, "w") as f: + f.write('test') + + res = _gpg_show_keys(no_key_path) + if is_rhel7(): + err_msg = ('gpg: no valid OpenPGP data found.\n' + 'gpg: processing message failed: Unknown system error\n') + else: + err_msg = 'gpg: no valid OpenPGP data found.\n' + assert not res['stdout'] + assert res['stderr'] == err_msg + assert res['exit_code'] == 2 + + fp = _parse_fp_from_gpg(res) + assert fp == [] + + # with some test data now -- rhel9 release key + # rhel9_key_path = os.path.join(api.get_common_folder_path('rpm-gpg'), '9') + cur_dir = os.path.dirname(os.path.abspath(__file__)) + rhel9_key_path = os.path.join(cur_dir, '..', '..', 'files', 'rpm-gpg', '9', + 'RPM-GPG-KEY-redhat-release') + res = _gpg_show_keys(rhel9_key_path) + finally: + shutil.rmtree(dirpath) + + if is_rhel7(): + assert len(res['stdout']) == 4 + assert res['stdout'][0] == ('pub:-:4096:1:199E2F91FD431D51:1256212795:::-:' + 'Red Hat, Inc. (release key 2) :') + assert res['stdout'][1] == 'fpr:::::::::567E347AD0044ADE55BA8A5F199E2F91FD431D51:' + assert res['stdout'][2] == ('pub:-:4096:1:5054E4A45A6340B3:1646863006:::-:' + 'Red Hat, Inc. (auxiliary key 3) :') + assert res['stdout'][3] == 'fpr:::::::::7E4624258C406535D56D6F135054E4A45A6340B3:' + else: + assert len(res['stdout']) == 6 + assert res['stdout'][0] == 'pub:-:4096:1:199E2F91FD431D51:1256212795:::-:::scSC::::::23::0:' + assert res['stdout'][1] == 'fpr:::::::::567E347AD0044ADE55BA8A5F199E2F91FD431D51:' + assert res['stdout'][2] == ('uid:-::::1256212795::DC1CAEC7997B3575101BB0FCAAC6191792660D8F::' + 'Red Hat, Inc. (release key 2) ::::::::::0:') + assert res['stdout'][3] == 'pub:-:4096:1:5054E4A45A6340B3:1646863006:::-:::scSC::::::23::0:' + assert res['stdout'][4] == 'fpr:::::::::7E4624258C406535D56D6F135054E4A45A6340B3:' + assert res['stdout'][5] == ('uid:-::::1646863006::DA7F68E3872D6E7BDCE05225E7EB5F3ACDD9699F::' + 'Red Hat, Inc. (auxiliary key 3) ::::::::::0:') + + err = '{}/trustdb.gpg: trustdb created'.format(dirpath) + assert err in res['stderr'] + assert res['exit_code'] == 0 + + # now, parse the output too + fp = _parse_fp_from_gpg(res) + assert fp == ['fd431d51', '5a6340b3'] + + +@pytest.mark.parametrize('res, exp', [ + ({'exit_code': 2, 'stdout': '', 'stderr': ''}, []), + ({'exit_code': 2, 'stdout': '', 'stderr': 'bash: gpg2: command not found...'}, []), + ({'exit_code': 0, 'stdout': 'Some other output', 'stderr': ''}, []), + ({'exit_code': 0, 'stdout': ['Some other output', 'other line'], 'stderr': ''}, []), + ({'exit_code': 0, 'stdout': ['pub:-:4096:1:199E2F91FD431D:'], 'stderr': ''}, []), + ({'exit_code': 0, 'stdout': ['pub:-:4096:1:5054E4A45A6340B3:1..'], 'stderr': ''}, ['5a6340b3']), +]) +def test_parse_fp_from_gpg(res, exp): + fp = _parse_fp_from_gpg(res) + assert fp == exp diff --git a/repos/system_upgrade/common/models/trustedgpgkeys.py b/repos/system_upgrade/common/models/trustedgpgkeys.py new file mode 100644 index 0000000000..c397bea748 --- /dev/null +++ b/repos/system_upgrade/common/models/trustedgpgkeys.py @@ -0,0 +1,19 @@ +from leapp.models import fields, Model +from leapp.topics import SystemFactsTopic + + +class GpgKey(Model): + """ + GPG Public key + + It is represented by a record in the RPM DB or by a file in directory with trusted keys (or both). + """ + topic = SystemFactsTopic + fingerprint = fields.String() + rpmdb = fields.Boolean() + filename = fields.Nullable(fields.String()) + + +class TrustedGpgKeys(Model): + topic = SystemFactsTopic + items = fields.List(fields.Model(GpgKey), default=[])