Skip to content

Commit

Permalink
Refactor tests
Browse files Browse the repository at this point in the history
Test from lower level functions, then after we know
these functios work, we mock them
  • Loading branch information
tomasfratrik committed Mar 18, 2024
1 parent 59e0c6c commit cf72b95
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 85 deletions.
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from leapp.libraries.actor import scansourcefiles
from leapp.libraries.common import testutils
from leapp.libraries.stdlib import api
from leapp.libraries.stdlib import api, CalledProcessError
from leapp.models import FileInfo, TrackedFilesInfoSource

CUR_DIR = os.path.dirname(os.path.abspath(__file__))
Expand All @@ -14,117 +14,150 @@
RHEL_MAJOR_VERSIONS_LIST = list(range(8, 9+1))

TRACKED_FILES_BY_VERSIONS = {
version: ['{}/file_major_version_{}'.format(FILES_DIR, version)]
version: [os.path.join(FILES_DIR, 'file_major_version_{}'.format(version))]
for version in RHEL_MAJOR_VERSIONS_LIST
}

VERSION_FILES_MOCKED = {
'common': [
'{}/file_not_rpm_owned'.format(FILES_DIR),
os.path.join(FILES_DIR, 'file_not_rpm_owned')
]
}

VERSION_FILES_MOCKED.update(TRACKED_FILES_BY_VERSIONS)


@pytest.mark.parametrize('major_version', RHEL_MAJOR_VERSIONS_LIST)
def test_version_files_with_common(monkeypatch, major_version):
monkeypatch.setattr(api, 'produce', testutils.produce_mocked())
@pytest.mark.parametrize('run_output,expected_output', (
({'exit_code': 0}, False),
({'exit_code': 1, 'stdout': 'missing'}, True),
({'exit_code': 1, 'stdout': '5'}, True),
({'exit_code': 1, 'stdout': '5 missing'}, True),
({'exit_code': 1, 'stdout': 'fi5ve'}, True),
({'exit_code': 1, 'stdout': 'NoFiveNoMissing'}, False),
))
def test_is_modified(monkeypatch, run_output, expected_output):
def mocked_run(cmd, *args, **kwargs):
assert cmd == ['rpm', '-Vf', '--nomtime', input_file]
return run_output

input_file = os.path.join(FILES_DIR, 'mocked_file')
monkeypatch.setattr(scansourcefiles, 'run', mocked_run)

assert scansourcefiles.is_modified(input_file) == expected_output


@pytest.mark.parametrize('run_output,expected_output', (
({'stdout': ['rpm']}, 'rpm'),
({'stdout': ['rpm1', 'rpm2']}, 'rpm1'),
(CalledProcessError, ''),
))
def test_get_rpm_name(monkeypatch, run_output, expected_output):
def mocked_run(cmd, *args, **kwargs):
assert cmd == ['rpm', '-qf', '--queryformat', r'%{NAME}\n', input_file]
if raise_error:
raise CalledProcessError("mocked error", cmd, "result")
return run_output

raise_error = run_output and not isinstance(run_output, dict)
input_file = os.path.join(FILES_DIR, 'mocked_file')

monkeypatch.setattr(scansourcefiles, 'run', mocked_run)
monkeypatch.setattr(api, 'current_logger', testutils.logger_mocked())
monkeypatch.setattr(scansourcefiles, 'get_source_major_version', lambda: major_version)
monkeypatch.setattr(scansourcefiles, 'TRACKED_FILES', VERSION_FILES_MOCKED)
scansourcefiles.process()

assert api.produce.called == 1
assert isinstance(api.produce.model_instances[0], TrackedFilesInfoSource)
# assert only files from correct major version were scanned, so common files + 1 (version file)
assert len(api.produce.model_instances[0].files) == len(VERSION_FILES_MOCKED['common']) + 1

assert isinstance(api.produce.model_instances[0].files[0], FileInfo)
assert api.produce.model_instances[0].files[0].path == '{}/file_not_rpm_owned'.format(FILES_DIR)
assert api.produce.model_instances[0].files[0].exists is True
assert api.produce.model_instances[0].files[0].rpm_name == ''
assert api.produce.model_instances[0].files[0].is_modified is False

