Skip to content

Commit

Permalink
Separate main() for MDM and FMO usage
Browse files Browse the repository at this point in the history
- increasing code readability

Jira-ID: MSS20-230

Signed-off-by: Mika Joenpera <[email protected]>
  • Loading branch information
joenpera committed Nov 17, 2023
1 parent eaad548 commit febde01
Showing 1 changed file with 106 additions and 80 deletions.
186 changes: 106 additions & 80 deletions modules/sc-mesh-secure-deployment/src/nats/comms_nats_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,23 +316,53 @@ async def __loop_run_executor(self, executor) -> None:
self.__handle_received_config(response)


async def main_mdm(keyfile=None, certfile=None, interval=1000):
"""
main
"""
cc = CommsController("local", "5000", interval)

async def stop():
await asyncio.sleep(1)
asyncio.get_running_loop().stop()
def signal_handler():
cc.logger.debug("Disconnecting...")
asyncio.create_task(stop())

for sig in ("SIGINT", "SIGTERM"):
asyncio.get_running_loop().add_signal_handler(
getattr(signal, sig), signal_handler
)

cc.logger.debug("MDM: comms_nats_controller Listening for requests")

# separate instances needed for FMO/MDM

mdm = MdmAgent(cc, keyfile, certfile)
monitor = comms_service_discovery.CommsServiceMonitor(
service_name="MDM Service",
service_type="_mdm._tcp.local.",
service_cb=mdm.mdm_server_address_cb,
)
await asyncio.gather(mdm.execute(), monitor.async_run())

# pylint: disable=too-many-arguments, too-many-locals, too-many-statements
async def main(server, port, keyfile=None, certfile=None, interval=1000, agent="fmo"):
async def main_fmo(server, port, keyfile=None, certfile=None, interval=1000):
"""
main
"""
cc = CommsController(server, port, interval)

if agent == "fmo":
nats_client = NATS()
status, _, identity_dict = cc.command.get_identity()

if status == "OK":
identity = identity_dict["identity"]
cc.logger.debug("Identity: %s", identity)
else:
cc.logger.error("Failed to get identity!")
return
nats_client = NATS()
status, _, identity_dict = cc.command.get_identity()

if status == "OK":
identity = identity_dict["identity"]
cc.logger.debug("Identity: %s", identity)
else:
cc.logger.error("Failed to get identity!")
return

async def stop():
await asyncio.sleep(1)
Expand All @@ -356,29 +386,29 @@ async def disconnected_cb():
async def reconnected_cb():
cc.logger.debug("Got reconnected...")

if agent == "fmo":
# Create SSL context if certfile and keyfile are provided
ssl_context = None
if certfile and keyfile:
ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
ssl_context.load_cert_chain(certfile=certfile, keyfile=keyfile)

# Connect to NATS server with TLS enabled if ssl_context is provided
if ssl_context:
await nats_client.connect(
f"tls://{server}:{port}",
tls=ssl_context,
reconnected_cb=reconnected_cb,
disconnected_cb=disconnected_cb,
max_reconnect_attempts=-1,
)
else:
await nats_client.connect(
f"nats://{server}:{port}",
reconnected_cb=reconnected_cb,
disconnected_cb=disconnected_cb,
max_reconnect_attempts=-1,
)

# Create SSL context if certfile and keyfile are provided
ssl_context = None
if certfile and keyfile:
ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
ssl_context.load_cert_chain(certfile=certfile, keyfile=keyfile)

# Connect to NATS server with TLS enabled if ssl_context is provided
if ssl_context:
await nats_client.connect(
f"tls://{server}:{port}",
tls=ssl_context,
reconnected_cb=reconnected_cb,
disconnected_cb=disconnected_cb,
max_reconnect_attempts=-1,
)
else:
await nats_client.connect(
f"nats://{server}:{port}",
reconnected_cb=reconnected_cb,
disconnected_cb=disconnected_cb,
max_reconnect_attempts=-1,
)

