diff --git a/repos/system_upgrade/common/actors/inhibitwhenluks/actor.py b/repos/system_upgrade/common/actors/inhibitwhenluks/actor.py index 40b845b01d..efffaa8719 100644 --- a/repos/system_upgrade/common/actors/inhibitwhenluks/actor.py +++ b/repos/system_upgrade/common/actors/inhibitwhenluks/actor.py @@ -1,7 +1,7 @@ -from leapp import reporting from leapp.actors import Actor -from leapp.models import CephInfo, StorageInfo -from leapp.reporting import create_report, Report +from leapp.libraries.actor.inhibitwhenluks import check_invalid_luks_devices +from leapp.models import CephInfo, LuksDumps, StorageInfo, TargetUserSpaceUpgradeTasks +from leapp.reporting import Report from leapp.tags import ChecksPhaseTag, IPUWorkflowTag @@ -13,28 +13,9 @@ class InhibitWhenLuks(Actor): """ name = 'check_luks_and_inhibit' - consumes = (StorageInfo, CephInfo) - produces = (Report,) + consumes = (CephInfo, LuksDumps, StorageInfo) + produces = (Report, TargetUserSpaceUpgradeTasks) tags = (ChecksPhaseTag, IPUWorkflowTag) def process(self): - # If encrypted Ceph volumes present, check if there are more encrypted disk in lsblk than Ceph vol - ceph_vol = [] - try: - ceph_info = next(self.consume(CephInfo)) - if ceph_info: - ceph_vol = ceph_info.encrypted_volumes[:] - except StopIteration: - pass - - for storage_info in self.consume(StorageInfo): - for blk in storage_info.lsblk: - if blk.tp == 'crypt' and blk.name not in ceph_vol: - create_report([ - reporting.Title('LUKS encrypted partition detected'), - reporting.Summary('Upgrading system with encrypted partitions is not supported'), - reporting.Severity(reporting.Severity.HIGH), - reporting.Groups([reporting.Groups.BOOT, reporting.Groups.ENCRYPTION]), - reporting.Groups([reporting.Groups.INHIBITOR]), - ]) - break + check_invalid_luks_devices() diff --git a/repos/system_upgrade/common/actors/inhibitwhenluks/libraries/inhibitwhenluks.py b/repos/system_upgrade/common/actors/inhibitwhenluks/libraries/inhibitwhenluks.py new file mode 100644 index 0000000000..9c79ee4dc5 --- /dev/null +++ b/repos/system_upgrade/common/actors/inhibitwhenluks/libraries/inhibitwhenluks.py @@ -0,0 +1,166 @@ +from leapp import reporting +from leapp.libraries.common.config.version import get_source_major_version +from leapp.libraries.stdlib import api +from leapp.models import ( + CephInfo, + DracutModule, + LuksDumps, + StorageInfo, + TargetUserSpaceUpgradeTasks, + UpgradeInitramfsTasks +) +from leapp.reporting import create_report + + +# https://red.ht/clevis-tpm2-luks-auto-unlock-rhel8 +# https://red.ht/clevis-tpm2-luks-auto-unlock-rhel9 +# https://red.ht/convert-to-luks2-rhel8 +# https://red.ht/convert-to-luks2-rhel9 +CLEVIS_DOC_URL_FMT = 'https://red.ht/clevis-tpm2-luks-auto-unlock-rhel{}' +LUKS2_CONVERT_DOC_URL_FMT = 'https://red.ht/convert-to-luks2-rhel{}' + +FMT_LIST_SEPARATOR = '\n - ' + + +def _formatted_list_output(input_list, sep=FMT_LIST_SEPARATOR): + return ['{}{}'.format(sep, item) for item in input_list] + + +def _at_least_one_tpm_token(luks_dump): + return any([token.token_type == "clevis-tpm2" for token in luks_dump.tokens]) + + +def _get_ceph_volumes(): + ceph_info = next(api.consume(CephInfo), None) + return ceph_info.encrypted_volumes[:] if ceph_info else [] + + +def apply_obsoleted_check_ipu_7_8(): + ceph_vol = _get_ceph_volumes() + for storage_info in api.consume(StorageInfo): + for blk in storage_info.lsblk: + if blk.tp == 'crypt' and blk.name not in ceph_vol: + create_report([ + reporting.Title('LUKS encrypted partition detected'), + reporting.Summary('Upgrading system with encrypted partitions is not supported'), + reporting.Severity(reporting.Severity.HIGH), + reporting.Groups([reporting.Groups.BOOT, reporting.Groups.ENCRYPTION]), + reporting.Groups([reporting.Groups.INHIBITOR]), + ]) + break + + +def report_inhibitor(luks1_partitions, no_tpm2_partitions): + source_major_version = get_source_major_version() + clevis_doc_url = CLEVIS_DOC_URL_FMT.format(source_major_version) + luks2_convert_doc_url = LUKS2_CONVERT_DOC_URL_FMT.format(source_major_version) + summary = ( + 'We have detected LUKS encrypted volumes that do not meet current' + ' criteria to be able to proceed the in-place upgrade process.' + ' Right now the upgrade process requires for encrypted storage to be' + ' in LUKS2 format configured with Clevis TPM 2.0.' + ) + + report_hints = [] + + if luks1_partitions: + + summary += ( + '\n\nSince RHEL 8 the default format for LUKS encryption is LUKS2.' + ' Despite the old LUKS1 format is still supported on RHEL systems' + ' it has some limitations in comparison to LUKS2 and during initial' + ' investigation we have decided to support for upgrades only LUKS2' + ' format.' + ' The following LUKS1 partitions have been discovered on your system:{}' + .format(_formatted_list_output(luks1_partitions)) + ) + report_hints.append(reporting.Remediation( + hint=( + 'Convert your LUKS1 encrypted devices to LUKS2 and bind it to TPM2 using clevis.' + ' If this is not possible in your case consider clean installation' + ' of the target RHEL system instead.' + ) + )) + report_hints.append(reporting.ExternalLink( + url=luks2_convert_doc_url, + title='LUKS versions in RHEL: Conversion' + )) + + if no_tpm2_partitions: + summary += ( + '\n\nCurrently we require the process to be non-interactive and' + ' offline. For this reason we require automatic unlock of' + ' encrypted devices during the upgrade process.' + ' Currently we support automatic unlocking during the upgrade only' + ' for volumes bound to Clevis TPM2 token.' + ' The following LUKS2 devices without Clevis TPM2 token ' + ' have been discovered on your system: {}' + .format(_formatted_list_output(no_tpm2_partitions)) + ) + + report_hints.append(reporting.Remediation( + hint=( + 'Add Clevis TPM2 binding to LUKS devices.' + ' If some LUKS devices use still the old LUKS1 format, convert' + ' them to LUKS2 prior to binding.' + ) + )) + report_hints.append(reporting.ExternalLink( + url=clevis_doc_url, + title='Configuring manual enrollment of LUKS-encrypted volumes by using a TPM 2.0 policy' + ) + ) + create_report([ + reporting.Title('Detected LUKS devices unsuitable for in-place upgrade.'), + reporting.Summary(summary), + reporting.Severity(reporting.Severity.HIGH), + reporting.Groups([reporting.Groups.BOOT, reporting.Groups.ENCRYPTION]), + reporting.Groups([reporting.Groups.INHIBITOR]), + ] + report_hints) + + +def check_invalid_luks_devices(): + if get_source_major_version() == '7': + # NOTE: keeping unchanged behaviour for IPU 7 -> 8 + apply_obsoleted_check_ipu_7_8() + return + + luks_dumps = next(api.consume(LuksDumps), None) + if not luks_dumps: + api.current_logger().debug('No LUKS volumes detected. Skipping.') + return + + luks1_partitions = [] + no_tpm2_partitions = [] + ceph_vol = _get_ceph_volumes() + for luks_dump in luks_dumps.dumps: + # if the device is managed by ceph, don't inhibit + if luks_dump.device_name in ceph_vol: + api.current_logger().debug('Skipping LUKS CEPH volume: {}'.format(luks_dump.device_name)) + continue + + if luks_dump.version == 1: + luks1_partitions.append(luks_dump.device_name) + elif luks_dump.version == 2 and not _at_least_one_tpm_token(luks_dump): + no_tpm2_partitions.append(luks_dump.device_name) + + if luks1_partitions or no_tpm2_partitions: + report_inhibitor(luks1_partitions, no_tpm2_partitions) + else: + required_crypt_rpms = [ + 'clevis', + 'clevis-dracut', + 'clevis-systemd', + 'clevis-udisks2', + 'clevis-luks', + 'cryptsetup', + 'tpm2-tss', + 'tpm2-tools', + 'tpm2-abrmd' + ] + api.produce(TargetUserSpaceUpgradeTasks(install_rpms=required_crypt_rpms)) + api.produce(UpgradeInitramfsTasks(include_dracut_modules=[ + DracutModule(name='clevis'), + DracutModule(name='clevis-pin-tpm2') + ]) + ) diff --git a/repos/system_upgrade/common/actors/inhibitwhenluks/tests/test_inhibitwhenluks.py b/repos/system_upgrade/common/actors/inhibitwhenluks/tests/test_inhibitwhenluks.py index 405a34295b..031a2d37f0 100644 --- a/repos/system_upgrade/common/actors/inhibitwhenluks/tests/test_inhibitwhenluks.py +++ b/repos/system_upgrade/common/actors/inhibitwhenluks/tests/test_inhibitwhenluks.py @@ -1,34 +1,117 @@ -from leapp.models import CephInfo, LsblkEntry, StorageInfo +from leapp.libraries.common.config import version +from leapp.models import CephInfo, LuksDump, LuksDumps, LuksToken, TargetUserSpaceUpgradeTasks from leapp.reporting import Report from leapp.snactor.fixture import current_actor_context from leapp.utils.report import is_inhibitor -def test_actor_with_luks(current_actor_context): - with_luks = [LsblkEntry(name='luks-132', kname='kname1', maj_min='253:0', rm='0', size='10G', bsize=10*(1 << 39), - ro='0', tp='crypt', mountpoint='', parent_name='', parent_path='')] +def test_actor_with_luks1(monkeypatch, current_actor_context): + monkeypatch.setattr(version, 'get_target_major_version', lambda: '8') + current_actor_context.feed(CephInfo(encrypted_volumes=[])) + luks_dump = LuksDump( + version=1, + uuid="dd09e6d4-b595-4f1c-80b8-fd47540e6464", + device_path="/dev/sda", + device_name="sda") + luks_dumps = LuksDumps(dumps=[luks_dump]) + current_actor_context.feed(luks_dumps) + current_actor_context.run() + assert current_actor_context.consume(Report) + report_fields = current_actor_context.consume(Report)[0].report + assert is_inhibitor(report_fields) + assert not current_actor_context.consume(TargetUserSpaceUpgradeTasks) + + assert report_fields['title'].startswith("Invalid LUKS encrypted partition detected") + assert "LUKS1 partitions have been discovered on your system" in report_fields['summary'] + assert luks_dump.device_name in report_fields['summary'] - current_actor_context.feed(StorageInfo(lsblk=with_luks)) + +def test_actor_with_luks2(monkeypatch, current_actor_context): + monkeypatch.setattr(version, 'get_target_major_version', lambda: '8') + current_actor_context.feed(CephInfo(encrypted_volumes=[])) + luks_dump = LuksDump( + version=2, + uuid="27b57c75-9adf-4744-ab04-9eb99726a301", + device_path="/dev/sda", + device_name="sda") + luks_dumps = LuksDumps(dumps=[luks_dump]) + current_actor_context.feed(luks_dumps) current_actor_context.run() assert current_actor_context.consume(Report) report_fields = current_actor_context.consume(Report)[0].report assert is_inhibitor(report_fields) + assert not current_actor_context.consume(TargetUserSpaceUpgradeTasks) + assert report_fields['title'].startswith("Invalid LUKS encrypted partition detected") + assert "LUKS2 devices without clevis TPM2 token have been discovered on your system" in report_fields['summary'] + assert luks_dump.device_name in report_fields['summary'] -def test_actor_with_luks_ceph_only(current_actor_context): - with_luks = [LsblkEntry(name='luks-132', kname='kname1', maj_min='253:0', rm='0', size='10G', bsize=10*(1 << 39), - ro='0', tp='crypt', mountpoint='', parent_name='', parent_path='')] - ceph_volume = ['luks-132'] - current_actor_context.feed(StorageInfo(lsblk=with_luks)) - current_actor_context.feed(CephInfo(encrypted_volumes=ceph_volume)) + +def test_actor_with_luks2_invalid_token(monkeypatch, current_actor_context): + monkeypatch.setattr(version, 'get_target_major_version', lambda: '8') + current_actor_context.feed(CephInfo(encrypted_volumes=[])) + luks_dump = LuksDump( + version=2, + uuid="dc1dbe37-6644-4094-9839-8fc5dcbec0c6", + device_path="/dev/sda", + device_name="sda", + tokens=[LuksToken(token_id=0, keyslot=1, token_type="clevis")]) + luks_dumps = LuksDumps(dumps=[luks_dump]) + current_actor_context.feed(luks_dumps) + current_actor_context.run() + assert current_actor_context.consume(Report) + report_fields = current_actor_context.consume(Report)[0].report + assert is_inhibitor(report_fields) + + assert report_fields['title'].startswith("Invalid LUKS encrypted partition detected") + assert "LUKS2 devices without clevis TPM2 token have been discovered on your system" in report_fields['summary'] + assert luks_dump.device_name in report_fields['summary'] + assert not current_actor_context.consume(TargetUserSpaceUpgradeTasks) + + +def test_actor_with_luks2_clevis_tpm_token(monkeypatch, current_actor_context): + monkeypatch.setattr(version, 'get_target_major_version', lambda: '8') + current_actor_context.feed(CephInfo(encrypted_volumes=[])) + luks_dump = LuksDump( + version=2, + uuid="83050bd9-61c6-4ff0-846f-bfd3ac9bfc67", + device_path="/dev/sda", + device_name="sda", + tokens=[LuksToken(token_id=0, keyslot=1, token_type="clevis-tpm2")]) + luks_dumps = LuksDumps(dumps=[luks_dump]) + current_actor_context.feed(luks_dumps) current_actor_context.run() assert not current_actor_context.consume(Report) + upgrade_tasks = current_actor_context.consume(TargetUserSpaceUpgradeTasks) + assert len(upgrade_tasks) == 1 + assert set(upgrade_tasks[0].install_rpms) == set([ + 'clevis', + 'clevis-dracut', + 'clevis-systemd', + 'clevis-udisks2', + 'clevis-luks', + 'cryptsetup', + 'tpm2-tss', + 'tpm2-tools', + 'tpm2-abrmd' + ]) -def test_actor_without_luks(current_actor_context): - without_luks = [LsblkEntry(name='sda1', kname='sda1', maj_min='8:0', rm='0', size='10G', bsize=10*(1 << 39), - ro='0', tp='part', mountpoint='/boot', parent_name='', parent_path='')] - current_actor_context.feed(StorageInfo(lsblk=without_luks)) +def test_actor_with_luks2_ceph(monkeypatch, current_actor_context): + monkeypatch.setattr(version, 'get_target_major_version', lambda: '8') + ceph_volume = ['sda'] + current_actor_context.feed(CephInfo(encrypted_volumes=ceph_volume)) + luks_dump = LuksDump( + version=2, + uuid="0edb8c11-1a04-4abd-a12d-93433ee7b8d8", + device_path="/dev/sda", + device_name="sda", + tokens=[LuksToken(token_id=0, keyslot=1, token_type="clevis")]) + luks_dumps = LuksDumps(dumps=[luks_dump]) + current_actor_context.feed(luks_dumps) current_actor_context.run() assert not current_actor_context.consume(Report) + + # make sure we don't needlessly include clevis packages, when there is no clevis token + assert not current_actor_context.consume(TargetUserSpaceUpgradeTasks)