-
Notifications
You must be signed in to change notification settings - Fork 0
/
forza_server.py
executable file
·153 lines (119 loc) · 4.52 KB
/
forza_server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
#!/usr/bin/env python3
import random
from math import floor
import bluetooth as bt
import logging
import forza
from forza import format_bytes, TTPacket, b2i, i2b
logger = logging.getLogger(__name__)
def create_server():
    """Open an RFCOMM listening socket and advertise the Forza service on it.

    The socket is bound with ``bt.PORT_ANY``, listens with a backlog of one,
    and is advertised under ``forza.NAME`` with ``forza.UUID`` as its only
    service class and no profiles.

    :return: the listening Bluetooth socket
    """
    listener = bt.BluetoothSocket(bt.RFCOMM)
    listener.bind(("", bt.PORT_ANY))
    listener.listen(1)

    # Second element of the socket name; only the disabled debug log uses it.
    channel = listener.getsockname()[1]
    #logger.debug('Listening on channel %d' % channel)

    advertise_options = {
        'service_classes': [forza.UUID],
        'profiles': [],
    }
    bt.advertise_service(listener, forza.NAME, **advertise_options)
    return listener
def calculate_header(imglen):
    """Compute the header value for an image frame of ``imglen`` bytes.

    The result is ``imglen - 70 - 255 * k``, where ``k`` is the number of
    whole 256-byte blocks contained in ``imglen - 242``.

    :param imglen: length of the image data in bytes
    :return: the header value, always an ``int``
    """
    # Floor division keeps the result an integer. True division ("/") would
    # produce a float on Python 3, and the value is subsequently handed to
    # i2b() for byte encoding (presumably requires an int -- confirm in forza).
    whole_blocks = (imglen - 242) // 256
    return imglen - 70 - whole_blocks * 255
def image_packet(image, header_value, counter=0, int1=1, int2=0):
    """Wrap raw image bytes in a ``TTPacket`` of type ``forza.TYPE_FRAMES``.

    The payload is, in order: ``i2b(int1)``, ``i2b(len(image) + 8, 4)``,
    ``i2b(counter, 4)``, ``i2b(int2, 4)`` and the image bytes themselves.
    The packet header is a zero byte followed by ``i2b(header_value, 2)``.

    :param image: image data to embed
    :param header_value: value encoded into the packet header
    :param counter: frame counter written into the payload
    :param int1: first payload field (meaning not visible here)
    :param int2: fourth payload field (meaning not visible here)
    :return: the assembled ``TTPacket``
    """
    leading_field = i2b(int1)
    length_field = i2b(len(image) + 8, 4)
    counter_field = i2b(counter, 4)
    trailing_field = i2b(int2, 4)
    body = leading_field + length_field + counter_field + trailing_field + image

    packet_header = b"\x00" + i2b(header_value, 2)
    return TTPacket(type=forza.TYPE_FRAMES, header=packet_header, payload=body)
