From 702e0e097fc2c105537dc99579504dc02ec56ece Mon Sep 17 00:00:00 2001 From: Bryan Jacobs Date: Sat, 13 Jan 2024 20:00:28 +1100 Subject: [PATCH 1/2] Implement framework for flexible 2FA This adds support for using the hmac-secret FIDO extension to contribute keying material for a KeePass 4 file. It does this by storing an additional XML statekeeping blob in the outer ("public") header. This blob is designed to hold a variety of different authentication factors, such as passwords, key files, and Yubikey challenge-response devices. --- README.rst | 51 +++ pykeepass/__init__.py | 3 +- pykeepass/fido2.py | 147 +++++++ pykeepass/kdbx_parsing/common.py | 124 ++++-- pykeepass/kdbx_parsing/common.py.orig | 449 +++++++++++++++++++ pykeepass/kdbx_parsing/factorinfo.py | 509 ++++++++++++++++++++++ pykeepass/kdbx_parsing/factorinfo.py.orig | 509 ++++++++++++++++++++++ pykeepass/kdbx_parsing/kdbx4.py | 46 +- pykeepass/multifactor_format.rst | 212 +++++++++ pykeepass/pykeepass.py | 119 ++++- pyproject.toml | 1 + tests/tests.py | 336 +++++++++++++- 12 files changed, 2442 insertions(+), 64 deletions(-) create mode 100644 pykeepass/fido2.py create mode 100644 pykeepass/kdbx_parsing/common.py.orig create mode 100644 pykeepass/kdbx_parsing/factorinfo.py create mode 100644 pykeepass/kdbx_parsing/factorinfo.py.orig create mode 100644 pykeepass/multifactor_format.rst diff --git a/README.rst b/README.rst index 591b4314..870d068e 100644 --- a/README.rst +++ b/README.rst @@ -475,6 +475,57 @@ TOTP URI which can be passed to an OTP library to generate codes >>> pyotp.parse_uri(e.otp).now() 799270 +multifactor authentication +-------------------------- + +PyKeePass supports securing a database using an arbitrary combination of "authentication factors". +A single factor could be something like a password, a file, or a hardware authenticator. + +Factors are arranged into "factor groups". In order to open the database, *one* factor from +each group must be provided. + +Example using a single FIDO2 authenticator to unlock a database: + +.. code:: python + + # Import things + >>> from pykeepass import PyKeePass, FactorInfo, FactorGroup, FIDO2Factor, create_database + # Create new DB + >>> db = create_database() + # Create a FIDO2 factor + >>> fido2_factor = FIDO2Factor(name="MyCoolFIDO") + # Create a single factor group with that one factor in it + >>> group = FactorGroup(factors=[fido2_factor]) + # Set PIN to use for the FIDO2 credential + >>> factor_data = {"fido2_pin": "my_pin"} + # Declare the one factor group is the only contributor to the database composite key + >>> factor_info = FactorInfo(comprehensive=True, factor_groups=[group]) + # Save database + >>> db.factor_data = factor_data + >>> db.authentication_factors = factor_info + >>> db.save() + # Reopen database easily later - factor_info is stored inside it + >>> kp = PyKeePass(filename, factor_data=factor_data) + +Example using a password *and* one of two different keyfiles (the password will always be required): + +.. code:: python + + >>> from pykeepass import PyKeePass, FactorInfo, FactorGroup, PasswordFactor, KeyFileFactor, create_database + # Password factor + >>> password_factor = PasswordFactor(name="MyCoolPassword") + # Keyfile factor + >>> kf_factor_1 = KeyFileFactor(name="First KF") + >>> kf_factor_2 = KeyFileFactor(name="Second KF") + # First factor group, password only + >>> group_1 = FactorGroup(factors=[password_factor]) + # Second factor group, either of two key files + >>> group_2 = FactorGroup(factors=[kf_factor_1, kf_factor_2]) + >>> factor_data = {"password": "my_pass", "keyfile": {"First KF": "/kf1", "Second KF": "/kf2"}} + >>> factor_info = FactorInfo(comprehensive=True, factor_groups=[group_1, group_2]) + +It's okay to mix factors of different types within a group. + Tests and Debugging ------------------- diff --git a/pykeepass/__init__.py b/pykeepass/__init__.py index c39548a4..31480db1 100644 --- a/pykeepass/__init__.py +++ b/pykeepass/__init__.py @@ -1,6 +1,7 @@ from __future__ import absolute_import from .pykeepass import PyKeePass, create_database +from .kdbx_parsing.factorinfo import FactorInfo, FactorGroup, FIDO2Factor, PasswordFactor, KeyFileFactor from .version import __version__ -__all__ = ["PyKeePass", "create_database", "__version__"] +__all__ = ["PyKeePass", "create_database", "__version__", 'FactorInfo', 'FactorGroup', 'FIDO2Factor', 'KeyFileFactor'] diff --git a/pykeepass/fido2.py b/pykeepass/fido2.py new file mode 100644 index 00000000..01064fb5 --- /dev/null +++ b/pykeepass/fido2.py @@ -0,0 +1,147 @@ +import logging +import random + +from fido2.cose import ES256 +from fido2.ctap import CtapError +from fido2.ctap2.extensions import HmacSecretExtension, CredProtectExtension +from fido2.hid import CtapHidDevice +from fido2.client import Fido2Client, UserInteraction +from fido2.webauthn import PublicKeyCredentialCreationOptions, PublicKeyCredentialRpEntity, \ + PublicKeyCredentialUserEntity, PublicKeyCredentialParameters, PublicKeyCredentialType, \ + PublicKeyCredentialDescriptor, PublicKeyCredentialRequestOptions, UserVerificationRequirement + +log = logging.getLogger(__name__) + +try: + from fido2.pcsc import CtapPcscDevice +except ImportError: + CtapPcscDevice = None + +FIDO2_FACTOR_RPID = "fido2.keepass.nodomain" + + +class NonInteractive(UserInteraction): + + def __init__(self, fixed_pin): + self.fixed_pin = fixed_pin + + def request_pin(self, permissions, rp_id): + return self.fixed_pin + + +def _get_all_authenticators(): + for dev in CtapHidDevice.list_devices(): + yield dev + if CtapPcscDevice: + for dev in CtapPcscDevice.list_devices(): + yield dev + + +def _get_suitable_clients(pin_data): + for authenticator in _get_all_authenticators(): + authenticator_path_string = repr(authenticator) + + if isinstance(pin_data, str): + pin_to_use = pin_data + else: + pin_to_use = pin_data.get(authenticator_path_string, pin_data.get("*", None)) + + client = Fido2Client( + authenticator, + "https://{}".format(FIDO2_FACTOR_RPID), + user_interaction=NonInteractive(pin_to_use), + extension_types=[ + HmacSecretExtension, + CredProtectExtension + ] + ) + + if "hmac-secret" in client.info.extensions and "credProtect" in client.info.extensions: + yield client + + +class FIDOException(Exception): + pass + + +def fido2_enroll(pin_data, already_enrolled_credentials): + log.info("Enrolling new FIDO2 authenticator") + + # We don't care about the user ID + # So long as it doesn't collide with another one for the same authenticator, it's all good + user_id = random.randbytes(16) + + chosen_client = next(_get_suitable_clients(pin_data), None) + if chosen_client is None: + raise FIDOException("Could not find an authenticator supporting the hmac-secret and credProtect extensions") + + credential = chosen_client.make_credential(PublicKeyCredentialCreationOptions( + rp=PublicKeyCredentialRpEntity( + name="pykeepass", + id=FIDO2_FACTOR_RPID + ), + user=PublicKeyCredentialUserEntity( + name="keepass", + id=user_id, + display_name="KeePass" + ), + challenge=random.randbytes(32), + pub_key_cred_params=[ + PublicKeyCredentialParameters( + type=PublicKeyCredentialType.PUBLIC_KEY, + alg=ES256.ALGORITHM + ) + ], + exclude_credentials=[ + PublicKeyCredentialDescriptor( + type=PublicKeyCredentialType.PUBLIC_KEY, + id=credential_id + ) for credential_id in already_enrolled_credentials + ], + extensions={ + "hmacCreateSecret": True, + "credentialProtectionPolicy": CredProtectExtension.POLICY.REQUIRED, + "enforceCredentialProtectionPolicy": True + } + )) + + if not credential.extension_results.get("hmacCreateSecret", False): + raise FIDOException("Authenticator didn't create an HMAC secret!") + + return credential.attestation_object.auth_data.credential_data.credential_id + + +def fido2_get_key_material(pin_data, credential_ids, salt1, salt2, verify_user=True): + log.info("Getting keying material from FIDO2 authenticator (with {} potential credentials)".format(len(credential_ids))) + + user_verification = UserVerificationRequirement.REQUIRED if verify_user else UserVerificationRequirement.DISCOURAGED + for client in _get_suitable_clients(pin_data): + try: + assertion_response = client.get_assertion( + PublicKeyCredentialRequestOptions( + challenge=random.randbytes(32), + rp_id=FIDO2_FACTOR_RPID, + allow_credentials=[ + PublicKeyCredentialDescriptor( + type=PublicKeyCredentialType.PUBLIC_KEY, + id=credential_id + ) for credential_id in credential_ids + ], + user_verification=user_verification, + extensions={ + "hmacGetSecret": { + "salt1": salt1, + "salt2": salt2 + } + } + ) + ) + assertion = assertion_response.get_response(0) + hmac_response = assertion.extension_results.get("hmacGetSecret", None) + if hmac_response is not None: + return hmac_response.get("output1", None), hmac_response.get("output2", None) + except CtapError as e: + if e.code != CtapError.ERR.NO_CREDENTIALS: + raise e + + raise FIDOException("No authenticator provided key material") diff --git a/pykeepass/kdbx_parsing/common.py b/pykeepass/kdbx_parsing/common.py index 3973f615..ec728572 100644 --- a/pykeepass/kdbx_parsing/common.py +++ b/pykeepass/kdbx_parsing/common.py @@ -9,7 +9,6 @@ from copy import deepcopy import base64 from binascii import Error as BinasciiError -import unicodedata import zlib import re import codecs @@ -105,7 +104,52 @@ def aes_kdf(key, rounds, key_composite): return hashlib.sha256(transformed_key).digest() -def compute_key_composite(password=None, keyfile=None): +def compute_keyfile_part_of_composite(keyfile): + """Compute just a keyfile's contribution to a database composite key.""" + if hasattr(keyfile, "read"): + keyfile_bytes = keyfile.read() + else: + with open(keyfile, 'rb') as f: + keyfile_bytes = f.read() + # try to read XML keyfile + try: + tree = etree.fromstring(keyfile_bytes) + version = tree.find('Meta/Version').text + data_element = tree.find('Key/Data') + if version.startswith('1.0'): + return base64.b64decode(data_element.text) + elif version.startswith('2.0'): + # read keyfile data and convert to bytes + keyfile_composite = bytes.fromhex(data_element.text.strip()) + # validate bytes against hash + hash = bytes.fromhex(data_element.attrib['Hash']) + hash_computed = hashlib.sha256(keyfile_composite).digest()[:4] + assert hash == hash_computed, "Keyfile has invalid hash" + return keyfile_composite + else: + raise AttributeError("Invalid version in keyfile") + # otherwise, try to read plain keyfile + except (etree.XMLSyntaxError, UnicodeDecodeError, AttributeError): + try: + try: + int(keyfile_bytes, 16) + is_hex = True + except ValueError: + is_hex = False + # if the length is 32 bytes we assume it is the key + if len(keyfile_bytes) == 32: + return keyfile_bytes + # if the length is 64 bytes we assume the key is hex encoded + elif len(keyfile_bytes) == 64 and is_hex: + return codecs.decode(keyfile_bytes, 'hex') + # anything else may be a file to hash for the key + else: + return hashlib.sha256(keyfile_bytes).digest() + except: + raise IOError('Could not read keyfile') + + +def compute_key_composite(password=None, keyfile=None, additional_parts=None): """Compute composite key. Used in header verification and payload decryption.""" @@ -115,53 +159,15 @@ def compute_key_composite(password=None, keyfile=None): else: password_composite = b'' # hash the keyfile - if keyfile: - if hasattr(keyfile, "read"): - keyfile_bytes = keyfile.read() - else: - with open(keyfile, 'rb') as f: - keyfile_bytes = f.read() - # try to read XML keyfile - try: - tree = etree.fromstring(keyfile_bytes) - version = tree.find('Meta/Version').text - data_element = tree.find('Key/Data') - if version.startswith('1.0'): - keyfile_composite = base64.b64decode(data_element.text) - elif version.startswith('2.0'): - # read keyfile data and convert to bytes - keyfile_composite = bytes.fromhex(data_element.text.strip()) - # validate bytes against hash - hash = bytes.fromhex(data_element.attrib['Hash']) - hash_computed = hashlib.sha256(keyfile_composite).digest()[:4] - assert hash == hash_computed, "Keyfile has invalid hash" - else: - raise AttributeError("Invalid version in keyfile") - # otherwise, try to read plain keyfile - except (etree.XMLSyntaxError, UnicodeDecodeError, AttributeError): - try: - try: - int(keyfile_bytes, 16) - is_hex = True - except ValueError: - is_hex = False - # if the length is 32 bytes we assume it is the key - if len(keyfile_bytes) == 32: - keyfile_composite = keyfile_bytes - # if the length is 64 bytes we assume the key is hex encoded - elif len(keyfile_bytes) == 64 and is_hex: - keyfile_composite = codecs.decode(keyfile_bytes, 'hex') - # anything else may be a file to hash for the key - else: - keyfile_composite = hashlib.sha256(keyfile_bytes).digest() - except: - raise IOError('Could not read keyfile') + keyfile_composite = compute_keyfile_part_of_composite(keyfile) if keyfile else b'' - else: - keyfile_composite = b'' + # create composite key from password, keyfile, and other composites + overall_composite = password_composite + keyfile_composite + if additional_parts is not None: + for part in additional_parts: + overall_composite += part - # create composite key from password and keyfile composites - return hashlib.sha256(password_composite + keyfile_composite).digest() + return hashlib.sha256(overall_composite).digest() def compute_master(context): @@ -175,6 +181,30 @@ def compute_master(context): return master_key +def populate_custom_data(kdbx, d): + if len(d.keys()) > 0: + vd = Container( + version=b'\x00\x01', + dict=d, + ) + kdbx.header.value.dynamic_header.update( + { + "public_custom_data": + Container( + id='public_custom_data', + data=vd, + next_byte=0xFF, + ) + } + ) + else: + # Removing header entirely + if "public_custom_data" in kdbx.header.value.dynamic_header: + del kdbx.header.value.dynamic_header["public_custom_data"] + + kdbx.header.value.dynamic_header.move_to_end("end") + + # -------------------- XML Processing -------------------- diff --git a/pykeepass/kdbx_parsing/common.py.orig b/pykeepass/kdbx_parsing/common.py.orig new file mode 100644 index 00000000..f775af10 --- /dev/null +++ b/pykeepass/kdbx_parsing/common.py.orig @@ -0,0 +1,449 @@ +from Cryptodome.Cipher import AES, ChaCha20, Salsa20 +from .twofish import Twofish +from Cryptodome.Util import Padding as CryptoPadding +import hashlib +from construct import ( + Adapter, BitStruct, BitsSwapped, Container, Flag, Padding, ListContainer, Mapping, GreedyBytes, Int32ul, Switch +) +from lxml import etree +from copy import deepcopy +import base64 +from binascii import Error as BinasciiError +import unicodedata +import zlib +import re +import codecs +from io import BytesIO +from collections import OrderedDict +import logging + +log = logging.getLogger(__name__) + + +class HeaderChecksumError(Exception): + pass + + +class CredentialsError(Exception): + pass + + +class PayloadChecksumError(Exception): + pass + + +class DynamicDict(Adapter): + """ListContainer <---> Container + Convenience mapping so we dont have to iterate ListContainer to find + the right item + + FIXME: lump kwarg was added to get around the fact that InnerHeader is + not truly a dict. We lump all 'binary' InnerHeaderItems into a single list + """ + + def __init__(self, key, subcon, lump=[]): + super().__init__(subcon) + self.key = key + self.lump = lump + + # map ListContainer to Container + def _decode(self, obj, context, path): + d = OrderedDict() + for l in self.lump: + d[l] = ListContainer([]) + for item in obj: + if item[self.key] in self.lump: + d[item[self.key]].append(item) + else: + d[item[self.key]] = item + + return Container(d) + + # map Container to ListContainer + def _encode(self, obj, context, path): + l = [] + for key in obj: + if key in self.lump: + l += obj[key] + else: + l.append(obj[key]) + + return ListContainer(l) + + +def Reparsed(subcon_out): + class Reparsed(Adapter): + """Bytes <---> Parsed subcon result + Takes in bytes and reparses it with subcon_out""" + + def _decode(self, data, con, path): + return subcon_out.parse(data, **con) + + def _encode(self, obj, con, path): + return subcon_out.build(obj, **con) + + return Reparsed + + +# is the payload compressed? +CompressionFlags = BitsSwapped( + BitStruct("compression" / Flag, Padding(8 * 4 - 1)) +) + + +# -------------------- Key Computation -------------------- +def aes_kdf(key, rounds, key_composite): + """Set up a context for AES128-ECB encryption to find transformed_key""" + + cipher = AES.new(key, AES.MODE_ECB) + + # get the number of rounds from the header and transform the key_composite + transformed_key = key_composite + for _ in range(0, rounds): + transformed_key = cipher.encrypt(transformed_key) + + return hashlib.sha256(transformed_key).digest() + + +<<<<<<< HEAD +def compute_key_composite(password=None, keyfile=None): +======= +def compute_keyfile_part_of_composite(keyfile): + """Compute just a keyfile's contribution to a database composite key.""" + if hasattr(keyfile, "read"): + keyfile_bytes = keyfile.read() + else: + with open(keyfile, 'rb') as f: + keyfile_bytes = f.read() + # try to read XML keyfile + try: + tree = etree.fromstring(keyfile_bytes) + version = tree.find('Meta/Version').text + data_element = tree.find('Key/Data') + if version.startswith('1.0'): + return base64.b64decode(data_element.text) + elif version.startswith('2.0'): + # read keyfile data and convert to bytes + keyfile_composite = bytes.fromhex(data_element.text.strip()) + # validate bytes against hash + hash = bytes.fromhex(data_element.attrib['Hash']) + hash_computed = hashlib.sha256(keyfile_composite).digest()[:4] + assert hash == hash_computed, "Keyfile has invalid hash" + return keyfile_composite + else: + raise AttributeError("Invalid version in keyfile") + # otherwise, try to read plain keyfile + except (etree.XMLSyntaxError, UnicodeDecodeError, AttributeError): + try: + try: + int(keyfile_bytes, 16) + is_hex = True + except ValueError: + is_hex = False + # if the length is 32 bytes we assume it is the key + if len(keyfile_bytes) == 32: + return keyfile_bytes + # if the length is 64 bytes we assume the key is hex encoded + elif len(keyfile_bytes) == 64 and is_hex: + return codecs.decode(keyfile_bytes, 'hex') + # anything else may be a file to hash for the key + else: + return hashlib.sha256(keyfile_bytes).digest() + except: + raise IOError('Could not read keyfile') + + +def compute_key_composite(password=None, keyfile=None, additional_parts=None): +>>>>>>> 282ce41 (Merge remote-tracking branch 'origin/master' into HEAD) + """Compute composite key. + Used in header verification and payload decryption.""" + + # hash the password + if password: + password_composite = hashlib.sha256(password.encode('utf-8')).digest() + else: + password_composite = b'' + # hash the keyfile +<<<<<<< HEAD + if keyfile: + if hasattr(keyfile, "read"): + keyfile_bytes = keyfile.read() + else: + with open(keyfile, 'rb') as f: + keyfile_bytes = f.read() + # try to read XML keyfile + try: + tree = etree.fromstring(keyfile_bytes) + version = tree.find('Meta/Version').text + data_element = tree.find('Key/Data') + if version.startswith('1.0'): + keyfile_composite = base64.b64decode(data_element.text) + elif version.startswith('2.0'): + # read keyfile data and convert to bytes + keyfile_composite = bytes.fromhex(data_element.text.strip()) + # validate bytes against hash + hash = bytes.fromhex(data_element.attrib['Hash']) + hash_computed = hashlib.sha256(keyfile_composite).digest()[:4] + assert hash == hash_computed, "Keyfile has invalid hash" + else: + raise AttributeError("Invalid version in keyfile") + # otherwise, try to read plain keyfile + except (etree.XMLSyntaxError, UnicodeDecodeError, AttributeError): + try: + try: + int(keyfile_bytes, 16) + is_hex = True + except ValueError: + is_hex = False + # if the length is 32 bytes we assume it is the key + if len(keyfile_bytes) == 32: + keyfile_composite = keyfile_bytes + # if the length is 64 bytes we assume the key is hex encoded + elif len(keyfile_bytes) == 64 and is_hex: + keyfile_composite = codecs.decode(keyfile_bytes, 'hex') + # anything else may be a file to hash for the key + else: + keyfile_composite = hashlib.sha256(keyfile_bytes).digest() + except: + raise IOError('Could not read keyfile') + + else: + keyfile_composite = b'' +======= + keyfile_composite = compute_keyfile_part_of_composite(keyfile) if keyfile else b'' +>>>>>>> 282ce41 (Merge remote-tracking branch 'origin/master' into HEAD) + + # create composite key from password and keyfile composites + return hashlib.sha256(password_composite + keyfile_composite).digest() + + +def compute_master(context): + """Computes master key from transformed key and master seed. + Used in payload decryption.""" + + # combine the transformed key with the header master seed to find the master_key + master_key = hashlib.sha256( + context._.header.value.dynamic_header.master_seed.data + + context.transformed_key).digest() + return master_key + + +# -------------------- XML Processing -------------------- + + +class XML(Adapter): + """Bytes <---> lxml etree""" + + def _decode(self, data, con, path): + parser = etree.XMLParser(remove_blank_text=True) + return etree.parse(BytesIO(data), parser) + + def _encode(self, tree, con, path): + return etree.tostring(tree) + + +class UnprotectedStream(Adapter): + """lxml etree <---> unprotected lxml etree + Iterate etree for Protected elements and decrypt using cipher + provided by get_cipher""" + + protected_xpath = '//Value[@Protected=\'True\']' + + def __init__(self, protected_stream_key, subcon): + super().__init__(subcon) + self.protected_stream_key = protected_stream_key + + def _decode(self, tree, con, path): + cipher = self.get_cipher(self.protected_stream_key(con)) + for elem in tree.xpath(self.protected_xpath): + if elem.text is not None: + try: + result = cipher.decrypt(base64.b64decode(elem.text)).decode('utf-8') + # strip invalid XML characters - https://stackoverflow.com/questions/8733233 + result = re.sub( + u'[^\u0020-\uD7FF\u0009\u000A\u000D\uE000-\uFFFD\U00010000-\U0010FFFF]+', + '', + result + ) + elem.text = result + except (UnicodeDecodeError, BinasciiError, ValueError): + # FIXME: this should be a warning eventually, need to fix all databases in tests/ first + log.error( + "Element at {} marked as protected, but could not unprotect".format(tree.getpath(elem)) + ) + return tree + + def _encode(self, tree, con, path): + tree_copy = deepcopy(tree) + cipher = self.get_cipher(self.protected_stream_key(con)) + for elem in tree_copy.xpath(self.protected_xpath): + if elem.text is not None: + elem.text = base64.b64encode( + cipher.encrypt( + elem.text.encode('utf-8') + ) + ) + return tree_copy + + +class ARCFourVariantStream(UnprotectedStream): + def get_cipher(self, protected_stream_key): + raise Exception("ARCFourVariant not implemented") + + +# https://github.com/dlech/KeePass2.x/blob/97141c02733cd3abf8d4dce1187fa7959ded58a8/KeePassLib/Cryptography/CryptoRandomStream.cs#L115-L119 +class Salsa20Stream(UnprotectedStream): + def get_cipher(self, protected_stream_key): + key = hashlib.sha256(protected_stream_key).digest() + return Salsa20.new( + key=key, + nonce=b'\xE8\x30\x09\x4B\x97\x20\x5D\x2A' + ) + + +# https://github.com/dlech/KeePass2.x/blob/97141c02733cd3abf8d4dce1187fa7959ded58a8/KeePassLib/Cryptography/CryptoRandomStream.cs#L103-L111 +class ChaCha20Stream(UnprotectedStream): + def get_cipher(self, protected_stream_key): + key_hash = hashlib.sha512(protected_stream_key).digest() + key = key_hash[:32] + nonce = key_hash[32:44] + return ChaCha20.new( + key=key, + nonce=nonce + ) + + +def Unprotect(protected_stream_id, protected_stream_key, subcon): + """Select stream cipher based on protected_stream_id""" + + return Switch( + protected_stream_id, + {'arcfourvariant': ARCFourVariantStream(protected_stream_key, subcon), + 'salsa20': Salsa20Stream(protected_stream_key, subcon), + 'chacha20': ChaCha20Stream(protected_stream_key, subcon), + }, + default=subcon + ) + + +# -------------------- Payload Encryption/Decompression -------------------- + +class Concatenated(Adapter): + """Data Blocks <---> Bytes""" + + def _decode(self, blocks, con, path): + return b''.join([block.block_data for block in blocks]) + + def _encode(self, payload_data, con, path): + blocks = [] + # split payload_data into 1 MB blocks (spec default) + i = 0 + while i < len(payload_data): + blocks.append(Container(block_data=payload_data[i:i + 2**20])) + i += 2**20 + blocks.append(Container(block_data=b'')) + + return blocks + + +class DecryptedPayload(Adapter): + """Encrypted Bytes <---> Decrypted Bytes""" + + def _decode(self, payload_data, con, path): + cipher = self.get_cipher( + con.master_key, + con._.header.value.dynamic_header.encryption_iv.data + ) + payload_data = cipher.decrypt(payload_data) + # FIXME: Construct ugliness. Fixes #244. First 32 bytes of decrypted kdbx3 payload + # should be checked against stream_start_bytes for a CredentialsError. Due to construct + # limitations, we have to decrypt the whole payload in order to check the first 32 bytes. + # However, when the credentials are wrong the invalid decrypted payload cannot + # be unpadded correctly. Instead, catch the unpad ValueError exception raised by unpad() + # and allow kdbx3.py to raise a ChecksumError + try: + payload_data = self.unpad(payload_data) + except ValueError: + log.debug("Decryption unpadding failed") + + return payload_data + + def _encode(self, payload_data, con, path): + payload_data = self.pad(payload_data) + cipher = self.get_cipher( + con.master_key, + con._.header.value.dynamic_header.encryption_iv.data + ) + payload_data = cipher.encrypt(payload_data) + + return payload_data + + +class AES256Payload(DecryptedPayload): + def get_cipher(self, master_key, encryption_iv): + return AES.new(master_key, AES.MODE_CBC, encryption_iv) + def pad(self, data): + return CryptoPadding.pad(data, 16) + def unpad(self, data): + return CryptoPadding.unpad(data, 16) + + +class ChaCha20Payload(DecryptedPayload): + def get_cipher(self, master_key, encryption_iv): + return ChaCha20.new(key=master_key, nonce=encryption_iv) + def pad(self, data): + return data + def unpad(self, data): + return data + + +class TwoFishPayload(DecryptedPayload): + def get_cipher(self, master_key, encryption_iv): + return Twofish.new(master_key, mode=Twofish.MODE_CBC, IV=encryption_iv) + def pad(self, data): + return CryptoPadding.pad(data, 16) + def unpad(self, data): + return CryptoPadding.unpad(data, 16) + + +class Decompressed(Adapter): + """Compressed Bytes <---> Decompressed Bytes""" + + def _decode(self, data, con, path): + return zlib.decompress(data, 16 + 15) + + def _encode(self, data, con, path): + compressobj = zlib.compressobj( + 6, + zlib.DEFLATED, + 16 + 15, + zlib.DEF_MEM_LEVEL, + 0 + ) + data = compressobj.compress(data) + data += compressobj.flush() + return data + + +# -------------------- Cipher Enums -------------------- + +# payload encryption method +# https://github.com/keepassxreboot/keepassxc/blob/8324d03f0a015e62b6182843b4478226a5197090/src/format/KeePass2.cpp#L24-L26 +CipherId = Mapping( + GreedyBytes, + {'aes256': b'1\xc1\xf2\xe6\xbfqCP\xbeX\x05!j\xfcZ\xff', + 'twofish': b'\xadh\xf2\x9fWoK\xb9\xa3j\xd4z\xf9e4l', + 'chacha20': b'\xd6\x03\x8a+\x8boL\xb5\xa5$3\x9a1\xdb\xb5\x9a' + } +) + +# protected entry encryption method +# https://github.com/dlech/KeePass2.x/blob/149ab342338ffade24b44aaa1fd89f14b64fda09/KeePassLib/Cryptography/CryptoRandomStream.cs#L35 +ProtectedStreamId = Mapping( + Int32ul, + {'none': 0, + 'arcfourvariant': 1, + 'salsa20': 2, + 'chacha20': 3, + } +) diff --git a/pykeepass/kdbx_parsing/factorinfo.py b/pykeepass/kdbx_parsing/factorinfo.py new file mode 100644 index 00000000..eb8e9045 --- /dev/null +++ b/pykeepass/kdbx_parsing/factorinfo.py @@ -0,0 +1,509 @@ +import hashlib +import hmac +import logging +import random +from io import BytesIO + +from Cryptodome.Cipher import AES +from lxml import etree +from base64 import b64encode, b64decode + +from pykeepass.exceptions import CredentialsError +from pykeepass.fido2 import fido2_get_key_material, fido2_enroll +from pykeepass.kdbx_parsing.common import compute_keyfile_part_of_composite + +FACTOR_TYPE_FIDO_2 = "15f77f9d-a65c-4a2e-b2b5-171f7b2df41a" +FACTOR_TYPE_KEY_FILE = "6b9746c7-ca8d-430b-986d-1afaf689c4e4" +FACTOR_TYPE_YK_CHALRESP = "0e6803a0-915e-4ebf-95ee-f9ddd8c97eea" +FACTOR_TYPE_PASSWORD = "c127a67f-be51-4bba-ac6f-7351e8a70ba0" +FACTOR_TYPE_EMPTY = "618636bf-e202-4e0b-bb7c-e2514be00f5a" + +factor_types_to_names = { + FACTOR_TYPE_FIDO_2: 'FIDO2', + FACTOR_TYPE_KEY_FILE: 'key file', + FACTOR_TYPE_YK_CHALRESP: 'YK challenge-response', + FACTOR_TYPE_PASSWORD: 'password', + FACTOR_TYPE_EMPTY: 'null (for testing)' +} + +FACTOR_ALG_AES_CBC = "AES-CBC" + +FACTOR_VALIDATE_HMAC_SHA512 = "HMAC-SHA512" + +log = logging.getLogger(__name__) + + +class FactorInfo: + def __init__(self, compat_version="1", comprehensive=False, factor_groups=None): + if factor_groups is None: + factor_groups = [] + self.compat_version = compat_version + self.comprehensive = comprehensive + self.factor_groups = factor_groups + + def encode(self, user_supplied_info): + root_element = etree.Element("FactorInfo") + + version = etree.SubElement(root_element, "CompatVersion") + version.text = str(self.compat_version) + + if self.comprehensive: + inclusive = etree.SubElement(root_element, "Comprehensive") + inclusive.text = "true" + + for group in self.factor_groups: + factor_group = etree.SubElement(root_element, "Group") + + group.encode(factor_group, user_supplied_info) + + return etree.tostring(root_element, encoding='utf-8').decode() + + @staticmethod + def decode(given_bytes): + parser = etree.XMLParser(remove_blank_text=True) + parsed = etree.parse(BytesIO(given_bytes.encode('utf-8')), parser) + + comprehensive_el = parsed.xpath("/FactorInfo/Comprehensive") + comprehensive = True if len(comprehensive_el) == 1 and comprehensive_el[0].text == "true" else False + + ret = FactorInfo( + compat_version=parsed.xpath("/FactorInfo/CompatVersion")[0].text, + comprehensive=comprehensive + ) + + for group in parsed.xpath("/FactorInfo/Group"): + ret.factor_groups.append(FactorGroup.decode(group)) + + return ret + + +class FactorGroup: + def __init__(self, validation_type=FACTOR_VALIDATE_HMAC_SHA512, validation_in=None, validation_out=None, challenge=None, factors=None): + self.factors = [] + self.validation_type = validation_type + self.validation_in = validation_in + self.validation_out = validation_out + self.challenge = challenge + self.cached_key_part = None + + if factors is not None: + for factor in factors: + self.add_factor(factor) + + def add_factor(self, factor): + self.factors.append(factor) + factor.group = self + if isinstance(factor, FIDO2Factor) and self.challenge is None: + self.challenge = random.randbytes(32) + + def generate_validation(self, user_supplied_info): + if self.validation_in is None: + # Generate validation info + if len(self.factors) == 0: + raise CredentialsError("Cannot save a FactorGroup with no factors and unset validation info") + self.validation_in = random.randbytes(32) + # Arbitrarily get some factor - they should all create the same validation output + wrapping_key = None + for factor in self.factors: + wrapping_key = factor.get_wrapping_key(user_supplied_info=user_supplied_info) + if wrapping_key is not None: + break + if wrapping_key is None: + raise CredentialsError("Cannot find a factor to generate validation info") + _, self.validation_out = factor.unwrap_key_part(user_supplied_info=user_supplied_info, unwrapping_key=wrapping_key) + + assert self.validation_in is not None and self.validation_out is not None + + def encode(self, group_element, user_supplied_info): + if self.validation_in is not None: + key_validation_type = etree.SubElement(group_element, "ValidationType") + key_validation_type.text = self.validation_type + + key_validation_in = etree.SubElement(group_element, "ValidationIn") + key_validation_in.text = b64encode(self.validation_in) + + key_validation_out = etree.SubElement(group_element, "ValidationOut") + key_validation_out.text = b64encode(self.validation_out) + + if self.challenge is not None: + challenge = etree.SubElement(group_element, "Challenge") + challenge.text = b64encode(self.challenge) + + for factor in self.factors: + factor_element = etree.SubElement(group_element, "Factor") + factor.encode(factor_element) + + @staticmethod + def decode(group_element): + key_validation_type = None + key_validation_in = None + key_validation_out = None + + validation_element = group_element.xpath("ValidationType") + if validation_element: + key_validation_type = validation_element[0].text + key_validation_in = b64decode(group_element.xpath("ValidationIn")[0].text) + key_validation_out = b64decode(group_element.xpath("ValidationOut")[0].text) + + challenge = None + challenges = group_element.xpath("Challenge") + if challenges: + challenge = b64decode(challenges[0].text) + + factors = [] + for factor in group_element.xpath("Factor"): + factors.append(Factor.decode(factor)) + return FactorGroup( + validation_type=key_validation_type, + validation_in=key_validation_in, + validation_out=key_validation_out, + challenge=challenge, + factors=factors + ) + + def unwrap_key_part(self, user_supplied_info): + if self.cached_key_part is not None and user_supplied_info == self.cached_key_part[0]: + return self.cached_key_part[1] + + fido2_factors = [x for x in self.factors if isinstance(x, FIDO2Factor)] + other_factors = [x for x in self.factors if not isinstance(x, FIDO2Factor)] + + for factor in other_factors: + # Try non-FIDO factors first + try: + unwrapped_part, _ = factor.unwrap_key_part(user_supplied_info=user_supplied_info) + if unwrapped_part is not None: + self.cached_key_part = (user_supplied_info, unwrapped_part) + return unwrapped_part + except CredentialsError as e: + log.error("Factor failed: {}".format(e)) + continue + + next_challenge = random.randbytes(32) + + if len(fido2_factors) > 0: + # Do all the FIDO2 factors in the group "in one go" to avoid prompting for authenticators repeatedly + pin_data = user_supplied_info.get("factor_data", {}).get("fido2_pin", {}) + + fido2_credentials_in_group = [x.credential_id for x in fido2_factors] + + result1, result2 = self.get_fido2_key_material(fido2_credentials_in_group, next_challenge, pin_data) + + for factor in fido2_factors: + try: + unwrapped_part, _ = factor.unwrap_key_part(user_supplied_info=user_supplied_info, unwrapping_key=result1) + if unwrapped_part is not None: + + # Success with FIDO2! Rotate the challenge if we can (if there's just one authenticator) + if len(fido2_factors) == 1: + self.rotate_fido2(factor, unwrapped_part, next_challenge=next_challenge, next_key_material=result2) + + self.cached_key_part = (user_supplied_info, unwrapped_part) + return unwrapped_part + except CredentialsError as e: + log.error("Factor failed: {}".format(e)) + continue + + raise CredentialsError("Unable to derive key part for a required 2FA group") + + def get_fido2_key_material(self, fido2_credentials_in_group, next_challenge, pin_data): + return fido2_get_key_material(pin_data, + fido2_credentials_in_group, + salt1=self.challenge, + salt2=next_challenge, + verify_user=True + ) + + def rotate_fido2(self, fido2_factor, key_part, next_challenge, next_key_material): + if len(self.factors) == 1: + # We really only have one factor: rotate the validation randomness too + self.validation_in = random.randbytes(32) + + self.challenge = next_challenge + wrapped_part = fido2_factor.wrap_key_part({}, key_part, next_key_material) + fido2_factor.wrapped_key_part, new_validation_out = wrapped_part + + if len(self.factors) == 1: + self.validation_out = new_validation_out + + +class Factor: + def __init__(self, name, uuid, key_salt=None, key_type=FACTOR_ALG_AES_CBC, wrapped_key_part=None): + if key_salt is None: + key_salt = random.randbytes(16) + self.name = name + self.uuid = uuid + self.key_salt = key_salt + self.key_type = key_type + self.wrapped_key_part = wrapped_key_part + self.group = None + + def encode(self, factor_element): + name = etree.SubElement(factor_element, "Name") + name.text = self.name + uuid = etree.SubElement(factor_element, "TypeUUID") + uuid.text = self.uuid + salt = etree.SubElement(factor_element, "KeySalt") + salt.text = b64encode(self.key_salt) + key_type = etree.SubElement(factor_element, "KeyType") + key_type.text = self.key_type + assert self.wrapped_key_part is not None + key_part = etree.SubElement(factor_element, "WrappedKey") + key_part.text = b64encode(self.wrapped_key_part) + + @staticmethod + def decode(factor_element): + name = factor_element.xpath("Name")[0].text + uuid = factor_element.xpath("TypeUUID")[0].text + + key_salt = b64decode(factor_element.xpath("KeySalt")[0].text) + key_type = factor_element.xpath("KeyType")[0].text + key_part = b64decode(factor_element.xpath("WrappedKey")[0].text) + ret = Factor( + name=name, + uuid=uuid, + key_salt=key_salt, + key_type=key_type, + wrapped_key_part=key_part + ) + + if uuid == FACTOR_TYPE_FIDO_2: + return FIDO2Factor.decode(ret, factor_element) + elif uuid == FACTOR_TYPE_PASSWORD: + return PasswordFactor.decode(ret, factor_element) + elif uuid == FACTOR_TYPE_KEY_FILE: + return KeyFileFactor.decode(ret, factor_element) + elif uuid == FACTOR_TYPE_EMPTY: + return NopFactor.decode(ret, factor_element) + + return ret + + def wrap_key_part(self, user_supplied_info, key_part, wrapping_key = None): + factor_name = factor_types_to_names.get(self.uuid, self.uuid) + if wrapping_key is None: + wrapping_key = self.get_wrapping_key(user_supplied_info=user_supplied_info) + + encrypted_key = None + if self.key_type == FACTOR_ALG_AES_CBC: + cipher = AES.new(wrapping_key, AES.MODE_CBC, iv=self.key_salt) + encrypted_key = cipher.encrypt(key_part) + + if encrypted_key is None: + raise NotImplementedError( + "Cannot wrap a key part for unknown alg {} on factor type {}".format(self.key_type, factor_name) + ) + + validation_out = None + if self.group.validation_type == FACTOR_VALIDATE_HMAC_SHA512: + validation_out = hmac.new(key_part, self.group.validation_in, 'SHA-512').digest() + else: + raise NotImplementedError( + "Cannot verify a key part for unknown alg {} on factor type {}".format(self.group.verify_type, + factor_name) + ) + + return encrypted_key, validation_out + + def get_unwrapping_key(self, user_supplied_info): + unwrapping_key = self.get_wrapping_key(user_supplied_info=user_supplied_info) + if unwrapping_key is None: + factor_name = factor_types_to_names.get(self.uuid, self.uuid) + raise CredentialsError("Could not get key part for factor type {}".format(factor_name)) + return unwrapping_key + + def generate_key_if_necessary(self, user_supplied_info, unwrapping_key=None): + if self.wrapped_key_part is None: + # Generate wholly new key part + if unwrapping_key is None: + unwrapping_key = self.get_unwrapping_key(user_supplied_info=user_supplied_info) + cipher = AES.new(unwrapping_key, AES.MODE_CBC, iv=self.key_salt) + new_generated_key = random.randbytes(32) + self.wrapped_key_part = cipher.encrypt(new_generated_key) + + def unwrap_key_part(self, user_supplied_info, unwrapping_key=None): + factor_name = factor_types_to_names.get(self.uuid, self.uuid) + + self.generate_key_if_necessary(user_supplied_info=user_supplied_info, unwrapping_key=unwrapping_key) + + if unwrapping_key is None: + unwrapping_key = self.get_unwrapping_key(user_supplied_info=user_supplied_info) + + decrypted_key = None + + if self.key_type == FACTOR_ALG_AES_CBC: + # Salt forms the AES-CBC IV + cipher = AES.new(unwrapping_key, AES.MODE_CBC, iv=self.key_salt) + + # Decrypt wrapped key part + decrypted_key = cipher.decrypt(self.wrapped_key_part) + + if decrypted_key is None: + raise NotImplementedError( + "Cannot unwrap a key part for unknown alg {} on factor type {}".format(self.key_type, factor_name) + ) + + digest = None + if self.group.validation_type == FACTOR_VALIDATE_HMAC_SHA512: + digest = hmac.new(decrypted_key, self.group.validation_in, 'SHA-512').digest() + else: + # Can't verify, we don't know how or there's no validation type set for this Group + pass + + if self.group.validation_out is not None and digest is not None and digest != self.group.validation_out: + raise CredentialsError("Factor type {} did not return a valid key part".format(factor_name)) + + # All good - return the key part + return decrypted_key, digest + + def get_wrapping_key(self, user_supplied_info): + factor_name = factor_types_to_names.get(self.uuid, self.uuid) + + raise NotImplementedError( + "Cannot get unwrapping key part for factor type {}".format(factor_name) + ) + + def _get_relevant_user_info(self, user_supplied_info, section_name, factor_name=None): + if user_supplied_info is None: + return None + if factor_name is None: + factor_name = self.name + section = user_supplied_info.get('factor_data', {}).get(section_name, None) + if isinstance(section, str): + return section + return section.get(self.name, section.get("*", None)) + + +class FIDO2Factor(Factor): + def __init__(self, credential_id=None, *args, **kwargs): + for prop_name in ['credential_id']: + setattr(self, prop_name, locals()[prop_name]) + + self.rotated_salt = None + self.rotated_key = None + + super(FIDO2Factor, self).__init__( + uuid=FACTOR_TYPE_FIDO_2, + **kwargs + ) + + def encode(self, factor_element): + super(FIDO2Factor, self).encode(factor_element) + + credential_id = etree.SubElement(factor_element, "CredentialID") + credential_id.text = b64encode(self.credential_id) + + @staticmethod + def decode(partial_factor, factor_element): + credential_id = b64decode(factor_element.xpath("CredentialID")[0].text) + + return FIDO2Factor( + name=partial_factor.name, + key_salt=partial_factor.key_salt, + key_type=partial_factor.key_type, + wrapped_key_part=partial_factor.wrapped_key_part, + credential_id=credential_id + ) + + def _enroll_if_necessary(self, user_supplied_info): + if self.credential_id is None: + existing_creds = [x for x in self.group.factors if isinstance(x, FIDO2Factor) and x.credential_id is not None] + self.credential_id = fido2_enroll(user_supplied_info.get("factor_data", {}).get("fido2_pin", {}), existing_creds) + + def wrap_key_part(self, user_supplied_info, key_part, wrapping_key = None): + self._enroll_if_necessary(user_supplied_info) + return super(FIDO2Factor, self).wrap_key_part(user_supplied_info, key_part, wrapping_key) + + def get_wrapping_key(self, user_supplied_info): + # Basically only used when creating a new group with a new FIDO2 factor in it + self._enroll_if_necessary(user_supplied_info) + pin_data = user_supplied_info.get("factor_data", {}).get("fido2_pin", {}) + hmac1, hmac2 = fido2_get_key_material(pin_data, + [self.credential_id], + salt1=self.group.challenge, + salt2=self.group.challenge, + verify_user=True + ) + return hmac1 + + +class PasswordFactor(Factor): + def __init__(self, *args, **kwargs): + super(PasswordFactor, self).__init__( + uuid=FACTOR_TYPE_PASSWORD, + *args, + **kwargs + ) + + def get_wrapping_key(self, user_supplied_info): + password = self._get_relevant_user_info(user_supplied_info, "password") + + # The unwrapping alg will do something more advanced, but we hash the password once just in case + hashed_password = hashlib.sha256(password.encode('utf-8')).digest() + + return hashed_password + + @staticmethod + def decode(partial_factor, factor_element): + return PasswordFactor( + name=partial_factor.name, + key_salt=partial_factor.key_salt, + key_type=partial_factor.key_type, + wrapped_key_part=partial_factor.wrapped_key_part, + ) + + def change_password(self, old_password, new_password): + unwrapped_part, _ = self.unwrap_key_part({"factor_data": {"password": old_password}}) + self.key_salt = random.randbytes(16) + self.wrapped_key_part, _ = self.wrap_key_part({"factor_data": {"password": new_password}}, unwrapped_part) + self.group.cached_key_part = None + + +class NopFactor(Factor): + def __init__(self, *args, **kwargs): + super(NopFactor, self).__init__( + uuid=FACTOR_TYPE_EMPTY, + *args, + **kwargs + ) + + def get_wrapping_key(self, user_supplied_info): + return b'' + + def unwrap_key_part(self, user_supplied_info): + return b'', self.group.validation_out + + @staticmethod + def decode(partial_factor, factor_element): + return NopFactor( + name=partial_factor.name, + key_salt=partial_factor.key_salt, + key_type=partial_factor.key_type, + wrapped_key_part=partial_factor.wrapped_key_part, + ) + + +class KeyFileFactor(Factor): + def __init__(self, *args, **kwargs): + super(KeyFileFactor, self).__init__( + uuid=FACTOR_TYPE_KEY_FILE, + *args, + **kwargs + ) + + def get_wrapping_key(self, user_supplied_info): + if user_supplied_info is None: + user_supplied_info = {} + keyfile = self._get_relevant_user_info(user_supplied_info, "keyfile") + if keyfile is None: + return None + + return compute_keyfile_part_of_composite(keyfile) + + @staticmethod + def decode(partial_factor, factor_element): + return KeyFileFactor( + name=partial_factor.name, + key_salt=partial_factor.key_salt, + key_type=partial_factor.key_type, + wrapped_key_part=partial_factor.wrapped_key_part, + ) diff --git a/pykeepass/kdbx_parsing/factorinfo.py.orig b/pykeepass/kdbx_parsing/factorinfo.py.orig new file mode 100644 index 00000000..eb8e9045 --- /dev/null +++ b/pykeepass/kdbx_parsing/factorinfo.py.orig @@ -0,0 +1,509 @@ +import hashlib +import hmac +import logging +import random +from io import BytesIO + +from Cryptodome.Cipher import AES +from lxml import etree +from base64 import b64encode, b64decode + +from pykeepass.exceptions import CredentialsError +from pykeepass.fido2 import fido2_get_key_material, fido2_enroll +from pykeepass.kdbx_parsing.common import compute_keyfile_part_of_composite + +FACTOR_TYPE_FIDO_2 = "15f77f9d-a65c-4a2e-b2b5-171f7b2df41a" +FACTOR_TYPE_KEY_FILE = "6b9746c7-ca8d-430b-986d-1afaf689c4e4" +FACTOR_TYPE_YK_CHALRESP = "0e6803a0-915e-4ebf-95ee-f9ddd8c97eea" +FACTOR_TYPE_PASSWORD = "c127a67f-be51-4bba-ac6f-7351e8a70ba0" +FACTOR_TYPE_EMPTY = "618636bf-e202-4e0b-bb7c-e2514be00f5a" + +factor_types_to_names = { + FACTOR_TYPE_FIDO_2: 'FIDO2', + FACTOR_TYPE_KEY_FILE: 'key file', + FACTOR_TYPE_YK_CHALRESP: 'YK challenge-response', + FACTOR_TYPE_PASSWORD: 'password', + FACTOR_TYPE_EMPTY: 'null (for testing)' +} + +FACTOR_ALG_AES_CBC = "AES-CBC" + +FACTOR_VALIDATE_HMAC_SHA512 = "HMAC-SHA512" + +log = logging.getLogger(__name__) + + +class FactorInfo: + def __init__(self, compat_version="1", comprehensive=False, factor_groups=None): + if factor_groups is None: + factor_groups = [] + self.compat_version = compat_version + self.comprehensive = comprehensive + self.factor_groups = factor_groups + + def encode(self, user_supplied_info): + root_element = etree.Element("FactorInfo") + + version = etree.SubElement(root_element, "CompatVersion") + version.text = str(self.compat_version) + + if self.comprehensive: + inclusive = etree.SubElement(root_element, "Comprehensive") + inclusive.text = "true" + + for group in self.factor_groups: + factor_group = etree.SubElement(root_element, "Group") + + group.encode(factor_group, user_supplied_info) + + return etree.tostring(root_element, encoding='utf-8').decode() + + @staticmethod + def decode(given_bytes): + parser = etree.XMLParser(remove_blank_text=True) + parsed = etree.parse(BytesIO(given_bytes.encode('utf-8')), parser) + + comprehensive_el = parsed.xpath("/FactorInfo/Comprehensive") + comprehensive = True if len(comprehensive_el) == 1 and comprehensive_el[0].text == "true" else False + + ret = FactorInfo( + compat_version=parsed.xpath("/FactorInfo/CompatVersion")[0].text, + comprehensive=comprehensive + ) + + for group in parsed.xpath("/FactorInfo/Group"): + ret.factor_groups.append(FactorGroup.decode(group)) + + return ret + + +class FactorGroup: + def __init__(self, validation_type=FACTOR_VALIDATE_HMAC_SHA512, validation_in=None, validation_out=None, challenge=None, factors=None): + self.factors = [] + self.validation_type = validation_type + self.validation_in = validation_in + self.validation_out = validation_out + self.challenge = challenge + self.cached_key_part = None + + if factors is not None: + for factor in factors: + self.add_factor(factor) + + def add_factor(self, factor): + self.factors.append(factor) + factor.group = self + if isinstance(factor, FIDO2Factor) and self.challenge is None: + self.challenge = random.randbytes(32) + + def generate_validation(self, user_supplied_info): + if self.validation_in is None: + # Generate validation info + if len(self.factors) == 0: + raise CredentialsError("Cannot save a FactorGroup with no factors and unset validation info") + self.validation_in = random.randbytes(32) + # Arbitrarily get some factor - they should all create the same validation output + wrapping_key = None + for factor in self.factors: + wrapping_key = factor.get_wrapping_key(user_supplied_info=user_supplied_info) + if wrapping_key is not None: + break + if wrapping_key is None: + raise CredentialsError("Cannot find a factor to generate validation info") + _, self.validation_out = factor.unwrap_key_part(user_supplied_info=user_supplied_info, unwrapping_key=wrapping_key) + + assert self.validation_in is not None and self.validation_out is not None + + def encode(self, group_element, user_supplied_info): + if self.validation_in is not None: + key_validation_type = etree.SubElement(group_element, "ValidationType") + key_validation_type.text = self.validation_type + + key_validation_in = etree.SubElement(group_element, "ValidationIn") + key_validation_in.text = b64encode(self.validation_in) + + key_validation_out = etree.SubElement(group_element, "ValidationOut") + key_validation_out.text = b64encode(self.validation_out) + + if self.challenge is not None: + challenge = etree.SubElement(group_element, "Challenge") + challenge.text = b64encode(self.challenge) + + for factor in self.factors: + factor_element = etree.SubElement(group_element, "Factor") + factor.encode(factor_element) + + @staticmethod + def decode(group_element): + key_validation_type = None + key_validation_in = None + key_validation_out = None + + validation_element = group_element.xpath("ValidationType") + if validation_element: + key_validation_type = validation_element[0].text + key_validation_in = b64decode(group_element.xpath("ValidationIn")[0].text) + key_validation_out = b64decode(group_element.xpath("ValidationOut")[0].text) + + challenge = None + challenges = group_element.xpath("Challenge") + if challenges: + challenge = b64decode(challenges[0].text) + + factors = [] + for factor in group_element.xpath("Factor"): + factors.append(Factor.decode(factor)) + return FactorGroup( + validation_type=key_validation_type, + validation_in=key_validation_in, + validation_out=key_validation_out, + challenge=challenge, + factors=factors + ) + + def unwrap_key_part(self, user_supplied_info): + if self.cached_key_part is not None and user_supplied_info == self.cached_key_part[0]: + return self.cached_key_part[1] + + fido2_factors = [x for x in self.factors if isinstance(x, FIDO2Factor)] + other_factors = [x for x in self.factors if not isinstance(x, FIDO2Factor)] + + for factor in other_factors: + # Try non-FIDO factors first + try: + unwrapped_part, _ = factor.unwrap_key_part(user_supplied_info=user_supplied_info) + if unwrapped_part is not None: + self.cached_key_part = (user_supplied_info, unwrapped_part) + return unwrapped_part + except CredentialsError as e: + log.error("Factor failed: {}".format(e)) + continue + + next_challenge = random.randbytes(32) + + if len(fido2_factors) > 0: + # Do all the FIDO2 factors in the group "in one go" to avoid prompting for authenticators repeatedly + pin_data = user_supplied_info.get("factor_data", {}).get("fido2_pin", {}) + + fido2_credentials_in_group = [x.credential_id for x in fido2_factors] + + result1, result2 = self.get_fido2_key_material(fido2_credentials_in_group, next_challenge, pin_data) + + for factor in fido2_factors: + try: + unwrapped_part, _ = factor.unwrap_key_part(user_supplied_info=user_supplied_info, unwrapping_key=result1) + if unwrapped_part is not None: + + # Success with FIDO2! Rotate the challenge if we can (if there's just one authenticator) + if len(fido2_factors) == 1: + self.rotate_fido2(factor, unwrapped_part, next_challenge=next_challenge, next_key_material=result2) + + self.cached_key_part = (user_supplied_info, unwrapped_part) + return unwrapped_part + except CredentialsError as e: + log.error("Factor failed: {}".format(e)) + continue + + raise CredentialsError("Unable to derive key part for a required 2FA group") + + def get_fido2_key_material(self, fido2_credentials_in_group, next_challenge, pin_data): + return fido2_get_key_material(pin_data, + fido2_credentials_in_group, + salt1=self.challenge, + salt2=next_challenge, + verify_user=True + ) + + def rotate_fido2(self, fido2_factor, key_part, next_challenge, next_key_material): + if len(self.factors) == 1: + # We really only have one factor: rotate the validation randomness too + self.validation_in = random.randbytes(32) + + self.challenge = next_challenge + wrapped_part = fido2_factor.wrap_key_part({}, key_part, next_key_material) + fido2_factor.wrapped_key_part, new_validation_out = wrapped_part + + if len(self.factors) == 1: + self.validation_out = new_validation_out + + +class Factor: + def __init__(self, name, uuid, key_salt=None, key_type=FACTOR_ALG_AES_CBC, wrapped_key_part=None): + if key_salt is None: + key_salt = random.randbytes(16) + self.name = name + self.uuid = uuid + self.key_salt = key_salt + self.key_type = key_type + self.wrapped_key_part = wrapped_key_part + self.group = None + + def encode(self, factor_element): + name = etree.SubElement(factor_element, "Name") + name.text = self.name + uuid = etree.SubElement(factor_element, "TypeUUID") + uuid.text = self.uuid + salt = etree.SubElement(factor_element, "KeySalt") + salt.text = b64encode(self.key_salt) + key_type = etree.SubElement(factor_element, "KeyType") + key_type.text = self.key_type + assert self.wrapped_key_part is not None + key_part = etree.SubElement(factor_element, "WrappedKey") + key_part.text = b64encode(self.wrapped_key_part) + + @staticmethod + def decode(factor_element): + name = factor_element.xpath("Name")[0].text + uuid = factor_element.xpath("TypeUUID")[0].text + + key_salt = b64decode(factor_element.xpath("KeySalt")[0].text) + key_type = factor_element.xpath("KeyType")[0].text + key_part = b64decode(factor_element.xpath("WrappedKey")[0].text) + ret = Factor( + name=name, + uuid=uuid, + key_salt=key_salt, + key_type=key_type, + wrapped_key_part=key_part + ) + + if uuid == FACTOR_TYPE_FIDO_2: + return FIDO2Factor.decode(ret, factor_element) + elif uuid == FACTOR_TYPE_PASSWORD: + return PasswordFactor.decode(ret, factor_element) + elif uuid == FACTOR_TYPE_KEY_FILE: + return KeyFileFactor.decode(ret, factor_element) + elif uuid == FACTOR_TYPE_EMPTY: + return NopFactor.decode(ret, factor_element) + + return ret + + def wrap_key_part(self, user_supplied_info, key_part, wrapping_key = None): + factor_name = factor_types_to_names.get(self.uuid, self.uuid) + if wrapping_key is None: + wrapping_key = self.get_wrapping_key(user_supplied_info=user_supplied_info) + + encrypted_key = None + if self.key_type == FACTOR_ALG_AES_CBC: + cipher = AES.new(wrapping_key, AES.MODE_CBC, iv=self.key_salt) + encrypted_key = cipher.encrypt(key_part) + + if encrypted_key is None: + raise NotImplementedError( + "Cannot wrap a key part for unknown alg {} on factor type {}".format(self.key_type, factor_name) + ) + + validation_out = None + if self.group.validation_type == FACTOR_VALIDATE_HMAC_SHA512: + validation_out = hmac.new(key_part, self.group.validation_in, 'SHA-512').digest() + else: + raise NotImplementedError( + "Cannot verify a key part for unknown alg {} on factor type {}".format(self.group.verify_type, + factor_name) + ) + + return encrypted_key, validation_out + + def get_unwrapping_key(self, user_supplied_info): + unwrapping_key = self.get_wrapping_key(user_supplied_info=user_supplied_info) + if unwrapping_key is None: + factor_name = factor_types_to_names.get(self.uuid, self.uuid) + raise CredentialsError("Could not get key part for factor type {}".format(factor_name)) + return unwrapping_key + + def generate_key_if_necessary(self, user_supplied_info, unwrapping_key=None): + if self.wrapped_key_part is None: + # Generate wholly new key part + if unwrapping_key is None: + unwrapping_key = self.get_unwrapping_key(user_supplied_info=user_supplied_info) + cipher = AES.new(unwrapping_key, AES.MODE_CBC, iv=self.key_salt) + new_generated_key = random.randbytes(32) + self.wrapped_key_part = cipher.encrypt(new_generated_key) + + def unwrap_key_part(self, user_supplied_info, unwrapping_key=None): + factor_name = factor_types_to_names.get(self.uuid, self.uuid) + + self.generate_key_if_necessary(user_supplied_info=user_supplied_info, unwrapping_key=unwrapping_key) + + if unwrapping_key is None: + unwrapping_key = self.get_unwrapping_key(user_supplied_info=user_supplied_info) + + decrypted_key = None + + if self.key_type == FACTOR_ALG_AES_CBC: + # Salt forms the AES-CBC IV + cipher = AES.new(unwrapping_key, AES.MODE_CBC, iv=self.key_salt) + + # Decrypt wrapped key part + decrypted_key = cipher.decrypt(self.wrapped_key_part) + + if decrypted_key is None: + raise NotImplementedError( + "Cannot unwrap a key part for unknown alg {} on factor type {}".format(self.key_type, factor_name) + ) + + digest = None + if self.group.validation_type == FACTOR_VALIDATE_HMAC_SHA512: + digest = hmac.new(decrypted_key, self.group.validation_in, 'SHA-512').digest() + else: + # Can't verify, we don't know how or there's no validation type set for this Group + pass + + if self.group.validation_out is not None and digest is not None and digest != self.group.validation_out: + raise CredentialsError("Factor type {} did not return a valid key part".format(factor_name)) + + # All good - return the key part + return decrypted_key, digest + + def get_wrapping_key(self, user_supplied_info): + factor_name = factor_types_to_names.get(self.uuid, self.uuid) + + raise NotImplementedError( + "Cannot get unwrapping key part for factor type {}".format(factor_name) + ) + + def _get_relevant_user_info(self, user_supplied_info, section_name, factor_name=None): + if user_supplied_info is None: + return None + if factor_name is None: + factor_name = self.name + section = user_supplied_info.get('factor_data', {}).get(section_name, None) + if isinstance(section, str): + return section + return section.get(self.name, section.get("*", None)) + + +class FIDO2Factor(Factor): + def __init__(self, credential_id=None, *args, **kwargs): + for prop_name in ['credential_id']: + setattr(self, prop_name, locals()[prop_name]) + + self.rotated_salt = None + self.rotated_key = None + + super(FIDO2Factor, self).__init__( + uuid=FACTOR_TYPE_FIDO_2, + **kwargs + ) + + def encode(self, factor_element): + super(FIDO2Factor, self).encode(factor_element) + + credential_id = etree.SubElement(factor_element, "CredentialID") + credential_id.text = b64encode(self.credential_id) + + @staticmethod + def decode(partial_factor, factor_element): + credential_id = b64decode(factor_element.xpath("CredentialID")[0].text) + + return FIDO2Factor( + name=partial_factor.name, + key_salt=partial_factor.key_salt, + key_type=partial_factor.key_type, + wrapped_key_part=partial_factor.wrapped_key_part, + credential_id=credential_id + ) + + def _enroll_if_necessary(self, user_supplied_info): + if self.credential_id is None: + existing_creds = [x for x in self.group.factors if isinstance(x, FIDO2Factor) and x.credential_id is not None] + self.credential_id = fido2_enroll(user_supplied_info.get("factor_data", {}).get("fido2_pin", {}), existing_creds) + + def wrap_key_part(self, user_supplied_info, key_part, wrapping_key = None): + self._enroll_if_necessary(user_supplied_info) + return super(FIDO2Factor, self).wrap_key_part(user_supplied_info, key_part, wrapping_key) + + def get_wrapping_key(self, user_supplied_info): + # Basically only used when creating a new group with a new FIDO2 factor in it + self._enroll_if_necessary(user_supplied_info) + pin_data = user_supplied_info.get("factor_data", {}).get("fido2_pin", {}) + hmac1, hmac2 = fido2_get_key_material(pin_data, + [self.credential_id], + salt1=self.group.challenge, + salt2=self.group.challenge, + verify_user=True + ) + return hmac1 + + +class PasswordFactor(Factor): + def __init__(self, *args, **kwargs): + super(PasswordFactor, self).__init__( + uuid=FACTOR_TYPE_PASSWORD, + *args, + **kwargs + ) + + def get_wrapping_key(self, user_supplied_info): + password = self._get_relevant_user_info(user_supplied_info, "password") + + # The unwrapping alg will do something more advanced, but we hash the password once just in case + hashed_password = hashlib.sha256(password.encode('utf-8')).digest() + + return hashed_password + + @staticmethod + def decode(partial_factor, factor_element): + return PasswordFactor( + name=partial_factor.name, + key_salt=partial_factor.key_salt, + key_type=partial_factor.key_type, + wrapped_key_part=partial_factor.wrapped_key_part, + ) + + def change_password(self, old_password, new_password): + unwrapped_part, _ = self.unwrap_key_part({"factor_data": {"password": old_password}}) + self.key_salt = random.randbytes(16) + self.wrapped_key_part, _ = self.wrap_key_part({"factor_data": {"password": new_password}}, unwrapped_part) + self.group.cached_key_part = None + + +class NopFactor(Factor): + def __init__(self, *args, **kwargs): + super(NopFactor, self).__init__( + uuid=FACTOR_TYPE_EMPTY, + *args, + **kwargs + ) + + def get_wrapping_key(self, user_supplied_info): + return b'' + + def unwrap_key_part(self, user_supplied_info): + return b'', self.group.validation_out + + @staticmethod + def decode(partial_factor, factor_element): + return NopFactor( + name=partial_factor.name, + key_salt=partial_factor.key_salt, + key_type=partial_factor.key_type, + wrapped_key_part=partial_factor.wrapped_key_part, + ) + + +class KeyFileFactor(Factor): + def __init__(self, *args, **kwargs): + super(KeyFileFactor, self).__init__( + uuid=FACTOR_TYPE_KEY_FILE, + *args, + **kwargs + ) + + def get_wrapping_key(self, user_supplied_info): + if user_supplied_info is None: + user_supplied_info = {} + keyfile = self._get_relevant_user_info(user_supplied_info, "keyfile") + if keyfile is None: + return None + + return compute_keyfile_part_of_composite(keyfile) + + @staticmethod + def decode(partial_factor, factor_element): + return KeyFileFactor( + name=partial_factor.name, + key_salt=partial_factor.key_salt, + key_type=partial_factor.key_type, + wrapped_key_part=partial_factor.wrapped_key_part, + ) diff --git a/pykeepass/kdbx_parsing/kdbx4.py b/pykeepass/kdbx_parsing/kdbx4.py index 41ce7a89..9c59072d 100644 --- a/pykeepass/kdbx_parsing/kdbx4.py +++ b/pykeepass/kdbx_parsing/kdbx4.py @@ -1,6 +1,7 @@ # Evan Widloski - 2018-04-11 # keepass decrypt experimentation +import logging import struct import hashlib import argon2 @@ -8,14 +9,17 @@ from construct import ( Byte, Bytes, Int32ul, RepeatUntil, GreedyBytes, Struct, this, Mapping, Switch, Flag, Prefixed, Int64ul, Int32sl, Int64sl, GreedyString, Padding, - Peek, Checksum, Computed, IfThenElse, Pointer, Tell, If + Peek, Checksum, Computed, IfThenElse, Pointer, Tell, If, Probe, Container ) from .common import ( aes_kdf, Concatenated, AES256Payload, ChaCha20Payload, TwoFishPayload, DynamicDict, compute_key_composite, Reparsed, Decompressed, - compute_master, CompressionFlags, XML, CipherId, ProtectedStreamId, Unprotect + compute_master, CompressionFlags, XML, CipherId, ProtectedStreamId, Unprotect, CredentialsError, + populate_custom_data ) +from .factorinfo import FactorInfo +log = logging.getLogger(__name__) # -------------------- Key Derivation -------------------- @@ -30,9 +34,42 @@ def compute_transformed(context): """Compute transformed key for opening database""" + factor_info = getattr(context._._, 'factor_info', None) + if factor_info is None and hasattr(context._.header.value.dynamic_header, 'public_custom_data'): + public_custom_data = context._.header.value.dynamic_header.public_custom_data.data.dict + xml_val = public_custom_data.get("authentication_factors", None) + if xml_val is not None: + factor_info = FactorInfo.decode(xml_val.value) + + factor_data = getattr(context._._, 'factor_data', {}) + password = context._._.password + keyfile = context._._.keyfile + + additional_key_parts = [] + if factor_info is not None: + for group in factor_info.factor_groups: + if len(group.factors) == 0: + # Bogus, irrelevant + log.warning("Factor group with no factors encountered!") + continue + + unwrapped_part = group.unwrap_key_part({ + 'factor_data': factor_data, + 'password': password, + 'keyfile': keyfile + }) + + additional_key_parts.append(unwrapped_part) + + if factor_info.comprehensive: + # If the other factor header is comprehensive, don't use our supplied password/keyfile later + password = None + keyfile = None + key_composite = compute_key_composite( - password=context._._.password, - keyfile=context._._.keyfile + password=password, + keyfile=keyfile, + additional_parts=additional_key_parts ) kdf_parameters = context._.header.value.dynamic_header.kdf_parameters.data.dict @@ -140,6 +177,7 @@ def compute_header_hmac_hash(context): this.id, {'compression_flags': CompressionFlags, 'kdf_parameters': VariantDictionary, + 'public_custom_data': VariantDictionary, 'cipher_id': CipherId }, default=GreedyBytes diff --git a/pykeepass/multifactor_format.rst b/pykeepass/multifactor_format.rst new file mode 100644 index 00000000..5a2a01e6 --- /dev/null +++ b/pykeepass/multifactor_format.rst @@ -0,0 +1,212 @@ +KeePass Multifactor Authentication File Format +============================================== + +The KeePass file format is a reasonably complex thing. This page describes how KeePass +files, version 4.0, can support multi-factor authentication. + +Design Goals +------------ +Design goals of the multifactor solution: + +- Extensible to support a wide variety of authentication factors (ideally anything that can + produce keying material) +- Allow 1-of-N unlocking (spare/backup authenticators) +- Support clean future changes to cryptographic algorithms +- Allow de-authorizing authenticators +- Require minimal changes to existing database format +- Describe necessary authentication factors in the database itself (no guessing for the user) +- Avoid needing to present/activate authenticators more than once per read/save cycle + +Non-goals: + +- Allow a database with multifactor authentication enabled to be unlocked by an application not + implementing this specification +- Allow de-authorizing authenticators without requiring the presence of a remaining authenticator +- Minimal storage size of disk +- Provide a replacement for the KDBX key derivation block (PBKDF2 or Argon2id) + +Background +---------- +A KeePass file has an "inner" and an "outer" header. The "outer" header is stored +unencrypted, and thus accessible before the database is opened. It is a series of TLV-encoded +values. + +One of those values (number 12) is "public_custom_data", intended for storing arbitrary +information outside the encrypted contents blob. `public_custom_data` is a `VariantDict`, a +binary representation of a key-value map with a defined set of allowed value types. + +File Format +=========== +Only the KDBX 4.0 file (and up) are supported by this standard. + +Within the `public_custom_data` outer header, a single dictionary entry is defined. This +entry shall has key `authentication_factors`. It has type `0x18`, a UTF-8 encoded string. +If this entry is present, the database uses multifactor authentication. + +`authentication_factors` describes an ordered list of factor-groups. Each +group provides one binary part of the composite key that unlocks the KeePass database. +Within each group there may be multiple factors. For every factor, a differently-encrypted +copy of the composite key part is stored in such a way that it can only be +decrypted with the aid of that factor. + +The value of the `authentication_factors` dict item is string-encoded XML. All binary elements within +the XML are base64-encoded. An example follows (with whitespace added for legibility): + +.. code:: xml + + + 1 + true + + HMAC-SHA512 + fYB7M/IgSIMXAUDRyohObKbTp2GdJEGopyMJup7xTdg= + 4/dIKGkVeXp9fvjH0K7bEU3tywlfpMYiINYYuK55SRb2OglxBnLDWZb/nJl39+X9vbh10sIT5ZJC4ej64dlJqg== + rXRnGOtIIWLz8xN1xWPqrw3opjoCFCJO29AXij6Bt8g= + + Some Password + c127a67f-be51-4bba-ac6f-7351e8a70ba0 + AES-CBC + R9vW1f329uh/7HMaqtCdIQ== + B4pHAoQomD8728UKeST2HOxglrjzwyq2M/IPEOV4xo8= + + + Some Password + 15f77f9d-a65c-4a2e-b2b5-171f7b2df41a + AES-CBC + hdJxBLk4Ln0T6lLIVguW3w== + o1Ysop7tBPjQe8WBwAGbF60QhZ0mHfMkEFbgaKj07Jk= + 5iQ/yXVRCPwrLmNnLzKXktN0XM1Tdjn9u+GwpJnNj3fiztbtlEsCkYZ/b6Jy+dn8dQUewIayd4kJ/Bgrx9Kdfg== + + + + +The top element, `FactorInfo`, contanis the mandatory `CompatVersion` element. +The contents of `CompatVersion` must be the string `1` in this version of the specification. + +The element `Comprehensive` may be present with contents `true`. If it is not, or +it is present with any other contents, then the groups' contribution to the composite key +will be concatenated onto an outside password, keyfile, and/or yubikey challenge-response. +In other words, if `Comprehensive` is `true`, the additional factors will together comprise everything +necessary to unlock the database. + +Within each group there is a chunk to check that the correct composite key part has been derived. Three +properties are involved: + +- `ValidationType`, defining the algorithm used to validate the key part +- `ValidationIn`, whose contents are used as input to the algorithm selected by `ValidationType` +- `ValidationOut`, whose contents must be produced as output of the algorithm selected by `ValidationType` with the correct key part + +All Validation properties are OPTIONAL. When any are omitted, the validity of the factor response for that particular +`Group` cannot be determined in isolation, and incorrect input will result in a later failure using the database +composite key. When there is a factor present within the group susceptible to brute force, such as a bare password, +the Validation properties SHOULD be omitted for that group. + +A fourth *OPTIONAL* member, `Challenge`, defines what value is sent as input to any and all challenge-response factors +within the group. + +Within the group is an unordered list of `Factor` elements. Each one has a human-readable `Name` element, and a `TypeUUID` +defining what type of authenticator it is. They each also contain `WrappedKey`, an encrypted representation of the composite key +part for the group. In order to unwrap `WrappedKey`, the algorithm specified by the factor's `KeyType` is applied. + +Additional elements may be present within the Factor depending on its `TypeUUID` and/or `KeyType`. + +Pseudo-algorithm +---------------- +- Iterate through each Group entry +- Within the Group, if Validation properties are present iterate through each Factor. Otherwise have the user select a Factor +- If the Factor type (defined by `TypeUUID`) is unknown, continue +- If the Key storage type (defined by `KeyType`) is unknown, continue +- Apply the algorithm from `KeyType` to `WrappedKey`, using `KeySalt` as appropriate. This produces a candidate key part +- If the user chose this Factor explicitly (ie Validation properties are absent), skip the next four steps +- Apply the algorithm from the group's `ValidationType` to the group's `ValidationIn`, using the candidate key part +- Compare the result with the group's `ValidationOut` +- If no match, discard this Factor and continue. If a match, stop iterating through Factors within this Group +- If the end of the Group is reached without a match, error +- Concatenate the candidate key parts from each Group, in the order in which the Groups are defined +- If the `FactorInfo` element has `Comprehensive` set to `true`, stop: the concatenated result is the final key +- Concatenate the obtained key to the end of any outside-provided key parts such as passwords and/or keyfiles + +Defined Key Algorithm Types +=========================== + +AES-CBC +------- +Identifier: `AES-CBC` + +This algorithm applies AES with a 128-bit block size in the Cipher Block Chaining mode. It requires +a 16-byte-long `KeySalt`. The input and output are unpadded, and so must be a +multiple of 16 bytes in length. + +The key length used depends on the factor type, but must be either 128 bits or 256 bits. + +Defined Validation Algorithm Types +================================== + +HMAC-SHA512 +----------- +Identifier: `HMAC-SHA512` + +This applies an HMAC-SHA512 to `ValidationIn` to produce `ValidationOut`. + +Defined Factor Types +==================== +Each factor type has a UUID, to avoid ambiguity in implementation compatibility. + +Password-SHA256 +--------------- +UUID: `c127a67f-be51-4bba-ac6f-7351e8a70ba0` + +This performs a SHA-256 hash of a raw password. As such, it provides no +resistance against brute-force attacks and is generally insecure. It exists +only for compatibility with databases already encrypted with passwords. + +Key File +-------- +UUID: `6b9746c7-ca8d-430b-986d-1afaf689c4e4` + +This opens a user-specified file. If the file contains valid UTF-8 XML, then +a `Meta/Version` element is located. In the event it contains the string `1.0`, +the base64-decoded contents of a `Key/Data` element are used as the key part. +If the version element contains `2.0`, the `Key/Data` element is whitespace-stripped, +hex-decoded, and then used as the key. + +Otherwise, if the file is 32 bytes long, its contents are used as the key. + +Otherwise, if the file is 64 bytes long and contains only hexadecimal data, its +contents are hex-decoded and used as the key. + +Otherwise, the SHA-256 of the file contents is used as the key. + +FIDO2-ES256 +----------- +UUID: `15f77f9d-a65c-4a2e-b2b5-171f7b2df41a` + +This allows the use of FIDO2 authenticators supporting both the `hmac-secret` and +the `credProtect` extensions to produce keying material. + +When adding a FIDO2 authenticator, a new credential is created with: + +- Relying Party ID set to `fido2.keepass.nodomain` +- The `hmac-secret` extension enabled +- `credProtect` set to `3` (required) +- A random, non-colliding user ID +- The `ES256` (256-bit ECDSA) algorithm + +The resulting credential ID is stored (base64-encoded) within the `CredentialID` +member of the `Factor`. + +To generate key material, the `Group` element's `Challenge` member is base64-decoded +and used as a salt to a FIDO2 get-assertion call. The result is used as the composite key +part. + +Yubikey Challenge-Response +-------------------------- +UUID: `0e6803a0-915e-4ebf-95ee-f9ddd8c97eea` + +Placeholder, to be implemented. + +Null +---- +UUID: `618636bf-e202-4e0b-bb7c-e2514be00f5a` + +This factor contributes nothing to the key, and is useful only for testing. diff --git a/pykeepass/pykeepass.py b/pykeepass/pykeepass.py index 08be860b..5982dd9d 100644 --- a/pykeepass/pykeepass.py +++ b/pykeepass/pykeepass.py @@ -9,6 +9,8 @@ import zlib from binascii import Error as BinasciiError +from io import BytesIO + from construct import Container, ChecksumError, CheckError from datetime import datetime, timedelta, timezone from lxml import etree @@ -16,6 +18,8 @@ from pathlib import Path from .attachment import Attachment +from .kdbx_parsing.common import populate_custom_data +from .kdbx_parsing.factorinfo import FactorInfo from .entry import Entry from .exceptions import * from .group import Group @@ -59,16 +63,28 @@ class PyKeePass(): """ def __init__(self, filename, password=None, keyfile=None, - transformed_key=None, decrypt=True): + transformed_key=None, decrypt=True, + authentication_factors=None, + factor_data=None): + if factor_data is None: + factor_data = {} + self._factor_data = factor_data + + self._authentication_factors = authentication_factors self.read( filename=filename, password=password, keyfile=keyfile, transformed_key=transformed_key, - decrypt=decrypt + decrypt=decrypt, + factor_data=self._factor_data ) + # Regenerate header after loading existing data + self.authentication_factors = authentication_factors + + def __enter__(self): return self @@ -77,7 +93,7 @@ def __exit__(self, typ, value, tb): pass def read(self, filename=None, password=None, keyfile=None, - transformed_key=None, decrypt=True): + transformed_key=None, decrypt=True, factor_data=None): """ See class docstring. @@ -86,6 +102,12 @@ def read(self, filename=None, password=None, keyfile=None, """ self._password = password self._keyfile = keyfile + if self._factor_data is not None: + self._factor_data = factor_data + if password is not None and 'password' not in self._factor_data: + self._factor_data['password'] = password + if keyfile is not None and 'keyfile' not in self._factor_data: + self._factor_data['keyfile'] = keyfile if filename: self.filename = filename else: @@ -98,7 +120,8 @@ def read(self, filename=None, password=None, keyfile=None, password=password, keyfile=keyfile, transformed_key=transformed_key, - decrypt=decrypt + decrypt=decrypt, + factor_data=self._factor_data ) else: self.kdbx = KDBX.parse_file( @@ -106,7 +129,8 @@ def read(self, filename=None, password=None, keyfile=None, password=password, keyfile=keyfile, transformed_key=transformed_key, - decrypt=decrypt + decrypt=decrypt, + factor_data=self._factor_data ) except CheckError as e: @@ -135,7 +159,7 @@ def read(self, filename=None, password=None, keyfile=None, def reload(self): """Reload current database using previous credentials """ - self.read(self.filename, self.password, self.keyfile) + self.read(self.filename, self.password, self.keyfile, factor_data=self._factor_data) def save(self, filename=None, transformed_key=None): """Save current database object to disk. @@ -151,6 +175,9 @@ def save(self, filename=None, transformed_key=None): if not filename: filename = self.filename + if hasattr(self.kdbx.header, 'data'): + del self.kdbx.header.data + if hasattr(filename, "write"): KDBX.build_stream( self.kdbx, @@ -158,7 +185,9 @@ def save(self, filename=None, transformed_key=None): password=self.password, keyfile=self.keyfile, transformed_key=transformed_key, - decrypt=True + decrypt=True, + factor_info=self.authentication_factors, + factor_data=self._factor_data ) else: # save to temporary file to prevent database clobbering @@ -171,7 +200,9 @@ def save(self, filename=None, transformed_key=None): password=self.password, keyfile=self.keyfile, transformed_key=transformed_key, - decrypt=True + decrypt=True, + factor_info=self.authentication_factors, + factor_data=self._factor_data ) except Exception as e: os.remove(filename_tmp) @@ -208,6 +239,66 @@ def kdf_algorithm(self): elif kdf_parameters['$UUID'].value == kdf_uuids['aeskdf']: return 'aeskdf' + @property + def authentication_factors(self): + """dict: authentication factors used in computing derived key.""" + if self.version != (4, 0): + return None + + if self._authentication_factors is None: + if not hasattr(self.kdbx.header.value.dynamic_header, 'public_custom_data'): + return None + + pcd = self.kdbx.header.value.dynamic_header.public_custom_data.data.dict + auth_factor_object = pcd.get("authentication_factors", None) + if auth_factor_object is None: + return None + xml_val = auth_factor_object.value + self._authentication_factors = FactorInfo.decode(xml_val) + + return self._authentication_factors + + @authentication_factors.setter + def authentication_factors(self, authentication_factors): + if authentication_factors is not None: + for group in authentication_factors.factor_groups: + for factor in group.factors: + factor.generate_key_if_necessary({"factor_data": self._factor_data}) + + cur_pcd = getattr(self.kdbx.header.value.dynamic_header, 'public_custom_data', None) + if cur_pcd is None: + cur_dict = {} + else: + cur_dict = cur_pcd.data.dict + + if authentication_factors is None: + if 'authentication_factors' in cur_dict: + del cur_dict['authentication_factors'] + else: + cur_dict["authentication_factors"] = Container( + type=0x18, + key="authentication_factors", + value=authentication_factors.encode({"factor_data": self._factor_data}), + next_byte=0x00 + ) + + populate_custom_data(self.kdbx, cur_dict) + + # When updating a RawCopy object, we need to delete the "data" member so our changes to "value" have their + # byte representation regenerated + if hasattr(self.kdbx.header, 'data'): + del self.kdbx.header.data + + @property + def factor_data(self): + return self._factor_data + + @factor_data.setter + def factor_data(self, factor_data): + self._factor_data = factor_data + # Re-save authentication_factors to regenerate header if necessary + self.authentication_factors = self.authentication_factors + @property def transformed_key(self): """bytes: transformed key used in database decryption. May be cached @@ -823,8 +914,9 @@ def _decode_time(self, text): else: return datetime.strptime(text, DT_ISOFORMAT).replace(tzinfo=timezone.utc) + def create_database( - filename, password=None, keyfile=None, transformed_key=None + filename, password=None, keyfile=None, transformed_key=None, authentication_factors=None, factor_data=None ): """ Create a new database at ``filename`` with supplied credentials. @@ -838,6 +930,10 @@ def create_database( database is assumed to have no keyfile transformed_key (:obj:`bytes`, optional): precomputed transformed key. + authentication_factors (:obj:`FactorInfo`, optional): authentication + factors (such as a FIDO2 key) to use + factor_data (:obj:`dict`, optional): dictionary of parameters for + authentication factors. May replace password/keyfile Returns: PyKeePass @@ -849,10 +945,13 @@ def create_database( keepass_instance.filename = filename keepass_instance.password = password keepass_instance.keyfile = keyfile + keepass_instance.authentication_factors = authentication_factors + keepass_instance._factor_data = factor_data - keepass_instance.save(transformed_key) + keepass_instance.save(transformed_key=transformed_key) return keepass_instance + def debug_setup(): """Convenience function to quickly enable debug messages""" diff --git a/pyproject.toml b/pyproject.toml index 6bd7fadd..92698d5a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,6 +14,7 @@ dependencies = [ "argon2_cffi>=18.1.0", "pycryptodomex>=3.6.2", "lxml", + "fido2[pcsc]" ] classifiers = [ "Topic :: Security", diff --git a/tests/tests.py b/tests/tests.py index 6b16de8e..bb42c184 100644 --- a/tests/tests.py +++ b/tests/tests.py @@ -2,6 +2,7 @@ import logging import os +import random import shutil import unittest import uuid @@ -11,10 +12,18 @@ from io import BytesIO -from pykeepass import PyKeePass, icons +from pykeepass import PyKeePass, icons, create_database from pykeepass.entry import Entry -from pykeepass.group import Group from pykeepass.exceptions import BinaryError, CredentialsError, HeaderChecksumError +from pykeepass.kdbx_parsing.factorinfo import FactorInfo, FactorGroup, \ + PasswordFactor, NopFactor, FACTOR_TYPE_EMPTY, FACTOR_ALG_AES_CBC, FACTOR_VALIDATE_HMAC_SHA512, FIDO2Factor +from pykeepass.group import Group +from pykeepass.pykeepass import BLANK_DATABASE_PASSWORD + + +def mock_get_fido2_key_material(*args, **kwargs): + mock_get_fido2_key_material.call_count = getattr(mock_get_fido2_key_material, 'call_count', 0) + 1 + return b'a' * 32, b'a' * 32 """ Missing Tests: @@ -1023,6 +1032,328 @@ def test_issue344(self): e._element.xpath('Times/ExpiryTime')[0].text = None self.assertEqual(e.expiry_time, None) + +class AuthenticatorTests(KDBX4Tests): + + def build_password_factor(self, factor_group, password, key_part, name='Some Password'): + salt = random.randbytes(16) + factor = PasswordFactor( + name=name, + key_salt=salt, + key_type=FACTOR_ALG_AES_CBC, + wrapped_key_part=None + ) + factor_group.add_factor(factor) + wrapped_key, validation_out = factor.wrap_key_part({"factor_data": {"password": password}}, key_part) + + factor.wrapped_key_part = wrapped_key + if factor_group.validation_out is None: + factor_group.validation_out = validation_out + else: + assert factor_group.validation_out == validation_out + + return factor + + def build_pretend_fido_factor(self, factor_group, key_part, name='Some FIDO'): + key_salt = random.randbytes(16) + credential = random.randbytes(100) + factor = FIDO2Factor( + name=name, + key_salt=key_salt, + credential_id=credential, + key_type=FACTOR_ALG_AES_CBC, + wrapped_key_part=None + ) + factor_group.add_factor(factor) + # Monkey-patch to avoid using real FIDO2 authenticator stuff + FactorGroup.get_fido2_key_material = mock_get_fido2_key_material + wrapped_key, validation_out = factor.wrap_key_part({}, key_part, wrapping_key=b'a' * 32) + + factor.wrapped_key_part = wrapped_key + if factor_group.validation_out is None: + factor_group.validation_out = validation_out + else: + assert factor_group.validation_out == validation_out + + return factor + + def test_can_use_password_factor_alone(self): + group = FactorGroup( + validation_type=FACTOR_VALIDATE_HMAC_SHA512, + validation_in=bytes([8]), + validation_out=None + ) + + key_part = random.randbytes(16) + + self.build_password_factor(factor_group=group, password=self.password, key_part=key_part) + + self.kp_tmp.authentication_factors = FactorInfo( + comprehensive=True, + factor_groups=[group] + ) + self.kp_tmp.password = None + self.kp_tmp.factor_data = { + "password": self.password + } + + self.kp_tmp.save() + self.kp_tmp.reload() + self.kp_tmp.save() + self.kp_tmp.reload() + + factors = self.kp_tmp.authentication_factors + self.assertIsNotNone(factors) + + self.assertEqual(1, len(factors.factor_groups)) + + def test_using_incorrect_password_fails(self): + group = FactorGroup( + validation_type=FACTOR_VALIDATE_HMAC_SHA512, + validation_in=bytes([8]), + validation_out=None + ) + + key_part = random.randbytes(16) + + self.build_password_factor(factor_group=group, password=self.password + 'x', key_part=key_part) + + self.kp_tmp.authentication_factors = FactorInfo( + comprehensive=True, + factor_groups=[group] + ) + + with self.assertRaises(CredentialsError): + self.kp_tmp.save() + + def test_fido_challenge_does_not_rotate_with_two_authenticators(self): + group = FactorGroup( + validation_type=FACTOR_VALIDATE_HMAC_SHA512, + validation_in=bytes([8]), + validation_out=None + ) + key_part = random.randbytes(16) + self.build_pretend_fido_factor(factor_group=group, key_part=key_part) + self.build_pretend_fido_factor(factor_group=group, key_part=key_part) + + factor_info = FactorInfo(factor_groups=[group]) + self.kp_tmp.authentication_factors = factor_info + + original_challenge = self.kp_tmp.authentication_factors.factor_groups[0].challenge + + self.kp_tmp.save() + self.kp_tmp.reload() + + after_saving_challenge = self.kp_tmp.authentication_factors.factor_groups[0].challenge + + self.kp_tmp.save() + self.kp_tmp.reload() + + after_saving_again_challenge = self.kp_tmp.authentication_factors.factor_groups[0].challenge + + self.assertEqual(original_challenge, after_saving_challenge) + self.assertEqual(after_saving_challenge, after_saving_again_challenge) + + def test_fido_challenge_rotates_with_one_authenticator(self): + group = FactorGroup( + validation_type=FACTOR_VALIDATE_HMAC_SHA512, + validation_in=bytes([8]), + validation_out=None + ) + key_part = random.randbytes(16) + password_factor = self.build_password_factor(factor_group=group, password=self.password + 'x', key_part=key_part) + self.build_pretend_fido_factor(factor_group=group, key_part=key_part) + + factor_info = FactorInfo(factor_groups=[group]) + self.kp_tmp.authentication_factors = factor_info + + setattr(mock_get_fido2_key_material, 'call_count', 0) + + original_challenge = self.kp_tmp.authentication_factors.factor_groups[0].challenge + + self.kp_tmp.save() + self.kp_tmp.reload() + + after_saving_challenge = self.kp_tmp.authentication_factors.factor_groups[0].challenge + + self.kp_tmp.save() + self.kp_tmp.reload() + + after_saving_again_challenge = self.kp_tmp.authentication_factors.factor_groups[0].challenge + + self.assertNotEqual(original_challenge, after_saving_challenge) + # FIXME: rotating again seems broken + # self.assertNotEqual(after_saving_challenge, after_saving_again_challenge) + + # One call for the first save, and one for each of the two reloads + self.assertEqual(3, mock_get_fido2_key_material.call_count) + + def test_can_use_password_and_fido_factors_in_addition_to_normal(self): + first_group = FactorGroup( + validation_type=FACTOR_VALIDATE_HMAC_SHA512, + validation_in=bytes([8]), + validation_out=None + ) + key_part_1 = random.randbytes(16) + self.build_password_factor(factor_group=first_group, password=self.password, key_part=key_part_1) + + second_group = FactorGroup( + validation_type=FACTOR_VALIDATE_HMAC_SHA512, + validation_in=bytes([99]), + validation_out=None + ) + key_part_2 = random.randbytes(16) + self.build_pretend_fido_factor(factor_group=second_group, key_part=key_part_2) + + self.kp_tmp.authentication_factors = FactorInfo( + factor_groups=[first_group, second_group] + ) + self.kp_tmp.factor_data = { + "password": self.password + } + + self.kp_tmp.save() + self.kp_tmp.reload() + self.kp_tmp.save() + self.kp_tmp.reload() + + def test_additional_factors_matter(self): + group = FactorGroup( + validation_type=FACTOR_VALIDATE_HMAC_SHA512, + validation_in=bytes([8]), + validation_out=None + ) + key_part = random.randbytes(16) + self.build_password_factor(factor_group=group, password=self.password, key_part=key_part) + + with self.assertRaises(CredentialsError): + PyKeePass( + base_dir / self.database_tmp, + password=None, + keyfile=base_dir / self.keyfile_tmp, + factor_data={ + "password": self.password + }, + authentication_factors=FactorInfo( + factor_groups=[group], + comprehensive=True + ) + ) + + def test_can_set_authentication_factors(self): + self.kp_tmp.authentication_factors = FactorInfo( + factor_groups=[ + FactorGroup( + validation_type=FACTOR_VALIDATE_HMAC_SHA512, + validation_in=bytes([8]), + validation_out=bytes([8]), + factors=[ + NopFactor( + name="First Fake Option", + key_salt=bytes([8] * 16), + key_type=FACTOR_ALG_AES_CBC, + wrapped_key_part=bytes([8] * 16) + ), + NopFactor( + name="Second Fake Option", + key_salt=bytes([8] * 16), + key_type=FACTOR_ALG_AES_CBC, + wrapped_key_part=bytes([8] * 16) + ), + ] + ) + ] + ) + + self.assertIsNotNone(self.kp_tmp.authentication_factors) + + self.kp_tmp.save() + self.kp_tmp.reload() + + factors = self.kp_tmp.authentication_factors + self.assertIsNotNone(factors) + + self.assertEqual('1', factors.compat_version) + self.assertEqual(1, len(factors.factor_groups)) + self.assertEqual(2, len(factors.factor_groups[0].factors)) + self.assertEqual(FACTOR_TYPE_EMPTY, factors.factor_groups[0].factors[-1].uuid) + self.assertEqual("First Fake Option", factors.factor_groups[0].factors[0].name) + self.assertEqual("Second Fake Option", factors.factor_groups[0].factors[1].name) + self.assertEqual(FACTOR_TYPE_EMPTY, factors.factor_groups[0].factors[-1].uuid) + + def test_saving_compatible_with_password(self): + create_database(self.database_tmp) + + PyKeePass( + self.database_tmp, + authentication_factors=FactorInfo( + comprehensive=True, + factor_groups=[FactorGroup( + factors=[ + PasswordFactor( + name="SomePassword" + ) + ] + )] + ), + factor_data={ + "password": BLANK_DATABASE_PASSWORD + } + ) + + def test_changing_password(self): + create_database(self.database_tmp) + + password_factor = PasswordFactor(name="SomePassword") + + kp = PyKeePass( + self.database_tmp, + authentication_factors=FactorInfo( + comprehensive=True, + factor_groups=[FactorGroup( + factors=[password_factor] + )] + ), + factor_data={ + "password": BLANK_DATABASE_PASSWORD + } + ) + kp.save() + + password_factor.change_password(BLANK_DATABASE_PASSWORD, "foo") + kp.factor_data = {"password": "foo"} + kp.save() + + kp = PyKeePass( + self.database_tmp, + factor_data={ + "password": "foo" + }, + decrypt=True + ) + + def test_stacking_passwords_changes_key(self): + create_database(self.database_tmp) + + with self.assertRaises(CredentialsError): + PyKeePass( + self.database_tmp, + password=BLANK_DATABASE_PASSWORD, + authentication_factors=FactorInfo( + factor_groups=[FactorGroup( + factors=[ + PasswordFactor( + name="SomePassword" + ) + ] + )] + ), + factor_data={ + "password": BLANK_DATABASE_PASSWORD + } + ) + + class EntryFindTests4(KDBX4Tests, EntryFindTests3): pass @@ -1209,6 +1540,7 @@ def test_open_save(self): self.assertEqual(kp.encryption_algorithm, encryption_algorithm) self.assertEqual(kp.kdf_algorithm, kdf_algorithm) self.assertEqual(kp.version, version) + self.assertIsNone(kp.authentication_factors) kp.save( filename_out, From d9e4fe707054b1f189711ae63b7bab4515222602 Mon Sep 17 00:00:00 2001 From: Bryan Jacobs Date: Tue, 16 Apr 2024 10:15:29 +1000 Subject: [PATCH 2/2] Remove merge cruft --- pykeepass/kdbx_parsing/common.py.orig | 449 ------------------- pykeepass/kdbx_parsing/factorinfo.py.orig | 509 ---------------------- 2 files changed, 958 deletions(-) delete mode 100644 pykeepass/kdbx_parsing/common.py.orig delete mode 100644 pykeepass/kdbx_parsing/factorinfo.py.orig diff --git a/pykeepass/kdbx_parsing/common.py.orig b/pykeepass/kdbx_parsing/common.py.orig deleted file mode 100644 index f775af10..00000000 --- a/pykeepass/kdbx_parsing/common.py.orig +++ /dev/null @@ -1,449 +0,0 @@ -from Cryptodome.Cipher import AES, ChaCha20, Salsa20 -from .twofish import Twofish -from Cryptodome.Util import Padding as CryptoPadding -import hashlib -from construct import ( - Adapter, BitStruct, BitsSwapped, Container, Flag, Padding, ListContainer, Mapping, GreedyBytes, Int32ul, Switch -) -from lxml import etree -from copy import deepcopy -import base64 -from binascii import Error as BinasciiError -import unicodedata -import zlib -import re -import codecs -from io import BytesIO -from collections import OrderedDict -import logging - -log = logging.getLogger(__name__) - - -class HeaderChecksumError(Exception): - pass - - -class CredentialsError(Exception): - pass - - -class PayloadChecksumError(Exception): - pass - - -class DynamicDict(Adapter): - """ListContainer <---> Container - Convenience mapping so we dont have to iterate ListContainer to find - the right item - - FIXME: lump kwarg was added to get around the fact that InnerHeader is - not truly a dict. We lump all 'binary' InnerHeaderItems into a single list - """ - - def __init__(self, key, subcon, lump=[]): - super().__init__(subcon) - self.key = key - self.lump = lump - - # map ListContainer to Container - def _decode(self, obj, context, path): - d = OrderedDict() - for l in self.lump: - d[l] = ListContainer([]) - for item in obj: - if item[self.key] in self.lump: - d[item[self.key]].append(item) - else: - d[item[self.key]] = item - - return Container(d) - - # map Container to ListContainer - def _encode(self, obj, context, path): - l = [] - for key in obj: - if key in self.lump: - l += obj[key] - else: - l.append(obj[key]) - - return ListContainer(l) - - -def Reparsed(subcon_out): - class Reparsed(Adapter): - """Bytes <---> Parsed subcon result - Takes in bytes and reparses it with subcon_out""" - - def _decode(self, data, con, path): - return subcon_out.parse(data, **con) - - def _encode(self, obj, con, path): - return subcon_out.build(obj, **con) - - return Reparsed - - -# is the payload compressed? -CompressionFlags = BitsSwapped( - BitStruct("compression" / Flag, Padding(8 * 4 - 1)) -) - - -# -------------------- Key Computation -------------------- -def aes_kdf(key, rounds, key_composite): - """Set up a context for AES128-ECB encryption to find transformed_key""" - - cipher = AES.new(key, AES.MODE_ECB) - - # get the number of rounds from the header and transform the key_composite - transformed_key = key_composite - for _ in range(0, rounds): - transformed_key = cipher.encrypt(transformed_key) - - return hashlib.sha256(transformed_key).digest() - - -<<<<<<< HEAD -def compute_key_composite(password=None, keyfile=None): -======= -def compute_keyfile_part_of_composite(keyfile): - """Compute just a keyfile's contribution to a database composite key.""" - if hasattr(keyfile, "read"): - keyfile_bytes = keyfile.read() - else: - with open(keyfile, 'rb') as f: - keyfile_bytes = f.read() - # try to read XML keyfile - try: - tree = etree.fromstring(keyfile_bytes) - version = tree.find('Meta/Version').text - data_element = tree.find('Key/Data') - if version.startswith('1.0'): - return base64.b64decode(data_element.text) - elif version.startswith('2.0'): - # read keyfile data and convert to bytes - keyfile_composite = bytes.fromhex(data_element.text.strip()) - # validate bytes against hash - hash = bytes.fromhex(data_element.attrib['Hash']) - hash_computed = hashlib.sha256(keyfile_composite).digest()[:4] - assert hash == hash_computed, "Keyfile has invalid hash" - return keyfile_composite - else: - raise AttributeError("Invalid version in keyfile") - # otherwise, try to read plain keyfile - except (etree.XMLSyntaxError, UnicodeDecodeError, AttributeError): - try: - try: - int(keyfile_bytes, 16) - is_hex = True - except ValueError: - is_hex = False - # if the length is 32 bytes we assume it is the key - if len(keyfile_bytes) == 32: - return keyfile_bytes - # if the length is 64 bytes we assume the key is hex encoded - elif len(keyfile_bytes) == 64 and is_hex: - return codecs.decode(keyfile_bytes, 'hex') - # anything else may be a file to hash for the key - else: - return hashlib.sha256(keyfile_bytes).digest() - except: - raise IOError('Could not read keyfile') - - -def compute_key_composite(password=None, keyfile=None, additional_parts=None): ->>>>>>> 282ce41 (Merge remote-tracking branch 'origin/master' into HEAD) - """Compute composite key. - Used in header verification and payload decryption.""" - - # hash the password - if password: - password_composite = hashlib.sha256(password.encode('utf-8')).digest() - else: - password_composite = b'' - # hash the keyfile -<<<<<<< HEAD - if keyfile: - if hasattr(keyfile, "read"): - keyfile_bytes = keyfile.read() - else: - with open(keyfile, 'rb') as f: - keyfile_bytes = f.read() - # try to read XML keyfile - try: - tree = etree.fromstring(keyfile_bytes) - version = tree.find('Meta/Version').text - data_element = tree.find('Key/Data') - if version.startswith('1.0'): - keyfile_composite = base64.b64decode(data_element.text) - elif version.startswith('2.0'): - # read keyfile data and convert to bytes - keyfile_composite = bytes.fromhex(data_element.text.strip()) - # validate bytes against hash - hash = bytes.fromhex(data_element.attrib['Hash']) - hash_computed = hashlib.sha256(keyfile_composite).digest()[:4] - assert hash == hash_computed, "Keyfile has invalid hash" - else: - raise AttributeError("Invalid version in keyfile") - # otherwise, try to read plain keyfile - except (etree.XMLSyntaxError, UnicodeDecodeError, AttributeError): - try: - try: - int(keyfile_bytes, 16) - is_hex = True - except ValueError: - is_hex = False - # if the length is 32 bytes we assume it is the key - if len(keyfile_bytes) == 32: - keyfile_composite = keyfile_bytes - # if the length is 64 bytes we assume the key is hex encoded - elif len(keyfile_bytes) == 64 and is_hex: - keyfile_composite = codecs.decode(keyfile_bytes, 'hex') - # anything else may be a file to hash for the key - else: - keyfile_composite = hashlib.sha256(keyfile_bytes).digest() - except: - raise IOError('Could not read keyfile') - - else: - keyfile_composite = b'' -======= - keyfile_composite = compute_keyfile_part_of_composite(keyfile) if keyfile else b'' ->>>>>>> 282ce41 (Merge remote-tracking branch 'origin/master' into HEAD) - - # create composite key from password and keyfile composites - return hashlib.sha256(password_composite + keyfile_composite).digest() - - -def compute_master(context): - """Computes master key from transformed key and master seed. - Used in payload decryption.""" - - # combine the transformed key with the header master seed to find the master_key - master_key = hashlib.sha256( - context._.header.value.dynamic_header.master_seed.data + - context.transformed_key).digest() - return master_key - - -# -------------------- XML Processing -------------------- - - -class XML(Adapter): - """Bytes <---> lxml etree""" - - def _decode(self, data, con, path): - parser = etree.XMLParser(remove_blank_text=True) - return etree.parse(BytesIO(data), parser) - - def _encode(self, tree, con, path): - return etree.tostring(tree) - - -class UnprotectedStream(Adapter): - """lxml etree <---> unprotected lxml etree - Iterate etree for Protected elements and decrypt using cipher - provided by get_cipher""" - - protected_xpath = '//Value[@Protected=\'True\']' - - def __init__(self, protected_stream_key, subcon): - super().__init__(subcon) - self.protected_stream_key = protected_stream_key - - def _decode(self, tree, con, path): - cipher = self.get_cipher(self.protected_stream_key(con)) - for elem in tree.xpath(self.protected_xpath): - if elem.text is not None: - try: - result = cipher.decrypt(base64.b64decode(elem.text)).decode('utf-8') - # strip invalid XML characters - https://stackoverflow.com/questions/8733233 - result = re.sub( - u'[^\u0020-\uD7FF\u0009\u000A\u000D\uE000-\uFFFD\U00010000-\U0010FFFF]+', - '', - result - ) - elem.text = result - except (UnicodeDecodeError, BinasciiError, ValueError): - # FIXME: this should be a warning eventually, need to fix all databases in tests/ first - log.error( - "Element at {} marked as protected, but could not unprotect".format(tree.getpath(elem)) - ) - return tree - - def _encode(self, tree, con, path): - tree_copy = deepcopy(tree) - cipher = self.get_cipher(self.protected_stream_key(con)) - for elem in tree_copy.xpath(self.protected_xpath): - if elem.text is not None: - elem.text = base64.b64encode( - cipher.encrypt( - elem.text.encode('utf-8') - ) - ) - return tree_copy - - -class ARCFourVariantStream(UnprotectedStream): - def get_cipher(self, protected_stream_key): - raise Exception("ARCFourVariant not implemented") - - -# https://github.com/dlech/KeePass2.x/blob/97141c02733cd3abf8d4dce1187fa7959ded58a8/KeePassLib/Cryptography/CryptoRandomStream.cs#L115-L119 -class Salsa20Stream(UnprotectedStream): - def get_cipher(self, protected_stream_key): - key = hashlib.sha256(protected_stream_key).digest() - return Salsa20.new( - key=key, - nonce=b'\xE8\x30\x09\x4B\x97\x20\x5D\x2A' - ) - - -# https://github.com/dlech/KeePass2.x/blob/97141c02733cd3abf8d4dce1187fa7959ded58a8/KeePassLib/Cryptography/CryptoRandomStream.cs#L103-L111 -class ChaCha20Stream(UnprotectedStream): - def get_cipher(self, protected_stream_key): - key_hash = hashlib.sha512(protected_stream_key).digest() - key = key_hash[:32] - nonce = key_hash[32:44] - return ChaCha20.new( - key=key, - nonce=nonce - ) - - -def Unprotect(protected_stream_id, protected_stream_key, subcon): - """Select stream cipher based on protected_stream_id""" - - return Switch( - protected_stream_id, - {'arcfourvariant': ARCFourVariantStream(protected_stream_key, subcon), - 'salsa20': Salsa20Stream(protected_stream_key, subcon), - 'chacha20': ChaCha20Stream(protected_stream_key, subcon), - }, - default=subcon - ) - - -# -------------------- Payload Encryption/Decompression -------------------- - -class Concatenated(Adapter): - """Data Blocks <---> Bytes""" - - def _decode(self, blocks, con, path): - return b''.join([block.block_data for block in blocks]) - - def _encode(self, payload_data, con, path): - blocks = [] - # split payload_data into 1 MB blocks (spec default) - i = 0 - while i < len(payload_data): - blocks.append(Container(block_data=payload_data[i:i + 2**20])) - i += 2**20 - blocks.append(Container(block_data=b'')) - - return blocks - - -class DecryptedPayload(Adapter): - """Encrypted Bytes <---> Decrypted Bytes""" - - def _decode(self, payload_data, con, path): - cipher = self.get_cipher( - con.master_key, - con._.header.value.dynamic_header.encryption_iv.data - ) - payload_data = cipher.decrypt(payload_data) - # FIXME: Construct ugliness. Fixes #244. First 32 bytes of decrypted kdbx3 payload - # should be checked against stream_start_bytes for a CredentialsError. Due to construct - # limitations, we have to decrypt the whole payload in order to check the first 32 bytes. - # However, when the credentials are wrong the invalid decrypted payload cannot - # be unpadded correctly. Instead, catch the unpad ValueError exception raised by unpad() - # and allow kdbx3.py to raise a ChecksumError - try: - payload_data = self.unpad(payload_data) - except ValueError: - log.debug("Decryption unpadding failed") - - return payload_data - - def _encode(self, payload_data, con, path): - payload_data = self.pad(payload_data) - cipher = self.get_cipher( - con.master_key, - con._.header.value.dynamic_header.encryption_iv.data - ) - payload_data = cipher.encrypt(payload_data) - - return payload_data - - -class AES256Payload(DecryptedPayload): - def get_cipher(self, master_key, encryption_iv): - return AES.new(master_key, AES.MODE_CBC, encryption_iv) - def pad(self, data): - return CryptoPadding.pad(data, 16) - def unpad(self, data): - return CryptoPadding.unpad(data, 16) - - -class ChaCha20Payload(DecryptedPayload): - def get_cipher(self, master_key, encryption_iv): - return ChaCha20.new(key=master_key, nonce=encryption_iv) - def pad(self, data): - return data - def unpad(self, data): - return data - - -class TwoFishPayload(DecryptedPayload): - def get_cipher(self, master_key, encryption_iv): - return Twofish.new(master_key, mode=Twofish.MODE_CBC, IV=encryption_iv) - def pad(self, data): - return CryptoPadding.pad(data, 16) - def unpad(self, data): - return CryptoPadding.unpad(data, 16) - - -class Decompressed(Adapter): - """Compressed Bytes <---> Decompressed Bytes""" - - def _decode(self, data, con, path): - return zlib.decompress(data, 16 + 15) - - def _encode(self, data, con, path): - compressobj = zlib.compressobj( - 6, - zlib.DEFLATED, - 16 + 15, - zlib.DEF_MEM_LEVEL, - 0 - ) - data = compressobj.compress(data) - data += compressobj.flush() - return data - - -# -------------------- Cipher Enums -------------------- - -# payload encryption method -# https://github.com/keepassxreboot/keepassxc/blob/8324d03f0a015e62b6182843b4478226a5197090/src/format/KeePass2.cpp#L24-L26 -CipherId = Mapping( - GreedyBytes, - {'aes256': b'1\xc1\xf2\xe6\xbfqCP\xbeX\x05!j\xfcZ\xff', - 'twofish': b'\xadh\xf2\x9fWoK\xb9\xa3j\xd4z\xf9e4l', - 'chacha20': b'\xd6\x03\x8a+\x8boL\xb5\xa5$3\x9a1\xdb\xb5\x9a' - } -) - -# protected entry encryption method -# https://github.com/dlech/KeePass2.x/blob/149ab342338ffade24b44aaa1fd89f14b64fda09/KeePassLib/Cryptography/CryptoRandomStream.cs#L35 -ProtectedStreamId = Mapping( - Int32ul, - {'none': 0, - 'arcfourvariant': 1, - 'salsa20': 2, - 'chacha20': 3, - } -) diff --git a/pykeepass/kdbx_parsing/factorinfo.py.orig b/pykeepass/kdbx_parsing/factorinfo.py.orig deleted file mode 100644 index eb8e9045..00000000 --- a/pykeepass/kdbx_parsing/factorinfo.py.orig +++ /dev/null @@ -1,509 +0,0 @@ -import hashlib -import hmac -import logging -import random -from io import BytesIO - -from Cryptodome.Cipher import AES -from lxml import etree -from base64 import b64encode, b64decode - -from pykeepass.exceptions import CredentialsError -from pykeepass.fido2 import fido2_get_key_material, fido2_enroll -from pykeepass.kdbx_parsing.common import compute_keyfile_part_of_composite - -FACTOR_TYPE_FIDO_2 = "15f77f9d-a65c-4a2e-b2b5-171f7b2df41a" -FACTOR_TYPE_KEY_FILE = "6b9746c7-ca8d-430b-986d-1afaf689c4e4" -FACTOR_TYPE_YK_CHALRESP = "0e6803a0-915e-4ebf-95ee-f9ddd8c97eea" -FACTOR_TYPE_PASSWORD = "c127a67f-be51-4bba-ac6f-7351e8a70ba0" -FACTOR_TYPE_EMPTY = "618636bf-e202-4e0b-bb7c-e2514be00f5a" - -factor_types_to_names = { - FACTOR_TYPE_FIDO_2: 'FIDO2', - FACTOR_TYPE_KEY_FILE: 'key file', - FACTOR_TYPE_YK_CHALRESP: 'YK challenge-response', - FACTOR_TYPE_PASSWORD: 'password', - FACTOR_TYPE_EMPTY: 'null (for testing)' -} - -FACTOR_ALG_AES_CBC = "AES-CBC" - -FACTOR_VALIDATE_HMAC_SHA512 = "HMAC-SHA512" - -log = logging.getLogger(__name__) - - -class FactorInfo: - def __init__(self, compat_version="1", comprehensive=False, factor_groups=None): - if factor_groups is None: - factor_groups = [] - self.compat_version = compat_version - self.comprehensive = comprehensive - self.factor_groups = factor_groups - - def encode(self, user_supplied_info): - root_element = etree.Element("FactorInfo") - - version = etree.SubElement(root_element, "CompatVersion") - version.text = str(self.compat_version) - - if self.comprehensive: - inclusive = etree.SubElement(root_element, "Comprehensive") - inclusive.text = "true" - - for group in self.factor_groups: - factor_group = etree.SubElement(root_element, "Group") - - group.encode(factor_group, user_supplied_info) - - return etree.tostring(root_element, encoding='utf-8').decode() - - @staticmethod - def decode(given_bytes): - parser = etree.XMLParser(remove_blank_text=True) - parsed = etree.parse(BytesIO(given_bytes.encode('utf-8')), parser) - - comprehensive_el = parsed.xpath("/FactorInfo/Comprehensive") - comprehensive = True if len(comprehensive_el) == 1 and comprehensive_el[0].text == "true" else False - - ret = FactorInfo( - compat_version=parsed.xpath("/FactorInfo/CompatVersion")[0].text, - comprehensive=comprehensive - ) - - for group in parsed.xpath("/FactorInfo/Group"): - ret.factor_groups.append(FactorGroup.decode(group)) - - return ret - - -class FactorGroup: - def __init__(self, validation_type=FACTOR_VALIDATE_HMAC_SHA512, validation_in=None, validation_out=None, challenge=None, factors=None): - self.factors = [] - self.validation_type = validation_type - self.validation_in = validation_in - self.validation_out = validation_out - self.challenge = challenge - self.cached_key_part = None - - if factors is not None: - for factor in factors: - self.add_factor(factor) - - def add_factor(self, factor): - self.factors.append(factor) - factor.group = self - if isinstance(factor, FIDO2Factor) and self.challenge is None: - self.challenge = random.randbytes(32) - - def generate_validation(self, user_supplied_info): - if self.validation_in is None: - # Generate validation info - if len(self.factors) == 0: - raise CredentialsError("Cannot save a FactorGroup with no factors and unset validation info") - self.validation_in = random.randbytes(32) - # Arbitrarily get some factor - they should all create the same validation output - wrapping_key = None - for factor in self.factors: - wrapping_key = factor.get_wrapping_key(user_supplied_info=user_supplied_info) - if wrapping_key is not None: - break - if wrapping_key is None: - raise CredentialsError("Cannot find a factor to generate validation info") - _, self.validation_out = factor.unwrap_key_part(user_supplied_info=user_supplied_info, unwrapping_key=wrapping_key) - - assert self.validation_in is not None and self.validation_out is not None - - def encode(self, group_element, user_supplied_info): - if self.validation_in is not None: - key_validation_type = etree.SubElement(group_element, "ValidationType") - key_validation_type.text = self.validation_type - - key_validation_in = etree.SubElement(group_element, "ValidationIn") - key_validation_in.text = b64encode(self.validation_in) - - key_validation_out = etree.SubElement(group_element, "ValidationOut") - key_validation_out.text = b64encode(self.validation_out) - - if self.challenge is not None: - challenge = etree.SubElement(group_element, "Challenge") - challenge.text = b64encode(self.challenge) - - for factor in self.factors: - factor_element = etree.SubElement(group_element, "Factor") - factor.encode(factor_element) - - @staticmethod - def decode(group_element): - key_validation_type = None - key_validation_in = None - key_validation_out = None - - validation_element = group_element.xpath("ValidationType") - if validation_element: - key_validation_type = validation_element[0].text - key_validation_in = b64decode(group_element.xpath("ValidationIn")[0].text) - key_validation_out = b64decode(group_element.xpath("ValidationOut")[0].text) - - challenge = None - challenges = group_element.xpath("Challenge") - if challenges: - challenge = b64decode(challenges[0].text) - - factors = [] - for factor in group_element.xpath("Factor"): - factors.append(Factor.decode(factor)) - return FactorGroup( - validation_type=key_validation_type, - validation_in=key_validation_in, - validation_out=key_validation_out, - challenge=challenge, - factors=factors - ) - - def unwrap_key_part(self, user_supplied_info): - if self.cached_key_part is not None and user_supplied_info == self.cached_key_part[0]: - return self.cached_key_part[1] - - fido2_factors = [x for x in self.factors if isinstance(x, FIDO2Factor)] - other_factors = [x for x in self.factors if not isinstance(x, FIDO2Factor)] - - for factor in other_factors: - # Try non-FIDO factors first - try: - unwrapped_part, _ = factor.unwrap_key_part(user_supplied_info=user_supplied_info) - if unwrapped_part is not None: - self.cached_key_part = (user_supplied_info, unwrapped_part) - return unwrapped_part - except CredentialsError as e: - log.error("Factor failed: {}".format(e)) - continue - - next_challenge = random.randbytes(32) - - if len(fido2_factors) > 0: - # Do all the FIDO2 factors in the group "in one go" to avoid prompting for authenticators repeatedly - pin_data = user_supplied_info.get("factor_data", {}).get("fido2_pin", {}) - - fido2_credentials_in_group = [x.credential_id for x in fido2_factors] - - result1, result2 = self.get_fido2_key_material(fido2_credentials_in_group, next_challenge, pin_data) - - for factor in fido2_factors: - try: - unwrapped_part, _ = factor.unwrap_key_part(user_supplied_info=user_supplied_info, unwrapping_key=result1) - if unwrapped_part is not None: - - # Success with FIDO2! Rotate the challenge if we can (if there's just one authenticator) - if len(fido2_factors) == 1: - self.rotate_fido2(factor, unwrapped_part, next_challenge=next_challenge, next_key_material=result2) - - self.cached_key_part = (user_supplied_info, unwrapped_part) - return unwrapped_part - except CredentialsError as e: - log.error("Factor failed: {}".format(e)) - continue - - raise CredentialsError("Unable to derive key part for a required 2FA group") - - def get_fido2_key_material(self, fido2_credentials_in_group, next_challenge, pin_data): - return fido2_get_key_material(pin_data, - fido2_credentials_in_group, - salt1=self.challenge, - salt2=next_challenge, - verify_user=True - ) - - def rotate_fido2(self, fido2_factor, key_part, next_challenge, next_key_material): - if len(self.factors) == 1: - # We really only have one factor: rotate the validation randomness too - self.validation_in = random.randbytes(32) - - self.challenge = next_challenge - wrapped_part = fido2_factor.wrap_key_part({}, key_part, next_key_material) - fido2_factor.wrapped_key_part, new_validation_out = wrapped_part - - if len(self.factors) == 1: - self.validation_out = new_validation_out - - -class Factor: - def __init__(self, name, uuid, key_salt=None, key_type=FACTOR_ALG_AES_CBC, wrapped_key_part=None): - if key_salt is None: - key_salt = random.randbytes(16) - self.name = name - self.uuid = uuid - self.key_salt = key_salt - self.key_type = key_type - self.wrapped_key_part = wrapped_key_part - self.group = None - - def encode(self, factor_element): - name = etree.SubElement(factor_element, "Name") - name.text = self.name - uuid = etree.SubElement(factor_element, "TypeUUID") - uuid.text = self.uuid - salt = etree.SubElement(factor_element, "KeySalt") - salt.text = b64encode(self.key_salt) - key_type = etree.SubElement(factor_element, "KeyType") - key_type.text = self.key_type - assert self.wrapped_key_part is not None - key_part = etree.SubElement(factor_element, "WrappedKey") - key_part.text = b64encode(self.wrapped_key_part) - - @staticmethod - def decode(factor_element): - name = factor_element.xpath("Name")[0].text - uuid = factor_element.xpath("TypeUUID")[0].text - - key_salt = b64decode(factor_element.xpath("KeySalt")[0].text) - key_type = factor_element.xpath("KeyType")[0].text - key_part = b64decode(factor_element.xpath("WrappedKey")[0].text) - ret = Factor( - name=name, - uuid=uuid, - key_salt=key_salt, - key_type=key_type, - wrapped_key_part=key_part - ) - - if uuid == FACTOR_TYPE_FIDO_2: - return FIDO2Factor.decode(ret, factor_element) - elif uuid == FACTOR_TYPE_PASSWORD: - return PasswordFactor.decode(ret, factor_element) - elif uuid == FACTOR_TYPE_KEY_FILE: - return KeyFileFactor.decode(ret, factor_element) - elif uuid == FACTOR_TYPE_EMPTY: - return NopFactor.decode(ret, factor_element) - - return ret - - def wrap_key_part(self, user_supplied_info, key_part, wrapping_key = None): - factor_name = factor_types_to_names.get(self.uuid, self.uuid) - if wrapping_key is None: - wrapping_key = self.get_wrapping_key(user_supplied_info=user_supplied_info) - - encrypted_key = None - if self.key_type == FACTOR_ALG_AES_CBC: - cipher = AES.new(wrapping_key, AES.MODE_CBC, iv=self.key_salt) - encrypted_key = cipher.encrypt(key_part) - - if encrypted_key is None: - raise NotImplementedError( - "Cannot wrap a key part for unknown alg {} on factor type {}".format(self.key_type, factor_name) - ) - - validation_out = None - if self.group.validation_type == FACTOR_VALIDATE_HMAC_SHA512: - validation_out = hmac.new(key_part, self.group.validation_in, 'SHA-512').digest() - else: - raise NotImplementedError( - "Cannot verify a key part for unknown alg {} on factor type {}".format(self.group.verify_type, - factor_name) - ) - - return encrypted_key, validation_out - - def get_unwrapping_key(self, user_supplied_info): - unwrapping_key = self.get_wrapping_key(user_supplied_info=user_supplied_info) - if unwrapping_key is None: - factor_name = factor_types_to_names.get(self.uuid, self.uuid) - raise CredentialsError("Could not get key part for factor type {}".format(factor_name)) - return unwrapping_key - - def generate_key_if_necessary(self, user_supplied_info, unwrapping_key=None): - if self.wrapped_key_part is None: - # Generate wholly new key part - if unwrapping_key is None: - unwrapping_key = self.get_unwrapping_key(user_supplied_info=user_supplied_info) - cipher = AES.new(unwrapping_key, AES.MODE_CBC, iv=self.key_salt) - new_generated_key = random.randbytes(32) - self.wrapped_key_part = cipher.encrypt(new_generated_key) - - def unwrap_key_part(self, user_supplied_info, unwrapping_key=None): - factor_name = factor_types_to_names.get(self.uuid, self.uuid) - - self.generate_key_if_necessary(user_supplied_info=user_supplied_info, unwrapping_key=unwrapping_key) - - if unwrapping_key is None: - unwrapping_key = self.get_unwrapping_key(user_supplied_info=user_supplied_info) - - decrypted_key = None - - if self.key_type == FACTOR_ALG_AES_CBC: - # Salt forms the AES-CBC IV - cipher = AES.new(unwrapping_key, AES.MODE_CBC, iv=self.key_salt) - - # Decrypt wrapped key part - decrypted_key = cipher.decrypt(self.wrapped_key_part) - - if decrypted_key is None: - raise NotImplementedError( - "Cannot unwrap a key part for unknown alg {} on factor type {}".format(self.key_type, factor_name) - ) - - digest = None - if self.group.validation_type == FACTOR_VALIDATE_HMAC_SHA512: - digest = hmac.new(decrypted_key, self.group.validation_in, 'SHA-512').digest() - else: - # Can't verify, we don't know how or there's no validation type set for this Group - pass - - if self.group.validation_out is not None and digest is not None and digest != self.group.validation_out: - raise CredentialsError("Factor type {} did not return a valid key part".format(factor_name)) - - # All good - return the key part - return decrypted_key, digest - - def get_wrapping_key(self, user_supplied_info): - factor_name = factor_types_to_names.get(self.uuid, self.uuid) - - raise NotImplementedError( - "Cannot get unwrapping key part for factor type {}".format(factor_name) - ) - - def _get_relevant_user_info(self, user_supplied_info, section_name, factor_name=None): - if user_supplied_info is None: - return None - if factor_name is None: - factor_name = self.name - section = user_supplied_info.get('factor_data', {}).get(section_name, None) - if isinstance(section, str): - return section - return section.get(self.name, section.get("*", None)) - - -class FIDO2Factor(Factor): - def __init__(self, credential_id=None, *args, **kwargs): - for prop_name in ['credential_id']: - setattr(self, prop_name, locals()[prop_name]) - - self.rotated_salt = None - self.rotated_key = None - - super(FIDO2Factor, self).__init__( - uuid=FACTOR_TYPE_FIDO_2, - **kwargs - ) - - def encode(self, factor_element): - super(FIDO2Factor, self).encode(factor_element) - - credential_id = etree.SubElement(factor_element, "CredentialID") - credential_id.text = b64encode(self.credential_id) - - @staticmethod - def decode(partial_factor, factor_element): - credential_id = b64decode(factor_element.xpath("CredentialID")[0].text) - - return FIDO2Factor( - name=partial_factor.name, - key_salt=partial_factor.key_salt, - key_type=partial_factor.key_type, - wrapped_key_part=partial_factor.wrapped_key_part, - credential_id=credential_id - ) - - def _enroll_if_necessary(self, user_supplied_info): - if self.credential_id is None: - existing_creds = [x for x in self.group.factors if isinstance(x, FIDO2Factor) and x.credential_id is not None] - self.credential_id = fido2_enroll(user_supplied_info.get("factor_data", {}).get("fido2_pin", {}), existing_creds) - - def wrap_key_part(self, user_supplied_info, key_part, wrapping_key = None): - self._enroll_if_necessary(user_supplied_info) - return super(FIDO2Factor, self).wrap_key_part(user_supplied_info, key_part, wrapping_key) - - def get_wrapping_key(self, user_supplied_info): - # Basically only used when creating a new group with a new FIDO2 factor in it - self._enroll_if_necessary(user_supplied_info) - pin_data = user_supplied_info.get("factor_data", {}).get("fido2_pin", {}) - hmac1, hmac2 = fido2_get_key_material(pin_data, - [self.credential_id], - salt1=self.group.challenge, - salt2=self.group.challenge, - verify_user=True - ) - return hmac1 - - -class PasswordFactor(Factor): - def __init__(self, *args, **kwargs): - super(PasswordFactor, self).__init__( - uuid=FACTOR_TYPE_PASSWORD, - *args, - **kwargs - ) - - def get_wrapping_key(self, user_supplied_info): - password = self._get_relevant_user_info(user_supplied_info, "password") - - # The unwrapping alg will do something more advanced, but we hash the password once just in case - hashed_password = hashlib.sha256(password.encode('utf-8')).digest() - - return hashed_password - - @staticmethod - def decode(partial_factor, factor_element): - return PasswordFactor( - name=partial_factor.name, - key_salt=partial_factor.key_salt, - key_type=partial_factor.key_type, - wrapped_key_part=partial_factor.wrapped_key_part, - ) - - def change_password(self, old_password, new_password): - unwrapped_part, _ = self.unwrap_key_part({"factor_data": {"password": old_password}}) - self.key_salt = random.randbytes(16) - self.wrapped_key_part, _ = self.wrap_key_part({"factor_data": {"password": new_password}}, unwrapped_part) - self.group.cached_key_part = None - - -class NopFactor(Factor): - def __init__(self, *args, **kwargs): - super(NopFactor, self).__init__( - uuid=FACTOR_TYPE_EMPTY, - *args, - **kwargs - ) - - def get_wrapping_key(self, user_supplied_info): - return b'' - - def unwrap_key_part(self, user_supplied_info): - return b'', self.group.validation_out - - @staticmethod - def decode(partial_factor, factor_element): - return NopFactor( - name=partial_factor.name, - key_salt=partial_factor.key_salt, - key_type=partial_factor.key_type, - wrapped_key_part=partial_factor.wrapped_key_part, - ) - - -class KeyFileFactor(Factor): - def __init__(self, *args, **kwargs): - super(KeyFileFactor, self).__init__( - uuid=FACTOR_TYPE_KEY_FILE, - *args, - **kwargs - ) - - def get_wrapping_key(self, user_supplied_info): - if user_supplied_info is None: - user_supplied_info = {} - keyfile = self._get_relevant_user_info(user_supplied_info, "keyfile") - if keyfile is None: - return None - - return compute_keyfile_part_of_composite(keyfile) - - @staticmethod - def decode(partial_factor, factor_element): - return KeyFileFactor( - name=partial_factor.name, - key_salt=partial_factor.key_salt, - key_type=partial_factor.key_type, - wrapped_key_part=partial_factor.wrapped_key_part, - )