-
Notifications
You must be signed in to change notification settings - Fork 1
/
Server.py
185 lines (145 loc) · 5.29 KB
/
Server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
"""
Server application for VRED-Networking
"""
import timeit
import pyuv
import msgpack
import signal
import logging
# TODO: argparse ports and ip
TCP_PORT = 40305
UDP_PORT = 40306
SELF = "0.0.0.0"
# Don't try to send sync packs to udp connections older than that (value in sec)
# TODO: Also kill the corresponding tcp connection??
UDP_TIMEOUT = 10
# Safe last sync message for each net id and supply it to everyone who connects (initial state sync)
KEEP_STATE = True
##### TCP stuff #####
# TODO: Do something useful on error
def tcp_read(client, data, error):
# No data means the client has closed the connection
if data is None:
logging.info(f"{client.getpeername()} disconnected")
close_tcp(client)
# TODO: Also remove this clients UDP connection
return
logging.debug(f"Got TCP data {data} from {client.getpeername()}")
# Break at newlines
split = data.split(b"\n")
for message in split:
if len(message) > 0:
try:
answer = parse(message)
except Exception as e:
# TODO: Less broad exception
logging.error(f"Parsing message failed with {e}")
return
if answer["distribute"]:
for c in tcp_connections:
if c != client:
c.write(answer["data"] + b"\n")
else:
client.write(answer["data"] + b"\n")
def close_tcp(client):
client.close()
tcp_connections.remove(client)
def tcp_connect(server, error):
client = pyuv.TCP(server.loop)
server.accept(client)
tcp_connections.append(client)
client.start_read(tcp_read)
client.nodelay(True)
logging.info(f"{client.getpeername()} connected")
##### UDP Stuff #####
def udp_read(handle, ip_port, flags, data, error):
""" Read UDP messages, parse them and answer based on the message contents
Also put every connection in a dict, so we know who to send sync packs to.
"""
if data is not None:
logging.debug(f"Got UDP data {data} from {ip_port}")
udp_connections[ip_port] = timeit.default_timer()
try:
answer = parse(data)
except Exception as e:
# TODO: Less broad exception
logging.error(f"Parsing message failed with {e}")
return
if answer["distribute"]:
for address in udp_connections:
if address != ip_port:
handle.send(address, answer["data"])
else:
handle.send(ip_port, answer["data"])
##### Stuff #####
def parse(data):
""" Parse a message to look for state, and save it if so
Hey -> Answer with Ho and initial state, if available
RPC -> Distribute to everyone else
Ping -> answer with Pong packet
Sync Pack -> Send to all udp connections
also add to initial state map if newest for its node
"""
message = msgpack.unpackb(data, use_list = True, raw = False)
logging.debug(f"Parsed message: {message}")
msg_type = message[0]
if msg_type == "hey":
answer = {"distribute": False, "data": msgpack.packb(("ho", last_state))}
elif msg_type == "rpc":
answer = {"distribute": True, "data": data}
elif msg_type == "ping":
message[0] = "pong"
answer = {"distribute": False, "data": msgpack.packb(message)}
elif msg_type == "pos" or msg_type == "rot" or msg_type == "scale" or msg_type == "state":
answer = {"distribute": True, "data": data}
save_state(message)
else:
raise LookupError(f"Unknown message type {message[0]}")
return answer
def save_state(message):
""" Put the newest sync data for each node in a dict for inital setup """
if KEEP_STATE:
tpe = message[0]
for pack in message[1]:
net_id = pack[0]
seq = pack [1]
if not net_id in last_state[tpe] or last_state[tpe][net_id][0] < seq:
last_state[tpe][net_id] = (seq, pack[2])
def check_udp(timer):
""" Check all recent udp connections, remove if dead """
now = timeit.default_timer()
for ip_port, time in list(udp_connections.items()):
if now - time > UDP_TIMEOUT:
logging.warning(f"{ip_port} for {now - time}s over timeout ({UDP_TIMEOUT})")
del(udp_connections[ip_port])
def signal_cb(handle, signum):
""" Handle shutdown signals for graceful shutdown """
logging.debug("Shutting things down")
[c.close() for c in tcp_connections]
handle.close()
tcp.close()
udp.close()
loop.stop()
logging.info("Shutdown complete")
logging.basicConfig(format='%(asctime)s %(levelname)s: %(message)s', level=logging.DEBUG)
tcp_connections = []
udp_connections = {}
last_state = {"pos": {}, "rot": {}, "scale": {}, "state": {}}
loop = pyuv.Loop.default_loop()
# TCP Connection
tcp = pyuv.TCP(loop)
tcp.bind((SELF, TCP_PORT))
tcp.listen(tcp_connect)
tcp.nodelay(True)
# UDP Connection
udp = pyuv.UDP(loop)
udp.bind((SELF, UDP_PORT))
udp.start_recv(udp_read)
# Handle shutdown gracefully (listen to ctrl c / SIGINT)
signal_handle = pyuv.Signal(loop)
signal_handle.start(signal_cb, signal.SIGINT)
# Remove dead UDP connections (Heartbeat timeout)
heartbeat_timer = pyuv.Timer(loop)
heartbeat_timer.start(check_udp, 1, 1)
logging.info("Server startup complete")
loop.run()