Skip to content

Commit

Permalink
Added Protocol metrics TCs
Browse files Browse the repository at this point in the history
  • Loading branch information
indraniBh committed Sep 10, 2024
1 parent ec947af commit 3b563ed
Show file tree
Hide file tree
Showing 8 changed files with 872 additions and 2 deletions.
4 changes: 2 additions & 2 deletions snappi_ixnetwork/protocolmetrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class ProtocolMetrics(object):
# TODO session_flap_count can't be added now
# it needs to be added by creating new view.
# currently facing an issue in protocol view creation.
("session_flap_count", "Session Flap Count", int, _SKIP),
("session_flap_count", "Session Flap Count", int),
("routes_advertised", "Routes Advertised", int),
("routes_received", "Routes Rx", int),
("route_withdraws_sent", "Routes Withdrawn", int),
Expand All @@ -43,7 +43,7 @@ class ProtocolMetrics(object):
("name", "Device Group", str),
("session_state", "Status", str),
# TODO session_flap_count can't be added now
("session_flap_count", "Session Flap Count", int, _SKIP),
("session_flap_count", "Session Flap Count", int),
("routes_advertised", "Routes Advertised", int),
("routes_received", "Routes Rx", int),
("route_withdraws_sent", "Routes Withdrawn", int),
Expand Down
128 changes: 128 additions & 0 deletions tests/bgp/test_bgpv4_stats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
import pytest


# @pytest.mark.skip(reason="Revisit CI/CD fail")
def test_bgpv4_stats(api, b2b_raw_config, utils):
"""
Test for the bgpv4 metrics
"""
api.set_config(api.config())
b2b_raw_config.flows.clear()

p1, p2 = b2b_raw_config.ports
d1, d2 = b2b_raw_config.devices.device(name="tx_bgp").device(name="rx_bgp")

eth1, eth2 = d1.ethernets.add(), d2.ethernets.add()
eth1.connection.port_name, eth2.connection.port_name = p1.name, p2.name
eth1.mac, eth2.mac = "00:00:00:00:00:11", "00:00:00:00:00:22"
ip1, ip2 = eth1.ipv4_addresses.add(), eth2.ipv4_addresses.add()
bgp1, bgp2 = d1.bgp, d2.bgp

eth1.name, eth2.name = "eth1", "eth2"
ip1.name, ip2.name = "ip1", "ip2"
bgp1.router_id, bgp2.router_id = "192.0.0.1", "192.0.0.2"
bgp1_int, bgp2_int = bgp1.ipv4_interfaces.add(), bgp2.ipv4_interfaces.add()
bgp1_int.ipv4_name, bgp2_int.ipv4_name = ip1.name, ip2.name
bgp1_peer, bgp2_peer = bgp1_int.peers.add(), bgp2_int.peers.add()
bgp1_peer.name, bgp2_peer.name = "bgp1", "bpg2"
ip1.address = "10.1.1.1"
ip1.gateway = "10.1.1.2"
ip1.prefix = 24

ip2.address = "10.1.1.2"
ip2.gateway = "10.1.1.1"
ip2.prefix = 24

bgp1_peer.peer_address = "10.1.1.2"
bgp1_peer.as_type = "ibgp"
bgp1_peer.as_number = 10

bgp2_peer.peer_address = "10.1.1.1"
bgp2_peer.as_type = "ibgp"
bgp2_peer.as_number = 10

