Skip to content

Commit

Permalink
Merge pull request #311 from tiiuae/nats_integration
Browse files Browse the repository at this point in the history
nats_integration to develop
  • Loading branch information
joenpera authored Sep 27, 2023
2 parents 3a7034a + b30c29b commit 307d6e8
Show file tree
Hide file tree
Showing 31 changed files with 717 additions and 289 deletions.
33 changes: 23 additions & 10 deletions modules/sc-mesh-secure-deployment/src/nats/comms_nats_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ def stop(self):
self.batman.thread_running = False # thread loop disabled
self.t2.join() # wait for thread to finish


# pylint: disable=too-many-instance-attributes
class CommsController: # pylint: disable=too-few-public-methods
"""
Expand Down Expand Up @@ -99,7 +100,8 @@ def __init__(self, server: str, port: str, interval: int = 1000):
# logger for this module and derived from main logger
self.logger = self.main_logger.getChild("controller")

class CommsCsa: # pylint: disable=too-few-public-methods

class CommsCsa: # pylint: disable=too-few-public-methods
"""
Comms CSA class to storage settings for CSA for a state change
"""
Expand All @@ -117,6 +119,15 @@ async def main(server, port, keyfile=None, certfile=None, interval=1000):
nats_client = NATS()
csac = CommsCsa()

status, _, identity_dict = cc.command.get_identity()

if status == "OK":
identity = identity_dict["identity"]
cc.logger.debug("Identity: %s", identity)
else:
cc.logger.error("Failed to get identity!")
return

async def stop():
await asyncio.sleep(1)
asyncio.get_running_loop().stop()
Expand Down Expand Up @@ -156,6 +167,7 @@ async def reconnected_cb():
reconnected_cb=reconnected_cb,
disconnected_cb=disconnected_cb,
max_reconnect_attempts=-1)

async def handle_settings_csa_post(ret):
if ret == "OK":
ret = "ACK"
Expand All @@ -181,16 +193,16 @@ async def message_handler(message):
cc.logger.debug("Received a message on '%s': %s", subject, data)
ret, info, resp = "FAIL", "Not supported subject", ""

if subject == "comms.settings":
if subject == f"comms.settings.{identity}":
ret, info = cc.settings.handle_mesh_settings(data)
elif subject == "comms.settings_csa":
ret, info, delay = cc.settings.handle_mesh_settings_csa(data)
csac.delay = delay
csac.ack_sent = "status" in data

elif subject == "comms.command":
elif subject == f"comms.command.{identity}" or subject == "comms.identity":
ret, info, resp = cc.command.handle_command(data, cc)
elif subject == "comms.status":
elif subject == f"comms.status.{identity}":
ret, info = "OK", "Returning current status"

if subject == "comms.settings_csa":
Expand All @@ -204,27 +216,28 @@ async def message_handler(message):
'visualisation_active': cc.comms_status.is_visualisation_active,
'mesh_radio_on': cc.comms_status.is_mesh_radio_on,
'ap_radio_on': cc.comms_status.is_ap_radio_on,
'security_status': cc.comms_status.security_status }
'security_status': cc.comms_status.security_status}

if resp != "":
response['data'] = resp

cc.logger.debug("Sending response: %s", str(response)[:1000])
await message.respond(json.dumps(response).encode("utf-8"))

await nats_client.subscribe("comms.settings", cb=message_handler)
await nats_client.subscribe(f"comms.settings.{identity}", cb=message_handler)
await nats_client.subscribe("comms.settings_csa", cb=message_handler)
await nats_client.subscribe("comms.command", cb=message_handler)
await nats_client.subscribe("comms.status", cb=message_handler)
await nats_client.subscribe(f"comms.command.{identity}", cb=message_handler)
await nats_client.subscribe("comms.identity", cb=message_handler)
await nats_client.subscribe(f"comms.status.{identity}", cb=message_handler)

cc.logger.debug("comms_nats_controller Listening for requests")
while True:
await asyncio.sleep(float(cc.interval) / 1000.0)
try:
if cc.telemetry.visualisation_enabled:
msg = cc.telemetry.mesh_visual()
cc.logger.debug("Publishing comms.visual: %s", msg)
await nats_client.publish("comms.visual", msg.encode())
cc.logger.debug(f"Publishing comms.visual.{identity}: %s", msg)
await nats_client.publish(f"comms.visual.{identity}", msg.encode())
except Exception as e:
cc.logger.error("Error:", e)

Expand Down
169 changes: 102 additions & 67 deletions modules/sc-mesh-secure-deployment/src/nats/comms_nats_discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,28 @@
import time
import logging
import argparse
import netifaces as ni
import netifaces as netifaces
import os
import textwrap

class NatsDiscovery:

class NatsDiscovery: # pylint: disable=too-few-public-methods
"""
Nats Discovery class. Utilizes the batctl command to discover devices on the mesh network.
"""
def __init__(self, role, key, cert):
def __init__(self, role, key, cert, servercert, ca):
self.role = role
self.key = key
self.cert = cert
self.server_cert = servercert
self.cert_authority = ca
self.leaf_port = 7422
self.seed_ip_address = ""
self.tls_required = False

if os.path.exists(self.key) and os.path.exists(self.cert) \
and os.path.exists(self.server_cert) and os.path.exists(self.cert_authority):
self.tls_required = True

# base logger for discovery
self.main_logger = logging.getLogger("nats")
Expand All @@ -29,60 +39,87 @@ def __init__(self, role, key, cert):
# logger for this module and derived from main logger
self.logger = self.main_logger.getChild("discovery")

def __get_authorization_config(self) -> str:
"""
Get the authorization configuration for the nats-server configuration file.
:return: authorization configuration
"""
if self.key is not None or self.cert is not None:
authorization = f"""
tls {{
cert_file: "{self.cert}"
key_file: "{self.key}"
timeout: 2
verify: true
}}
"""
else:
authorization = ""

return authorization

def __generate_seed_config(self) -> None:
"""
Generate the nats-server configuration file for the seed node.
:return: None
"""

config = f"""
listen: 0.0.0.0:4222
leafnodes {{
port: 7422
}}
{self.__get_authorization_config()}
"""
with open('/var/run/nats.conf', 'w') as f:
f.write(config)
if self.tls_required:
config = textwrap.dedent(f"""
listen: 0.0.0.0:4222
tls {{
key_file: {self.key}
cert_file: {self.server_cert}
ca_file: {self.cert_authority}
verify: true
}}
leafnodes {{
port: 7422
tls {{
key_file: {self.key}
cert_file: {self.server_cert}
ca_file: {self.cert_authority}
verify: true
}}
}}
""")
else:
config = textwrap.dedent("""
listen: 0.0.0.0:4222
leafnodes {
port: 7422
}
""")

with open('/var/run/nats.conf', 'w', encoding='UTF-8') as file_nats_conf:
file_nats_conf.write(config)

def __generate_leaf_config(self, _seed_route) -> None:
"""
Generate the nats-server configuration file for the leaf node.
:param _seed_route: seed node route
:return: None
"""
config = f"""
listen: 0.0.0.0:4222
leafnodes {{
remotes = [
{{
url: "nats://{_seed_route}"
}},
]
}}
{self.__get_authorization_config()}
"""
with open('/var/run/nats.conf', 'w') as f:
f.write(config)
if self.tls_required:
protocol = "tls"
config = textwrap.dedent(f"""
listen: 0.0.0.0:4222
tls {{
key_file: {self.key}
cert_file: {self.server_cert}
ca_file: {self.cert_authority}
verify: true
}}
leafnodes {{
remotes = [
{{
url: "{protocol}://{_seed_route}"
tls {{
key_file: {self.key}
cert_file: {self.cert}
ca_file: {self.cert_authority}
verify: true
}}
}}
]
}}
""")
else:
protocol = "nats"
config = textwrap.dedent(f"""
listen: 0.0.0.0:4222
leafnodes {{
remotes = [
{{
url: "{protocol}://{_seed_route}"
}}
]
}}
""")

with open('/var/run/nats.conf', 'w', encoding='UTF-8') as file_nats_conf:
file_nats_conf.write(config)

def __reload_nats_server_config(self) -> int:
"""
Expand All @@ -102,14 +139,14 @@ def __reload_nats_server_config(self) -> int:

return 0

def __update_configurations_and_restart(self, ip) -> None:
def __update_configurations_and_restart(self, ip_address) -> None:
"""
Update the nats-server configuration and restart the nats-server.
:param ip: ip address of the seed node
:param ip_address: ip address of the seed node
:return: None
"""
self.logger.debug("Updating configurations and reloading nats-server configuration")
self.__generate_leaf_config(ip)
self.__generate_leaf_config(ip_address)

# reload nats-server configuration
ret = self.__reload_nats_server_config()
Expand All @@ -124,12 +161,13 @@ def __get_mesh_macs() -> list:
:return: list of mac addresses
"""
try:
ret = subprocess.run(["batctl", "o", "-H"], shell=False, check=True, capture_output=True)
ret = subprocess.run(["batctl", "o", "-H"], shell=False,
check=True, capture_output=True)
if ret.returncode != 0:
return []
else:
macs = re.findall(r' \* (([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2}))', ret.stdout.decode('utf-8'))
return [mac[0] for mac in macs]
macs = re.findall(r' \* (([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2}))',
ret.stdout.decode('utf-8'))
return [mac[0] for mac in macs]
except:
return []

Expand All @@ -140,9 +178,9 @@ def __mac_to_ip(mac) -> str:
:param mac: mac address
:return: ip address
"""
ip_br_lan = ni.ifaddresses('br-lan')[ni.AF_INET][0]['addr'].split(".")[0:-1]
ip_br_lan = netifaces.ifaddresses('br-lan')[netifaces.AF_INET][0]['addr'].split(".")[0:-1]
ip_br_lan = ".".join(ip_br_lan) + "."
return ip_br_lan + str(int(mac.split(":")[-1],16))
return ip_br_lan + str(int(mac.split(":")[-1], 16))