async def fmo_message_handler(message):
"""
Expand All @@ -392,22 +422,22 @@ async def fmo_message_handler(message):

command = json.loads(data)["cmd"]

if agent == "fmo": # and command in FMO_SUPPORTED_COMMANDS:
if subject == f"comms.settings.{identity}":
ret, info = cc.settings.handle_mesh_settings(data)
elif subject == f"comms.channel_change.{identity}":
ret, info, _index = cc.settings.handle_mesh_settings_channel_change(
data

if subject == f"comms.settings.{identity}":
ret, info = cc.settings.handle_mesh_settings(data)
elif subject == f"comms.channel_change.{identity}":
ret, info, _index = cc.settings.handle_mesh_settings_channel_change(
data
)
if ret == "TRIGGER":
cmd = json.dumps(
{"api_version": 1, "cmd": "APPLY", "radio_index": f"{_index}"}
)
if ret == "TRIGGER":
cmd = json.dumps(
{"api_version": 1, "cmd": "APPLY", "radio_index": f"{_index}"}
)
ret, info, resp = cc.command.handle_command(cmd, cc)
elif subject in (f"comms.command.{identity}", "comms.identity"):
ret, info, resp = cc.command.handle_command(data, cc)
elif subject == f"comms.status.{identity}":
ret, info = "OK", "Returning current status"
ret, info, resp = cc.command.handle_command(cmd, cc)
elif subject in (f"comms.command.{identity}", "comms.identity"):
ret, info, resp = cc.command.handle_command(data, cc)
elif subject == f"comms.status.{identity}":
ret, info = "OK", "Returning current status"

# Update status info
_ = [item.refresh_status() for item in cc.c_status]
Expand All @@ -431,38 +461,29 @@ async def fmo_message_handler(message):
cc.logger.debug("Sending response: %s", str(response)[:1000])
await message.respond(json.dumps(response).encode("utf-8"))

if agent == "fmo":
await nats_client.subscribe(
f"comms.settings.{identity}", cb=fmo_message_handler
)
await nats_client.subscribe(
f"comms.channel_change.{identity}", cb=fmo_message_handler
)
await nats_client.subscribe(f"comms.status.{identity}", cb=fmo_message_handler)
await nats_client.subscribe("comms.identity", cb=fmo_message_handler)
await nats_client.subscribe(f"comms.command.{identity}", cb=fmo_message_handler)

cc.logger.debug(f"{agent}: comms_nats_controller Listening for requests")
await nats_client.subscribe(
f"comms.settings.{identity}", cb=fmo_message_handler
)
await nats_client.subscribe(
f"comms.channel_change.{identity}", cb=fmo_message_handler
)
await nats_client.subscribe(f"comms.status.{identity}", cb=fmo_message_handler)
await nats_client.subscribe("comms.identity", cb=fmo_message_handler)
await nats_client.subscribe(f"comms.command.{identity}", cb=fmo_message_handler)

cc.logger.debug("FMO comms_nats_controller Listening for requests")

# separate instances needed for FMO/MDM
if agent == "mdm":
mdm = MdmAgent(cc, keyfile, certfile)
monitor = comms_service_discovery.CommsServiceMonitor(
service_name="MDM Service",
service_type="_mdm._tcp.local.",
service_cb=mdm.mdm_server_address_cb,
)
await asyncio.gather(mdm.execute(), monitor.async_run())
else:
while True:
await asyncio.sleep(float(cc.interval) / 1000.0)
try:
if cc.telemetry.visualisation_enabled and agent == "fmo":
msg = cc.telemetry.mesh_visual()
cc.logger.debug(f"Publishing comms.visual.{identity}: %s", msg)
await nats_client.publish(f"comms.visual.{identity}", msg.encode())
except Exception as e:
cc.logger.error("Error:", e)
while True:
await asyncio.sleep(float(cc.interval) / 1000.0)
try:
if cc.telemetry.visualisation_enabled:
msg = cc.telemetry.mesh_visual()
cc.logger.debug(f"Publishing comms.visual.{identity}: %s", msg)
await nats_client.publish(f"comms.visual.{identity}", msg.encode())
except Exception as e:
cc.logger.error("Error:", e)


if __name__ == "__main__":
Expand All @@ -475,8 +496,13 @@ async def fmo_message_handler(message):
args = parser.parse_args()

loop = asyncio.new_event_loop()
loop.run_until_complete(
main(args.server, args.port, args.keyfile, args.certfile, agent=args.agent)
)
if args.agent == "mdm":
loop.run_until_complete(
main_mdm(args.keyfile, args.certfile)
)
else:
loop.run_until_complete(
main_fmo(args.server, args.port, args.keyfile, args.certfile)
)
loop.run_forever()
loop.close()

0 comments on commit febde01

Please sign in to comment.