diff --git a/pubnub/crypto.py b/pubnub/crypto.py index abd4ca4c..0fd4be6f 100644 --- a/pubnub/crypto.py +++ b/pubnub/crypto.py @@ -112,7 +112,6 @@ def decrypt(self, key, file): class PubNubCryptoModule(PubNubCrypto): FALLBACK_CRYPTOR_ID: str = '0000' - CRYPTOR_VERSION: str = 1 cryptor_map = {} default_cryptor_id: str @@ -120,18 +119,6 @@ def __init__(self, cryptor_map: Dict[str, PubNubCryptor], default_cryptor: PubNu self.cryptor_map = cryptor_map self.default_cryptor_id = default_cryptor.CRYPTOR_ID - def register_cryptor(self, cryptor_id, cryptor_instance): - if len(cryptor_id) != 4: - raise PubNubException('Malformed cryptor_id.') - - if cryptor_id in self.cryptor_map.keys(): - raise PubNubException('Cryptor_id already in use') - - if not isinstance(cryptor_instance, PubNubCrypto): - raise PubNubException('Invalid cryptor instance') - - self.cryptor_map[cryptor_id] = cryptor_instance - def _validate_cryptor_id(self, cryptor_id: str) -> str: cryptor_id = cryptor_id or self.default_cryptor_id @@ -155,19 +142,13 @@ def encrypt(self, message: str, cryptor_id: str = None) -> str: def decrypt(self, input): data = b64decode(input) header = self.decode_header(data) - if header: cryptor_id = header['cryptor_id'] - cryptor_version = header['cryptor_ver'] payload = CryptorPayload(data=data[header['length']:], cryptor_data=header['cryptor_data']) if not header: cryptor_id = self.FALLBACK_CRYPTOR_ID - cryptor_version = self.CRYPTOR_VERSION payload = CryptorPayload(data=data) - if cryptor_id not in self.cryptor_map.keys() or cryptor_version > self.CRYPTOR_VERSION: - raise PubNubException('unknown cryptor error') - message = self._get_cryptor(cryptor_id).decrypt(payload) try: return json.loads(message) @@ -189,27 +170,25 @@ def decrypt_file(self, file_data): header = self.decode_header(file_data) if header: cryptor_id = header['cryptor_id'] - cryptor_version = header['cryptor_ver'] payload = CryptorPayload(data=file_data[header['length']:], cryptor_data=header['cryptor_data']) else: cryptor_id = self.FALLBACK_CRYPTOR_ID - cryptor_version = self.CRYPTOR_VERSION payload = CryptorPayload(data=file_data) - if cryptor_id not in self.cryptor_map.keys() or cryptor_version > self.CRYPTOR_VERSION: + if cryptor_id not in self.cryptor_map.keys(): raise PubNubException('unknown cryptor error') return self._get_cryptor(cryptor_id).decrypt(payload, binary_mode=True) - def encode_header(self, cryptor_ver=None, cryptor_id: str = None, cryptor_data: any = None) -> str: + def encode_header(self, cryptor_id: str = None, cryptor_data: any = None) -> str: if cryptor_id == self.FALLBACK_CRYPTOR_ID: return b'' if cryptor_data and len(cryptor_data) > 65535: raise PubNubException('Cryptor data is too long') cryptor_id = self._validate_cryptor_id(cryptor_id) - cryptor_ver = cryptor_ver or self.CRYPTOR_VERSION + sentinel = b'PNED' - version = cryptor_ver.to_bytes(1, byteorder='big') + version = CryptoHeader.header_ver.to_bytes(1, byteorder='big') crid = bytes(cryptor_id, 'utf-8') if cryptor_data: @@ -234,7 +213,10 @@ def decode_header(self, header: bytes) -> Union[None, CryptoHeader]: return False try: - cryptor_ver = header[4] + header_version = header[4] + if header_version > CryptoHeader.header_ver: + raise PubNubException('unknown cryptor error') + cryptor_id = header[5:9].decode() crlen = header[9] if crlen < 255: @@ -245,9 +227,9 @@ def decode_header(self, header: bytes) -> Union[None, CryptoHeader]: cryptor_data = header[12:12 + crlen] hlen = 12 + crlen - return CryptoHeader(sentinel=sentinel, cryptor_ver=cryptor_ver, cryptor_id=cryptor_id, + return CryptoHeader(sentinel=sentinel, header_ver=header_version, cryptor_id=cryptor_id, cryptor_data=cryptor_data, length=hlen) - except Exception: + except IndexError: raise PubNubException('decryption error') diff --git a/pubnub/crypto_core.py b/pubnub/crypto_core.py index 214e8ced..25a27158 100644 --- a/pubnub/crypto_core.py +++ b/pubnub/crypto_core.py @@ -1,10 +1,11 @@ import hashlib import json import random -import os +import secrets from abc import abstractmethod from Cryptodome.Cipher import AES +from Cryptodome.Util.Padding import pad, unpad class PubNubCrypto: @@ -22,7 +23,7 @@ def decrypt(self, key, msg): class CryptoHeader(dict): sentinel: str - cryptor_ver: int + header_ver: int = 1 cryptor_id: str cryptor_data: any length: any @@ -35,7 +36,6 @@ class CryptorPayload(dict): class PubNubCryptor: CRYPTOR_ID: str - CRYPTOR_VERSION: int = 1 @abstractmethod def encrypt(self, data: bytes) -> CryptorPayload: @@ -125,39 +125,33 @@ def get_secret(self, key): class PubNubAesCbcCryptor(PubNubCryptor): CRYPTOR_ID = 'ACRH' - CRYPTOR_VERSION: int = 1 mode = AES.MODE_CBC def __init__(self, cipher_key): self.cipher_key = cipher_key def get_initialization_vector(self) -> bytes: - return os.urandom(16) + return secrets.token_bytes(16) def get_secret(self, key) -> str: return hashlib.sha256(key.encode("utf-8")).digest() - def pad(self, msg: bytes, block_size=AES.block_size) -> bytes: - padding = block_size - (len(msg) % block_size) - return msg + bytes(chr(padding) * padding, 'utf-8') - - def depad(self, msg: bytes) -> bytes: - return msg[:-msg[-1]] - def encrypt(self, data: bytes, key=None) -> CryptorPayload: key = key or self.cipher_key secret = self.get_secret(key) iv = self.get_initialization_vector() cipher = AES.new(secret, mode=self.mode, iv=iv) - encrypted = cipher.encrypt(self.pad(data)) + encrypted = cipher.encrypt(pad(data, AES.block_size)) return CryptorPayload(data=encrypted, cryptor_data=iv) def decrypt(self, payload: CryptorPayload, key=None, binary_mode: bool = False): key = key or self.cipher_key secret = self.get_secret(key) iv = payload['cryptor_data'] + cipher = AES.new(secret, mode=self.mode, iv=iv) + if binary_mode: - return self.depad(cipher.decrypt(payload['data'])) + return unpad(cipher.decrypt(payload['data']), AES.block_size) else: - return self.depad(cipher.decrypt(payload['data'])).decode() + return unpad(cipher.decrypt(payload['data']), AES.block_size).decode() diff --git a/tests/acceptance/encryption/cryptor-module.feature b/tests/acceptance/encryption/cryptor-module.feature deleted file mode 100644 index d2f4f844..00000000 --- a/tests/acceptance/encryption/cryptor-module.feature +++ /dev/null @@ -1,121 +0,0 @@ -@featureSet=cryptoModule @beta -Feature: Crypto module - As a PubNub user - I want to be able to encrypt data using crypto module - I want to be able to decrypt data generated by previous cryptors - - Scenario Outline: AES-CBC cryptor data header can be processed - Given Crypto module with 'acrh' cryptor - * with '' cipher key - When I decrypt '' file - Then I receive '' - - Examples: - | cipher_key | file | outcome | - # File without header can't be processed by crypto module (it doesn't have - # legacy cryptor registered). - | pubnubenigma | file-legacy-civ.jpg | unknown cryptor error | - # File without version can't be processed by specific new cryptor. - | pubnubenigma | file-cryptor-no-version.txt | decryption error | - # File with header which has unknown version - | pubnubenigma | file-cryptor-unknown-acrh.jpg | unknown cryptor error | - # File with header which has too short identifier can't be processed. - | pubnubenigma | file-cryptor-v1-short.txt | decryption error | - # File with header with cryptor identifier not registered in crypto module - # can't be processed. - | pubnubenigma | file-cryptor-v1-unknown.txt | unknown cryptor error | - | pubnubenigma | file-cryptor-v1-acrh.jpg | success | - | pubnubenigma | empty-file-cryptor-v1-acrh.txt | success | - - Scenario Outline: Data encrypted with legacy AES-CBC cryptor is decryptable with legacy implementation - Given Crypto module with 'legacy' cryptor - Given Legacy code with '' cipher key and '' vector - * with '' cipher key - * with '' vector - When I encrypt '' file as 'binary' - Then Successfully decrypt an encrypted file with legacy code - - Examples: - | cipher_key | vector | file | - | pubnubenigma | random | file.jpg | - | pubnubenigma | constant | file.jpg | - | pubnubenigma | random | file.txt | - | pubnubenigma | constant | file.txt | - | pubnubenigma | random | empty-file.txt | - | pubnubenigma | constant | empty-file.txt | - - # Stream-based encryption may not be supported by all platforms so it has been moved to the - # separate scenario with ability to opt-out. - Scenario Outline: Stream data encrypted with legacy AES-CBC cryptor is decryptable with legacy implementation - Given Crypto module with 'legacy' cryptor - Given Legacy code with '' cipher key and '' vector - * with '' cipher key - * with '' vector - When I encrypt '' file as 'stream' - Then Successfully decrypt an encrypted file with legacy code - - Examples: - | cipher_key | vector | file | - | pubnubenigma | random | file.jpg | - | pubnubenigma | random | file.txt | - | pubnubenigma | random | empty-file.txt | - - Scenario Outline: Cryptor is able to process sample files as binary - Given Crypto module with '' cryptor - * with '' cipher key - * with '' vector - When I decrypt '' file as 'binary' - Then Decrypted file content equal to the '' file content - - Examples: - | cryptor_id | cipher_key | vector | encrypted_file | source_file | - | legacy | pubnubenigma | constant | file-cryptor-legacy-civ.jpg | file.jpg | - | legacy | pubnubenigma | random | file-cryptor-legacy-riv.jpg | file.jpg | - | legacy | pubnubenigma | constant | file-cryptor-legacy-civ.txt | file.txt | - | legacy | pubnubenigma | random | file-cryptor-legacy-riv.txt | file.txt | - | legacy | pubnubenigma | constant | empty-file-cryptor-legacy-civ.txt | empty-file.txt | - | legacy | pubnubenigma | random | empty-file-cryptor-legacy-riv.txt | empty-file.txt | - | legacy | pubnubenigma | constant | file-legacy-civ.jpg | file.jpg | - | legacy | pubnubenigma | random | file-legacy-riv.jpg | file.jpg | - | legacy | pubnubenigma | constant | file-legacy-civ.txt | file.txt | - | legacy | pubnubenigma | random | file-legacy-riv.txt | file.txt | - | legacy | pubnubenigma | constant | empty-file-legacy-civ.txt | empty-file.txt | - | legacy | pubnubenigma | random | empty-file-legacy-riv.txt | empty-file.txt | - | acrh | pubnubenigma | - | file-cryptor-v1-acrh.jpg | file.jpg | - | acrh | pubnubenigma | - | file-cryptor-v1-acrh.txt | file.txt | - | acrh | pubnubenigma | - | empty-file-cryptor-v1-acrh.txt | empty-file.txt | - - # Stream-based decryption may not be supported by all platforms so it has been moved to the - # separate scenario with ability to opt-out. - Scenario Outline: Cryptor is able to process sample files as stream - Given Crypto module with '' cryptor - * with '' cipher key - * with '' vector - When I decrypt '' file as 'stream' - Then Decrypted file content equal to the '' file content - - Examples: - | cryptor_id | cipher_key | vector | encrypted_file | source_file | - | legacy | pubnubenigma | random | file-cryptor-legacy-riv.jpg | file.jpg | - | legacy | pubnubenigma | random | file-cryptor-legacy-riv.txt | file.txt | - | legacy | pubnubenigma | random | empty-file-cryptor-legacy-riv.txt | empty-file.txt | - | legacy | pubnubenigma | random | file-legacy-riv.jpg | file.jpg | - | legacy | pubnubenigma | random | file-legacy-riv.txt | file.txt | - | legacy | pubnubenigma | random | empty-file-legacy-riv.txt | empty-file.txt | - | acrh | pubnubenigma | - | file-cryptor-v1-acrh.jpg | file.jpg | - | acrh | pubnubenigma | - | file-cryptor-v1-acrh.txt | file.txt | - | acrh | pubnubenigma | - | empty-file-cryptor-v1-acrh.txt | empty-file.txt | - - Scenario Outline: Crypto module can handle encrypted data from different cryptors - Given Crypto module with default '' and additional '' cryptors - * with '' cipher key - * with '' vector - When I decrypt '' file as 'binary' - Then Decrypted file content equal to the '' file content - - Examples: - | cryptor_id1 | cryptor_id2 | cipher_key | vector | encrypted_file | source_file | - | legacy | acrh | pubnubenigma | constant | file-cryptor-legacy-civ.jpg | file.jpg | - | acrh | legacy | pubnubenigma | random | file-legacy-riv.jpg | file.jpg | - | legacy | acrh | pubnubenigma | constant | empty-file-cryptor-legacy-civ.txt | empty-file.txt | - | acrh | legacy | pubnubenigma | random | empty-file-legacy-riv.txt | empty-file.txt | \ No newline at end of file diff --git a/tests/acceptance/encryption/steps/when_steps.py b/tests/acceptance/encryption/steps/when_steps.py index edb3bd8b..6d13e93f 100644 --- a/tests/acceptance/encryption/steps/when_steps.py +++ b/tests/acceptance/encryption/steps/when_steps.py @@ -1,6 +1,7 @@ from base64 import b64decode from behave import when from tests.acceptance.encryption.environment import PNContext, get_crypto_module, get_asset_path +from pubnub.exceptions import PubNubException @when("I decrypt '{filename}' file") @@ -11,7 +12,7 @@ def step_impl(context: PNContext, filename): file_bytes = file_handle.read() crypto.decrypt_file(file_bytes) context.outcome = 'success' - except Exception as e: + except PubNubException as e: context.outcome = str(e).replace('None: ', '') @@ -36,5 +37,5 @@ def step_impl(context: PNContext, filename, file_mode): file_bytes = file_handle.read() context.decrypted_file = crypto.decrypt_file(file_bytes) context.outcome = 'success' - except Exception as e: + except PubNubException as e: context.outcome = str(e).replace('None: ', '') diff --git a/tests/unit/test_crypto.py b/tests/unit/test_crypto.py index 8eadb1a1..4cc84946 100644 --- a/tests/unit/test_crypto.py +++ b/tests/unit/test_crypto.py @@ -93,7 +93,7 @@ def test_header_encoder(self): assert b'PNED\x01ACRH\x00' == header cryptor_data = b'\x21' - header = crypto.encode_header(cryptor_ver=1, cryptor_data=cryptor_data) + header = crypto.encode_header(cryptor_data=cryptor_data) assert b'PNED\x01ACRH\x01' + cryptor_data == header cryptor_data = b'\x21' * 255 @@ -108,7 +108,7 @@ def test_header_encoder(self): def test_header_decoder(self): crypto = AesCbcCryptoModule('myCipherKey', True) header = crypto.decode_header(b'PNED\x01ACRH\x00') - assert header['cryptor_ver'] == 1 + assert header['header_ver'] == 1 assert header['cryptor_id'] == 'ACRH' assert header['cryptor_data'] == b'' @@ -188,3 +188,20 @@ def test_encrypt_module_decrypt_legacy_random_iv(self): decrypted = crypto.decrypt(self.cipher_key, encrypted) assert decrypted == original_message + + def test_php_encrypted_crosscheck(self): + crypto = AesCbcCryptoModule(self.cipher_key, False) + phpmess = "KGc+SNJD7mIveY+KNIL/L9ZzAjC0dCJCju+HXRwSW2k=" + decrypted = crypto.decrypt(phpmess) + assert decrypted == 'PHP can backwards Legacy static' + + crypto = AesCbcCryptoModule(self.cipher_key, True) + phpmess = "PXjHv0L05kgj0mqIE9s7n4LDPrLtjnfamMoHyiMoL0R1uzSMsYp7dDfqEWrnoaqS" + decrypted = crypto.decrypt(phpmess) + assert decrypted == 'PHP can backwards Legacy random' + + crypto = AesCbcCryptoModule(self.cipher_key, True) + phpmess = "UE5FRAFBQ1JIEHvl3cY3RYsHnbKm6VR51XG/Y7HodnkumKHxo+mrsxbIjZvFpVuILQ0oZysVwjNsDNMKiMfZteoJ8P1/" \ + "mvPmbuQKLErBzS2l7vEohCwbmAJODPR2yNhJGB8989reTZ7Y7Q==" + decrypted = crypto.decrypt(phpmess) + assert decrypted == 'PHP can into space with headers and aes cbc and other shiny stuff'