class ForzaServer:
    """Bluetooth RFCOMM server that answers a connecting "VIO" client.

    Incoming data is printed with a ``VIO:`` prefix and outgoing data with an
    ``APP:`` prefix. Raw handshake messages are answered in
    :meth:`handle_packet`; ``TT`` packets are parsed and dispatched to
    :meth:`handle_command`.
    """

    # Default number of bytes requested per recv() call.
    buffer = 1024
    # Socket of the currently connected client (bt.BluetoothSocket) or None.
    client_sock = None
    # Last status value reported by the client (INIT packet, header 00 00 ac).
    c_status = 0
    # Number of image frames sent so far; passed as the frame counter.
    sent_counter = 0

    def _send(self, data):
        """Print ``data`` and transmit it to the connected client.

        A list is printed item by item first and then sent item by item.

        :type data: Union[TTPacket, bytes, List[bytes]]
        """
        if isinstance(data, TTPacket):
            print('APP: ' + str(data))
            chunks = [bytes(data)]
        elif isinstance(data, list):
            for chunk in data:
                print('APP: ' + format_bytes(chunk))
            chunks = data
        else:
            print('APP: ' + format_bytes(data))
            chunks = [data]
        for chunk in chunks:
            self.client_sock.send(chunk)

    def _read(self, buffer=None):
        """Receive up to ``buffer`` bytes (default ``self.buffer``) from the client.

        :param buffer: maximum number of bytes to read; falsy means default
        :return: the bytes received
        """
        return self.client_sock.recv(buffer or self.buffer)

    def listen(self):
        """Accept clients one at a time and feed their data to ``handle_packet``.

        Runs until an exception other than ``bt.BluetoothError`` escapes
        (for example ``KeyboardInterrupt`` or the ``SystemExit`` raised on a
        client shutdown request). The client socket is closed after every
        connection and the listening socket is closed when the method exits.
        """
        server = create_server()
        try:
            while True:
                self.client_sock, _client_info = server.accept()
                try:
                    while True:
                        data = self._read()
                        if not data:
                            # A zero-length read on a stream socket means the
                            # peer closed the connection; stop reading instead
                            # of spinning and wait for the next client.
                            break
                        self.handle_packet(data)
                except bt.BluetoothError as e:
                    # str(e) is used rather than e.args[0], which may be an
                    # errno integer and would break the substring test.
                    if 'Connection reset by peer' in str(e):
                        logger.warning('Connection reset by VIO')
                        self.on_reset()
                    else:
                        # Keep serving, but do not hide the failure.
                        logger.warning('Bluetooth error: %s', e)
                finally:
                    if self.client_sock:
                        self.client_sock.close()
        finally:
            server.close()

    def handle_packet(self, data):
        """Answer one chunk of data received from the client.

        Non-``TT`` data is printed raw and compared against the known
        handshake messages. ``TT`` data is parsed into a packet, completed
        with further reads if it arrived in pieces, printed and passed to
        :meth:`handle_command`.

        :param data: bytes received from the client
        """
        is_tt_packet = data[3:5] == b"TT"
        if not is_tt_packet:
            print('VIO: ' + format_bytes(data))

        if data == forza.HELLO:
            # Client greeting: reply with our own hello.
            self._send(forza.FZA_HELLO)
        elif data == forza.INFO[-1]:
            # Last info message received: start the TT session and announce
            # the application state.
            self._send(forza.FZA_START_TT)
            self._send(TTPacket(b"\x00\x00\xab", b"\x00\x01\x01"))
            # self._send(TTPacket(b"\x00\x00\xab", b"\x00\x01\x02"))
            self._send(TTPacket(b"\x00\x00\xaa", b"\x00\x0f"))  # app ready
            self._send(TTPacket(b"\x00\x00\xab", b"\x00\x01\x03"))  # app stage B-013
        elif data == forza.START_TT_OK:
            # Acknowledgement only; nothing to send back.
            pass
        elif is_tt_packet:
            packet = TTPacket.from_bytes(data)
            while packet.bytes_to_read > 0:  # chunked
                packet.hydrate(self._read(packet.bytes_to_read))
            print('VIO: ' + str(packet))
            self.handle_command(packet)

    def handle_command(self, packet):
        """React to a fully received ``TT`` packet.

        * INIT packet with header ``00 00 ac``: store the status taken from
          the last two payload bytes in ``c_status``.
        * INIT packet with header ``00 00 ab`` and payload ``00 10 0a``:
          log the shutdown and exit the process.
        * ``forza.TTTouch`` packet: log the touch and, once ``c_status`` is
          at least 3, send one of the splash images as a frame packet.

        :param packet: the parsed packet
        :raises SystemExit: when the client requests a shutdown
        """
        if packet.type == forza.TYPE_INIT:
            if packet.header == b"\x00\x00\xac":
                self.c_status = b2i(packet.payload[-2:])
            elif packet.header == b"\x00\x00\xab" and packet.payload == b"\x00\x10\x0a":
                logger.info('VIO SHUTDOWN')
                # Same effect as sys.exit(0), without depending on ``sys``
                # being imported at module level.
                raise SystemExit(0)
        elif isinstance(packet, forza.TTTouch):
            logger.info('touch %s @ %d, %d', 'down' if packet.down else 'up', packet.x, packet.y)
            if self.c_status >= 3:
                image_name = random.choice(['splash1', 'splash2'])
                with open('img/%s.png' % image_name, 'rb') as image_file:
                    imgdata = image_file.read()
                frame = image_packet(imgdata, calculate_header(len(imgdata)), self.sent_counter)
                self._send(frame)
                self.sent_counter += 1

    def on_reset(self):
        """Hook called after the client reset the connection; no-op by default."""
        pass
if __name__ == '__main__':
    # Script entry point: send DEBUG-level logs to stderr and keep a server
    # running until the user interrupts it from the keyboard.
    import sys
    logging.basicConfig(stream=sys.stderr, level=logging.DEBUG)

    interrupted = False
    while not interrupted:
        try:
            ForzaServer().listen()
        except KeyboardInterrupt:
            interrupted = True