This repository has been archived by the owner on Nov 15, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
main.py
133 lines (100 loc) · 4.09 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
import asyncio
import logging
import os
import sys
from argparse import ArgumentParser
from database import Database
from device_selector import Device, pick_available
from protocol.commands import SignDataRequest, SignDataResponse
from protocol.packets import CurrentLocationPacket
from protocol.protocol import Protocol, data_type
from radio.xbee import XBee
from sign import Signature, Signifier
from utils.concurrency import set_interval
from utils.expiring_dict import ExpiringDict
# Route log records of level INFO and above to stdout; each line carries the
# timestamp, the level name and the name of the emitting logger.
logging.basicConfig(
stream=sys.stdout,
level=logging.INFO,
format="%(asctime)s [%(levelname)s] [%(name)s] %(message)s",
)
# Logger named after this module. NOTE(review): the functions visible in this
# file log through the root ``logging`` module rather than through this object.
logger = logging.getLogger(__name__)
# Stub-mode switch, read once at import time from the USE_STUBS environment
# variable. NOTE(review): bool() of any non-empty string is True, so values
# such as "0" or "false" also enable stub mode; only an unset or empty
# variable leaves it disabled.
use_stubs = bool(os.environ.get("USE_STUBS", False))
# Signing helper built from two key files (presumably a PEM key pair -- confirm
# in the ``sign`` module). Both paths are relative, so they are resolved
# against the current working directory of the process.
signifier = Signifier.from_files("public_key.pem", "private_key.pem")
def sign_data_handler(remote_address: data_type, request: SignDataRequest):
    """Answer a remote signing request using the module-level ``signifier``.

    Args:
        remote_address: Address of the requesting device; not used here.
        request: Incoming request whose ``data`` attribute gets signed.

    Returns:
        A ``SignDataResponse`` holding the ``public_key`` and ``sign`` values
        of the signature produced for ``request.data``.
    """
    signature = signifier.sign(request.data)
    response = SignDataResponse(
        public_key=signature.public_key,
        sign=signature.sign,
    )
    return response
# Location reports received from remote devices: written by gps_data_received
# (keyed by the hash of the sender's address string) and read by
# sign_data_task. Created with default_ttl=2 -- the unit is not visible here
# (presumably seconds; confirm in utils.expiring_dict).
gps_cache = ExpiringDict(default_ttl=2)
class GpsDeviceWithAddress(Device):
    """A ``Device`` that additionally remembers the address it belongs to."""

    def __init__(self, address, *args, **kwargs):
        """Initialise the base ``Device`` and record ``address``.

        Args:
            address: Address of the remote device; stored as ``self.address``.
            *args: Positional arguments forwarded unchanged to ``Device``.
            **kwargs: Keyword arguments forwarded unchanged to ``Device``.
        """
        super().__init__(*args, **kwargs)
        self.address = address
def gps_data_received(remote_address: data_type, packet: CurrentLocationPacket):
    """Store a location packet received from a remote device in ``gps_cache``.

    Args:
        remote_address: Address of the device that sent the packet.
        packet: Location packet; its ``position`` and ``velocity`` are
            converted with ``to_gvector()`` before being cached.
    """
    logging.info("GPS received: %s", packet)

    # The cache key is the hash of the address' string form, so a later
    # packet from the same address is stored under the same key.
    cache_key = hash(str(remote_address))
    reported_device = GpsDeviceWithAddress(
        address=remote_address,
        position=packet.position.to_gvector(),
        velocity=packet.velocity.to_gvector(),
    )
    gps_cache.set(cache_key, reported_device)
