diff --git a/sonic-xcvrd/tests/test_xcvrd.py b/sonic-xcvrd/tests/test_xcvrd.py index e10b930e4..af2001f34 100644 --- a/sonic-xcvrd/tests/test_xcvrd.py +++ b/sonic-xcvrd/tests/test_xcvrd.py @@ -530,6 +530,160 @@ def test_DaemonXcvrd_run(self, mock_task_stop1, mock_task_stop2, mock_task_run1, assert mock_deinit.call_count == 1 assert mock_init.call_count == 1 + @patch('xcvrd.xcvrd._wrapper_get_sfp_type', MagicMock(return_value='QSFP_DD')) + def test_CmisManagerTask_handle_port_change_event(self): + port_mapping = PortMapping() + task = CmisManagerTask(port_mapping) + + assert not task.isPortConfigDone + port_change_event = PortChangeEvent('PortConfigDone', -1, 0, PortChangeEvent.PORT_SET) + task.on_port_update_event(port_change_event) + assert task.isPortConfigDone + + port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_ADD) + task.on_port_update_event(port_change_event) + assert len(task.port_dict) == 0 + + port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_REMOVE) + task.on_port_update_event(port_change_event) + assert len(task.port_dict) == 0 + + port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_DEL) + task.on_port_update_event(port_change_event) + assert len(task.port_dict) == 1 + + port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_SET) + task.on_port_update_event(port_change_event) + assert len(task.port_dict) == 1 + + @patch('xcvrd.xcvrd.platform_chassis') + @patch('xcvrd.xcvrd_utilities.port_mapping.subscribe_port_update_event', MagicMock(return_value=(None, None))) + @patch('xcvrd.xcvrd_utilities.port_mapping.handle_port_update_event', MagicMock()) + def test_CmisManagerTask_task_run_stop(self, mock_chassis): + mock_object = MagicMock() + mock_object.get_presence = MagicMock(return_value=True) + mock_chassis.get_all_sfps = MagicMock(return_value=[mock_object, mock_object]) + + port_mapping = PortMapping() + task = CmisManagerTask(port_mapping) + task.task_run() + task.task_stop() + assert task.task_process is None + + @patch('xcvrd.xcvrd.platform_chassis') + @patch('xcvrd.xcvrd_utilities.port_mapping.subscribe_port_update_event', MagicMock(return_value=(None, None))) + @patch('xcvrd.xcvrd_utilities.port_mapping.handle_port_update_event', MagicMock()) + @patch('xcvrd.xcvrd._wrapper_get_sfp_type', MagicMock(return_value='QSFP_DD')) + def test_CmisManagerTask_task_worker(self, mock_chassis): + mock_xcvr_api = MagicMock() + mock_xcvr_api.set_datapath_deinit = MagicMock(return_value=True) + mock_xcvr_api.set_datapath_init = MagicMock(return_value=True) + mock_xcvr_api.tx_disable_channel = MagicMock(return_value=True) + mock_xcvr_api.set_lpmode = MagicMock(return_value=True) + mock_xcvr_api.set_application = MagicMock(return_value=True) + mock_xcvr_api.is_flat_memory = MagicMock(return_value=False) + mock_xcvr_api.get_module_type_abbreviation = MagicMock(return_value='QSFP-DD') + mock_xcvr_api.get_application_advertisement = MagicMock(return_value={ + 1: { + 'host_electrical_interface_id': '400GAUI-8 C2M (Annex 120E)', + 'module_media_interface_id': '400GBASE-DR4 (Cl 124)', + 'media_lane_count': 4, + 'host_lane_count': 8, + 'host_lane_assignment_options': 1 + }, + 2: { + 'host_electrical_interface_id': '100GAUI-2 C2M (Annex 135G)', + 'module_media_interface_id': '100G-FR/100GBASE-FR1 (Cl 140)', + 'media_lane_count': 1, + 'host_lane_count': 2, + 'host_lane_assignment_options': 85 + } + }) + mock_xcvr_api.get_module_state = MagicMock(return_value='ModuleReady') + mock_xcvr_api.get_config_datapath_hostlane_status = MagicMock(return_value={ + 'ConfigStatusLane1': 'ConfigSuccess', + 'ConfigStatusLane2': 'ConfigSuccess', + 'ConfigStatusLane3': 'ConfigSuccess', + 'ConfigStatusLane4': 'ConfigSuccess', + 'ConfigStatusLane5': 'ConfigSuccess', + 'ConfigStatusLane6': 'ConfigSuccess', + 'ConfigStatusLane7': 'ConfigSuccess', + 'ConfigStatusLane8': 'ConfigSuccess' + }) + mock_xcvr_api.get_datapath_state = MagicMock(side_effect=[ + { + 'DP1State': 'DataPathDeactivated', + 'DP2State': 'DataPathDeactivated', + 'DP3State': 'DataPathDeactivated', + 'DP4State': 'DataPathDeactivated', + 'DP5State': 'DataPathDeactivated', + 'DP6State': 'DataPathDeactivated', + 'DP7State': 'DataPathDeactivated', + 'DP8State': 'DataPathDeactivated' + }, + { + 'DP1State': 'DataPathInitialized', + 'DP2State': 'DataPathInitialized', + 'DP3State': 'DataPathInitialized', + 'DP4State': 'DataPathInitialized', + 'DP5State': 'DataPathInitialized', + 'DP6State': 'DataPathInitialized', + 'DP7State': 'DataPathInitialized', + 'DP8State': 'DataPathInitialized' + }, + { + 'DP1State': 'DataPathActivated', + 'DP2State': 'DataPathActivated', + 'DP3State': 'DataPathActivated', + 'DP4State': 'DataPathActivated', + 'DP5State': 'DataPathActivated', + 'DP6State': 'DataPathActivated', + 'DP7State': 'DataPathActivated', + 'DP8State': 'DataPathActivated' + } + ]) + + mock_sfp = MagicMock() + mock_sfp.get_presence = MagicMock(return_value=True) + mock_sfp.get_xcvr_api = MagicMock(return_value=mock_xcvr_api) + + mock_chassis.get_all_sfps = MagicMock(return_value=[mock_sfp]) + mock_chassis.get_sfp = MagicMock(return_value=mock_sfp) + + port_mapping = PortMapping() + task = CmisManagerTask(port_mapping) + + port_change_event = PortChangeEvent('PortConfigDone', -1, 0, PortChangeEvent.PORT_SET) + task.on_port_update_event(port_change_event) + assert task.isPortConfigDone + + port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_SET, + {'speed':'400000', 'lanes':'1,2,3,4,5,6,7,8'}) + task.on_port_update_event(port_change_event) + assert len(task.port_dict) == 1 + + # Case 1: Module Inserted --> DP_DEINIT + task.task_stopping_event.is_set = MagicMock(side_effect=[False, False, True]) + task.task_worker() + assert mock_xcvr_api.set_datapath_deinit.call_count == 1 + assert mock_xcvr_api.tx_disable_channel.call_count == 1 + assert mock_xcvr_api.set_lpmode.call_count == 2 + + # Case 2: DP_DEINIT --> AP Configured + task.task_stopping_event.is_set = MagicMock(side_effect=[False, False, True]) + task.task_worker() + assert mock_xcvr_api.set_application.call_count == 1 + + # Case 3: AP Configured --> DP_INIT + task.task_stopping_event.is_set = MagicMock(side_effect=[False, False, True]) + task.task_worker() + assert mock_xcvr_api.set_datapath_init.call_count == 1 + + # Case 4: DP_INIT --> DP_TXON + task.task_stopping_event.is_set = MagicMock(side_effect=[False, False, True]) + task.task_worker() + assert mock_xcvr_api.tx_disable_channel.call_count == 2 + @patch('xcvrd.xcvrd.xcvr_table_helper', MagicMock()) def test_DomInfoUpdateTask_handle_port_change_event(self): port_mapping = PortMapping() diff --git a/sonic-xcvrd/xcvrd/xcvrd.py b/sonic-xcvrd/xcvrd/xcvrd.py index 0992dca18..3aa56d36c 100644 --- a/sonic-xcvrd/xcvrd/xcvrd.py +++ b/sonic-xcvrd/xcvrd/xcvrd.py @@ -16,6 +16,7 @@ import sys import threading import time + import datetime import subprocess from sonic_py_common import daemon_base, device_info, logger @@ -196,7 +197,11 @@ def _wrapper_get_transceiver_change_event(timeout): def _wrapper_get_sfp_type(physical_port): if platform_chassis: try: - return platform_chassis.get_sfp(physical_port).sfp_type + sfp = platform_chassis.get_sfp(physical_port) + except (NotImplementedError, AttributeError): + return None + try: + return sfp.sfp_type except (NotImplementedError, AttributeError): pass return None @@ -227,7 +232,7 @@ def beautify_dom_info_dict(dom_info_dict, physical_port): dom_info_dict['tx2power'] = strip_unit_and_beautify(dom_info_dict['tx2power'], POWER_UNIT) dom_info_dict['tx3power'] = strip_unit_and_beautify(dom_info_dict['tx3power'], POWER_UNIT) dom_info_dict['tx4power'] = strip_unit_and_beautify(dom_info_dict['tx4power'], POWER_UNIT) - if _wrapper_get_sfp_type(physical_port) == 'QSFP_DD': + if 'rx5power' in dom_info_dict: dom_info_dict['rx5power'] = strip_unit_and_beautify(dom_info_dict['rx5power'], POWER_UNIT) dom_info_dict['rx6power'] = strip_unit_and_beautify(dom_info_dict['rx6power'], POWER_UNIT) dom_info_dict['rx7power'] = strip_unit_and_beautify(dom_info_dict['rx7power'], POWER_UNIT) @@ -433,7 +438,7 @@ def post_port_dom_info_to_db(logical_port_name, port_mapping, table, stop_event= dom_info_cache[physical_port] = dom_info_dict if dom_info_dict is not None: beautify_dom_info_dict(dom_info_dict, physical_port) - if _wrapper_get_sfp_type(physical_port) == 'QSFP_DD': + if 'rx5power' in dom_info_dict: fvs = swsscommon.FieldValuePairs( [('temperature', dom_info_dict['temperature']), ('voltage', dom_info_dict['voltage']), @@ -679,7 +684,6 @@ def get_media_settings_key(physical_port, transceiver_dict): return [vendor_key, media_key] - def get_media_val_str_from_dict(media_dict): LANE_STR = 'lane' LANE_SEPARATOR = ',' @@ -843,6 +847,491 @@ def is_fast_reboot_enabled(): # Helper classes =============================================================== # +# Thread wrapper class for CMIS transceiver management + +class CmisManagerTask: + + CMIS_MAX_RETRIES = 3 + CMIS_DEF_EXPIRED = 60 # seconds, default expiration time + CMIS_MODULE_TYPES = ['QSFP-DD', 'QSFP_DD', 'OSFP'] + CMIS_NUM_CHANNELS = 8 + + CMIS_STATE_UNKNOWN = 'UNKNOWN' + CMIS_STATE_INSERTED = 'INSERTED' + CMIS_STATE_DP_DEINIT = 'DP_DEINIT' + CMIS_STATE_AP_CONF = 'AP_CONFIGURED' + CMIS_STATE_DP_INIT = 'DP_INIT' + CMIS_STATE_DP_TXON = 'DP_TXON' + CMIS_STATE_READY = 'READY' + CMIS_STATE_REMOVED = 'REMOVED' + CMIS_STATE_FAILED = 'FAILED' + + def __init__(self, port_mapping): + self.task_stopping_event = multiprocessing.Event() + self.task_process = None + self.port_dict = {} + self.port_mapping = copy.deepcopy(port_mapping) + self.isPortInitDone = False + self.isPortConfigDone = False + + def log_notice(self, message): + helper_logger.log_notice("CMIS: {}".format(message)) + + def log_error(self, message): + helper_logger.log_error("CMIS: {}".format(message)) + + def on_port_update_event(self, port_change_event): + if port_change_event.event_type not in [port_change_event.PORT_SET, port_change_event.PORT_DEL]: + return + + lport = port_change_event.port_name + pport = port_change_event.port_index + + if lport in ['PortInitDone']: + self.isPortInitDone = True + return + + if lport in ['PortConfigDone']: + self.isPortConfigDone = True + return + + # Skip if it's not a physical port + if not lport.startswith('Ethernet'): + return + + # Skip if the physical index is not available + if pport is None: + return + + # Skip if the port/cage type is not a CMIS + ptype = _wrapper_get_sfp_type(pport) + if ptype not in self.CMIS_MODULE_TYPES: + return + + if lport not in self.port_dict: + self.port_dict[lport] = {} + + if port_change_event.event_type == port_change_event.PORT_SET: + if pport >= 0: + self.port_dict[lport]['index'] = pport + if port_change_event.port_dict is not None and 'speed' in port_change_event.port_dict: + self.port_dict[lport]['speed'] = port_change_event.port_dict['speed'] + if port_change_event.port_dict is not None and 'lanes' in port_change_event.port_dict: + self.port_dict[lport]['lanes'] = port_change_event.port_dict['lanes'] + self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_INSERTED + self.reset_cmis_init(lport, 0) + else: + self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_REMOVED + + def get_interface_speed(self, ifname): + """ + Get the port speed from the host interface name + + Args: + ifname: String, interface name + + Returns: + Integer, the port speed if success otherwise 0 + """ + # see HOST_ELECTRICAL_INTERFACE of sff8024.py + speed = 0 + if '400G' in ifname: + speed = 400000 + elif '200G' in ifname: + speed = 200000 + elif '100G' in ifname or 'CAUI-4' in ifname: + speed = 100000 + elif '50G' in ifname or 'LAUI-2' in ifname: + speed = 50000 + elif '40G' in ifname or 'XLAUI' in ifname or 'XLPPI' in ifname: + speed = 40000 + elif '25G' in ifname: + speed = 25000 + elif '10G' in ifname or 'SFI' in ifname or 'XFI' in ifname: + speed = 10000 + elif '1000BASE' in ifname: + speed = 1000 + return speed + + def get_cmis_application_desired(self, api, channel, speed): + """ + Get the CMIS application code that matches the specified host side configurations + + Args: + api: + XcvrApi object + channel: + Integer, a bitmask of the lanes on the host side + e.g. 0x5 for lane 0 and lane 2. + speed: + Integer, the port speed of the host interface + + Returns: + Integer, the transceiver-specific application code + """ + if speed == 0 or channel == 0: + return 0 + + host_lane_count = 0 + for lane in range(self.CMIS_NUM_CHANNELS): + if ((1 << lane) & channel) == 0: + continue + host_lane_count += 1 + + appl_code = 0 + appl_dict = api.get_application_advertisement() + for c in appl_dict.keys(): + d = appl_dict[c] + if d.get('host_lane_count') != host_lane_count: + continue + if self.get_interface_speed(d.get('host_electrical_interface_id')) != speed: + continue + appl_code = c + break + + return (appl_code & 0xf) + + def is_cmis_application_update_required(self, api, channel, speed): + """ + Check if the CMIS application update is required + + Args: + api: + XcvrApi object + channel: + Integer, a bitmask of the lanes on the host side + e.g. 0x5 for lane 0 and lane 2. + speed: + Integer, the port speed of the host interface + + Returns: + Boolean, true if application update is required otherwise false + """ + if speed == 0 or channel == 0 or api.is_flat_memory(): + return False + + app_new = self.get_cmis_application_desired(api, channel, speed) + if app_new != 1: + self.log_notice("Non-default application is not supported") + return False + + app_old = 0 + for lane in range(self.CMIS_NUM_CHANNELS): + if ((1 << lane) & channel) == 0: + continue + if app_old == 0: + app_old = api.get_application(lane) + elif app_old != api.get_application(lane): + self.log_notice("Not all the lanes are in the same application mode") + self.log_notice("Forcing application update...") + return True + + if app_old == app_new: + skip = True + dp_state = api.get_datapath_state() + conf_state = api.get_config_datapath_hostlane_status() + for lane in range(self.CMIS_NUM_CHANNELS): + if ((1 << lane) & channel) == 0: + continue + name = "DP{}State".format(lane + 1) + if dp_state[name] != 'DataPathActivated': + skip = False + break + name = "ConfigStatusLane{}".format(lane + 1) + if conf_state[name] != 'ConfigSuccess': + skip = False + break + return (not skip) + + return True + + def reset_cmis_init(self, lport, retries=0): + self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_INSERTED + self.port_dict[lport]['cmis_retries'] = retries + self.port_dict[lport]['cmis_expired'] = None # No expiration + + def test_module_state(self, api, states): + """ + Check if the CMIS module is in the specified state + + Args: + api: + XcvrApi object + states: + List, a string list of states + + Returns: + Boolean, true if it's in the specified state, otherwise false + """ + return api.get_module_state() in states + + def test_config_error(self, api, channel, states): + """ + Check if the CMIS configuration states are in the specified state + + Args: + api: + XcvrApi object + channel: + Integer, a bitmask of the lanes on the host side + e.g. 0x5 for lane 0 and lane 2. + states: + List, a string list of states + + Returns: + Boolean, true if all lanes are in the specified state, otherwise false + """ + done = True + cerr = api.get_config_datapath_hostlane_status() + for lane in range(self.CMIS_NUM_CHANNELS): + if ((1 << lane) & channel) == 0: + continue + key = "ConfigStatusLane{}".format(lane + 1) + if cerr[key] not in states: + done = False + break + + return done + + def test_datapath_state(self, api, channel, states): + """ + Check if the CMIS datapath states are in the specified state + + Args: + api: + XcvrApi object + channel: + Integer, a bitmask of the lanes on the host side + e.g. 0x5 for lane 0 and lane 2. + states: + List, a string list of states + + Returns: + Boolean, true if all lanes are in the specified state, otherwise false + """ + done = True + dpstate = api.get_datapath_state() + for lane in range(self.CMIS_NUM_CHANNELS): + if ((1 << lane) & channel) == 0: + continue + key = "DP{}State".format(lane + 1) + if dpstate[key] not in states: + done = False + break + + return done + + def task_worker(self): + self.log_notice("Starting...") + + # APPL_DB for CONFIG updates, and STATE_DB for insertion/removal + sel, asic_context = port_mapping.subscribe_port_update_event(['APPL_DB', 'STATE_DB']) + while not self.task_stopping_event.is_set(): + # Handle port change event from main thread + port_mapping.handle_port_update_event(sel, + asic_context, + self.task_stopping_event, + helper_logger, + self.on_port_update_event) + + if not self.isPortConfigDone: + continue + + for lport, info in self.port_dict.items(): + if self.task_stopping_event.is_set(): + break + + if lport not in self.port_dict: + continue + + state = self.port_dict[lport].get('cmis_state', self.CMIS_STATE_UNKNOWN) + if state in [self.CMIS_STATE_UNKNOWN, + self.CMIS_STATE_FAILED, + self.CMIS_STATE_READY, + self.CMIS_STATE_REMOVED]: + continue + + pport = int(info.get('index', "-1")) + speed = int(info.get('speed', "0")) + lanes = info.get('lanes', "").strip() + if pport < 0 or speed == 0 or len(lanes) < 1: + continue + + # Desired port speed on the host side + host_speed = speed + + # Convert the physical lane list into a logical lanemask + # + # TODO: Add dynamic port breakout support by checking the physical lane offset + host_lanes = 0 + phys_lanes = lanes.split(',') + for i in range(len(phys_lanes)): + host_lanes |= (1 << i) + + # double-check the HW presence before moving forward + sfp = platform_chassis.get_sfp(pport) + if not sfp.get_presence(): + self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_REMOVED + continue + + try: + # Skip if XcvrApi is not supported + api = sfp.get_xcvr_api() + if api is None: + self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_READY + continue + + # Skip if it's not a paged memory device + if api.is_flat_memory(): + self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_READY + continue + + # Skip if it's not a CMIS module + type = api.get_module_type_abbreviation() + if (type is None) or (type not in self.CMIS_MODULE_TYPES): + self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_READY + continue + except AttributeError: + # Skip if these essential routines are not available + self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_READY + continue + + # CMIS expiration and retries + # + # A retry should always start over at INSETRTED state, while the + # expiration will reset the state to INSETRTED and advance the + # retry counter + now = datetime.datetime.now() + expired = self.port_dict[lport].get('cmis_expired') + retries = self.port_dict[lport].get('cmis_retries', 0) + self.log_notice("{}: {}G, lanemask=0x{:x}, state={}, retries={}".format( + lport, int(speed/1000), host_lanes, state, retries)) + if retries > self.CMIS_MAX_RETRIES: + self.log_error("{}: FAILED".format(lport)) + self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_FAILED + continue + + try: + # CMIS state transitions + if state == self.CMIS_STATE_INSERTED: + + appl = self.get_cmis_application_desired(api, host_lanes, host_speed) + if appl < 1: + self.log_error("{}: no suitable app for the port".format(lport)) + self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_FAILED + continue + + has_update = self.is_cmis_application_update_required(api, host_lanes, host_speed) + if not has_update: + # No application updates + self.log_notice("{}: READY".format(lport)) + self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_READY + continue + + # D.2.2 Software Deinitialization + api.set_datapath_deinit(host_lanes) + api.set_lpmode(True) + if not self.test_module_state(api, ['ModuleReady', 'ModuleLowPwr']): + self.log_notice("{}: unable to enter low-power mode".format(lport)) + self.port_dict[lport]['cmis_retries'] = retries + 1 + continue + + # D.1.3 Software Configuration and Initialization + if not api.tx_disable_channel(host_lanes, True): + self.log_notice("{}: unable to turn off tx power".format(lport)) + self.port_dict[lport]['cmis_retries'] = retries + 1 + continue + api.set_lpmode(False) + + # TODO: Use fine grained time when the CMIS memory map is available + self.port_dict[lport]['cmis_expired'] = now + datetime.timedelta(seconds=self.CMIS_DEF_EXPIRED) + self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_DP_DEINIT + elif state == self.CMIS_STATE_DP_DEINIT: + if not self.test_module_state(api, ['ModuleReady']): + if (expired is not None) and (expired <= now): + self.log_notice("{}: timeout for 'ModuleReady'".format(lport)) + self.reset_cmis_init(lport, retries + 1) + continue + if not self.test_datapath_state(api, host_lanes, ['DataPathDeinit', 'DataPathDeactivated']): + if (expired is not None) and (expired <= now): + self.log_notice("{}: timeout for 'DataPathDeinit'".format(lport)) + self.reset_cmis_init(lport, retries + 1) + continue + + # D.1.3 Software Configuration and Initialization + appl = self.get_cmis_application_desired(api, host_lanes, host_speed) + if appl < 1: + self.log_error("{}: no suitable app for the port".format(lport)) + self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_FAILED + continue + + if not api.set_application(host_lanes, appl): + self.log_notice("{}: unable to set application".format(lport)) + self.reset_cmis_init(lport, retries + 1) + continue + + # TODO: Use fine grained time when the CMIS memory map is available + self.port_dict[lport]['cmis_expired'] = now + datetime.timedelta(seconds=self.CMIS_DEF_EXPIRED) + self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_AP_CONF + elif state == self.CMIS_STATE_AP_CONF: + if not self.test_config_error(api, host_lanes, ['ConfigSuccess']): + if (expired is not None) and (expired <= now): + self.log_notice("{}: timeout for 'ConfigSuccess'".format(lport)) + self.reset_cmis_init(lport, retries + 1) + continue + + # D.1.3 Software Configuration and Initialization + api.set_datapath_init(host_lanes) + + # TODO: Use fine grained time when the CMIS memory map is available + self.port_dict[lport]['cmis_expired'] = now + datetime.timedelta(seconds=self.CMIS_DEF_EXPIRED) + self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_DP_INIT + elif state == self.CMIS_STATE_DP_INIT: + if not self.test_datapath_state(api, host_lanes, ['DataPathInitialized']): + if (expired is not None) and (expired <= now): + self.log_notice("{}: timeout for 'DataPathInitialized'".format(lport)) + self.reset_cmis_init(lport, retries + 1) + continue + + # D.1.3 Software Configuration and Initialization + if not api.tx_disable_channel(host_lanes, False): + self.log_notice("{}: unable to turn on tx power".format(lport)) + self.reset_cmis_init(lport, retries + 1) + continue + + # TODO: Use fine grained timeout when the CMIS memory map is available + self.port_dict[lport]['cmis_expired'] = now + datetime.timedelta(seconds=self.CMIS_DEF_EXPIRED) + self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_DP_TXON + elif state == self.CMIS_STATE_DP_TXON: + if not self.test_datapath_state(api, host_lanes, ['DataPathActivated']): + if (expired is not None) and (expired <= now): + self.log_notice("{}: timeout for 'DataPathActivated'".format(lport)) + self.reset_cmis_init(lport, retries + 1) + continue + self.log_notice("{}: READY".format(lport)) + self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_READY + + except (NotImplementedError, AttributeError): + self.log_error("{}: internal errors".format(lport)) + self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_FAILED + + self.log_notice("Stopped") + + def task_run(self): + if platform_chassis is None: + self.log_notice("Platform chassis is not available, stopping...") + return + + self.task_process = multiprocessing.Process(target=self.task_worker) + if self.task_process is not None: + self.task_process.start() + + def task_stop(self): + self.task_stopping_event.set() + if self.task_process is not None: + self.task_process.join() + self.task_process = None + # Thread wrapper class to update dom info periodically @@ -1538,6 +2027,10 @@ def run(self): # Start daemon initialization sequence port_mapping_data, retry_eeprom_set = self.init() + # Start the CMIS manager + cmis_manager = CmisManagerTask(port_mapping_data) + cmis_manager.task_run() + # Start the dom sensor info update thread dom_info_update = DomInfoUpdateTask(port_mapping_data) dom_info_update.task_run(self.y_cable_presence) diff --git a/sonic-xcvrd/xcvrd/xcvrd_utilities/port_mapping.py b/sonic-xcvrd/xcvrd/xcvrd_utilities/port_mapping.py index bf44fb70b..39e55656e 100644 --- a/sonic-xcvrd/xcvrd/xcvrd_utilities/port_mapping.py +++ b/sonic-xcvrd/xcvrd/xcvrd_utilities/port_mapping.py @@ -8,7 +8,10 @@ class PortChangeEvent: PORT_ADD = 0 PORT_REMOVE = 1 - def __init__(self, port_name, port_index, asic_id, event_type): + PORT_SET = 2 + PORT_DEL = 3 + + def __init__(self, port_name, port_index, asic_id, event_type, port_dict=None): # Logical port name, e.g. Ethernet0 self.port_name = port_name # Physical port index, equals to "index" field of PORT table in CONFIG_DB @@ -17,6 +20,8 @@ def __init__(self, port_name, port_index, asic_id, event_type): self.asic_id = asic_id # Port change event type self.event_type = event_type + # Port config dict + self.port_dict = port_dict def __str__(self): return '{} - name={} index={} asic_id={}'.format('Add' if self.event_type == self.PORT_ADD else 'Remove', @@ -83,7 +88,6 @@ def logical_port_name_to_physical_port_list(self, port_name): else: return None - def subscribe_port_config_change(): sel = swsscommon.Select() asic_context = {} @@ -96,6 +100,63 @@ def subscribe_port_config_change(): sel.addSelectable(port_tbl) return sel, asic_context +def subscribe_port_update_event(db_list=['APPL_DB', 'STATE_DB']): + port_tbl_map = { + 'APPL_DB': swsscommon.APP_PORT_TABLE_NAME, + 'CONFIG_DB': swsscommon.CFG_PORT_TABLE_NAME, + 'STATE_DB': 'TRANSCEIVER_INFO' + } + + sel = swsscommon.Select() + asic_context = {} + namespaces = multi_asic.get_front_end_namespaces() + for db_name in db_list: + if db_name not in port_tbl_map: + continue + for namespace in namespaces: + db = daemon_base.db_connect(db_name, namespace=namespace) + asic_id = multi_asic.get_asic_index_from_namespace(namespace) + port_tbl = swsscommon.SubscriberStateTable(db, port_tbl_map[db_name]) + asic_context[port_tbl] = asic_id + sel.addSelectable(port_tbl) + return sel, asic_context + +def handle_port_update_event(sel, asic_context, stop_event, logger, port_change_event_handler): + """ + Select PORT update events, notify the observers upon a port update in APPL_DB/CONFIG_DB + or a XCVR insertion/removal in STATE_DB + """ + if not stop_event.is_set(): + (state, _) = sel.select(SELECT_TIMEOUT_MSECS) + if state == swsscommon.Select.TIMEOUT: + return + if state != swsscommon.Select.OBJECT: + logger.log_warning('sel.select() did not return swsscommon.Select.OBJECT') + return + for port_tbl in asic_context.keys(): + while True: + (key, op, fvp) = port_tbl.pop() + if not key: + break + fvp = dict(fvp) if fvp is not None else {} + if 'index' not in fvp: + fvp['index'] = '-1' + port_index = int(fvp['index']) + port_change_event = None + if op == swsscommon.SET_COMMAND: + port_change_event = PortChangeEvent(key, + port_index, + asic_context[port_tbl], + PortChangeEvent.PORT_SET, + fvp) + elif op == swsscommon.DEL_COMMAND: + port_change_event = PortChangeEvent(key, + port_index, + asic_context[port_tbl], + PortChangeEvent.PORT_DEL, + fvp) + if port_change_event is not None: + port_change_event_handler(port_change_event) def handle_port_config_change(sel, asic_context, stop_event, port_mapping, logger, port_change_event_handler): """Select CONFIG_DB PORT table changes, once there is a port configuration add/remove, notify observers @@ -107,9 +168,8 @@ def handle_port_config_change(sel, asic_context, stop_event, port_mapping, logge if state != swsscommon.Select.OBJECT: logger.log_warning('sel.select() did not return swsscommon.Select.OBJECT') return - + read_port_config_change(asic_context, port_mapping, logger, port_change_event_handler) - def read_port_config_change(asic_context, port_mapping, logger, port_change_event_handler): for port_tbl in asic_context.keys(): @@ -130,9 +190,9 @@ def read_port_config_change(asic_context, port_mapping, logger, port_change_even else: current_physical_index = port_mapping.get_logical_to_physical(key)[0] if current_physical_index != new_physical_index: - port_change_event = PortChangeEvent(key, - current_physical_index, - asic_context[port_tbl], + port_change_event = PortChangeEvent(key, + current_physical_index, + asic_context[port_tbl], PortChangeEvent.PORT_REMOVE) port_change_event_handler(port_change_event) @@ -140,15 +200,14 @@ def read_port_config_change(asic_context, port_mapping, logger, port_change_even port_change_event_handler(port_change_event) elif op == swsscommon.DEL_COMMAND: if port_mapping.is_logical_port(key): - port_change_event = PortChangeEvent(key, - port_mapping.get_logical_to_physical(key)[0], - asic_context[port_tbl], + port_change_event = PortChangeEvent(key, + port_mapping.get_logical_to_physical(key)[0], + asic_context[port_tbl], PortChangeEvent.PORT_REMOVE) port_change_event_handler(port_change_event) else: logger.log_warning('Invalid DB operation: {}'.format(op)) - def get_port_mapping(): """Get port mapping from CONFIG_DB """ @@ -163,4 +222,4 @@ def get_port_mapping(): port_config_dict = dict(port_config) port_change_event = PortChangeEvent(key, port_config_dict['index'], asic_id, PortChangeEvent.PORT_ADD) port_mapping.handle_port_change_event(port_change_event) - return port_mapping \ No newline at end of file + return port_mapping