diff --git a/modules/sc-mesh-secure-deployment/src/nats/coverage_report.txt b/modules/sc-mesh-secure-deployment/src/nats/coverage_report.txt index 83fee8b8..3992afb8 100644 --- a/modules/sc-mesh-secure-deployment/src/nats/coverage_report.txt +++ b/modules/sc-mesh-secure-deployment/src/nats/coverage_report.txt @@ -2,8 +2,8 @@ Name Stmts Miss Cover Missing ----------------------------------------------------------------- mdm_agent.py 458 110 76% 115-117, 190, 209-210, 215-216, 246, 250-254, 265-273, 282-284, 290, 310-319, 325-353, 364, 378-379, 388-392, 406, 408, 441-456, 464-474, 492-496, 591, 650, 661-667, 680-681, 702, 713-719, 740-741, 748-749, 756-757, 787-793, 796-817, 825-827, 833, 853-854, 864, 921 src/__init__.py 0 0 100% -src/bat_ctrl_utils.py 130 19 85% 40-41, 78-79, 135-136, 161-162, 173, 193-194, 224-225, 254-263, 277-279 -src/cbma_adaptation.py 458 372 19% 81-82, 90-102, 112-181, 184-192, 195-219, 225-239, 242-249, 257-294, 297-302, 309-319, 322-325, 328-338, 341-348, 352-399, 403-419, 422-462, 470-483, 486-511, 518-532, 536-563, 578-606, 616-647, 657-668, 671-682, 693-721, 729-765, 774-845, 848-855, 864-874, 882-891 +src/bat_ctrl_utils.py 129 16 88% 77-78, 134-135, 160-161, 172, 192-193, 223-224, 253-254, 276-278 +src/cbma_adaptation.py 518 53 90% 130-131, 257-258, 333, 381-382, 434-436, 502, 546-549, 563, 590-592, 629-630, 647-648, 653-657, 660-661, 671, 704-705, 735-736, 775-777, 790, 806-810, 826-827, 867-868, 899, 913-924, 947-948, 978-979 src/cbma_paths.py 7 0 100% src/comms_command.py 192 39 80% 88, 90, 94-102, 105, 107, 112, 117, 121, 142-144, 147-150, 155-157, 164, 175, 189-190, 235-240, 251, 262, 291, 309-310, 322, 336-337, 349, 373-375 src/comms_common.py 26 0 100% @@ -11,14 +11,15 @@ src/comms_config_store.py 21 0 100% src/comms_controller.py 38 5 87% 33, 48, 79-82 src/comms_if_monitor.py 56 2 96% 95-96 src/comms_service_discovery.py 88 14 84% 114-115, 147, 152-153, 162, 180-211 -src/comms_settings.py 253 27 89% 12-13, 156-157, 172-192, 257-264, 320-323, 363, 370 +src/comms_settings.py 253 25 90% 156-157, 172-192, 257-264, 320-323, 363, 370 src/comms_status.py 302 23 92% 70, 194, 225-227, 240-242, 247, 251-253, 257-261, 270, 278, 294, 302, 330, 433, 436-438 -src/constants.py 44 0 100% +src/constants.py 47 0 100% src/interface.py 5 0 100% src/validation.py 104 2 98% 221-222 tests/__init__.py 0 0 100% tests/service_discovery_helper.py 22 0 100% tests/test_bat_ctrl_utils.py 129 10 92% 232-233, 241-242, 245-246, 249-250, 253-254 +tests/test_cbma_adaptation.py 248 8 97% 274-277, 282-283, 292-293 tests/test_command.py 210 0 100% tests/test_config_store.py 26 0 100% tests/test_constants.py 27 0 100% @@ -30,6 +31,6 @@ tests/test_settings.py 173 0 100% tests/test_status.py 128 8 94% 28-35 tests/test_validation.py 146 0 100% ----------------------------------------------------------------- -TOTAL 3621 638 82% +TOTAL 3931 322 92% Not tested files as not MDM content or tested elsewhere: batadvvis.py,batstat.py,fmo_agent.py,comms_nats_discovery.py,cbma/*,debug_tests/*,comms_mesh_telemetry.py,comms_interface_info.py diff --git a/modules/sc-mesh-secure-deployment/src/nats/run_unittests_PC.sh b/modules/sc-mesh-secure-deployment/src/nats/run_unittests_PC.sh index d2532f1f..c7979d0c 100755 --- a/modules/sc-mesh-secure-deployment/src/nats/run_unittests_PC.sh +++ b/modules/sc-mesh-secure-deployment/src/nats/run_unittests_PC.sh @@ -26,8 +26,9 @@ python3 -m venv unittest source unittest/bin/activate # install dependencies to virtualenv -pip install coverage==7.4.4 # this is for testing purpose pip install -r requirements.txt +# install testing only related dependencies +pip install -r ./tests/requirements.txt # List of files not to used for coverage calculation. # Files tested elsewhere or not needed to be tested or not mesh shield content @@ -43,3 +44,9 @@ echo -e "Not tested files as not MDM content or tested elsewhere:\n $not_used" > # deactivate virtualenv deactivate +# Clean up __pycache__ directories +find . -type d -name '__pycache__' -exec rm -rf {} + +# Clean up unittest venv +rm -rf unittest +# Clean up coverage tool's SQL database +rm -f .coverage diff --git a/modules/sc-mesh-secure-deployment/src/nats/src/bat_ctrl_utils.py b/modules/sc-mesh-secure-deployment/src/nats/src/bat_ctrl_utils.py index e7ad3b85..a21f94e0 100644 --- a/modules/sc-mesh-secure-deployment/src/nats/src/bat_ctrl_utils.py +++ b/modules/sc-mesh-secure-deployment/src/nats/src/bat_ctrl_utils.py @@ -4,7 +4,6 @@ import logging import subprocess from typing import Optional, Union -import re from pyroute2 import IPRoute # type: ignore[import-not-found, import-untyped] from src.constants import Constants @@ -73,8 +72,8 @@ def create_batman_interface( ip.link("add", ifname=batman_if, kind="batadv", address=mac_addr) else: ip.link("add", ifname=batman_if, kind="batadv") - # Add interface to the book keeping list - self.__bat_interfaces.append(batman_if) + # Add interface to the book keeping list + self.__bat_interfaces.append(batman_if) except Exception as e: self.logger.error( "Error creating Batman interface %s: %s", diff --git a/modules/sc-mesh-secure-deployment/src/nats/src/cbma_adaptation.py b/modules/sc-mesh-secure-deployment/src/nats/src/cbma_adaptation.py index e304192d..7dcb4fb2 100644 --- a/modules/sc-mesh-secure-deployment/src/nats/src/cbma_adaptation.py +++ b/modules/sc-mesh-secure-deployment/src/nats/src/cbma_adaptation.py @@ -12,9 +12,9 @@ import json import random import fnmatch -import re import ipaddress -from pyroute2 import IPRoute # type: ignore[import-not-found, import-untyped] +import errno +from pyroute2 import IPRoute, NetlinkError, arp # type: ignore[import-not-found, import-untyped] from src import cbma_paths from src.comms_controller import CommsController @@ -30,7 +30,7 @@ # pylint: disable=broad-except, invalid-name, too-many-instance-attributes class CBMAAdaptation(object): """ - CBMA Control + CBMA Adaptation """ def __init__( @@ -60,6 +60,7 @@ def __init__( self.BR_NAME: str = Constants.BR_NAME.value self.LOWER_BATMAN: str = Constants.LOWER_BATMAN.value self.UPPER_BATMAN: str = Constants.UPPER_BATMAN.value + self.ALLOWED_KIND_LIST = {"vlan", "batadv"} # Set minimum required configuration self.__white_interfaces = [self.LOWER_BATMAN] @@ -85,29 +86,106 @@ def __init__( self.__cbma_config = self.__config.read("CBMA") self.__vlan_config = self.__config.read("VLAN") - # Extend/update default configs using yaml file content + # Create VLAN interfaces if configured + self.__create_vlan_interfaces() + if self.__cbma_config: - white_interfaces = self.__cbma_config.get("white_interfaces") - red_interfaces = self.__cbma_config.get("red_interfaces") - exclude_interfaces = self.__cbma_config.get("exclude_interfaces") + white_interfaces = self.__cbma_config.get("white_interfaces", []) + red_interfaces = self.__cbma_config.get("red_interfaces", []) + exclude_interfaces = self.__cbma_config.get("exclude_interfaces", []) self.logger.info(f"White interfaces config: {white_interfaces}") self.logger.info(f"Red interfaces config: {red_interfaces}") self.logger.info(f"Exclude interfaces config: {exclude_interfaces}") - if white_interfaces: + valid = self.__validate_cbma_config( + exclude_interfaces, white_interfaces, red_interfaces + ) + # Extend/update default configs using validated yaml file content + if valid: self.__white_interfaces.extend(white_interfaces) - if red_interfaces: self.__red_interfaces.extend(red_interfaces) - if exclude_interfaces: self.__na_cbma_interfaces.extend(exclude_interfaces) - # Create VLAN interfaces if configured - self.__create_vlan_interfaces() + def __validate_cbma_config( + self, + exclude_interfaces: List[str], + white_interfaces: List[str], + red_interfaces: List[str], + ) -> bool: + # Init empty list + black_interfaces: List[str] = [] + + # Validate input param types + if not isinstance(white_interfaces, list) or \ + not isinstance(red_interfaces, list) or \ + not isinstance(exclude_interfaces, list): + self.logger.error("Input params are not lists!") + return False - def __create_vlan_interfaces(self) -> None: - if not self.__vlan_config: - self.logger.info("No VLAN configuration found") - return + if any( + Constants.LOWER_BATMAN.value in interface_list + or Constants.UPPER_BATMAN.value in interface_list + for interface_list in [exclude_interfaces, white_interfaces, red_interfaces] + ): + self.logger.error("bat0/bat1 should not exist in CBMA configs!") + return False + + with self.__lock: + self.__get_interfaces() + interfaces = deepcopy(self.__interfaces) + + for interface in interfaces: + black_interfaces.append(interface.interface_name) + # Remove interfaces that do not have a certificate + filtered_black_interfaces = [] + for interface in black_interfaces: + mac_addr = self.__get_mac_addr(interface) + if self.__has_certificate(self.__cbma_certs_path, mac_addr): + filtered_black_interfaces.append(interface) + + black_interfaces = filtered_black_interfaces + if not black_interfaces: + self.logger.error("No valid black interfaces!") + return False + + # Remove interfaces in exclude_config list from black_interfaces + black_interfaces = [ + interface + for interface in black_interfaces + if interface not in exclude_interfaces + ] + if not black_interfaces: + self.logger.error("No black interfaces left if applied exclude_interfaces!") + return False + + # Remove interfaces in white_interfaces list from black_interfaces + black_interfaces = [ + interface + for interface in black_interfaces + if interface not in white_interfaces + ] + if not black_interfaces: + self.logger.error("No black interfaces left if applied white_interfaces!") + return False + + # Remove interfaces in red_interfaces list from black_interfaces + black_interfaces = [ + interface + for interface in black_interfaces + if interface not in red_interfaces + ] + if not black_interfaces: + self.logger.error("No black interfaces left if applied red_interfaces!") + return False + + self.logger.info("Black interfaces after validation: %s", black_interfaces) + return True + + def __create_vlan_interfaces(self) -> bool: + success = True + if self.__vlan_config is None: + self.logger.debug("No VLAN configuration found") + return success for vlan_name, vlan_data in self.__vlan_config.items(): self.logger.info(f"{vlan_name}, {vlan_data}") @@ -120,9 +198,6 @@ def __create_vlan_interfaces(self) -> None: if parent_interface and vlan_id: try: - # Delete existing VLAN interface if it exists - subprocess.run(["ip", "link", "delete", vlan_name], check=False) - # Create VLAN interface subprocess.run( [ @@ -176,35 +251,60 @@ def __create_vlan_interfaces(self) -> None: f"VLAN interface {vlan_name} created and configured." ) except subprocess.CalledProcessError as e: + success = False self.logger.error(f"Error creating VLAN interface {vlan_name}: {e}") else: + success = False self.logger.error(f"Invalid configuration for VLAN {vlan_name}.") + return success - def __delete_vlan_interfaces(self) -> None: + def __delete_vlan_interfaces(self) -> bool: + success = True if not self.__vlan_config: - return + self.logger.debug("No VLAN interfaces to delete") + return success for vlan_name, _ in self.__vlan_config.items(): try: - # Delete existing VLAN interface if it exists - subprocess.run(["ip", "link", "delete", vlan_name], check=False) + if any( + interface.interface_name == vlan_name + for interface in self.__interfaces + ): + # Delete existing VLAN interface + subprocess.run(["ip", "link", "delete", vlan_name], check=True) except subprocess.CalledProcessError as e: self.logger.error(f"Error deleting VLAN interface {vlan_name}: {e}") + success = False + return success def __get_interfaces(self) -> None: interfaces = [] - ipr = IPRoute() - for link in ipr.get_links(): + ip = IPRoute() + for link in ip.get_links(): ifname = link.get_attr("IFLA_IFNAME") ifstate = link.get_attr("IFLA_OPERSTATE") mac_address = link.get_attr("IFLA_ADDRESS") + ifi_type = link.get("ifi_type") + kind = None + + link_info = link.get_attr("IFLA_LINKINFO") + if link_info: + kind = link_info.get_attr("IFLA_INFO_KIND") interface_info = { "interface_name": ifname, "operstate": ifstate, "mac_address": mac_address, } - interfaces.append(interface_info) + if ifi_type in (arp.ARPHRD_ETHER, arp.ARPHRD_IEEE80211): + # Filters out interfaces with kinds like dummy, sit, and bridge + # thus should add only physical interfaces, vlan and batadv interfaces + if kind is None or kind in self.ALLOWED_KIND_LIST: + interfaces.append(interface_info) + # We want to monitor also br-lan status thus expcetion to basic rules + elif kind == "bridge" and ifname== Constants.BR_NAME.value: + interfaces.append(interface_info) + ip.close() self.__interfaces.clear() @@ -221,22 +321,19 @@ def __get_interfaces(self) -> None: interfaces, ) - def __create_bridge(self, bridge_name) -> None: + def __create_bridge(self, bridge_name) -> bool: + success = True ip = IPRoute() try: - bridge_indices = ip.link_lookup(ifname=bridge_name) - if not bridge_indices: - ip.link("add", ifname=bridge_name, kind="bridge") - - except Exception as e: - self.logger.exception( - "Error creating bridge %s! Error: %s", - bridge_name, - e, - ) - + ip.link("add", ifname=bridge_name, kind="bridge") + except NetlinkError as e: + if e.code == errno.EEXIST and "File exists" in e.args[1]: + self.logger.warning("Bridge %s already exists!", bridge_name) + else: + success = False finally: ip.close() + return success def __set_interface_mac(self, interface, new_mac): try: @@ -244,7 +341,6 @@ def __set_interface_mac(self, interface, new_mac): subprocess.check_call( ["ip", "link", "set", "dev", interface, "address", new_mac] ) - except subprocess.CalledProcessError as e: self.logger.error( "Error setting MAC address for %s! Error: %s", interface, e @@ -283,7 +379,7 @@ def __add_interface_to_bridge( ip.link("set", index=interface_index, master=bridge_index) except Exception as e: - self.logger.debug( + self.logger.error( "Error adding interface %s to bridge %s! Error: %s", interface_to_add, bridge_name, @@ -324,24 +420,29 @@ def __get_mac_addr(self, interface_name: str) -> Union[None, str]: return interface.mac_address return None # Interface not found in the list - def __shutdown_and_delete_bridge(self, bridge_name: str) -> None: + def __shutdown_and_delete_bridge(self, bridge_name: str) -> bool: + success = True ip = IPRoute() try: index = ip.link_lookup(ifname=bridge_name)[0] ip.link("set", index=index, state="down") ip.link("delete", index=index) except IndexError: - self.logger.debug( + self.logger.warning( "Not able to delete bridge %s! Bridge not found!", bridge_name ) + except Exception as e: + success = False + self.logger.error("Error %s deleting bridge %s!", e, bridge_name) finally: ip.close() + return success def __has_certificate(self, cert_path: str, mac: str) -> bool: certificate_path = f"{cert_path}/MAC/{mac}.crt" if not os.path.exists(certificate_path): - self.logger.warning("Certificate not found: %s", certificate_path) + self.logger.debug("Certificate not found: %s", certificate_path) return False self.logger.debug("Certificate found: %s", certificate_path) @@ -365,7 +466,9 @@ def __update_cbma_interface_lists(self) -> None: # Remove interfaces without certificates from lower CBMA interface list for interface in interfaces_without_certificate: self.logger.debug( - "Interface %s doesn't have certificate!", interface.interface_name + "Interface %s doesn't have certificate!, mac %s", + interface.interface_name, + interface.mac_address, ) if interface in self.__lower_cbma_interfaces: self.__lower_cbma_interfaces.remove(interface) @@ -398,21 +501,20 @@ def __update_cbma_interface_lists(self) -> None: if interface in self.__lower_cbma_interfaces: self.__lower_cbma_interfaces.remove(interface) - @staticmethod - def __wait_for_ap(timeout: int = 4) -> bool: + def __wait_for_ap(self, timeout: int = 4) -> bool: start_time = time.time() while True: try: - result = subprocess.check_output(["iw", "dev", "wlan1", "info"]) - result_str = result.decode("utf-8") - # Use regular expressions to extract the interface type - match = re.search(r"type\s+([\w-]+)", result_str) - if match.group(1) == "AP": + result = subprocess.check_output( + ["iw", "dev", "wlan1", "info"] + ).decode() + if "type AP" in result: return True elapsed_time = time.time() - start_time if elapsed_time >= timeout: + self.logger.warning("__wait_for_ap timeout") return False # Timeout reached time.sleep(1) except subprocess.CalledProcessError: @@ -451,10 +553,10 @@ def __wait_for_interface(self, if_name, timeout: int = 3) -> bool: start_time = time.time() while True: try: - socket.create_server( + with socket.create_server( (link_local_address, 0, 0, index), family=socket.AF_INET6 - ) - return True + ): + return True except Exception: elapsed_time = time.time() - start_time if elapsed_time >= timeout: @@ -475,12 +577,10 @@ def __create_mac(self, randomized: bool = False, interface_mac: str = "") -> str mac[0] |= 0x02 # Set the locally administered bit return bytes(mac).hex(sep=":", bytes_per_sep=1) else: - # Split MAC address into octets and flip the locally administered bit - octets = interface_mac.split(":") - first_octet = int(octets[0], 16) - flipped_first_octet = first_octet ^ 2 - - return f"{flipped_first_octet}:{':'.join(octets[1:])}" + # Flip the locally administered bit + mac_bytes = bytearray.fromhex(interface_mac.replace(':', '')) + mac_bytes[0] ^= 0x2 + return mac_bytes.hex(sep=':', bytes_per_sep=1) def __init_batman_and_bridge(self) -> None: if_name = self.__comms_ctrl.settings.mesh_vif[0] @@ -562,6 +662,14 @@ def __setup_radios(self) -> bool: return True + def __get_base_mtu_size(self): + """ + Function to get base mtu size. Functions return value + can be patched during unit testing to support smaller + mtu sizes. + """ + return Constants.BASE_MTU_SIZE.value + def __set_mtu_size(self, interface_name: str, interface_color: str = None) -> None: """ Set the MTU size for the specified interface color. @@ -575,9 +683,9 @@ def __set_mtu_size(self, interface_name: str, interface_color: str = None) -> No interface_name: The name of the interface interface_color: The color of the interface """ - BASE_MTU_SIZE = 1500 - MACSEC_OVERHEAD = 16 - BATMAN_OVERHEAD = 24 + BASE_MTU_SIZE = self.__get_base_mtu_size() + MACSEC_OVERHEAD = Constants.MACSEC_OVERHEAD.value + BATMAN_OVERHEAD = Constants.BATMAN_OVERHEAD.value # if upper or lower batman if interface_name == self.UPPER_BATMAN: @@ -600,7 +708,8 @@ def __set_mtu_size(self, interface_name: str, interface_color: str = None) -> No try: subprocess.run( - ["ip", "link", "set", "dev", interface_name, "mtu", mtu_size], check=True + ["ip", "link", "set", "dev", interface_name, "mtu", mtu_size], + check=True, ) except subprocess.CalledProcessError as e: self.logger.error( @@ -678,7 +787,6 @@ def __get_link_local_ipv6_address(self, interface_name: str) -> str: if self.__is_valid_ipv6_local((ip_address, prefix_length)): ip.close() return ip_address - ip.close() return "" def __add_global_ipv6_address(self, interface_name: str, new_prefix: str) -> None: @@ -703,7 +811,7 @@ def __add_global_ipv6_address(self, interface_name: str, new_prefix: str) -> Non # Modify the prefix new_address = ( - new_prefix + link_local_address[link_local_address.find("::") + 1 :] + new_prefix + link_local_address[link_local_address.find("::") + 1:] ) self.logger.debug( f"Current {interface_name} Local IPv6 address: {link_local_address}" @@ -715,10 +823,10 @@ def __add_global_ipv6_address(self, interface_name: str, new_prefix: str) -> Non # Add the modified IPv6 address to the interface ip.addr("add", index=index, address=new_address, prefixlen=64) - ip.close() - except Exception as e: - self.logger.error(f"Error: {e}") + self.logger.warning( + f"Error adding global ipv6 address for interface {interface_name}: {e}" + ) def __setup_lower_cbma(self) -> bool: """ @@ -830,7 +938,7 @@ def __setup_upper_cbma(self) -> bool: ret = self.__upper_cbma_controller.add_interface( _interface.interface_name ) - self.logger.debug( + self.logger.info( f"Upper CBMA interfaces added: {_interface.interface_name} " f"status: {ret}" ) @@ -845,6 +953,7 @@ def __setup_upper_cbma(self) -> bool: return intf_added def __cleanup_cbma(self) -> None: + self.__get_interfaces() self.__shutdown_interface(self.LOWER_BATMAN) self.__shutdown_interface(self.UPPER_BATMAN) @@ -887,5 +996,6 @@ def stop_cbma(self) -> bool: ) self.__cleanup_cbma() + self.stop_radios() return lower_stopped and upper_stopped diff --git a/modules/sc-mesh-secure-deployment/src/nats/src/constants.py b/modules/sc-mesh-secure-deployment/src/nats/src/constants.py index 8aece6fb..8e550481 100644 --- a/modules/sc-mesh-secure-deployment/src/nats/src/constants.py +++ b/modules/sc-mesh-secure-deployment/src/nats/src/constants.py @@ -90,3 +90,8 @@ class Constants(Enum): OK_POLLING_TIME_SECONDS: int = 600 FAIL_POLLING_TIME_SECONDS: int = 1 + + # MTU sizes + BASE_MTU_SIZE: int = 1500 + MACSEC_OVERHEAD: int = 16 + BATMAN_OVERHEAD: int = 24 diff --git a/modules/sc-mesh-secure-deployment/src/nats/tests/requirements.txt b/modules/sc-mesh-secure-deployment/src/nats/tests/requirements.txt new file mode 100644 index 00000000..bb301877 --- /dev/null +++ b/modules/sc-mesh-secure-deployment/src/nats/tests/requirements.txt @@ -0,0 +1,3 @@ +coverage==7.4.4 +pyroute2==0.7.12 +parameterized==0.9.0 diff --git a/modules/sc-mesh-secure-deployment/src/nats/tests/test_cbma_adaptation.py b/modules/sc-mesh-secure-deployment/src/nats/tests/test_cbma_adaptation.py new file mode 100644 index 00000000..94170e12 --- /dev/null +++ b/modules/sc-mesh-secure-deployment/src/nats/tests/test_cbma_adaptation.py @@ -0,0 +1,625 @@ +import subprocess +import os +import logging +import unittest +from unittest.mock import MagicMock, patch +from parameterized import parameterized + +from pyroute2 import IPRoute +import yaml +from src.cbma_adaptation import CBMAAdaptation +from src import cbma_paths +from controller import CBMAController + + +# pylint: disable=broad-except, protected-access +class TestCBMAAdaptation(unittest.TestCase): + + INTERFACE_SETTINGS = [ + { + "ifname": "ifdummy0_black", + "kind": "dummy", + "address": "12:34:56:78:90:ab", + "state": "up", + }, + {"ifname": "ifdummy2_white", "kind": "dummy", "state": "up"}, + { + "ifname": "ifdummy2_white", + "kind": "dummy", + "address": "22:33:44:55:66:77", + "state": "up", + }, + { + "ifname": "ifdummy3_black", + "kind": "dummy", + "address": "12:33:44:55:66:77", + "state": "up", + }, + ] + + FAKE_CERT_EXISTS_FOR_MAC = { + "12:34:56:78:90:ab", + "22:33:44:55:66:77", + "12:33:44:55:66:77", + "10:34:56:78:90:ab", + } + + CBMA_FAKE_SUCCESS_FOR_INTERFACES = {"ifdummy0_black", "bat0"} + + @classmethod + def setUpClass(cls): + cls.logger = logging.getLogger(__name__) # Unit test logger + cls.logger.setLevel(logging.DEBUG) # Set the logger level as needed + # Add a StreamHandler to direct log output to stderr (terminal) + cls.handler = logging.StreamHandler() + cls.handler.setLevel(logging.DEBUG) # Set the handler level as needed + cls.logger.addHandler(cls.handler) + + ip = IPRoute() + # Create interfaces using settings from the list + for settings in cls.INTERFACE_SETTINGS: + try: + ip.link("add", **settings) + except Exception as e: + print(f"Error creating interface {settings.get('ifname')}: {e}") + ip.close() + + cls.yaml_content_1 = { + "CBMA": { + "exclude_interfaces": [ + "eth0", + "eth1", + "usb0", + "lan1", + "lan2", + "lan3", + "osf0", + "vlan_black", + "vlan_red", + ], + "white_interfaces": ["halow1", "ifdummy2_white"], + "red_interfaces": ["ifdummy1_red", "vlan_red"], + }, + "BATMAN": { + "routing_algo": "BATMAN_IV", + "hop_penalty": { + "meshif": {"bat0": 0, "bat1": 0}, + "hardif": {"halow1": 20}, + }, + }, + "VLAN": { + "vlan_black": { + "parent_interface": "ifdummy0_black", + "vlan_id": 100, + "ipv4_address": "192.168.1.1", + "ipv4_subnet_mask": "255.255.255.0", + "ipv6_local_address": "fe80::192.168.1.1", + "ipv6_prefix_length": 64, + }, + "vlan_red": {"parent_interface": "ifdummy0_black", "vlan_id": 200}, + }, + } + cls.yaml_content_2 = { + "CBMA": { + "exclude_interfaces": [], + "white_interfaces": ["ifdummy2_white"], + "red_interfaces": ["ifdummy1_red", "vlan_red"], + }, + "BATMAN": { + "routing_algo": "BATMAN_IV", + "hop_penalty": { + "meshif": {"bat0": 0, "bat1": 0}, + "hardif": {"halow1": 20}, + }, + }, + } + + cls.yaml_content_3 = """ + CBMA: + exclude_interfaces: + - eth0 + - eth1 + - usb0 + - lan1 + - lan2 + - lan3 + - osf0 + - vlan_black + - "vlan_red + white_interfaces + - halow1 + red_interfaces: + - wlan1 + """ + # No black interfaces left if applied exclude_interfaces + cls.yaml_content_4 = { + "CBMA": { + "exclude_interfaces": ["ifdummy0_black", "ifdummy3_black"], + "white_interfaces": ["ifdummy2_white"], + "red_interfaces": ["ifdummy1_red", "vlan_red"], + }, + "BATMAN": { + "routing_algo": "BATMAN_IV", + "hop_penalty": { + "meshif": {"bat0": 0, "bat1": 0}, + "hardif": {"halow1": 20}, + }, + }, + } + # No black interfaces if applied exclude_interfaces + white_interfaces + cls.yaml_content_5 = { + "CBMA": { + "exclude_interfaces": ["ifdummy0_black"], + "white_interfaces": ["ifdummy3_black", "ifdummy2_white"], + "red_interfaces": ["ifdummy1_red", "vlan_red"], + }, + "BATMAN": { + "routing_algo": "BATMAN_IV", + "hop_penalty": { + "meshif": {"bat0": 0, "bat1": 0}, + "hardif": {"halow1": 20}, + }, + }, + } + # No black interfaces if applied exclude_interfaces + # + white_interfaces + red_interfaces + cls.yaml_content_6 = { + "CBMA": { + "exclude_interfaces": [], + "white_interfaces": ["ifdummy3_black", "ifdummy2_white"], + "red_interfaces": ["ifdummy1_red", "vlan_red", "ifdummy0_black"], + }, + "BATMAN": { + "routing_algo": "BATMAN_IV", + "hop_penalty": { + "meshif": {"bat0": 0, "bat1": 0}, + "hardif": {"halow1": 20}, + }, + }, + } + + cls.yaml_content_7 = """ + CBMA: + exclude_interfaces: + - vlan_black + white_interfaces: None + red_interfaces: + "wlan1" + BATMAN: + routing_algo: BATMAN_IV + """ + # Write YAML content to files + # Not forcing yaml file content in same order as in dictionary in order + # to ensure yaml file parsing work properly. + with open("ms_config.yaml", "w", encoding="utf-8") as file: + yaml.dump(cls.yaml_content_1, file) + with open("ms_config2.yaml", "w", encoding="utf-8") as file: + yaml.dump(cls.yaml_content_2, file) + with open("invalid_yaml_file.yaml", "w", encoding="utf-8") as file: + file.write(cls.yaml_content_3) + with open("ms_config4.yaml", "w", encoding="utf-8") as file: + yaml.dump(cls.yaml_content_4, file) + with open("ms_config5.yaml", "w", encoding="utf-8") as file: + yaml.dump(cls.yaml_content_5, file) + with open("ms_config6.yaml", "w", encoding="utf-8") as file: + yaml.dump(cls.yaml_content_6, file) + with open("ms_config7.yaml", "w", encoding="utf-8") as file: + file.write(cls.yaml_content_7) + + # Create fakecertificate file + cls.fake_certificate = "just fake" + cls.folder = "./MAC" + if not os.path.exists(cls.folder): + os.makedirs(cls.folder) + file_path = os.path.join(cls.folder, "fake_mac.crt") + with open(file_path, "w", encoding="utf-8") as file_4: + file_4.write(cls.fake_certificate) + + # Mock the CommsController object and other common mocks + cls.mock_comms_ctrl = MagicMock() + cls.mock_lock = MagicMock() + cls.mock_comms_ctrl.command.handle_command.return_value = ("OK", None, None) + + # Patch CBMAAdaptation.__has_certificate() to return True for + # MACs in FAKE_CERT_EXISTS_FOR_MAC list + cls.patcher_has_certificate = patch.object( + CBMAAdaptation, + "_CBMAAdaptation__has_certificate", + side_effect=lambda cert_path, mac: mac in cls.FAKE_CERT_EXISTS_FOR_MAC, + ) + # Patch CBMAController.add_interface() to return True for + # interfaces listed in CBMA_FAKE_SUCCESS_FOR_INTERFACES + cls.patcher_add_interface = patch.object( + CBMAController, + "add_interface", + side_effect=lambda interface: interface + in cls.CBMA_FAKE_SUCCESS_FOR_INTERFACES, + ) + + # Patch CBMAAdaptation.get_base_mtu_size() to return such a small MTU + # size that it will work on Ubuntu with default Batman without jumbo + # frame support + cls.patcher_get_base_mtu_size = patch.object( + CBMAAdaptation, "_CBMAAdaptation__get_base_mtu_size", return_value=1400 + ) + # Patch CommsController.command.handle_command() to retur OK + # for e.g. radio bring up/down commands. + cls.patcher_comms_ctrl = patch.object( + cls.mock_comms_ctrl.command, + "handle_command", + return_value=("OK", None, None), + ) + + # Start patches + cls.mock_has_certificate = cls.patcher_has_certificate.start() + cls.mock_add_interface = cls.patcher_add_interface.start() + cls.mock_get_base_mtu_size = cls.patcher_get_base_mtu_size.start() + cls.mock_comms_ctrl_instance = cls.patcher_comms_ctrl.start() + + @classmethod + def tearDownClass(cls): + # Delete created yaml/crt files + for file_name in [ + "ms_config.yaml", + "ms_config2.yaml", + "invalid_yaml_file.yaml", + "ms_config4.yaml", + "ms_config5.yaml", + "ms_config6.yaml", + "ms_config7.yaml", + "./MAC/fake_mac.crt", + ]: + try: + os.remove(file_name) + except FileNotFoundError: + print(f"File {file_name} not found, skipping deletion.") + except Exception as e: + print(f"Error deleting file {file_name}: {e}") + + # Delete folder + try: + os.rmdir(cls.folder) + except Exception as e: + print(f"Error deleting folder {cls.folder}: {e}") + + # Delete interfaces using settings from the list + for settings in cls.INTERFACE_SETTINGS: + try: + ifname = settings.get("ifname") + subprocess.run(["ip", "link", "delete", ifname], check=True) + except subprocess.CalledProcessError as e: + print(f"Error deleting interface {ifname}: {e}") + except Exception as e: + print(f"Unexpected error deleting interface {ifname}: {e}") + + # Stop patches + cls.patcher_has_certificate.stop() + cls.patcher_add_interface.stop() + cls.patcher_get_base_mtu_size.stop() + cls.patcher_comms_ctrl.stop() + + # Clean up logging resources + cls.logger.removeHandler(cls.handler) + cls.handler.close() + + def setUp(self): + # Instantiates CBMAAdaptation with ms_config.yaml. Used for most of the tests. + # Tip: comment following line to get traces from CBMA adaptation. Note that it + # will break some of the tests that relies on logger outputs. + self.logger = MagicMock() + + self.cbma_adaptation = CBMAAdaptation( + self.mock_comms_ctrl, self.logger, self.mock_lock, "ms_config.yaml" + ) + # Allow "dummy" interfaces during unit testing + self.cbma_adaptation.ALLOWED_KIND_LIST.add("dummy") + + def tearDown(self): + self.cbma_adaptation.stop_cbma() + + def test_setup_cbma(self): + # Patch CBMAAdaptation.__get_mac_addr() to return ifdummy0_black + # interface's MAC that will be given as a parameter to create MAC + # for lower batman + with patch.object( + CBMAAdaptation, + "_CBMAAdaptation__get_mac_addr", + return_value="12:34:56:78:90:ab", + ): + result = self.cbma_adaptation.setup_cbma() + self.assertTrue(result) + + def test_stop_cbma(self): + result = self.cbma_adaptation.stop_cbma() + self.assertTrue(result) + + def test_setup_and_stop_cbma2(self): + # Tries to setup and stop CBMA with such config + # that doesn't have VLAN definitions. + self.cbma_adaptation2 = CBMAAdaptation( + self.mock_comms_ctrl, self.logger, self.mock_lock, "ms_config2.yaml" + ) + # Allow "dummy" interfaces during unit testing + self.cbma_adaptation2.ALLOWED_KIND_LIST.add("dummy") + with patch.object( + CBMAAdaptation, + "_CBMAAdaptation__get_mac_addr", + return_value="12:34:56:78:90:ab", + ): + setup_result = self.cbma_adaptation2.setup_cbma() + self.assertTrue(setup_result) + stop_result = self.cbma_adaptation2.stop_cbma() + self.assertTrue(stop_result) + + def test_reading_invalid_config_file(self): + mock_logger = MagicMock() + + with patch.object(mock_logger, "getChild"): + # Mock the logger object returned by getLogger + mock_logger.getChild.return_value.error = MagicMock() + + self.cbma_adaptation3 = CBMAAdaptation( + self.mock_comms_ctrl, + mock_logger, + self.mock_lock, + "invalid_yaml_file.yaml", + ) + call_args_list = mock_logger.getChild.return_value.error.mock_calls + + # Assert that the error method of the mocked logger.getChild() + # was called with the expected message + expected_message = "Error reading config file" + self.assertTrue( + any( + expected_message in call_args[1][0] for call_args in call_args_list + ), + f"Expected message '{expected_message}' not found in logger calls.", + ) + self.assertIsNone(self.cbma_adaptation3._CBMAAdaptation__config) + self.assertIsNone(self.cbma_adaptation3._CBMAAdaptation__cbma_config) + self.assertIsNone(self.cbma_adaptation3._CBMAAdaptation__vlan_config) + + def test_creating_vlan_interface(self): + # VLAN interfaces has been created already + # when self.cbma_adaptation was instantiated. + # Try to create VLAN interfaces again with same settings. Should fail. + result = self.cbma_adaptation._CBMAAdaptation__create_vlan_interfaces() + self.assertFalse(result) + + def test_deleting_vlan_interface(self): + # VLAN interfaces has been created already + # when CBMAAdaptation was instantiated for the first time. + # Try to delete them twice. Second deletion should fail. + result = self.cbma_adaptation._CBMAAdaptation__delete_vlan_interfaces() + self.assertTrue(result) + result = self.cbma_adaptation._CBMAAdaptation__delete_vlan_interfaces() + self.assertFalse(result) + + def test_creating_and_deleting_bridge_interface(self): + # First bridge creation should succeed fine without any warning prints + result = self.cbma_adaptation._CBMAAdaptation__create_bridge("br-lan") + self.assertTrue(result) + self.cbma_adaptation.logger.warning.assert_not_called() + self.cbma_adaptation.logger.warning.reset_mock() + + # Second trial should succeed but warning should be printed. + result = self.cbma_adaptation._CBMAAdaptation__create_bridge("br-lan") + self.assertTrue(result) + self.cbma_adaptation.logger.warning.assert_called_once() + self.cbma_adaptation.logger.warning.reset_mock() + + # First deletion should work fine without any issues + result = self.cbma_adaptation._CBMAAdaptation__shutdown_and_delete_bridge( + "br-lan" + ) + self.assertTrue(result) + self.cbma_adaptation.logger.warning.assert_not_called() + self.cbma_adaptation.logger.warning.reset_mock() + + # Second deletion should succeed but warning should be printed + result = self.cbma_adaptation._CBMAAdaptation__shutdown_and_delete_bridge( + "br-lan" + ) + self.assertTrue(result) + self.cbma_adaptation.logger.warning.assert_called_once() + + def test_set_interface_mac(self): + # Try to set MAC for br-lan that should not exist yet + # but only after setting up CBMA + self.cbma_adaptation._CBMAAdaptation__set_interface_mac( + "br-lan", "22:34:44:55:66:77" + ) + expected_message = "Error setting MAC address" + self.assertTrue( + any( + expected_message in str(call) + for call in self.cbma_adaptation.logger.error.call_args_list + ), + f"Expected message '{expected_message}' not found in logger.error calls.", + ) + self.cbma_adaptation.logger.error.reset_mock() + + # Try to set MAC for vlan_black that should exist already + self.cbma_adaptation._CBMAAdaptation__set_interface_mac( + "vlan_black", "22:34:44:55:66:77" + ) + self.cbma_adaptation.logger.error.assert_not_called() + + def test_add_interface_to_bridge(self): + # Bridge not created by default => should fail + self.cbma_adaptation._CBMAAdaptation__add_interface_to_bridge( + "br-lan", "vlan_red" + ) + + expected_message = "Cannot add interface to bridge" + self.assertTrue( + any( + expected_message in str(call) + for call in self.cbma_adaptation.logger.debug.call_args_list + ), + f"Expected message '{expected_message}' not found in logger.error calls.", + ) + # Create bridge + self.cbma_adaptation._CBMAAdaptation__create_bridge("br-lan") + # Try to add non-existent interface to bridge + self.cbma_adaptation._CBMAAdaptation__add_interface_to_bridge( + "br-lan", "foobar" + ) + expected_message = "Cannot add interface" + self.assertTrue( + any( + expected_message in str(call) + for call in self.cbma_adaptation.logger.debug.call_args_list + ), + f"Expected message '{expected_message}' not found in logger.error calls.", + ) + # Reset the mock to clear previous calls + self.cbma_adaptation.logger.debug.reset_mock() + self.cbma_adaptation.logger.error.reset_mock() + + # Try adding existing interface to bridge + self.cbma_adaptation._CBMAAdaptation__add_interface_to_bridge( + "br-lan", "vlan_red" + ) + # Success means no debug traces in tested function + self.cbma_adaptation.logger.debug.assert_not_called() + self.cbma_adaptation.logger.error.assert_not_called() + + def test_set_interface_up(self): + self.cbma_adaptation.logger.error.reset_mock() + # vlan_red_ create in cbma_adaptation instantiation, + # setting it up should pass + self.cbma_adaptation._CBMAAdaptation__set_interface_up("vlan_red") + self.cbma_adaptation.logger.error.assert_not_called() + + # Setting up not existing interface should fail + self.cbma_adaptation._CBMAAdaptation__set_interface_up("vlan_blue") + self.cbma_adaptation.logger.error.assert_called_once() + + def test_get_mac_addr(self): + # Update interfaces list from where to get MAC + self.cbma_adaptation._CBMAAdaptation__get_interfaces() + # Get MAC + result = self.cbma_adaptation._CBMAAdaptation__get_mac_addr("vlan_red") + self.assertIsNotNone(result) + result = self.cbma_adaptation._CBMAAdaptation__get_mac_addr("foobar") + self.assertIsNone(result) + + def test_has_certificate(self): + self.patcher_has_certificate.stop() + result = self.cbma_adaptation._CBMAAdaptation__has_certificate(".", "no_mac") + self.assertFalse(result) + result = self.cbma_adaptation._CBMAAdaptation__has_certificate(".", "fake_mac") + self.assertTrue(result) + self.patcher_has_certificate.start() + + def test_wait_for_ap_no_mock(self): + # Not expected to have AP interface to wait by default + result = self.cbma_adaptation._CBMAAdaptation__wait_for_ap() + self.assertFalse(result) + + @patch("subprocess.check_output") + @patch("time.sleep") + def test_wait_for_ap_with_mock(self, mock_sleep, mock_check_output): + # Fake success + fake_output = """ + Interface wlan1 + ifindex 6 + wdev 0x1 + addr e4:5f:01:bd:6d:cb + ssid comms_sleeve#6dcb + type AP + wiphy 0 + channel 1 (2412 MHz), width: 20 MHz, center1: 2412 MHz + txpower 31.00 dBm + """ + mock_check_output.return_value = fake_output.encode() + result = self.cbma_adaptation._CBMAAdaptation__wait_for_ap() + mock_check_output.assert_called_once_with(["iw", "dev", "wlan1", "info"]) + self.assertTrue(result) + mock_sleep.assert_not_called() + mock_check_output.reset_mock() + + def side_effect(*args, **kwargs): + fake_output = """ + Interface wlan1 + ifindex 6 + wdev 0x1 + addr e4:5f:01:bd:6d:cb + type managed + wiphy 0 + channel 1 (2412 MHz), width: 20 MHz, center1: 2412 MHz + txpower 31.00 dBm + """ + return fake_output.encode() + + mock_check_output.side_effect = side_effect + result = self.cbma_adaptation._CBMAAdaptation__wait_for_ap(1) + self.assertFalse(result) + mock_sleep.assert_called() + self.cbma_adaptation.logger.warning.assert_called_with("__wait_for_ap timeout") + + def test_wait_for_interface(self): + # Should be up by default + result = self.cbma_adaptation._CBMAAdaptation__wait_for_interface("vlan_red") + self.assertTrue(result) + if result: + subprocess.run(["ip", "link", "set", "vlan_red", "down"], check=True) + result = self.cbma_adaptation._CBMAAdaptation__wait_for_interface("vlan_red") + self.assertFalse(result) + self.cbma_adaptation.logger.warning.assert_called_with( + "__wait_for_interface timeout for %s", "vlan_red" + ) + + def _validate_interfaces_config(self, cbma_adaptation): + cbma_config = getattr(cbma_adaptation, "_CBMAAdaptation__cbma_config", None) + white_interfaces = cbma_config.get("white_interfaces") + red_interfaces = cbma_config.get("red_interfaces") + exclude_interfaces = cbma_config.get("exclude_interfaces") + return cbma_adaptation._CBMAAdaptation__validate_cbma_config( + exclude_interfaces, white_interfaces, red_interfaces + ) + + def _test_invalid_config(self, cbma_adaptation, expected_error_msg): + cbma_adaptation.logger.error.reset_mock() + result = self._validate_interfaces_config(cbma_adaptation) + self.assertFalse(result) + cbma_adaptation.logger.error.assert_called_with(expected_error_msg) + + def test_validate_cbma_config_success(self): + # Default cbma_adaptation uses valid ms_config.yaml + result = self._validate_interfaces_config(self.cbma_adaptation) + self.assertTrue(result) + self.cbma_adaptation.logger.info.assert_called_with( + "Black interfaces after validation: %s", ["ifdummy0_black", "ifdummy3_black"] + ) + + @parameterized.expand([ + ("ms_config4.yaml", "No black interfaces left if applied exclude_interfaces!"), + ("ms_config5.yaml", "No black interfaces left if applied white_interfaces!"), + ("ms_config6.yaml", "No black interfaces left if applied red_interfaces!"), + ("ms_config7.yaml", "Input params are not lists!"), + ]) + def test_invalid_cbma_configs(self, config_file, expected_error_msg): + # Stop default CBMAAdaptation to remove VLAN interfaces + self.cbma_adaptation.stop_cbma() + # Instantiate CBMAAdaptation with parametrized config file + self.cbma_adaptation2 = CBMAAdaptation( + self.mock_comms_ctrl, self.logger, self.mock_lock, config_file + ) + # Reset mock as validation fails in isntantiation phase as + # dummy test interfaces are not allowed in that phase yet. + self.cbma_adaptation2.logger.error.reset_mock() + self.cbma_adaptation2.ALLOWED_KIND_LIST.add("dummy") + self._test_invalid_config(self.cbma_adaptation2, expected_error_msg) + + @parameterized.expand([ + ("No valid black interfaces!"), + ]) + def test_cbma_configs_without_certificates(self, expected_error_msg): + self.patcher_has_certificate.stop() + self.cbma_adaptation.logger.error.reset_mock() + # Allo dummy type of interfaces in testing phase + self.cbma_adaptation.ALLOWED_KIND_LIST.add("dummy") + self._test_invalid_config(self.cbma_adaptation, expected_error_msg) + self.patcher_has_certificate.start()