Skip to content

Commit

Permalink
squash! Move code handling GPG keys to separate library
Browse files Browse the repository at this point in the history
* updated docstrings for public functions in the shared library
  We want them documented better in comparison to functions in private
  (actor's) libraries as they could be used by everyone.

* some functions are renamed:
  * read_gpg_fp_from_file -> get_gpg_fp_from_file
  * the_nogpgcheck_option_used -> is_nogpgcheck_set
  The related code has been updated.

* use the gpg library in the shared dnfplugin library

* make some unit-tests conditional so we know the results are always
  valid (skip if distro ID is not rhel or centos)
  • Loading branch information
pirat89 committed Nov 10, 2023
1 parent 3f3b719 commit 7c9fce9
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from leapp import reporting
from leapp.exceptions import StopActorExecutionError
from leapp.libraries.common.config.version import get_target_major_version
from leapp.libraries.common.gpg import get_path_to_gpg_certs, read_gpg_fp_from_file, the_nogpgcheck_option_used
from leapp.libraries.common.gpg import get_path_to_gpg_certs, get_gpg_fp_from_file, is_nogpgcheck_set
from leapp.libraries.stdlib import api
from leapp.models import (
DNFWorkaround,
Expand Down Expand Up @@ -272,7 +272,7 @@ def process():
them from model TMPTargetRepositoriesFacts.
"""
# when the user decided to ignore gpg signatures on the packages, we can ignore these checks altogether
if the_nogpgcheck_option_used():
if is_nogpgcheck_set():
api.current_logger().warning('The --nogpgcheck option is used: skipping all related checks.')
return

Expand Down Expand Up @@ -329,7 +329,7 @@ def process():
api.current_logger().error(
'Skipping unknown protocol for gpgkey {}'.format(gpgkey_url))
continue
fps = read_gpg_fp_from_file(key_file)
fps = get_gpg_fp_from_file(key_file)
if not fps:
invalid_keys.append(gpgkey_url)
api.current_logger().warning(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from leapp.libraries.common import dnfplugin, mounting, overlaygen, repofileutils, rhsm, rhui, utils
from leapp.libraries.common.config import get_env, get_product_type
from leapp.libraries.common.config.version import get_target_major_version
from leapp.libraries.common.gpg import get_path_to_gpg_certs, the_nogpgcheck_option_used
from leapp.libraries.common.gpg import get_path_to_gpg_certs, is_nogpgcheck_set
from leapp.libraries.stdlib import api, CalledProcessError, config, run
from leapp.models import RequiredTargetUserspacePackages # deprecated
from leapp.models import TMPTargetRepositoriesFacts # deprecated all the time
Expand Down Expand Up @@ -221,13 +221,13 @@ def prepare_target_userspace(context, userspace_dir, enabled_repos, packages):
install_root_dir = '/el{}target'.format(target_major_version)
with mounting.BindMount(source=userspace_dir, target=os.path.join(context.base_dir, install_root_dir.lstrip('/'))):
_restore_persistent_package_cache(userspace_dir)
if not the_nogpgcheck_option_used():
if not is_nogpgcheck_set():
_import_gpg_keys(context, install_root_dir, target_major_version)

repos_opt = [['--enablerepo', repo] for repo in enabled_repos]
repos_opt = list(itertools.chain(*repos_opt))
cmd = ['dnf', 'install', '-y']
if the_nogpgcheck_option_used():
if is_nogpgcheck_set():
cmd.append('--nogpgcheck')
cmd += [
'--setopt=module_platform_id=platform:el{}'.format(target_major_version),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import os

from leapp.exceptions import StopActorExecutionError
from leapp.libraries.common.gpg import get_path_to_gpg_certs, get_pubkeys_from_rpms, read_gpg_fp_from_file
from leapp.libraries.common.gpg import get_path_to_gpg_certs, get_pubkeys_from_rpms, get_gpg_fp_from_file
from leapp.libraries.stdlib import api
from leapp.models import GpgKey, InstalledRPM, TrustedGpgKeys

Expand All @@ -15,13 +15,13 @@ def _get_pubkeys(installed_rpms):
certs_path = get_path_to_gpg_certs()
for certname in os.listdir(certs_path):
key_file = os.path.join(certs_path, certname)
fps = read_gpg_fp_from_file(key_file)
fps = get_gpg_fp_from_file(key_file)
for fp in fps:
if fp not in db_pubkeys:
pubkeys += GpgKey(fingerprint=fp, rpmdb=False, filename=key_file)
db_pubkeys += fp
# TODO: what about else: ?
# The warning is now logged in read_gpg_fp_from_file. We can raise
# The warning is now logged in get_gpg_fp_from_file. We can raise
# the priority of the message or convert it to report though.
return pubkeys

Expand Down
9 changes: 3 additions & 6 deletions repos/system_upgrade/common/libraries/dnfplugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from leapp.libraries.common import dnfconfig, guards, mounting, overlaygen, rhsm, utils
from leapp.libraries.common.config import get_env
from leapp.libraries.common.config.version import get_target_major_version, get_target_version
from leapp.libraries.common.gpg import is_nogpgcheck_set
from leapp.libraries.stdlib import api, CalledProcessError, config
from leapp.models import DNFWorkaround

Expand Down Expand Up @@ -77,10 +78,6 @@ def _rebuild_rpm_db(context, root=None):
context.call(cmd)


def _the_nogpgcheck_option_used():
return get_env('LEAPP_NOGPGCHECK', '0') == '1'


def build_plugin_data(target_repoids, debug, test, tasks, on_aws):
"""
Generates a dictionary with the DNF plugin data.
Expand All @@ -100,7 +97,7 @@ def build_plugin_data(target_repoids, debug, test, tasks, on_aws):
'debugsolver': debug,
'disable_repos': True,
'enable_repos': target_repoids,
'gpgcheck': not _the_nogpgcheck_option_used(),
'gpgcheck': not is_nogpgcheck_set(),
'platform_id': 'platform:el{}'.format(get_target_major_version()),
'releasever': get_target_version(),
'installroot': '/installroot',
Expand Down Expand Up @@ -344,7 +341,7 @@ def install_initramdisk_requirements(packages, target_userspace_info, used_repos
'dnf',
'install',
'-y']
if _the_nogpgcheck_option_used():
if is_nogpgcheck_set():
cmd.append('--nogpgcheck')
cmd += [
'--setopt=module_platform_id=platform:el{}'.format(get_target_major_version()),
Expand Down
39 changes: 32 additions & 7 deletions repos/system_upgrade/common/libraries/gpg.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ def get_pubkeys_from_rpms(installed_rpms):
This function returns short 8 characters fingerprints of trusted GPG keys
"installed" in the source OS RPM database. These look like normal packages
named "gpg-pubkey" and the fingerprint is present in the version field.
:param installed_rpms: List of installed RPMs
:type installed_rpms: list(leapp.models.RPM)
:return: list of GPG keys from RPM DB
:rtype: list(leapp.models.GpgKey)
"""
return [GpgKey(fingerprint=pkg.version, rpmdb=True) for pkg in installed_rpms.items if pkg.name == 'gpg-pubkey']

Expand Down Expand Up @@ -80,24 +85,35 @@ def _parse_fp_from_gpg(output):
return gpg_fps


def read_gpg_fp_from_file(key_path):
def get_gpg_fp_from_file(key_path):
"""
Returns the list of public key fingerprints from the given file
Return the list of public key fingerprints from the given file
Logs warning in case no OpenPGP data found in the given file or it is not
Log warning in case no OpenPGP data found in the given file or it is not
readable for some reason.
:param key_path: Path to the file with GPG key(s)
:type key_path: str
:return: List of public key fingerprints from the given file
:rtype: list(str)
"""
res = _gpg_show_keys(key_path)
fp = _parse_fp_from_gpg(res)
if not fp:
error = 'Unable to read OpenPGP keys from {}: {}'.format(key_path, res['stderr'])
api.current_logger().error(error)
error_msg = 'Unable to read OpenPGP keys from {}: {}'.format(key_path, res['stderr'])
api.current_logger().warning(error_msg)
return fp


def get_path_to_gpg_certs():
"""
Get path to the directory with trusted target gpg keys in leapp tree
Get path to the directory with trusted target gpg keys in the common leapp repository.
GPG keys stored under this directory are considered as trusted and are
installed during the upgrade process.
:return: Path to the directory with GPG keys stored under the common leapp repository.
:rtype: str
"""
target_major_version = get_target_major_version()
target_product_type = config.get_product_type('target')
Expand All @@ -108,5 +124,14 @@ def get_path_to_gpg_certs():
return os.path.join(api.get_common_folder_path(GPG_CERTS_FOLDER), certs_dir)


def the_nogpgcheck_option_used():
def is_nogpgcheck_set():
"""
Return True if the GPG check should be skipped.
The GPG check is skipped if leapp is executed with LEAPP_NOGPGCHECK=1
or with the --nogpgcheck CLI option. In both cases, actors will see
LEAPP_NOGPGCHECK is '1'.
:rtype: bool
"""
return config.get_env('LEAPP_NOGPGCHECK', False) == '1'
1 change: 1 addition & 0 deletions repos/system_upgrade/common/libraries/tests/test_gpg.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def is_rhel7():
return int(distro.major_version()) < 8


@pytest.mark.skipif(distro.id() not in ("rhel", "centos"), reason="Requires RHEL or CentOS for valid results.")
def test_gpg_show_keys(loaded_leapp_repository, monkeypatch):
src = '7.9' if is_rhel7() else '8.6'
current_actor = CurrentActorMocked(src_ver=src)
Expand Down

0 comments on commit 7c9fce9

Please sign in to comment.