Skip to content

Commit

Permalink
Move code handling GPG keys to separate library
Browse files Browse the repository at this point in the history
This decouples gpg keys handling and some code duplication from the
MissingGpgKeysInhibitor actor to separate library that will be usable
from more actors.

The new actor TrustedGpgKeysScanner actor is crated, which handles
reading the source RPM DB and trusted keys directory and produces a new
model describing what keys are supposed to be trusted on the target
system.

This also removes the code duplication for detecting the --no-gpgcheck
and for defining the directory where to look for the gpg keys.

Petr Stodulka updates:

* updated docstrings for public functions in the shared library
  We want them documented better in comparison to functions in private
  (actor's) libraries as they could be used by everyone.

* some functions are renamed:
  * read_gpg_fp_from_file -> get_gpg_fp_from_file
  * the_nogpgcheck_option_used -> is_nogpgcheck_set
  The related code has been updated.

* use the gpg library in the shared dnfplugin library

* make some unit-tests conditional so we know the results are always
  valid (skip if distro ID is not rhel or centos)

* update tests and improve the test coverage

Signed-off-by: Jakub Jelen <[email protected]>
  • Loading branch information
Jakuje authored and pirat89 committed Nov 16, 2023
1 parent f54f3d7 commit 7082592
Show file tree
Hide file tree
Showing 12 changed files with 506 additions and 407 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
from leapp.libraries.actor import missinggpgkey
from leapp.models import (
DNFWorkaround,
InstalledRPM,
TargetUserSpaceInfo,
TMPTargetRepositoriesFacts,
TrustedGpgKeys,
UsedTargetRepositories
)
from leapp.reporting import Report
Expand All @@ -28,7 +28,7 @@ class MissingGpgKeysInhibitor(Actor):

name = 'missing_gpg_keys_inhibitor'
consumes = (
InstalledRPM,
TrustedGpgKeys,
TMPTargetRepositoriesFacts,
TargetUserSpaceInfo,
UsedTargetRepositories,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,113 +8,21 @@

from leapp import reporting
from leapp.exceptions import StopActorExecutionError
from leapp.libraries.common import config
from leapp.libraries.common.config.version import get_source_major_version, get_target_major_version
from leapp.libraries.stdlib import api, run
from leapp.libraries.common.config.version import get_target_major_version
from leapp.libraries.common.gpg import get_gpg_fp_from_file, get_path_to_gpg_certs, is_nogpgcheck_set
from leapp.libraries.stdlib import api
from leapp.models import (
DNFWorkaround,
InstalledRPM,
TargetUserSpaceInfo,
TMPTargetRepositoriesFacts,
TrustedGpgKeys,
UsedTargetRepositories
)
from leapp.utils.deprecation import suppress_deprecation

GPG_CERTS_FOLDER = 'rpm-gpg'
FMT_LIST_SEPARATOR = '\n - '


def _gpg_show_keys(key_path):
"""
Show keys in given file in version-agnostic manner
This runs gpg --show-keys (EL8) or gpg --with-fingerprints (EL7)
to verify the given file exists, is readable and contains valid
OpenPGP key data, which is printed in parsable format (--with-colons).
"""
try:
cmd = ['gpg2']
# RHEL7 gnupg requires different switches to get the same output
if get_source_major_version() == '7':
cmd.append('--with-fingerprint')
else:
cmd.append('--show-keys')
cmd += ['--with-colons', key_path]
# TODO: discussed, most likely the checked=False will be dropped
# and error will be handled in other functions
return run(cmd, split=True, checked=False)
except OSError as err:
# NOTE: this is hypothetic; gnupg2 has to be installed on RHEL 7+
error = 'Failed to read fingerprint from GPG key {}: {}'.format(key_path, str(err))
api.current_logger().error(error)
return {}


def _parse_fp_from_gpg(output):
"""
Parse the output of gpg --show-keys --with-colons.
Return list of 8 characters fingerprints per each gpgkey for the given
output from stdlib.run() or None if some error occurred. Either the
command return non-zero exit code, the file does not exists, its not
readable or does not contain any openpgp data.
"""
if not output or output['exit_code']:
return []

# we are interested in the lines of the output starting with "pub:"
# the colons are used for separating the fields in output like this
# pub:-:4096:1:999F7CBF38AB71F4:1612983048:::-:::escESC::::::23::0:
# ^--------------^ this is the fingerprint we need
# ^------^ but RPM version is just the last 8 chars lowercase
# Also multiple gpg keys can be stored in the file, so go through all "pub"
# lines
gpg_fps = []
for line in output['stdout']:
if not line or not line.startswith('pub:'):
continue
parts = line.split(':')
if len(parts) >= 4 and len(parts[4]) == 16:
gpg_fps.append(parts[4][8:].lower())
else:
api.current_logger().warning(
'Cannot parse the gpg2 output. Line: "{}"'
.format(line)
)

return gpg_fps


def _read_gpg_fp_from_file(key_path):
"""
Returns the list of public key fingerprints from the given file
Logs warning in case no OpenPGP data found in the given file or it is not
readable for some reason.
"""
res = _gpg_show_keys(key_path)
fp = _parse_fp_from_gpg(res)
if not fp:
error = 'Unable to read OpenPGP keys from {}: {}'.format(key_path, res['stderr'])
api.current_logger().error(error)
return fp


def _get_path_to_gpg_certs():
"""
Get path to the directory with trusted target gpg keys in leapp tree
"""
# XXX This is copy&paste from TargetUserspaceCreator actor.
# Potential changes need to happen in both places to keep them in sync.
target_major_version = get_target_major_version()
target_product_type = config.get_product_type('target')
certs_dir = target_major_version
# only beta is special in regards to the GPG signing keys
if target_product_type == 'beta':
certs_dir = '{}beta'.format(target_major_version)
return os.path.join(api.get_common_folder_path(GPG_CERTS_FOLDER), certs_dir)


def _expand_vars(path):
"""
Expand variables like $releasever and $basearch to the target system version
Expand Down Expand Up @@ -152,38 +60,6 @@ def _get_abs_file_path(target_userspace, file_url):
return os.path.join('/', file_path)


def _pubkeys_from_rpms(installed_rpms):
"""
Return the list of fingerprints of GPG keys in RPM DB
This function returns short 8 characters fingerprints of trusted GPG keys
"installed" in the source OS RPM database. These look like normal packages
named "gpg-pubkey" and the fingerprint is present in the version field.
"""
return [pkg.version for pkg in installed_rpms.items if pkg.name == 'gpg-pubkey']


def _get_pubkeys(installed_rpms):
"""
Get pubkeys from installed rpms and the trusted directory
"""
pubkeys = _pubkeys_from_rpms(installed_rpms)
certs_path = _get_path_to_gpg_certs()
for certname in os.listdir(certs_path):
key_file = os.path.join(certs_path, certname)
fps = _read_gpg_fp_from_file(key_file)
if fps:
pubkeys += fps
# TODO: what about else: ?
# The warning is now logged in _read_gpg_fp_from_file. We can raise
# the priority of the message or convert it to report though.
return pubkeys


def _the_nogpgcheck_option_used():
return config.get_env('LEAPP_NOGPGCHECK', False) == '1'


def _consume_data():
try:
used_target_repos = next(api.consume(UsedTargetRepositories)).repos
Expand All @@ -199,10 +75,10 @@ def _consume_data():
'Could not check for valid GPG keys', details={'details': 'No TMPTargetRepositoriesFacts facts'}
)
try:
installed_rpms = next(api.consume(InstalledRPM))
trusted_gpg_keys = next(api.consume(TrustedGpgKeys))
except StopIteration:
raise StopActorExecutionError(
'Could not check for valid GPG keys', details={'details': 'No InstalledRPM facts'}
'Could not check for valid GPG keys', details={'details': 'No TrustedGpgKeys facts'}
)
try:
target_userspace = next(api.consume(TargetUserSpaceInfo))
Expand All @@ -211,7 +87,7 @@ def _consume_data():
'Could not check for valid GPG keys', details={'details': 'No TargetUserSpaceInfo facts'}
)

return used_target_repos, target_repos, installed_rpms, target_userspace
return used_target_repos, target_repos, trusted_gpg_keys, target_userspace


def _get_repo_gpgkey_urls(repo):
Expand Down Expand Up @@ -274,7 +150,7 @@ def _report(title, summary, keys, inhibitor=False):
' prior the upgrade.'
' If you want to proceed the in-place upgrade without checking any RPM'
' signatures, execute leapp with the `--nogpgcheck` option.'
.format(_get_path_to_gpg_certs())
.format(get_path_to_gpg_certs())
)
groups = [reporting.Groups.REPOSITORY]
if inhibitor:
Expand Down Expand Up @@ -306,7 +182,7 @@ def _report_missing_keys(keys):
summary = (
'Some of the target repositories require GPG keys that are not installed'
' in the current RPM DB or are not stored in the {trust_dir} directory.'
.format(trust_dir=_get_path_to_gpg_certs())
.format(trust_dir=get_path_to_gpg_certs())
)
_report('Detected unknown GPG keys for target system repositories', summary, keys, True)

Expand Down Expand Up @@ -383,7 +259,7 @@ def register_dnfworkaround():
api.produce(DNFWorkaround(
display_name='import trusted gpg keys to RPM DB',
script_path=api.current_actor().get_common_tool_path('importrpmgpgkeys'),
script_args=[_get_path_to_gpg_certs()],
script_args=[get_path_to_gpg_certs()],
))


Expand All @@ -396,11 +272,11 @@ def process():
them from model TMPTargetRepositoriesFacts.
"""
# when the user decided to ignore gpg signatures on the packages, we can ignore these checks altogether
if _the_nogpgcheck_option_used():
if is_nogpgcheck_set():
api.current_logger().warning('The --nogpgcheck option is used: skipping all related checks.')
return

used_target_repos, target_repos, installed_rpms, target_userspace = _consume_data()
used_target_repos, target_repos, trusted_gpg_keys, target_userspace = _consume_data()

target_repo_id_to_repositories_facts_map = {
repo.repoid: repo
Expand All @@ -415,8 +291,7 @@ def process():
invalid_keys = list()
repos_missing_keys = list()

# These are used only for getting the installed gpg-pubkey "packages"
pubkeys = _get_pubkeys(installed_rpms)
pubkeys = [key.fingerprint for key in trusted_gpg_keys.items]
processed_gpgkey_urls = set()
tmpdir = None
for repoid in used_target_repos:
Expand Down Expand Up @@ -454,7 +329,7 @@ def process():
api.current_logger().error(
'Skipping unknown protocol for gpgkey {}'.format(gpgkey_url))
continue
fps = _read_gpg_fp_from_file(key_file)
fps = get_gpg_fp_from_file(key_file)
if not fps:
invalid_keys.append(gpgkey_url)
api.current_logger().warning(
Expand Down
Loading

0 comments on commit 7082592

Please sign in to comment.