-
Notifications
You must be signed in to change notification settings - Fork 714
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[platform fixtures] copy a few shared files/fixtures from tests/platf…
…orm to tests/common/platform (#1492) * [platform fixtures] copy a few shared files/fixtures from tests/platform to tests/common/platform This is the first PR of a serial of refactoring and clean up actions. The intention / end goal is that for shared fixtured/functions, we want to organize them into a nice folder structure for tests to use. This move didn't deprecate/change any existing reference/usage of these fixtures/functions in their original locations. Moved files has been tested with a mock test that didn't go in with this change. Next steps would be further refactoring the existing code to move more fixtures/functions/references over here. And eventually deprecate the existing duplicate of these files. Signed-off-by: Ying Xie <ying.xie@microsoft.com> * Rename files
- Loading branch information
Showing
5 changed files
with
216 additions
and
0 deletions.
There are no files selected for viewing
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
import pytest | ||
import os | ||
|
||
@pytest.fixture(scope="module") | ||
def conn_graph_facts(testbed_devices): | ||
dut = testbed_devices["dut"] | ||
localhost = testbed_devices["localhost"] | ||
|
||
base_path = os.path.dirname(os.path.realpath(__file__)) | ||
conn_graph_file = os.path.join(base_path, "../../../ansible/files/lab_connection_graph.xml") | ||
conn_graph_facts = localhost.conn_graph_facts(host=dut.hostname, filename=conn_graph_file)['ansible_facts'] | ||
return conn_graph_facts |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,85 @@ | ||
""" | ||
Helper script for checking status of interfaces | ||
This script contains re-usable functions for checking status of interfaces on SONiC. | ||
""" | ||
import logging | ||
from transceiver_utils import all_transceivers_detected | ||
|
||
|
||
def parse_intf_status(lines): | ||
""" | ||
@summary: Parse the output of command "intfutil description". | ||
@param lines: The output lines of command "intfutil description". | ||
@return: Return a dictionary like: | ||
{ | ||
"Ethernet0": { | ||
"oper": "up", | ||
"admin": "up", | ||
"alias": "etp1", | ||
"desc": "ARISTA01T2:Ethernet1" | ||
}, | ||
... | ||
} | ||
""" | ||
result = {} | ||
for line in lines: | ||
fields = line.split() | ||
if len(fields) >= 5: | ||
intf = fields[0] | ||
oper, admin, alias, desc = fields[1], fields[2], fields[3], ' '.join(fields[4:]) | ||
result[intf] = {"oper": oper, "admin": admin, "alias": alias, "desc": desc} | ||
return result | ||
|
||
|
||
def check_interface_status(dut, interfaces): | ||
""" | ||
@summary: Check the admin and oper status of the specified interfaces on DUT. | ||
@param dut: The AnsibleHost object of DUT. For interacting with DUT. | ||
@param interfaces: List of interfaces that need to be checked. | ||
""" | ||
logging.info("Check interface status using cmd 'intfutil'") | ||
mg_ports = dut.minigraph_facts(host=dut.hostname)["ansible_facts"]["minigraph_ports"] | ||
output = dut.command("intfutil description") | ||
intf_status = parse_intf_status(output["stdout_lines"][2:]) | ||
check_intf_presence_command = 'show interface transceiver presence {}' | ||
for intf in interfaces: | ||
expected_oper = "up" if intf in mg_ports else "down" | ||
expected_admin = "up" if intf in mg_ports else "down" | ||
if intf not in intf_status: | ||
logging.info("Missing status for interface %s" % intf) | ||
return False | ||
if intf_status[intf]["oper"] != expected_oper: | ||
logging.info("Oper status of interface %s is %s, expected '%s'" % (intf, intf_status[intf]["oper"], | ||
expected_oper)) | ||
return False | ||
if intf_status[intf]["admin"] != expected_admin: | ||
logging.info("Admin status of interface %s is %s, expected '%s'" % (intf, intf_status[intf]["admin"], | ||
expected_admin)) | ||
return False | ||
|
||
# Cross check the interface SFP presence status | ||
check_presence_output = dut.command(check_intf_presence_command.format(intf)) | ||
presence_list = check_presence_output["stdout_lines"][2].split() | ||
assert intf in presence_list, "Wrong interface name in the output: %s" % str(presence_list) | ||
assert 'Present' in presence_list, "Status is not expected, presence status: %s" % str(presence_list) | ||
|
||
logging.info("Check interface status using the interface_facts module") | ||
intf_facts = dut.interface_facts(up_ports=mg_ports)["ansible_facts"] | ||
down_ports = intf_facts["ansible_interface_link_down_ports"] | ||
if len(down_ports) != 0: | ||
logging.info("Some interfaces are down: %s" % str(down_ports)) | ||
return False | ||
|
||
return True | ||
|
||
|
||
def check_interface_information(dut, interfaces): | ||
if not all_transceivers_detected(dut, interfaces): | ||
logging.info("Not all transceivers are detected") | ||
return False | ||
if not check_interface_status(dut, interfaces): | ||
logging.info("Not all interfaces are up") | ||
return False | ||
|
||
return True |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,119 @@ | ||
""" | ||
Helper script for checking status of transceivers | ||
This script contains re-usable functions for checking status of transceivers. | ||
""" | ||
import logging | ||
import re | ||
import json | ||
|
||
|
||
def parse_transceiver_info(output_lines): | ||
""" | ||
@summary: Parse the list of transceiver from DB table TRANSCEIVER_INFO content | ||
@param output_lines: DB table TRANSCEIVER_INFO content output by 'redis' command | ||
@return: Return parsed transceivers in a list | ||
""" | ||
result = [] | ||
p = re.compile(r"TRANSCEIVER_INFO\|(Ethernet\d+)") | ||
for line in output_lines: | ||
m = p.match(line) | ||
assert m, "Unexpected line %s" % line | ||
result.append(m.group(1)) | ||
return result | ||
|
||
|
||
def parse_transceiver_dom_sensor(output_lines): | ||
""" | ||
@summary: Parse the list of transceiver from DB table TRANSCEIVER_DOM_SENSOR content | ||
@param output_lines: DB table TRANSCEIVER_DOM_SENSOR content output by 'redis' command | ||
@return: Return parsed transceivers in a list | ||
""" | ||
result = [] | ||
p = re.compile(r"TRANSCEIVER_DOM_SENSOR\|(Ethernet\d+)") | ||
for line in output_lines: | ||
m = p.match(line) | ||
assert m, "Unexpected line %s" % line | ||
result.append(m.group(1)) | ||
return result | ||
|
||
|
||
def all_transceivers_detected(dut, interfaces): | ||
""" | ||
Check if transceiver information of all the specified interfaces have been detected. | ||
""" | ||
db_output = dut.command("redis-cli --raw -n 6 keys TRANSCEIVER_INFO\*")["stdout_lines"] | ||
not_detected_interfaces = [intf for intf in interfaces if "TRANSCEIVER_INFO|%s" % intf not in db_output] | ||
if len(not_detected_interfaces) > 0: | ||
logging.info("Interfaces not detected: %s" % str(not_detected_interfaces)) | ||
return False | ||
return True | ||
|
||
|
||
def check_transceiver_basic(dut, interfaces): | ||
""" | ||
@summary: Check whether all the specified interface are in TRANSCEIVER_INFO redis DB. | ||
@param dut: The AnsibleHost object of DUT. For interacting with DUT. | ||
@param interfaces: List of interfaces that need to be checked. | ||
""" | ||
logging.info("Check whether transceiver information of all ports are in redis") | ||
xcvr_info = dut.command("redis-cli -n 6 keys TRANSCEIVER_INFO*") | ||
parsed_xcvr_info = parse_transceiver_info(xcvr_info["stdout_lines"]) | ||
for intf in interfaces: | ||
assert intf in parsed_xcvr_info, "TRANSCEIVER INFO of %s is not found in DB" % intf | ||
|
||
|
||
def check_transceiver_details(dut, interfaces): | ||
""" | ||
@summary: Check the detailed TRANSCEIVER_INFO content of all the specified interfaces. | ||
@param dut: The AnsibleHost object of DUT. For interacting with DUT. | ||
@param interfaces: List of interfaces that need to be checked. | ||
""" | ||
logging.info("Check detailed transceiver information of each connected port") | ||
expected_fields = ["type", "hardwarerev", "serialnum", "manufacturename", "modelname"] | ||
for intf in interfaces: | ||
port_xcvr_info = dut.command('redis-cli -n 6 hgetall "TRANSCEIVER_INFO|%s"' % intf) | ||
for field in expected_fields: | ||
assert port_xcvr_info["stdout"].find(field) >= 0, \ | ||
"Expected field %s is not found in %s while checking %s" % (field, port_xcvr_info["stdout"], intf) | ||
|
||
|
||
def check_transceiver_dom_sensor_basic(dut, interfaces): | ||
""" | ||
@summary: Check whether all the specified interface are in TRANSCEIVER_DOM_SENSOR redis DB. | ||
@param dut: The AnsibleHost object of DUT. For interacting with DUT. | ||
@param interfaces: List of interfaces that need to be checked. | ||
""" | ||
logging.info("Check whether TRANSCEIVER_DOM_SENSOR of all ports in redis") | ||
xcvr_dom_sensor = dut.command("redis-cli -n 6 keys TRANSCEIVER_DOM_SENSOR*") | ||
parsed_xcvr_dom_sensor = parse_transceiver_dom_sensor(xcvr_dom_sensor["stdout_lines"]) | ||
for intf in interfaces: | ||
assert intf in parsed_xcvr_dom_sensor, "TRANSCEIVER_DOM_SENSOR of %s is not found in DB" % intf | ||
|
||
|
||
def check_transceiver_dom_sensor_details(dut, interfaces): | ||
""" | ||
@summary: Check the detailed TRANSCEIVER_DOM_SENSOR content of all the specified interfaces. | ||
@param dut: The AnsibleHost object of DUT. For interacting with DUT. | ||
@param interfaces: List of interfaces that need to be checked. | ||
""" | ||
logging.info("Check detailed TRANSCEIVER_DOM_SENSOR information of each connected ports") | ||
expected_fields = ["temperature", "voltage", "rx1power", "rx2power", "rx3power", "rx4power", "tx1bias", | ||
"tx2bias", "tx3bias", "tx4bias", "tx1power", "tx2power", "tx3power", "tx4power"] | ||
for intf in interfaces: | ||
port_xcvr_dom_sensor = dut.command('redis-cli -n 6 hgetall "TRANSCEIVER_DOM_SENSOR|%s"' % intf) | ||
for field in expected_fields: | ||
assert port_xcvr_dom_sensor["stdout"].find(field) >= 0, \ | ||
"Expected field %s is not found in %s while checking %s" % (field, port_xcvr_dom_sensor["stdout"], intf) | ||
|
||
|
||
def check_transceiver_status(dut, interfaces): | ||
""" | ||
@summary: Check transceiver information of all the specified interfaces in redis DB. | ||
@param dut: The AnsibleHost object of DUT. For interacting with DUT. | ||
@param interfaces: List of interfaces that need to be checked. | ||
""" | ||
check_transceiver_basic(dut, interfaces) | ||
check_transceiver_details(dut, interfaces) | ||
check_transceiver_dom_sensor_basic(dut, interfaces) | ||
check_transceiver_dom_sensor_details(dut, interfaces) |