diff --git a/tests/common/devices/fanout.py b/tests/common/devices/fanout.py index 51c72fcd8f..ca03e8e9fd 100644 --- a/tests/common/devices/fanout.py +++ b/tests/common/devices/fanout.py @@ -107,3 +107,65 @@ def add_port_map(self, host_port, fanout_port): def exec_template(self, ansible_root, ansible_playbook, inventory, **kwargs): return self.host.exec_template(ansible_root, ansible_playbook, inventory, **kwargs) + + def get_supported_speeds(self, interface_name): + """Get supported speeds for a given interface + + Args: + interface_name (str): Interface name + + Returns: + list: A list of supported speed strings or None + """ + return self.host.get_supported_speeds(interface_name) + + def set_auto_negotiation_mode(self, interface_name, mode): + """Set auto negotiation mode for a given interface + + Args: + interface_name (str): Interface name + mode (boolean): True to enable auto negotiation else disable + + Returns: + boolean: False if the operation is not supported else True + """ + return self.host.set_auto_negotiation_mode(interface_name, mode) + + def get_auto_negotiation_mode(self, interface_name): + """Get auto negotiation mode for a given interface + + Args: + interface_name (str): Interface name + + Returns: + boolean: True if auto negotiation mode is enabled else False. Return None if + the auto negotiation mode is unknown or unsupported. + """ + return self.host.get_auto_negotiation_mode(interface_name) + + def set_speed(self, interface_name, speed): + """Set interface speed according to the auto negotiation mode. When auto negotiation mode + is enabled, set the advertised speeds; otherwise, set the force speed. + + Args: + interface_name (str): Interface name + speed (str): SONiC style interface speed. E.g, 1G=1000, 10G=10000, 100G=100000. If the speed + is None and auto negotiation mode is enabled, it sets the advertised speeds to all supported + speeds. + + Returns: + boolean: True if success. Usually, the method return False only if the operation + is not supported or failed. + """ + return self.host.set_speed(interface_name, speed) + + def get_speed(self, interface_name): + """Get interface speed + + Args: + interface_name (str): Interface name + + Returns: + str: SONiC style interface speed value. E.g, 1G=1000, 10G=10000, 100G=100000. + """ + return self.host.get_speed(interface_name) diff --git a/tests/common/devices/onyx.py b/tests/common/devices/onyx.py index 4534b8af18..c3f646479e 100644 --- a/tests/common/devices/onyx.py +++ b/tests/common/devices/onyx.py @@ -53,7 +53,7 @@ def command(self, cmd): def set_interface_lacp_rate_mode(self, interface_name, mode): out = self.host.onyx_config( lines=['lacp rate %s' % mode], - parents='interface ethernet %s' % interface_name) + parents='interface %s' % interface_name) logging.info("Set interface [%s] lacp rate to [%s]" % (interface_name, mode)) return out @@ -68,3 +68,141 @@ def exec_template(self, ansible_root, ansible_playbook, inventory, **kwargs): if res["localhost"]["rc"] != 0: raise Exception("Unable to execute template\n{}".format(res["localhost"]["stdout"])) + + def get_supported_speeds(self, interface_name): + """Get supported speeds for a given interface + + Args: + interface_name (str): Interface name + + Returns: + list: A list of supported speed strings or None + """ + show_int_result = self.host.onyx_command( + commands=['show interfaces {} | include "Supported speeds"'.format(interface_name)])[self.hostname] + + if 'failed' in show_int_result and show_int_result['failed']: + logger.error('Failed to get supported speed for {} - {}'.format(interface_name, show_int_result['msg'])) + return None + + out = show_int_result['stdout'][0].strip() + logger.debug('Get supported speeds for port {} from onyx: {}'.format(interface_name, out)) + if not out: + return None + + # The output should be something like: "Supported speeds:1G 10G 25G 50G" + speeds = out.split(':')[-1].split() + return [x[:-1] + '000' for x in speeds] + + def set_auto_negotiation_mode(self, interface_name, mode): + """Set auto negotiation mode for a given interface + + Args: + interface_name (str): Interface name + mode (boolean): True to enable auto negotiation else disable + + Returns: + boolean: False if the operation is not supported else True + """ + if mode: + return self.set_speed(interface_name, None) + else: + speed = self.get_speed(interface_name) + out = self.host.onyx_config( + lines=['shutdown', 'speed {}G no-autoneg'.format(speed[:-3]), 'no shutdown'], + parents='interface %s' % interface_name)[self.hostname] + + if 'failed' in out and out['failed']: + logger.error('Failed to set auto neg to False for port {} - {}'.format(interface_name, out['msg'])) + return False + logger.debug('Set auto neg to False for port {} from onyx: {}'.format(interface_name, out)) + return True + + def get_auto_negotiation_mode(self, interface_name): + """Get auto negotiation mode for a given interface + + Args: + interface_name (str): Interface name + + Returns: + boolean: True if auto negotiation mode is enabled else False. Return None if + the auto negotiation mode is unknown or unsupported. + """ + show_int_result = self.host.onyx_command( + commands=['show interfaces {} | include "Auto-negotiation"'.format(interface_name)])[self.hostname] + + if 'failed' in show_int_result and show_int_result['failed']: + logger.error('Failed to get auto neg mode for port {} - {}'.format(interface_name, show_int_result['msg'])) + return None + + out = show_int_result['stdout'][0].strip() + logger.debug('Get auto negotiation mode for port {} from onyx: {}'.format(interface_name, out)) + if not out: + return None + + # The output should be something like: "Auto-negotiation:Enabled" + return 'Enabled' in out + + def set_speed(self, interface_name, speed): + """Set interface speed according to the auto negotiation mode. When auto negotiation mode + is enabled, set the advertised speeds; otherwise, set the force speed. + + Args: + interface_name (str): Interface name + speed (str): SONiC style interface speed. E.g, 1G=1000, 10G=10000, 100G=100000. If the speed + is None and auto negotiation mode is enabled, it sets the advertised speeds to all supported + speeds. + + Returns: + boolean: True if success. Usually, the method return False only if the operation + is not supported or failed. + """ + autoneg_mode = self.get_auto_negotiation_mode(interface_name) + if not speed: + speed = 'auto' + else: + speed = speed[:-3] + 'G' + if autoneg_mode or speed == 'auto': + out = self.host.onyx_config( + lines=['shutdown', 'speed {}'.format(speed), 'no shutdown'], + parents='interface %s' % interface_name)[self.hostname] + if 'failed' in out and out['failed']: + logger.error('Failed to set speed for port {} - {}'.format(interface_name, out['msg'])) + return False + logger.debug('Set auto speed for port {} from onyx: {}'.format(interface_name, out)) + return True + else: + out = self.host.onyx_config( + lines=['shutdown', 'speed {} no-autoneg'.format(speed), 'no shutdown'], + parents='interface %s' % interface_name)[self.hostname] + if 'failed' in out and out['failed']: + logger.error('Failed to set speed for port {} - {}'.format(interface_name, out['msg'])) + return False + logger.debug('Set force speed for port {} from onyx: {}'.format(interface_name, out)) + return True + + def get_speed(self, interface_name): + """Get interface speed + + Args: + interface_name (str): Interface name + + Returns: + str: SONiC style interface speed value. E.g, 1G=1000, 10G=10000, 100G=100000. + """ + show_int_result = self.host.onyx_command( + commands=['show interfaces {} | include "Actual speed"'.format(interface_name)])[self.hostname] + + if 'failed' in show_int_result and show_int_result['failed']: + logger.error('Failed to get speed for port {} - {}'.format(interface_name, show_int_result['msg'])) + return False + + out = show_int_result['stdout'][0].strip() + logger.debug('Get speed for port {} from onyx: {}'.format(interface_name, out)) + if not out: + return None + + # The output should be something like: "Actual speed:50G" + speed = out.split(':')[-1].strip() + pos = speed.find('G') + return speed[:pos] + '000' diff --git a/tests/common/devices/sonic.py b/tests/common/devices/sonic.py index cc722260a0..284140dfb2 100644 --- a/tests/common/devices/sonic.py +++ b/tests/common/devices/sonic.py @@ -1383,6 +1383,84 @@ def get_up_ip_ports(self): pass return up_ip_ports + def get_supported_speeds(self, interface_name): + """Get supported speeds for a given interface + + Args: + interface_name (str): Interface name + + Returns: + list: A list of supported speed strings or None + """ + cmd = 'sonic-db-cli STATE_DB HGET \"PORT_TABLE|{}\" \"{}\"'.format(interface_name, 'supported_speeds') + supported_speeds = self.shell(cmd)['stdout'].strip() + return None if not supported_speeds else supported_speeds.split(',') + + def set_auto_negotiation_mode(self, interface_name, mode): + """Set auto negotiation mode for a given interface + + Args: + interface_name (str): Interface name + mode (boolean): True to enable auto negotiation else disable + + Returns: + boolean: False if the operation is not supported else True + """ + cmd = 'config interface autoneg {} {}'.format(interface_name, 'enabled' if mode else 'disabled') + self.shell(cmd) + return True + + def get_auto_negotiation_mode(self, interface_name): + """Get auto negotiation mode for a given interface + + Args: + interface_name (str): Interface name + + Returns: + boolean: True if auto negotiation mode is enabled else False. Return None if + the auto negotiation mode is unknown or unsupported. + """ + cmd = 'sonic-db-cli APPL_DB HGET \"PORT_TABLE:{}\" \"{}\"'.format(interface_name, 'autoneg') + mode = self.shell(cmd)['stdout'].strip() + if not mode: + return None + return True if mode == 'on' else False + + def set_speed(self, interface_name, speed): + """Set interface speed according to the auto negotiation mode. When auto negotiation mode + is enabled, set the advertised speeds; otherwise, set the force speed. + + Args: + interface_name (str): Interface name + speed (str): SONiC style interface speed. E.g, 1G=1000, 10G=10000, 100G=100000. If the speed + is None and auto negotiation mode is enabled, it sets the advertised speeds to all supported + speeds. + + Returns: + boolean: True if success. Usually, the method return False only if the operation + is not supported or failed. + """ + auto_neg_mode = self.get_auto_negotiation_mode(interface_name) + if not auto_neg_mode: + cmd = 'config interface speed {} {}'.format(interface_name, speed) + else: + cmd = 'config interface advertised-speeds {} {}'.format(interface_name, speed) + self.shell(cmd) + return True + + def get_speed(self, interface_name): + """Get interface speed + + Args: + interface_name (str): Interface name + + Returns: + str: SONiC style interface speed value. E.g, 1G=1000, 10G=10000, 100G=100000. + """ + cmd = 'sonic-db-cli APPL_DB HGET \"PORT_TABLE:{}\" \"{}\"'.format(interface_name, 'speed') + speed = self.shell(cmd)['stdout'].strip() + return speed + def get_rsyslog_ipv4(self): if not self.is_multi_asic: return "127.0.0.1" diff --git a/tests/conftest.py b/tests/conftest.py index 0b910d3dc2..982a2ba564 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -964,6 +964,8 @@ def pytest_generate_tests(metafunc): if "enum_dut_portname" in metafunc.fixturenames: metafunc.parametrize("enum_dut_portname", generate_port_lists(metafunc, "all_ports")) + if "enum_dut_portname_module_fixture" in metafunc.fixturenames: + metafunc.parametrize("enum_dut_portname_module_fixture", generate_port_lists(metafunc, "all_ports"), scope="module") if "enum_dut_portname_oper_up" in metafunc.fixturenames: metafunc.parametrize("enum_dut_portname_oper_up", generate_port_lists(metafunc, "oper_up_ports")) if "enum_dut_portname_admin_up" in metafunc.fixturenames: diff --git a/tests/platform_tests/test_auto_negotiation.py b/tests/platform_tests/test_auto_negotiation.py new file mode 100644 index 0000000000..3bb88b4130 --- /dev/null +++ b/tests/platform_tests/test_auto_negotiation.py @@ -0,0 +1,365 @@ +""" +Test port auto negotiation. + +To save test time, the script randomly chooses 3 ports to do following test: +1. Advertise all supported speeds and expect the negotiated speed is the highest speed +2. Advertise each supported speed and expect the negotiated speed is the one configured +3. Force each supported speed and expect the operational speed is the one configured +""" +import logging +import pytest +import random + +from natsort import natsorted +from tests.common.config_reload import config_reload +from tests.common.helpers.assertions import pytest_assert, pytest_require +from tests.common.helpers.dut_ports import decode_dut_port_name +from tests.common.utilities import wait_until +from tests.platform_tests.link_flap.link_flap_utils import build_test_candidates + +logger = logging.getLogger(__name__) + +STATE_DB = 'STATE_DB' +STATE_PORT_TABLE_TEMPLATE = 'PORT_TABLE|{}' +STATE_PORT_FIELD_SUPPORTED_SPEEDS = 'supported_speeds' +APPL_DB = 'APPL_DB' +APPL_PORT_TABLE_TEMPLATE = 'PORT_TABLE:{}' +ALL_PORT_WAIT_TIME = 60 +SINGLE_PORT_WAIT_TIME = 40 +PORT_STATUS_CHECK_INTERVAL = 10 + +# To avoid getting candidate test ports again and again, use a global variable +# to save all candidate test ports. +# Key: dut host object, value: a list of candidate ports tuple +cadidate_test_ports = {} + + +@pytest.fixture(scope='module', autouse=True) +def recover_ports(duthosts, enum_dut_portname_module_fixture, fanouthosts): + """Module level fixture that automatically do following job: + 1. Build global candidate test ports + 2. Save fanout port state before the test + 3. Restor fanout and DUT after test + + Args: + duthosts: DUT object + enum_dut_portname_module_fixture (str): DUT port name + fanouthosts: Fanout objects + """ + global cadidate_test_ports + fanout_original_port_states = {} + dutname, portname = decode_dut_port_name(enum_dut_portname_module_fixture) + logger.info('Collecting existing port configuration for DUT and fanout...') + for duthost in duthosts: + if dutname == 'unknown' or dutname == duthost.hostname: + all_ports = build_test_candidates(duthost, fanouthosts, portname) + # Test all ports takes too much time (sometimes more than an hour), + # so we choose 3 ports randomly as the cadidates ports + cadidate_test_ports[duthost] = random.sample(all_ports, 3) + for _, fanout, fanout_port in cadidate_test_ports[duthost]: + auto_neg_mode = fanout.get_auto_negotiation_mode(fanout_port) + speed = fanout.get_speed(fanout_port) + if not fanout in fanout_original_port_states: + fanout_original_port_states[fanout] = {} + fanout_original_port_states[fanout][fanout_port] = (auto_neg_mode, speed) + + yield + + logger.info('Recovering port configuration for fanout...') + for fanout, port_data in fanout_original_port_states.items(): + for port, state in port_data.items(): + fanout.set_auto_negotiation_mode(port, state[0]) + fanout.set_speed(port, state[1]) + + logger.info('Recovering port configuration for DUT...') + for duthost in duthosts: + config_reload(duthost) + + +def get_supported_speeds_for_port(duthost, dut_port_name, fanout, fanout_port_name): + """Get supported speeds list for a given port. The supported speeds list is + a intersection of DUT port supported speeds, fanout port supported speeds, + and cable supported speeds. + + Args: + duthost: DUT object + dut_port_name (str): DUT interface name + fanout: Fanout object + fanout_port_name (str): The name of fanout port which connected to the DUT port + + Returns: + list: A sorted list of supported speed strings + """ + dut_supported_speeds = duthost.get_supported_speeds(dut_port_name) + if not dut_supported_speeds: + return [duthost.get_speed(dut_port_name)] + + fanout_supported_speeds = fanout.get_supported_speeds(fanout_port_name) + if not fanout_supported_speeds: + return [duthost.get_speed(dut_port_name)] + + # get supported speeds for the cable + cable_supported_speeds = get_cable_supported_speeds(duthost, dut_port_name) + if not cable_supported_speeds: + return [duthost.get_speed(dut_port_name)] + + logger.info('dut_supported_speeds = {}, fanout_supported_speeds = {}, cable_supported_speeds = {}'.format( + dut_supported_speeds, + fanout_supported_speeds, + cable_supported_speeds + )) + supported_speeds = set(dut_supported_speeds) & set(fanout_supported_speeds) & set(cable_supported_speeds) + if not supported_speeds: + # Since the port link is up before the test, we should not hit this branch + # However, in case we hit here, we use current actual speed as supported speed + return [duthost.get_speed(dut_port_name)] + + return natsorted(supported_speeds) + + +def get_cable_supported_speeds(duthost, dut_port_name): + """Get cable supported speeds. As there is no SONiC CLI to get supported speeds for + a given cable, this function depends on vendor implementation. + A sample: MlnxCableSupportedSpeedsHelper. + + Args: + duthost: DUT object + dut_port_name (str): DUT interface name + + Returns: + list: A list of supported speed strings + """ + helper = get_cable_supported_speeds_helper(duthost) + return helper.get_cable_supported_speeds(duthost, dut_port_name) if helper else None + + +def check_ports_up(duthost, dut_ports, expect_speed=None): + """Check if given ports are operational up or not + + Args: + duthost: DUT object + dut_ports (str): DUT interface name + + Returns: + boolean: True if all given ports are up + """ + ports_down = duthost.interface_facts(up_ports=dut_ports)["ansible_facts"]["ansible_interface_link_down_ports"] + show_interface_output = duthost.show_interface(command="status", up_ports=dut_ports)["ansible_facts"] + db_ports_down = show_interface_output["ansible_interface_link_down_ports"] + down_ports = set(ports_down) | set(db_ports_down) + logger.info('Down ports are: {}'.format(down_ports)) + if len(down_ports) == 0: + if expect_speed: + int_status = show_interface_output['int_status'] + for dut_port in dut_ports: + actual_speed = int_status[dut_port]['speed'][:-1] + '000' + if actual_speed != expect_speed: + return False + return True + else: + return False + + +def test_auto_negotiation_advertised_speeds_all(): + """Test all candidate ports to advertised all supported speeds and verify: + 1. All ports are up after auto negotiation + 2. All ports are negotiated to its highest supported speeds + """ + for duthost, candidates in cadidate_test_ports.items(): + if not candidates: + continue + logger.info('Test candidate ports are {}'.format(candidates)) + for dut_port, fanout, fanout_port in candidates: + logger.info('Start test for DUT port {} and fanout port {}'.format(dut_port, fanout_port)) + # Enable auto negotiation on fanout port + success = fanout.set_auto_negotiation_mode(fanout_port, True) + if not success: + # Fanout does not support set auto negotiation mode for this port + logger.info('Ignore port {} due to fanout port {} does not support setting auto-neg mode'.format(dut_port, fanout_port)) + continue + + # Advertise all supported speeds in fanout port + success = fanout.set_speed(fanout_port, None) + if not success: + # Fanout does not support set advertise speeds for this port + logger.info('Ignore port {} due to fanout port {} does not support setting advertised speeds'.format(dut_port, fanout_port)) + continue + + duthost.shell('config interface autoneg {} enabled'.format(dut_port)) + duthost.shell('config interface advertised-speeds {} all'.format(dut_port)) + + logger.info('Wait until all ports are up') + wait_result = wait_until(ALL_PORT_WAIT_TIME, + PORT_STATUS_CHECK_INTERVAL, + check_ports_up, + duthost, + [item[0] for item in candidates]) + pytest_assert(wait_result, 'Some ports are still down') + + # Make sure all ports are negotiated to the highest speed + logger.info('Checking the actual speed is equal to highest speed') + int_status = duthost.show_interface(command="status")["ansible_facts"]['int_status'] + for dut_port, fanout, fanout_port in candidates: + supported_speeds = get_supported_speeds_for_port(duthost, dut_port, fanout, fanout_port) + logger.info('DUT port = {}, fanout port = {}, supported speeds = {}, actual speed = {}'.format( + dut_port, + fanout_port, + supported_speeds, + int_status[dut_port]['speed'] + )) + highest_speed = supported_speeds[-1] + actual_speed = int_status[dut_port]['speed'][:-1] + '000' + pytest_assert(actual_speed == highest_speed, 'Actual speed is not the highest speed') + + +def test_auto_negotiation_advertised_each_speed(): + """Test all candidate ports to advertised all supported speeds one by one and verify + that the port operational status is up after auto negotiation + """ + for duthost, candidates in cadidate_test_ports.items(): + if not candidates: + continue + logger.info('Test candidate ports are {}'.format(candidates)) + for dut_port, fanout, fanout_port in candidates: + logger.info('Start test for DUT port {} and fanout port {}'.format(dut_port, fanout_port)) + # Enable auto negotiation on fanout port + success = fanout.set_auto_negotiation_mode(fanout_port, True) + if not success: + # Fanout does not support set auto negotiation mode for this port + logger.info('Ignore port {} due to fanout port {} does not support setting auto-neg mode'.format(dut_port, fanout_port)) + continue + + # Advertise all supported speeds in fanout port + success = fanout.set_speed(fanout_port, None) + if not success: + # Fanout does not support set advertise speeds for this port + logger.info('Ignore port {} due to fanout port {} does not support setting advertised speeds'.format(dut_port, fanout_port)) + continue + + logger.info('Trying to get a common supported speed set among dut port, fanout port and cable') + supported_speeds = get_supported_speeds_for_port(duthost, dut_port, fanout, fanout_port) + if not supported_speeds: + logger.warn('Ignore test for port {} due to cannot get supported speed for it'.format(dut_port)) + continue + + logger.info('Run test based on supported speeds: {}'.format(supported_speeds)) + duthost.shell('config interface autoneg {} enabled'.format(dut_port)) + for speed in supported_speeds: + duthost.shell('config interface advertised-speeds {} {}'.format(dut_port, speed)) + logger.info('Wait until the port status is up, expected speed: {}'.format(speed)) + wait_result = wait_until(SINGLE_PORT_WAIT_TIME, + PORT_STATUS_CHECK_INTERVAL, + check_ports_up, + duthost, + [dut_port], + speed) + pytest_assert(wait_result, '{} are still down'.format(dut_port)) + fanout_actual_speed = fanout.get_speed(fanout_port) + pytest_assert(fanout_actual_speed == speed, 'expect fanout speed: {}, but got {}'.format(speed, fanout_actual_speed)) + + +def test_force_speed(): + """Test all candidate ports to force to all supported speeds one by one and verify + that the port operational status is up after auto negotiation + """ + for duthost, candidates in cadidate_test_ports.items(): + if not candidates: + continue + logger.info('Test candidate ports are {}'.format(candidates)) + for dut_port, fanout, fanout_port in candidates: + logger.info('Start test for DUT port {} and fanout port {}'.format(dut_port, fanout_port)) + # Disable auto negotiation on fanout port + success = fanout.set_auto_negotiation_mode(fanout_port, False) + if not success: + # Fanout does not support set auto negotiation mode for this port + logger.info('Ignore port {} due to fanout port {} does not support setting auto-neg mode'.format(dut_port, fanout_port)) + continue + + logger.info('Trying to get a common supported speeds set among dut port, fanout port and cable') + supported_speeds = get_supported_speeds_for_port(duthost, dut_port, fanout, fanout_port) + if not supported_speeds: + logger.warn('Ignore test for port {} due to cannot get supported speed for it'.format(dut_port)) + continue + + logger.info('Run test based on supported speeds: {}'.format(supported_speeds)) + duthost.shell('config interface autoneg {} disabled'.format(dut_port)) + for speed in supported_speeds: + success = fanout.set_speed(fanout_port, speed) + if not success: + logger.info('Skip speed {} because fanout does not support it'.format(speed)) + continue + duthost.shell('config interface speed {} {}'.format(dut_port, speed)) + logger.info('Wait until the port status is up, expected speed: {}'.format(speed)) + wait_result = wait_until(SINGLE_PORT_WAIT_TIME, + PORT_STATUS_CHECK_INTERVAL, + check_ports_up, + duthost, + [dut_port], + speed) + pytest_assert(wait_result, '{} are still down'.format(dut_port)) + fanout_actual_speed = fanout.get_speed(fanout_port) + pytest_assert(fanout_actual_speed == speed, 'expect fanout speed: {}, but got {}'.format(speed, fanout_actual_speed)) + + +def get_cable_supported_speeds_helper(duthost): + """Get a cable supported speeds helper + + Args: + duthost: DUT object + + Returns: + object: A helper class or instance + """ + asic_type = duthost.facts["asic_type"] + + if asic_type == "mellanox": + return MlnxCableSupportedSpeedsHelper + else: + return None + +class MlnxCableSupportedSpeedsHelper(object): + # To avoid getting ports list again and again, use a class level variable to save + # all sorted ports. + # Key: dut host object, value: a sorted list of interface name + sorted_ports = {} + + # Key: tuple of dut host object and interface name, value: supported speed list + supported_speeds = {} + + device_path = None + + @classmethod + def get_cable_supported_speeds(cls, duthost, dut_port_name): + """Helper function to get supported speeds for a cable + + Args: + duthost: DUT object + dut_port_name (str): DUT interface name + + Returns: + list: A list of supported speed strings + """ + if (duthost, dut_port_name) in cls.supported_speeds: + return cls.supported_speeds[duthost, dut_port_name] + + if duthost not in cls.sorted_ports: + int_status = duthost.show_interface(command="status")["ansible_facts"]['int_status'] + ports = natsorted([port_name for port_name in int_status.keys()]) + cls.sorted_ports[duthost] = ports + + if not cls.device_path: + cls.device_path = duthost.shell('ls /dev/mst/*_pci_cr0')['stdout'].strip() + port_index = cls.sorted_ports[duthost].index(dut_port_name) + 1 + cmd = 'mlxlink -d {} -p {} | grep "Supported Cable Speed"'.format(cls.device_path, port_index) + output = duthost.shell(cmd)['stdout'].strip() + # Valid output should be something like "Supported Cable Speed:0x68b1f141 (100G,56G,50G,40G,25G,10G,1G)" + logger.info('Get supported speeds for {} {}: {}'.format(duthost, dut_port_name, output)) + if not output: + return None + pos = output.rfind('(') + if pos == -1: + return None + speeds_str = output[pos+1:-1] + speeds = [speed[:-1] + '000' for speed in speeds_str.split(',')] + cls.supported_speeds[(duthost, dut_port_name)] = speeds + return speeds