Skip to content

Commit

Permalink
squash! InhibitWhenLuks: modify the inhibitor to use LuksDump
Browse files Browse the repository at this point in the history
fixed rhel7
fixing loop in progress
  • Loading branch information
pirat89 committed Aug 2, 2024
1 parent 2dd6bad commit 6170206
Showing 1 changed file with 57 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,64 +5,88 @@
from leapp.reporting import create_report


CLEVIS_RHEL8_DOC_URL = 'https://red.ht/clevis-tpm2-luks-auto-unlock-rhel8'
CLEVIS_RHEL9_DOC_URL = 'https://red.ht/clevis-tpm2-luks-auto-unlock-rhel9'
LUKS2_CONVERT_RHEL8_DOC_URL = 'https://red.ht/convert-to-luks2-rhel8'
LUKS2_CONVERT_RHEL9_DOC_URL = 'https://red.ht/convert-to-luks2-rhel9'
# https://red.ht/clevis-tpm2-luks-auto-unlock-rhel8
# https://red.ht/clevis-tpm2-luks-auto-unlock-rhel9
# https://red.ht/convert-to-luks2-rhel8
# https://red.ht/convert-to-luks2-rhel9
CLEVIS_DOC_URL_FMT = 'https://red.ht/clevis-tpm2-luks-auto-unlock-rhel{}'
LUKS2_CONVERT_DOC_URL_FMT = 'https://red.ht/convert-to-luks2-rhel{}'

FMT_LIST_SEPARATOR = '\n - '


def _at_least_one_tpm_token(luks_dump):
return any([token.token_type == "clevis-tpm2" for token in luks_dump.tokens])


def _get_ceph_volumes():
ceph_info = next(api.consume(CephInfo), None)
return ceph_info.encrypted_volumes[:] if ceph_info else []


def apply_obsoleted_check_ipu_7_8():
ceph_vol = _get_ceph_volumes()
for storage_info in self.consume(StorageInfo):
for blk in storage_info.lsblk:
if blk.tp == 'crypt' and blk.name not in ceph_vol:
create_report([
reporting.Title('LUKS encrypted partition detected'),
reporting.Summary('Upgrading system with encrypted partitions is not supported'),
reporting.Severity(reporting.Severity.HIGH),
reporting.Groups([reporting.Groups.BOOT, reporting.Groups.ENCRYPTION]),
reporting.Groups([reporting.Groups.INHIBITOR]),
])
break


def report_inhibitor():
summary = (
'We have detected LUKS encrypted volumes that do not meet current'
' criteria to be able to proceed the in-place upgrade process.'
'Right now the process requires automatic unlock of the system ....TODO'
'Only systems where all encrypted devices are LUKS2 '
'devices with Clevis TPM 2.0 token can be updated.'
)
create_report([
reporting.Title('Detected LUKS encrypted volumes unsuitable for in-place upgrade.'),
reporting.Summary(summary),
reporting.Severity(reporting.Severity.HIGH),
reporting.Groups([reporting.Groups.BOOT, reporting.Groups.ENCRYPTION]),
reporting.Groups([reporting.Groups.INHIBITOR]),
] + report_hints)


def check_invalid_luks_devices():
source_major_version = get_source_major_version()
if source_major_version == '8':
clevis_doc_url = CLEVIS_RHEL8_DOC_URL
luks2_convert_doc_url = LUKS2_CONVERT_RHEL8_DOC_URL
elif source_major_version == '9':
clevis_doc_url = CLEVIS_RHEL9_DOC_URL
luks2_convert_doc_url = LUKS2_CONVERT_RHEL9_DOC_URL
else:
create_report([
reporting.Title('LUKS encrypted partition detected'),
reporting.Summary('Upgrading system with encrypted partitions is not supported'),
reporting.Severity(reporting.Severity.HIGH),
reporting.Groups([reporting.Groups.BOOT, reporting.Groups.ENCRYPTION]),
reporting.Groups([reporting.Groups.INHIBITOR]),
])
if source_major_version == '7':
# NOTE: keeping unchanged behaviour for IPU 7 -> 8
apply_obsoleted_check_ipu_7_8()
return
ceph_vol = []
try:
ceph_info = next(api.consume(CephInfo), None)
if ceph_info:
ceph_vol = ceph_info.encrypted_volumes[:]
except StopIteration:
pass

clevis_doc_url = CLEVIS_DOC_URL_FMT.format(source_major_version)
luks2_convert_doc_url = LUKS2_CONVERT_DOC_URL_FMT.format(source_major_version)

luks_dumps = next(api.consume(LuksDumps), None)
if luks_dumps is None:
if not luks_dumps:
api.current_logger().debug('No LUKS volumes detected. Skipping.')
return

list_luks1_partitions = []
list_no_tpm2_partitions = []
ceph_vol = _get_ceph_volumes()
for luks_dump in luks_dumps.dumps:
# if the device is managed by ceph, don't inhibit
if luks_dump.device_name in ceph_vol:
api.current_logger().debug('Skipping LUKS CEPH volume: {}'.format(luks_dump.device_name))
continue

list_luks1_partitions = []
list_no_tpm2_partitions = []

if luks_dump.version == 1:
list_luks1_partitions.append(luks_dump.device_name)
elif luks_dump.version == 2 and not _at_least_one_tpm_token(luks_dump):
list_no_tpm2_partitions.append(luks_dump.device_name)

if list_luks1_partitions or list_no_tpm2_partitions:
summary = (
'Only systems where all encrypted devices are LUKS2 '
'devices with Clevis TPM 2.0 token can be updated.'
)
report_hints = []

if list_luks1_partitions:
Expand Down Expand Up @@ -95,14 +119,8 @@ def check_invalid_luks_devices():
title='Configuring manual enrollment of LUKS-encrypted volumes by using a TPM 2.0 policy'
)
)
report_inhibitor()

create_report([
reporting.Title('Invalid LUKS encrypted partition detected'),
reporting.Summary(summary),
reporting.Severity(reporting.Severity.HIGH),
reporting.Groups([reporting.Groups.BOOT, reporting.Groups.ENCRYPTION]),
reporting.Groups([reporting.Groups.INHIBITOR]),
] + report_hints)
else:
required_crypt_rpms = [
'clevis',
Expand Down

0 comments on commit 6170206

Please sign in to comment.