Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

nats_integration to develop #311

Merged
merged 11 commits into from
Sep 27, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,15 @@ async def main(server, port, keyfile=None, certfile=None, interval=1000):
nats_client = NATS()
csac = CommsCsa()

status, _, identity_dict = cc.command.get_identity()

if status == "OK":
identity = identity_dict["identity"]
cc.logger.debug("Identity: %s", identity)
else:
cc.logger.error("Failed to get identity!")
return

async def stop():
await asyncio.sleep(1)
asyncio.get_running_loop().stop()
Expand Down Expand Up @@ -181,16 +190,16 @@ async def message_handler(message):
cc.logger.debug("Received a message on '%s': %s", subject, data)
ret, info, resp = "FAIL", "Not supported subject", ""

if subject == "comms.settings":
if subject == f"comms.settings.{identity}":
ret, info = cc.settings.handle_mesh_settings(data)
elif subject == "comms.settings_csa":
ret, info, delay = cc.settings.handle_mesh_settings_csa(data)
csac.delay = delay
csac.ack_sent = "status" in data

elif subject == "comms.command":
elif subject == f"comms.command.{identity}" or "GET_IDENTITY" in data:
ret, info, resp = cc.command.handle_command(data, cc)
elif subject == "comms.status":
elif subject == f"comms.status.{identity}":
ret, info = "OK", "Returning current status"

if subject == "comms.settings_csa":
Expand All @@ -212,19 +221,19 @@ async def message_handler(message):
cc.logger.debug("Sending response: %s", str(response)[:1000])
await message.respond(json.dumps(response).encode("utf-8"))

await nats_client.subscribe("comms.settings", cb=message_handler)
await nats_client.subscribe(f"comms.settings.{identity}", cb=message_handler)
await nats_client.subscribe("comms.settings_csa", cb=message_handler)
await nats_client.subscribe("comms.command", cb=message_handler)
await nats_client.subscribe("comms.status", cb=message_handler)
await nats_client.subscribe(f"comms.command.>", cb=message_handler)
await nats_client.subscribe(f"comms.status.{identity}", cb=message_handler)

cc.logger.debug("comms_nats_controller Listening for requests")
while True:
await asyncio.sleep(float(cc.interval) / 1000.0)
try:
if cc.telemetry.visualisation_enabled:
msg = cc.telemetry.mesh_visual()
cc.logger.debug("Publishing comms.visual: %s", msg)
await nats_client.publish("comms.visual", msg.encode())
cc.logger.debug(f"Publishing comms.visual.{identity}: %s", msg)
await nats_client.publish(f"comms.visual.{identity}", msg.encode())
except Exception as e:
cc.logger.error("Error:", e)

Expand Down
77 changes: 26 additions & 51 deletions modules/sc-mesh-secure-deployment/src/nats/comms_nats_discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
import time
import logging
import argparse
import netifaces as ni
import netifaces as netifaces

class NatsDiscovery:
class NatsDiscovery: # pylint: disable=too-few-public-methods
"""
Nats Discovery class. Utilizes the batctl command to discover devices on the mesh network.
"""
Expand All @@ -29,42 +29,23 @@ def __init__(self, role, key, cert):
# logger for this module and derived from main logger
self.logger = self.main_logger.getChild("discovery")

def __get_authorization_config(self) -> str:
"""
Get the authorization configuration for the nats-server configuration file.
:return: authorization configuration
"""
if self.key is not None or self.cert is not None:
authorization = f"""
tls {{
cert_file: "{self.cert}"
key_file: "{self.key}"
timeout: 2
verify: true
}}
"""
else:
authorization = ""

return authorization

def __generate_seed_config(self) -> None:
"""
Generate the nats-server configuration file for the seed node.
:return: None
"""

config = f"""
config = """
listen: 0.0.0.0:4222
leafnodes {{
leafnodes {
port: 7422
}}
{self.__get_authorization_config()}
}
"""
with open('/var/run/nats.conf', 'w') as f:
f.write(config)
with open('/var/run/nats.conf', 'w', encoding='UTF-8') as file_nats_conf:
file_nats_conf.write(config)

def __generate_leaf_config(self, _seed_route) -> None:
@staticmethod
def __generate_leaf_config(_seed_route) -> None:
"""
Generate the nats-server configuration file for the leaf node.
:param _seed_route: seed node route
Expand All @@ -79,10 +60,9 @@ def __generate_leaf_config(self, _seed_route) -> None:
}},
]
}}
{self.__get_authorization_config()}
"""
with open('/var/run/nats.conf', 'w') as f:
f.write(config)
with open('/var/run/nats.conf', 'w', encoding='UTF-8') as file_nats_conf:
file_nats_conf.write(config)

def __reload_nats_server_config(self) -> int:
"""
Expand All @@ -102,14 +82,14 @@ def __reload_nats_server_config(self) -> int:

return 0

