Skip to content

Commit

Permalink
InhibitWhenLuks: modify the inhibitor to use LuksDump
Browse files Browse the repository at this point in the history
Consume LuksDump messages to decide whether the upgrade process should
be inhibited. If all devices are LUKS2 with clevis TPM2 binding, don't
inhibit.
  • Loading branch information
danzatt authored and pirat89 committed Aug 2, 2024
1 parent 7002c0d commit ffcb92b
Show file tree
Hide file tree
Showing 7 changed files with 250 additions and 50 deletions.
31 changes: 6 additions & 25 deletions repos/system_upgrade/common/actors/inhibitwhenluks/actor.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from leapp import reporting
from leapp.actors import Actor
from leapp.models import CephInfo, StorageInfo
from leapp.reporting import create_report, Report
from leapp.libraries.actor.inhibitwhenluks import check_invalid_luks_devices
from leapp.models import CephInfo, LuksDumps, TargetUserSpaceUpgradeTasks
from leapp.reporting import Report
from leapp.tags import ChecksPhaseTag, IPUWorkflowTag


Expand All @@ -13,28 +13,9 @@ class InhibitWhenLuks(Actor):
"""

name = 'check_luks_and_inhibit'
consumes = (StorageInfo, CephInfo)
produces = (Report,)
consumes = (LuksDumps, CephInfo)
produces = (Report, TargetUserSpaceUpgradeTasks)
tags = (ChecksPhaseTag, IPUWorkflowTag)

def process(self):
# If encrypted Ceph volumes present, check if there are more encrypted disk in lsblk than Ceph vol
ceph_vol = []
try:
ceph_info = next(self.consume(CephInfo))
if ceph_info:
ceph_vol = ceph_info.encrypted_volumes[:]
except StopIteration:
pass

for storage_info in self.consume(StorageInfo):
for blk in storage_info.lsblk:
if blk.tp == 'crypt' and blk.name not in ceph_vol:
create_report([
reporting.Title('LUKS encrypted partition detected'),
reporting.Summary('Upgrading system with encrypted partitions is not supported'),
reporting.Severity(reporting.Severity.HIGH),
reporting.Groups([reporting.Groups.BOOT, reporting.Groups.ENCRYPTION]),
reporting.Groups([reporting.Groups.INHIBITOR]),
])
break
check_invalid_luks_devices()
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
from leapp import reporting
from leapp.libraries.common.config.version import get_target_major_version
from leapp.libraries.stdlib import api
from leapp.models import CephInfo, DracutModule, LuksDumps, TargetUserSpaceUpgradeTasks, UpgradeInitramfsTasks
from leapp.reporting import create_report

CLEVIS_RHEL8_DOC_URL = 'https://access.redhat.com/documentation/en-us/red_hat_enterprise_linux/8/html/security_hardening/configuring-automated-unlocking-of-encrypted-volumes-using-policy-based-decryption_security-hardening#configuring-manual-enrollment-of-volumes-using-tpm2_configuring-automated-unlocking-of-encrypted-volumes-using-policy-based-decryption' # noqa: E501; pylint: disable=line-too-long
CLEVIS_RHEL9_DOC_URL = 'https://access.redhat.com/documentation/en-us/red_hat_enterprise_linux/9/html/security_hardening/configuring-automated-unlocking-of-encrypted-volumes-using-policy-based-decryption_security-hardening#configuring-manual-enrollment-of-volumes-using-tpm2_configuring-automated-unlocking-of-encrypted-volumes-using-policy-based-decryption' # noqa: E501; pylint: disable=line-too-long
LUKS2_CONVERT_RHEL8_DOC_URL = 'https://access.redhat.com/documentation/en-us/red_hat_enterprise_linux/8/html/security_hardening/encrypting-block-devices-using-luks_security-hardening#luks-versions-in-rhel_encrypting-block-devices-using-luks' # noqa: E501; pylint: disable=line-too-long
LUKS2_CONVERT_RHEL9_DOC_URL = 'https://access.redhat.com/documentation/en-us/red_hat_enterprise_linux/9/html/security_hardening/encrypting-block-devices-using-luks_security-hardening#luks-versions-in-rhel_encrypting-block-devices-using-luks' # noqa: E501; pylint: disable=line-too-long
FMT_LIST_SEPARATOR = '\n - '


def _at_least_one_tpm_token(luks_dump):
return any([token.token_type == "clevis-tpm2" for token in luks_dump.tokens])


def check_invalid_luks_devices():
target_major_version = get_target_major_version()
if target_major_version == '8':
clevis_doc_url = CLEVIS_RHEL8_DOC_URL
luks2_convert_doc_url = LUKS2_CONVERT_RHEL8_DOC_URL
elif target_major_version == '9':
clevis_doc_url = CLEVIS_RHEL9_DOC_URL
luks2_convert_doc_url = LUKS2_CONVERT_RHEL9_DOC_URL
else:
create_report([
reporting.Title('LUKS encrypted partition detected'),
reporting.Summary('Upgrading system with encrypted partitions is not supported'),
reporting.Severity(reporting.Severity.HIGH),
reporting.Groups([reporting.Groups.BOOT, reporting.Groups.ENCRYPTION]),
reporting.Groups([reporting.Groups.INHIBITOR]),
])
return
ceph_vol = []
try:
ceph_info = next(api.consume(CephInfo), None)
if ceph_info:
ceph_vol = ceph_info.encrypted_volumes[:]
except StopIteration:
pass

luks_dumps = next(api.consume(LuksDumps), None)
if luks_dumps is None:
return

for luks_dump in luks_dumps.dumps:
# if the device is managed by ceph, don't inhibit
if luks_dump.device_name in ceph_vol:
continue

list_luks1_partitions = []
list_no_tpm2_partitions = []

if luks_dump.version == 1:
list_luks1_partitions.append(luks_dump.device_name)
elif luks_dump.version == 2 and not _at_least_one_tpm_token(luks_dump):
list_no_tpm2_partitions.append(luks_dump.device_name)

if list_luks1_partitions or list_no_tpm2_partitions:
summary = (
'Only systems where all encrypted devices are LUKS2 '
'devices with Clevis TPM 2.0 token can be updated.'
)
report_hints = []

if list_luks1_partitions:
luks1_partitions_text = ''
for partition in list_luks1_partitions:
luks1_partitions_text += '{0}{1}'.format(FMT_LIST_SEPARATOR, partition)

summary += '\nThe following LUKS1 partitions have been discovered on your system: '
summary += luks1_partitions_text
report_hints.append(reporting.Remediation(
hint=("Convert your LUKS1 encrypted partition to LUKS2 and bind it to TPM2 using clevis.")
))
report_hints.append(reporting.ExternalLink(
url=luks2_convert_doc_url,
title='LUKS versions in RHEL: Conversion'
))

if list_no_tpm2_partitions:
no_tpm2_partitions_text = ''
for partition in list_no_tpm2_partitions:
no_tpm2_partitions_text += '{0}{1}'.format(FMT_LIST_SEPARATOR, partition)

summary += ('\nThe following LUKS2 devices without clevis TPM2 token '
'have been discovered on your system:')
summary += no_tpm2_partitions_text

report_hints.append(reporting.Remediation(hint="Add Clevis TPM2 binding to the volume."))
report_hints.append(reporting.ExternalLink(
url=clevis_doc_url,
title='Configuring manual enrollment of LUKS-encrypted volumes by using a TPM 2.0 policy'
)
)

create_report([
reporting.Title('Invalid LUKS encrypted partition detected'),
reporting.Summary(summary),
reporting.Severity(reporting.Severity.HIGH),
reporting.Groups([reporting.Groups.BOOT, reporting.Groups.ENCRYPTION]),
reporting.Groups([reporting.Groups.INHIBITOR]),
] + report_hints)
else:
required_crypt_rpms = [
'clevis',
'clevis-dracut',
'clevis-systemd',
'clevis-udisks2',
'clevis-luks',
'cryptsetup',
'tpm2-tss',
'tpm2-tools',
'tpm2-abrmd'
]
api.produce(TargetUserSpaceUpgradeTasks(install_rpms=required_crypt_rpms))
api.produce(UpgradeInitramfsTasks(include_dracut_modules=[
DracutModule(name='clevis'), DracutModule(name='clevis-pin-tpm2')]))
Original file line number Diff line number Diff line change
@@ -1,34 +1,117 @@
from leapp.models import CephInfo, LsblkEntry, StorageInfo
from leapp.libraries.common.config import version
from leapp.models import CephInfo, LuksDump, LuksDumps, LuksToken, TargetUserSpaceUpgradeTasks
from leapp.reporting import Report
from leapp.snactor.fixture import current_actor_context
from leapp.utils.report import is_inhibitor


def test_actor_with_luks(current_actor_context):
with_luks = [LsblkEntry(name='luks-132', kname='kname1', maj_min='253:0', rm='0', size='10G', bsize=10*(1 << 39),
ro='0', tp='crypt', mountpoint='', parent_name='', parent_path='')]
def test_actor_with_luks1(monkeypatch, current_actor_context):
monkeypatch.setattr(version, 'get_target_major_version', lambda: '8')
current_actor_context.feed(CephInfo(encrypted_volumes=[]))
luks_dump = LuksDump(
version=1,
uuid="dd09e6d4-b595-4f1c-80b8-fd47540e6464",
device_path="/dev/sda",
device_name="sda")
luks_dumps = LuksDumps(dumps=[luks_dump])
current_actor_context.feed(luks_dumps)
current_actor_context.run()
assert current_actor_context.consume(Report)
report_fields = current_actor_context.consume(Report)[0].report
assert is_inhibitor(report_fields)
assert not current_actor_context.consume(TargetUserSpaceUpgradeTasks)

assert report_fields['title'].startswith("Invalid LUKS encrypted partition detected")
assert "LUKS1 partitions have been discovered on your system" in report_fields['summary']
assert luks_dump.device_name in report_fields['summary']

current_actor_context.feed(StorageInfo(lsblk=with_luks))

def test_actor_with_luks2(monkeypatch, current_actor_context):
monkeypatch.setattr(version, 'get_target_major_version', lambda: '8')
current_actor_context.feed(CephInfo(encrypted_volumes=[]))
luks_dump = LuksDump(
version=2,
uuid="27b57c75-9adf-4744-ab04-9eb99726a301",
device_path="/dev/sda",
device_name="sda")
luks_dumps = LuksDumps(dumps=[luks_dump])
current_actor_context.feed(luks_dumps)
current_actor_context.run()
assert current_actor_context.consume(Report)
report_fields = current_actor_context.consume(Report)[0].report
assert is_inhibitor(report_fields)
assert not current_actor_context.consume(TargetUserSpaceUpgradeTasks)

assert report_fields['title'].startswith("Invalid LUKS encrypted partition detected")
assert "LUKS2 devices without clevis TPM2 token have been discovered on your system" in report_fields['summary']
assert luks_dump.device_name in report_fields['summary']

def test_actor_with_luks_ceph_only(current_actor_context):
with_luks = [LsblkEntry(name='luks-132', kname='kname1', maj_min='253:0', rm='0', size='10G', bsize=10*(1 << 39),
ro='0', tp='crypt', mountpoint='', parent_name='', parent_path='')]
ceph_volume = ['luks-132']
current_actor_context.feed(StorageInfo(lsblk=with_luks))
current_actor_context.feed(CephInfo(encrypted_volumes=ceph_volume))

def test_actor_with_luks2_invalid_token(monkeypatch, current_actor_context):
monkeypatch.setattr(version, 'get_target_major_version', lambda: '8')
current_actor_context.feed(CephInfo(encrypted_volumes=[]))
luks_dump = LuksDump(
version=2,
uuid="dc1dbe37-6644-4094-9839-8fc5dcbec0c6",
device_path="/dev/sda",
device_name="sda",
tokens=[LuksToken(token_id=0, keyslot=1, token_type="clevis")])
luks_dumps = LuksDumps(dumps=[luks_dump])
current_actor_context.feed(luks_dumps)
current_actor_context.run()
assert current_actor_context.consume(Report)
report_fields = current_actor_context.consume(Report)[0].report
assert is_inhibitor(report_fields)

assert report_fields['title'].startswith("Invalid LUKS encrypted partition detected")
assert "LUKS2 devices without clevis TPM2 token have been discovered on your system" in report_fields['summary']
assert luks_dump.device_name in report_fields['summary']
assert not current_actor_context.consume(TargetUserSpaceUpgradeTasks)


def test_actor_with_luks2_clevis_tpm_token(monkeypatch, current_actor_context):
monkeypatch.setattr(version, 'get_target_major_version', lambda: '8')
current_actor_context.feed(CephInfo(encrypted_volumes=[]))
luks_dump = LuksDump(
version=2,
uuid="83050bd9-61c6-4ff0-846f-bfd3ac9bfc67",
device_path="/dev/sda",
device_name="sda",
tokens=[LuksToken(token_id=0, keyslot=1, token_type="clevis-tpm2")])
luks_dumps = LuksDumps(dumps=[luks_dump])
current_actor_context.feed(luks_dumps)
current_actor_context.run()
assert not current_actor_context.consume(Report)

upgrade_tasks = current_actor_context.consume(TargetUserSpaceUpgradeTasks)
assert len(upgrade_tasks) == 1
assert set(upgrade_tasks[0].install_rpms) == set([
'clevis',
'clevis-dracut',
'clevis-systemd',
'clevis-udisks2',
'clevis-luks',
'cryptsetup',
'tpm2-tss',
'tpm2-tools',
'tpm2-abrmd'
])

def test_actor_without_luks(current_actor_context):
without_luks = [LsblkEntry(name='sda1', kname='sda1', maj_min='8:0', rm='0', size='10G', bsize=10*(1 << 39),
ro='0', tp='part', mountpoint='/boot', parent_name='', parent_path='')]

current_actor_context.feed(StorageInfo(lsblk=without_luks))
def test_actor_with_luks2_ceph(monkeypatch, current_actor_context):
monkeypatch.setattr(version, 'get_target_major_version', lambda: '8')
ceph_volume = ['sda']
current_actor_context.feed(CephInfo(encrypted_volumes=ceph_volume))
luks_dump = LuksDump(
version=2,
uuid="0edb8c11-1a04-4abd-a12d-93433ee7b8d8",
device_path="/dev/sda",
device_name="sda",
tokens=[LuksToken(token_id=0, keyslot=1, token_type="clevis")])
luks_dumps = LuksDumps(dumps=[luks_dump])
current_actor_context.feed(luks_dumps)
current_actor_context.run()
assert not current_actor_context.consume(Report)

# make sure we don't needlessly include clevis packages, when there is no clevis token
assert not current_actor_context.consume(TargetUserSpaceUpgradeTasks)
6 changes: 3 additions & 3 deletions repos/system_upgrade/common/actors/luksscanner/actor.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from leapp.actors import Actor
from leapp.libraries.actor import luksscanner
from leapp.models import LuksDump, StorageInfo
from leapp.models import LuksDumps, StorageInfo
from leapp.reporting import Report
from leapp.tags import FactsPhaseTag, IPUWorkflowTag

Expand All @@ -16,8 +16,8 @@ class LuksScanner(Actor):

name = 'luks_scanner'
consumes = (StorageInfo,)
produces = (Report, LuksDump)
produces = (Report, LuksDumps)
tags = (IPUWorkflowTag, FactsPhaseTag)

def process(self):
self.produce(*luksscanner.get_luks_dumps())
self.produce(luksscanner.get_luks_dumps_model())
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from leapp.libraries import stdlib
from leapp.libraries.actor.luksdump_parser import LuksDumpParser
from leapp.libraries.stdlib import api
from leapp.models import LuksDump, LuksToken, StorageInfo
from leapp.models import LuksDump, LuksDumps, LuksToken, StorageInfo


def aslist(f):
Expand Down Expand Up @@ -118,3 +118,7 @@ def get_luks_dumps():
for blk in storage_info.lsblk:
if blk.tp == 'crypt' and blk.parent_path:
yield get_luks_dump_by_device(blk.parent_path, blk.parent_name)


def get_luks_dumps_model():
return LuksDumps(dumps=get_luks_dumps())
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@
import pytest

from leapp.libraries.stdlib import api
from leapp.models import CephInfo, LsblkEntry, LuksDump, StorageInfo
from leapp.reporting import Report
from leapp.models import LsblkEntry, LuksDumps, StorageInfo
from leapp.snactor.fixture import current_actor_context
from leapp.utils.report import is_inhibitor

CUR_DIR = os.path.dirname(os.path.abspath(__file__))

Expand Down Expand Up @@ -127,9 +125,10 @@ def test_actor_with_luks(monkeypatch, current_actor_context, variant, luks_versi
current_actor_context.feed(StorageInfo(lsblk=with_luks))
current_actor_context.run()

luks_dumps = current_actor_context.consume(LuksDump)
luks_dumps = current_actor_context.consume(LuksDumps)
assert len(luks_dumps) == 1
luks_dump = luks_dumps[0]
assert len(luks_dumps[0].dumps) == 1
luks_dump = luks_dumps[0].dumps[0]

assert luks_dump.version == luks_version
assert luks_dump.uuid == uuid
Expand Down
16 changes: 15 additions & 1 deletion repos/system_upgrade/common/models/luksdump.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ class LuksToken(Model):

class LuksDump(Model):
"""
Information about LUKS-encrypted device.
Information about a single LUKS-encrypted device.
Note this model is supposed to be used as a part of LuksDumps msg.
"""
topic = SystemInfoTopic

Expand Down Expand Up @@ -57,3 +59,15 @@ class LuksDump(Model):
"""
List of LUKS2 tokens
"""


class LuksDumps(Model):
"""
Information about a all LUKS-encrypted devices on the system.
"""
topic = SystemInfoTopic

dumps = fields.List(fields.Model(LuksDump))
"""
List of LuksDump representing all the encrypted devices on the system.
"""

0 comments on commit ffcb92b

Please sign in to comment.