Skip to content

Commit

Permalink
Crypto module
Browse files Browse the repository at this point in the history
  • Loading branch information
seba-aln committed Sep 27, 2023
1 parent 701ece7 commit e10bcda
Show file tree
Hide file tree
Showing 10 changed files with 753 additions and 3 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/run-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,12 @@ jobs:
cp sdk-specifications/features/access/authorization-failure-reporting.feature tests/acceptance/pam
cp sdk-specifications/features/access/grant-token.feature tests/acceptance/pam
cp sdk-specifications/features/access/revoke-token.feature tests/acceptance/pam
cp sdk-specifcations/features/encryption/cryptor-module.feature tests/accetpance/encryption/assets/cryptor-module.feature
cp sdk-specifcations/features/encryption/assets/ tests/accetpance/encryption/assets/
sudo pip3 install -r requirements-dev.txt
behave --junit tests/acceptance/pam
behave --junit tests/acceptance/encryption/cryptor-module.feature
- name: Expose acceptance tests reports
uses: actions/upload-artifact@v3
if: always()
Expand Down
168 changes: 166 additions & 2 deletions pubnub/crypto.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
import hashlib
import json
import random
from base64 import decodebytes, encodebytes
import logging

from pubnub.crypto_core import PubNubCrypto

from base64 import decodebytes, encodebytes, b64decode, b64encode
from Cryptodome.Cipher import AES
from Cryptodome.Util.Padding import pad, unpad
from pubnub.crypto_core import PubNubCrypto, PubNubCryptor, PubNubLegacyCryptor, PubNubAesCbcCryptor, CryptoHeader, \
CryptorPayload
from pubnub.exceptions import PubNubException
from typing import Union, Dict


Initial16bytes = '0123456789012345'
Expand Down Expand Up @@ -103,3 +108,162 @@ def decrypt(self, key, file):
result = unpad(cipher.decrypt(extracted_file), 16)

return result


class PubNubCryptoModule(PubNubCrypto):
FALLBACK_CRYPTOR_ID: str = '0000'
CRYPTOR_VERSION: str = 1
cryptor_map = {}
default_cryptor_id: str

def __init__(self, cryptor_map: Dict[str, PubNubCryptor], default_cryptor: PubNubCryptor):
self.cryptor_map = cryptor_map
self.default_cryptor_id = default_cryptor.CRYPTOR_ID

def register_cryptor(self, cryptor_id, cryptor_instance):
if len(cryptor_id) != 4:
raise PubNubException('Malformed cryptor_id.')

if cryptor_id in self.cryptor_map.keys():
raise PubNubException('Cryptor_id already in use')

if not isinstance(cryptor_instance, PubNubCrypto):
raise PubNubException('Invalid cryptor instance')

self.cryptor_map[cryptor_id] = cryptor_instance

def _validate_cryptor_id(self, cryptor_id: str) -> str:
cryptor_id = cryptor_id or self.default_cryptor_id

if len(cryptor_id) != 4:
logging.error(f'Malformed cryptor id: {cryptor_id}')
raise PubNubException('Malformed cryptor id')

if cryptor_id not in self.cryptor_map.keys():
logging.error(f'Unsupported cryptor: {cryptor_id}')
raise PubNubException('unknown cryptor error')
return cryptor_id

# encrypt string
def encrypt(self, message: str, cryptor_id: str = None) -> str:
cryptor_id = self._validate_cryptor_id(cryptor_id)
data = message.encode('utf-8')
crypto_payload = self.cryptor_map[cryptor_id].encrypt(data)
header = self.encode_header(cryptor_id=cryptor_id, cryptor_data=crypto_payload['cryptor_data'])
return b64encode(header + crypto_payload['data']).decode()

def decrypt(self, input):
data = b64decode(input)
header = self.decode_header(data)

if header:
cryptor_id = header['cryptor_id']
cryptor_version = header['cryptor_ver']
payload = CryptorPayload(data=data[header['length']:], cryptor_data=header['cryptor_data'])
if not header:
cryptor_id = self.FALLBACK_CRYPTOR_ID
cryptor_version = self.CRYPTOR_VERSION
payload = CryptorPayload(data=data)

if cryptor_id not in self.cryptor_map.keys() or cryptor_version > self.CRYPTOR_VERSION:
raise PubNubException('unknown cryptor error')

message = self._get_cryptor(cryptor_id).decrypt(payload)
try:
return json.loads(message)
except Exception:
return message

