-
Notifications
You must be signed in to change notification settings - Fork 0
/
client.py
336 lines (298 loc) · 11.3 KB
/
client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
import my_secrets.client as secret_keys
import my_secrets.server_public as server_public_info
from typing import Literal, Union
from shared.handshake_handler import client_handle_handshake
import socket
import shared.rpi_ssl as ssl
from shared.card import *
from shared.protocol import *
from getkey import getkey, keys
from sys import exit
from shared.port import PORT
import shared.paillier as paillier
import math
# Open a TCP connection to the server on localhost:PORT, force blocking mode,
# and wrap the socket in a pair of unbuffered binary file objects: ``rfile``
# for reading and ``wfile`` for writing.
conn = socket.create_connection(("localhost", PORT))
conn.setblocking(True)
rfile, wfile = (conn.makefile(mode, buffering=0) for mode in ('rb', 'wb'))
def _read_exact(size: int) -> bytes:
    """Read exactly ``size`` bytes from ``rfile``, looping over short reads.

    ``rfile`` is created with ``buffering=0``, so a single ``read(n)`` call is
    allowed to return fewer than ``n`` bytes even though more data is still on
    its way. This helper keeps reading until the requested amount has arrived.

    Returns:
        The bytes read. The result is shorter than ``size`` only when a read
        returned no data (``b''`` or ``None``) before the total was reached.
    """
    received = bytearray()
    while len(received) < size:
        chunk = rfile.read(size - len(received))
        if not chunk:
            # b'' or None: no more data is available, so stop and let the
            # caller notice the short result.
            break
        received += chunk
    return bytes(received)


def fetch_record() -> tuple[ssl.ContentType, bytes]:
    """Read one record (5-byte header plus body) from ``rfile``.

    The header layout consumed here is: byte 0 is the content type, bytes 1-2
    must equal ``b'\\x03\\x03'``, and bytes 3-4 hold the big-endian length of
    the body that follows.

    Returns:
        A ``(content_type, body)`` tuple; the body is returned exactly as read
        from the stream, without any further processing.

    Raises:
        ConnectionError: If the header or the body could not be read in full.
        AssertionError: If header bytes 1-2 are not ``b'\\x03\\x03'``.
    """
    tls_header = _read_exact(5)
    if len(tls_header) != 5:
        raise ConnectionError("Failed to receive expected 5 bytes.")
    content_type = ssl.ContentType(tls_header[0])
    assert tls_header[1:3] == b'\x03\x03'
    content_length = int.from_bytes(tls_header[3:5], 'big')
    tls_content = _read_exact(content_length)
    if len(tls_content) != content_length:
        raise ConnectionError("Failed to receive expected record body length.")
    return (content_type, tls_content)
# ==== Handshake Stage ====
# Key material handed to the handshake routine: the client's public and
# private keys, the client modulus (product of the secret primes P and Q),
# and the server's modulus N.
INFO = {
    "client_public": secret_keys.PUBLIC_KEY,
    "client_private": secret_keys.PRIVATE_KEY,
    "client_modulus": secret_keys.P * secret_keys.Q,
    "server_modulus": server_public_info.N,
}
session: ssl.Session = client_handle_handshake(rfile, wfile, INFO)
# Identity check rather than ``==`` so the test cannot be affected by any
# custom equality defined on the session object.
if session is None:
    # No session came back from the handshake; stop with a failure status.
    exit(1)
