From 353f90b235a33fe3aced7e451042217fcb0d448c Mon Sep 17 00:00:00 2001 From: indraniBh Date: Thu, 22 Aug 2024 11:15:28 +0000 Subject: [PATCH] Fix irror (WIP) --- do.py | 5 +- snappi_ixnetwork/capture.py | 7 +- snappi_ixnetwork/device/base.py | 37 ++-- snappi_ixnetwork/device/bgp.py | 59 +++--- snappi_ixnetwork/device/bgpevpn.py | 152 +++++++++------ snappi_ixnetwork/device/compactor.py | 30 ++- snappi_ixnetwork/device/createixnconfig.py | 34 ++-- snappi_ixnetwork/device/interface.py | 24 +-- snappi_ixnetwork/device/loopbackint.py | 51 +++--- snappi_ixnetwork/device/ngpf.py | 204 +++++++++++---------- snappi_ixnetwork/device/utils.py | 19 +- snappi_ixnetwork/device/vxlan.py | 70 ++++--- snappi_ixnetwork/lag.py | 29 +-- snappi_ixnetwork/logger.py | 10 +- tests/utils/common.py | 1 - 15 files changed, 413 insertions(+), 319 deletions(-) diff --git a/do.py b/do.py index 5e14b0743..075978a80 100644 --- a/do.py +++ b/do.py @@ -40,10 +40,10 @@ def test(card="novus100g"): coverage_threshold = 0 username = os.environ.get("TEST_USERNAME", "admin") psd = os.environ.get("TEST_PASSWORD", "admin") - if card == "novus100g": args = [ - '--location="https://snappi-ixn-ci-novus100g.lbj.is.keysight.com:5000"', + '--location=\ + "https://snappi-ixn-ci-novus100g.lbj.is.keysight.com:5000"', ( '--ports="snappi-ixn-ci-novus100g.lbj.is.keysight.com;1;1' " snappi-ixn-ci-novus100g.lbj.is.keysight.com;1;2" @@ -65,7 +65,6 @@ def test(card="novus100g"): ] else: raise Exception("card %s is not supported for testing" % card) - args += [ "--ext=ixnetwork", "--username=" + username, diff --git a/snappi_ixnetwork/capture.py b/snappi_ixnetwork/capture.py index b7c7b7a94..81f7f54fb 100644 --- a/snappi_ixnetwork/capture.py +++ b/snappi_ixnetwork/capture.py @@ -83,7 +83,10 @@ def config(self): imports.append(capture) for capture_item in self._api.snappi_config.captures: if capture_item.format == "pcap": - self._api.warning("pcap format is not supported for IxNetwork, setting capture format to pcapng") + self._api.warning( + "pcap format is not supported for IxNetwork, \ + setting capture format to pcapng" + ) capture_item.format = "pcapng" if capture_item.port_names is None: continue @@ -348,7 +351,7 @@ def results(self, request): self._api._request("POST", url, payload) path = "%s/capture" % self._api._ixnetwork.Globals.PersistencePath - #Todo: Revert dc to merged capture after fix is available in 9.20 + # Todo: Revert dc to merged capture after fix is available in 9.20 url = "%s/files?absolute=%s&filename=%s" % ( self._api._ixnetwork.href, path, diff --git a/snappi_ixnetwork/device/base.py b/snappi_ixnetwork/device/base.py index a463d2797..43806340a 100644 --- a/snappi_ixnetwork/device/base.py +++ b/snappi_ixnetwork/device/base.py @@ -1,7 +1,6 @@ from snappi_ixnetwork.logger import get_ixnet_logger -__all__ = ['Base', 'MultiValue', 'PostCalculated'] - +__all__ = ["Base", "MultiValue", "PostCalculated"] class MultiValue(object): @@ -25,9 +24,7 @@ def value(self): value = None if self._key == "connectedTo": value = self._ref_obj.get("xpath") - self.logger.debug("Post Calculated %s - %s" % ( - self._key, value - )) + self.logger.debug("Post Calculated %s - %s" % (self._key, value)) return value @@ -57,7 +54,7 @@ def create_node_elemet(self, ixn_obj, node_name, name=None): - We are setting name as multivalue for farther processing - It will return that newly created dict """ - self.logger.debug("Creating node for %s" %node_name) + self.logger.debug("Creating node for %s" % node_name) node = self.create_node(ixn_obj, node_name) return self.add_element(node, name) @@ -75,14 +72,10 @@ def multivalue(self, value, enum=None): return MultiValue(value) def as_multivalue(self, snappi_obj, name, enum=None): - return self.multivalue( - snappi_obj.get(name), enum - ) + return self.multivalue(snappi_obj.get(name), enum) def post_calculated(self, key, ref_ixnobj=None, ixnobj=None): - return PostCalculated( - key, ref_ixnobj, ixnobj - ) + return PostCalculated(key, ref_ixnobj, ixnobj) def get_name(self, object): name = object.get("name") @@ -219,9 +212,9 @@ def config_values(self, ixn_obj, attr_map): raise NameError("ixn_attr is missing within ", ixn_map) enum_map = ixn_map.get("enum_map") values = self.get_multivalues( - snappi_attr, enum_map=enum_map, default=ixn_map.get( - "default_value" - ) + snappi_attr, + enum_map=enum_map, + default=ixn_map.get("default_value"), ) else: ixn_attr = ixn_map @@ -257,7 +250,8 @@ def get_group_nodes(self, tab_name): Fill with other nodes and active_list as False It will raise error if all elements are not same length Finally return list of NodesInfo - It will use some IxNetwork tab which do not have enable/disable features""" + It will use some IxNetwork tab which don't have enable/disable features + """ dummy_tab = None for node in self._symmetric_nodes: dummy_tab = node.get(tab_name) @@ -279,7 +273,9 @@ def get_group_nodes(self, tab_name): tab_lengths.append(len(tab)) active_list.append(True) if len(set(tab_lengths)) > 1: - raise Exception("All the attributes %s should have same lengths" % tab_name) + raise Exception( + "All the attributes %s should have same lengths" % tab_name + ) for idx in range(tab_lengths[-1]): if len(group_nodes) <= idx: group_nodes.append([tab[idx]]) @@ -300,7 +296,8 @@ def get_active_group_nodes(self, tab_name): tab_name ) if len(set(active_list)) > 1: - raise Exception("All the attributes %s should configure with equal length" - % tab_name) + raise Exception( + "All the attributes %s should configure with equal length" + % tab_name + ) return node_info_list - diff --git a/snappi_ixnetwork/device/bgp.py b/snappi_ixnetwork/device/bgp.py index f11ccb4ba..2588e8e6f 100644 --- a/snappi_ixnetwork/device/bgp.py +++ b/snappi_ixnetwork/device/bgp.py @@ -2,6 +2,7 @@ from snappi_ixnetwork.logger import get_ixnet_logger from snappi_ixnetwork.device.bgpevpn import BgpEvpn + class Bgp(Base): _BGP = { "peer_address": "dutIp", @@ -10,7 +11,7 @@ class Bgp(Base): "enum_map": { "ibgp": "internal", "ebgp": "external" - } + }, }, } @@ -19,7 +20,7 @@ class Bgp(Base): "keep_alive_interval": "keepaliveTimer", "update_interval": "updateInterval", "time_to_live": "ttl", - "md5_key": "md5Key" + "md5_key": "md5Key", } _CAPABILITY = { @@ -45,7 +46,7 @@ class Bgp(Base): "ipv6_multicast_mpls_vpn": "ipv6MulticastBgpMplsVpn", "ipv6_unicast_flow_spec": "capabilityipv6UnicastFlowSpec", "ipv6_sr_te_policy": "capabilitySRTEPoliciesV6", - "ipv6_unicast_add_path": "capabilityIpv6UnicastAddPath" + "ipv6_unicast_add_path": "capabilityIpv6UnicastAddPath", } _CAPABILITY_IPv6 = { @@ -57,16 +58,16 @@ class Bgp(Base): "address": "networkAddress", "prefix": "prefixLength", "count": "numberOfAddressesAsy", - "step": "prefixAddrStep" + "step": "prefixAddrStep", } _ROUTE = { - "next_hop_mode" : { + "next_hop_mode": { "ixn_attr": "nextHopType", "enum_map": { "local_ip": "sameaslocalip", "manual": "manually" - } + }, }, "next_hop_address_type": "nextHopIPType", "next_hop_ipv4_address": "ipv4NextHop", @@ -74,7 +75,7 @@ class Bgp(Base): } _COMMUNITY = { - "type" : { + "type": { "ixn_attr": "type", "enum_map": { "manual_as_number": "manual", @@ -83,10 +84,10 @@ class Bgp(Base): "no_export_subconfed": "noexport_subconfed", "llgr_stale": "llgr_stale", "no_llgr": "no_llgr", - } + }, }, "as_number": "asNumber", - "as_custom": "lastTwoOctets" + "as_custom": "lastTwoOctets", } _BGP_AS_MODE = { @@ -141,16 +142,18 @@ def _get_interface_info(self): def _is_valid(self, ip_name): is_invalid = True same_dg_ips, invalid_ips = self._get_interface_info() - self.logger.debug("Validating %s against interface same_dg_ips : %s invalid_ips %s" % ( - ip_name, same_dg_ips, invalid_ips - )) + self.logger.debug( + "Validating %s against interface same_dg_ips : %s invalid_ips %s" % + (ip_name, same_dg_ips, invalid_ips)) if ip_name in invalid_ips: - self._ngpf.api.add_error("Multiple IP {name} on top of name Ethernet".format( - name=ip_name - )) + self._ngpf.api.add_error( + "Multiple IP {name} on top of name Ethernet".format + (name=ip_name)) is_invalid = False if len(same_dg_ips) > 0 and ip_name not in same_dg_ips: - self._ngpf.api.add_error("BGP should not configured on top of different device") + self._ngpf.api.add_error( + "BGP should not configured on top of different device" + ) is_invalid = False return is_invalid @@ -211,7 +214,9 @@ def _config_bgpv4(self, bgp_peers, ixn_ipv4): self.configure_multivalues(advanced, ixn_bgpv4, Bgp._ADVANCED) capability = bgp_peer.get("capability") if capability is not None: - self.configure_multivalues(capability, ixn_bgpv4, Bgp._CAPABILITY) + self.configure_multivalues( + capability, ixn_bgpv4, Bgp._CAPABILITY + ) self._bgp_route_builder(bgp_peer, ixn_bgpv4) self._bgp_evpn.config(bgp_peer, ixn_bgpv4) @@ -231,7 +236,9 @@ def _config_bgpv6(self, bgp_peers, ixn_ipv6): self.configure_multivalues(advanced, ixn_bgpv6, Bgp._ADVANCED) capability = bgp_peer.get("capability") if capability is not None: - self.configure_multivalues(capability, ixn_bgpv6, Bgp._CAPABILITY) + self.configure_multivalues( + capability, ixn_bgpv6, Bgp._CAPABILITY + ) self.configure_multivalues( capability, ixn_bgpv6, Bgp._CAPABILITY_IPv6 ) @@ -293,7 +300,9 @@ def _configure_bgpv6_route(self, v6_routes, ixn_bgp): "connectedTo", ref_ixnobj=ixn_bgp ) self.configure_multivalues(addresse, ixn_ip_pool, Bgp._IP_POOL) - ixn_route = self.create_node_elemet(ixn_ip_pool, "bgpV6IPRouteProperty") + ixn_route = self.create_node_elemet( + ixn_ip_pool, "bgpV6IPRouteProperty" + ) self._ngpf.set_device_info(route, ixn_ip_pool) self._configure_route(route, ixn_route) @@ -306,7 +315,9 @@ def _configure_route(self, route, ixn_route): self.logger.debug("Configuring BGP route advance") multi_exit_discriminator = advanced.get("multi_exit_discriminator") if multi_exit_discriminator is not None: - ixn_route["enableMultiExitDiscriminator"] = self.multivalue(True) + ixn_route["enableMultiExitDiscriminator"] = self.multivalue( + True + ) ixn_route["multiExitDiscriminator"] = self.multivalue( multi_exit_discriminator ) @@ -321,7 +332,9 @@ def _configure_route(self, route, ixn_route): ixn_community = self.create_node_elemet( ixn_route, "bgpCommunitiesList" ) - self.configure_multivalues(community, ixn_community, Bgp._COMMUNITY) + self.configure_multivalues( + community, ixn_community, Bgp._COMMUNITY + ) as_path = route.get("as_path") if as_path is not None: @@ -333,7 +346,9 @@ def _configure_route(self, route, ixn_route): segments = as_path.get("segments") ixn_route["noOfASPathSegmentsPerRouteRange"] = len(segments) for segment in segments: - ixn_segment = self.create_node_elemet(ixn_route, "bgpAsPathSegmentList") + ixn_segment = self.create_node_elemet( + ixn_route, "bgpAsPathSegmentList" + ) ixn_segment["segmentType"] = self.multivalue( segment.get(type), Bgp._BGP_SEG_TYPE ) diff --git a/snappi_ixnetwork/device/bgpevpn.py b/snappi_ixnetwork/device/bgpevpn.py index fd05fd146..789b16fd9 100644 --- a/snappi_ixnetwork/device/bgpevpn.py +++ b/snappi_ixnetwork/device/bgpevpn.py @@ -11,7 +11,7 @@ class BgpEvpn(Base): "enum_map": { "single_active": True, "all_active": False - } + }, }, } @@ -25,11 +25,11 @@ class BgpEvpn(Base): "no_advertised": "noadvertised", "no_export_subconfed": "noexport_subconfed", "llgr_stale": "llgr_stale", - "no_llgr": "no_llgr" + "no_llgr": "no_llgr", } }, "as_number": "asNumber", - "as_custom": "lastTwoOctets" + "as_custom": "lastTwoOctets", } _SEG_EXT_COMMUNITIES = { @@ -42,8 +42,9 @@ class BgpEvpn(Base): "administrator_as_4octet": "administratoras4octet", "opaque": "opaque", "evpn": "evpn", - "administrator_as_2octet_link_bandwidth": "administratoras2octetlinkbw" - } + "administrator_as_2octet_link_bandwidth": + "administratoras2octetlinkbw", + }, }, "subtype": { "ixn_attr": "subType", @@ -54,8 +55,8 @@ class BgpEvpn(Base): "extended_bandwidth": "extendedbandwidth", "color": "color", "encapsulation": "encapsulation", - "mac_address": "macaddress" - } + "mac_address": "macaddress", + }, } } @@ -82,37 +83,37 @@ class BgpEvpn(Base): "ixn_attr": "multicastTunnelType", "enum_map": { "ingress_replication": "tunneltypeingressreplication" - } - } + }, + }, } _COMMON_ROUTE_TYPE = { "as_2octet": "as", "as_4octet": "as4", - "ipv4_address": "ip" + "ipv4_address": "ip", } _BROADCAST_DOMAINS = { "ethernet_tag_id": "ethernetTagId", - "vlan_aware_service": "enableVlanAwareService" + "vlan_aware_service": "enableVlanAwareService", } _MAC_ADDRESS = { "address": "mac", "prefix": "prefixLength", - "count": "numberOfAddressesAsy" + "count": "numberOfAddressesAsy", } _IP_ADDRESS = { "address": "networkAddress", "prefix": "prefixLength", - "count": "numberOfAddressesAsy" + "count": "numberOfAddressesAsy", } _CMAC_PROPERTIES = { "l2vni": "firstLabelStart", "l3vni": "secondLabelStart", - "include_default_gateway": "includeDefaultGatewayExtendedCommunity" + "include_default_gateway": "includeDefaultGatewayExtendedCommunity", } def __init__(self, ngpf): @@ -127,8 +128,7 @@ def config(self, bgp_peer, ixn_bgp): eth_segment_info = self.get_symmetric_nodes( [bgp_peer], "evpn_ethernet_segments" ) - if eth_segment_info.is_all_null or \ - eth_segment_info.max_len == 0: + if eth_segment_info.is_all_null or eth_segment_info.max_len == 0: return self._peer_class = bgp_peer.__class__.__name__ if self._peer_class == "BgpV4Peer": @@ -150,9 +150,7 @@ def _config_advance(self, parent_info, ixn_parent): if advanced.is_all_null: return None - ixn_parent["origin"] = advanced.get_multivalues( - "origin" - ) + ixn_parent["origin"] = advanced.get_multivalues("origin") med_values = [] for node in advanced.symmetric_nodes: med_values.append( @@ -160,9 +158,7 @@ def _config_advance(self, parent_info, ixn_parent): ) if med_values.count(None) != len(med_values): ixn_parent["enableMultiExitDiscriminator"] = self.multivalue(True) - ixn_parent["multiExitDiscriminator"] = self.multivalue( - med_values - ) + ixn_parent["multiExitDiscriminator"] = self.multivalue(med_values) def _config_communities(self, parent_info, ixn_parent): active_list, communities_info_list = parent_info.get_group_nodes( @@ -174,7 +170,9 @@ def _config_communities(self, parent_info, ixn_parent): ixn_parent["noOfCommunities"] = len(communities_info_list) ixn_parent["enableCommunity"] = self.multivalue(active_list) for communities_info in communities_info_list: - ixn_communities = self.create_node_elemet(ixn_parent, "bgpCommunitiesList") + ixn_communities = self.create_node_elemet( + ixn_parent, "bgpCommunitiesList" + ) communities_info.config_values( ixn_communities, BgpEvpn._SEG_COMMUNITIES ) @@ -186,9 +184,7 @@ def _config_ext_communities(self, parent_info, ixn_parent): if len(ext_communitiesinfo_list) == 0: return None - ixn_parent["noOfExtendedCommunity"] = len( - ext_communitiesinfo_list - ) + ixn_parent["noOfExtendedCommunity"] = len(ext_communitiesinfo_list) ixn_parent["enableExtendedCommunity"] = self.multivalue(active_list) for ext_communitiesinfo in ext_communitiesinfo_list: ixn_ext_communities = self.create_node_elemet( @@ -199,7 +195,9 @@ def _config_ext_communities(self, parent_info, ixn_parent): ) types = ixn_ext_communities.get("type").value sub_types = ixn_ext_communities.get("subType").value - values = ext_communitiesinfo.get_values("value", default="0000000000c8") + values = ext_communitiesinfo.get_values( + "value", default="0000000000c8" + ) idx = 0 opaqueData = list() ip = list() @@ -255,12 +253,22 @@ def _config_ext_communities(self, parent_info, ixn_parent): ixn_ext_communities["opaqueData"] = self.multivalue(opaqueData) ixn_ext_communities["ip"] = self.multivalue(ip) - ixn_ext_communities["assignedNumber2Bytes"] = self.multivalue(assignedNumber2Bytes) - ixn_ext_communities["asNumber2Bytes"] = self.multivalue(asNumber2Bytes) - ixn_ext_communities["asNumber4Bytes"] = self.multivalue(asNumber4Bytes) - ixn_ext_communities["assignedNumber4Bytes"] = self.multivalue(assignedNumber4Bytes) + ixn_ext_communities["assignedNumber2Bytes"] = self.multivalue( + assignedNumber2Bytes + ) + ixn_ext_communities["asNumber2Bytes"] = self.multivalue( + asNumber2Bytes + ) + ixn_ext_communities["asNumber4Bytes"] = self.multivalue( + asNumber4Bytes + ) + ixn_ext_communities["assignedNumber4Bytes"] = self.multivalue( + assignedNumber4Bytes + ) ixn_ext_communities["colorCOBits"] = self.multivalue(colorCOBits) - ixn_ext_communities["colorReservedBits"] = self.multivalue(colorReservedBits) + ixn_ext_communities["colorReservedBits"] = self.multivalue( + colorReservedBits + ) ixn_ext_communities["colorValue"] = self.multivalue(colorValue) def _config_as_path_segments(self, parent_info, ixn_parent): @@ -285,7 +293,9 @@ def _config_as_path_segments(self, parent_info, ixn_parent): ixn_segments["segmentType"] = segments_info.get_multivalues( "type", BgpEvpn._SEGMENT_TYPE ) - active_list, numbers_info_list = segments_info.get_group_nodes("as_numbers") + active_list, numbers_info_list = segments_info.get_group_nodes( + "as_numbers" + ) if len(numbers_info_list) > 0: ixn_segments["numberOfAsNumberInSegment"] = len( numbers_info_list @@ -307,9 +317,9 @@ def _config_eth_segment(self, eth_segment_info, ixn_eth_segments): ) df_election_info = eth_segment_info.get_tab("df_election") if not df_election_info.is_all_null: - ixn_eth_segments["dfElectionTimer"] = df_election_info.get_multivalues( - "election_timer" - ) + ixn_eth_segments[ + "dfElectionTimer" + ] = df_election_info.get_multivalues("election_timer") self._config_advance(eth_segment_info, ixn_eth_segments) self._config_communities(eth_segment_info, ixn_eth_segments) @@ -333,9 +343,7 @@ def _config_evis(self, eth_segment_info, ixn_bgp, ixn_eth_segments): ixn_bgp, "bgpIPv4EvpnVXLAN" ) else: - ixn_xvlan = self.create_node_elemet( - ixn_bgp, "bgpIPv6EvpnVXLAN" - ) + ixn_xvlan = self.create_node_elemet(ixn_bgp, "bgpIPv6EvpnVXLAN") vxlan_info.config_values(ixn_xvlan, BgpEvpn._VXLAN) # Configure route_distinguisher @@ -352,18 +360,24 @@ def _config_evis(self, eth_segment_info, ixn_bgp, ixn_eth_segments): ixn_xvlan["rdASNumber"] = self.multivalue(convert_values.common_num) ixn_xvlan["rdEvi"] = self.multivalue(convert_values.assign_num) ixn_xvlan["rdIpAddress"] = self.multivalue(convert_values.ip_addr) - ixn_xvlan["autoConfigureRdIpAddress"] = distinguisher_info.get_multivalues( + ixn_xvlan[ + "autoConfigureRdIpAddress" + ] = distinguisher_info.get_multivalues( "auto_config_rd_ip_addr", default=True ) # Configure route_target_export - exports_info_list = vxlan_info.get_active_group_nodes("route_target_export") + exports_info_list = vxlan_info.get_active_group_nodes( + "route_target_export" + ) if len(exports_info_list) > 0: ixn_xvlan["numRtInExportRouteTargetList"] = len( exports_info_list ) for exports_info in exports_info_list: - ixn_export = self.create_node_elemet(ixn_xvlan, "bgpExportRouteTargetList") + ixn_export = self.create_node_elemet( + ixn_xvlan, "bgpExportRouteTargetList" + ) rt_types = exports_info.get_values( "rt_type", enum_map=BgpEvpn._COMMON_ROUTE_TYPE ) @@ -375,14 +389,18 @@ def _config_evis(self, eth_segment_info, ixn_bgp, ixn_eth_segments): self._set_target(ixn_export, rt_types, convert_rt_values) # Configure route_target_import - import_info_list = vxlan_info.get_active_group_nodes("route_target_import") + import_info_list = vxlan_info.get_active_group_nodes( + "route_target_import" + ) if len(import_info_list) > 0: ixn_xvlan["importRtListSameAsExportRtList"] = False ixn_xvlan["numRtInImportRouteTargetList"] = len( import_info_list ) for import_info in import_info_list: - ixn_import = self.create_node_elemet(ixn_xvlan, "bgpImportRouteTargetList") + ixn_import = self.create_node_elemet( + ixn_xvlan, "bgpImportRouteTargetList" + ) rt_types = import_info.get_values( "rt_type", enum_map=BgpEvpn._COMMON_ROUTE_TYPE ) @@ -394,13 +412,17 @@ def _config_evis(self, eth_segment_info, ixn_bgp, ixn_eth_segments): self._set_target(ixn_import, rt_types, convert_rt_values) # Configure l3_route_target_export - l3exports_info_list = vxlan_info.get_active_group_nodes("l3_route_target_export") + l3exports_info_list = vxlan_info.get_active_group_nodes( + "l3_route_target_export" + ) if len(l3exports_info_list) > 0: ixn_xvlan["numRtInL3vniExportRouteTargetList"] = len( l3exports_info_list ) for l3exports_info in l3exports_info_list: - ixn_l3export = self.create_node_elemet(ixn_xvlan, "bgpL3VNIExportRouteTargetList") + ixn_l3export = self.create_node_elemet( + ixn_xvlan, "bgpL3VNIExportRouteTargetList" + ) rt_types = l3exports_info.get_values( "rt_type", enum_map=BgpEvpn._COMMON_ROUTE_TYPE ) @@ -412,14 +434,18 @@ def _config_evis(self, eth_segment_info, ixn_bgp, ixn_eth_segments): self._set_target(ixn_l3export, rt_types, convert_rt_values) # Configure l3_route_target_import - l3import_info_list = vxlan_info.get_active_group_nodes("l3_route_target_import") + l3import_info_list = vxlan_info.get_active_group_nodes( + "l3_route_target_import" + ) if len(l3import_info_list) > 0: ixn_xvlan["l3vniImportRtListSameAsL3vniExportRtList"] = False ixn_xvlan["numRtInL3vniImportRouteTargetList"] = len( l3import_info_list ) for l3import_info in l3import_info_list: - ixn_l3import = self.create_node_elemet(ixn_xvlan, "bgpL3VNIImportRouteTargetList") + ixn_l3import = self.create_node_elemet( + ixn_xvlan, "bgpL3VNIImportRouteTargetList" + ) rt_types = l3import_info.get_values( "rt_type", enum_map=BgpEvpn._COMMON_ROUTE_TYPE ) @@ -440,12 +466,16 @@ def _config_evis(self, eth_segment_info, ixn_bgp, ixn_eth_segments): ) if not broadcast_domains_info.is_all_null: if self._peer_class == "BgpV4Peer": - ixn_xvlan["numBroadcastDomainV4"] = broadcast_domains_info.max_len + ixn_xvlan[ + "numBroadcastDomainV4" + ] = broadcast_domains_info.max_len ixn_broadcast_domains = self.create_property( ixn_xvlan, "broadcastDomainV4" ) else: - ixn_xvlan["numBroadcastDomainV6"] = broadcast_domains_info.max_len + ixn_xvlan[ + "numBroadcastDomainV6" + ] = broadcast_domains_info.max_len ixn_broadcast_domains = self.create_property( ixn_xvlan, "broadcastDomainV6" ) @@ -478,24 +508,32 @@ def _get_symetic_address(self, cmac_ip_range_info, address_type): for idx in range(len(active_list)): if active_list[idx] is False: symmetric_nodes[idx] = dummy_value - active_list[idx] = cmac_ip_range_info.active_list[idx] \ - and active_list[idx] + active_list[idx] = cmac_ip_range_info.active_list[idx] and \ + active_list[idx] return NodesInfo( cmac_ip_range_info.max_len, active_list, symmetric_nodes ) - def _config_cmac_ip_range(self, broadcast_domains_info, ixn_broadcast_domains, ixn_xvlan): + def _config_cmac_ip_range( + self, broadcast_domains_info, ixn_broadcast_domains, ixn_xvlan + ): cmac_ip_range_info = broadcast_domains_info.get_symmetric_nodes( "cmac_ip_range" ) if cmac_ip_range_info.is_all_null: return - mac_info = self._get_symetic_address(cmac_ip_range_info, "mac_addresses") - ipv4_info = self._get_symetic_address(cmac_ip_range_info, "ipv4_addresses") - ipv6_info = self._get_symetic_address(cmac_ip_range_info, "ipv6_addresses") + mac_info = self._get_symetic_address( + cmac_ip_range_info, "mac_addresses" + ) + ipv4_info = self._get_symetic_address( + cmac_ip_range_info, "ipv4_addresses" + ) + ipv6_info = self._get_symetic_address( + cmac_ip_range_info, "ipv6_addresses" + ) ixn_broadcast_domains["noOfMacPools"] = cmac_ip_range_info.max_len if mac_info.is_all_null: raise Exception("mac_addresses should configured in cmac_ip_range") @@ -549,5 +587,3 @@ def _config_cmac_ip_range(self, broadcast_domains_info, ixn_broadcast_domains, i ipv6_info.active_list ) ipv6_info.config_values(ixn_ipv6, BgpEvpn._IP_ADDRESS) - - diff --git a/snappi_ixnetwork/device/compactor.py b/snappi_ixnetwork/device/compactor.py index c23cd0ba2..df661ed11 100644 --- a/snappi_ixnetwork/device/compactor.py +++ b/snappi_ixnetwork/device/compactor.py @@ -6,9 +6,7 @@ class Compactor(object): def __init__(self, ixnetworkapi): self._api = ixnetworkapi self._unsupported_nodes = [] - self._ignore_keys = [ - "xpath", "name" - ] + self._ignore_keys = ["xpath", "name"] self.logger = get_ixnet_logger(__name__) def compact(self, roots): @@ -29,13 +27,11 @@ def compact(self, roots): for similar_objs in similar_objs_list: if len(similar_objs.objects) > 0: similar_objs.compact(roots) - self.set_scalable( - similar_objs.primary_obj - ) + self.set_scalable(similar_objs.primary_obj) def _comparator(self, src, dst): if type(src) != type(dst): - raise Exception("comparision issue") + raise Exception("comparison issue") src_node_keys = [ k for k, v in src.items() if not isinstance(v, MultiValue) ] @@ -56,7 +52,7 @@ def _comparator(self, src, dst): if isinstance(src_value, dict): if self._comparator(src_value, dst_value) is False: return False - # todo: we need to restructure if same element in different position + # todo: Need to restructure if same element in different position elif isinstance(src_value, list): if len(src_value) != len(dst_value): return False @@ -117,9 +113,7 @@ def append(self, object): def compact(self, roots): multiplier = len(self._objects) + 1 for object in self._objects: - self._value_compactor( - self._primary_obj, object - ) + self._value_compactor(self._primary_obj, object) roots.remove(object) self._primary_obj["multiplier"] = multiplier @@ -130,10 +124,16 @@ def _value_compactor(self, src, dst): src_value = src.get(key) dst_value = dst.get(key) if key == "name": - src_value = src_value if isinstance(src_value, MultiValue) \ + src_value = ( + src_value + if isinstance(src_value, MultiValue) else self.multivalue(src_value) - dst_value = dst_value if isinstance(dst_value, MultiValue) \ + ) + dst_value = ( + dst_value + if isinstance(dst_value, MultiValue) else self.multivalue(dst_value) + ) # todo: fill with product default value for # if dst_value is None: # dst_value = obj.get(key, with_default=True) @@ -141,9 +141,7 @@ def _value_compactor(self, src, dst): for index, dst_dict in enumerate(dst_value): if not isinstance(dst_dict, dict): continue - self._value_compactor( - src_value[index], dst_dict - ) + self._value_compactor(src_value[index], dst_dict) elif isinstance(dst_value, dict): self._value_compactor(src_value, dst_value) elif isinstance(src_value, MultiValue): diff --git a/snappi_ixnetwork/device/createixnconfig.py b/snappi_ixnetwork/device/createixnconfig.py index c54742e0e..8029f169a 100644 --- a/snappi_ixnetwork/device/createixnconfig.py +++ b/snappi_ixnetwork/device/createixnconfig.py @@ -15,9 +15,7 @@ def create(self, node, node_name, parent_xpath=""): if not isinstance(element, dict): raise TypeError("Expecting dict") xpath = """{parent_xpath}/{node_name}[{index}]""".format( - parent_xpath=parent_xpath, - node_name=node_name, - index=idx + parent_xpath=parent_xpath, node_name=node_name, index=idx ) element["xpath"] = xpath self._process_element(element, xpath) @@ -29,8 +27,7 @@ def post_calculate(self): def _process_element(self, element, parent_xpath, child_name=None): if child_name is not None and "xpath" in element: child_xpath = """{parent_xpath}/{child_name}""".format( - parent_xpath=parent_xpath, - child_name=child_name + parent_xpath=parent_xpath, child_name=child_name ) element["xpath"] = child_xpath key_to_remove = [] @@ -44,13 +41,14 @@ def _process_element(self, element, parent_xpath, child_name=None): else: element[key] = value elif isinstance(value, PostCalculated): - self._post_calculated_info.append( - [element, key, value] - ) + self._post_calculated_info.append([element, key, value]) elif isinstance(value, dict): self._process_element(value, parent_xpath, key) - elif isinstance(value, list) and len(value) > 0 and \ - isinstance(value[0], dict): + elif ( + isinstance(value, list) + and len(value) > 0 + and isinstance(value[0], dict) + ): if child_name is not None: self.create(value, key, element["xpath"]) else: @@ -63,9 +61,9 @@ def _get_ixn_multivalue(self, value, att_name, xpath): value = value.value ixn_value = { "xpath": "/multivalue[@source = '{xpath} {att_name}']".format( - xpath=xpath, - att_name=att_name - )} + xpath=xpath, att_name=att_name + ) + } if not isinstance(value, list): value = [value] if len(set(value)) == 1: @@ -74,19 +72,17 @@ def _get_ixn_multivalue(self, value, att_name, xpath): else: ixn_value["singleValue"] = { "xpath": "/multivalue[@source = '{xpath} {att_name}']/singleValue".format( - xpath=xpath, - att_name=att_name + xpath=xpath, att_name=att_name ), - "value": value[0] + "value": value[0], } return ixn_value else: ixn_value["valueList"] = { "xpath": "/multivalue[@source = '{xpath} {att_name}']/valueList".format( - xpath=xpath, - att_name=att_name + xpath=xpath, att_name=att_name ), - "values": value + "values": value, } return ixn_value diff --git a/snappi_ixnetwork/device/interface.py b/snappi_ixnetwork/device/interface.py index 3cd689b32..8aa341e79 100644 --- a/snappi_ixnetwork/device/interface.py +++ b/snappi_ixnetwork/device/interface.py @@ -1,6 +1,7 @@ from snappi_ixnetwork.device.base import Base from snappi_ixnetwork.logger import get_ixnet_logger + class Ethernet(Base): _ETHERNET = { "mac": "mac", @@ -16,17 +17,14 @@ class Ethernet(Base): "x9100": "ethertype9100", "x9200": "ethertype9200", "x9300": "ethertype9300", - } + }, }, "priority": "priority", - "id": "vlanId" + "id": "vlanId", } _IP = { - "address": "address", - "gateway": "gatewayIp", - "prefix": "prefix" - } + "address": "address", "gateway": "gatewayIp", "prefix": "prefix"} _GATEWAY_MAC = { "gateway_mac": "manualGatewayMac", @@ -55,7 +53,8 @@ def _configure_vlan(self, ixn_eth, vlans): self.logger.debug("Configuring VLAN") for vlan in vlans: ixn_vlan = self.create_node_elemet( - ixn_eth, "vlan", vlan.get("name")) + ixn_eth, "vlan", vlan.get("name") + ) self.configure_multivalues(vlan, ixn_vlan, Ethernet._VLAN) def _configure_ipv4(self, ixn_eth, ethernet): @@ -78,8 +77,9 @@ def _configure_ipv4(self, ixn_eth, ethernet): self._ngpf.set_device_info(ipv4_address, ixn_ip) self.configure_multivalues(ipv4_address, ixn_ip, Ethernet._IP) if ipv4_address.gateway_mac.choice == "value": - self.configure_multivalues_with_choice(ipv4_address, ixn_ip, - Ethernet._GATEWAY_MAC) + self.configure_multivalues_with_choice( + ipv4_address, ixn_ip, Ethernet._GATEWAY_MAC + ) def _configure_ipv6(self, ixn_eth, ethernet): self.logger.debug("Configuring IPv6 interface") @@ -101,6 +101,6 @@ def _configure_ipv6(self, ixn_eth, ethernet): self._ngpf.set_device_info(ipv6_address, ixn_ip) self.configure_multivalues(ipv6_address, ixn_ip, Ethernet._IP) if ipv6_address.gateway_mac.choice == "value": - self.configure_multivalues_with_choice(ipv6_address, ixn_ip, - Ethernet._GATEWAY_MAC) - + self.configure_multivalues_with_choice( + ipv6_address, ixn_ip, Ethernet._GATEWAY_MAC + ) diff --git a/snappi_ixnetwork/device/loopbackint.py b/snappi_ixnetwork/device/loopbackint.py index f939d5b91..5edd1b5bc 100644 --- a/snappi_ixnetwork/device/loopbackint.py +++ b/snappi_ixnetwork/device/loopbackint.py @@ -30,41 +30,44 @@ def _get_vxlan_source_ints(self, device): vxlan = device.get("vxlan") if vxlan is not None: v4_tunnels = vxlan.get("v4_tunnels") - if v4_tunnels is not None and len( - v4_tunnels) > 0: + if v4_tunnels is not None and len(v4_tunnels) > 0: for v4_tunnel in v4_tunnels: vxlan_source_int_list.append( v4_tunnel.get("source_interface") ) v6_tunnels = vxlan.get("v6_tunnels") - if v6_tunnels is not None and len( - v6_tunnels) > 0: + if v6_tunnels is not None and len(v6_tunnels) > 0: for v6_tunnel in v6_tunnels: vxlan_source_int_list.append( v6_tunnel.get("source_interface") ) - return vxlan_source_int_list + return vxlan_source_int_list def _create_dg(self, loop_back, device): self.logger.debug("Configuring DG for loopback interface") eth_name = loop_back.get("eth_name") if eth_name not in self._ngpf.api.ixn_objects.names: - raise Exception("Ethernet %s not present within configuration" - % eth_name) + raise Exception( + "Ethernet %s not present within configuration" % eth_name + ) ixn_parent_dg = self._ngpf.api.ixn_objects.get_working_dg( eth_name ) self._ixn_parent_dgs.append(ixn_parent_dg) ixn_dg = self.create_node_elemet( - ixn_parent_dg, "deviceGroup", "loopback_{}".format(device.get("name")) + ixn_parent_dg, + "deviceGroup", + "loopback_{}".format(device.get("name")) ) ixn_dg["multiplier"] = 1 self._ngpf.working_dg = ixn_dg self._ngpf.set_device_info(device, ixn_dg) return ixn_dg - def _config_ipv4_loopbacks(self, ipv4_loopbacks, device, vxlan_source_int_list): + def _config_ipv4_loopbacks( + self, ipv4_loopbacks, device, vxlan_source_int_list + ): self.logger.debug("Configuring IPv4 loopback interface") for ipv4_loopback in ipv4_loopbacks: ixn_dg = self._create_dg(ipv4_loopback, device) @@ -73,19 +76,23 @@ def _config_ipv4_loopbacks(self, ipv4_loopbacks, device, vxlan_source_int_list): ixn_eth = self.create_node_elemet( ixn_dg, "ethernet", "eth {}".format(name) ) - ixn_v4 = self.create_node_elemet( - ixn_eth, "ipv4", name - ) + ixn_v4 = self.create_node_elemet(ixn_eth, "ipv4", name) self._ngpf.set_device_info(ipv4_loopback, ixn_v4) - ixn_v4["address"] = self.as_multivalue(ipv4_loopback, "address") + ixn_v4["address"] = self.as_multivalue( + ipv4_loopback, "address" + ) else: ixn_v4lb = self.create_node_elemet( ixn_dg, "ipv4Loopback", name ) self._ngpf.set_device_info(ipv4_loopback, ixn_v4lb) - ixn_v4lb["address"] = self.as_multivalue(ipv4_loopback, "address") + ixn_v4lb["address"] = self.as_multivalue( + ipv4_loopback, "address" + ) - def _config_ipv6_loopbacks(self, ipv6_loopbacks, device, vxlan_source_int_list): + def _config_ipv6_loopbacks( + self, ipv6_loopbacks, device, vxlan_source_int_list + ): self.logger.debug("Configuring IPv6 loopback interface") for ipv6_loopback in ipv6_loopbacks: ixn_dg = self._create_dg(ipv6_loopback, device) @@ -94,16 +101,16 @@ def _config_ipv6_loopbacks(self, ipv6_loopbacks, device, vxlan_source_int_list): ixn_eth = self.create_node_elemet( ixn_dg, "ethernet", "eth {}".format(name) ) - ixn_v6 = self.create_node_elemet( - ixn_eth, "ipv6", name - ) + ixn_v6 = self.create_node_elemet(ixn_eth, "ipv6", name) self._ngpf.set_device_info(ipv6_loopback, ixn_v6) - ixn_v6["address"] = self.as_multivalue(ipv6_loopback, "address") + ixn_v6["address"] = self.as_multivalue( + ipv6_loopback, "address" + ) else: ixn_v4lb = self.create_node_elemet( ixn_dg, "ipv6Loopback", ipv6_loopback.get("name") ) self._ngpf.set_device_info(ipv6_loopback, ixn_v4lb) - ixn_v4lb["address"] = self.as_multivalue(ipv6_loopback, "address") - - + ixn_v4lb["address"] = self.as_multivalue( + ipv6_loopback, "address" + ) diff --git a/snappi_ixnetwork/device/ngpf.py b/snappi_ixnetwork/device/ngpf.py index 7894b6113..862570927 100644 --- a/snappi_ixnetwork/device/ngpf.py +++ b/snappi_ixnetwork/device/ngpf.py @@ -28,10 +28,7 @@ class Ngpf(Base): "BgpCMacIpRange": "ethernetVlan", } - _ROUTE_STATE = { - "advertise": True, - "withdraw": False - } + _ROUTE_STATE = {"advertise": True, "withdraw": False} def __init__(self, ixnetworkapi): super(Ngpf, self).__init__() @@ -72,15 +69,13 @@ def config(self): def set_device_info(self, snappi_obj, ixn_obj): name = snappi_obj.get("name") class_name = snappi_obj.__class__.__name__ - self.logger.debug("set_device_info name %s and class_name %s" % ( - name, class_name - )) + self.logger.debug( + "set_device_info name %s and class_name %s" % (name, class_name) + ) try: encap = Ngpf._DEVICE_ENCAP_MAP[class_name] except KeyError: - raise NameError( - "Mapping is missing for {0}".format(class_name) - ) + raise NameError("Mapping is missing for {0}".format(class_name)) self.api.set_device_encap(name, encap) self.api.set_device_encap( self.get_name(self.working_dg), encap @@ -114,42 +109,27 @@ def _configure_topology(self): for device in self.api.snappi_config.devices: self._bgp.config(device) - # Compaction will take place in this order # Step-1: Compact chain DGs for chain_parent_dg in self._chain_parent_dgs: - self.compactor.compact(chain_parent_dg.get( - "deviceGroup" - )) - self._set_dev_compacted(chain_parent_dg.get( - "deviceGroup" - )) + self.compactor.compact(chain_parent_dg.get("deviceGroup")) + self._set_dev_compacted(chain_parent_dg.get("deviceGroup")) # Step-2: Compact VXLAN source_interfaces = self._vxlan.source_interfaces for v4_int in source_interfaces.ipv4: - self.compactor.compact(v4_int.get( - "vxlan" - )) + self.compactor.compact(v4_int.get("vxlan")) for v6_int in source_interfaces.ipv6: - self.compactor.compact(v6_int.get( - "vxlanv6" - )) + self.compactor.compact(v6_int.get("vxlanv6")) # Step-3: First compact all loopback interfaces for ix_parent_dg in self.loopback_parent_dgs: - self.compactor.compact(ix_parent_dg.get( - "deviceGroup" - )) + self.compactor.compact(ix_parent_dg.get("deviceGroup")) # Step-4: Compact root Topology for ixn_topo in self._ixn_topo_objects.values(): - self.compactor.compact(ixn_topo.get( - "deviceGroup" - )) - self._set_dev_compacted(ixn_topo.get( - "deviceGroup" - )) + self.compactor.compact(ixn_topo.get("deviceGroup")) + self._set_dev_compacted(ixn_topo.get("deviceGroup")) def _configure_device_group(self, ixn_topos): """map ethernet with a ixn deviceGroup with multiplier = 1""" @@ -163,8 +143,11 @@ def _configure_device_group(self, ixn_topos): for ethernet in ethernets: if ethernet.get("connection") and ethernet.get("port_name"): raise Exception( - "port_name and connection for ethernet configuration cannot be passed together, use either connection or port_name property. \ - port_name is deprecated and will be removed in future releases.") + "port_name and connection for ethernet configuration \ + cannot be passed together, use either connection \ + or port_name property. port_name is deprecated and \ + will be removed in future releases." + ) if ethernet.get("connection"): connection_choice = ethernet.get("connection").choice if connection_choice == "port_name": @@ -181,13 +164,19 @@ def _configure_device_group(self, ixn_topos): else: port_name = ethernet.get("port_name") if port_name is None: - raise Exception("port_name is not passed for the device {}".format(device.get("name"))) + raise Exception( + "port_name is not passed for the device {}".format( + device.get("name") + ) + ) if port_name in self._ixn_topo_objects: ixn_topo = self._ixn_topo_objects[port_name] else: ixn_topo = self.add_element(ixn_topos) ixn_topo["name"] = self._get_topology_name(port_name) - ixn_topo["ports"] = [self.api.ixn_objects.get_xpath(port_name)] + ixn_topo["ports"] = [ + self.api.ixn_objects.get_xpath(port_name) + ] self._ixn_topo_objects[port_name] = ixn_topo ixn_dg = self.create_node_elemet( ixn_topo, "deviceGroup", device.get("name") @@ -214,7 +203,9 @@ def _configure_device_group(self, ixn_topos): if chin_dgs is None: continue for connected_to, ethernet_list in chin_dgs.items(): - ixn_working_dg = self.api.ixn_objects.get_working_dg(connected_to) + ixn_working_dg = self.api.ixn_objects.get_working_dg( + connected_to + ) self._chain_parent_dgs.append(ixn_working_dg) for ethernet in ethernet_list: ixn_dg = self.create_node_elemet( @@ -225,16 +216,13 @@ def _configure_device_group(self, ixn_topos): self.set_device_info(device, ixn_dg) self._ethernet.config(ethernet, ixn_dg) - def _pushixnconfig(self): self.logger.debug("pushing ixnet config") erros = self.api.get_errors() if len(erros) > 0: return ixn_cnf = json.dumps(self._ixn_config, indent=2) - errata = self._resource_manager.ImportConfig( - ixn_cnf, False - ) + errata = self._resource_manager.ImportConfig(ixn_cnf, False) for item in errata: self.api.warning(item) @@ -244,7 +232,7 @@ def stop_topology(self): self.logger.debug("Stopping topology") self.api._ixnetwork.StopAllProtocols("sync") - def set_protocol_state(self,request): + def set_protocol_state(self, request): if request.state is None: raise Exception("state is None within set_protocol_state") self.logger.debug("Setting protocol with %s" % request.state) @@ -273,27 +261,31 @@ def set_route_state(self, payload): ixn_obj = obj break if ixn_obj is None: - ixn_obj_idx_list[route_info] = list(range( - route_info.index, route_info.index + route_info.multiplier - )) + ixn_obj_idx_list[route_info] = list( + range( + route_info.index, + route_info.index + route_info.multiplier + ) + ) else: - ixn_obj_idx_list[route_info].extend(list(range( - route_info.index, route_info.index + route_info.multiplier - ))) + ixn_obj_idx_list[route_info].extend( + list( + range( + route_info.index, + route_info.index + route_info.multiplier + ) + ) + ) imports = [] for obj, index_list in ixn_obj_idx_list.items(): xpath = obj.xpath active = "active" index_list = list(set(index_list)) - object_info = self.select_properties( - xpath, properties=[active] - ) + object_info = self.select_properties(xpath, properties=[active]) values = object_info[active]["values"] for idx in index_list: values[idx] = Ngpf._ROUTE_STATE[payload.state] - imports.append(self.configure_value( - xpath, active, values - )) + imports.append(self.configure_value(xpath, active, values)) self.imports(imports) self.api._ixnetwork.Globals.Topology.ApplyOnTheFly() return names @@ -312,43 +304,65 @@ def set_device_state(self, payload): def _lacp_start_stop_pdu(self, state, index=1, lag_name=None): if lag_name is None: - lag_port_lacp = (self.api._ixnetwork.Lag.find() - .ProtocolStack.find().Ethernet.find() - .Lagportlacp.find()) - if state == 'up': + lag_port_lacp = ( + self.api._ixnetwork.Lag.find() + .ProtocolStack.find() + .Ethernet.find() + .Lagportlacp.find() + ) + if state == "up": lag_port_lacp.LacpStartPDU() - elif state == 'down': + elif state == "down": lag_port_lacp.LacpStopPDU() else: - lag_port_lacp = (self.api._ixnetwork.Lag.find(Name=lag_name) - .ProtocolStack.find().Ethernet.find() - .Lagportlacp.find()) - if state == 'up': + lag_port_lacp = ( + self.api._ixnetwork.Lag.find(Name=lag_name) + .ProtocolStack.find() + .Ethernet.find() + .Lagportlacp.find()) + if state == "up": lag_port_lacp.LacpStartPDU(SessionIndices=index) - elif state == 'down': + elif state == "down": lag_port_lacp.LacpStopPDU(SessionIndices=index) def get_states(self, request): self.logger.debug("get_states for %s" % request.choice) if request.choice == "ipv4_neighbors": - ip_objs = self.api._ixnetwork.Topology.find().DeviceGroup.find().Ethernet.find().Ipv4.find() + ip_objs = ( + self.api._ixnetwork.Topology.find() + .DeviceGroup.find() + .Ethernet.find() + .Ipv4.find() + ) resolved_mac_list = self._get_ether_resolved_mac( - ip_objs, self.ether_v4gateway_map, request.ipv4_neighbors, "ipv4" + ip_objs, + self.ether_v4gateway_map, + request.ipv4_neighbors, + "ipv4" ) elif request.choice == "ipv6_neighbors": - ip_objs = self.api._ixnetwork.Topology.find().DeviceGroup.find().Ethernet.find().Ipv6.find() + ip_objs = ( + self.api._ixnetwork.Topology.find() + .DeviceGroup.find() + .Ethernet.find() + .Ipv6.find() + ) resolved_mac_list = self._get_ether_resolved_mac( - ip_objs, self.ether_v6gateway_map, request.ipv6_neighbors, "ipv6" + ip_objs, + self.ether_v6gateway_map, + request.ipv6_neighbors, + "ipv6" ) else: - raise TypeError("get_states only accept ipv4_neighbors or ipv6_neighbors") + raise TypeError( + "get_states only accept ipv4_neighbors or ipv6_neighbors" + ) - return { - "choice": request.choice, - request.choice: resolved_mac_list - } + return {"choice": request.choice, request.choice: resolved_mac_list} - def _get_ether_resolved_mac(self, ip_objs, ether_gateway_map, ip_neighbors, choice): + def _get_ether_resolved_mac( + self, ip_objs, ether_gateway_map, ip_neighbors, choice + ): arp_entries = {} for ip_obj in ip_objs: resolved_mac_list = ip_obj.ResolvedGatewayMac @@ -366,27 +380,33 @@ def _get_ether_resolved_mac(self, ip_objs, ether_gateway_map, ip_neighbors, choi gateway_ips = ether_gateway_map[ethernet_name] for gateway_ip in gateway_ips: if gateway_ip not in arp_entries: - raise Exception("{} not found within current configured gateway ips".format( - gateway_ip - )) + raise Exception( + "{} not found within current configured gateway ips" + .format(gateway_ip) + ) if choice == "ipv4": - resolved_mac_list.append({ - "ethernet_name": ethernet_name, - "ipv4_address": gateway_ip, - "link_layer_address": arp_entries[gateway_ip] - }) + resolved_mac_list.append( + { + "ethernet_name": ethernet_name, + "ipv4_address": gateway_ip, + "link_layer_address": arp_entries[gateway_ip] + } + ) elif choice == "ipv6": - resolved_mac_list.append({ - "ethernet_name": ethernet_name, - "ipv6_address": gateway_ip, - "link_layer_address": arp_entries[gateway_ip] - }) - self.logger.debug("These are resolved_mac_list: %s" % resolved_mac_list) + resolved_mac_list.append( + { + "ethernet_name": ethernet_name, + "ipv6_address": gateway_ip, + "link_layer_address": arp_entries[gateway_ip] + } + ) + self.logger.debug( + "These are resolved_mac_list: %s" % resolved_mac_list + ) return resolved_mac_list def _get_href(self, xpath): - return xpath.replace('[', '/').\ - replace(']', '') + return xpath.replace('[', '/').replace(']', '') def select_properties(self, xpath, properties=[]): href = self._get_href(xpath) @@ -399,7 +419,7 @@ def select_properties(self, xpath, properties=[]): "inlines": [ { "child": "multivalue", - "properties": ["format", "pattern", "values"] + "properties": ["format", "pattern", "values"], } ] } @@ -445,5 +465,3 @@ def configure_value(self, source, attribute, value, enum_map=None): "value": value, } return ixn_value - - diff --git a/snappi_ixnetwork/device/utils.py b/snappi_ixnetwork/device/utils.py index 90aaa2be2..b7751a634 100644 --- a/snappi_ixnetwork/device/utils.py +++ b/snappi_ixnetwork/device/utils.py @@ -1,5 +1,7 @@ import re -from collections import namedtuple, Mapping +from collections import namedtuple +from collections.abc import Mapping + def namedtuple_with_defaults(typename, field_names, default_values=()): T = namedtuple(typename, field_names) @@ -14,8 +16,8 @@ def namedtuple_with_defaults(typename, field_names, default_values=()): def asdot2plain(asdot): """This returns an ASPLAIN formated ASN given an ASDOT+ format""" - if re.findall(r'\.|\:', asdot): - left, right = re.split(r'\.|\:', asdot) + if re.findall(r"\.|\:", asdot): + left, right = re.split(r"\.|\:", asdot) ret = int(left) * 65536 + int(right) return ret else: @@ -23,9 +25,11 @@ def asdot2plain(asdot): def convert_as_values(as_types, as_values): - ConvertedAsValues = namedtuple_with_defaults("ConvertedAsValues", - ("as_num", "as4_num", "ip_addr", "assign_num", "common_num"), - ([], [], [], [], [])) + ConvertedAsValues = namedtuple_with_defaults( + "ConvertedAsValues", + ("as_num", "as4_num", "ip_addr", "assign_num", "common_num"), + ([], [], [], [], []), + ) convert_values = ConvertedAsValues() for idx, as_type in enumerate(as_types): @@ -45,7 +49,8 @@ def convert_as_values(as_types, as_values): convert_values.ip_addr[idx] = num return convert_values + def hex_to_ipv4(hex_value): bytes = ["".join(x) for x in zip(*[iter(hex_value)] * 2)] bytes = [int(x, 16) for x in bytes] - return ".".join(str(x) for x in reversed(bytes)) \ No newline at end of file + return ".".join(str(x) for x in reversed(bytes)) diff --git a/snappi_ixnetwork/device/vxlan.py b/snappi_ixnetwork/device/vxlan.py index c1ad90311..10e26703d 100644 --- a/snappi_ixnetwork/device/vxlan.py +++ b/snappi_ixnetwork/device/vxlan.py @@ -2,10 +2,11 @@ from snappi_ixnetwork.logger import get_ixnet_logger from snappi_ixnetwork.device.utils import namedtuple_with_defaults + class VXLAN(Base): - SourceInterface = namedtuple_with_defaults("SourceInterface", - ("ipv4", "ipv6"), - ([], [])) + SourceInterface = namedtuple_with_defaults( + "SourceInterface", ("ipv4", "ipv6"), ([], []) + ) def __init__(self, ngpf): super(VXLAN, self).__init__() @@ -19,13 +20,11 @@ def source_interfaces(self): def config(self, vxlan): v4_tunnels = vxlan.get("v4_tunnels") - if v4_tunnels is not None and len( - v4_tunnels) > 0: + if v4_tunnels is not None and len(v4_tunnels) > 0: self._config_v4_tunnels(v4_tunnels) v6_tunnels = vxlan.get("v6_tunnels") - if v6_tunnels is not None and len( - v6_tunnels) > 0: + if v6_tunnels is not None and len(v6_tunnels) > 0: self._config_v6_tunnels(v6_tunnels) def _store_source_interface(self, ixn_inter, ip_type): @@ -44,9 +43,11 @@ def _config_v4_tunnels(self, v4_tunnels): self._ngpf.working_dg = ixnet_info.working_dg ip_type = self._ngpf.api.get_device_encap(source_interface) if ip_type != "ipv4": - raise TypeError("source_interface {} should support IPv4".format( - source_interface - )) + raise TypeError( + "source_interface {} should support IPv4".format( + source_interface + ) + ) self._store_source_interface(ixn_inter, ip_type) ixn_vxlan = self.create_node_elemet( ixn_inter, "vxlan", v4_tunnel.get("name") @@ -57,9 +58,7 @@ def _config_v4_tunnels(self, v4_tunnels): destination_ip_mode = v4_tunnel.destination_ip_mode if destination_ip_mode.choice == "unicast": ixn_vxlan["enableStaticInfo"] = True - self._config_v4_unicast( - destination_ip_mode.unicast, ixn_vxlan - ) + self._config_v4_unicast(destination_ip_mode.unicast, ixn_vxlan) else: ixn_vxlan["enableStaticInfo"] = False ixn_vxlan["ipv4_multicast"] = self.as_multivalue( @@ -74,9 +73,11 @@ def _config_v6_tunnels(self, v6_tunnels): self._ngpf.working_dg = ixnet_info.working_dg ip_type = self._ngpf.api.get_device_encap(source_interface) if ip_type != "ipv6": - raise TypeError("source_interface {} should support IPv6".format( - source_interface - )) + raise TypeError( + "source_interface {} should support IPv6".format( + source_interface + ) + ) self._store_source_interface(ixn_inter, ip_type) ixn_vxlan6 = self.create_node_elemet( ixn_inter, "vxlanv6", v6_tunnel.get("name") @@ -98,12 +99,16 @@ def _config_v6_tunnels(self, v6_tunnels): def _get_all_info(self, unicast): ixn_info_count = 0 - AllInfo = namedtuple_with_defaults("AllInfo", - ("remote_vtep_address", - "suppress_arp", - "remote_vm_mac", - "remote_vm_ipv4"), - ([], [], [], [])) + AllInfo = namedtuple_with_defaults( + "AllInfo", + ( + "remote_vtep_address", + "suppress_arp", + "remote_vm_mac", + "remote_vm_ipv4" + ), + ([], [], [], []), + ) all_info = AllInfo() vteps = unicast.vteps for vtep in vteps: @@ -129,11 +134,20 @@ def _config_v4_unicast(self, unicast, ixn_vxlan, v4_tunnel=True): ixn_vxlan["staticInfoCount"] = ixn_info_count if v4_tunnel is True: ixn_unicast = self.create_node_elemet(ixn_vxlan, "vxlanStaticInfo") - ixn_unicast["remoteVtepIpv4"] = self.multivalue(all_info.remote_vtep_address) + ixn_unicast["remoteVtepIpv4"] = self.multivalue( + all_info.remote_vtep_address + ) else: - ixn_unicast = self.create_node_elemet(ixn_vxlan, "vxlanIPv6StaticInfo") - ixn_unicast["remoteVtepUnicastIpv6"] = self.multivalue(all_info.remote_vtep_address) + ixn_unicast = self.create_node_elemet( + ixn_vxlan, "vxlanIPv6StaticInfo" + ) + ixn_unicast["remoteVtepUnicastIpv6"] = self.multivalue( + all_info.remote_vtep_address + ) ixn_unicast["suppressArp"] = self.multivalue(all_info.suppress_arp) - ixn_unicast["remoteVmStaticMac"] = self.multivalue(all_info.remote_vm_mac) - ixn_unicast["remoteVmStaticIpv4"] = self.multivalue(all_info.remote_vm_ipv4) - + ixn_unicast["remoteVmStaticMac"] = self.multivalue( + all_info.remote_vm_mac + ) + ixn_unicast["remoteVmStaticIpv4"] = self.multivalue( + all_info.remote_vm_ipv4 + ) diff --git a/snappi_ixnetwork/lag.py b/snappi_ixnetwork/lag.py index cf5123a22..44323b64a 100644 --- a/snappi_ixnetwork/lag.py +++ b/snappi_ixnetwork/lag.py @@ -1,6 +1,5 @@ import json from snappi_ixnetwork.timer import Timer -from snappi_ixnetwork.exceptions import SnappiIxnException class Lag(object): @@ -12,12 +11,15 @@ class Lag(object): """ """ - These are supported keys to defined field (_ETHERNET/ _VLAN/ _LACP/ new one...) + These are supported keys to defined field + (_ETHERNET/ _VLAN/ _LACP/ new one...) = { : { "ixn_attr" : name in IxNetwork SDM - "default" : default IxNetwork value. Consider snappi default already taken care. - "translate" : translate snappi value to ixnetwork value help of self method + "default" : default IxNetwork value. + Consider snappi default already taken care. + "translate" : translate snappi value to ixnetwork + value help of self method "enum_map" : enum map with snappi with ixnetwork } } @@ -51,7 +53,7 @@ class Lag(object): "actor_system_id": { "ixn_attr": "actorSystemId", "default": "00 00 00 00 00 01", - "translate": "_translate_actor_system_id" + "translate": "_translate_actor_system_id", }, "actor_system_priority": { "ixn_attr": "actorSystemPriority", @@ -82,8 +84,10 @@ def __init__(self, ixnetworkapi): def config(self): """Transform config.ports into Ixnetwork.Vport 1) delete any vport that is not part of the config - 2) create a vport for every config.ports[] that is not present in IxNetwork - 3) set config.ports[].location to /vport -location using resourcemanager + 2) create a vport for every config.ports[] that is + not present in IxNetwork + 3) set config.ports[].location to /vport -location + using resourcemanager 4) set /vport/l1Config/... properties using the corrected /vport -type 5) connectPorts to use new l1Config settings and clearownership """ @@ -112,7 +116,9 @@ def _import(self, imports): return True def _delete_lags(self): - """Delete any Lags from the api server that do not exist in the new config""" + """ + Delete any Lags from the api server that do not exist in the new config + """ self._api._remove(self._ixn_lag, self._lags_config) def _select_lags(self): @@ -272,9 +278,7 @@ def _lacp_ports_config(self, name, ports): ixn_lags[name]["xpath"] ) ) - imports.append( - self._set_multivalue(lacp_xpath, "active", True) - ) + imports.append(self._set_multivalue(lacp_xpath, "active", True)) for lacp_attr in Lag._LACP_PORT_PROTOCOL: attr_values = self._configure_attribute( lacp_attr, Lag._LACP_PORT_PROTOCOL, lacp_port_protocols @@ -333,7 +337,8 @@ def _protocol_config(self): ) ) lacp_port_imports = self._lacp_ports_config( - snappi_lag.name, snappi_lag.ports) + snappi_lag.name, snappi_lag.ports + ) imports += lacp_port_imports else: if len(ixn_lacp) > 0: diff --git a/snappi_ixnetwork/logger.py b/snappi_ixnetwork/logger.py index 12d5b2587..b422f6237 100644 --- a/snappi_ixnetwork/logger.py +++ b/snappi_ixnetwork/logger.py @@ -1,14 +1,16 @@ import sys import logging -APP_LOGGER_NAME = 'snappi_ixnetwork' +APP_LOGGER_NAME = "snappi_ixnetwork" def setup_ixnet_logger(log_level, file_name=None, module_name=None): logger = logging.getLogger(APP_LOGGER_NAME) logger.setLevel(log_level) - formatter = logging.Formatter(fmt="%(asctime)s [%(name)s] [%(levelname)s] %(message)s", - datefmt="%Y-%m-%d %H:%M:%S") + formatter = logging.Formatter( + fmt="%(asctime)s [%(name)s] [%(levelname)s] %(message)s", + datefmt="%Y-%m-%d %H:%M:%S" + ) sh = logging.StreamHandler(sys.stdout) sh.setFormatter(formatter) if len(logger.handlers) > 0: @@ -25,4 +27,4 @@ def setup_ixnet_logger(log_level, file_name=None, module_name=None): def get_ixnet_logger(module_name): module_name = ".".join(str(module_name).split(".")[1:]) - return logging.getLogger(APP_LOGGER_NAME).getChild(module_name) \ No newline at end of file + return logging.getLogger(APP_LOGGER_NAME).getChild(module_name) diff --git a/tests/utils/common.py b/tests/utils/common.py index 7429ce364..ef0648492 100644 --- a/tests/utils/common.py +++ b/tests/utils/common.py @@ -138,7 +138,6 @@ def start_traffic(api, cfg, start_capture=True): cs.protocol.all.state = cs.protocol.all.START api.set_control_state(cs) - print("Starting transmit on all flows ...") cs = api.control_state() cs.traffic.flow_transmit.state = cs.traffic.flow_transmit.START