Skip to content

Commit

Permalink
Change master seed on each save
Browse files Browse the repository at this point in the history
Fixes: #219
  • Loading branch information
A6GibKm committed Nov 29, 2024
1 parent b94fee3 commit 5d79550
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 27 deletions.
17 changes: 14 additions & 3 deletions pykeepass/kdbx_parsing/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
Adapter,
BitsSwapped,
BitStruct,
Bytes,
Container,
Flag,
GreedyBytes,
Expand All @@ -31,6 +32,16 @@
log = logging.getLogger(__name__)


class RandomBytes(Bytes):
"""Same as Bytes, but generate random bytes when building"""

def _build(self, obj, stream, context, path):
length = self.length(context) if callable(self.length) else self.length
data = get_random_bytes(length)
stream_write(stream, data, length, path)
return data


class HeaderChecksumError(Exception):
pass

Expand Down Expand Up @@ -183,7 +194,7 @@ def compute_master(context):

# combine the transformed key with the header master seed to find the master_key
master_key = hashlib.sha256(
context._.header.value.dynamic_header.master_seed.data +
context._.header.dynamic_header.master_seed.data +
context.transformed_key).digest()
return master_key

Expand Down Expand Up @@ -312,7 +323,7 @@ class DecryptedPayload(Adapter):
def _decode(self, payload_data, con, path):
cipher = self.get_cipher(
con.master_key,
con._.header.value.dynamic_header.encryption_iv.data
con._.header.dynamic_header.encryption_iv.data
)
payload_data = cipher.decrypt(payload_data)
# FIXME: Construct ugliness. Fixes #244. First 32 bytes of decrypted kdbx3 payload
Expand All @@ -332,7 +343,7 @@ def _encode(self, payload_data, con, path):
payload_data = self.pad(payload_data)
cipher = self.get_cipher(
con.master_key,
con._.header.value.dynamic_header.encryption_iv.data
con._.header.dynamic_header.encryption_iv.data
)
payload_data = cipher.encrypt(payload_data)

Expand Down
30 changes: 26 additions & 4 deletions pykeepass/kdbx_parsing/kdbx.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,39 @@
from construct import Bytes, Check, Int16ul, RawCopy, Struct, Switch, this

from construct import Bytes, Check, Int16ul, RawCopy, Struct, Switch, this, stream_seek, stream_tell, stream_read, Subconstruct
from .kdbx3 import Body as Body3
from .kdbx3 import DynamicHeader as DynamicHeader3
from .kdbx4 import Body as Body4
from .kdbx4 import DynamicHeader as DynamicHeader4



class Copy(Subconstruct):
"""Same as RawCopy, but don't create parent container when parsing.
Instead store data in ._data attribute of subconstruct, and never rebuild from data
"""

def _parse(self, stream, context, path):
offset1 = stream_tell(stream, path)
obj = self.subcon._parsereport(stream, context, path)
offset2 = stream_tell(stream, path)
stream_seek(stream, offset1, 0, path)
obj._data = stream_read(stream, offset2 - offset1, path)
return obj

def _build(self, obj, stream, context, path):
offset1 = stream_tell(stream, path)
obj = self.subcon._build(obj, stream, context, path)
offset2 = stream_tell(stream, path)
stream_seek(stream, offset1, 0, path)
obj._data = stream_read(stream, offset2 - offset1, path)
return obj


# verify file signature
def check_signature(ctx):
return ctx.sig1 == b'\x03\xd9\xa2\x9a' and ctx.sig2 == b'\x67\xFB\x4B\xB5'