def __update_configurations_and_restart(self, ip) -> None:
def __update_configurations_and_restart(self, ip_address) -> None:
"""
Update the nats-server configuration and restart the nats-server.
:param ip: ip address of the seed node
:param ip_address: ip address of the seed node
:return: None
"""
self.logger.debug("Updating configurations and reloading nats-server configuration")
self.__generate_leaf_config(ip)
self.__generate_leaf_config(ip_address)

# reload nats-server configuration
ret = self.__reload_nats_server_config()
Expand All @@ -124,12 +104,13 @@ def __get_mesh_macs() -> list:
:return: list of mac addresses
"""
try:
ret = subprocess.run(["batctl", "o", "-H"], shell=False, check=True, capture_output=True)
ret = subprocess.run(["batctl", "o", "-H"], shell=False,
check=True, capture_output=True)
if ret.returncode != 0:
return []
else:
macs = re.findall(r' \* (([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2}))', ret.stdout.decode('utf-8'))
return [mac[0] for mac in macs]
macs = re.findall(r' \* (([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2}))',
ret.stdout.decode('utf-8'))
return [mac[0] for mac in macs]
except:
return []

Expand All @@ -140,7 +121,7 @@ def __mac_to_ip(mac) -> str:
:param mac: mac address
:return: ip address
"""
ip_br_lan = ni.ifaddresses('br-lan')[ni.AF_INET][0]['addr'].split(".")[0:-1]
ip_br_lan = netifaces.ifaddresses('br-lan')[netifaces.AF_INET][0]['addr'].split(".")[0:-1]
ip_br_lan = ".".join(ip_br_lan) + "."
return ip_br_lan + str(int(mac.split(":")[-1],16))

Expand Down Expand Up @@ -171,20 +152,19 @@ def run(self):
self.__generate_seed_config()
self.__reload_nats_server_config()
return
else:
# create temporary leaf configuration for nats-server to start
self.__generate_leaf_config("192.168.1.2")
# create temporary leaf configuration for nats-server to start
self.__generate_leaf_config("192.168.1.2")

while True:
macs = self.__get_mesh_macs()
self.logger.debug(f"{macs} len: {len(macs)}")

for mac in macs:
ip = self.__mac_to_ip(mac)
ip_address = self.__mac_to_ip(mac)
if self.seed_ip_address == "":
self.logger.debug(f"Scanning {ip}, {mac}")
if self.__scan_port(ip, self.leaf_port):
self.seed_ip_address = ip
self.logger.debug(f"Scanning {ip_address}, {mac}")
if self.__scan_port(ip_address, self.leaf_port):
self.seed_ip_address = ip_address
gcs_found = 1

if gcs_found:
Expand All @@ -196,11 +176,6 @@ def run(self):
time.sleep(4)

if __name__ == "__main__":
"""
Main function.
:param args: command line arguments
:return: None
"""
parser = argparse.ArgumentParser(description='NATS Discovery')
parser.add_argument('-r', '--role', help='device role', required=True)
parser.add_argument('-k', '--key', help='key file', required=False)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ LOG_FILE=/opt/nats-server.log
[ -r "/etc/default/$DAEMON" ] && . "/etc/default/$DAEMON"

KEY_FILE="/etc/ssl/private/comms_auth_private_key.pem"
CERT_FILE="/etc/ssl/certs/comms_auth_cert.pem"
CERT_FILE="/etc/ssl/certs/comms_server_cert.pem"

if [ -e "$KEY_FILE" ] && [ -e "$CERT_FILE" ]; then
NATS_SERVER_ARGS="-l $LOG_FILE -c /var/run/nats.conf --tlsverify --tlscert=$CERT_FILE --tlskey=$KEY_FILE"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,24 +1,27 @@
import asyncio
import nats
import json
import base64
import config


async def main():
# Connect to NATS!
nc = await nats.connect(f"{config.MODULE_IP}:{config.MODULE_PORT}")