# ==== Account Auth Stage ====
def input_card_legacy() -> Card:
    """Prompt for card details one line at a time using ``input()``.

    Every field is asked for repeatedly until the entered text passes its
    format check: a 16-digit number accepted by ``valid_card_num``, a 3-digit
    CVC, an ``MM/YYYY`` expiration date with month in [1, 12] and year in
    [2000, 2000 + 2**10), and a 4-digit PIN.

    Returns:
        A ``Card`` built from the card number string and the CVC, month, year
        and PIN converted to integers.
    """
    # Still works if your computer doesn't support getkey or ANSI escape codes
    print("Enter card details:")

    # --- Card number ---
    while True:
        entered = input("Card number: ").strip()
        if entered.isdecimal() and len(entered) == 16:
            if valid_card_num(entered):
                card_num = entered
                break
            print("ERROR: Invalid card number.")
        else:
            print("ERROR: Invalid card number format (should be 16 digits).")

    # --- CVC ---
    while True:
        entered = input("CVC: ").strip()
        if entered.isdecimal() and len(entered) == 3:
            cvc = int(entered)
            break
        print("ERROR: Invalid CVC format (should be 3 digits).")

    # --- Expiration date ---
    while True:
        entered = input("Expiration date (MM/YYYY): ").strip()
        # The length test comes first so the index/slice checks after it are
        # only evaluated on a 7-character string.
        well_formed = (
            len(entered) == 7
            and entered[:2].isdecimal()
            and entered[2] == "/"
            and entered[3:].isdecimal()
        )
        if not well_formed:
            print("ERROR: Invalid expiration date format (should be MM/YYYY).")
            continue
        month = int(entered[:2])
        if not 1 <= month <= 12:
            print("ERROR: Expiration month must be in the range [1,12].")
            continue
        year = int(entered[3:])
        if not 2000 <= year < 2000 + 2**10:
            print("ERROR: Expiration year is out of bounds.")
            continue
        break

    # --- PIN ---
    while True:
        entered = input("PIN: ").strip()
        if len(entered) == 4 and entered.isdecimal():
            pin = int(entered)
            break
        print("ERROR: Invalid PIN format (should be 4 digits).")

    return Card(card_num, cvc, month, year, pin)
def _read_digits(count: int, masked: bool = False) -> str:
    """Collect ``count`` single-digit keypresses from ``getkey``.

    Keys that are not exactly one decimal digit are ignored. Each accepted
    digit is echoed immediately (as ``*`` when ``masked`` is true); the echo is
    flushed because it does not end in a newline and the next ``getkey`` call
    would otherwise block before it is displayed.
    """
    digits = ""
    while len(digits) < count:
        k = str(getkey())
        if len(k) == 1 and k.isdecimal():
            print("*" if masked else k, end="", flush=True)
            digits += k
    return digits


def input_card() -> Card:
    """Prompt for card details key by key using ``getkey``.

    Reads a 16-digit card number (re-prompting until ``valid_card_num``
    accepts it), a 3-digit CVC, a two-digit month in [1, 12], a four-digit
    year whose first digit must be ``2``, and a 4-digit PIN that is echoed as
    asterisks.

    Returns:
        A ``Card`` built from the card number string and the CVC, month, year
        and PIN converted to integers.
    """
    print("Enter card details:")

    while True:
        print("Card number: ", end="", flush=True)
        card_num = _read_digits(16)
        if valid_card_num(card_num):
            print("\x1b[0K")
            break
        # Rejected: show the notice on the same line and return to column 0
        # so the prompt is redrawn over it.
        print("\x1b[1K (Invalid card number) \r", end="", flush=True)

    print("CVC: ", end="", flush=True)
    cvc = int(_read_digits(3))
    print()

    print("Expiration Date: / \x1b[7D", end="", flush=True)
    month = ""
    while not month:
        # First character of month must be 0 or 1
        k = getkey()
        if k in ("0", "1"):
            month = str(k)
            print(k, end="", flush=True)
    while len(month) != 2:
        # Second character of month must form a number in [1,12]. Only digit
        # keys are accepted here, so a stray letter or arrow key after "0"
        # can no longer end up in the month and break int() below.
        k = getkey()
        after_zero = ("1", "2", "3", "4", "5", "6", "7", "8", "9")
        after_one = ("0", "1", "2")
        if (month == "0" and k in after_zero) or (month == "1" and k in after_one):
            month += str(k)
            print(f"{k}/", end="", flush=True)

    # First character of year must be 2
    while getkey() != "2":
        pass
    print("2", end="", flush=True)
    # Put in the other numbers
    year = "2" + _read_digits(3)

    print("\nPIN: ", end="", flush=True)
    pin = int(_read_digits(4, masked=True))
    print()
    return Card(card_num, cvc, int(month), int(year), pin)
# Set to True once the server accepts the submitted card details; the
# account-authorization loop further down keeps prompting while this is False.
account_auth = False
def select_mode_legacy() -> MsgType:
    """Ask for a command name via ``input()`` until a known one is typed.

    The typed text is stripped and upper-cased before being compared with the
    accepted names.

    Returns:
        The ``MsgType`` member whose name matches the entered mode.
    """
    # Still works if your computer doesn't support getkey or ANSI escape codes
    accepted = ("BALANCE", "DEPOSIT", "WITHDRAW")
    while True:
        choice = input("Enter mode (BALANCE, DEPOSIT, WITHDRAW): ")
        choice = choice.strip().upper()
        if choice in accepted:
            return MsgType[choice]
        print("Invalid mode. Try again.")