def encrypt_file(self, file_data, cryptor_id: str = None):
cryptor_id = self._validate_cryptor_id(cryptor_id)
crypto_payload = self.cryptor_map[cryptor_id].encrypt(file_data)
header = self.encode_header(cryptor_id=cryptor_id, cryptor_data=crypto_payload['cryptor_data'])
return b64encode(header + crypto_payload['data']).decode()

def _get_cryptor(self, cryptor_id):
if not cryptor_id or cryptor_id not in self.cryptor_map:
raise PubNubException('unknown cryptor error')
return self.cryptor_map[cryptor_id]

def decrypt_file(self, file_data):
header = self.decode_header(file_data)
if header:
cryptor_id = header['cryptor_id']
cryptor_version = header['cryptor_ver']
payload = CryptorPayload(data=file_data[header['length']:], cryptor_data=header['cryptor_data'])
else:
cryptor_id = self.FALLBACK_CRYPTOR_ID
cryptor_version = self.CRYPTOR_VERSION
payload = CryptorPayload(data=file_data)

if cryptor_id not in self.cryptor_map.keys() or cryptor_version > self.CRYPTOR_VERSION:
raise PubNubException('unknown cryptor error')

return self._get_cryptor(cryptor_id).decrypt(payload, binary_mode=True)

def encode_header(self, cryptor_ver=None, cryptor_id: str = None, cryptor_data: any = None) -> str:
if cryptor_id == self.FALLBACK_CRYPTOR_ID:
return b''
if cryptor_data and len(cryptor_data) > 65535:
raise PubNubException('Cryptor data is too long')
cryptor_id = self._validate_cryptor_id(cryptor_id)
cryptor_ver = cryptor_ver or self.CRYPTOR_VERSION
sentinel = b'PNED'
version = cryptor_ver.to_bytes()
crid = bytes(cryptor_id, 'utf-8')

if cryptor_data:
crd = cryptor_data
cryptor_data_len = len(cryptor_data)
else:
crd = b''
cryptor_data_len = 0

if cryptor_data_len < 255:
crlen = cryptor_data_len.to_bytes(1)
else:
crlen = b'\xff' + cryptor_data_len.to_bytes(2)
return sentinel + version + crid + crlen + crd

def decode_header(self, header: bytes) -> Union[None, CryptoHeader]:
try:
sentinel = header[:4]
if sentinel != b'PNED':
return False
except ValueError:
return False

try:
cryptor_ver = header[4]
cryptor_id = header[5:9].decode()
crlen = header[9]
if crlen < 255:
cryptor_data = header[10: 10 + crlen]
hlen = 10 + crlen
else:
crlen = int(header[10:12].hex(), 16)
cryptor_data = header[12:12 + crlen]
hlen = 12 + crlen

return CryptoHeader(sentinel=sentinel, cryptor_ver=cryptor_ver, cryptor_id=cryptor_id,
cryptor_data=cryptor_data, length=hlen)
except Exception:
raise PubNubException('decryption error')


class LegacyCryptoModule(PubNubCryptoModule):
def __init__(self, cipher_key, use_random_iv) -> None:
cryptor_map = {
PubNubLegacyCryptor.CRYPTOR_ID: PubNubLegacyCryptor(cipher_key, use_random_iv),
PubNubAesCbcCryptor.CRYPTOR_ID: PubNubAesCbcCryptor(),
}
super().__init__(cryptor_map, PubNubLegacyCryptor)


class AesCbcCryptoModule(PubNubCryptoModule):
def __init__(self, cipher_key, use_random_iv) -> None:
cryptor_map = {
PubNubLegacyCryptor.CRYPTOR_ID: PubNubLegacyCryptor(cipher_key, use_random_iv),
PubNubAesCbcCryptor.CRYPTOR_ID: PubNubAesCbcCryptor(cipher_key),
}
super().__init__(cryptor_map, PubNubAesCbcCryptor)
149 changes: 149 additions & 0 deletions pubnub/crypto_core.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
import hashlib
import json
import random

from abc import abstractmethod
from Cryptodome.Cipher import AES
from typing import TypedDict


class PubNubCrypto:
Expand All @@ -12,3 +18,146 @@ def encrypt(self, key, msg):
@abstractmethod
def decrypt(self, key, msg):
pass


class CryptoHeader(TypedDict):
sentinel: str
cryptor_ver: int
cryptor_id: str
cryptor_data: any
length: any


class CryptorPayload(TypedDict):
data: bytes
cryptor_data: bytes


class PubNubCryptor:
CRYPTOR_ID: str
CRYPTOR_VERSION: int = 1

@abstractmethod
def encrypt(self, data: bytes) -> CryptorPayload:
pass