INDEX_OF_VERSION_FILE = len(VERSION_FILES_MOCKED['common'])
assert isinstance(api.produce.model_instances[0].files[INDEX_OF_VERSION_FILE], FileInfo)
version_file_path = '{}/file_major_version_{}'.format(FILES_DIR, major_version)
assert api.produce.model_instances[0].files[INDEX_OF_VERSION_FILE].path == version_file_path
assert api.produce.model_instances[0].files[INDEX_OF_VERSION_FILE].exists is False
assert api.produce.model_instances[0].files[INDEX_OF_VERSION_FILE].rpm_name == ''
assert api.produce.model_instances[0].files[INDEX_OF_VERSION_FILE].is_modified is False

assert not api.current_logger.errmsg


def test_modified_rpm_owned_files(monkeypatch):

class run_mocked(object):

def __init__(self):
self.called = 0

def __call__(self, args, **kwargs):
self.called += 1
# run to check if file is owned by rpm
if '-qf' in args:
if '{}/file_rpm_owned'.format(FILES_DIR) in args:
return {'stdout': ['rpm']}
if '{}/file_owned_by_multiple_rpms'.format(FILES_DIR) in args:
return {'stdout': ['rpm1', 'rpm2']}
return {'stdout': []}
# run to check if file was modified
if '-Vf' in args:
return {'exit_code': 0}
return {'exit_code': 1, 'stdout': []}

FILES_MOCKED = {
'common': [
'{}/file_rpm_owned'.format(FILES_DIR),
'{}/file_owned_by_multiple_rpms'.format(FILES_DIR),
],
}
assert scansourcefiles._get_rpm_name(input_file) == expected_output

if raise_error:
return

if len(run_output['stdout']) > 1:
assert len(api.current_logger.warnmsg) == 1
expected_warnmsg = 'The {} file is owned by multiple rpms: rpm1, rpm2.'.format(input_file)
assert api.current_logger.warnmsg[0] == expected_warnmsg
else:
assert not api.current_logger.warnmsg


@pytest.mark.parametrize('input_file,rpm_name,modified', (
(os.path.join(FILES_DIR, 'file_not_existant'), 'rpm', True),
(os.path.join(FILES_DIR, 'file_not_rpm_owned'), '', False)
))
def test_scan_file(monkeypatch, input_file, rpm_name, modified):
monkeypatch.setattr(scansourcefiles, 'is_modified', lambda _: modified)
monkeypatch.setattr(scansourcefiles, '_get_rpm_name', lambda _: rpm_name)

file_info = scansourcefiles.scan_file(input_file)

assert isinstance(file_info, FileInfo)
assert file_info.path == input_file
assert file_info.exists == os.path.exists(input_file)
assert file_info.rpm_name == rpm_name
assert file_info.is_modified == modified


@pytest.mark.parametrize('input_files,expected_output', (
([], []),
(['file1'], [FileInfo(path='file1', exists=False, rpm_name='', is_modified=False)]),
(['file1', 'file2'], [FileInfo(path='file1', exists=False, rpm_name='', is_modified=False),
FileInfo(path='file2', exists=False, rpm_name='', is_modified=False)]),
))
def test_scan_files(monkeypatch, input_files, expected_output):
def mocked_scan_file(input_file):
return FileInfo(path=input_file, exists=False, rpm_name='', is_modified=False)

monkeypatch.setattr(scansourcefiles, 'scan_file', mocked_scan_file)

assert scansourcefiles.scan_files(input_files) == expected_output


@pytest.mark.parametrize('input_file,rpm_name,expected_output', (
(os.path.join(FILES_DIR, 'file_rpm_owned'), 'rpm',
FileInfo(path=os.path.join(FILES_DIR, 'file_rpm_owned'), exists=True, rpm_name='rpm', is_modified=False)),
))
def test_rpm_owned_files(monkeypatch, input_file, rpm_name, expected_output):
monkeypatch.setattr(api, 'produce', testutils.produce_mocked())
monkeypatch.setattr(api, 'current_logger', testutils.logger_mocked())
monkeypatch.setattr(scansourcefiles, 'get_source_major_version', lambda: 9)
monkeypatch.setattr(scansourcefiles, 'run', run_mocked())
monkeypatch.setattr(scansourcefiles, 'is_modified', lambda x: True)
monkeypatch.setattr(scansourcefiles, 'TRACKED_FILES', FILES_MOCKED)
monkeypatch.setattr(scansourcefiles, '_get_rpm_name', lambda _: rpm_name)
monkeypatch.setattr(scansourcefiles, 'is_modified', lambda _: False)
monkeypatch.setattr(scansourcefiles, 'TRACKED_FILES', {'common': [input_file]})

