Skip to content

Commit

Permalink
AddPolish code
Browse files Browse the repository at this point in the history
Signed-off-by: Ze Gan <ganze718@gmail.com>
  • Loading branch information
Pterosaur committed Jun 2, 2021
1 parent e721812 commit f9bb8f8
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 29 deletions.
21 changes: 12 additions & 9 deletions tests/macsec/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import logging
import re
from multiprocessing.pool import ThreadPool
from functools import partial

from tests.common import config_reload

Expand Down Expand Up @@ -31,7 +30,7 @@ def global_cmd(duthost, nbrhosts, cmd):
pool.join()


def recover_environment(duthost, nbrhosts):
def recover_configuration(duthost, nbrhosts):
pool = ThreadPool(1 + len(nbrhosts))
pool.apply_async(config_reload, args=(duthost, "minigraph"))
for nbr in nbrhosts.values():
Expand All @@ -41,14 +40,16 @@ def recover_environment(duthost, nbrhosts):


@pytest.fixture(scope="module")
def cleanup_configuration(duthost, nbrhosts):
def macsec_environment(duthost, nbrhosts):
recover_configuration(duthost, nbrhosts)
logger.info("Prepare MACsec environment")
yield
recover_environment(duthost, nbrhosts)
logger.info("Cleanup configuration")
recover_configuration(duthost, nbrhosts)
logger.info("Cleanup MACsec configuration")


@pytest.fixture(scope="module")
def enable_macsec_feature(duthost, nbrhosts, cleanup_configuration):
def enable_macsec_feature(duthost, nbrhosts, macsec_environment):
global_cmd(duthost, nbrhosts, "sudo config feature state macsec enabled")
logger.info("Enable MACsec feature")

Expand Down Expand Up @@ -81,17 +82,19 @@ def get_portchannel_list(host):
return portchannel_list


# TODO: Temporary solution, because MACsec cannot be enabled on a portchannel member in the current version
def delete_all_portchannel(host):
portchannel_list = get_portchannel_list(host)
for name, members in portchannel_list.items():
if len(members) > 0:
for member in members:
host.command("sudo config portchannel member del {} {}".format(name, member))
host.command("sudo config portchannel del {}".format(name))



# TODO: Re-added member port to port channel
@pytest.fixture(scope="module")
def cleanup_portchannel(duthost, nbrhosts, cleanup_configuration):
def cleanup_portchannel(duthost, nbrhosts, macsec_environment):
pool = ThreadPool(1 + len(nbrhosts))
pool.apply_async(delete_all_portchannel, args=(duthost, ))
for nbr in nbrhosts.values():
Expand Down Expand Up @@ -141,7 +144,7 @@ def policy(request):


