-
Notifications
You must be signed in to change notification settings - Fork 0
/
sender_AES.py
127 lines (99 loc) · 4.06 KB
/
sender_AES.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import pdfplumber
import requests
import json
import os
import base64
import time
from cryptography.hazmat.primitives import hashes, hmac
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization, padding
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.asymmetric import x25519
# Record the wall-clock time at script start; it is the reference point for
# the total execution time reported at the end of the script.
start_time = time.time()
def generate_key_pair():
    """Create a fresh X25519 key pair.

    Returns:
        A ``(private_key, public_key)`` tuple; the public key is the one
        obtained from the newly generated private key.
    """
    secret = x25519.X25519PrivateKey.generate()
    return secret, secret.public_key()
def key_exchange(private_key, peer_public_key):
    """Derive a 32-byte symmetric key from a key exchange with a peer.

    Args:
        private_key: Local private key; its ``exchange`` method is called
            with the peer's public key.
        peer_public_key: Public key of the other party.

    Returns:
        The 32 bytes produced by HKDF-SHA256 (no salt, info
        ``b'ECDH Key Derivation'``) over the raw exchange output.
    """
    # Raw shared secret first, exactly as the exchange yields it.
    raw_secret = private_key.exchange(peer_public_key)

    # Stretch the raw secret into a fixed-length symmetric key.
    kdf = HKDF(
        algorithm=hashes.SHA256(),
        length=32,
        salt=None,
        info=b'ECDH Key Derivation',
        backend=default_backend(),
    )
    return kdf.derive(raw_secret)
def encrypt_message(plaintext, key):
    """Encrypt a text message with AES in CFB mode.

    The message is encoded to bytes, PKCS7-padded to the AES block size and
    encrypted under a freshly generated random 16-byte IV.

    Args:
        plaintext: The message as a ``str``.
        key: AES key bytes.

    Returns:
        The IV immediately followed by the ciphertext, as ``bytes``.
    """
    pkcs7 = padding.PKCS7(algorithms.AES.block_size).padder()
    padded = pkcs7.update(plaintext.encode()) + pkcs7.finalize()

    # A new random IV per message; it is prepended to the output so the
    # receiver can recover it.
    nonce = os.urandom(16)
    aes_cfb = Cipher(
        algorithms.AES(key),
        modes.CFB(nonce),
        backend=default_backend(),
    ).encryptor()

    return b"".join((nonce, aes_cfb.update(padded), aes_cfb.finalize()))
def generate_hmac(key, data):
    """Compute the HMAC-SHA256 tag of ``data`` under ``key``.

    Args:
        key: HMAC key bytes.
        data: Bytes to authenticate.

    Returns:
        The finalized HMAC digest as ``bytes``.
    """
    mac = hmac.HMAC(key, hashes.SHA256(), backend=default_backend())
    mac.update(data)
    tag = mac.finalize()
    return tag
# Base URL of the Flask server that receives the message (replace the host
# with the address of the machine running the server).
flask_server_url = "http://192.168.1.4:5000"

# Read the PDF and collect the text of every page.  extract_text() can yield
# None for a page without extractable text, so fall back to an empty string
# instead of failing on str + None.
with pdfplumber.open("input.pdf") as pdf:
    text = "".join(page.extract_text() or "" for page in pdf.pages)

# Keep a copy of the extracted text.  An explicit UTF-8 encoding avoids
# UnicodeEncodeError on platforms whose default encoding cannot represent
# every character found in the PDF.
with open("input.txt", "w", encoding="utf-8") as file:
    file.write(text)

# Generate key pairs for devices A and B
private_key_A, public_key_A = generate_key_pair()
private_key_B, public_key_B = generate_key_pair()

# Perform the key exchange in both directions.  Only A's derived key is used
# for encryption below; B's derivation is kept as in the original flow.
shared_key_A = key_exchange(private_key_A, public_key_B)
shared_key_B = key_exchange(private_key_B, public_key_A)

# Random 32-byte key used only for the HMAC over the ciphertext
auth_key = os.urandom(32)

# Encrypt the extracted text with A's derived key
encrypted_message = encrypt_message(text, shared_key_A)

# Authenticate the IV + ciphertext blob
hmac_digest = generate_hmac(auth_key, encrypted_message)

# Serialize A's private key and B's public key (DER) so the receiver can
# repeat the key exchange.
private_key_A_bytes = private_key_A.private_bytes(
    encoding=serialization.Encoding.DER,
    format=serialization.PrivateFormat.PKCS8,
    encryption_algorithm=serialization.NoEncryption(),
)
public_key_B_bytes = public_key_B.public_bytes(
    encoding=serialization.Encoding.DER,
    format=serialization.PublicFormat.SubjectPublicKeyInfo,
)

# NOTE(review): SECURITY - the unencrypted private key and the HMAC key are
# sent in the same plain-HTTP request as the ciphertext, so anyone who can
# observe the request can decrypt and forge the message.  The payload layout
# is left unchanged here because the receiving endpoint presumably depends on
# these fields; fixing it requires a protocol change on both sides.
data_to_send = {
    'message': base64.b64encode(encrypted_message).decode(),
    'hmac': base64.b64encode(hmac_digest).decode(),
    'auth_key': base64.b64encode(auth_key).decode(),
    'private_key': base64.b64encode(private_key_A_bytes).decode(),
    'peer_public_key': base64.b64encode(public_key_B_bytes).decode(),
}

# Send the payload.  A timeout keeps the script from hanging forever on an
# unreachable server, and raise_for_status() surfaces HTTP errors instead of
# silently reporting a timing for a failed send.
response = requests.post(
    f"{flask_server_url}/deviceA/send_message",
    json=data_to_send,
    timeout=30,
)
response.raise_for_status()

# Report the total wall-clock time since the script started
end_time = time.time()
execution_time = end_time - start_time
print("Execution Time:", execution_time)