From 5ed0bbaff5e83971d09a482382423122e53778f0 Mon Sep 17 00:00:00 2001 From: Marek Sebera Date: Sun, 12 Nov 2023 14:29:37 +0100 Subject: [PATCH] saving work --- Makefile | 2 +- okdmr/hhb/__main__.py | 5 +- okdmr/hhb/callback_interface.py | 7 - okdmr/hhb/custom_bridge_datagram_protocol.py | 43 -- okdmr/hhb/hhb_repeater_storage.py | 23 + okdmr/hhb/hytera_homebrew_bridge.py | 139 +---- okdmr/hhb/hytera_mmdvm_translator.py | 11 +- okdmr/hhb/hytera_protocols.py | 616 ------------------- okdmr/hhb/hytera_repeater.py | 114 ++++ okdmr/hhb/mmdvm_protocol.py | 54 +- okdmr/hhb/mmdvm_utils.py | 1 - okdmr/hhb/settings.py | 126 +--- okdmr/hhb/snmp.py | 231 ------- okdmr/hhb/utils.py | 5 - okdmr/tests/hhb/test_callback_interface.py | 9 - okdmr/tests/hhb/test_main.py | 4 + okdmr/tests/hhb/test_udp_nat.py | 36 ++ 17 files changed, 250 insertions(+), 1176 deletions(-) delete mode 100644 okdmr/hhb/callback_interface.py delete mode 100644 okdmr/hhb/custom_bridge_datagram_protocol.py create mode 100644 okdmr/hhb/hhb_repeater_storage.py delete mode 100644 okdmr/hhb/hytera_protocols.py create mode 100644 okdmr/hhb/hytera_repeater.py delete mode 100755 okdmr/hhb/snmp.py delete mode 100644 okdmr/tests/hhb/test_callback_interface.py create mode 100644 okdmr/tests/hhb/test_main.py create mode 100644 okdmr/tests/hhb/test_udp_nat.py diff --git a/Makefile b/Makefile index 1f42605..c9eb753 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,7 @@ .PHONY: test test: python -m coverage erase - PYTHONPATH=. pytest -vrP --cov-report=term-missing --cov=okdmr.hhb --cov-report=xml + pytest -vrP --cov-report=term-missing --cov=okdmr.hhb --cov-report=xml clean: git clean -xdff diff --git a/okdmr/hhb/__main__.py b/okdmr/hhb/__main__.py index 1b7f4c8..05ff684 100644 --- a/okdmr/hhb/__main__.py +++ b/okdmr/hhb/__main__.py @@ -54,6 +54,7 @@ def main(): ) loop = asyncio.new_event_loop() + loop.set_debug(True) asyncio.set_event_loop(loop=loop) # order is IMPORTANT, various asyncio object are created at bridge init # and those must be created after the main loop is created @@ -63,9 +64,11 @@ def main(): loop.add_signal_handler(signal, bridge.stop_running) try: - loop.run_until_complete(future=bridge.go()) + loop.run_until_complete(bridge.go()) + loop.run_forever() except BaseException as e: + mainlog.exception(msg="HHB Main caught") mainlog.exception(msg="", exc_info=e) finally: mainlog.info("Hytera Homebrew Bridge Ended") diff --git a/okdmr/hhb/callback_interface.py b/okdmr/hhb/callback_interface.py deleted file mode 100644 index 1e52e38..0000000 --- a/okdmr/hhb/callback_interface.py +++ /dev/null @@ -1,7 +0,0 @@ -class CallbackInterface: - """ - This interface doesn't have other purpose than code de-duplication - """ - - async def homebrew_connect(self, ip: str, port: int) -> None: - pass diff --git a/okdmr/hhb/custom_bridge_datagram_protocol.py b/okdmr/hhb/custom_bridge_datagram_protocol.py deleted file mode 100644 index d4c0e85..0000000 --- a/okdmr/hhb/custom_bridge_datagram_protocol.py +++ /dev/null @@ -1,43 +0,0 @@ -#!/usr/bin/env python3 -from asyncio import protocols - -from okdmr.dmrlib.utils.logging_trait import LoggingTrait - -from okdmr.hhb.settings import BridgeSettings -from okdmr.hhb.snmp import SNMP - - -class CustomBridgeDatagramProtocol(protocols.DatagramProtocol, LoggingTrait): - """ - Code de-duplication - """ - - def __init__(self, settings: BridgeSettings) -> None: - """ - - :param settings: - """ - super().__init__() - self.settings = settings - - async def hytera_repeater_obtain_snmp( - self, address: tuple, force: bool = False - ) -> None: - """ - - :param address: - :param force: - :return: - """ - self.settings.hytera_repeater_ip = address[0] - if self.settings.snmp_enabled and ( - force or not self.settings.hytera_snmp_data.get(address[0]) - ): - await SNMP().walk_ip(address, self.settings) - else: - self.log_warning( - f"SNMP is disabled or not available " - f"snmp_enabled:{self.settings.snmp_enabled} " - f"force:{force} " - f"hytera_snmp_data:{address[0] in self.settings.hytera_snmp_data}" - ) diff --git a/okdmr/hhb/hhb_repeater_storage.py b/okdmr/hhb/hhb_repeater_storage.py new file mode 100644 index 0000000..10a31e4 --- /dev/null +++ b/okdmr/hhb/hhb_repeater_storage.py @@ -0,0 +1,23 @@ +from okdmr.dmrlib.storage import ADDRESS_TYPE, ADDRESS_EMPTY +from okdmr.dmrlib.storage.repeater import Repeater +from okdmr.dmrlib.storage.repeater_storage import RepeaterStorage + +from okdmr.hhb.hytera_repeater import HyteraRepeater + + +class HHBRepeaterStorage(RepeaterStorage): + """ """ + + def create_repeater( + self, + dmr_id: int = None, + address_in: ADDRESS_TYPE = ADDRESS_EMPTY, + address_out: ADDRESS_TYPE = ADDRESS_EMPTY, + address_nat: ADDRESS_TYPE = ADDRESS_EMPTY, + ) -> Repeater: + return HyteraRepeater( + address_in=address_in, + address_out=address_out, + dmr_id=dmr_id, + address_nat=address_nat, + ) diff --git a/okdmr/hhb/hytera_homebrew_bridge.py b/okdmr/hhb/hytera_homebrew_bridge.py index a61d19a..68fc0c1 100755 --- a/okdmr/hhb/hytera_homebrew_bridge.py +++ b/okdmr/hhb/hytera_homebrew_bridge.py @@ -1,118 +1,30 @@ #!/usr/bin/env python3 import asyncio -import socket -from asyncio import AbstractEventLoop, Queue -from typing import Optional, Dict +from asyncio import AbstractEventLoop +from typing import Optional +from uuid import UUID -from okdmr.hhb.callback_interface import CallbackInterface -from okdmr.hhb.hytera_mmdvm_translator import HyteraMmdvmTranslator -from okdmr.hhb.hytera_protocols import ( - HyteraP2PProtocol, - HyteraDMRProtocol, - HyteraRDACProtocol, -) -from okdmr.hhb.mmdvm_protocol import MMDVMProtocol -from okdmr.hhb.settings import BridgeSettings - - -class HyteraRepeater(CallbackInterface): - def __init__(self, ip: str, settings: BridgeSettings, asyncloop: AbstractEventLoop): - # ip of hytera repeater - self.ip: str = ip - self.dmr_port: int = 0 - self.settings: BridgeSettings = settings - self.loop: AbstractEventLoop = asyncloop - # message queues for translator - self.queue_mmdvm_outgoing: Queue = Queue() - self.queue_hytera_incoming: Queue = Queue() - self.queue_hytera_outgoing: Queue = Queue() - self.queue_mmdvm_incoming: Queue = Queue() - # homebrew / mmdvm - self.homebrew_protocol: MMDVMProtocol = MMDVMProtocol( - settings=self.settings, - connection_lost_callback=self.homebrew_connection_lost, - queue_outgoing=self.queue_mmdvm_outgoing, - queue_incoming=self.queue_mmdvm_incoming, - hytera_repeater_ip=ip, - ) - self.hytera_dmr_protocol: HyteraDMRProtocol = HyteraDMRProtocol( - settings=self.settings, - queue_incoming=self.queue_hytera_incoming, - queue_outgoing=self.queue_hytera_outgoing, - hytera_repeater_ip=ip, - ) - self.hytera_mmdvm_translator: HyteraMmdvmTranslator = HyteraMmdvmTranslator( - settings=self.settings, - mmdvm_incoming=self.queue_mmdvm_incoming, - hytera_incoming=self.queue_hytera_incoming, - mmdvm_outgoing=self.queue_mmdvm_outgoing, - hytera_outgoing=self.queue_hytera_outgoing, - hytera_repeater_ip=self.ip, - ) - - def homebrew_connection_lost(self, ip: str, port: int) -> None: - asyncio.run(self.homebrew_connect(ip=ip, port=port)) - - async def hytera_dmr_connect(self) -> None: - (transport, _) = await self.loop.create_datagram_endpoint( - lambda: self.hytera_dmr_protocol, - sock=self.settings.hytera_repeater_data[self.ip].dmr_socket, - ) +from okdmr.dmrlib.protocols.hytera.p2p_datagram_protocol import P2PDatagramProtocol +from okdmr.dmrlib.protocols.hytera.rdac_datagram_protocol import RDACDatagramProtocol +from okdmr.dmrlib.storage.repeater_storage import RepeaterStorage +from okdmr.dmrlib.utils.logging_trait import LoggingTrait - async def homebrew_connect(self, ip: str, port: int) -> None: - incorrect_config_params = self.settings.get_incorrect_configurations(ip) - if len(incorrect_config_params) > 0: - self.homebrew_protocol.log_error( - "Current configuration is not valid for connection" - ) - self.settings.print_settings() - for param, current_value, error_message in incorrect_config_params: - self.homebrew_protocol.log_error( - f"PARAM: {param} CURRENT_VALUE: {current_value} MESSAGE: {error_message}" - ) - return - - # target address - hb_target_address = (self.settings.hb_master_host, self.settings.hb_master_port) - # Create Homebrew protocol handler - hb_transport, _ = await self.loop.create_datagram_endpoint( - lambda: self.homebrew_protocol, - local_addr=(self.settings.hb_local_ip, self.settings.hb_local_port), - remote_addr=hb_target_address, - reuse_port=True, - ) - hb_local_socket = hb_transport.get_extra_info("socket") - if isinstance(hb_local_socket, socket.socket): - # Extract bound socket port - self.settings.hb_local_port = hb_local_socket.getsockname()[1] - - async def go(self): - # start DMR protocol - await self.hytera_dmr_connect() - - # start translator tasks - self.loop.create_task(self.hytera_mmdvm_translator.translate_from_mmdvm()) - self.loop.create_task(self.hytera_mmdvm_translator.translate_from_hytera()) - - # mmdvm maintenance (auto login, auth, ping/pong) - self.loop.create_task(self.homebrew_protocol.periodic_maintenance()) - - # send translated or protocol generated packets to respective upstreams - self.loop.create_task(self.hytera_dmr_protocol.send_hytera_from_queue()) - self.loop.create_task(self.homebrew_protocol.send_mmdvm_from_queue()) +from okdmr.hhb.hhb_repeater_storage import HHBRepeaterStorage +from okdmr.hhb.hytera_repeater import HyteraRepeater +from okdmr.hhb.settings import BridgeSettings -class HyteraHomebrewBridge(CallbackInterface): +class HyteraHomebrewBridge(LoggingTrait): def __init__(self, settings_ini_path: str): self.loop: Optional[AbstractEventLoop] = None self.settings: BridgeSettings = BridgeSettings(filepath=settings_ini_path) - self.repeaters: Dict[str, HyteraRepeater] = {} + self.storage: RepeaterStorage = HHBRepeaterStorage() # hytera ipsc: p2p dmr and rdac - self.hytera_p2p_protocol: HyteraP2PProtocol = HyteraP2PProtocol( - settings=self.settings, repeater_accepted_callback=self + self.hytera_p2p_protocol: P2PDatagramProtocol = P2PDatagramProtocol( + storage=self.storage, ) - self.hytera_rdac_protocol: HyteraRDACProtocol = HyteraRDACProtocol( - settings=self.settings, rdac_completed_callback=self + self.hytera_rdac_protocol: RDACDatagramProtocol = RDACDatagramProtocol( + storage=self.storage, callback=lambda u: self.homebrew_connect(u) ) # prepare translator @@ -125,14 +37,10 @@ async def go(self) -> None: if not self.settings.hytera_disable_rdac: await self.hytera_rdac_connect() - async def homebrew_connect(self, ip: str, port: int) -> None: - if not self.repeaters.get(ip): - self.repeaters[ip] = HyteraRepeater( - ip=ip, settings=self.settings, asyncloop=self.loop - ) - await self.repeaters[ip].go() - - await self.repeaters[ip].homebrew_connect(ip=ip, port=port) + async def homebrew_connect(self, repeater_id: UUID) -> None: + rpt: HyteraRepeater = self.storage.match_uuid(repeater_id) + await rpt.go() + await rpt.homebrew_connect() async def hytera_p2p_connect(self) -> None: """ @@ -155,8 +63,11 @@ async def hytera_rdac_connect(self) -> None: ) def stop_running(self) -> None: - for ip, repeater in self.repeaters.items(): - repeater.homebrew_protocol.disconnect() + self.log_info("stop_running called") + + for rpt in self.storage.all(): + if isinstance(rpt, HyteraRepeater): + rpt.homebrew_protocol.disconnect() self.hytera_p2p_protocol.disconnect() self.loop.stop() diff --git a/okdmr/hhb/hytera_mmdvm_translator.py b/okdmr/hhb/hytera_mmdvm_translator.py index c794730..f8fa5a7 100644 --- a/okdmr/hhb/hytera_mmdvm_translator.py +++ b/okdmr/hhb/hytera_mmdvm_translator.py @@ -2,9 +2,11 @@ import asyncio from asyncio import Queue, CancelledError from typing import Optional +from uuid import UUID from kaitaistruct import KaitaiStruct from okdmr.dmrlib.etsi.layer2.burst import Burst +from okdmr.dmrlib.storage.repeater_storage import RepeaterStorage from okdmr.dmrlib.transmission.transmission_watcher import TransmissionWatcher from okdmr.dmrlib.utils.bits_bytes import byteswap_bytes from okdmr.dmrlib.utils.logging_trait import LoggingTrait @@ -29,7 +31,8 @@ def __init__( hytera_outgoing: Queue, mmdvm_incoming: Queue, mmdvm_outgoing: Queue, - hytera_repeater_ip: str, + repeater_id: UUID, + storage: RepeaterStorage, ): self.transmission_watcher: TransmissionWatcher = TransmissionWatcher() self.settings = settings @@ -37,7 +40,9 @@ def __init__( self.queue_hytera_output = hytera_outgoing self.queue_mmdvm_to_translate = mmdvm_incoming self.queue_mmdvm_output = mmdvm_outgoing - self.hytera_ip: str = hytera_repeater_ip + rpt = storage.match_uuid(repeater_id) + + self.hytera_ip: str = rpt.address_out[0] async def translate_from_hytera(self): loop = asyncio.get_running_loop() @@ -81,7 +86,7 @@ async def translate_from_hytera(self): + byteswap_bytes(packet.ipsc_payload) ) - self.queue_mmdvm_output.put_nowait((self.hytera_ip, mmdvm_out)) + self.queue_mmdvm_output.put_nowait((self.ip, mmdvm_out)) else: print( "Hytera BurstInfo not available", diff --git a/okdmr/hhb/hytera_protocols.py b/okdmr/hhb/hytera_protocols.py deleted file mode 100644 index 4f64e00..0000000 --- a/okdmr/hhb/hytera_protocols.py +++ /dev/null @@ -1,616 +0,0 @@ -#!/usr/bin/env python3 -import asyncio -import logging -import socket -from asyncio import transports, Queue -from binascii import hexlify -from typing import Optional, Tuple, Dict - -from kaitaistruct import ValidationNotEqualError, KaitaiStruct -from okdmr.dmrlib.utils.parsing import parse_hytera_data -from okdmr.kaitai.hytera.ip_site_connect_protocol import IpSiteConnectProtocol - -from okdmr.hhb.callback_interface import CallbackInterface -from okdmr.hhb.custom_bridge_datagram_protocol import ( - CustomBridgeDatagramProtocol, -) -from okdmr.hhb.packet_format import ( - common_log_format, -) -from okdmr.hhb.settings import BridgeSettings - - -class HyteraP2PProtocol(CustomBridgeDatagramProtocol): - COMMAND_PREFIX: bytes = bytes([0x50, 0x32, 0x50]) - PING_PREFIX: bytes = bytes([0x0A, 0x00, 0x00, 0x00, 0x14]) - ACK_PREFIX: bytes = bytes([0x0C, 0x00, 0x00, 0x00, 0x14]) - - PACKET_TYPE_REQUEST_REGISTRATION = 0x10 - PACKET_TYPE_REQUEST_DMR_STARTUP = 0x11 - PACKET_TYPE_REQUEST_RDAC_STARTUP = 0x12 - KNOWN_PACKET_TYPES = [ - PACKET_TYPE_REQUEST_DMR_STARTUP, - PACKET_TYPE_REQUEST_RDAC_STARTUP, - PACKET_TYPE_REQUEST_REGISTRATION, - ] - - def __init__( - self, settings: BridgeSettings, repeater_accepted_callback: CallbackInterface - ): - super().__init__(settings) - self.transport: Optional[transports.DatagramTransport] = None - self.repeater_accepted_callback = repeater_accepted_callback - - @staticmethod - def packet_is_command(data: bytes) -> bool: - return data[:3] == HyteraP2PProtocol.COMMAND_PREFIX - - @staticmethod - def packet_is_ping(data: bytes) -> bool: - return data[4:9] == HyteraP2PProtocol.PING_PREFIX - - @staticmethod - def packet_is_ack(data: bytes) -> bool: - return data[4:9] == HyteraP2PProtocol.ACK_PREFIX - - @staticmethod - def command_get_type(data: bytes) -> int: - return data[20] if len(data) > 20 else 0 - - def handle_registration(self, data: bytes, address: Tuple[str, int]) -> None: - data = bytearray(data) - data[3] = 0x50 - # set repeater ID - data[4] += 1 - # set operation result status code - data[13] = 0x01 - data[14] = 0x01 - data[15] = 0x5A - data.append(0x01) - - self.transport.sendto(data, address) - - asyncio.gather(self.hytera_repeater_obtain_snmp(address)) - self.settings.hytera_is_registered[address[0]] = True - self.log_info(f"handle_registration for {address} launching homebrew_connect") - asyncio.get_running_loop().create_task( - self.repeater_accepted_callback.homebrew_connect(address[0], address[1]) - ) - - def handle_rdac_request(self, data: bytes, address: Tuple[str, int]) -> None: - if not self.settings.is_repeater_registered(address[0]): - self.log_debug( - f"Rejecting RDAC request for not-registered repeater {address[0]}" - ) - self.transport.sendto(bytes([0x00]), address) - return - - response_address = (address[0], self.settings.p2p_port) - - data = bytearray(data) - # set RDAC id - data[4] += 1 - # set operation result status code - data[13] = 0x01 - data.append(0x01) - - self.settings.hytera_repeater_data[address[0]].hytera_repeater_ip = address[0] - - self.transport.sendto(data, response_address) - self.log_debug("RDAC Accept for %s:%s" % address) - - # redirect repeater to correct RDAC port - data = self.get_redirect_packet(data, self.settings.rdac_port) - self.transport.sendto(data, response_address) - - @staticmethod - def get_redirect_packet(data: bytearray, target_port: int): - logging.getLogger().debug(f"Providing redirect packet to port {target_port}") - data = data[: len(data) - 1] - data[4] = 0x0B - data[12] = 0xFF - data[13] = 0xFF - data[14] = 0x01 - data[15] = 0x00 - data += bytes([0xFF, 0x01]) - data += target_port.to_bytes(2, "little") - return data - - def handle_dmr_request(self, data: bytes, address: Tuple[str, int]) -> None: - if not self.settings.is_repeater_registered(address[0]): - self.log_debug( - f"Rejecting DMR request for not-registered repeater {address[0]}" - ) - self.transport.sendto(bytes([0x00]), address) - return - - response_address = (address[0], self.settings.p2p_port) - - data = bytearray(data) - # set DMR id - data[4] += 1 - data[13] = 0x01 - data.append(0x01) - - self.transport.sendto(data, response_address) - self.log_debug("DMR Accept for %s:%s" % address) - - data = self.get_redirect_packet( - data, self.settings.get_repeater_dmr_port(address[0]) - ) - self.transport.sendto(data, response_address) - - def handle_ping(self, data: bytes, address: Tuple[str, int]) -> None: - if not self.settings.hytera_is_registered.get(address[0]): - self.log_debug( - f"Rejecting ping request for not-registered repeater {address[0]}" - ) - self.transport.sendto(bytes([0x00]), address) - return - data = bytearray(data) - data[12] = 0xFF - data[14] = 0x01 - self.transport.sendto(data, address) - - def connection_lost(self, exc: Optional[Exception]) -> None: - self.log_debug("connection lost") - if exc: - self.log_exception(exc) - - def connection_made(self, transport: transports.BaseTransport) -> None: - self.transport = transport - sock: socket.socket = transport.get_extra_info("socket") - if sock: - self.log_debug(f"new peer {sock}") - self.log_debug("connection prepared") - - def datagram_received(self, data: bytes, address: Tuple[str, int]) -> None: - packet_type = self.command_get_type(data) - is_command = self.packet_is_command(data) - self.settings.ensure_repeater_data(address) - if is_command: - if packet_type not in self.KNOWN_PACKET_TYPES: - if not self.packet_is_ack(data): - self.log_error("Received %s bytes from %s" % (len(data), address)) - self.log_error(data.hex()) - self.log_error("Idle packet of type:%s received" % packet_type) - if packet_type == self.PACKET_TYPE_REQUEST_REGISTRATION: - self.handle_registration(data, address) - elif packet_type == self.PACKET_TYPE_REQUEST_RDAC_STARTUP: - self.handle_rdac_request(data, address) - elif packet_type == self.PACKET_TYPE_REQUEST_DMR_STARTUP: - self.handle_dmr_request(data, address) - elif self.packet_is_ping(data): - self.handle_ping(data, address) - else: - self.log_error( - "Idle packet received, %d bytes from %s" % (len(data), address) - ) - self.log_debug(data.hex()) - - def send_connection_reset(self): - self.log_debug("Sending Connection Reset") - self.transport.sendto(bytes([0x00])) - - def disconnect(self): - self.log_warning("Self Disconnect") - if self.transport and not self.transport.is_closing(): - self.send_connection_reset() - - -class HyteraRDACProtocol(CustomBridgeDatagramProtocol): - STEP0_REQUEST = bytes( - [0x7E, 0x04, 0x00, 0xFE, 0x20, 0x10, 0x00, 0x00, 0x00, 0x0C, 0x60, 0xE1] - ) - STEP0_RESPONSE = bytes([0x7E, 0x04, 0x00, 0xFD]) - STEP1_REQUEST = bytes( - [ - 0x7E, - 0x04, - 0x00, - 0x00, - 0x20, - 0x10, - 0x00, - 0x01, - 0x00, - 0x18, - 0x9B, - 0x60, - 0x02, - 0x04, - 0x00, - 0x05, - 0x00, - 0x64, - 0x00, - 0x00, - 0x00, - 0x01, - 0xC4, - 0x03, - ] - ) - STEP1_RESPONSE = bytes([0x7E, 0x04, 0x00, 0x10]) - STEP2_RESPONSE = bytes([0x7E, 0x04, 0x00, 0x00]) - STEP3_REQUEST = bytes( - [0x7E, 0x04, 0x00, 0x10, 0x20, 0x10, 0x00, 0x01, 0x00, 0x0C, 0x61, 0xCE] - ) - STEP3_RESPONSE = bytes([0x7E, 0x04, 0x00, 0x00]) - STEP4_REQUEST_1 = bytes( - [0x7E, 0x04, 0x00, 0x10, 0x20, 0x10, 0x00, 0x02, 0x00, 0x0C, 0x61, 0xCD] - ) - STEP4_REQUEST_2 = bytes( - [ - 0x7E, - 0x04, - 0x00, - 0x00, - 0x20, - 0x10, - 0x00, - 0x02, - 0x00, - 0x19, - 0x58, - 0xA0, - 0x02, - 0xD4, - 0x02, - 0x06, - 0x00, - 0x64, - 0x00, - 0x00, - 0x00, - 0x02, - 0x00, - 0xF0, - 0x03, - ] - ) - STEP4_RESPONSE_1 = bytes([0x7E, 0x04, 0x00, 0x10]) - STEP4_RESPONSE_2 = bytes([0x7E, 0x04, 0x00, 0x00]) - STEP6_REQUEST_1 = bytes( - [0x7E, 0x04, 0x00, 0x10, 0x20, 0x10, 0x00, 0x03, 0x00, 0x0C, 0x61, 0xCC] - ) - STEP6_REQUEST_2 = bytes( - [ - 0x7E, - 0x04, - 0x00, - 0x00, - 0x20, - 0x10, - 0x00, - 0x03, - 0x00, - 0x19, - 0x73, - 0x84, - 0x02, - 0xD6, - 0x82, - 0x06, - 0x00, - 0x00, - 0x64, - 0x00, - 0x00, - 0x00, - 0x02, - 0x6E, - 0x03, - ] - ) - STEP6_RESPONSE = bytes([0x7E, 0x04, 0x00, 0x10]) - STEP7_REQUEST = bytes( - [ - 0x7E, - 0x04, - 0x00, - 0x00, - 0x20, - 0x10, - 0x00, - 0x04, - 0x00, - 0x19, - 0x57, - 0x9F, - 0x02, - 0xD4, - 0x02, - 0x06, - 0x00, - 0x64, - 0x00, - 0x00, - 0x00, - 0x02, - 0x01, - 0xEF, - 0x03, - ] - ) - STEP7_RESPONSE_1 = bytes([0x7E, 0x04, 0x00, 0x10]) - STEP7_RESPONSE_2 = bytes([0x7E, 0x04, 0x00, 0x00]) - STEP10_REQUEST = bytes( - [ - 0x7E, - 0x04, - 0x00, - 0x00, - 0x20, - 0x10, - 0x00, - 0x15, - 0x00, - 0x18, - 0x9C, - 0x4B, - 0x02, - 0x05, - 0x00, - 0x05, - 0x00, - 0x64, - 0x00, - 0x00, - 0x00, - 0x01, - 0xC3, - 0x03, - ] - ) - STEP10_RESPONSE_1 = bytes([0x7E, 0x04, 0x00, 0x10]) - STEP10_RESPONSE_2 = bytes([0x7E, 0x04, 0x00, 0x00]) - STEP12_REQUEST_1 = bytes( - [0x7E, 0x04, 0x00, 0x10, 0x20, 0x10, 0x00, 0x15, 0x00, 0x0C, 0x61, 0xBA] - ) - STEP12_REQUEST_2 = bytes( - [0x7E, 0x04, 0x00, 0xFB, 0x20, 0x10, 0x00, 0x16, 0x00, 0x0C, 0x60, 0xCE] - ) - STEP12_RESPONSE = bytes([0x7E, 0x04, 0x00, 0xFA]) - - def __init__( - self, settings: BridgeSettings, rdac_completed_callback: CallbackInterface - ): - super().__init__(settings) - self.transport: Optional[transports.DatagramTransport] = None - self.rdac_completed_callback = rdac_completed_callback - self.step: Dict[str, int] = dict() - - def step0(self, _: bytes, address: Tuple[str, int]) -> None: - self.log_debug("RDAC identification started") - self.step[address[0]] = 1 - self.transport.sendto(self.STEP0_REQUEST, address) - - def step1(self, data: bytes, address: Tuple[str, int]) -> None: - if data[: len(self.STEP0_RESPONSE)] == self.STEP0_RESPONSE: - self.step[address[0]] = 2 - self.transport.sendto(self.STEP1_REQUEST, address) - - def step2(self, data: bytes, address: Tuple[str, int]) -> None: - if data[: len(self.STEP1_RESPONSE)] == self.STEP1_RESPONSE: - self.step[address[0]] = 3 - - def step3(self, data: bytes, address: Tuple[str, int]) -> None: - if data[: len(self.STEP2_RESPONSE)] == self.STEP2_RESPONSE: - self.settings.hytera_repeater_id = int.from_bytes( - data[18:21], byteorder="little" - ) - self.step[address[0]] = 4 - self.transport.sendto(self.STEP3_REQUEST, address) - - def step4(self, data: bytes, address: Tuple[str, int]) -> None: - if data[: len(self.STEP3_RESPONSE)] == self.STEP3_RESPONSE: - self.step[address[0]] = 5 - self.transport.sendto(self.STEP4_REQUEST_1, address) - self.transport.sendto(self.STEP4_REQUEST_2, address) - - def step5(self, data: bytes, address: Tuple[str, int]) -> None: - if data[: len(self.STEP4_RESPONSE_1)] == self.STEP4_RESPONSE_1: - self.step[address[0]] = 6 - - def step6(self, data: bytes, address: Tuple[str, int]) -> None: - ip: str = address[0] - if data[: len(self.STEP4_RESPONSE_2)] == self.STEP4_RESPONSE_2: - self.settings.hytera_repeater_data[ip].hytera_callsign = ( - data[88:108] - .decode("utf_16_le") - .encode("utf-8") - .strip(b"\x00") - .decode("utf-8") - ) - self.settings.hytera_repeater_data[ip].hytera_hardware = ( - data[120:184] - .decode("utf_16_le") - .encode("utf-8") - .strip(b"\x00") - .decode("utf-8") - ) - self.settings.hytera_repeater_data[ip].hytera_firmware = ( - data[56:88] - .decode("utf_16_le") - .encode("utf-8") - .strip(b"\x00") - .decode("utf-8") - ) - self.settings.hytera_repeater_data[ip].hytera_serial_number = ( - data[184:216] - .decode("utf_16_le") - .encode("utf-8") - .strip(b"\x00") - .decode("utf-8") - ) - self.step[address[0]] = 7 - self.transport.sendto(self.STEP6_REQUEST_1, address) - self.transport.sendto(self.STEP6_REQUEST_2, address) - - def step7(self, data: bytes, address: Tuple[str, int]) -> None: - if data[: len(self.STEP6_RESPONSE)] == self.STEP6_RESPONSE: - self.step[address[0]] = 8 - self.transport.sendto(self.STEP7_REQUEST, address) - - def step8(self, data: bytes, address: Tuple[str, int]) -> None: - if data[: len(self.STEP7_RESPONSE_1)] == self.STEP7_RESPONSE_1: - self.step[address[0]] = 10 - - def step10(self, data: bytes, address: Tuple[str, int]) -> None: - if data[: len(self.STEP7_RESPONSE_2)] == self.STEP7_RESPONSE_2: - self.settings.hytera_repeater_data[address[0]].hytera_repeater_mode = data[ - 26 - ] - self.settings.hytera_repeater_data[ - address[0] - ].hytera_tx_freq = int.from_bytes(data[29:33], byteorder="little") - self.settings.hytera_repeater_data[ - address[0] - ].hytera_rx_freq = int.from_bytes(data[33:37], byteorder="little") - self.step[address[0]] = 11 - self.transport.sendto(self.STEP10_REQUEST, address) - - def step11(self, data: bytes, address: Tuple[str, int]) -> None: - if data[: len(self.STEP10_RESPONSE_1)] == self.STEP10_RESPONSE_1: - self.step[address[0]] = 12 - - def step12(self, data: bytes, address: Tuple[str, int]) -> None: - if data[: len(self.STEP10_RESPONSE_2)] == self.STEP10_RESPONSE_2: - self.step[address[0]] = 13 - self.transport.sendto(self.STEP12_REQUEST_1, address) - self.transport.sendto(self.STEP12_REQUEST_2, address) - - def step13(self, data: bytes, address: Tuple[str, int]) -> None: - if data[: len(self.STEP12_RESPONSE)] == self.STEP12_RESPONSE: - self.step[address[0]] = 14 - self.log_debug("rdac completed identification") - self.settings.print_repeater_configuration() - asyncio.gather(self.hytera_repeater_obtain_snmp(address)) - self.log_info(f"RDAC step13 for {address} launching homebrew_connect") - asyncio.get_running_loop().create_task( - self.rdac_completed_callback.homebrew_connect(address[0], address[1]) - ) - - def step14(self, data: bytes, address: Tuple[str, int]) -> None: - pass - - def connection_lost(self, exc: Optional[Exception]) -> None: - self.log_info("connection lost") - if exc: - self.log_exception(exc) - - def connection_made(self, transport: transports.BaseTransport) -> None: - self.transport = transport - self.log_debug("connection prepared") - - def datagram_received(self, data: bytes, addr: Tuple[str, int]) -> None: - self.settings.ensure_repeater_data(addr) - - if not self.step.get(addr[0]): - self.step[addr[0]] = 0 - - if len(data) == 1 and self.step[addr[0]] != 14: - if self.step[addr[0]] == 4: - self.log_error( - "check repeater zone programming, if Digital IP" - "Multi-Site Connect mode allows data pass from timeslots" - ) - self.log_error( - "restart process if response is protocol reset and current step is not 14" - ) - self.step[addr[0]] = 0 - self.step0(data, addr) - elif len(data) != 1 and self.step[addr[0]] == 14: - self.log_error("RDAC finished, received extra data %s" % hexlify(data)) - elif len(data) == 1 and self.step[addr[0]] == 14: - if data[0] == 0x00: - # no data available response - self.transport.sendto(bytes(0x41), addr) - else: - getattr(self, "step%d" % self.step[addr[0]])(data, addr) - - -class HyteraDMRProtocol(CustomBridgeDatagramProtocol): - def __init__( - self, - settings: BridgeSettings, - queue_incoming: Queue, - queue_outgoing: Queue, - hytera_repeater_ip: str, - ) -> None: - super().__init__(settings) - self.transport: Optional[transports.DatagramTransport] = None - self.queue_incoming = queue_incoming - self.queue_outgoing = queue_outgoing - self.ip: str = hytera_repeater_ip - self.log_info( - f"HyteraDMRProtocol on creation expecting ip {self.ip} and port {self.settings.get_repeater_dmr_port(self.ip)}" - ) - - async def send_hytera_from_queue(self) -> None: - while asyncio.get_running_loop().is_running(): - ip, packet = await self.queue_outgoing.get() - assert isinstance(ip, str) - assert isinstance(packet, bytes) - if self.transport and not self.transport.is_closing(): - ipsc = IpSiteConnectProtocol.from_bytes(packet) - self.log_debug( - common_log_format( - proto="HHB->HYTER", - from_ip_port=(), - to_ip_port=(), - use_color=True, - packet_data=ipsc, - dmrdata_hash="", - ) - ) - self.transport.sendto(packet, (ip, self.settings.dmr_port)) - - # notify about outbound done - self.queue_outgoing.task_done() - - def connection_lost(self, exc: Optional[Exception]) -> None: - self.log_info("connection lost") - if exc: - self.log_exception(exc) - - def connection_made(self, transport: transports.BaseTransport) -> None: - self.transport = transport - self.log_debug("connection prepared") - - def datagram_received(self, data: bytes, addr: Tuple[str, int]) -> None: - if self.ip != addr[0]: - self.log_debug( - f"HyteraDMRProtocol ignore from {addr[0]} expected {self.ip} data {data.hex()}" - ) - return - else: - self.log_debug(f"HyteraDMRProtocol accept from {addr[0]} data {data.hex()}") - - self.settings.ensure_repeater_data(addr) - - try: - hytera_data: KaitaiStruct = parse_hytera_data(data) - self.queue_incoming.put_nowait((addr[0], hytera_data)) - - self.log_debug( - common_log_format( - proto="HYTER->HHB", - from_ip_port=(), - to_ip_port=(), - use_color=True, - packet_data=hytera_data, - dmrdata_hash="", - ) - ) - except EOFError as e: - self.log_error(f"Cannot parse IPSC DMR packet {hexlify(data)} from {addr}") - self.log_exception(e) - except ValidationNotEqualError as e: - self.log_error(f"Cannot parse IPSC DMR packet {hexlify(data)} from {addr}") - self.log_error("Parser for Hytera data failed to match the packet data") - self.log_exception(e) - except BaseException as e: - self.log_error("[datagram_received] unhandled exception") - self.log_exception(e) diff --git a/okdmr/hhb/hytera_repeater.py b/okdmr/hhb/hytera_repeater.py new file mode 100644 index 0000000..2526670 --- /dev/null +++ b/okdmr/hhb/hytera_repeater.py @@ -0,0 +1,114 @@ +import asyncio +import socket +from asyncio import Queue +from logging import Logger + +from okdmr.dmrlib.protocols.hytera.p2p_datagram_protocol import P2PDatagramProtocol +from okdmr.dmrlib.storage import ADDRESS_TYPE, ADDRESS_EMPTY +from okdmr.dmrlib.storage.repeater import Repeater +from okdmr.dmrlib.storage.repeater_storage import RepeaterStorage + +from okdmr.hhb.hytera_mmdvm_translator import HyteraMmdvmTranslator +from okdmr.hhb.mmdvm_protocol import MMDVMProtocol +from okdmr.hhb.settings import BridgeSettings + + +class HyteraRepeater(Repeater): + def __init__( + self, + settings: BridgeSettings, + storage: RepeaterStorage, + dmr_id: int = 0, + callsign: str = "", + serial: str = "", + address_in: ADDRESS_TYPE = ADDRESS_EMPTY, + address_out: ADDRESS_TYPE = ADDRESS_EMPTY, + address_nat: ADDRESS_TYPE = ADDRESS_EMPTY, + snmp_enabled: bool = True, + nat_enabled: bool = False, + logger: Logger = None, + ): + super().__init__( + dmr_id=dmr_id, + callsign=callsign, + serial=serial, + address_in=address_in, + address_out=address_out, + nat_enabled=nat_enabled, + address_nat=address_nat, + snmp_enabled=snmp_enabled, + logger=logger, + ) + + self.storage: RepeaterStorage = storage + self.settings: BridgeSettings = settings + self.dmr_port: int = 0 + # message queues for translator + self.queue_mmdvm_outgoing: Queue = Queue() + self.queue_hytera_incoming: Queue = Queue() + self.queue_hytera_outgoing: Queue = Queue() + self.queue_mmdvm_incoming: Queue = Queue() + # homebrew / mmdvm + self.homebrew_protocol: MMDVMProtocol = MMDVMProtocol( + settings=self.settings, + connection_lost_callback=self.homebrew_connection_lost, + queue_outgoing=self.queue_mmdvm_outgoing, + queue_incoming=self.queue_mmdvm_incoming, + repeater_id=self.id, + storage=self.storage, + ) + self.hytera_dmr_protocol: P2PDatagramProtocol = P2PDatagramProtocol( + storage=self.storage, + p2p_port=self.settings.p2p_port, + rdac_port=self.settings.rdac_port, + ) + self.hytera_mmdvm_translator: HyteraMmdvmTranslator = HyteraMmdvmTranslator( + settings=self.settings, + storage=self.storage, + mmdvm_incoming=self.queue_mmdvm_incoming, + hytera_incoming=self.queue_hytera_incoming, + mmdvm_outgoing=self.queue_mmdvm_outgoing, + hytera_outgoing=self.queue_hytera_outgoing, + repeater_id=self.id, + ) + + def homebrew_connection_lost(self) -> None: + asyncio.run(self.homebrew_connect()) + + async def hytera_dmr_connect(self) -> None: + (transport, _) = await asyncio.get_event_loop().create_datagram_endpoint( + lambda: self.hytera_dmr_protocol, + ) + + async def homebrew_connect(self) -> None: + + # target address + hb_target_address = (self.settings.hb_master_host, self.settings.hb_master_port) + # Create Homebrew protocol handler + hb_transport, _ = await asyncio.get_event_loop().create_datagram_endpoint( + lambda: self.homebrew_protocol, + local_addr=(self.settings.hb_local_ip, self.settings.hb_local_port), + remote_addr=hb_target_address, + reuse_port=True, + ) + hb_local_socket = hb_transport.get_extra_info("socket") + if isinstance(hb_local_socket, socket.socket): + # Extract bound socket port + self.settings.hb_local_port = hb_local_socket.getsockname()[1] + + async def go(self): + # start DMR protocol + await self.hytera_dmr_connect() + + loop = asyncio.get_running_loop() + + # start translator tasks + loop.create_task(self.hytera_mmdvm_translator.translate_from_mmdvm()) + loop.create_task(self.hytera_mmdvm_translator.translate_from_hytera()) + + # mmdvm maintenance (auto login, auth, ping/pong) + loop.create_task(self.homebrew_protocol.periodic_maintenance()) + + # send translated or protocol generated packets to respective upstreams + # loop.create_task(self.hytera_dmr_protocol.send_hytera_from_queue()) + loop.create_task(self.homebrew_protocol.send_mmdvm_from_queue()) diff --git a/okdmr/hhb/mmdvm_protocol.py b/okdmr/hhb/mmdvm_protocol.py index a7c523a..75e8820 100644 --- a/okdmr/hhb/mmdvm_protocol.py +++ b/okdmr/hhb/mmdvm_protocol.py @@ -7,12 +7,13 @@ from hashlib import sha256 from socket import socket from typing import Optional, Callable, Tuple +from uuid import UUID +from okdmr.dmrlib.storage.repeater import Repeater +from okdmr.dmrlib.storage.repeater_storage import RepeaterStorage +from okdmr.dmrlib.utils.logging_trait import LoggingTrait from okdmr.kaitai.homebrew.mmdvm2020 import Mmdvm2020 -from okdmr.hhb.custom_bridge_datagram_protocol import ( - CustomBridgeDatagramProtocol, -) from okdmr.hhb.packet_format import ( common_log_format, get_dmr_data_hash, @@ -21,7 +22,7 @@ from okdmr.hhb.utils import log_mmdvm_configuration -class MMDVMProtocol(CustomBridgeDatagramProtocol): +class MMDVMProtocol(LoggingTrait): CON_NEW: int = 1 CON_LOGIN_REQUEST_SENT: int = 2 CON_LOGIN_RESPONSE_SENT: int = 3 @@ -30,20 +31,21 @@ class MMDVMProtocol(CustomBridgeDatagramProtocol): def __init__( self, + repeater_id: UUID, + storage: RepeaterStorage, settings: BridgeSettings, connection_lost_callback: Callable, queue_outgoing: Queue, queue_incoming: Queue, - hytera_repeater_ip: str, ) -> None: - super().__init__(settings) - self.settings = settings self.transport: Optional[transports.DatagramTransport] = None self.connection_lost_callback = connection_lost_callback self.connection_status: int = self.CON_NEW self.queue_outgoing: Queue = queue_outgoing self.queue_incoming: Queue = queue_incoming - self.ip: str = hytera_repeater_ip + self.repeater_id: UUID = repeater_id + self.storage: RepeaterStorage = storage + self.settings: BridgeSettings = settings async def periodic_maintenance(self) -> None: while not asyncio.get_running_loop().is_closed(): @@ -154,7 +156,7 @@ def datagram_received(self, data: bytes, addr: Tuple[str, int]) -> None: self.connection_status = self.CON_NEW is_handled = True elif isinstance(packet.command_data, Mmdvm2020.TypeDmrData): - self.queue_incoming.put_nowait((self.ip, packet)) + self.queue_incoming.put_nowait((self.repeater_id, packet)) is_handled = True self.log_debug( @@ -173,13 +175,16 @@ def datagram_received(self, data: bytes, addr: Tuple[str, int]) -> None: f"UNHANDLED {packet.__class__.__name__} {packet.command_data.__class__.__name__} {hexlify(data)} status {self.connection_status}" ) + def rpt(self) -> Repeater: + return self.storage.match_uuid(self.repeater_id) + def send_login_request(self) -> None: self.log_info("Sending Login Request") self.connection_status = self.CON_LOGIN_REQUEST_SENT self.queue_outgoing.put_nowait( ( - self.ip, - struct.pack(">4sI", b"RPTL", self.settings.get_repeater_dmrid(self.ip)), + self.repeater_id, + struct.pack(">4sI", b"RPTL", self.rpt().dmr_id), ) ) @@ -189,7 +194,7 @@ def send_login_response(self, challenge: int) -> None: challenge_response = struct.pack( ">4sI32s", b"RPTK", - self.settings.get_repeater_dmrid(self.ip), + self.rpt().dmr_id, a2b_hex( sha256( b"".join( @@ -201,17 +206,18 @@ def send_login_response(self, challenge: int) -> None: ).hexdigest() ), ) - self.queue_outgoing.put_nowait((self.ip, challenge_response)) + self.queue_outgoing.put_nowait((self.repeater_id, challenge_response)) def send_configuration(self) -> None: self.log_info(f"Sending self configuration to master") + rpt = self.rpt() packet = struct.pack( ">4sI8s9s9s2s2s8s9s3s20s19s1s124s40s40s", b"RPTC", - self.settings.get_repeater_dmrid(self.ip), - self.settings.get_repeater_callsign(self.ip)[0:8].ljust(8).encode(), - self.settings.get_repeater_rx_freq(self.ip)[0:9].rjust(9, "0").encode(), - self.settings.get_repeater_tx_freq(self.ip)[0:9].rjust(9, "0").encode(), + rpt.dmr_id, + rpt.callsign[0:8].ljust(8).encode(), + rpt.attr("rx_freq")[0:9].rjust(9, "0").encode(), + rpt.attr("tx_freq")[0:9].rjust(9, "0").encode(), str(self.settings.hb_tx_power & 0xFFFF).rjust(2, "0").encode(), str(self.settings.hb_color_code & 0xF).rjust(2, "0").encode(), self.settings.hb_latitude[0:8].rjust(8, "0").encode(), @@ -227,23 +233,19 @@ def send_configuration(self) -> None: self.settings.hb_package_id[0:40].ljust(40).encode(), ) - self.queue_outgoing.put_nowait((self.ip, packet)) + self.queue_outgoing.put_nowait((self.repeater_id, packet)) config: Mmdvm2020 = Mmdvm2020.from_bytes(packet) log_mmdvm_configuration(logger=self.get_logger(), packet=config) def send_ping(self) -> None: - packet = struct.pack( - ">7sI", b"RPTPING", self.settings.get_repeater_dmrid(self.ip) - ) - self.queue_outgoing.put_nowait((self.ip, packet)) + packet = struct.pack(">7sI", b"RPTPING", self.rpt().dmr_id) + self.queue_outgoing.put_nowait((self.repeater_id, packet)) def send_closing(self) -> None: self.log_info("Closing MMDVM connection") - packet = struct.pack( - ">5sI", b"RPTCL", self.settings.get_repeater_dmrid(self.ip) - ) - self.queue_outgoing.put_nowait((self.ip, packet)) + packet = struct.pack(">5sI", b"RPTCL", self.rpt().dmr_id) + self.queue_outgoing.put_nowait((self.repeater_id, packet)) def disconnect(self) -> None: if self.transport and not self.transport.is_closing(): diff --git a/okdmr/hhb/mmdvm_utils.py b/okdmr/hhb/mmdvm_utils.py index 958d808..34059c0 100644 --- a/okdmr/hhb/mmdvm_utils.py +++ b/okdmr/hhb/mmdvm_utils.py @@ -5,7 +5,6 @@ from okdmr.dmrlib.etsi.layer2.burst import Burst from okdmr.dmrlib.etsi.layer2.elements.data_types import DataTypes from okdmr.dmrlib.etsi.layer2.elements.voice_bursts import VoiceBursts -from okdmr.kaitai.homebrew.mmdvm2020 import Mmdvm2020 from okdmr.kaitai.hytera.ip_site_connect_protocol import IpSiteConnectProtocol diff --git a/okdmr/hhb/settings.py b/okdmr/hhb/settings.py index d0b4a35..c1c6cce 100644 --- a/okdmr/hhb/settings.py +++ b/okdmr/hhb/settings.py @@ -1,44 +1,12 @@ #!/usr/bin/env python3 import configparser -import socket -from typing import Dict, Tuple from okdmr.dmrlib.utils.logging_trait import LoggingTrait _UNSET = object() -class HyteraRepeaterData: - def __init__(self, ipsc_ip: str): - self.hytera_repeater_id: int = 0 - self.hytera_callsign: str = "" - self.hytera_hardware: str = "" - self.hytera_firmware: str = "" - self.hytera_serial_number: str = "" - self.hytera_repeater_mode: int = 0 - self.hytera_tx_freq: int = 0 - self.hytera_rx_freq: int = 0 - self.hytera_repeater_ip: str = "" - self.dmr_socket: socket.socket = HyteraRepeaterData.create_dmr_socket( - ipsc_ip=ipsc_ip - ) - self.dmr_port: int = self.dmr_socket.getsockname()[1] - - @staticmethod - def create_dmr_socket(ipsc_ip: str) -> socket.socket: - # create socket manually, to be able to find out the free udp port used - sock = socket.socket( - type=socket.SocketKind.SOCK_DGRAM, proto=socket.IPPROTO_UDP - ) - sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - sock.bind((ipsc_ip, 0)) - return sock - - def __repr__(self) -> str: - return f"[DMRID: {self.hytera_repeater_id}] [IP: {self.hytera_repeater_ip}] [CALL: {self.hytera_callsign}]" - - class BridgeSettings(LoggingTrait): SECTION_GENERAL = "general" SECTION_HOMEBREW = "homebrew" @@ -81,13 +49,15 @@ def __init__(self, filepath: str = None, filedata: str = None) -> None: if filepath and filedata: raise SystemError( - "Both filename and filedata provided, this is unsupported, choose one" + "Both filename and filedata provided, this is not supported, choose one" ) parser = configparser.ConfigParser() parser.sections() if filepath: - parser.read(filenames=filepath) + cnt_read_files = parser.read(filenames=filepath) + if not cnt_read_files: + raise SystemError("No configuration file was successfully read") else: parser.read_string(string=filedata) @@ -105,8 +75,11 @@ def __init__(self, filepath: str = None, filedata: str = None) -> None: % (self.hb_protocol, self.MMDVM_KNOWN_PROTOCOLS) ) + # single mmdvm upstream self.hb_master_host = parser.get(self.SECTION_HOMEBREW, "master_ip") self.hb_master_port = parser.getint(self.SECTION_HOMEBREW, "master_port") + + # single local ip/port for connecting to mmdvm upstream, and password self.hb_local_ip = parser.get(self.SECTION_HOMEBREW, "local_ip") self.hb_local_port = parser.getint( self.SECTION_HOMEBREW, "local_port", fallback=0 @@ -174,13 +147,6 @@ def __init__(self, filepath: str = None, filedata: str = None) -> None: self.SECTION_IPSC, "disable_rdac", fallback=False ) - # hytera_protocols variables - self.hytera_is_registered: Dict[str, bool] = dict() - self.hytera_snmp_data: Dict[str, dict] = dict() - - # hytera repeater data - self.hytera_repeater_data: Dict[str, HyteraRepeaterData] = dict() - @staticmethod def getint_safe( parser: configparser.ConfigParser, section: str, key: str, fallback=_UNSET @@ -202,77 +168,6 @@ def getint_safe( raise return fallback - def get_repeater_dmr_port(self, ip: str) -> int: - return self.hytera_repeater_data.get( - ip, HyteraRepeaterData(self.ipsc_ip) - ).dmr_port - - def get_repeater_rx_freq(self, ip: str) -> str: - from okdmr.hhb import snmp - - return str( - self.hb_rx_freq - or self.hytera_repeater_data.get( - ip, HyteraRepeaterData(self.ipsc_ip) - ).hytera_rx_freq - or self.hytera_snmp_data.get(ip, {}).get(snmp.SNMP.OID_RX_FREQUENCE) - ) - - def get_repeater_tx_freq(self, ip: str) -> str: - from okdmr.hhb import snmp - - return str( - self.hb_tx_freq - or self.hytera_repeater_data.get( - ip, HyteraRepeaterData(self.ipsc_ip) - ).hytera_tx_freq - or self.hytera_snmp_data.get(ip, {}).get(snmp.SNMP.OID_TX_FREQUENCE) - ) - - def get_repeater_callsign(self, ip: str) -> str: - from okdmr.hhb import snmp - - return ( - self.hb_callsign - or self.hytera_repeater_data.get( - ip, HyteraRepeaterData(self.ipsc_ip) - ).hytera_callsign - or self.hytera_snmp_data.get(ip, {}).get(snmp.SNMP.OID_RADIO_ALIAS) - ) - - def get_repeater_dmrid(self, ip: str) -> int: - from okdmr.hhb import snmp - - return int( - self.hb_repeater_dmr_id - or self.hytera_repeater_data.get( - ip, HyteraRepeaterData(self.ipsc_ip) - ).hytera_repeater_id - or self.hytera_snmp_data.get(ip, {}).get(snmp.SNMP.OID_RADIO_ID) - or 0 - ) - - def get_incorrect_configurations(self, ip: str) -> list: - rtn: list = list() - - generic_error_message: str = ( - "Value might have not been configured and was not obtained in Hytera repeater " - "configuration process (either P2P, RDAC or SNMP) " - ) - - repeater_id = self.get_repeater_dmrid(ip=ip) - if repeater_id < 1: - rtn.append(("homebrew.repeater_dmr_id", repeater_id, generic_error_message)) - - repeater_callsign = self.get_repeater_callsign(ip=ip) - if not repeater_callsign: - rtn.append(("homebrew.callsign", repeater_callsign, generic_error_message)) - - return rtn - - def is_repeater_registered(self, repeater_ip: str) -> bool: - return repeater_ip in self.hytera_is_registered.keys() - def print_settings(self) -> None: self.log_info("Settings Loaded") self.log_info( @@ -282,10 +177,3 @@ def print_settings(self) -> None: self.log_info( f"Upstream Homebrew/MMDVM server is expected at {self.hb_master_host}:{self.hb_master_port}\n" ) - - def print_repeater_configuration(self): - pass - - def ensure_repeater_data(self, address: Tuple[str, int]): - if not self.hytera_repeater_data.get(address[0], None): - self.hytera_repeater_data[address[0]] = HyteraRepeaterData(self.ipsc_ip) diff --git a/okdmr/hhb/snmp.py b/okdmr/hhb/snmp.py deleted file mode 100755 index f513c8e..0000000 --- a/okdmr/hhb/snmp.py +++ /dev/null @@ -1,231 +0,0 @@ -#!/usr/bin/env python3 -import logging -import sys - -import asyncio - -import puresnmp -from okdmr.dmrlib.utils.logging_trait import LoggingTrait - -from okdmr.hhb.settings import BridgeSettings -from okdmr.hhb.utils import octet_string_to_utf8 - - -class SNMP(LoggingTrait): - # in milli-volts (V * 1000) - OID_PSU_VOLTAGE: str = "1.3.6.1.4.1.40297.1.2.1.2.1.0" - # in milli-celsius (C * 1000) - OID_PA_TEMPERATURE: str = "1.3.6.1.4.1.40297.1.2.1.2.2.0" - # voltage ratio on the TX in dB - OID_VSWR: str = "1.3.6.1.4.1.40297.1.2.1.2.4.0" - # Forward power in milli-watt - OID_TX_FWD_POWER: str = "1.3.6.1.4.1.40297.1.2.1.2.5.0" - # Reflected power in milli-watt - OID_TX_REF_POWER: str = "1.3.6.1.4.1.40297.1.2.1.2.6.0" - OID_RSSI_TS1: str = "1.3.6.1.4.1.40297.1.2.1.2.9.0" - OID_RSSI_TS2: str = "1.3.6.1.4.1.40297.1.2.1.2.10.0" - - OID_REPEATER_MODEL: str = "1.3.6.1.4.1.40297.1.2.4.1.0" - OID_MODEL_NUMBER: str = "1.3.6.1.4.1.40297.1.2.4.2.0" - # string - OID_FIRMWARE_VERSION: str = "1.3.6.1.4.1.40297.1.2.4.3.0" - # Radio Data Version, string - OID_RCDB_VERSION: str = "1.3.6.1.4.1.40297.1.2.4.4.0" - OID_SERIAL_NUMBER: str = "1.3.6.1.4.1.40297.1.2.4.5.0" - # callsign - OID_RADIO_ALIAS: str = "1.3.6.1.4.1.40297.1.2.4.6.0" - # integer - OID_RADIO_ID: str = "1.3.6.1.4.1.40297.1.2.4.7.0" - # digital=0, analog=1, mixed=2 - OID_CUR_CHANNEL_MODE: str = "1.3.6.1.4.1.40297.1.2.4.8.0" - OID_CUR_CHANNEL_NAME: str = "1.3.6.1.4.1.40297.1.2.4.9.0" - # Hz - OID_TX_FREQUENCE: str = "1.3.6.1.4.1.40297.1.2.4.10.0" - # Hz - OID_RX_FREQUENCE: str = "1.3.6.1.4.1.40297.1.2.4.11.0" - # receive=0, transmit=1 - OID_WORK_STATUS: str = "1.3.6.1.4.1.40297.1.2.4.12.0" - OID_CUR_ZONE_ALIAS: str = "1.3.6.1.4.1.40297.1.2.4.13.0" - - READABLE_LABELS = { - OID_PSU_VOLTAGE: ("PSU Voltage", "%d mV"), - OID_PA_TEMPERATURE: ("PA Temperature", "%d m°C"), - OID_VSWR: ("VSWR", "%d dB"), - OID_TX_FWD_POWER: ("TX Forward Power", "%d mW"), - OID_TX_REF_POWER: ("TX Reflected Power", "%d mW"), - OID_RSSI_TS1: ("RSSI TS1", "%d dB"), - OID_RSSI_TS2: ("RSSI TS2", "%d dB"), - OID_REPEATER_MODEL: ("Repeater Model", "%s"), - OID_MODEL_NUMBER: ("Repeater Model Identification", "%s"), - OID_FIRMWARE_VERSION: ("Repeater Firmware", "%s"), - OID_RCDB_VERSION: ("Repeater Radio Data (RCDB)", "%s"), - OID_SERIAL_NUMBER: ("Repeater Serial No.", "%s"), - OID_RADIO_ALIAS: ("Radio Alias (Callsign)", "%s"), - OID_RADIO_ID: ("Repeater ID", "%d"), - OID_CUR_CHANNEL_NAME: ("Current Channel Name", "%s"), - OID_CUR_CHANNEL_MODE: ( - "Current Channel Zone (0=DIGITAL, 1=ANALOG, 2=MIXED)", - "%d", - ), - OID_TX_FREQUENCE: ("TX Frequence", "%d Hz"), - OID_RX_FREQUENCE: ("RX Frequence", "%d Hz"), - OID_WORK_STATUS: ("Work Status (0=RECEIVE, 1=TRANSMIT)", "%d"), - OID_CUR_ZONE_ALIAS: ("Current Zone Alias", "%s"), - } - - ALL_STRINGS = ( - OID_REPEATER_MODEL, - OID_MODEL_NUMBER, - OID_FIRMWARE_VERSION, - OID_RCDB_VERSION, - OID_RADIO_ALIAS, - OID_CUR_ZONE_ALIAS, - OID_SERIAL_NUMBER, - OID_CUR_CHANNEL_NAME, - ) - - ALL_FLOATS = ( - OID_PSU_VOLTAGE, - OID_VSWR, - OID_PA_TEMPERATURE, - OID_TX_FWD_POWER, - OID_TX_REF_POWER, - ) - - ALL_KNOWN = ( - OID_PSU_VOLTAGE, - OID_PA_TEMPERATURE, - OID_VSWR, - OID_TX_FWD_POWER, - OID_TX_REF_POWER, - OID_RSSI_TS1, - OID_RSSI_TS2, - OID_REPEATER_MODEL, - OID_MODEL_NUMBER, - OID_FIRMWARE_VERSION, - OID_RCDB_VERSION, - OID_SERIAL_NUMBER, - OID_RADIO_ALIAS, - OID_RADIO_ID, - OID_CUR_CHANNEL_MODE, - OID_CUR_CHANNEL_NAME, - OID_TX_FREQUENCE, - OID_RX_FREQUENCE, - OID_WORK_STATUS, - OID_CUR_ZONE_ALIAS, - ) - - OID_WALK_BASE_1: str = "1.3.6.1.4.1.40297.1.2.4" - OID_WALK_BASE_2: str = "1.3.6.1.4.1.40297.1.2.1.2" - - async def walk_ip( - self, address: tuple, settings_storage: BridgeSettings, first_try: bool = True - ) -> dict: - ip, port = address - is_success: bool = False - - if not settings_storage.hytera_snmp_data.get(address[0]): - settings_storage.hytera_snmp_data[address[0]] = {} - - other_family: str = ( - "public" if settings_storage.snmp_family == "hytera" else "hytera" - ) - client = puresnmp.PyWrapper( - puresnmp.Client( - ip=ip, credentials=puresnmp.V1(settings_storage.snmp_family) - ) - ) - - try: - for oid in SNMP.ALL_KNOWN: - snmp_result = await client.get(oid=oid) - - if oid in SNMP.ALL_STRINGS: - snmp_result = octet_string_to_utf8(str(snmp_result, "utf8")) - elif oid in SNMP.ALL_FLOATS: - snmp_result = int.from_bytes(snmp_result, byteorder="big") - settings_storage.hytera_snmp_data[address[0]][oid] = snmp_result - is_success = True - except SystemError: - self.log_error("SNMP failed to obtain repeater info") - except asyncio.Timeout: - if first_try: - self.log_debug( - "Failed with SNMP family %s, trying with %s as well" - % (settings_storage.snmp_family, other_family) - ) - settings_storage.snmp_family = other_family - await self.walk_ip( - address=address, settings_storage=settings_storage, first_try=False - ) - else: - self.log_error( - "SNMP failed, maybe try changing setting.ini [snmp] family = %s" - % other_family - ) - except BaseException as e: - self.log_exception("Unhandled exception") - self.log_exception(e) - - if is_success: - self.print_snmp_data(settings_storage=settings_storage, address=address) - - return settings_storage.hytera_snmp_data[address[0]] - - def print_snmp_data(self, settings_storage: BridgeSettings, address: tuple): - self.log_info( - "-------------- REPEATER SNMP CONFIGURATION ----------------------------" - ) - # ip address longest 15 letters (255.255.255.255) - longest_label = 15 - for key in SNMP.READABLE_LABELS: - label_len = len(SNMP.READABLE_LABELS.get(key)[0]) - if label_len > longest_label: - longest_label = label_len - - if len(address) == 2: - # log IP address first - self.log_info( - "%s| %s" - % ( - str("IP Address").ljust(longest_label + 5), - f"{address[0]} (port {address[1]})", - ) - ) - - for key in settings_storage.hytera_snmp_data[address[0]]: - print_settings = SNMP.READABLE_LABELS.get(key) - if print_settings: - value = settings_storage.hytera_snmp_data[address[0]].get(key) - self.log_info( - "%s| %s" - % ( - str(print_settings[0]).ljust(longest_label + 5), - print_settings[1] % value, - ) - ) - self.log_info( - "-------------- REPEATER SNMP CONFIGURATION ----------------------------" - ) - - -if __name__ == "__main__": - if len(sys.argv) < 2: - print("use as snmp.py ") - exit(1) - - logging.basicConfig(level=logging.INFO) - # suppress puresnmp verbose/debug logs - logging.getLogger("puresnmp.transport").setLevel(logging.INFO) - # suppress puresnmp_plugins experimental warning - if not sys.warnoptions: - import warnings - - warnings.filterwarnings( - message="Experimental SNMPv1 support", category=UserWarning, action="ignore" - ) - - settings: BridgeSettings = BridgeSettings(filedata=BridgeSettings.MINIMAL_SETTINGS) - _target: SNMP = SNMP() - _target_address: tuple = (sys.argv[1], 0) - asyncio.run(_target.walk_ip(_target_address, settings_storage=settings)) diff --git a/okdmr/hhb/utils.py b/okdmr/hhb/utils.py index bf218a2..f60b503 100644 --- a/okdmr/hhb/utils.py +++ b/okdmr/hhb/utils.py @@ -1,6 +1,5 @@ #!/usr/bin/env python3 import logging -import string from okdmr.kaitai.homebrew.mmdvm2020 import Mmdvm2020 @@ -9,10 +8,6 @@ def half_byte_to_bytes(half_byte: int, output_bytes: int = 2) -> bytes: return bytes([half_byte | half_byte << 4]) * output_bytes -def octet_string_to_utf8(octets: str) -> str: - return "".join(filter(lambda c: c in string.printable, octets)) - - def assemble_hytera_ipsc_sync_packet( is_private_call: bool, source_id: int, diff --git a/okdmr/tests/hhb/test_callback_interface.py b/okdmr/tests/hhb/test_callback_interface.py deleted file mode 100644 index ae4cbc2..0000000 --- a/okdmr/tests/hhb/test_callback_interface.py +++ /dev/null @@ -1,9 +0,0 @@ -from okdmr.hhb.callback_interface import CallbackInterface -from okdmr.hhb.hytera_homebrew_bridge import HyteraRepeater, HyteraHomebrewBridge - - -def test_do_succ(): - c = CallbackInterface() - assert isinstance(c, CallbackInterface) - assert issubclass(HyteraRepeater, CallbackInterface) - assert issubclass(HyteraHomebrewBridge, CallbackInterface) diff --git a/okdmr/tests/hhb/test_main.py b/okdmr/tests/hhb/test_main.py new file mode 100644 index 0000000..d854621 --- /dev/null +++ b/okdmr/tests/hhb/test_main.py @@ -0,0 +1,4 @@ +def test_main_import(): + from okdmr.hhb.__main__ import main + + assert main diff --git a/okdmr/tests/hhb/test_udp_nat.py b/okdmr/tests/hhb/test_udp_nat.py new file mode 100644 index 0000000..051920f --- /dev/null +++ b/okdmr/tests/hhb/test_udp_nat.py @@ -0,0 +1,36 @@ +import asyncio +import sys +from asyncio import DatagramProtocol +from asyncio import transports +from typing import Any, Union + +if __name__ == "__main__": + listen_ip: str = sys.argv[1] + listen_port: int = int(sys.argv[2]) + target_ip: str = sys.argv[3] + target_port: int = int(sys.argv[4]) + print(f"Listen {listen_ip}:{listen_port} \nTarget {target_ip}:{target_port}") + + +class SimpleProtocol(DatagramProtocol): + def datagram_received(self, data: bytes, addr: tuple[Union[str, Any], int]) -> None: + print(f"datagram_received from {addr} data {data.hex()}") + self.transport.sendto(bytes([0x44, 0x55, 0x66]), addr) + exit() + + def connection_made(self, transport: transports.DatagramTransport) -> None: + print(f"connection_made {transport}") + self.transport: transports.DatagramTransport = transport + transport.sendto(bytes([0x11, 0x22, 0x33]), (target_ip, target_port)) + + +async def main() -> None: + proto = SimpleProtocol() + _transport, _protocol = await asyncio.get_running_loop().create_datagram_endpoint( + lambda: proto, local_addr=(listen_ip, listen_port) + ) + await asyncio.sleep(60) + + +if __name__ == "__main__": + asyncio.run(main())