Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mdm_client support #336

Merged
merged 31 commits into from
Dec 7, 2023
Merged
Changes from 1 commit
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
a0e2c2f
Preparing mdm_client for config download
joenpera Nov 16, 2023
beb9ccd
Change to use MDM defined API instead of test API
joenpera Nov 16, 2023
7aa0eb5
Change to use API get_device_config instead of single radio API
joenpera Nov 16, 2023
eaad548
Add initial version MDM service monitor
saauvine Nov 16, 2023
febde01
Separate main() for MDM and FMO usage
joenpera Nov 17, 2023
5cf0b59
Removing dead code and preparing to start MDM agent
joenpera Nov 17, 2023
ceab649
MDM Agent TLS support
joenpera Nov 22, 2023
750e4aa
MDM Agent TLS support fixes after testing
joenpera Nov 22, 2023
82888de
Return dns name instead of IP address
saauvine Nov 22, 2023
84f517b
Fix MDM/FMO start after testing on device
joenpera Nov 23, 2023
9e2ed39
Fix system time if not set from Certificate notBefore time
joenpera Nov 23, 2023
04719ac
Starting MDM agent in a boot for testing purposes
joenpera Nov 24, 2023
bc85f6c
More comments to entrypoint
joenpera Nov 27, 2023
51d7953
Add polling time adjustments and callback value check
joenpera Nov 28, 2023
eef1aab
Add action handlers for different configuration items
joenpera Nov 28, 2023
9e65b4b
Merge pull request #342 from tiiuae/develop
joenpera Nov 29, 2023
98c6c1f
SLAAC integration
DaniilTroshkovTII Nov 29, 2023
52db0a2
Add support to monitor services via specific network interface
saauvine Nov 29, 2023
c670c51
Add draft for posting certs
saauvine Nov 29, 2023
62a35b7
Merge pull request #344 from tiiuae/mdm_client_slaac_integration
DaniilTroshkovTII Nov 29, 2023
1e5433d
Add FMO to feature configurations
joenpera Nov 29, 2023
1d60d99
Fix UnitTests and Alfred start
joenpera Nov 30, 2023
504ac28
Certificate paths change
joenpera Nov 30, 2023
ed776f0
Merge branch 'develop' into mdm_client
joenpera Nov 30, 2023
42cd276
Add functional test code and helpers for CI/CD testing
joenpera Dec 1, 2023
9722589
Merge branch 'develop' into mdm_client
joenpera Dec 1, 2023
c803c5c
Fix bridge config writing to config files
joenpera Dec 5, 2023
cc70608
Fix bridge configuration problem
joenpera Dec 5, 2023
4b2092f
Init.d service startup changed to use daemon
joenpera Dec 5, 2023
5964263
Merge branch 'develop' into mdm_client
joenpera Dec 5, 2023
10dc6ec
Change validation to validate against actual interfaces
joenpera Dec 7, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
298 changes: 238 additions & 60 deletions modules/sc-mesh-secure-deployment/src/nats/comms_nats_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@
import logging
import threading
import json
from typing import Optional

from nats.aio.client import Client as NATS
from concurrent.futures import ThreadPoolExecutor
import requests

from src import comms_settings
from src import comms_command
Expand All @@ -17,6 +21,12 @@
from src import batadvvis
from src import batstat

# FMO_SUPPORTED_COMMANDS = [
# "GET_IDENTITY",
# "ENABLE_VISUALISATION",
# "DISABLE_VISUALISATION",
# ]


