diff --git a/Makefile b/Makefile index e3c40e01ea..b504a854ef 100644 --- a/Makefile +++ b/Makefile @@ -155,7 +155,7 @@ help: @echo " PR=7 SUFFIX='my_additional_suffix' make " @echo " MR=6 COPR_CONFIG='path/to/the/config/copr/file' make " @echo " ACTOR= TEST_LIBS=y make test" - @echo " BUILD_CONTAINER=el7 make build_container" + @echo " BUILD_CONTAINER=rhel7 make build_container" @echo " TEST_CONTAINER=f34 make test_container" @echo " CONTAINER_TOOL=docker TEST_CONTAINER=rhel7 make test_container_no_lint" @echo "" diff --git a/repos/system_upgrade/common/actors/gpgpubkeycheck/actor.py b/repos/system_upgrade/common/actors/gpgpubkeycheck/actor.py new file mode 100644 index 0000000000..3d11de381b --- /dev/null +++ b/repos/system_upgrade/common/actors/gpgpubkeycheck/actor.py @@ -0,0 +1,23 @@ +from leapp.actors import Actor +from leapp.libraries.actor import gpgpubkeycheck +from leapp.models import TrustedGpgKeys +from leapp.reporting import Report +from leapp.tags import ApplicationsPhaseTag, IPUWorkflowTag + + +class GpgPubkeyCheck(Actor): + """ + Checks no unexpected GPG keys were installed during the upgrade. + + This should be mostly sanity check and this should not happen + unless something went very wrong, regardless the gpgcheck was + used (default) or not (with --no-gpgcheck option). + """ + + name = 'gpg_pubkey_check' + consumes = (TrustedGpgKeys,) + produces = (Report,) + tags = (IPUWorkflowTag, ApplicationsPhaseTag,) + + def process(self): + gpgpubkeycheck.process() diff --git a/repos/system_upgrade/common/actors/gpgpubkeycheck/libraries/gpgpubkeycheck.py b/repos/system_upgrade/common/actors/gpgpubkeycheck/libraries/gpgpubkeycheck.py new file mode 100644 index 0000000000..387c6cefb8 --- /dev/null +++ b/repos/system_upgrade/common/actors/gpgpubkeycheck/libraries/gpgpubkeycheck.py @@ -0,0 +1,124 @@ +from leapp import reporting +from leapp.libraries.common.gpg import is_nogpgcheck_set +from leapp.libraries.common.rpms import get_installed_rpms +from leapp.libraries.stdlib import api +from leapp.models import TrustedGpgKeys + +FMT_LIST_SEPARATOR = '\n - ' + + +def _get_installed_fps_tuple(): + """ + Return list of tuples (fingerprint, packager). + """ + installed_fps_tuple = [] + rpms = get_installed_rpms() + for rpm in rpms: + rpm = rpm.strip() + if not rpm: + continue + try: + # NOTE: pgpsig is (none) for 'gpg-pubkey' entries + name, version, dummy_release, dummy_epoch, packager, dummy_arch, dummy_pgpsig = rpm.split('|') + except ValueError as e: + # NOTE: it's seatbelt, but if it happens, seeing loong list of errors + # will let us know earlier that we missed something really + api.current_logger().error('Cannot perform the check of installed GPG keys after the upgrade.') + api.current_logger().error('Cannot parse rpm output: {}'.format(e)) + continue + if name != 'gpg-pubkey': + continue + installed_fps_tuple.append((version, packager)) + return installed_fps_tuple + + +def _report_cannot_check_keys(installed_fps): + # NOTE: in this case, it's expected there will be always some GPG keys present + summary = ( + 'Cannot perform the check of GPG keys installed in the RPM DB' + ' due to missing facts (TrustedGpgKeys) supposed to be generated' + ' in the start of the upgrade process on the original system.' + ' Unexpected unexpected installed GPG keys could be e.g. a mark of' + ' a malicious attempt to hijack the upgrade process.' + ' The list of all GPG keys in RPM DB:{sep}{key_list}' + .format( + sep=FMT_LIST_SEPARATOR, + key_list=FMT_LIST_SEPARATOR.join(installed_fps) + ) + ) + hint = ( + 'Verify the installed GPG keys are expected.' + ) + groups = [ + reporting.Groups.POST, + reporting.Groups.REPOSITORY, + reporting.Groups.SECURITY + ] + reporting.create_report([ + reporting.Title('Cannot perform the check of installed GPG keys after the upgrade.'), + reporting.Summary(summary), + reporting.Severity(reporting.Severity.HIGH), + reporting.Groups(groups), + reporting.Remediation(hint=hint), + ]) + + +def _report_unexpected_keys(unexpected_fps): + summary = ( + 'The system contains unexpected GPG keys after upgrade.' + ' This can be caused e.g. by a manual intervention' + ' or by malicious attempt to hijack the upgrade process.' + ' The unexpected keys are the following:' + ' {sep}{key_list}' + .format( + sep=FMT_LIST_SEPARATOR, + key_list=FMT_LIST_SEPARATOR.join(unexpected_fps) + ) + ) + hint = ( + 'Verify the installed GPG keys are expected.' + ) + groups = [ + reporting.Groups.POST, + reporting.Groups.REPOSITORY, + reporting.Groups.SECURITY + ] + reporting.create_report([ + reporting.Title('Detected unexpected GPG keys after the upgrade.'), + reporting.Summary(summary), + reporting.Severity(reporting.Severity.HIGH), + reporting.Groups(groups), + reporting.Remediation(hint=hint), + ]) + + +def process(): + """ + Verify the system does not have any unexpected gpg keys installed + + If the --no-gpgcheck option is used, this is skipped as we can not + guarantee that what was installed came from trusted source + """ + + if is_nogpgcheck_set(): + api.current_logger().warning('The --nogpgcheck option is used: Skipping the check of installed GPG keys.') + return + + installed_fps_tuple = _get_installed_fps_tuple() + + try: + trusted_gpg_keys = next(api.consume(TrustedGpgKeys)) + except StopIteration: + # unexpected (bug) situation; keeping as seatbelt for the security aspect + installed_fps = ['{fp}: {packager}'.format(fp=fp, packager=packager) for fp, packager in installed_fps_tuple] + _report_cannot_check_keys(installed_fps) + return + + trusted_fps = [key.fingerprint for key in trusted_gpg_keys.items] + unexpected_fps = [] + for fp, packager in installed_fps_tuple: + if fp not in trusted_fps: + unexpected_fps.append('{fp}: {packager}'.format(fp=fp, packager=packager)) + + if unexpected_fps: + _report_unexpected_keys(unexpected_fps) diff --git a/repos/system_upgrade/common/actors/missinggpgkeysinhibitor/actor.py b/repos/system_upgrade/common/actors/missinggpgkeysinhibitor/actor.py index 6f836a5b66..faa96452da 100644 --- a/repos/system_upgrade/common/actors/missinggpgkeysinhibitor/actor.py +++ b/repos/system_upgrade/common/actors/missinggpgkeysinhibitor/actor.py @@ -2,9 +2,9 @@ from leapp.libraries.actor import missinggpgkey from leapp.models import ( DNFWorkaround, - InstalledRPM, TargetUserSpaceInfo, TMPTargetRepositoriesFacts, + TrustedGpgKeys, UsedTargetRepositories ) from leapp.reporting import Report @@ -28,7 +28,7 @@ class MissingGpgKeysInhibitor(Actor): name = 'missing_gpg_keys_inhibitor' consumes = ( - InstalledRPM, + TrustedGpgKeys, TMPTargetRepositoriesFacts, TargetUserSpaceInfo, UsedTargetRepositories, diff --git a/repos/system_upgrade/common/actors/missinggpgkeysinhibitor/libraries/missinggpgkey.py b/repos/system_upgrade/common/actors/missinggpgkeysinhibitor/libraries/missinggpgkey.py index 1880986dcf..9a806ca297 100644 --- a/repos/system_upgrade/common/actors/missinggpgkeysinhibitor/libraries/missinggpgkey.py +++ b/repos/system_upgrade/common/actors/missinggpgkeysinhibitor/libraries/missinggpgkey.py @@ -8,113 +8,21 @@ from leapp import reporting from leapp.exceptions import StopActorExecutionError -from leapp.libraries.common import config -from leapp.libraries.common.config.version import get_source_major_version, get_target_major_version -from leapp.libraries.stdlib import api, run +from leapp.libraries.common.config.version import get_target_major_version +from leapp.libraries.common.gpg import get_gpg_fp_from_file, get_path_to_gpg_certs, is_nogpgcheck_set +from leapp.libraries.stdlib import api from leapp.models import ( DNFWorkaround, - InstalledRPM, TargetUserSpaceInfo, TMPTargetRepositoriesFacts, + TrustedGpgKeys, UsedTargetRepositories ) from leapp.utils.deprecation import suppress_deprecation -GPG_CERTS_FOLDER = 'rpm-gpg' FMT_LIST_SEPARATOR = '\n - ' -def _gpg_show_keys(key_path): - """ - Show keys in given file in version-agnostic manner - - This runs gpg --show-keys (EL8) or gpg --with-fingerprints (EL7) - to verify the given file exists, is readable and contains valid - OpenPGP key data, which is printed in parsable format (--with-colons). - """ - try: - cmd = ['gpg2'] - # RHEL7 gnupg requires different switches to get the same output - if get_source_major_version() == '7': - cmd.append('--with-fingerprint') - else: - cmd.append('--show-keys') - cmd += ['--with-colons', key_path] - # TODO: discussed, most likely the checked=False will be dropped - # and error will be handled in other functions - return run(cmd, split=True, checked=False) - except OSError as err: - # NOTE: this is hypothetic; gnupg2 has to be installed on RHEL 7+ - error = 'Failed to read fingerprint from GPG key {}: {}'.format(key_path, str(err)) - api.current_logger().error(error) - return {} - - -def _parse_fp_from_gpg(output): - """ - Parse the output of gpg --show-keys --with-colons. - - Return list of 8 characters fingerprints per each gpgkey for the given - output from stdlib.run() or None if some error occurred. Either the - command return non-zero exit code, the file does not exists, its not - readable or does not contain any openpgp data. - """ - if not output or output['exit_code']: - return [] - - # we are interested in the lines of the output starting with "pub:" - # the colons are used for separating the fields in output like this - # pub:-:4096:1:999F7CBF38AB71F4:1612983048:::-:::escESC::::::23::0: - # ^--------------^ this is the fingerprint we need - # ^------^ but RPM version is just the last 8 chars lowercase - # Also multiple gpg keys can be stored in the file, so go through all "pub" - # lines - gpg_fps = [] - for line in output['stdout']: - if not line or not line.startswith('pub:'): - continue - parts = line.split(':') - if len(parts) >= 4 and len(parts[4]) == 16: - gpg_fps.append(parts[4][8:].lower()) - else: - api.current_logger().warning( - 'Cannot parse the gpg2 output. Line: "{}"' - .format(line) - ) - - return gpg_fps - - -def _read_gpg_fp_from_file(key_path): - """ - Returns the list of public key fingerprints from the given file - - Logs warning in case no OpenPGP data found in the given file or it is not - readable for some reason. - """ - res = _gpg_show_keys(key_path) - fp = _parse_fp_from_gpg(res) - if not fp: - error = 'Unable to read OpenPGP keys from {}: {}'.format(key_path, res['stderr']) - api.current_logger().error(error) - return fp - - -def _get_path_to_gpg_certs(): - """ - Get path to the directory with trusted target gpg keys in leapp tree - """ - # XXX This is copy&paste from TargetUserspaceCreator actor. - # Potential changes need to happen in both places to keep them in sync. - target_major_version = get_target_major_version() - target_product_type = config.get_product_type('target') - certs_dir = target_major_version - # only beta is special in regards to the GPG signing keys - if target_product_type == 'beta': - certs_dir = '{}beta'.format(target_major_version) - return os.path.join(api.get_common_folder_path(GPG_CERTS_FOLDER), certs_dir) - - def _expand_vars(path): """ Expand variables like $releasever and $basearch to the target system version @@ -152,38 +60,6 @@ def _get_abs_file_path(target_userspace, file_url): return os.path.join('/', file_path) -def _pubkeys_from_rpms(installed_rpms): - """ - Return the list of fingerprints of GPG keys in RPM DB - - This function returns short 8 characters fingerprints of trusted GPG keys - "installed" in the source OS RPM database. These look like normal packages - named "gpg-pubkey" and the fingerprint is present in the version field. - """ - return [pkg.version for pkg in installed_rpms.items if pkg.name == 'gpg-pubkey'] - - -def _get_pubkeys(installed_rpms): - """ - Get pubkeys from installed rpms and the trusted directory - """ - pubkeys = _pubkeys_from_rpms(installed_rpms) - certs_path = _get_path_to_gpg_certs() - for certname in os.listdir(certs_path): - key_file = os.path.join(certs_path, certname) - fps = _read_gpg_fp_from_file(key_file) - if fps: - pubkeys += fps - # TODO: what about else: ? - # The warning is now logged in _read_gpg_fp_from_file. We can raise - # the priority of the message or convert it to report though. - return pubkeys - - -def _the_nogpgcheck_option_used(): - return config.get_env('LEAPP_NOGPGCHECK', False) == '1' - - def _consume_data(): try: used_target_repos = next(api.consume(UsedTargetRepositories)).repos @@ -199,10 +75,10 @@ def _consume_data(): 'Could not check for valid GPG keys', details={'details': 'No TMPTargetRepositoriesFacts facts'} ) try: - installed_rpms = next(api.consume(InstalledRPM)) + trusted_gpg_keys = next(api.consume(TrustedGpgKeys)) except StopIteration: raise StopActorExecutionError( - 'Could not check for valid GPG keys', details={'details': 'No InstalledRPM facts'} + 'Could not check for valid GPG keys', details={'details': 'No TrustedGpgKeys facts'} ) try: target_userspace = next(api.consume(TargetUserSpaceInfo)) @@ -211,7 +87,7 @@ def _consume_data(): 'Could not check for valid GPG keys', details={'details': 'No TargetUserSpaceInfo facts'} ) - return used_target_repos, target_repos, installed_rpms, target_userspace + return used_target_repos, target_repos, trusted_gpg_keys, target_userspace def _get_repo_gpgkey_urls(repo): @@ -274,7 +150,7 @@ def _report(title, summary, keys, inhibitor=False): ' prior the upgrade.' ' If you want to proceed the in-place upgrade without checking any RPM' ' signatures, execute leapp with the `--nogpgcheck` option.' - .format(_get_path_to_gpg_certs()) + .format(get_path_to_gpg_certs()) ) groups = [reporting.Groups.REPOSITORY] if inhibitor: @@ -306,7 +182,7 @@ def _report_missing_keys(keys): summary = ( 'Some of the target repositories require GPG keys that are not installed' ' in the current RPM DB or are not stored in the {trust_dir} directory.' - .format(trust_dir=_get_path_to_gpg_certs()) + .format(trust_dir=get_path_to_gpg_certs()) ) _report('Detected unknown GPG keys for target system repositories', summary, keys, True) @@ -383,7 +259,7 @@ def register_dnfworkaround(): api.produce(DNFWorkaround( display_name='import trusted gpg keys to RPM DB', script_path=api.current_actor().get_common_tool_path('importrpmgpgkeys'), - script_args=[_get_path_to_gpg_certs()], + script_args=[get_path_to_gpg_certs()], )) @@ -396,11 +272,11 @@ def process(): them from model TMPTargetRepositoriesFacts. """ # when the user decided to ignore gpg signatures on the packages, we can ignore these checks altogether - if _the_nogpgcheck_option_used(): + if is_nogpgcheck_set(): api.current_logger().warning('The --nogpgcheck option is used: skipping all related checks.') return - used_target_repos, target_repos, installed_rpms, target_userspace = _consume_data() + used_target_repos, target_repos, trusted_gpg_keys, target_userspace = _consume_data() target_repo_id_to_repositories_facts_map = { repo.repoid: repo @@ -415,8 +291,7 @@ def process(): invalid_keys = list() repos_missing_keys = list() - # These are used only for getting the installed gpg-pubkey "packages" - pubkeys = _get_pubkeys(installed_rpms) + pubkeys = [key.fingerprint for key in trusted_gpg_keys.items] processed_gpgkey_urls = set() tmpdir = None for repoid in used_target_repos: @@ -454,7 +329,7 @@ def process(): api.current_logger().error( 'Skipping unknown protocol for gpgkey {}'.format(gpgkey_url)) continue - fps = _read_gpg_fp_from_file(key_file) + fps = get_gpg_fp_from_file(key_file) if not fps: invalid_keys.append(gpgkey_url) api.current_logger().warning( diff --git a/repos/system_upgrade/common/actors/missinggpgkeysinhibitor/tests/component_test_missinggpgkey.py b/repos/system_upgrade/common/actors/missinggpgkeysinhibitor/tests/component_test_missinggpgkey.py index 7da13cec9c..6d3fa0b24a 100644 --- a/repos/system_upgrade/common/actors/missinggpgkeysinhibitor/tests/component_test_missinggpgkey.py +++ b/repos/system_upgrade/common/actors/missinggpgkeysinhibitor/tests/component_test_missinggpgkey.py @@ -3,12 +3,13 @@ from leapp import reporting from leapp.exceptions import StopActorExecutionError -from leapp.libraries.actor.missinggpgkey import _pubkeys_from_rpms, process +from leapp.libraries.actor.missinggpgkey import process +from leapp.libraries.common.gpg import get_pubkeys_from_rpms from leapp.libraries.common.testutils import create_report_mocked, CurrentActorMocked, logger_mocked, produce_mocked from leapp.libraries.stdlib import api from leapp.models import ( DNFWorkaround, - InstalledRPM, + GpgKey, Report, RepositoriesFacts, RepositoryData, @@ -16,6 +17,7 @@ RPM, TargetUserSpaceInfo, TMPTargetRepositoriesFacts, + TrustedGpgKeys, UsedTargetRepositories, UsedTargetRepository ) @@ -26,59 +28,21 @@ # whole process as I was initially advised not to use these component tests. -def _get_test_installedrpm_no_my_key(): +def _get_test_gpgkeys_missing(): """ - Valid RPM packages missing the key we are looking for (epel9) + Return list of Trusted GPG keys without the epel9 key we look for """ return [ - RPM( - name='rpm', - version='4.16.1.3', - release='17.el9', - epoch='0', - packager='Red Hat, Inc. ', - arch='x86_64', - pgpsig='RSA/SHA256, Mon 08 Aug 2022 09:10:15 AM UTC, Key ID 199e2f91fd431d51', - repository='BaseOS', - ), - RPM( - name='gpg-pubkey', - version='fd431d51', - release='4ae0493b', - epoch='0', - packager='Red Hat, Inc. (release key 2) ', - arch='noarch', - pgpsig='' - ), - RPM( - name='gpg-pubkey', - version='5a6340b3', - release='6229229e', - epoch='0', - packager='Red Hat, Inc. (auxiliary key 3) ', - arch='noarch', - pgpsig='' - ), + GpgKey(fingerprint='fd431d51', rpmdb=True), + GpgKey(fingerprint='5a6340b3', rpmdb=True), ] -def _get_test_installedrpm(): +def _get_test_gpgkeys(): """ - All test RPMS packages + Return all the Trusted GPG keys for a test """ - return InstalledRPM( - items=[ - RPM( - name='gpg-pubkey', - version='3228467c', - release='613798eb', - epoch='0', - packager='Fedora (epel9) ', - arch='noarch', - pgpsig='' - ), - ] + _get_test_installedrpm_no_my_key(), - ) + return TrustedGpgKeys(items=[GpgKey(fingerprint='3228467c', rpmdb=True)] + _get_test_gpgkeys_missing()) def _get_test_targuserspaceinfo(path='/'): @@ -189,7 +153,7 @@ def test_perform_nogpgcheck(monkeypatch): monkeypatch.setattr(api, 'current_actor', CurrentActorMocked( envars={'LEAPP_NOGPGCHECK': '1'}, msgs=[ - _get_test_installedrpm(), + _get_test_gpgkeys(), _get_test_usedtargetrepositories(), _get_test_tmptargetrepositoriesfacts(), ], @@ -206,13 +170,13 @@ def test_perform_nogpgcheck(monkeypatch): @pytest.mark.parametrize('msgs', [ [], - [_get_test_installedrpm], + [_get_test_gpgkeys], [_get_test_usedtargetrepositories], [_get_test_tmptargetrepositoriesfacts], # These are just incomplete lists of required facts - [_get_test_installedrpm(), _get_test_usedtargetrepositories()], + [_get_test_gpgkeys(), _get_test_usedtargetrepositories()], [_get_test_usedtargetrepositories(), _get_test_tmptargetrepositoriesfacts()], - [_get_test_installedrpm(), _get_test_tmptargetrepositoriesfacts()], + [_get_test_gpgkeys(), _get_test_tmptargetrepositoriesfacts()], ]) def test_perform_missing_facts(monkeypatch, msgs): """ @@ -238,7 +202,7 @@ def test_perform_missing_facts(monkeypatch, msgs): @suppress_deprecation(TMPTargetRepositoriesFacts) def _get_test_tmptargetrepositoriesfacts_partial(): return [ - _get_test_installedrpm(), + _get_test_gpgkeys(), _get_test_usedtargetrepositories(), TMPTargetRepositoriesFacts( repositories=[ @@ -298,7 +262,7 @@ def _get_pubkeys_mocked(installed_rpms): """ This skips getting fps from files in container for simplification """ - return _pubkeys_from_rpms(installed_rpms) + return get_pubkeys_from_rpms(installed_rpms) def test_perform_missing_some_repo_facts(monkeypatch): @@ -314,7 +278,7 @@ def test_perform_missing_some_repo_facts(monkeypatch): monkeypatch.setattr(api, 'produce', produce_mocked()) monkeypatch.setattr(api, 'current_logger', logger_mocked()) monkeypatch.setattr(reporting, 'create_report', create_report_mocked()) - monkeypatch.setattr('leapp.libraries.actor.missinggpgkey._gpg_show_keys', _gpg_show_keys_mocked) + monkeypatch.setattr('leapp.libraries.common.gpg._gpg_show_keys', _gpg_show_keys_mocked) with pytest.raises(StopActorExecutionError): process() @@ -326,7 +290,7 @@ def test_perform_missing_some_repo_facts(monkeypatch): def _get_test_tmptargetrepositoriesfacts_https_unused(): return [ _get_test_targuserspaceinfo(), - _get_test_installedrpm(), + _get_test_gpgkeys(), _get_test_usedtargetrepositories(), TMPTargetRepositoriesFacts( repositories=[ @@ -362,8 +326,7 @@ def test_perform_https_gpgkey_unused(monkeypatch): monkeypatch.setattr(api, 'produce', produce_mocked()) monkeypatch.setattr(api, 'current_logger', logger_mocked()) monkeypatch.setattr(reporting, 'create_report', create_report_mocked()) - monkeypatch.setattr('leapp.libraries.actor.missinggpgkey._gpg_show_keys', _gpg_show_keys_mocked) - monkeypatch.setattr('leapp.libraries.actor.missinggpgkey._get_pubkeys', _get_pubkeys_mocked) + monkeypatch.setattr('leapp.libraries.common.gpg._gpg_show_keys', _gpg_show_keys_mocked) process() assert not api.current_logger.warnmsg @@ -376,7 +339,7 @@ def test_perform_https_gpgkey_unused(monkeypatch): def get_test_tmptargetrepositoriesfacts_https(): return ( _get_test_targuserspaceinfo(), - _get_test_installedrpm(), + _get_test_gpgkeys(), UsedTargetRepositories( repos=_get_test_usedtargetrepositories_list() + [ UsedTargetRepository( @@ -409,7 +372,7 @@ def get_test_tmptargetrepositoriesfacts_https(): def get_test_tmptargetrepositoriesfacts_ftp(): return ( _get_test_targuserspaceinfo(), - _get_test_installedrpm(), + _get_test_gpgkeys(), UsedTargetRepositories( repos=_get_test_usedtargetrepositories_list() + [ UsedTargetRepository( @@ -454,8 +417,7 @@ def test_perform_https_gpgkey(monkeypatch): monkeypatch.setattr(api, 'produce', produce_mocked()) monkeypatch.setattr(api, 'current_logger', logger_mocked()) monkeypatch.setattr(reporting, 'create_report', create_report_mocked()) - monkeypatch.setattr('leapp.libraries.actor.missinggpgkey._gpg_show_keys', _gpg_show_keys_mocked) - monkeypatch.setattr('leapp.libraries.actor.missinggpgkey._get_pubkeys', _get_pubkeys_mocked) + monkeypatch.setattr('leapp.libraries.common.gpg._gpg_show_keys', _gpg_show_keys_mocked) monkeypatch.setattr('six.moves.urllib.request.urlretrieve', _urlretrive_mocked) process() @@ -482,8 +444,7 @@ def test_perform_https_gpgkey_urlerror(monkeypatch): monkeypatch.setattr(api, 'produce', produce_mocked()) monkeypatch.setattr(api, 'current_logger', logger_mocked()) monkeypatch.setattr(reporting, 'create_report', create_report_mocked()) - monkeypatch.setattr('leapp.libraries.actor.missinggpgkey._gpg_show_keys', _gpg_show_keys_mocked) - monkeypatch.setattr('leapp.libraries.actor.missinggpgkey._get_pubkeys', _get_pubkeys_mocked) + monkeypatch.setattr('leapp.libraries.common.gpg._gpg_show_keys', _gpg_show_keys_mocked) monkeypatch.setattr('six.moves.urllib.request.urlretrieve', _urlretrive_mocked_urlerror) process() @@ -508,8 +469,7 @@ def test_perform_ftp_gpgkey(monkeypatch): monkeypatch.setattr(api, 'produce', produce_mocked()) monkeypatch.setattr(api, 'current_logger', logger_mocked()) monkeypatch.setattr(reporting, 'create_report', create_report_mocked()) - monkeypatch.setattr('leapp.libraries.actor.missinggpgkey._gpg_show_keys', _gpg_show_keys_mocked) - monkeypatch.setattr('leapp.libraries.actor.missinggpgkey._get_pubkeys', _get_pubkeys_mocked) + monkeypatch.setattr('leapp.libraries.common.gpg._gpg_show_keys', _gpg_show_keys_mocked) process() assert len(api.current_logger.errmsg) == 1 @@ -525,7 +485,7 @@ def test_perform_ftp_gpgkey(monkeypatch): def get_test_data_missing_key(): return [ _get_test_targuserspaceinfo(), - InstalledRPM(items=_get_test_installedrpm_no_my_key()), + TrustedGpgKeys(items=_get_test_gpgkeys_missing()), _get_test_usedtargetrepositories(), _get_test_tmptargetrepositoriesfacts(), ] @@ -543,8 +503,7 @@ def test_perform_report(monkeypatch): monkeypatch.setattr(api, 'produce', produce_mocked()) monkeypatch.setattr(api, 'current_logger', logger_mocked()) monkeypatch.setattr(reporting, 'create_report', create_report_mocked()) - monkeypatch.setattr('leapp.libraries.actor.missinggpgkey._gpg_show_keys', _gpg_show_keys_mocked) - monkeypatch.setattr('leapp.libraries.actor.missinggpgkey._get_pubkeys', _get_pubkeys_mocked) + monkeypatch.setattr('leapp.libraries.common.gpg._gpg_show_keys', _gpg_show_keys_mocked) process() assert not api.current_logger.warnmsg @@ -559,7 +518,7 @@ def test_perform_report(monkeypatch): def get_test_data_no_gpg_data(): return [ _get_test_targuserspaceinfo(), - _get_test_installedrpm(), + _get_test_gpgkeys(), _get_test_usedtargetrepositories(), _get_test_tmptargetrepositoriesfacts(), ] @@ -593,12 +552,11 @@ def test_perform_invalid_key(monkeypatch): monkeypatch.setattr(api, 'produce', produce_mocked()) monkeypatch.setattr(api, 'current_logger', logger_mocked()) monkeypatch.setattr(reporting, 'create_report', create_report_mocked()) - monkeypatch.setattr('leapp.libraries.actor.missinggpgkey._gpg_show_keys', _gpg_show_keys_mocked_my_empty) - monkeypatch.setattr('leapp.libraries.actor.missinggpgkey._get_pubkeys', _get_pubkeys_mocked) + monkeypatch.setattr('leapp.libraries.common.gpg._gpg_show_keys', _gpg_show_keys_mocked_my_empty) process() - assert len(api.current_logger.warnmsg) == 1 - assert 'Cannot get any gpg key from the file' in api.current_logger.warnmsg[0] + assert len(api.current_logger.warnmsg) == 2, api.current_logger.warnmsg + assert 'Cannot get any gpg key from the file' in api.current_logger.warnmsg[1] assert api.produce.called == 1 assert isinstance(api.produce.model_instances[0], DNFWorkaround) assert reporting.create_report.called == 1 @@ -610,7 +568,7 @@ def test_perform_invalid_key(monkeypatch): def get_test_data_gpgcheck_without_gpgkey(): return [ _get_test_targuserspaceinfo(), - _get_test_installedrpm(), + _get_test_gpgkeys(), UsedTargetRepositories( repos=_get_test_usedtargetrepositories_list() + [ UsedTargetRepository( @@ -651,8 +609,7 @@ def test_perform_gpgcheck_without_gpgkey(monkeypatch): monkeypatch.setattr(api, 'produce', produce_mocked()) monkeypatch.setattr(api, 'current_logger', logger_mocked()) monkeypatch.setattr(reporting, 'create_report', create_report_mocked()) - monkeypatch.setattr('leapp.libraries.actor.missinggpgkey._gpg_show_keys', _gpg_show_keys_mocked) - monkeypatch.setattr('leapp.libraries.actor.missinggpgkey._get_pubkeys', _get_pubkeys_mocked) + monkeypatch.setattr('leapp.libraries.common.gpg._gpg_show_keys', _gpg_show_keys_mocked) process() assert len(api.current_logger.warnmsg) == 1 diff --git a/repos/system_upgrade/common/actors/missinggpgkeysinhibitor/tests/unit_test_missinggpgkey.py b/repos/system_upgrade/common/actors/missinggpgkeysinhibitor/tests/unit_test_missinggpgkey.py index 68e4cdfe08..8cd0053174 100644 --- a/repos/system_upgrade/common/actors/missinggpgkeysinhibitor/tests/unit_test_missinggpgkey.py +++ b/repos/system_upgrade/common/actors/missinggpgkeysinhibitor/tests/unit_test_missinggpgkey.py @@ -6,134 +6,12 @@ import distro import pytest -from leapp.libraries.actor.missinggpgkey import ( - _expand_vars, - _get_abs_file_path, - _get_path_to_gpg_certs, - _get_pubkeys, - _get_repo_gpgkey_urls, - _gpg_show_keys, - _parse_fp_from_gpg, - _pubkeys_from_rpms -) +from leapp.libraries.actor.missinggpgkey import _expand_vars, _get_abs_file_path, _get_repo_gpgkey_urls from leapp.libraries.common.testutils import CurrentActorMocked from leapp.libraries.stdlib import api from leapp.models import InstalledRPM, RepositoryData, RPM, TargetUserSpaceInfo -def is_rhel7(): - return int(distro.major_version()) < 8 - - -def test_gpg_show_keys(current_actor_context, monkeypatch): - src = '7.9' if is_rhel7() else '8.6' - current_actor = CurrentActorMocked(src_ver=src) - monkeypatch.setattr(api, 'current_actor', current_actor) - - # python2 compatibility :/ - dirpath = tempfile.mkdtemp() - - # using GNUPGHOME env should avoid gnupg modifying the system - os.environ['GNUPGHOME'] = dirpath - - try: - # non-existing file - non_existent_path = os.path.join(dirpath, 'nonexistent') - res = _gpg_show_keys(non_existent_path) - if is_rhel7(): - err_msg = "gpg: can't open `{}'".format(non_existent_path) - else: - err_msg = "gpg: can't open '{}': No such file or directory\n".format(non_existent_path) - assert not res['stdout'] - assert err_msg in res['stderr'] - assert res['exit_code'] == 2 - - fp = _parse_fp_from_gpg(res) - assert fp == [] - - # no gpg data found - no_key_path = os.path.join(dirpath, "no_key") - with open(no_key_path, "w") as f: - f.write('test') - - res = _gpg_show_keys(no_key_path) - if is_rhel7(): - err_msg = ('gpg: no valid OpenPGP data found.\n' - 'gpg: processing message failed: Unknown system error\n') - else: - err_msg = 'gpg: no valid OpenPGP data found.\n' - assert not res['stdout'] - assert res['stderr'] == err_msg - assert res['exit_code'] == 2 - - fp = _parse_fp_from_gpg(res) - assert fp == [] - - # with some test data now -- rhel9 release key - # rhel9_key_path = os.path.join(api.get_common_folder_path('rpm-gpg'), '9') - cur_dir = os.path.dirname(os.path.abspath(__file__)) - rhel9_key_path = os.path.join(cur_dir, '..', '..', '..', 'files', 'rpm-gpg', '9', - 'RPM-GPG-KEY-redhat-release') - res = _gpg_show_keys(rhel9_key_path) - finally: - shutil.rmtree(dirpath) - - if is_rhel7(): - assert len(res['stdout']) == 4 - assert res['stdout'][0] == ('pub:-:4096:1:199E2F91FD431D51:1256212795:::-:' - 'Red Hat, Inc. (release key 2) :') - assert res['stdout'][1] == 'fpr:::::::::567E347AD0044ADE55BA8A5F199E2F91FD431D51:' - assert res['stdout'][2] == ('pub:-:4096:1:5054E4A45A6340B3:1646863006:::-:' - 'Red Hat, Inc. (auxiliary key 3) :') - assert res['stdout'][3] == 'fpr:::::::::7E4624258C406535D56D6F135054E4A45A6340B3:' - else: - assert len(res['stdout']) == 6 - assert res['stdout'][0] == 'pub:-:4096:1:199E2F91FD431D51:1256212795:::-:::scSC::::::23::0:' - assert res['stdout'][1] == 'fpr:::::::::567E347AD0044ADE55BA8A5F199E2F91FD431D51:' - assert res['stdout'][2] == ('uid:-::::1256212795::DC1CAEC7997B3575101BB0FCAAC6191792660D8F::' - 'Red Hat, Inc. (release key 2) ::::::::::0:') - assert res['stdout'][3] == 'pub:-:4096:1:5054E4A45A6340B3:1646863006:::-:::scSC::::::23::0:' - assert res['stdout'][4] == 'fpr:::::::::7E4624258C406535D56D6F135054E4A45A6340B3:' - assert res['stdout'][5] == ('uid:-::::1646863006::DA7F68E3872D6E7BDCE05225E7EB5F3ACDD9699F::' - 'Red Hat, Inc. (auxiliary key 3) ::::::::::0:') - - err = '{}/trustdb.gpg: trustdb created'.format(dirpath) - assert err in res['stderr'] - assert res['exit_code'] == 0 - - # now, parse the output too - fp = _parse_fp_from_gpg(res) - assert fp == ['fd431d51', '5a6340b3'] - - -@pytest.mark.parametrize('res, exp', [ - ({'exit_code': 2, 'stdout': '', 'stderr': ''}, []), - ({'exit_code': 2, 'stdout': '', 'stderr': 'bash: gpg2: command not found...'}, []), - ({'exit_code': 0, 'stdout': 'Some other output', 'stderr': ''}, []), - ({'exit_code': 0, 'stdout': ['Some other output', 'other line'], 'stderr': ''}, []), - ({'exit_code': 0, 'stdout': ['pub:-:4096:1:199E2F91FD431D:'], 'stderr': ''}, []), - ({'exit_code': 0, 'stdout': ['pub:-:4096:1:5054E4A45A6340B3:1..'], 'stderr': ''}, ['5a6340b3']), -]) -def test_parse_fp_from_gpg(res, exp): - fp = _parse_fp_from_gpg(res) - assert fp == exp - - -@pytest.mark.parametrize('target, product_type, exp', [ - ('8.6', 'beta', '../../files/rpm-gpg/8beta'), - ('8.8', 'htb', '../../files/rpm-gpg/8'), - ('9.0', 'beta', '../../files/rpm-gpg/9beta'), - ('9.2', 'ga', '../../files/rpm-gpg/9'), -]) -def test_get_path_to_gpg_certs(current_actor_context, monkeypatch, target, product_type, exp): - current_actor = CurrentActorMocked(dst_ver=target, - envars={'LEAPP_DEVEL_TARGET_PRODUCT_TYPE': product_type}) - monkeypatch.setattr(api, 'current_actor', current_actor) - - p = _get_path_to_gpg_certs() - assert p == exp - - @pytest.mark.parametrize('data, exp', [ ('bare string', 'bare string'), ('with dollar$$$', 'with dollar$$$'), @@ -148,50 +26,6 @@ def test_expand_vars(monkeypatch, data, exp): assert res == exp -def _get_test_installed_rmps(): - return InstalledRPM( - items=[ - RPM(name='gpg-pubkey', - version='9570ff31', - release='5e3006fb', - epoch='0', - packager='Fedora (33) ', - arch='noarch', - pgpsig=''), - RPM(name='rpm', - version='4.17.1', - release='3.fc35', - epoch='0', - packager='Fedora Project', - arch='x86_64', - pgpsig='RSA/SHA256, Tue 02 Aug 2022 03:12:43 PM CEST, Key ID db4639719867c58f'), - ], - ) - - -def test_pubkeys_from_rpms(): - installed_rpm = _get_test_installed_rmps() - assert _pubkeys_from_rpms(installed_rpm) == ['9570ff31'] - - -# @pytest.mark.parametrize('target, product_type, exp', [ -# ('8.6', 'beta', ['F21541EB']), -# ('8.8', 'htb', ['FD431D51', 'D4082792']), # ga -# ('9.0', 'beta', ['F21541EB']), -# ('9.2', 'ga', ['FD431D51', '5A6340B3']), -# ]) -# Def test_get_pubkeys(current_actor_context, monkeypatch, target, product_type, exp): -# current_actor = CurrentActorMocked(dst_ver=target, -# envars={'LEAPP_DEVEL_TARGET_PRODUCT_TYPE': product_type}) -# monkeypatch.setattr(api, 'current_actor', current_actor) -# installed_rpm = _get_test_installed_rmps() -# -# p = _get_pubkeys(installed_rpm) -# assert '9570ff31' in p -# for x in exp: -# assert x in p - - @pytest.mark.parametrize('repo, exp', [ (RepositoryData(repoid='dummy', name='name'), None), (RepositoryData(repoid='dummy', name='name', additional_fields='{}'), None), diff --git a/repos/system_upgrade/common/actors/systemd/checksystemdservicetasks/actor.py b/repos/system_upgrade/common/actors/systemd/checksystemdservicetasks/actor.py index 547a13df45..272ebc1f2e 100644 --- a/repos/system_upgrade/common/actors/systemd/checksystemdservicetasks/actor.py +++ b/repos/system_upgrade/common/actors/systemd/checksystemdservicetasks/actor.py @@ -14,7 +14,7 @@ class CheckSystemdServicesTasks(Actor): - enabled and disabled. This actor inhibits upgrade in such cases. Note: We expect that SystemdServicesTasks could be produced even after the - TargetTransactionChecksPhase (e.g. during the ApplicationPhase). The + TargetTransactionChecksPhase (e.g. during the ApplicationsPhase). The purpose of this actor is to report collisions in case we can already detect them. In case of conflicts caused by messages produced later we just log the collisions and the services will end up disabled. diff --git a/repos/system_upgrade/common/actors/targetuserspacecreator/libraries/userspacegen.py b/repos/system_upgrade/common/actors/targetuserspacecreator/libraries/userspacegen.py index e015a741b3..d605ba0e02 100644 --- a/repos/system_upgrade/common/actors/targetuserspacecreator/libraries/userspacegen.py +++ b/repos/system_upgrade/common/actors/targetuserspacecreator/libraries/userspacegen.py @@ -9,6 +9,7 @@ from leapp.libraries.common import dnfplugin, mounting, overlaygen, repofileutils, rhsm, utils from leapp.libraries.common.config import get_env, get_product_type from leapp.libraries.common.config.version import get_target_major_version +from leapp.libraries.common.gpg import get_path_to_gpg_certs, is_nogpgcheck_set from leapp.libraries.stdlib import api, CalledProcessError, config, run from leapp.models import RequiredTargetUserspacePackages # deprecated from leapp.models import TMPTargetRepositoriesFacts # deprecated all the time @@ -54,7 +55,6 @@ # Issue: #486 PROD_CERTS_FOLDER = 'prod-certs' -GPG_CERTS_FOLDER = 'rpm-gpg' PERSISTENT_PACKAGE_CACHE_DIR = '/var/lib/leapp/persistent_package_cache' DEDICATED_LEAPP_PART_URL = 'https://access.redhat.com/solutions/7011704' @@ -143,21 +143,8 @@ def _backup_to_persistent_package_cache(userspace_dir): shutil.move(src_cache, PERSISTENT_PACKAGE_CACHE_DIR) -def _the_nogpgcheck_option_used(): - return get_env('LEAPP_NOGPGCHECK', False) == '1' - - -def _get_path_to_gpg_certs(target_major_version): - target_product_type = get_product_type('target') - certs_dir = target_major_version - # only beta is special in regards to the GPG signing keys - if target_product_type == 'beta': - certs_dir = '{}beta'.format(target_major_version) - return os.path.join(api.get_common_folder_path(GPG_CERTS_FOLDER), certs_dir) - - def _import_gpg_keys(context, install_root_dir, target_major_version): - certs_path = _get_path_to_gpg_certs(target_major_version) + certs_path = get_path_to_gpg_certs() # Import the RHEL X+1 GPG key to be able to verify the installation of initial packages try: # Import also any other keys provided by the customer in the same directory @@ -234,13 +221,13 @@ def prepare_target_userspace(context, userspace_dir, enabled_repos, packages): install_root_dir = '/el{}target'.format(target_major_version) with mounting.BindMount(source=userspace_dir, target=os.path.join(context.base_dir, install_root_dir.lstrip('/'))): _restore_persistent_package_cache(userspace_dir) - if not _the_nogpgcheck_option_used(): + if not is_nogpgcheck_set(): _import_gpg_keys(context, install_root_dir, target_major_version) repos_opt = [['--enablerepo', repo] for repo in enabled_repos] repos_opt = list(itertools.chain(*repos_opt)) cmd = ['dnf', 'install', '-y'] - if _the_nogpgcheck_option_used(): + if is_nogpgcheck_set(): cmd.append('--nogpgcheck') cmd += [ '--setopt=module_platform_id=platform:el{}'.format(target_major_version), diff --git a/repos/system_upgrade/common/actors/trustedgpgkeysscanner/actor.py b/repos/system_upgrade/common/actors/trustedgpgkeysscanner/actor.py new file mode 100644 index 0000000000..46e8f9ec84 --- /dev/null +++ b/repos/system_upgrade/common/actors/trustedgpgkeysscanner/actor.py @@ -0,0 +1,21 @@ +from leapp.actors import Actor +from leapp.libraries.actor import trustedgpgkeys +from leapp.models import InstalledRPM, TrustedGpgKeys +from leapp.tags import FactsPhaseTag, IPUWorkflowTag + + +class TrustedGpgKeysScanner(Actor): + """ + Scan for trusted GPG keys. + + These include keys readily available in the source RPM DB, keys for N+1 + Red Hat release and custom keys stored in the trusted directory. + """ + + name = 'trusted_gpg_keys_scanner' + consumes = (InstalledRPM,) + produces = (TrustedGpgKeys,) + tags = (IPUWorkflowTag, FactsPhaseTag) + + def process(self): + trustedgpgkeys.process() diff --git a/repos/system_upgrade/common/actors/trustedgpgkeysscanner/libraries/trustedgpgkeys.py b/repos/system_upgrade/common/actors/trustedgpgkeysscanner/libraries/trustedgpgkeys.py new file mode 100644 index 0000000000..6377f767db --- /dev/null +++ b/repos/system_upgrade/common/actors/trustedgpgkeysscanner/libraries/trustedgpgkeys.py @@ -0,0 +1,38 @@ +import os + +from leapp.exceptions import StopActorExecutionError +from leapp.libraries.common.gpg import get_gpg_fp_from_file, get_path_to_gpg_certs, get_pubkeys_from_rpms +from leapp.libraries.stdlib import api +from leapp.models import GpgKey, InstalledRPM, TrustedGpgKeys + + +def _get_pubkeys(installed_rpms): + """ + Get pubkeys from installed rpms and the trusted directory + """ + pubkeys = get_pubkeys_from_rpms(installed_rpms) + db_pubkeys = [key.fingerprint for key in pubkeys] + certs_path = get_path_to_gpg_certs() + for certname in os.listdir(certs_path): + key_file = os.path.join(certs_path, certname) + fps = get_gpg_fp_from_file(key_file) + for fp in fps: + if fp not in db_pubkeys: + pubkeys.append(GpgKey(fingerprint=fp, rpmdb=False, filename=key_file)) + db_pubkeys += fp + return pubkeys + + +def process(): + """ + Process keys in RPM DB and the ones in trusted directory to produce a list of trusted keys + """ + + try: + installed_rpms = next(api.consume(InstalledRPM)) + except StopIteration: + raise StopActorExecutionError( + 'Could not check for valid GPG keys', details={'details': 'No InstalledRPM facts'} + ) + pubkeys = _get_pubkeys(installed_rpms) + api.produce(TrustedGpgKeys(items=pubkeys)) diff --git a/repos/system_upgrade/common/actors/trustedgpgkeysscanner/tests/test_trustedgpgkeys.py b/repos/system_upgrade/common/actors/trustedgpgkeysscanner/tests/test_trustedgpgkeys.py new file mode 100644 index 0000000000..0d98aad73b --- /dev/null +++ b/repos/system_upgrade/common/actors/trustedgpgkeysscanner/tests/test_trustedgpgkeys.py @@ -0,0 +1,87 @@ +import os + +from leapp import reporting +from leapp.libraries.actor import trustedgpgkeys +from leapp.libraries.common.gpg import get_pubkeys_from_rpms +from leapp.libraries.common.testutils import create_report_mocked, CurrentActorMocked, logger_mocked, produce_mocked +from leapp.libraries.stdlib import api +from leapp.models import GpgKey, InstalledRPM, RPM, TrustedGpgKeys + + +def _get_test_installed_rmps(fps): + # adding at least one rpm that is not gpg-pubkey + rpms = [RPM( + name='rpm', + version='4.17.1', + release='3.fc35', + epoch='0', + packager='Fedora Project', + arch='x86_64', + pgpsig='RSA/SHA256, Tue 02 Aug 2022 03:12:43 PM CEST, Key ID db4639719867c58f' + )] + for fp in fps: + rpms.append(RPM( + name='gpg-pubkey', + version=fp, + release='5e3006fb', + epoch='0', + packager='Fedora (33) ', + arch='noarch', + pgpsig='' + )) + return InstalledRPM(items=rpms) + + +class MockedGetGpgFromFile(object): + def __init__(self, file_fps_tuples): + # e.g. file_fps_tuple = [('/mydir/myfile', ['0000ff31', '0000ff32'])] + self._data = {} + for fname, fps in file_fps_tuples: + self._data[fname] = fps + + def get_files(self): + return self._data.keys() # noqa: W1655; pylint: disable=dict-keys-not-iterating + + def __call__(self, fname): + return self._data.get(fname, []) + + +def test_get_pubkeys(monkeypatch): + """ + Very basic test of _get_pubkeys function + """ + rpm_fps = ['9570ff31', '99900000'] + file_fps = ['0000ff31', '0000ff32'] + installed_rpms = _get_test_installed_rmps(rpm_fps) + mocked_gpg_files = MockedGetGpgFromFile([('/mydir/myfile', ['0000ff31', '0000ff32'])]) + + def _mocked_listdir(dummy): + return [os.path.basename(i) for i in mocked_gpg_files.get_files()] + + monkeypatch.setattr(trustedgpgkeys.os, 'listdir', _mocked_listdir) + monkeypatch.setattr(trustedgpgkeys, 'get_path_to_gpg_certs', lambda: '/mydir/') + monkeypatch.setattr(trustedgpgkeys, 'get_gpg_fp_from_file', mocked_gpg_files) + + pubkeys = trustedgpgkeys._get_pubkeys(installed_rpms) + assert len(pubkeys) == len(rpm_fps + file_fps) + assert set(rpm_fps) == {pkey.fingerprint for pkey in pubkeys if pkey.rpmdb} + assert set(file_fps) == {pkey.fingerprint for pkey in pubkeys if not pkey.rpmdb} + assert list({pkey.filename for pkey in pubkeys if not pkey.rpmdb})[0] == '/mydir/myfile' + + +def test_process(monkeypatch): + """ + Executes the "main" function + """ + monkeypatch.setattr(api, 'current_actor', CurrentActorMocked( + msgs=[_get_test_installed_rmps(['9570ff31'])]) + ) + monkeypatch.setattr(api, 'produce', produce_mocked()) + monkeypatch.setattr(api, 'current_logger', logger_mocked()) + monkeypatch.setattr(reporting, 'create_report', create_report_mocked()) + monkeypatch.setattr(trustedgpgkeys, '_get_pubkeys', get_pubkeys_from_rpms) + + trustedgpgkeys.process() + assert api.produce.called == 1 + assert isinstance(api.produce.model_instances[0], TrustedGpgKeys) + assert reporting.create_report.called == 0 diff --git a/repos/system_upgrade/common/libraries/dnfplugin.py b/repos/system_upgrade/common/libraries/dnfplugin.py index d3ec59018c..fbd58246f8 100644 --- a/repos/system_upgrade/common/libraries/dnfplugin.py +++ b/repos/system_upgrade/common/libraries/dnfplugin.py @@ -9,6 +9,7 @@ from leapp.libraries.common import dnfconfig, guards, mounting, overlaygen, rhsm, utils from leapp.libraries.common.config import get_env from leapp.libraries.common.config.version import get_target_major_version, get_target_version +from leapp.libraries.common.gpg import is_nogpgcheck_set from leapp.libraries.stdlib import api, CalledProcessError, config from leapp.models import DNFWorkaround @@ -77,10 +78,6 @@ def _rebuild_rpm_db(context, root=None): context.call(cmd) -def _the_nogpgcheck_option_used(): - return get_env('LEAPP_NOGPGCHECK', '0') == '1' - - def build_plugin_data(target_repoids, debug, test, tasks, on_aws): """ Generates a dictionary with the DNF plugin data. @@ -100,7 +97,7 @@ def build_plugin_data(target_repoids, debug, test, tasks, on_aws): 'debugsolver': debug, 'disable_repos': True, 'enable_repos': target_repoids, - 'gpgcheck': not _the_nogpgcheck_option_used(), + 'gpgcheck': not is_nogpgcheck_set(), 'platform_id': 'platform:el{}'.format(get_target_major_version()), 'releasever': get_target_version(), 'installroot': '/installroot', @@ -367,7 +364,7 @@ def install_initramdisk_requirements(packages, target_userspace_info, used_repos 'dnf', 'install', '-y'] - if _the_nogpgcheck_option_used(): + if is_nogpgcheck_set(): cmd.append('--nogpgcheck') cmd += [ '--setopt=module_platform_id=platform:el{}'.format(get_target_major_version()), diff --git a/repos/system_upgrade/common/libraries/gpg.py b/repos/system_upgrade/common/libraries/gpg.py new file mode 100644 index 0000000000..a8071329f8 --- /dev/null +++ b/repos/system_upgrade/common/libraries/gpg.py @@ -0,0 +1,137 @@ +import os + +from leapp.libraries.common import config +from leapp.libraries.common.config.version import get_source_major_version, get_target_major_version +from leapp.libraries.stdlib import api, run +from leapp.models import GpgKey + +GPG_CERTS_FOLDER = 'rpm-gpg' + + +def get_pubkeys_from_rpms(installed_rpms): + """ + Return the list of fingerprints of GPG keys in RPM DB + + This function returns short 8 characters fingerprints of trusted GPG keys + "installed" in the source OS RPM database. These look like normal packages + named "gpg-pubkey" and the fingerprint is present in the version field. + + :param installed_rpms: List of installed RPMs + :type installed_rpms: list(leapp.models.RPM) + :return: list of GPG keys from RPM DB + :rtype: list(leapp.models.GpgKey) + """ + return [GpgKey(fingerprint=pkg.version, rpmdb=True) for pkg in installed_rpms.items if pkg.name == 'gpg-pubkey'] + + +def _gpg_show_keys(key_path): + """ + Show keys in given file in version-agnostic manner + + This runs gpg --show-keys (EL8) or gpg --with-fingerprints (EL7) + to verify the given file exists, is readable and contains valid + OpenPGP key data, which is printed in parsable format (--with-colons). + """ + try: + cmd = ['gpg2'] + # RHEL7 gnupg requires different switches to get the same output + if get_source_major_version() == '7': + cmd.append('--with-fingerprint') + else: + cmd.append('--show-keys') + cmd += ['--with-colons', key_path] + # TODO: discussed, most likely the checked=False will be dropped + # and error will be handled in other functions + return run(cmd, split=True, checked=False) + except OSError as err: + # NOTE: this is hypothetic; gnupg2 has to be installed on RHEL 7+ + error = 'Failed to read fingerprint from GPG key {}: {}'.format(key_path, str(err)) + api.current_logger().error(error) + return {} + + +def _parse_fp_from_gpg(output): + """ + Parse the output of gpg --show-keys --with-colons. + + Return list of 8 characters fingerprints per each gpgkey for the given + output from stdlib.run() or None if some error occurred. Either the + command return non-zero exit code, the file does not exists, its not + readable or does not contain any openpgp data. + """ + if not output or output['exit_code']: + return [] + + # we are interested in the lines of the output starting with "pub:" + # the colons are used for separating the fields in output like this + # pub:-:4096:1:999F7CBF38AB71F4:1612983048:::-:::escESC::::::23::0: + # ^--------------^ this is the fingerprint we need + # ^------^ but RPM version is just the last 8 chars lowercase + # Also multiple gpg keys can be stored in the file, so go through all "pub" + # lines + gpg_fps = [] + for line in output['stdout']: + if not line or not line.startswith('pub:'): + continue + parts = line.split(':') + if len(parts) >= 4 and len(parts[4]) == 16: + gpg_fps.append(parts[4][8:].lower()) + else: + api.current_logger().warning( + 'Cannot parse the gpg2 output. Line: "{}"' + .format(line) + ) + + return gpg_fps + + +def get_gpg_fp_from_file(key_path): + """ + Return the list of public key fingerprints from the given file + + Log warning in case no OpenPGP data found in the given file or it is not + readable for some reason. + + :param key_path: Path to the file with GPG key(s) + :type key_path: str + :return: List of public key fingerprints from the given file + :rtype: list(str) + """ + res = _gpg_show_keys(key_path) + fp = _parse_fp_from_gpg(res) + if not fp: + error_msg = 'Unable to read OpenPGP keys from {}: {}'.format(key_path, res['stderr']) + api.current_logger().warning(error_msg) + return fp + + +def get_path_to_gpg_certs(): + """ + Get path to the directory with trusted target gpg keys in the common leapp repository. + + GPG keys stored under this directory are considered as trusted and are + installed during the upgrade process. + + :return: Path to the directory with GPG keys stored under the common leapp repository. + :rtype: str + """ + target_major_version = get_target_major_version() + target_product_type = config.get_product_type('target') + certs_dir = target_major_version + # only beta is special in regards to the GPG signing keys + if target_product_type == 'beta': + certs_dir = '{}beta'.format(target_major_version) + return os.path.join(api.get_common_folder_path(GPG_CERTS_FOLDER), certs_dir) + + +def is_nogpgcheck_set(): + """ + Return True if the GPG check should be skipped. + + The GPG check is skipped if leapp is executed with LEAPP_NOGPGCHECK=1 + or with the --nogpgcheck CLI option. In both cases, actors will see + LEAPP_NOGPGCHECK is '1'. + + :rtype: bool + """ + return config.get_env('LEAPP_NOGPGCHECK', False) == '1' diff --git a/repos/system_upgrade/common/libraries/tests/test_gpg.py b/repos/system_upgrade/common/libraries/tests/test_gpg.py new file mode 100644 index 0000000000..7cf37fa2b7 --- /dev/null +++ b/repos/system_upgrade/common/libraries/tests/test_gpg.py @@ -0,0 +1,147 @@ +import os +import shutil +import tempfile + +import distro +import pytest + +from leapp.libraries.common import gpg +from leapp.libraries.common.testutils import CurrentActorMocked +from leapp.libraries.stdlib import api +from leapp.models import GpgKey, InstalledRPM, RPM + + +@pytest.mark.parametrize('target, product_type, exp', [ + ('8.6', 'beta', '../../files/rpm-gpg/8beta'), + ('8.8', 'htb', '../../files/rpm-gpg/8'), + ('9.0', 'beta', '../../files/rpm-gpg/9beta'), + ('9.2', 'ga', '../../files/rpm-gpg/9'), +]) +def test_get_path_to_gpg_certs(monkeypatch, target, product_type, exp): + current_actor = CurrentActorMocked(dst_ver=target, + envars={'LEAPP_DEVEL_TARGET_PRODUCT_TYPE': product_type}) + monkeypatch.setattr(api, 'current_actor', current_actor) + + p = gpg.get_path_to_gpg_certs() + assert p == exp + + +def is_rhel7(): + return int(distro.major_version()) < 8 + + +@pytest.mark.skipif(distro.id() not in ("rhel", "centos"), reason="Requires RHEL or CentOS for valid results.") +def test_gpg_show_keys(loaded_leapp_repository, monkeypatch): + src = '7.9' if is_rhel7() else '8.6' + current_actor = CurrentActorMocked(src_ver=src) + monkeypatch.setattr(api, 'current_actor', current_actor) + + # python2 compatibility :/ + dirpath = tempfile.mkdtemp() + + # using GNUPGHOME env should avoid gnupg modifying the system + os.environ['GNUPGHOME'] = dirpath + + try: + # non-existing file + non_existent_path = os.path.join(dirpath, 'nonexistent') + res = gpg._gpg_show_keys(non_existent_path) + if is_rhel7(): + err_msg = "gpg: can't open `{}'".format(non_existent_path) + else: + err_msg = "gpg: can't open '{}': No such file or directory\n".format(non_existent_path) + assert not res['stdout'] + assert err_msg in res['stderr'] + assert res['exit_code'] == 2 + + fp = gpg._parse_fp_from_gpg(res) + assert fp == [] + + # no gpg data found + no_key_path = os.path.join(dirpath, "no_key") + with open(no_key_path, "w") as f: + f.write('test') + + res = gpg._gpg_show_keys(no_key_path) + if is_rhel7(): + err_msg = ('gpg: no valid OpenPGP data found.\n' + 'gpg: processing message failed: Unknown system error\n') + else: + err_msg = 'gpg: no valid OpenPGP data found.\n' + assert not res['stdout'] + assert res['stderr'] == err_msg + assert res['exit_code'] == 2 + + fp = gpg._parse_fp_from_gpg(res) + assert fp == [] + + # with some test data now -- rhel9 release key + # rhel9_key_path = os.path.join(api.get_common_folder_path('rpm-gpg'), '9') + cur_dir = os.path.dirname(os.path.abspath(__file__)) + rhel9_key_path = os.path.join(cur_dir, '..', '..', 'files', 'rpm-gpg', '9', + 'RPM-GPG-KEY-redhat-release') + res = gpg._gpg_show_keys(rhel9_key_path) + finally: + shutil.rmtree(dirpath) + + if is_rhel7(): + assert len(res['stdout']) == 4 + assert res['stdout'][0] == ('pub:-:4096:1:199E2F91FD431D51:1256212795:::-:' + 'Red Hat, Inc. (release key 2) :') + assert res['stdout'][1] == 'fpr:::::::::567E347AD0044ADE55BA8A5F199E2F91FD431D51:' + assert res['stdout'][2] == ('pub:-:4096:1:5054E4A45A6340B3:1646863006:::-:' + 'Red Hat, Inc. (auxiliary key 3) :') + assert res['stdout'][3] == 'fpr:::::::::7E4624258C406535D56D6F135054E4A45A6340B3:' + else: + assert len(res['stdout']) == 6 + assert res['stdout'][0] == 'pub:-:4096:1:199E2F91FD431D51:1256212795:::-:::scSC::::::23::0:' + assert res['stdout'][1] == 'fpr:::::::::567E347AD0044ADE55BA8A5F199E2F91FD431D51:' + assert res['stdout'][2] == ('uid:-::::1256212795::DC1CAEC7997B3575101BB0FCAAC6191792660D8F::' + 'Red Hat, Inc. (release key 2) ::::::::::0:') + assert res['stdout'][3] == 'pub:-:4096:1:5054E4A45A6340B3:1646863006:::-:::scSC::::::23::0:' + assert res['stdout'][4] == 'fpr:::::::::7E4624258C406535D56D6F135054E4A45A6340B3:' + assert res['stdout'][5] == ('uid:-::::1646863006::DA7F68E3872D6E7BDCE05225E7EB5F3ACDD9699F::' + 'Red Hat, Inc. (auxiliary key 3) ::::::::::0:') + + err = '{}/trustdb.gpg: trustdb created'.format(dirpath) + assert err in res['stderr'] + assert res['exit_code'] == 0 + + # now, parse the output too + fp = gpg._parse_fp_from_gpg(res) + assert fp == ['fd431d51', '5a6340b3'] + + +@pytest.mark.parametrize('res, exp', [ + ({'exit_code': 2, 'stdout': '', 'stderr': ''}, []), + ({'exit_code': 2, 'stdout': '', 'stderr': 'bash: gpg2: command not found...'}, []), + ({'exit_code': 0, 'stdout': 'Some other output', 'stderr': ''}, []), + ({'exit_code': 0, 'stdout': ['Some other output', 'other line'], 'stderr': ''}, []), + ({'exit_code': 0, 'stdout': ['pub:-:4096:1:199E2F91FD431D:'], 'stderr': ''}, []), + ({'exit_code': 0, 'stdout': ['pub:-:4096:1:5054E4A45A6340B3:1..'], 'stderr': ''}, ['5a6340b3']), +]) +def test_parse_fp_from_gpg(res, exp): + fp = gpg._parse_fp_from_gpg(res) + assert fp == exp + + +def test_pubkeys_from_rpms(): + installed_rpms = InstalledRPM( + items=[ + RPM(name='gpg-pubkey', + version='9570ff31', + release='5e3006fb', + epoch='0', + packager='Fedora (33) ', + arch='noarch', + pgpsig=''), + RPM(name='rpm', + version='4.17.1', + release='3.fc35', + epoch='0', + packager='Fedora Project', + arch='x86_64', + pgpsig='RSA/SHA256, Tue 02 Aug 2022 03:12:43 PM CEST, Key ID db4639719867c58f'), + ], + ) + assert gpg.get_pubkeys_from_rpms(installed_rpms) == [GpgKey(fingerprint='9570ff31', rpmdb=True)] diff --git a/repos/system_upgrade/common/libraries/utils.py b/repos/system_upgrade/common/libraries/utils.py index cd3ad1a6aa..38b9bb1a53 100644 --- a/repos/system_upgrade/common/libraries/utils.py +++ b/repos/system_upgrade/common/libraries/utils.py @@ -14,7 +14,7 @@ def parse_config(cfg=None, strict=True): """ Applies a workaround to parse a config file using py3 AND py2 - ConfigParser has a new def to read strings/iles in Py3, making + ConfigParser has a new def to read strings/files in Py3, making the old ones (Py2) obsoletes, these function was created to use the ConfigParser on Py2 and Py3 diff --git a/repos/system_upgrade/common/models/trustedgpgkeys.py b/repos/system_upgrade/common/models/trustedgpgkeys.py new file mode 100644 index 0000000000..c397bea748 --- /dev/null +++ b/repos/system_upgrade/common/models/trustedgpgkeys.py @@ -0,0 +1,19 @@ +from leapp.models import fields, Model +from leapp.topics import SystemFactsTopic + + +class GpgKey(Model): + """ + GPG Public key + + It is represented by a record in the RPM DB or by a file in directory with trusted keys (or both). + """ + topic = SystemFactsTopic + fingerprint = fields.String() + rpmdb = fields.Boolean() + filename = fields.Nullable(fields.String()) + + +class TrustedGpgKeys(Model): + topic = SystemFactsTopic + items = fields.List(fields.Model(GpgKey), default=[])