diff --git a/repos/system_upgrade/common/actors/scansourcefiles/tests/files/file_owned_by_multiple_rpms b/repos/system_upgrade/common/actors/scansourcefiles/tests/files/file_owned_by_multiple_rpms deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/repos/system_upgrade/common/actors/scansourcefiles/tests/unit_test_scansourcefiles.py b/repos/system_upgrade/common/actors/scansourcefiles/tests/unit_test_scansourcefiles.py index 5b7dceae80..582d086cb9 100644 --- a/repos/system_upgrade/common/actors/scansourcefiles/tests/unit_test_scansourcefiles.py +++ b/repos/system_upgrade/common/actors/scansourcefiles/tests/unit_test_scansourcefiles.py @@ -4,7 +4,7 @@ from leapp.libraries.actor import scansourcefiles from leapp.libraries.common import testutils -from leapp.libraries.stdlib import api +from leapp.libraries.stdlib import api, CalledProcessError from leapp.models import FileInfo, TrackedFilesInfoSource CUR_DIR = os.path.dirname(os.path.abspath(__file__)) @@ -14,117 +14,150 @@ RHEL_MAJOR_VERSIONS_LIST = list(range(8, 9+1)) TRACKED_FILES_BY_VERSIONS = { - version: ['{}/file_major_version_{}'.format(FILES_DIR, version)] + version: [os.path.join(FILES_DIR, 'file_major_version_{}'.format(version))] for version in RHEL_MAJOR_VERSIONS_LIST } VERSION_FILES_MOCKED = { 'common': [ - '{}/file_not_rpm_owned'.format(FILES_DIR), + os.path.join(FILES_DIR, 'file_not_rpm_owned') ] } VERSION_FILES_MOCKED.update(TRACKED_FILES_BY_VERSIONS) -@pytest.mark.parametrize('major_version', RHEL_MAJOR_VERSIONS_LIST) -def test_version_files_with_common(monkeypatch, major_version): - monkeypatch.setattr(api, 'produce', testutils.produce_mocked()) +@pytest.mark.parametrize('run_output,expected_output', ( + ({'exit_code': 0}, False), + ({'exit_code': 1, 'stdout': 'missing'}, True), + ({'exit_code': 1, 'stdout': '5'}, True), + ({'exit_code': 1, 'stdout': '5 missing'}, True), + ({'exit_code': 1, 'stdout': 'fi5ve'}, True), + ({'exit_code': 1, 'stdout': 'NoFiveNoMissing'}, False), +)) +def test_is_modified(monkeypatch, run_output, expected_output): + def mocked_run(cmd, *args, **kwargs): + assert cmd == ['rpm', '-Vf', '--nomtime', input_file] + return run_output + + input_file = os.path.join(FILES_DIR, 'mocked_file') + monkeypatch.setattr(scansourcefiles, 'run', mocked_run) + + assert scansourcefiles.is_modified(input_file) == expected_output + + +@pytest.mark.parametrize('run_output,expected_output', ( + ({'stdout': ['rpm']}, 'rpm'), + ({'stdout': ['rpm1', 'rpm2']}, 'rpm1'), + (CalledProcessError, ''), +)) +def test_get_rpm_name(monkeypatch, run_output, expected_output): + def mocked_run(cmd, *args, **kwargs): + assert cmd == ['rpm', '-qf', '--queryformat', r'%{NAME}\n', input_file] + if raise_error: + raise CalledProcessError("mocked error", cmd, "result") + return run_output + + raise_error = run_output and not isinstance(run_output, dict) + input_file = os.path.join(FILES_DIR, 'mocked_file') + + monkeypatch.setattr(scansourcefiles, 'run', mocked_run) monkeypatch.setattr(api, 'current_logger', testutils.logger_mocked()) - monkeypatch.setattr(scansourcefiles, 'get_source_major_version', lambda: major_version) - monkeypatch.setattr(scansourcefiles, 'TRACKED_FILES', VERSION_FILES_MOCKED) - scansourcefiles.process() - assert api.produce.called == 1 - assert isinstance(api.produce.model_instances[0], TrackedFilesInfoSource) - # assert only files from correct major version were scanned, so common files + 1 (version file) - assert len(api.produce.model_instances[0].files) == len(VERSION_FILES_MOCKED['common']) + 1 - - assert isinstance(api.produce.model_instances[0].files[0], FileInfo) - assert api.produce.model_instances[0].files[0].path == '{}/file_not_rpm_owned'.format(FILES_DIR) - assert api.produce.model_instances[0].files[0].exists is True - assert api.produce.model_instances[0].files[0].rpm_name == '' - assert api.produce.model_instances[0].files[0].is_modified is False - - INDEX_OF_VERSION_FILE = len(VERSION_FILES_MOCKED['common']) - assert isinstance(api.produce.model_instances[0].files[INDEX_OF_VERSION_FILE], FileInfo) - version_file_path = '{}/file_major_version_{}'.format(FILES_DIR, major_version) - assert api.produce.model_instances[0].files[INDEX_OF_VERSION_FILE].path == version_file_path - assert api.produce.model_instances[0].files[INDEX_OF_VERSION_FILE].exists is False - assert api.produce.model_instances[0].files[INDEX_OF_VERSION_FILE].rpm_name == '' - assert api.produce.model_instances[0].files[INDEX_OF_VERSION_FILE].is_modified is False - - assert not api.current_logger.errmsg - - -def test_modified_rpm_owned_files(monkeypatch): - - class run_mocked(object): - - def __init__(self): - self.called = 0 - - def __call__(self, args, **kwargs): - self.called += 1 - # run to check if file is owned by rpm - if '-qf' in args: - if '{}/file_rpm_owned'.format(FILES_DIR) in args: - return {'stdout': ['rpm']} - if '{}/file_owned_by_multiple_rpms'.format(FILES_DIR) in args: - return {'stdout': ['rpm1', 'rpm2']} - return {'stdout': []} - # run to check if file was modified - if '-Vf' in args: - return {'exit_code': 0} - return {'exit_code': 1, 'stdout': []} - - FILES_MOCKED = { - 'common': [ - '{}/file_rpm_owned'.format(FILES_DIR), - '{}/file_owned_by_multiple_rpms'.format(FILES_DIR), - ], - } + assert scansourcefiles._get_rpm_name(input_file) == expected_output + + if raise_error: + return + + if len(run_output['stdout']) > 1: + assert len(api.current_logger.warnmsg) == 1 + expected_warnmsg = 'The {} file is owned by multiple rpms: rpm1, rpm2.'.format(input_file) + assert api.current_logger.warnmsg[0] == expected_warnmsg + else: + assert not api.current_logger.warnmsg + + +@pytest.mark.parametrize('input_file,rpm_name,modified', ( + (os.path.join(FILES_DIR, 'file_not_existant'), 'rpm', True), + (os.path.join(FILES_DIR, 'file_not_rpm_owned'), '', False) +)) +def test_scan_file(monkeypatch, input_file, rpm_name, modified): + monkeypatch.setattr(scansourcefiles, 'is_modified', lambda _: modified) + monkeypatch.setattr(scansourcefiles, '_get_rpm_name', lambda _: rpm_name) + + file_info = scansourcefiles.scan_file(input_file) + + assert isinstance(file_info, FileInfo) + assert file_info.path == input_file + assert file_info.exists == os.path.exists(input_file) + assert file_info.rpm_name == rpm_name + assert file_info.is_modified == modified + +@pytest.mark.parametrize('input_files,expected_output', ( + ([], []), + (['file1'], [FileInfo(path='file1', exists=False, rpm_name='', is_modified=False)]), + (['file1', 'file2'], [FileInfo(path='file1', exists=False, rpm_name='', is_modified=False), + FileInfo(path='file2', exists=False, rpm_name='', is_modified=False)]), +)) +def test_scan_files(monkeypatch, input_files, expected_output): + def mocked_scan_file(input_file): + return FileInfo(path=input_file, exists=False, rpm_name='', is_modified=False) + + monkeypatch.setattr(scansourcefiles, 'scan_file', mocked_scan_file) + + assert scansourcefiles.scan_files(input_files) == expected_output + + +@pytest.mark.parametrize('input_file,rpm_name,expected_output', ( + (os.path.join(FILES_DIR, 'file_rpm_owned'), 'rpm', + FileInfo(path=os.path.join(FILES_DIR, 'file_rpm_owned'), exists=True, rpm_name='rpm', is_modified=False)), +)) +def test_rpm_owned_files(monkeypatch, input_file, rpm_name, expected_output): monkeypatch.setattr(api, 'produce', testutils.produce_mocked()) - monkeypatch.setattr(api, 'current_logger', testutils.logger_mocked()) monkeypatch.setattr(scansourcefiles, 'get_source_major_version', lambda: 9) - monkeypatch.setattr(scansourcefiles, 'run', run_mocked()) - monkeypatch.setattr(scansourcefiles, 'is_modified', lambda x: True) - monkeypatch.setattr(scansourcefiles, 'TRACKED_FILES', FILES_MOCKED) + monkeypatch.setattr(scansourcefiles, '_get_rpm_name', lambda _: rpm_name) + monkeypatch.setattr(scansourcefiles, 'is_modified', lambda _: False) + monkeypatch.setattr(scansourcefiles, 'TRACKED_FILES', {'common': [input_file]}) scansourcefiles.process() - assert isinstance(api.produce.model_instances[0].files[0], FileInfo) - assert api.produce.model_instances[0].files[0].path == '{}/file_rpm_owned'.format(FILES_DIR) - assert api.produce.model_instances[0].files[0].exists is True - assert api.produce.model_instances[0].files[0].rpm_name == 'rpm' - assert api.produce.model_instances[0].files[0].is_modified is True + instance = api.produce.model_instances[0] + file = instance.files[0] - assert isinstance(api.produce.model_instances[0].files[1], FileInfo) - assert api.produce.model_instances[0].files[1].path == '{}/file_owned_by_multiple_rpms'.format(FILES_DIR) - assert api.produce.model_instances[0].files[1].exists is True - assert api.produce.model_instances[0].files[1].rpm_name == 'rpm1' - assert api.produce.model_instances[0].files[1].is_modified is True + assert api.produce.called == 1 + assert len(instance.files) == 1 + assert file == expected_output - assert len(api.current_logger.warnmsg) == 1 - log_msg = 'The {}/file_owned_by_multiple_rpms file is owned by multiple rpms: rpm1, rpm2.'.format(FILES_DIR) - assert api.current_logger.warnmsg[0] == log_msg +param_list = [(major_version, + FileInfo(path=os.path.join(FILES_DIR, 'file_major_version_{}'.format(major_version)), + exists=False, rpm_name='', is_modified=False), + FileInfo(path=os.path.join(FILES_DIR, 'file_not_rpm_owned'), + exists=True, rpm_name='', is_modified=False)) + for major_version in RHEL_MAJOR_VERSIONS_LIST] -def test_non_existent_file(monkeypatch): - FILES_MOCKED = {'common': ['{}/file_nonexistent'.format(FILES_DIR)]} +@pytest.mark.parametrize('major_version,expected_output,common_expected_output', param_list) +def test_version_file_with_existant_common_file(monkeypatch, major_version, expected_output, common_expected_output): monkeypatch.setattr(api, 'produce', testutils.produce_mocked()) monkeypatch.setattr(api, 'current_logger', testutils.logger_mocked()) - monkeypatch.setattr(scansourcefiles, 'get_source_major_version', lambda: 9) - monkeypatch.setattr(scansourcefiles, 'TRACKED_FILES', FILES_MOCKED) + monkeypatch.setattr(scansourcefiles, 'get_source_major_version', lambda: major_version) + monkeypatch.setattr(scansourcefiles, '_get_rpm_name', lambda _: '') + monkeypatch.setattr(scansourcefiles, 'is_modified', lambda _: False) + monkeypatch.setattr(scansourcefiles, 'TRACKED_FILES', VERSION_FILES_MOCKED) scansourcefiles.process() - assert isinstance(api.produce.model_instances[0].files[0], FileInfo) - assert api.produce.model_instances[0].files[0].path == '{}/file_nonexistent'.format(FILES_DIR) - assert api.produce.model_instances[0].files[0].exists is False - assert api.produce.model_instances[0].files[0].rpm_name == '' - assert api.produce.model_instances[0].files[0].is_modified is False + instance = api.produce.model_instances[0] + + assert api.produce.called == 1 + assert isinstance(instance, TrackedFilesInfoSource) + # assert only 1 common and 1 version file were scanned + assert len(instance.files) == 2 + + file1 = instance.files[0] + assert file1 == common_expected_output - assert not api.current_logger.warnmsg + file2 = instance.files[1] + assert file2 == expected_output