def select_mode() -> Union[MsgType, Literal['EXIT']]:
    """Show a one-line menu and let the user pick a command with arrow keys.

    LEFT and RIGHT move the highlighted entry (stopping at either end of the
    menu) and ENTER confirms it. The menu line is redrawn in place after every
    keypress and flushed, since it has no trailing newline and ``getkey``
    would otherwise block before it appears.

    Returns:
        ``MsgType.BALANCE``, ``MsgType.DEPOSIT``, ``MsgType.WITHDRAW`` or the
        string ``"EXIT"``, according to the highlighted entry.
    """
    # Each entry pairs a selectable option with the menu line highlighting it
    # (\x1b[7m / \x1b[27m toggle reverse video, \x1b[?25l hides the cursor).
    menu = [
        (MsgType.BALANCE,
         "\r\x1b[7mBALANCE\x1b[27m DEPOSIT WITHDRAW EXIT\x1b[?25l"),
        (MsgType.DEPOSIT,
         "\rBALANCE \x1b[7mDEPOSIT\x1b[27m WITHDRAW EXIT\x1b[?25l"),
        (MsgType.WITHDRAW,
         "\rBALANCE DEPOSIT \x1b[7mWITHDRAW\x1b[27m EXIT\x1b[?25l"),
        ("EXIT",
         "\rBALANCE DEPOSIT WITHDRAW \x1b[7mEXIT\x1b[27m\x1b[?25l"),
    ]
    index = 0
    try:
        while True:
            option, rendered = menu[index]
            print(rendered, end="", flush=True)
            k = getkey()
            if k == keys.ENTER:
                return option
            if k == keys.LEFT:
                index = max(index - 1, 0)
            elif k == keys.RIGHT:
                index = min(index + 1, len(menu) - 1)
    finally:
        # Show the cursor again and end the menu line, both on a normal
        # selection and when an exception (e.g. Ctrl+C) leaves the menu.
        print("\x1b[?25h")
def request_balance() -> int:
    """Send a BALANCE request over the session and return the reported value.

    The reply must be an Application record that opens to 9 bytes: the
    BALANCE message type followed by the balance as an 8-byte big-endian
    integer.

    Returns:
        The integer decoded from reply bytes 1-8.

    Raises:
        AssertionError: If the reply has a different record type, length or
            message type.
    """
    wfile.write(session.build_app_record(bytes([MsgType.BALANCE])))
    record_type, ciphertext = fetch_record()
    assert record_type == ssl.ContentType.Application
    reply = session.open_encrypted_record(ciphertext)
    assert len(reply) == 9
    assert reply[0] == MsgType.BALANCE
    return int.from_bytes(reply[1:], 'big')
def request_deposit(amount: int) -> bool:
    """Send a DEPOSIT request for ``amount`` and report whether it succeeded.

    Args:
        amount: Value to deposit, encoded as an 8-byte big-endian unsigned
            integer (the command loop in this file passes an amount in cents).

    Returns:
        True when the second byte of the 2-byte reply is ``0x01``.

    Raises:
        AssertionError: If the reply has a different record type, length or
            message type.
    """
    payload = bytes([MsgType.DEPOSIT]) + amount.to_bytes(8, 'big')
    wfile.write(session.build_app_record(payload))
    record_type, ciphertext = fetch_record()
    assert record_type == ssl.ContentType.Application
    reply = session.open_encrypted_record(ciphertext)
    assert len(reply) == 2
    assert reply[0] == MsgType.DEPOSIT
    return reply[1] == 0x01
def request_withdraw(amount: int) -> bool:
    """Send a WITHDRAW request for ``amount`` and report whether it succeeded.

    Args:
        amount: Value to withdraw, encoded as an 8-byte big-endian unsigned
            integer (the command loop in this file passes an amount in cents).

    Returns:
        True when the second byte of the 2-byte reply is ``0x01``.

    Raises:
        AssertionError: If the reply has a different record type, length or
            message type.
    """
    payload = bytes([MsgType.WITHDRAW]) + amount.to_bytes(8, 'big')
    wfile.write(session.build_app_record(payload))
    record_type, ciphertext = fetch_record()
    assert record_type == ssl.ContentType.Application
    reply = session.open_encrypted_record(ciphertext)
    assert len(reply) == 2
    assert reply[0] == MsgType.WITHDRAW
    return reply[1] == 0x01