@staticmethod
def __scan_port(host, port) -> bool:
Expand Down Expand Up @@ -171,20 +209,19 @@ def run(self):
self.__generate_seed_config()
self.__reload_nats_server_config()
return
else:
# create temporary leaf configuration for nats-server to start
self.__generate_leaf_config("192.168.1.2")
# create temporary leaf configuration for nats-server to start
self.__generate_leaf_config("192.168.1.2")

while True:
macs = self.__get_mesh_macs()
self.logger.debug(f"{macs} len: {len(macs)}")

for mac in macs:
ip = self.__mac_to_ip(mac)
ip_address = self.__mac_to_ip(mac)
if self.seed_ip_address == "":
self.logger.debug(f"Scanning {ip}, {mac}")
if self.__scan_port(ip, self.leaf_port):
self.seed_ip_address = ip
self.logger.debug(f"Scanning {ip_address}, {mac}")
if self.__scan_port(ip_address, self.leaf_port):
self.seed_ip_address = ip_address
gcs_found = 1

if gcs_found:
Expand All @@ -195,17 +232,15 @@ def run(self):

time.sleep(4)


if __name__ == "__main__":
"""
Main function.
:param args: command line arguments
:return: None
"""
parser = argparse.ArgumentParser(description='NATS Discovery')
parser.add_argument('-r', '--role', help='device role', required=True)
parser.add_argument('-k', '--key', help='key file', required=False)
parser.add_argument('-c', '--cert', help='cert file', required=False)
parser.add_argument('-c', '--cert', help='client cert file', required=False)
parser.add_argument('-s', '--servercert', help='server cert file', required=False)
parser.add_argument('-a', '--ca', help='certificate authority file', required=False)
args = parser.parse_args()

forrest = NatsDiscovery(args.role, args.key, args.cert)
forrest = NatsDiscovery(args.role, args.key, args.cert, args.servercert, args.ca)
forrest.run()
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ source_configuration

KEY_FILE="/etc/ssl/private/comms_auth_private_key.pem"
CERT_FILE="/etc/ssl/certs/comms_auth_cert.pem"
SERVER_CERT_FILE="/etc/ssl/certs/comms_server_cert.pem"
CA="/etc/ssl/certs/root-ca.cert.pem"

if [ -e "$KEY_FILE" ] && [ -e "$CERT_FILE" ]; then
ARGS="--role $ROLE -k $KEY_FILE -c $CERT_FILE"
ARGS="--role $ROLE -k $KEY_FILE -c $CERT_FILE -s $SERVER_CERT_FILE -a $CA"
else
ARGS="--role $ROLE"
fi
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,7 @@ LOG_FILE=/opt/nats-server.log
# shellcheck source=/dev/null
[ -r "/etc/default/$DAEMON" ] && . "/etc/default/$DAEMON"

KEY_FILE="/etc/ssl/private/comms_auth_private_key.pem"
CERT_FILE="/etc/ssl/certs/comms_auth_cert.pem"

if [ -e "$KEY_FILE" ] && [ -e "$CERT_FILE" ]; then
NATS_SERVER_ARGS="-l $LOG_FILE -c /var/run/nats.conf --tlsverify --tlscert=$CERT_FILE --tlskey=$KEY_FILE"
else
NATS_SERVER_ARGS="-l $LOG_FILE -c /var/run/nats.conf"
fi
NATS_SERVER_ARGS="-l $LOG_FILE -c /var/run/nats.conf"

start() {
echo "$NATS_SERVER_ARGS"
Expand Down
Loading

0 comments on commit 307d6e8

Please sign in to comment.