@abstractmethod
def decrypt(self, payload: CryptorPayload, binary_mode: bool = False) -> bytes:
pass


class PubNubLegacyCryptor(PubNubCryptor):
CRYPTOR_ID = '0000'
Initial16bytes = b'0123456789012345'

def __init__(self, cipher_key, use_random_iv=False, cipher_mode=AES.MODE_CBC, fallback_cipher_mode=None):
self.cipher_key = cipher_key
self.use_random_iv = use_random_iv
self.mode = cipher_mode
self.fallback_mode = fallback_cipher_mode

def encrypt(self, msg, *, key=None, use_random_iv=None):
key = key or self.cipher_key
use_random_iv = use_random_iv or self.use_random_iv

secret = self.get_secret(key)
initialization_vector = self.get_initialization_vector(use_random_iv)

cipher = AES.new(bytes(secret[0:32], 'utf-8'), self.mode, initialization_vector)
encrypted_message = cipher.encrypt(self.pad(msg))
msg_with_iv = self.append_random_iv(encrypted_message, use_random_iv, initialization_vector)
return CryptorPayload(data=msg_with_iv, cryptor_data=initialization_vector)

def decrypt(self, payload: CryptorPayload, key=None, use_random_iv=False, binary_mode: bool = False):
key = key or self.cipher_key
use_random_iv = use_random_iv or self.use_random_iv
secret = self.get_secret(key)
msg = payload['data']
initialization_vector, extracted_message = self.extract_random_iv(msg, use_random_iv)
cipher = AES.new(bytes(secret[0:32], "utf-8"), self.mode, initialization_vector)
if binary_mode:
return self.depad(cipher.decrypt(extracted_message), binary_mode)
try:
plain = self.depad((cipher.decrypt(extracted_message)).decode('utf-8'), binary_mode)
except UnicodeDecodeError as e:
if not self.fallback_mode:
raise e

cipher = AES.new(bytes(secret[0:32], "utf-8"), self.fallback_mode, initialization_vector)
plain = self.depad((cipher.decrypt(extracted_message)).decode('utf-8'), binary_mode)

try:
return json.loads(plain)
except Exception:
return plain

def append_random_iv(self, message, use_random_iv, initialization_vector):
if self.use_random_iv or use_random_iv:
return initialization_vector + message
else:
return message

def extract_random_iv(self, message, use_random_iv):
if use_random_iv:
return message[0:16], message[16:]
else:
return self.Initial16bytes, message

def get_initialization_vector(self, use_random_iv) -> bytes:
if self.use_random_iv or use_random_iv:
return bytes("{0:016}".format(random.randint(0, 9999999999999999)), 'utf-8')
else:
return self.Initial16bytes

def pad(self, msg, block_size=16):
padding = block_size - (len(msg) % block_size)
return msg + (chr(padding) * padding).encode('utf-8')

def depad(self, msg, binary_mode: bool = False):
if binary_mode:
return msg[0:-msg[-1]]
else:
return msg[0:-ord(msg[-1])]

def get_secret(self, key):
return hashlib.sha256(key.encode("utf-8")).hexdigest()


class PubNubAesCbcCryptor(PubNubCryptor):
CRYPTOR_ID = 'ACRH'
CRYPTOR_VERSION: int = 1
mode = AES.MODE_CBC

def __init__(self, cipher_key):
self.cipher_key = cipher_key

def get_initialization_vector(self) -> bytes:
return random.randbytes(16)

def get_secret(self, key) -> str:
return hashlib.sha256(key.encode("utf-8")).digest()

def pad(self, msg: bytes, block_size=AES.block_size) -> bytes:
padding = block_size - (len(msg) % block_size)
return msg + bytes(chr(padding) * padding, 'utf-8')

def depad(self, msg: bytes) -> bytes:
return msg[:-msg[-1]]

def encrypt(self, data: bytes, key=None) -> CryptorPayload:
key = key or self.cipher_key
secret = self.get_secret(key)
iv = self.get_initialization_vector()
cipher = AES.new(secret, mode=self.mode, iv=iv)
encrypted = cipher.encrypt(self.pad(data))
return CryptorPayload(data=encrypted, cryptor_data=iv)

def decrypt(self, payload: CryptorPayload, key=None, binary_mode: bool = False):
key = key or self.cipher_key
secret = self.get_secret(key)
iv = payload['cryptor_data']
cipher = AES.new(secret, mode=self.mode, iv=iv)
if binary_mode:
return self.depad(cipher.decrypt(payload['data']))
else:
return self.depad(cipher.decrypt(payload['data'])).decode()
Loading

0 comments on commit e10bcda

Please sign in to comment.