def _to_cents(amount_text: str) -> int:
    """Convert an already-validated dollar string (e.g. ``"12.5"``) to cents.

    The text is split at the decimal point and converted with integer
    arithmetic only. Going through ``float`` loses a cent on some inputs:
    ``int(float("0.29") * 100)`` evaluates to 28. Digits beyond the second
    decimal place are dropped.
    """
    dollars, _, fraction = amount_text.partition(".")
    return int(dollars or "0") * 100 + int((fraction + "00")[:2])


# Main client flow: authorize the card with the server, then serve menu
# commands until the user exits. The handlers at the bottom send an alert
# record to the server before the process terminates.
try:
    while not account_auth:
        card = input_card()
        # Send the request with the generated check
        check = gen_encrypted_check(card, paillier.DEFAULT_SERVER_PUBKEY)
        # Number of bytes needed to hold the check value.
        byte_length = (check.bit_length() + 7) // 8
        # The check is sent as a 3-byte big-endian length followed by the value.
        check_msg = byte_length.to_bytes(3, 'big') + \
            check.to_bytes(byte_length, 'big')
        request = bytes([MsgType.ACCOUNT_AUTH]) + check_msg + card.to_bytes()
        wfile.write(session.build_app_record(request))

        rtype, tls_content = fetch_record()
        if rtype == ssl.ContentType.Alert:
            alevel, atype = session.open_alert_record(tls_content)
            if alevel == ssl.AlertLevel.FATAL:
                print("FATAL ERROR: " + atype.name)
                exit(1)
            else:
                pass  # handle any warnings that need to be handled
        elif rtype == ssl.ContentType.Application:
            app_content = session.open_encrypted_record(tls_content)
            if app_content[0] == MsgType.ERROR:
                print(f"ERROR (app): {AppError(app_content[1]).name}\n")
                exit(1)
            elif app_content[0] != MsgType.ACCOUNT_AUTH:
                # Something went wrong. Let's just move on.
                print(
                    f"WARNING (app): Recieved app message type {MsgType(app_content[0]).name} instead of ACCOUNT_AUTH")
                continue
            if app_content[1] == 0x01:
                account_auth = True
                print("++ Account Authorized ++\n")
            else:
                print("!! Invalid details !!\n")
        else:
            pass  # That wasn't supposed to happen.

    # ==== Commands Stage ====
    while True:
        print("Select command:")
        mode = select_mode()
        if mode == MsgType.BALANCE:
            print("Fetching balance...")
            amount = request_balance()
            print(f"Account Balance: ${amount/100:.2f}")
        elif mode == MsgType.DEPOSIT:
            amount = input("Enter amount to deposit: $")
            # Accept digits with at most one decimal point.
            if not amount.replace(".", "", 1).isdecimal():
                print("Invalid number format.")
                continue
            success = request_deposit(_to_cents(amount))
            print("Deposit " + ("successful." if success else "unsuccessful."))
        elif mode == MsgType.WITHDRAW:
            amount = input("Enter amount to withdraw: $")
            if not amount.replace(".", "", 1).isdecimal():
                print("Invalid number format.")
                continue
            success = request_withdraw(_to_cents(amount))
            print("Withdrawal " +
                  ("successful." if success else "unsuccessful."))
        elif mode == "EXIT":
            # Reuse the KeyboardInterrupt handler below as the shutdown path.
            raise KeyboardInterrupt()
        else:
            raise NotImplementedError("This shouldn't be able to happen.")
except KeyboardInterrupt:
    print("SSL session closing gracefully.")
    if not wfile.closed:
        wfile.write(session.build_alert_record(
            ssl.AlertLevel.FATAL, ssl.AlertType.CloseNotify))
    exit()
except ssl.SSLError as e:
    print(f"[FATAL] (ssl): {e.atype.name} {e.args}")
    if not wfile.closed:
        wfile.write(session.build_alert_record(ssl.AlertLevel.FATAL, e.atype))
    exit(1)
except SystemExit:
    # A deliberate exit() from inside the try block must not be reported as
    # an unknown fatal error by the catch-all handler below.
    raise
except BaseException as e:
    print(f"[FATAL] (unknown {type(e)}): {e.args}")
    if not wfile.closed:
        wfile.write(session.build_alert_record(
            ssl.AlertLevel.FATAL, ssl.AlertType.InternalError))
    exit(1)