@pytest.fixture(scope="module")
def ctrl_nbrhosts(nbrhosts):
def ctrl_links(nbrhosts):
return [
{
"host": nbrhosts["ARISTA01T1"]["host"],
Expand Down
125 changes: 105 additions & 20 deletions tests/macsec/test_macsec.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@
import struct
import re

# import ptf.testutils as testutils
# import ptf.mask as mask
# import ptf.packet as packet

from tests.common.helpers.assertions import pytest_assert

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -46,26 +50,39 @@ def disable_macsec_port(host, port):
host.command(cmd)


@pytest.fixture(scope="module", autouse=True)
def setup(duthost, ctrl_nbrhosts, profile_name, default_priority, cipher_suite,
primary_cak, primary_ckn, policy, enable_macsec_feature, cleanup_portchannel):
def cleanup_macsec_configuration(duthost, ctrl_links, profile_name):
for nbr in ctrl_links:
disable_macsec_port(duthost, nbr["dut_ctrl_port"])
disable_macsec_port(nbr["host"], nbr["host_ctrl_port"])
delete_macsec_profile(nbr["host"], profile_name)
delete_macsec_profile(duthost, profile_name)


def setup_macsec_configuration(duthost, ctrl_links, profile_name, default_priority,
cipher_suite, primary_cak, primary_ckn, policy):
set_macsec_profile(duthost, profile_name, default_priority,
cipher_suite, primary_cak, primary_ckn, policy)
for nbr in ctrl_nbrhosts:
for nbr in ctrl_links:
enable_macsec_port(duthost, nbr["dut_ctrl_port"], profile_name)
set_macsec_profile(nbr["host"], profile_name, default_priority,
cipher_suite, primary_cak, primary_ckn, policy)
enable_macsec_port(nbr["host"], nbr["host_ctrl_port"], profile_name)


@pytest.fixture(scope="module", autouse=True)
def setup(duthost, ctrl_links, profile_name, default_priority, cipher_suite,
primary_cak, primary_ckn, policy, enable_macsec_feature, cleanup_portchannel, request):
cleanup_macsec_configuration(duthost, ctrl_links, profile_name)
time.sleep(30)
setup_macsec_configuration(duthost, ctrl_links, profile_name,
default_priority, cipher_suite, primary_cak, primary_ckn, policy)
logger.info("Setup MACsec configuration with arguments:\n{}".format(locals()))
time.sleep(60)
time.sleep(120)
yield
for nbr in ctrl_nbrhosts:
disable_macsec_port(duthost, nbr["dut_ctrl_port"])
disable_macsec_port(nbr["host"], nbr["host_ctrl_port"])
delete_macsec_profile(nbr["host"], profile_name)
delete_macsec_profile(duthost, profile_name)
logger.info("Cleanup MACsec configuration")
time.sleep(60)
if request.session.testsfailed > 0:
return
cleanup_macsec_configuration(duthost, ctrl_links, profile_name)
time.sleep(30)


def check_wpa_supplicant_process(host, ctrl_port_name):
Expand Down Expand Up @@ -278,7 +295,6 @@ def check_mka_sc(egress_sc, ingress_sc):
assert egress_sc["sas"][active_an]["enabled"]
assert ingress_sc["sas"][active_an]["enabled"]
assert egress_sc["sas"][active_an]["key"] == ingress_sc["sas"][active_an]["key"]
assert egress_sc["sas"][active_an]["pn"] >= ingress_sc["sas"][active_an]["pn"]


def check_mka_session(dut_mka_session, dut_sci, nbr_mka_session, nbr_sci, policy, cipher_suite):
Expand All @@ -303,20 +319,20 @@ def check_mka_session(dut_mka_session, dut_sci, nbr_mka_session, nbr_sci, policy


class TestControlPlane():
def test_wpa_supplicant_processes(self, duthost, ctrl_nbrhosts):
for nbr in ctrl_nbrhosts:
def test_wpa_supplicant_processes(self, duthost, ctrl_links):
for nbr in ctrl_links:
check_wpa_supplicant_process(duthost, nbr["dut_ctrl_port"])
check_wpa_supplicant_process(nbr["host"], nbr["host_ctrl_port"])

def test_appl_db(self, duthost, ctrl_nbrhosts, policy, cipher_suite):
for nbr in ctrl_nbrhosts:
def test_appl_db(self, duthost, ctrl_links, policy, cipher_suite):
for nbr in ctrl_links:
check_appl_db(duthost, nbr["dut_ctrl_port"], nbr["host"],
nbr["host_ctrl_port"], policy, cipher_suite)

def test_mka_session(self, duthost, ctrl_nbrhosts, policy, cipher_suite):
def test_mka_session(self, duthost, ctrl_links, policy, cipher_suite):
dut_mka_session = get_mka_session(duthost)
assert len(dut_mka_session) == len(ctrl_nbrhosts)
for nbr in ctrl_nbrhosts:
assert len(dut_mka_session) == len(ctrl_links)
for nbr in ctrl_links:
nbr_mka_session = get_mka_session(nbr["host"])
dut_macsec_port = get_macsec_infname(duthost, nbr["dut_ctrl_port"])
nbr_macsec_port = get_macsec_infname(nbr["host"], nbr["host_ctrl_port"])
Expand All @@ -327,3 +343,72 @@ def test_mka_session(self, duthost, ctrl_nbrhosts, policy, cipher_suite):
check_mka_session(dut_mka_session[dut_macsec_port], dut_sci,
nbr_mka_session[nbr_macsec_port], nbr_sci,
policy, cipher_suite)


# class TestDataPlane():
# def test_pkt(self, duthost, tbinfo, ptfadapter):
# downstream_ports = defaultdict(list)
# downstream_port_ids = []
# mg_facts = duthost.get_extended_minigraph_facts(tbinfo)
# topo = tbinfo["topo"]["type"]
# logging.info("mg_facts")
# for interface, neighbor in mg_facts["minigraph_neighbors"].items():
# port_id = mg_facts["minigraph_ptf_indices"][interface]
# # if (topo == "t1" and "T0" in neighbor["name"]) or (topo == "t0" and "Server" in neighbor["name"]):
# if neighbor["name"] == "ARISTA01T1":
# downstream_ports[neighbor['namespace']].append(interface)
# downstream_port_ids.append(port_id)

# # pkt = testutils.simple_tcp_packet(
# # eth_dst="52:54:00:41:30:3f",
# # eth_src="52:54:00:b5:be:69",
# # ip_dst="10.0.0.57",
# # ip_src="10.0.0.56",
# # tcp_sport=4321,
# # tcp_dport=1234,
# # ip_ttl=64
# # )
# pkt = testutils.simple_icmp_packet(
# eth_dst="52:54:00:41:30:3f",
# eth_src="52:54:00:b5:be:69",
# ip_dst="10.0.1.56",
# ip_src="10.0.1.57",
# icmp_type=8,
# icmp_code=0,
# ip_ttl = 64,
# )
# exp_pkt = pkt.copy()
# exp_pkt = mask.Mask(exp_pkt, ignore_extra_bytes=True)
# exp_pkt.set_do_not_care_scapy(packet.Ether, "dst")
# exp_pkt.set_do_not_care_scapy(packet.Ether, "src")
# exp_pkt.set_do_not_care(14*8, (3*16 - 2)*8)
# exp_pkt.mask = [0x00] * exp_pkt.size
# exp_pkt.mask[12] = 0xff
# exp_pkt.mask[13] = 0xff
# logging.info(ptfadapter.dataplane)
# ptfadapter.dataplane.flush()
# import threading
# import time
# def run_send():
# time.sleep(1)
# testutils.send(ptfadapter, downstream_port_ids[0], pkt, 10)
# t = threading.Thread(target=run_send)
# t.start()
# testutils.send(ptfadapter, downstream_port_ids[0], pkt)
# result = testutils.verify_packet_any_port(ptfadapter)
# testutils.send(ptfadapter, downstream_port_ids[0], pkt)
# testutils.verify_packet(ptfadapter, exp_pkt, port_id=downstream_port_ids[0])
# t.join()
# i = 10
# while True:
# (rcv_device, rcv_port, rcv_pkt, pkt_time) = testutils.dp_poll(
# ptfadapter, port_number=downstream_port_ids[0], exp_pkt=exp_pkt)
# # if not rcv_device or not rcv_port or not rcv_pkt or not pkt_time or i == 0:
# # break
# if i == 0:
# break
# i -= 1
# logging.info((rcv_device, rcv_port, rcv_pkt, pkt_time))



0 comments on commit f9bb8f8

Please sign in to comment.