async def sign_data_task(protocol, database):
    """Obtain signatures for every unsigned record and store them.

    In stub mode (module-level ``use_stubs`` is true) each record is signed
    locally with ``signifier``. Otherwise a remote device is chosen and asked
    to sign each record over ``protocol``.

    Args:
        protocol: Protocol object used to reach remote devices; not touched
            in stub mode.
        database: Source of unsigned records and of this device's latest GPS
            position; also receives the resulting signatures.
    """
    pending = database.get_unsigned_data()

    if use_stubs:
        # Stub mode: no radio involved, sign with the local key instead.
        for record in pending:
            local_signature = signifier.sign(record.get_data_for_sign())
            # One-second pause per record (presumably mimics the latency of
            # a real remote round trip -- confirm intent).
            await asyncio.sleep(1)
            database.set_sign(record, local_signature)
        return

    # First choice: a device picked from the cached GPS reports.
    target = None
    logging.info("Devices with GPS info: %d", len(gps_cache))
    if len(gps_cache):
        own_location = database.get_latest_gps()
        if own_location:
            candidates = pick_available(me=own_location, others=gps_cache.values())
            logging.info("Devices available by GPS: %d", len(candidates))
            if len(candidates):
                logging.info("Device picked: %s", candidates[0].address)
                target = candidates[0].address

    # Fallback: ask the radio for the first remote device it can discover.
    if target is None:
        target = await protocol.radio.discover_first_remote_device()
        logging.info("Nearby device found: %s", target)

    if target is None:
        logging.info("No devices nearby")
        return

    for record in pending:
        response = await protocol.send_request(
            target,
            SignDataRequest(data=record.get_data_for_sign()),
        )
        remote_signature = Signature(public_key=response.public_key, sign=response.sign)
        database.set_sign(record, remote_signature)
async def send_gps_task(protocol, database):
    """Broadcast this device's latest GPS position, if one is available.

    Args:
        protocol: Protocol object whose ``send_packet_broadcast`` is called.
        database: Provides the latest position via ``get_latest_gps()``.
    """
    latest_fix = database.get_latest_gps()
    if not latest_fix:
        # Nothing recorded yet: there is nothing to announce.
        return

    outgoing = CurrentLocationPacket.create_from_device(latest_fix)
    logging.info("Send GPS: %s", outgoing)
    protocol.send_packet_broadcast(outgoing)
async def main():
    """Parse the command line, bring up the radio and schedule the tasks.

    Command-line options:
        --device:  Passed to ``XBee``. Required only when the radio is
                   actually used, i.e. when stub mode is off.
        --db-uri:  Optional database URI. Without it no periodic task is
                   scheduled; outside stub mode the device then only answers
                   incoming packets and requests.
        --timeout: Float passed to ``Protocol`` (default 30).

    Stub mode is taken from the module-level ``use_stubs`` flag, the same
    value ``sign_data_task`` consults, so both always agree.
    """
    parser = ArgumentParser()
    # The device is only opened when the radio is in use, so do not force
    # callers to supply a dummy value in stub mode.
    parser.add_argument('--device', required=not use_stubs)
    parser.add_argument('--db-uri', required=False)
    parser.add_argument('--timeout', type=float, default=30)
    args = parser.parse_args()

    protocol = None
    if not use_stubs:
        xbee = XBee(args.device)
        xbee.open()

        protocol = Protocol(xbee, args.timeout)
        protocol.on_packet(CurrentLocationPacket, gps_data_received)
        protocol.on_request(SignDataRequest, sign_data_handler)

    if not args.db_uri:
        # No database: there is nothing to sign and no position to send.
        return

    database = Database(args.db_uri)
    # In stub mode ``protocol`` stays None; sign_data_task does not use it
    # in that mode.
    set_interval(sign_data_task, 1, protocol, database)
    if protocol is not None:
        # Broadcasting the position needs the radio, so skip it in stub mode.
        set_interval(send_gps_task, 1, protocol, database)
if __name__ == "__main__":
    # Create and install the event loop explicitly instead of relying on
    # asyncio to create one implicitly, which is deprecated in recent Python
    # versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    # Run the start-up coroutine to completion first so that a failure in it
    # (for example while opening the radio) terminates the process instead of
    # being stored in an unobserved task while the loop idles forever.
    loop.run_until_complete(main())
    # Keep the loop alive for whatever main() scheduled on it.
    loop.run_forever()