scansourcefiles.process()

assert isinstance(api.produce.model_instances[0].files[0], FileInfo)
assert api.produce.model_instances[0].files[0].path == '{}/file_rpm_owned'.format(FILES_DIR)
assert api.produce.model_instances[0].files[0].exists is True
assert api.produce.model_instances[0].files[0].rpm_name == 'rpm'
assert api.produce.model_instances[0].files[0].is_modified is True
instance = api.produce.model_instances[0]
file = instance.files[0]

assert isinstance(api.produce.model_instances[0].files[1], FileInfo)
assert api.produce.model_instances[0].files[1].path == '{}/file_owned_by_multiple_rpms'.format(FILES_DIR)
assert api.produce.model_instances[0].files[1].exists is True
assert api.produce.model_instances[0].files[1].rpm_name == 'rpm1'
assert api.produce.model_instances[0].files[1].is_modified is True
assert api.produce.called == 1
assert len(instance.files) == 1
assert file == expected_output

assert len(api.current_logger.warnmsg) == 1
log_msg = 'The {}/file_owned_by_multiple_rpms file is owned by multiple rpms: rpm1, rpm2.'.format(FILES_DIR)
assert api.current_logger.warnmsg[0] == log_msg

param_list = [(major_version,
FileInfo(path=os.path.join(FILES_DIR, 'file_major_version_{}'.format(major_version)),
exists=False, rpm_name='', is_modified=False),
FileInfo(path=os.path.join(FILES_DIR, 'file_not_rpm_owned'),
exists=True, rpm_name='', is_modified=False))
for major_version in RHEL_MAJOR_VERSIONS_LIST]

def test_non_existent_file(monkeypatch):
FILES_MOCKED = {'common': ['{}/file_nonexistent'.format(FILES_DIR)]}

@pytest.mark.parametrize('major_version,expected_output,common_expected_output', param_list)
def test_version_file_with_existant_common_file(monkeypatch, major_version, expected_output, common_expected_output):
monkeypatch.setattr(api, 'produce', testutils.produce_mocked())
monkeypatch.setattr(api, 'current_logger', testutils.logger_mocked())
monkeypatch.setattr(scansourcefiles, 'get_source_major_version', lambda: 9)
monkeypatch.setattr(scansourcefiles, 'TRACKED_FILES', FILES_MOCKED)
monkeypatch.setattr(scansourcefiles, 'get_source_major_version', lambda: major_version)
monkeypatch.setattr(scansourcefiles, '_get_rpm_name', lambda _: '')
monkeypatch.setattr(scansourcefiles, 'is_modified', lambda _: False)
monkeypatch.setattr(scansourcefiles, 'TRACKED_FILES', VERSION_FILES_MOCKED)

scansourcefiles.process()

assert isinstance(api.produce.model_instances[0].files[0], FileInfo)
assert api.produce.model_instances[0].files[0].path == '{}/file_nonexistent'.format(FILES_DIR)
assert api.produce.model_instances[0].files[0].exists is False
assert api.produce.model_instances[0].files[0].rpm_name == ''
assert api.produce.model_instances[0].files[0].is_modified is False
instance = api.produce.model_instances[0]

assert api.produce.called == 1
assert isinstance(instance, TrackedFilesInfoSource)
# assert only 1 common and 1 version file were scanned
assert len(instance.files) == 2

file1 = instance.files[0]
assert file1 == common_expected_output

assert not api.current_logger.warnmsg
file2 = instance.files[1]
assert file2 == expected_output

0 comments on commit cf72b95

Please sign in to comment.