utils.start_traffic(api, b2b_raw_config)
utils.wait_for(
lambda: results_ok(api), "stats to be as expected", timeout_seconds=20
)
enums = [
"session_state",
"routes_advertised",
"routes_received",
"route_withdraws_sent",
"route_withdraws_received",
"updates_sent",
"updates_received",
"opens_sent",
"opens_received",
"keepalives_sent",
"keepalives_received",
"notifications_sent",
"notifications_received",
]
expected_results = {
"tx_bgp": ["up", 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
"rx_bgp": ["up", 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
}
req = api.metrics_request()
req.bgpv4.peer_names = []
req.bgpv4.column_names = enums[:3]
results = api.get_metrics(req)
assert len(results.bgpv4_metrics) == 2
for bgp_res in results.bgpv4_metrics:
for i, enum in enumerate(enums[:3]):
val = expected_results[bgp_res.name][i]
if "session_state" in enum:
assert getattr(bgp_res, enum) == val
else:
assert getattr(bgp_res, enum) >= val

req = api.metrics_request()
req.bgpv4.peer_names = []
req.bgpv4.column_names = []
results = api.get_metrics(req)

assert len(results.bgpv4_metrics) == 2
for bgp_res in results.bgpv4_metrics:
for i, enum in enumerate(enums):
val = expected_results[bgp_res.name][i]
if "session_state" in enum:
assert getattr(bgp_res, enum) == val
else:
assert getattr(bgp_res, enum) >= val

# req = api.metrics_request()
# req.bgpv4.peer_names = ["rx_bgp"]
# results = api.get_metrics(req)

# assert len(results.bgpv4_metrics) == 1
# assert results.bgpv4_metrics[0].name == "rx_bgp"
# for bgp_res in results.bgpv4_metrics:
# for i, enum in enumerate(enums):
# val = expected_results[bgp_res.name][i]
# if "session_state" in enum:
# assert getattr(bgp_res, enum) == val
# else:
# assert getattr(bgp_res, enum) >= val

req = api.metrics_request()
req.bgpv4.column_names = ["session_state"]
results = api.get_metrics(req)
assert len(results.bgpv4_metrics) == 2
assert results.bgpv4_metrics[0].session_state == "up"
assert results.bgpv4_metrics[1].session_state == "up"
utils.stop_traffic(api, b2b_raw_config)


def results_ok(api):
req = api.metrics_request()
req.bgpv4.column_names = ["session_state"]
results = api.get_metrics(req)
ok = []
for r in results.bgpv4_metrics:
ok.append(r.session_state == "up")
return all(ok)


if __name__ == "__main__":
pytest.main(["-vv", "-s", __file__])
128 changes: 128 additions & 0 deletions tests/traffic/test_device_bgp_ep.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
import pytest


# @pytest.mark.skip(reason="will be updating the test with new snappi version")
def test_bgpv6_routes(api, b2b_raw_config, utils):
"""
Test for the bgpv6 routes
"""
size = 1500
packets = 1000
api.set_config(api.config())
b2b_raw_config.flows.clear()

p1, p2 = b2b_raw_config.ports
d1, d2 = b2b_raw_config.devices.device(name="tx_bgp").device(name="rx_bgp")
eth1, eth2 = d1.ethernets.add(), d2.ethernets.add()
eth1.connection.port_name, eth2.connection.port_name = p1.name, p2.name
eth1.mac, eth2.mac = "00:00:00:00:00:11", "00:00:00:00:00:22"
ip1, ip2 = eth1.ipv6_addresses.add(), eth2.ipv6_addresses.add()
bgp1, bgp2 = d1.bgp, d2.bgp

eth1.name, eth2.name = "eth1", "eth2"
ip1.name, ip2.name = "ip1", "ip2"

ip1.address = "2000::1"
ip1.gateway = "3000::1"
ip1.prefix = 64

ip2.address = "3000::1"
ip2.gateway = "2000::1"
ip2.prefix = 64

bgp1.router_id, bgp2.router_id = "192.0.0.1", "192.0.0.2"
bgp1_int, bgp2_int = bgp1.ipv6_interfaces.add(), bgp2.ipv6_interfaces.add()
bgp1_int.ipv6_name, bgp2_int.ipv6_name = ip1.name, ip2.name
bgp1_peer, bgp2_peer = bgp1_int.peers.add(), bgp2_int.peers.add()
bgp1_peer.name, bgp2_peer.name = "bgp1", "bpg2"

bgp1_peer.peer_address = "3000::1"
bgp1_peer.as_type = "ibgp"
bgp1_peer.as_number = 10

bgp2_peer.peer_address = "2000::1"
bgp2_peer.as_type = "ibgp"
bgp2_peer.as_number = 10

bgp1_rr1 = bgp1_peer.v6_routes.add(name="bgp1_rr1")
bgp1_rr2 = bgp1_peer.v6_routes.add(name="bgp1_rr2")
bgp2_rr1 = bgp2_peer.v6_routes.add(name="bgp2_rr1")
bgp2_rr2 = bgp2_peer.v6_routes.add(name="bgp2_rr2")

bgp1_rr1.addresses.add(address="4000::1", prefix=64)
bgp1_rr2.addresses.add(address="5000::1", prefix=64)

bgp2_rr1.addresses.add(address="4000::1", prefix=64)
bgp2_rr2.addresses.add(address="5000::1", prefix=64)

flow_bgp = b2b_raw_config.flows.flow(name="flow_bgp")[-1]

flow_bgp.rate.percentage = 1
flow_bgp.duration.fixed_packets.packets = packets
flow_bgp.size.fixed = size
flow_bgp.tx_rx.device.tx_names = [
bgp1_rr1.name,
bgp1_rr2.name,
]
flow_bgp.tx_rx.device.rx_names = [
bgp2_rr1.name,
bgp2_rr2.name,
]
flow_bgp.metrics.enable = True
flow_bgp.metrics.loss = True
utils.start_traffic(api, b2b_raw_config, start_capture=False)

req = api.metrics_request()
req.bgpv6.peer_names = []
results = api.get_metrics(req)
enums = [
"session_state",
"routes_advertised",
"route_withdraws_sent",
]
expected_results = {
"tx_bgp": ["up", 0, 0],
"rx_bgp": ["up", 0, 0],
}

assert len(results.bgpv6_metrics) == 2
for bgp_res in results.bgpv6_metrics:
for i, enum in enumerate(enums):
val = expected_results[bgp_res.name][i]
assert getattr(bgp_res, enum) == val

req = api.metrics_request()
req.bgpv6.peer_names = ["rx_bgp"]
results = api.get_metrics(req)

# assert len(results.bgpv6_metrics) == 1
# assert results.bgpv6_metrics[0].name == "rx_bgp"
# for bgp_res in results.bgpv6_metrics:
# for i, enum in enumerate(enums):
# val = expected_results[bgp_res.name][i]
# assert getattr(bgp_res, enum) == val

req = api.metrics_request()
req.bgpv6.column_names = ["session_state"]
results = api.get_metrics(req)
assert len(results.bgpv6_metrics) == 2
assert results.bgpv6_metrics[0].session_state == "up"
assert results.bgpv6_metrics[1].session_state == "up"

utils.wait_for(
lambda: results_ok(api, ["flow_bgp"], packets),
"stats to be as expected",
timeout_seconds=10,
)
utils.stop_traffic(api, b2b_raw_config)


def results_ok(api, flow_names, expected):
"""
Returns True if there is no traffic loss else False
"""
request = api.metrics_request()
request.flow.flow_names = flow_names
flow_results = api.get_metrics(request).flow_metrics
flow_rx = sum([f.frames_rx for f in flow_results])
return flow_rx == expected
Loading

0 comments on commit 3b563ed

Please sign in to comment.