From febde019ef1bb4480306f406265099a44a162af8 Mon Sep 17 00:00:00 2001 From: Mika Joenpera Date: Fri, 17 Nov 2023 13:26:44 +0200 Subject: [PATCH] Separate main() for MDM and FMO usage - increasing code readability Jira-ID: MSS20-230 Signed-off-by: Mika Joenpera --- .../src/nats/comms_nats_controller.py | 186 ++++++++++-------- 1 file changed, 106 insertions(+), 80 deletions(-) diff --git a/modules/sc-mesh-secure-deployment/src/nats/comms_nats_controller.py b/modules/sc-mesh-secure-deployment/src/nats/comms_nats_controller.py index 3c1da135b..68baea390 100644 --- a/modules/sc-mesh-secure-deployment/src/nats/comms_nats_controller.py +++ b/modules/sc-mesh-secure-deployment/src/nats/comms_nats_controller.py @@ -316,23 +316,53 @@ async def __loop_run_executor(self, executor) -> None: self.__handle_received_config(response) +async def main_mdm(keyfile=None, certfile=None, interval=1000): + """ + main + """ + cc = CommsController("local", "5000", interval) + + async def stop(): + await asyncio.sleep(1) + asyncio.get_running_loop().stop() + def signal_handler(): + cc.logger.debug("Disconnecting...") + asyncio.create_task(stop()) + + for sig in ("SIGINT", "SIGTERM"): + asyncio.get_running_loop().add_signal_handler( + getattr(signal, sig), signal_handler + ) + + cc.logger.debug("MDM: comms_nats_controller Listening for requests") + + # separate instances needed for FMO/MDM + + mdm = MdmAgent(cc, keyfile, certfile) + monitor = comms_service_discovery.CommsServiceMonitor( + service_name="MDM Service", + service_type="_mdm._tcp.local.", + service_cb=mdm.mdm_server_address_cb, + ) + await asyncio.gather(mdm.execute(), monitor.async_run()) + # pylint: disable=too-many-arguments, too-many-locals, too-many-statements -async def main(server, port, keyfile=None, certfile=None, interval=1000, agent="fmo"): +async def main_fmo(server, port, keyfile=None, certfile=None, interval=1000): """ main """ cc = CommsController(server, port, interval) - if agent == "fmo": - nats_client = NATS() - status, _, identity_dict = cc.command.get_identity() - if status == "OK": - identity = identity_dict["identity"] - cc.logger.debug("Identity: %s", identity) - else: - cc.logger.error("Failed to get identity!") - return + nats_client = NATS() + status, _, identity_dict = cc.command.get_identity() + + if status == "OK": + identity = identity_dict["identity"] + cc.logger.debug("Identity: %s", identity) + else: + cc.logger.error("Failed to get identity!") + return async def stop(): await asyncio.sleep(1) @@ -356,29 +386,29 @@ async def disconnected_cb(): async def reconnected_cb(): cc.logger.debug("Got reconnected...") - if agent == "fmo": - # Create SSL context if certfile and keyfile are provided - ssl_context = None - if certfile and keyfile: - ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) - ssl_context.load_cert_chain(certfile=certfile, keyfile=keyfile) - - # Connect to NATS server with TLS enabled if ssl_context is provided - if ssl_context: - await nats_client.connect( - f"tls://{server}:{port}", - tls=ssl_context, - reconnected_cb=reconnected_cb, - disconnected_cb=disconnected_cb, - max_reconnect_attempts=-1, - ) - else: - await nats_client.connect( - f"nats://{server}:{port}", - reconnected_cb=reconnected_cb, - disconnected_cb=disconnected_cb, - max_reconnect_attempts=-1, - ) + + # Create SSL context if certfile and keyfile are provided + ssl_context = None + if certfile and keyfile: + ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH) + ssl_context.load_cert_chain(certfile=certfile, keyfile=keyfile) + + # Connect to NATS server with TLS enabled if ssl_context is provided + if ssl_context: + await nats_client.connect( + f"tls://{server}:{port}", + tls=ssl_context, + reconnected_cb=reconnected_cb, + disconnected_cb=disconnected_cb, + max_reconnect_attempts=-1, + ) + else: + await nats_client.connect( + f"nats://{server}:{port}", + reconnected_cb=reconnected_cb, + disconnected_cb=disconnected_cb, + max_reconnect_attempts=-1, + ) async def fmo_message_handler(message): """ @@ -392,22 +422,22 @@ async def fmo_message_handler(message): command = json.loads(data)["cmd"] - if agent == "fmo": # and command in FMO_SUPPORTED_COMMANDS: - if subject == f"comms.settings.{identity}": - ret, info = cc.settings.handle_mesh_settings(data) - elif subject == f"comms.channel_change.{identity}": - ret, info, _index = cc.settings.handle_mesh_settings_channel_change( - data + + if subject == f"comms.settings.{identity}": + ret, info = cc.settings.handle_mesh_settings(data) + elif subject == f"comms.channel_change.{identity}": + ret, info, _index = cc.settings.handle_mesh_settings_channel_change( + data + ) + if ret == "TRIGGER": + cmd = json.dumps( + {"api_version": 1, "cmd": "APPLY", "radio_index": f"{_index}"} ) - if ret == "TRIGGER": - cmd = json.dumps( - {"api_version": 1, "cmd": "APPLY", "radio_index": f"{_index}"} - ) - ret, info, resp = cc.command.handle_command(cmd, cc) - elif subject in (f"comms.command.{identity}", "comms.identity"): - ret, info, resp = cc.command.handle_command(data, cc) - elif subject == f"comms.status.{identity}": - ret, info = "OK", "Returning current status" + ret, info, resp = cc.command.handle_command(cmd, cc) + elif subject in (f"comms.command.{identity}", "comms.identity"): + ret, info, resp = cc.command.handle_command(data, cc) + elif subject == f"comms.status.{identity}": + ret, info = "OK", "Returning current status" # Update status info _ = [item.refresh_status() for item in cc.c_status] @@ -431,38 +461,29 @@ async def fmo_message_handler(message): cc.logger.debug("Sending response: %s", str(response)[:1000]) await message.respond(json.dumps(response).encode("utf-8")) - if agent == "fmo": - await nats_client.subscribe( - f"comms.settings.{identity}", cb=fmo_message_handler - ) - await nats_client.subscribe( - f"comms.channel_change.{identity}", cb=fmo_message_handler - ) - await nats_client.subscribe(f"comms.status.{identity}", cb=fmo_message_handler) - await nats_client.subscribe("comms.identity", cb=fmo_message_handler) - await nats_client.subscribe(f"comms.command.{identity}", cb=fmo_message_handler) - cc.logger.debug(f"{agent}: comms_nats_controller Listening for requests") + await nats_client.subscribe( + f"comms.settings.{identity}", cb=fmo_message_handler + ) + await nats_client.subscribe( + f"comms.channel_change.{identity}", cb=fmo_message_handler + ) + await nats_client.subscribe(f"comms.status.{identity}", cb=fmo_message_handler) + await nats_client.subscribe("comms.identity", cb=fmo_message_handler) + await nats_client.subscribe(f"comms.command.{identity}", cb=fmo_message_handler) + + cc.logger.debug("FMO comms_nats_controller Listening for requests") # separate instances needed for FMO/MDM - if agent == "mdm": - mdm = MdmAgent(cc, keyfile, certfile) - monitor = comms_service_discovery.CommsServiceMonitor( - service_name="MDM Service", - service_type="_mdm._tcp.local.", - service_cb=mdm.mdm_server_address_cb, - ) - await asyncio.gather(mdm.execute(), monitor.async_run()) - else: - while True: - await asyncio.sleep(float(cc.interval) / 1000.0) - try: - if cc.telemetry.visualisation_enabled and agent == "fmo": - msg = cc.telemetry.mesh_visual() - cc.logger.debug(f"Publishing comms.visual.{identity}: %s", msg) - await nats_client.publish(f"comms.visual.{identity}", msg.encode()) - except Exception as e: - cc.logger.error("Error:", e) + while True: + await asyncio.sleep(float(cc.interval) / 1000.0) + try: + if cc.telemetry.visualisation_enabled: + msg = cc.telemetry.mesh_visual() + cc.logger.debug(f"Publishing comms.visual.{identity}: %s", msg) + await nats_client.publish(f"comms.visual.{identity}", msg.encode()) + except Exception as e: + cc.logger.error("Error:", e) if __name__ == "__main__": @@ -475,8 +496,13 @@ async def fmo_message_handler(message): args = parser.parse_args() loop = asyncio.new_event_loop() - loop.run_until_complete( - main(args.server, args.port, args.keyfile, args.certfile, agent=args.agent) - ) + if args.agent == "mdm": + loop.run_until_complete( + main_mdm(args.keyfile, args.certfile) + ) + else: + loop.run_until_complete( + main_fmo(args.server, args.port, args.keyfile, args.certfile) + ) loop.run_forever() loop.close()