class MeshTelemetry:
"""
Expand Down Expand Up @@ -120,22 +130,167 @@ def __init__(self, server: str, port: str, interval: int = 1000):
self.logger = self.main_logger.getChild("controller")


class MdmAgent:
"""
MDM Agent
"""

def __init__(
self, comms_controller, keyfile: str = None, certificate_file: str = None
):
"""
Constructor
"""
self.__previous_config: Optional[str] = self.__read_config_from_file()
self.__comms_controller: CommsController = comms_controller
self.__interval: int = 10
self.__url: str = "http://0.0.0.0:5000/config"
self.__keyfile: str = keyfile
self.__certificate_file: str = certificate_file
self.__ssl_context: Optional[ssl.SSLContext] = None
self.__config_version: int = 0
self.mdm_connection_status: bool = True

async def mdm_server_address_cb(self, address: str, status: str) -> None:
"""
Callback for MDM server address
:param address: MDM server address
:param status: MDM connection status
:return: -
"""
self.__url = address
self.mdm_connection_status = status

async def execute(self) -> None:
"""
Execute MDM agent
:return: -
"""
executor = ThreadPoolExecutor(1)
if self.__certificate_file and self.__keyfile:
self.__ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
self.__ssl_context.load_cert_chain(
certfile=self.__certificate_file, keyfile=self.__keyfile
)

while True:
if self.mdm_connection_status:
await self.__loop_run_executor(executor)
await asyncio.sleep(float(self.__interval))

def __http_request(self) -> requests.Response:
"""
HTTP request
:return: HTTP response
"""
try:
if self.__ssl_context is not None:
self.__comms_controller.logger.debug("SSL context is not None")
return requests.get(self.__url, verify=self.__ssl_context, timeout=2)
self.__comms_controller.logger.debug("SSL context is None")
return requests.get(self.__url, timeout=2) # todo for testing only
except requests.exceptions.ConnectionError as err:
self.__comms_controller.logger.error(
"HTTP request failed with error: %s", err
)
return requests.Response()

def __handle_received_config(self, response: requests.Response) -> None:
"""
Handle received config
:param response: HTTP response
:return: -
"""

self.__comms_controller.logger.debug(
f"HTTP Request Response: {response.text.strip()} {str(response.status_code).strip()}"
)
self.__previous_config = response.text.strip()

data = json.loads(response.text)
ret, info = self.__comms_controller.settings.handle_mesh_settings(
json.dumps(data["payload"])
)
self.__comms_controller.logger.debug("ret: %s info: %s", ret, info)
if ret == "OK":
for radio in data["payload"]["radios"]:
# Extract the radio_index
index_number = radio["radio_index"]

# Create the command
cmd = json.dumps(
{
"api_version": 1,
"cmd": "APPLY",
"radio_index": index_number,
}
)

ret, info, _ = self.__comms_controller.command.handle_command(
cmd, self.__comms_controller
)
self.__comms_controller.logger.debug("ret: %s info: %s", ret, info)
self.__config_version = int(data["version"])
self.__write_config_to_file(response)

@staticmethod
def __read_config_from_file() -> Optional[str]:
"""
Read config from file
:return: config
"""
try:
with open("/opt/config.json", "r", encoding="utf-8") as f:
return f.read().strip()
except FileNotFoundError:
return None

@staticmethod
def __write_config_to_file(response: requests.Response) -> None:
"""
Write config to file
:param response: HTTP response
:return: -
"""
with open("/opt/config.json", "w", encoding="utf-8") as f:
f.write(response.text.strip())

async def __loop_run_executor(self, executor) -> None:
"""
Loop run executor
:param executor: executor
"""
# This function makes a synchronous HTTP request using ThreadPoolExecutor
https_loop = asyncio.get_event_loop()
response = await https_loop.run_in_executor(executor, self.__http_request)
self.__comms_controller.logger.debug(
"HTTP Request status: %s", str(response.status_code)
)
if (
response.status_code == 200
and self.__previous_config != response.text.strip()
and response.text.strip() != ""
):
self.__handle_received_config(response)


# pylint: disable=too-many-arguments, too-many-locals, too-many-statements
async def main(server, port, keyfile=None, certfile=None, interval=1000):
async def main(server, port, keyfile=None, certfile=None, interval=1000, agent="fmo"):
"""
main
"""
cc = CommsController(server, port, interval)
nats_client = NATS()

status, _, identity_dict = cc.command.get_identity()
if agent == "fmo":
nats_client = NATS()
status, _, identity_dict = cc.command.get_identity()

if status == "OK":
identity = identity_dict["identity"]
cc.logger.debug("Identity: %s", identity)
else:
cc.logger.error("Failed to get identity!")
return
if status == "OK":
identity = identity_dict["identity"]
cc.logger.debug("Identity: %s", identity)
else:
cc.logger.error("Failed to get identity!")
return

async def stop():
await asyncio.sleep(1)
Expand All @@ -159,49 +314,58 @@ async def disconnected_cb():
async def reconnected_cb():
cc.logger.debug("Got reconnected...")

# Create SSL context if certfile and keyfile are provided
ssl_context = None
if certfile and keyfile:
ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
ssl_context.load_cert_chain(certfile=certfile, keyfile=keyfile)

# Connect to NATS server with TLS enabled if ssl_context is provided
if ssl_context:
await nats_client.connect(
f"tls://{server}:{port}",
tls=ssl_context,
reconnected_cb=reconnected_cb,
disconnected_cb=disconnected_cb,
max_reconnect_attempts=-1,
)
else:
await nats_client.connect(
f"nats://{server}:{port}",
reconnected_cb=reconnected_cb,
disconnected_cb=disconnected_cb,
max_reconnect_attempts=-1,
)
if agent == "fmo":
# Create SSL context if certfile and keyfile are provided
ssl_context = None
if certfile and keyfile:
ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
ssl_context.load_cert_chain(certfile=certfile, keyfile=keyfile)

# Connect to NATS server with TLS enabled if ssl_context is provided
if ssl_context:
await nats_client.connect(
f"tls://{server}:{port}",
tls=ssl_context,
reconnected_cb=reconnected_cb,
disconnected_cb=disconnected_cb,
max_reconnect_attempts=-1,
)
else:
await nats_client.connect(
f"nats://{server}:{port}",
reconnected_cb=reconnected_cb,
disconnected_cb=disconnected_cb,
max_reconnect_attempts=-1,
)

async def message_handler(message):
async def fmo_message_handler(message):
"""
Message handler for FMO
"""
# reply = message.reply
subject = message.subject
data = message.data.decode()
cc.logger.debug("Received a message on '%s': %s", subject, data)
ret, info, resp = "FAIL", "Not supported subject", ""

if subject == f"comms.settings.{identity}":
ret, info = cc.settings.handle_mesh_settings(data)
elif subject == f"comms.channel_change.{identity}":
ret, info, _index = cc.settings.handle_mesh_settings_channel_change(data)
if ret == "TRIGGER":
cmd = json.dumps(
{"api_version": 1, "cmd": "APPLY", "radio_index": f"{_index}"}
command = json.loads(data)["cmd"]

if agent == "fmo": # and command in FMO_SUPPORTED_COMMANDS:
if subject == f"comms.settings.{identity}":
ret, info = cc.settings.handle_mesh_settings(data)
elif subject == f"comms.channel_change.{identity}":
ret, info, _index = cc.settings.handle_mesh_settings_channel_change(
data
)
ret, info, resp = cc.command.handle_command(cmd, cc)
elif subject in (f"comms.command.{identity}", "comms.identity"):
ret, info, resp = cc.command.handle_command(data, cc)
elif subject == f"comms.status.{identity}":
ret, info = "OK", "Returning current status"
if ret == "TRIGGER":
cmd = json.dumps(
{"api_version": 1, "cmd": "APPLY", "radio_index": f"{_index}"}
)
ret, info, resp = cc.command.handle_command(cmd, cc)
elif subject in (f"comms.command.{identity}", "comms.identity"):
ret, info, resp = cc.command.handle_command(data, cc)
elif subject == f"comms.status.{identity}":
ret, info = "OK", "Returning current status"

# Update status info
_ = [item.refresh_status() for item in cc.c_status]
Expand All @@ -225,22 +389,33 @@ async def message_handler(message):
cc.logger.debug("Sending response: %s", str(response)[:1000])
await message.respond(json.dumps(response).encode("utf-8"))

await nats_client.subscribe(f"comms.settings.{identity}", cb=message_handler)
await nats_client.subscribe(f"comms.channel_change.{identity}", cb=message_handler)
await nats_client.subscribe(f"comms.command.{identity}", cb=message_handler)
await nats_client.subscribe("comms.identity", cb=message_handler)
await nats_client.subscribe(f"comms.status.{identity}", cb=message_handler)
if agent == "fmo":
await nats_client.subscribe(
f"comms.settings.{identity}", cb=fmo_message_handler
)
await nats_client.subscribe(
f"comms.channel_change.{identity}", cb=fmo_message_handler
)
await nats_client.subscribe(f"comms.status.{identity}", cb=fmo_message_handler)
await nats_client.subscribe("comms.identity", cb=fmo_message_handler)
await nats_client.subscribe(f"comms.command.{identity}", cb=fmo_message_handler)

cc.logger.debug(f"{agent}: comms_nats_controller Listening for requests")

cc.logger.debug("comms_nats_controller Listening for requests")
while True:
await asyncio.sleep(float(cc.interval) / 1000.0)
try:
if cc.telemetry.visualisation_enabled:
msg = cc.telemetry.mesh_visual()
cc.logger.debug(f"Publishing comms.visual.{identity}: %s", msg)
await nats_client.publish(f"comms.visual.{identity}", msg.encode())
except Exception as e:
cc.logger.error("Error:", e)
# separate instances needed for FMO/MDM
if agent == "mdm":
mdm = MdmAgent(cc, keyfile, certfile)
await mdm.execute()
else:
while True:
await asyncio.sleep(float(cc.interval) / 1000.0)
try:
if cc.telemetry.visualisation_enabled and agent == "fmo":
msg = cc.telemetry.mesh_visual()
cc.logger.debug(f"Publishing comms.visual.{identity}: %s", msg)
await nats_client.publish(f"comms.visual.{identity}", msg.encode())
except Exception as e:
cc.logger.error("Error:", e)


if __name__ == "__main__":
Expand All @@ -249,9 +424,12 @@ async def message_handler(message):
parser.add_argument("-p", "--port", help="Server port", required=True)
parser.add_argument("-k", "--keyfile", help="TLS keyfile", required=False)
parser.add_argument("-c", "--certfile", help="TLS certfile", required=False)
parser.add_argument("-a", "--agent", help="mdm or fmo", required=False)
args = parser.parse_args()

loop = asyncio.new_event_loop()
loop.run_until_complete(main(args.server, args.port, args.keyfile, args.certfile))
loop.run_until_complete(
main(args.server, args.port, args.keyfile, args.certfile, agent=args.agent)
)
loop.run_forever()
loop.close()
Loading