cmd_dict = {"api_version": 1, "cmd": "GET_IDENTITY"}
cmd = json.dumps(cmd_dict)
rep = await nc.request("comms.command",
rep = await nc.request("comms.command.>",
cmd.encode(),
timeout=2)
print(rep.data)

parameters = json.loads(rep.data.decode())
print(json.dumps(parameters, indent=2))

if "identity" in parameters["data"]:
with open("identity.py", "w") as f:
f.write(f"MODULE_IDENTITY=\"{parameters['data']['identity']}\"\n")
else:
print("No identity received!!!!!!!!!!!")
await nc.close()
exit(0)

Expand All @@ -27,5 +30,3 @@ async def main():
loop.run_until_complete(main())
loop.run_forever()
loop.close()


Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ async def main():

cmd_dict = {"api_version": 1, "cmd": "GET_CONFIG", "param": "HOSTAPD_CONFIG"}
cmd = json.dumps(cmd_dict)
rep = await nc.request("comms.command",
rep = await nc.request(f"comms.command.{config.MODULE_IDENTITY}",
cmd.encode(),
timeout=4)
parameters = json.loads(rep.data.decode())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ async def main():

cmd_dict = {"api_version": 1, "cmd": "GET_CONFIG", "param": "WPA_CONFIG"}
cmd = json.dumps(cmd_dict)
rep = await nc.request("comms.command",
rep = await nc.request(f"comms.command.{config.MODULE_IDENTITY}",
cmd.encode(),
timeout=4)
parameters = json.loads(rep.data.decode())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ async def main():

cmd_dict = {"api_version": 1, "cmd": "LOGS", "param": "CONTROLLER"}
cmd = json.dumps(cmd_dict)
rep = await nc.request("comms.command",
rep = await nc.request(f"comms.command.{config.MODULE_IDENTITY}",
cmd.encode(),
timeout=2)
print(rep.data)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ async def main():

cmd_dict = {"api_version": 1, "cmd": "LOGS", "param": "DMESG"}
cmd = json.dumps(cmd_dict)
rep = await nc.request("comms.command",
rep = await nc.request(f"comms.command.{config.MODULE_IDENTITY}",
cmd.encode(),
timeout=2)
print(rep.data)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ async def main():

cmd_dict = {"api_version": 1, "cmd": "LOGS", "param": "HOSTAPD"}
cmd = json.dumps(cmd_dict)
rep = await nc.request("comms.command",
rep = await nc.request(f"comms.command.{config.MODULE_IDENTITY}",
cmd.encode(),
timeout=2)
print(rep.data)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ async def main():

cmd_dict = {"api_version": 1, "cmd": "LOGS", "param": "WPA"}
cmd = json.dumps(cmd_dict)
rep = await nc.request("comms.command",
rep = await nc.request(f"comms.command.{config.MODULE_IDENTITY}",
cmd.encode(),
timeout=2)
print(rep.data)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ async def main():

cmd_dict = {"api_version": 1,"cmd": "DOWN"}
cmd = json.dumps(cmd_dict)
rep = await nc.request("comms.command",
rep = await nc.request(f"comms.command.{config.MODULE_IDENTITY}",
cmd.encode(), timeout=2)
parameters = json.loads(rep.data)
print(parameters)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ async def main():

cmd_dict = {"api_version": 1,"cmd": "UP"}
cmd = json.dumps(cmd_dict)
rep = await nc.request("comms.command",
rep = await nc.request(f"comms.command.{config.MODULE_IDENTITY}",
cmd.encode(), timeout=2)
parameters = json.loads(rep.data)
print(parameters)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@ async def main():

cmd_dict = {"api_version": 1, "cmd": "APPLY"}
cmd = json.dumps(cmd_dict)
rep = await nc.request("comms.command",
cmd.encode(),
timeout=5)
rep = await nc.request(f"comms.command.{config.MODULE_IDENTITY}",
cmd.encode(), timeout=10)
parameters = json.loads(rep.data)
print(parameters)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ async def main():
"ip": "192.168.1.2", "subnet": "255.255.255.0", "tx_power": "5", "mode": "mesh",
"role": f"{config.MODULE_ROLE}"}
cmd = json.dumps(cmd_dict)
rep = await nc.request("comms.settings",
rep = await nc.request(f"comms.settings.{config.MODULE_IDENTITY}",
cmd.encode(),
timeout=2)
parameters = json.loads(rep.data)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
async def main():
# Connect to NATS!
nc = await nats.connect(f"{config.MODULE_IP}:{config.MODULE_PORT}")
cmd_dict = {"frequency": "2472", "delay": "1", "amount": "2"}
cmd_dict = {"frequency": "2412", "delay": "1", "amount": "2"}
cmd = json.dumps(cmd_dict)
rep = await nc.publish("comms.settings_csa", cmd.encode())
print(f"Published to comms.settings_csa: {cmd} ({rep})")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ async def main():
"ip": "192.168.1.2", "subnet": "255.255.255.0", "tx_power": "5",
"mode": "ap+mesh_mcc", "role": f"{config.MODULE_ROLE}"}
cmd = json.dumps(cmd_dict)
rep = await nc.request("comms.settings",
rep = await nc.request(f"comms.settings.{config.MODULE_IDENTITY}",
cmd.encode(),
timeout=2)
parameters = json.loads(rep.data)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ async def main():
"ip": "192.168.1.2", "subnet": "255.255.255.0", "tx_power": "5",
"mode": "ap+mesh_scc", "role": f"{config.MODULE_ROLE}"}
cmd = json.dumps(cmd_dict)
rep = await nc.request("comms.settings",
rep = await nc.request(f"comms.settings.{config.MODULE_IDENTITY}",
cmd.encode(),
timeout=2)
parameters = json.loads(rep.data)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ async def main():

cmd_dict = {"api_version": 1, "cmd": "REVOKE"}
cmd = json.dumps(cmd_dict)
rep = await nc.request("comms.command",
rep = await nc.request(f"comms.command.{config.MODULE_IDENTITY}",
cmd.encode(),
timeout=4)
parameters = json.loads(rep.data)
Expand Down
Loading
Loading