From 1ea18a470642a50f43fe2817ad954a19cf7d980a Mon Sep 17 00:00:00 2001 From: Inessa Vasilevskaya Date: Tue, 21 Nov 2023 15:19:50 +0100 Subject: [PATCH] Add actor name discovery and some unit tests With this the scanning part should be complete. Checker that actually reports errors will be delivered by the next commit. --- .../libraries/trackcustommodifications.py | 101 ++++++++++++------ .../tests/test_trackcustommodifications.py | 61 +++++++++++ .../common/models/custommodifications.py | 1 + 3 files changed, 128 insertions(+), 35 deletions(-) create mode 100644 repos/system_upgrade/common/actors/trackcustommodifications/tests/test_trackcustommodifications.py diff --git a/repos/system_upgrade/common/actors/trackcustommodifications/libraries/trackcustommodifications.py b/repos/system_upgrade/common/actors/trackcustommodifications/libraries/trackcustommodifications.py index f2cbea7d71..88882e5943 100644 --- a/repos/system_upgrade/common/actors/trackcustommodifications/libraries/trackcustommodifications.py +++ b/repos/system_upgrade/common/actors/trackcustommodifications/libraries/trackcustommodifications.py @@ -1,3 +1,4 @@ +import ast import os from leapp.exceptions import StopActorExecution @@ -5,7 +6,6 @@ from leapp.libraries.stdlib import api, CalledProcessError, run from leapp.models import CustomModifications - RPMS_TO_CHECK = ['leapp-upgrade-el{source}toel{target}'] DIRS_TO_SCAN = ['/usr/share/leapp-repository'] @@ -14,49 +14,80 @@ def _get_rpms_to_check(): return [rpm.format(source=get_source_major_version(), target=get_target_major_version()) for rpm in RPMS_TO_CHECK] -def check_for_modifications(dirs=DIRS_TO_SCAN): +def deduce_actor_name(a_file): + """A helper to map an actor/library to the actor name""" + if not os.path.exists(a_file): + return '' + data = None + with open(a_file) as f: + try: + data = ast.parse(f.read()) + except UnicodeDecodeError: + # Most probably dealing with *.pyc by mistake or any other binary file we can't read. Those should be + # probably filtered out, but things happen so let's keep that check here. + return '' + # NOTE(ivasilev) Making proper syntax analysis is not the goal here, so let's get away with the bare minimum. + # An actor file will have an Actor ClassDef with a name attribute and at least one function defined + actor = next((obj for obj in data.body if isinstance(obj, ast.ClassDef) and obj.name and + any(isinstance(o, ast.FunctionDef) for o in obj.body)), None) + if not actor: + # Assuming here we are dealing with a library, so let's discover actor file and deduce actor name from it + # actor is expected to be found under ../../actor.py + assumed_actor_file = os.path.join(a_file.split('libraries')[0], 'actor.py') + if not os.path.exists(assumed_actor_file): + # Nothing more we can do - no actor name mapping, return '' + return '' + return deduce_actor_name(assumed_actor_file) + return actor.name + + +def _run_command(cmd, warning_to_log, checked=True): + """ + A helper that executes a command and returns a result or raises StopActorExecution. + Upon success results will contain a list with line-by-line output returned by the command. + """ + try: + res = run(cmd, checked=checked) + output = res['stdout'].strip() + if not output: + return [] + return output.split('\n') + except CalledProcessError: + api.current_logger().warning(warning_to_log) + raise StopActorExecution() + + +def check_for_modifications(dirs=None): """ This will return a tuple (changes, custom) is case any untypical files or changes to shipped leapp files are discovered. (None, None) means that no modifications have been found. """ + dirs = dirs if dirs else DIRS_TO_SCAN rpms = _get_rpms_to_check() source_of_truth = [] - for rpm in rpms: - get_rpm_files_command = ['rpm', '-ql', rpm] - try: - rpm_files = run(get_rpm_files_command) - source_of_truth.extend(rpm_files['stdout'].strip().split()) - except CalledProcessError: - api.current_logger().warning( - 'Could not get list of installed files from rpm %s'.format(rpm) - ) - raise StopActorExecution() leapp_files = [] + # Let's collect data about what should have been installed from rpm + for rpm in rpms: + res = _run_command(['rpm', '-ql', rpm], 'Could not get a list of installed files from rpm {}'.format(rpm)) + source_of_truth.extend(res) + # Let's collect data about what's really on the system for directory in dirs: - get_files_command = ['find', directory, '-type', 'f'] - try: - files = run(get_files_command) - leapp_files.extend(files['stdout'].strip().split()) - except CalledProcessError: - api.current_logger().warning( - 'Could not get list of leapp files in %s'.format(directory) - ) - raise StopActorExecution() - custom_files = set(leapp_files) - set(source_of_truth) + res = _run_command(['find', directory, '-type', 'f'], + 'Could not get a list of leapp files from {}'.format(directory)) + leapp_files.extend(res) + # Let's check for unexpected additions + custom_files = sorted(set(leapp_files) - set(source_of_truth)) # Now let's check for modifications modified_files = [] for rpm in rpms: - try: - modified = run(['rpm', '-V', rpm]) - modified_files.extend(modified['stdout'].strip().split()) - except CalledProcessError: - api.current_logger().warning( - 'Could not check authenticity of the files from %s'.format(rpm) - ) - raise StopActorExecution() - # NOTE(ivasilev) Now the fun part TBD - the mapping between actor file / library and actor's - # name - # XXX Leaving blank atm as it's tbd - return ([CustomModifications(actor_name='', filename=f, type='modified') for f in modified_files], - [CustomModifications(actor_name='', filename=f, type='custom') for f in custom_files]) + res = _run_command(['rpm', '-V', rpm], 'Could not check authenticity of the files from {}'.format(rpm), + # NOTE(ivasilev) check is False here as in case of any changes found exit code will be 1 + checked=False) + if res: + api.current_logger().warning('Modifications to leapp files detected!\n%s', res) + modified_files.extend([tuple(x.split()) for x in res]) + return ([CustomModifications(actor_name=deduce_actor_name(f[1]), filename=f[1], type='modified', + rpm_checks_str=f[0]) + for f in modified_files], + [CustomModifications(actor_name=deduce_actor_name(f), filename=f, type='custom') for f in custom_files]) diff --git a/repos/system_upgrade/common/actors/trackcustommodifications/tests/test_trackcustommodifications.py b/repos/system_upgrade/common/actors/trackcustommodifications/tests/test_trackcustommodifications.py new file mode 100644 index 0000000000..d209452db7 --- /dev/null +++ b/repos/system_upgrade/common/actors/trackcustommodifications/tests/test_trackcustommodifications.py @@ -0,0 +1,61 @@ +import pytest + +from leapp.libraries.actor import trackcustommodifications +from leapp.libraries.common.testutils import CurrentActorMocked, produce_mocked +from leapp.libraries.stdlib import api + +FILES_FROM_RPM = """ +repos/system_upgrade/el8toel9/actors/xorgdrvfact/libraries/xorgdriverlib.py +repos/system_upgrade/el8toel9/actors/anotheractor/actor.py +repos/system_upgrade/el8toel9/files +""" + +FILES_ON_SYSTEM = """ +repos/system_upgrade/el8toel9/actors/xorgdrvfact/libraries/xorgdriverlib.py +repos/system_upgrade/el8toel9/actors/anotheractor/actor.py +repos/system_upgrade/el8toel9/files +/some/unrelated/to/leapp/file +repos/system_upgrade/el8toel9/files/file/that/should/not/be/there +repos/system_upgrade/el8toel9/actors/actor/that/should/not/be/there +""" + +VERIFIED_FILES = """ +.......T. repos/system_upgrade/el8toel9/actors/xorgdrvfact/libraries/xorgdriverlib.py +S.5....T. repos/system_upgrade/el8toel9/actors/anotheractor/actor.py +""" + + +@pytest.mark.parametrize('a_file,name', [ + ('repos/system_upgrade/el8toel9/actors/checkblacklistca/actor.py', 'CheckBlackListCA'), + ('repos/system_upgrade/el7toel8/actors/checkmemcached/actor.py', 'CheckMemcached'), + ('repos/system_upgrade/el7toel8/actors/checkmemcached/libraries/checkmemcached.py', 'CheckMemcached'), + # not a library and not an actor file + ('repos/system_upgrade/el7toel8/models/authselect.py', ''), +]) +def test_deduce_actor_name_from_file(a_file, name): + assert trackcustommodifications.deduce_actor_name(a_file) == name + + +def mocked__run_command(list_of_args, log_message, checked=True): + if list_of_args == ['rpm', '-ql', 'leapp-upgrade-el7toel8']: + # get source of truth + return FILES_FROM_RPM.strip().split('\n') + if list_of_args and list_of_args[0] == 'find': + # listing files in directory + return FILES_ON_SYSTEM.strip().split('\n') + if list_of_args == ['rpm', '-V', 'leapp-upgrade-el7toel8']: + # checking authenticity + return VERIFIED_FILES.strip().split('\n') + return [] + + +def test_check_for_modifications(monkeypatch): + monkeypatch.setattr(api, 'current_actor', CurrentActorMocked(arch='x86_64', src_ver='7.9', dst_ver='8.4')) + monkeypatch.setattr(trackcustommodifications, '_run_command', mocked__run_command) + modified, custom = trackcustommodifications.check_for_modifications() + assert len(modified) == 2 + assert modified[0].filename == 'repos/system_upgrade/el8toel9/actors/xorgdrvfact/libraries/xorgdriverlib.py' + assert modified[0].rpm_checks_str == '.......T.' + assert len(custom) == 3 + assert custom[0].filename == '/some/unrelated/to/leapp/file' + assert custom[0].rpm_checks_str == '' diff --git a/repos/system_upgrade/common/models/custommodifications.py b/repos/system_upgrade/common/models/custommodifications.py index 0b708dce62..01e65055af 100644 --- a/repos/system_upgrade/common/models/custommodifications.py +++ b/repos/system_upgrade/common/models/custommodifications.py @@ -9,3 +9,4 @@ class CustomModifications(Model): filename = fields.String() actor_name = fields.String() type = fields.StringEnum(choices=['custom', 'modified']) + rpm_checks_str = fields.String(default='')