KDBX = Struct(
"header" / RawCopy(
"header" / Copy(
Struct(
"sig1" / Bytes(4),
"sig2" / Bytes(4),
Expand All @@ -27,7 +49,7 @@ def check_signature(ctx):
)
),
"body" / Switch(
this.header.value.major_version,
this.header.major_version,
{3: Body3,
4: Body4
}
Expand Down
16 changes: 9 additions & 7 deletions pykeepass/kdbx_parsing/kdbx3.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
Reparsed,
TwoFishPayload,
Unprotect,
RandomBytes,
aes_kdf,
compute_key_composite,
compute_master,
Expand All @@ -63,8 +64,8 @@ def compute_transformed(context):
keyfile=context._._.keyfile
)
transformed_key = aes_kdf(
context._.header.value.dynamic_header.transform_seed.data,
context._.header.value.dynamic_header.transform_rounds.data,
context._.header.dynamic_header.transform_seed.data,
context._.header.dynamic_header.transform_rounds.data,
key_composite
)

Expand Down Expand Up @@ -97,6 +98,7 @@ def compute_transformed(context):
{'compression_flags': CompressionFlags,
'cipher_id': CipherId,
'transform_rounds': Int64ul,
'master_seed': RandomBytes(32),
'protected_stream_id': ProtectedStreamId
},
default=GreedyBytes
Expand Down Expand Up @@ -160,16 +162,16 @@ def compute_transformed(context):
# validate payload decryption
"cred_check" / Checksum(
Bytes(32),
lambda this: this._._.header.value.dynamic_header.stream_start_bytes.data,
lambda this: this._._.header.dynamic_header.stream_start_bytes.data,
this,
# exception=CredentialsError
),
"xml" / Unprotect(
this._._.header.value.dynamic_header.protected_stream_id.data,
this._._.header.value.dynamic_header.protected_stream_key.data,
this._._.header.dynamic_header.protected_stream_id.data,
this._._.header.dynamic_header.protected_stream_key.data,
XML(
IfThenElse(
this._._.header.value.dynamic_header.compression_flags.data.compression,
this._._.header.dynamic_header.compression_flags.data.compression,
Decompressed(Concatenated(PayloadBlocks)),
Concatenated(PayloadBlocks)
)
Expand All @@ -187,7 +189,7 @@ def compute_transformed(context):
"payload" / If(this._._.decrypt,
UnpackedPayload(
Switch(
this._.header.value.dynamic_header.cipher_id.data,
this._.header.dynamic_header.cipher_id.data,
{'aes256': AES256Payload(GreedyBytes),
'chacha20': ChaCha20Payload(GreedyBytes),
'twofish': TwoFishPayload(GreedyBytes),
Expand Down
17 changes: 10 additions & 7 deletions pykeepass/kdbx_parsing/kdbx4.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
Decompressed,
DynamicDict,
ProtectedStreamId,
RandomBytes,
Reparsed,
TwoFishPayload,
Unprotect,
Expand All @@ -67,7 +68,7 @@ def compute_transformed(context):
password=context._._.password,
keyfile=context._._.keyfile
)
kdf_parameters = context._.header.value.dynamic_header.kdf_parameters.data.dict
kdf_parameters = context._.header.dynamic_header.kdf_parameters.data.dict

if context._._.transformed_key is not None:
transformed_key = context._._.transformed_key
Expand Down Expand Up @@ -106,12 +107,12 @@ def compute_header_hmac_hash(context):
hashlib.sha512(
b'\xff' * 8 +
hashlib.sha512(
context._.header.value.dynamic_header.master_seed.data +
context._.header.dynamic_header.master_seed.data +
context.transformed_key +
b'\x01'
).digest()
).digest(),
context._.header.data,
context._.header._data,
hashlib.sha256
).digest()

Expand Down Expand Up @@ -173,6 +174,8 @@ def compute_header_hmac_hash(context):
this.id,
{'compression_flags': CompressionFlags,
'kdf_parameters': VariantDictionary,
'master_seed': RandomBytes(32),
'encryption_iv': RandomBytes(12),
'cipher_id': CipherId
},
default=GreedyBytes
Expand All @@ -198,7 +201,7 @@ def compute_payload_block_hash(this):
hashlib.sha512(
struct.pack('<Q', this._index) +
hashlib.sha512(
this._._.header.value.dynamic_header.master_seed.data +
this._._.header.dynamic_header.master_seed.data +
this._.transformed_key + b'\x01'
).digest()
).digest(),
Expand Down Expand Up @@ -233,7 +236,7 @@ def compute_payload_block_hash(this):
))

DecryptedPayload = Switch(
this._.header.value.dynamic_header.cipher_id.data,
this._.header.dynamic_header.cipher_id.data,
{'aes256': AES256Payload(EncryptedPayload),
'chacha20': ChaCha20Payload(EncryptedPayload),
'twofish': TwoFishPayload(EncryptedPayload)
Expand Down Expand Up @@ -289,7 +292,7 @@ def compute_payload_block_hash(this):
"sha256" / Checksum(
Bytes(32),
lambda data: hashlib.sha256(data).digest(),
this._.header.data,
this._.header._data,
# exception=HeaderChecksumError,
),
"cred_check" / If(this._._.decrypt,
Expand All @@ -303,7 +306,7 @@ def compute_payload_block_hash(this):
"payload" / If(this._._.decrypt,
UnpackedPayload(
IfThenElse(
this._.header.value.dynamic_header.compression_flags.data.compression,
this._.header.dynamic_header.compression_flags.data.compression,
Decompressed(DecryptedPayload),
DecryptedPayload
)
Expand Down
12 changes: 6 additions & 6 deletions pykeepass/pykeepass.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,15 +188,15 @@ def version(self):
"""tuple: Length 2 tuple of ints containing major and minor versions.
Generally (3, 1) or (4, 0)."""
return (
self.kdbx.header.value.major_version,
self.kdbx.header.value.minor_version
self.kdbx.header.major_version,
self.kdbx.header.minor_version
)

@property
def encryption_algorithm(self):
"""str: encryption algorithm used by database during decryption.
Can be one of 'aes256', 'chacha20', or 'twofish'."""
return self.kdbx.header.value.dynamic_header.cipher_id.data
return self.kdbx.header.dynamic_header.cipher_id.data

@property
def kdf_algorithm(self):
Expand All @@ -205,7 +205,7 @@ def kdf_algorithm(self):
if self.version == (3, 1):
return 'aeskdf'
elif self.version == (4, 0):
kdf_parameters = self.kdbx.header.value.dynamic_header.kdf_parameters.data.dict
kdf_parameters = self.kdbx.header.dynamic_header.kdf_parameters.data.dict
if kdf_parameters['$UUID'].value == kdf_uuids['argon2']:
return 'argon2'
elif kdf_parameters['$UUID'].value == kdf_uuids['argon2id']:
Expand All @@ -225,9 +225,9 @@ def database_salt(self):
credentials which are used in extension to current keyfile."""

if self.version == (3, 1):
return self.kdbx.header.value.dynamic_header.transform_seed.data
return self.kdbx.header.dynamic_header.transform_seed.data

kdf_parameters = self.kdbx.header.value.dynamic_header.kdf_parameters.data.dict
kdf_parameters = self.kdbx.header.dynamic_header.kdf_parameters.data.dict
return kdf_parameters['S'].value

@property
Expand Down
25 changes: 25 additions & 0 deletions tests/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -1397,6 +1397,31 @@ def test_open_no_decrypt(self):
self.assertEqual(kp.encryption_algorithm, enc_alg)
self.assertEqual(kp.version, version)

def test_master_seed_differs(self):
databases = [
# 'test3.kdbx',
'test4.kdbx',
]
keyfiles = [
# 'test3.key',
'test4.key',
]
for database, keyfile in zip(databases, keyfiles):
path = os.path.join(base_dir, database)
keyfile = os.path.join(base_dir, keyfile)
kp = PyKeePass(path, password='password', keyfile=keyfile)
master_seed = kp.kdbx.header.dynamic_header.master_seed.data
vector_iv = kp.kdbx.header.dynamic_header.vector_iv.data
stream = BytesIO()
kp.save(stream)
stream.seek(0)
new_kp = PyKeePass(stream, password='password', keyfile=keyfile)
new_master_seed = new_kp.kdbx.header.dynamic_header.master_seed.data
new_vector_iv = new_kp.kdbx.header.dynamic_header.vector_iv.data

self.assertNotEqual(master_seed, new_master_seed)
self.assertNotEqual(vector_iv, new_vector_iv)

if __name__ == '__main__':
unittest.main()

0 comments on commit 5d79550

Please sign in to comment.