From 7c0be2422512483b403527fcfef70a3cb363a6d2 Mon Sep 17 00:00:00 2001 From: Prince George <45705344+prgeor@users.noreply.github.com> Date: Tue, 28 Jun 2022 09:31:22 -0700 Subject: [PATCH] [CMIS]Improved 400G link bring up sequence (#254) * Improved 400G link bring up sequence * Event based handling * Remove unused functions * Force DP to remain in DpInitialized State state on admin shutdown * Added test case * Skip CMIS task manager based upon flag * Addressed review comments * Fix xcvrd crash * Fix test failure * Listen only to 'APPL_DB's admin_status * Fix typo --- sonic-xcvrd/tests/test_xcvrd.py | 61 ++-- sonic-xcvrd/xcvrd/xcvrd.py | 276 ++++++++++++------ .../xcvrd/xcvrd_utilities/port_mapping.py | 29 +- 3 files changed, 230 insertions(+), 136 deletions(-) diff --git a/sonic-xcvrd/tests/test_xcvrd.py b/sonic-xcvrd/tests/test_xcvrd.py index d65a3bb89907..25c0b9468e37 100644 --- a/sonic-xcvrd/tests/test_xcvrd.py +++ b/sonic-xcvrd/tests/test_xcvrd.py @@ -1,3 +1,4 @@ +#from unittest.mock import DEFAULT from xcvrd.xcvrd_utilities.port_mapping import * from xcvrd.xcvrd_utilities.sfp_status_helper import * from xcvrd.xcvrd import * @@ -28,6 +29,7 @@ modules_path = os.path.dirname(test_path) scripts_path = os.path.join(modules_path, "xcvrd") sys.path.insert(0, modules_path) +DEFAULT_NAMESPACE = [''] os.environ["XCVRD_UNIT_TESTING"] = "1" @@ -242,7 +244,7 @@ def test_post_port_sfp_dom_info_to_db(self): port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_ADD) port_mapping.handle_port_change_event(port_change_event) stop_event = threading.Event() - xcvr_table_helper = XcvrTableHelper() + xcvr_table_helper = XcvrTableHelper(DEFAULT_NAMESPACE) post_port_sfp_dom_info_to_db(True, port_mapping, xcvr_table_helper, stop_event) @patch('xcvrd.xcvrd_utilities.port_mapping.PortMapping.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) @@ -255,7 +257,7 @@ def test_init_port_sfp_status_tbl(self): port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_ADD) port_mapping.handle_port_change_event(port_change_event) stop_event = threading.Event() - xcvr_table_helper = XcvrTableHelper() + xcvr_table_helper = XcvrTableHelper(DEFAULT_NAMESPACE) init_port_sfp_status_tbl(port_mapping, xcvr_table_helper, stop_event) def test_get_media_settings_key(self): @@ -339,7 +341,7 @@ def test_handle_port_config_change(self, mock_select, mock_sub_table): mock_select.return_value = (swsscommon.Select.OBJECT, mock_selectable) mock_sub_table.return_value = mock_selectable - sel, asic_context = subscribe_port_config_change() + sel, asic_context = subscribe_port_config_change(DEFAULT_NAMESPACE) port_mapping = PortMapping() stop_event = threading.Event() stop_event.is_set = MagicMock(return_value=False) @@ -367,7 +369,7 @@ def test_get_port_mapping(self, mock_swsscommon_table): mock_table.getKeys = MagicMock(return_value=['Ethernet0', 'Ethernet4', 'Ethernet-IB0']) mock_table.get = MagicMock(side_effect=[(True, (('index', 1), )), (True, (('index', 2), )), (True, (('index', 3), ))]) mock_swsscommon_table.return_value = mock_table - port_mapping = get_port_mapping() + port_mapping = get_port_mapping(DEFAULT_NAMESPACE) assert port_mapping.logical_port_list.count('Ethernet0') assert port_mapping.get_asic_id_for_logical_port('Ethernet0') == 0 assert port_mapping.get_physical_to_logical(1) == ['Ethernet0'] @@ -382,7 +384,7 @@ def test_get_port_mapping(self, mock_swsscommon_table): assert port_mapping.get_asic_id_for_logical_port('Ethernet-IB0') == None assert port_mapping.get_physical_to_logical(3) == None assert port_mapping.get_logical_to_physical('Ethernet-IB0') == None - + @patch('swsscommon.swsscommon.Select.addSelectable', MagicMock()) @patch('swsscommon.swsscommon.SubscriberStateTable') @patch('swsscommon.swsscommon.Select.select') @@ -418,7 +420,7 @@ def test_DaemonXcvrd_run(self, mock_task_stop1, mock_task_stop2, mock_task_run1, @patch('xcvrd.xcvrd._wrapper_get_sfp_type', MagicMock(return_value='QSFP_DD')) def test_CmisManagerTask_handle_port_change_event(self): port_mapping = PortMapping() - task = CmisManagerTask(port_mapping) + task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping) assert not task.isPortConfigDone port_change_event = PortChangeEvent('PortConfigDone', -1, 0, PortChangeEvent.PORT_SET) @@ -450,7 +452,7 @@ def test_CmisManagerTask_task_run_stop(self, mock_chassis): mock_chassis.get_all_sfps = MagicMock(return_value=[mock_object, mock_object]) port_mapping = PortMapping() - task = CmisManagerTask(port_mapping) + task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping) task.task_run() task.task_stop() assert task.task_process is None @@ -536,7 +538,7 @@ def test_CmisManagerTask_task_worker(self, mock_chassis): mock_chassis.get_sfp = MagicMock(return_value=mock_sfp) port_mapping = PortMapping() - task = CmisManagerTask(port_mapping) + task = CmisManagerTask(DEFAULT_NAMESPACE, port_mapping) port_change_event = PortChangeEvent('PortConfigDone', -1, 0, PortChangeEvent.PORT_SET) task.on_port_update_event(port_change_event) @@ -547,33 +549,44 @@ def test_CmisManagerTask_task_worker(self, mock_chassis): task.on_port_update_event(port_change_event) assert len(task.port_dict) == 1 + task.get_host_tx_status = MagicMock(return_value='true') + task.get_port_admin_status = MagicMock(return_value='up') + # Case 1: Module Inserted --> DP_DEINIT + task.task_stopping_event.is_set = MagicMock(side_effect=[False, False, True]) + task.task_worker() + assert task.port_dict['Ethernet0']['cmis_state'] == 'DP_DEINIT' + task.task_stopping_event.is_set = MagicMock(side_effect=[False, False, True]) task.task_worker() assert mock_xcvr_api.set_datapath_deinit.call_count == 1 assert mock_xcvr_api.tx_disable_channel.call_count == 1 - assert mock_xcvr_api.set_lpmode.call_count == 2 + assert mock_xcvr_api.set_lpmode.call_count == 1 + assert task.port_dict['Ethernet0']['cmis_state'] == 'AP_CONFIGURED' # Case 2: DP_DEINIT --> AP Configured task.task_stopping_event.is_set = MagicMock(side_effect=[False, False, True]) task.task_worker() assert mock_xcvr_api.set_application.call_count == 1 + assert task.port_dict['Ethernet0']['cmis_state'] == 'DP_INIT' # Case 3: AP Configured --> DP_INIT task.task_stopping_event.is_set = MagicMock(side_effect=[False, False, True]) task.task_worker() assert mock_xcvr_api.set_datapath_init.call_count == 1 + assert task.port_dict['Ethernet0']['cmis_state'] == 'DP_TXON' # Case 4: DP_INIT --> DP_TXON task.task_stopping_event.is_set = MagicMock(side_effect=[False, False, True]) task.task_worker() assert mock_xcvr_api.tx_disable_channel.call_count == 2 + assert task.port_dict['Ethernet0']['cmis_state'] == 'DP_ACTIVATION' @patch('xcvrd.xcvrd.XcvrTableHelper', MagicMock()) def test_DomInfoUpdateTask_handle_port_change_event(self): port_mapping = PortMapping() - task = DomInfoUpdateTask(port_mapping) - task.xcvr_table_helper = XcvrTableHelper() + task = DomInfoUpdateTask(DEFAULT_NAMESPACE, port_mapping) + task.xcvr_table_helper = XcvrTableHelper(DEFAULT_NAMESPACE) port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_ADD) task.on_port_config_change(port_change_event) assert task.port_mapping.logical_port_list.count('Ethernet0') @@ -592,7 +605,7 @@ def test_DomInfoUpdateTask_handle_port_change_event(self): @patch('xcvrd.xcvrd_utilities.port_mapping.handle_port_config_change', MagicMock()) def test_DomInfoUpdateTask_task_run_stop(self): port_mapping = PortMapping() - task = DomInfoUpdateTask(port_mapping) + task = DomInfoUpdateTask(DEFAULT_NAMESPACE, port_mapping) task.task_run() task.task_stop() assert not task.task_thread.is_alive() @@ -612,8 +625,8 @@ def test_DomInfoUpdateTask_task_worker(self, mock_select, mock_sub_table, mock_p mock_sub_table.return_value = mock_selectable port_mapping = PortMapping() - task = DomInfoUpdateTask(port_mapping) - task.xcvr_table_helper = XcvrTableHelper() + task = DomInfoUpdateTask(DEFAULT_NAMESPACE, port_mapping) + task.xcvr_table_helper = XcvrTableHelper(DEFAULT_NAMESPACE) task.task_stopping_event.wait = MagicMock(side_effect=[False, True]) mock_detect_error.return_value = True task.task_worker() @@ -640,8 +653,8 @@ def test_SfpStateUpdateTask_handle_port_change_event(self, mock_table_helper): stopping_event = multiprocessing.Event() port_mapping = PortMapping() retry_eeprom_set = set() - task = SfpStateUpdateTask(port_mapping, retry_eeprom_set) - task.xcvr_table_helper = XcvrTableHelper() + task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, retry_eeprom_set) + task.xcvr_table_helper = XcvrTableHelper(DEFAULT_NAMESPACE) port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_ADD) wait_time = 5 while wait_time > 0: @@ -671,7 +684,7 @@ def test_SfpStateUpdateTask_handle_port_change_event(self, mock_table_helper): def test_SfpStateUpdateTask_task_run_stop(self): port_mapping = PortMapping() retry_eeprom_set = set() - task = SfpStateUpdateTask(port_mapping, retry_eeprom_set) + task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, retry_eeprom_set) sfp_error_event = multiprocessing.Event() task.task_run(sfp_error_event) assert wait_until(5, 1, task.task_process.is_alive) @@ -686,8 +699,8 @@ def test_SfpStateUpdateTask_retry_eeprom_reading(self, mock_post_sfp_info): port_mapping = PortMapping() retry_eeprom_set = set() - task = SfpStateUpdateTask(port_mapping, retry_eeprom_set) - task.xcvr_table_helper = XcvrTableHelper() + task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, retry_eeprom_set) + task.xcvr_table_helper = XcvrTableHelper(DEFAULT_NAMESPACE) task.xcvr_table_helper.get_intf_tbl = MagicMock(return_value=mock_table) task.xcvr_table_helper.get_dom_tbl = MagicMock(return_value=mock_table) task.xcvr_table_helper.get_app_port_tbl = MagicMock(return_value=mock_table) @@ -712,7 +725,7 @@ def test_SfpStateUpdateTask_retry_eeprom_reading(self, mock_post_sfp_info): def test_SfpStateUpdateTask_mapping_event_from_change_event(self): port_mapping = PortMapping() retry_eeprom_set = set() - task = SfpStateUpdateTask(port_mapping, retry_eeprom_set) + task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, retry_eeprom_set) port_dict = {} assert task._mapping_event_from_change_event(False, port_dict) == SYSTEM_FAIL assert port_dict[EVENT_ON_ALL_SFP] == SYSTEM_FAIL @@ -744,8 +757,8 @@ def test_SfpStateUpdateTask_mapping_event_from_change_event(self): def test_SfpStateUpdateTask_task_worker(self, mock_updata_status, mock_post_sfp_info, mock_post_dom_info, mock_post_dom_th, mock_update_media_setting, mock_del_dom, mock_change_event, mock_mapping_event, mock_os_kill): port_mapping = PortMapping() retry_eeprom_set = set() - task = SfpStateUpdateTask(port_mapping, retry_eeprom_set) - task.xcvr_table_helper = XcvrTableHelper() + task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, retry_eeprom_set) + task.xcvr_table_helper = XcvrTableHelper(DEFAULT_NAMESPACE) stop_event = multiprocessing.Event() sfp_error_event = multiprocessing.Event() mock_change_event.return_value = (True, {0: 0}, {}) @@ -854,8 +867,8 @@ class MockTable: port_mapping = PortMapping() retry_eeprom_set = set() - task = SfpStateUpdateTask(port_mapping, retry_eeprom_set) - task.xcvr_table_helper = XcvrTableHelper() + task = SfpStateUpdateTask(DEFAULT_NAMESPACE, port_mapping, retry_eeprom_set) + task.xcvr_table_helper = XcvrTableHelper(DEFAULT_NAMESPACE) task.xcvr_table_helper.get_status_tbl = mock_table_helper.get_status_tbl task.xcvr_table_helper.get_intf_tbl = mock_table_helper.get_intf_tbl task.xcvr_table_helper.get_dom_tbl = mock_table_helper.get_dom_tbl diff --git a/sonic-xcvrd/xcvrd/xcvrd.py b/sonic-xcvrd/xcvrd/xcvrd.py index e5bc59abf38f..e5feae753b01 100644 --- a/sonic-xcvrd/xcvrd/xcvrd.py +++ b/sonic-xcvrd/xcvrd/xcvrd.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python2 +#!/usr/bin/env python3 """ xcvrd @@ -18,7 +18,8 @@ import time import datetime import subprocess - + import argparse + from sonic_py_common import daemon_base, device_info, logger from sonic_py_common import multi_asic from swsscommon import swsscommon @@ -592,7 +593,7 @@ def post_port_sfp_dom_info_to_db(is_warm_start, port_mapping, xcvr_table_helper, transceiver_dict.clear() else: retry_eeprom_set.add(logical_port_name) - + return retry_eeprom_set # Delete port dom/sfp info from db @@ -911,6 +912,7 @@ def is_fast_reboot_enabled(): fastboot_enabled = True return fastboot_enabled + # # Helper classes =============================================================== # @@ -928,19 +930,23 @@ class CmisManagerTask: CMIS_STATE_INSERTED = 'INSERTED' CMIS_STATE_DP_DEINIT = 'DP_DEINIT' CMIS_STATE_AP_CONF = 'AP_CONFIGURED' + CMIS_STATE_DP_ACTIVATE = 'DP_ACTIVATION' CMIS_STATE_DP_INIT = 'DP_INIT' CMIS_STATE_DP_TXON = 'DP_TXON' CMIS_STATE_READY = 'READY' CMIS_STATE_REMOVED = 'REMOVED' CMIS_STATE_FAILED = 'FAILED' - def __init__(self, port_mapping): + def __init__(self, namespaces, port_mapping, skip_cmis_mgr=False): self.task_stopping_event = multiprocessing.Event() self.task_process = None self.port_dict = {} self.port_mapping = copy.deepcopy(port_mapping) + self.xcvr_table_helper = XcvrTableHelper(namespaces) self.isPortInitDone = False self.isPortConfigDone = False + self.skip_cmis_mgr = skip_cmis_mgr + self.namespaces = namespaces def log_notice(self, message): helper_logger.log_notice("CMIS: {}".format(message)) @@ -972,22 +978,28 @@ def on_port_update_event(self, port_change_event): return # Skip if the port/cage type is not a CMIS - ptype = _wrapper_get_sfp_type(pport) - if ptype not in self.CMIS_MODULE_TYPES: - return - + # 'index' can be -1 if STATE_DB|PORT_TABLE if lport not in self.port_dict: self.port_dict[lport] = {} + if port_change_event.port_dict is None: + return + if port_change_event.event_type == port_change_event.PORT_SET: if pport >= 0: self.port_dict[lport]['index'] = pport - if port_change_event.port_dict is not None and 'speed' in port_change_event.port_dict: + if 'speed' in port_change_event.port_dict and port_change_event.port_dict['speed'] != 'N/A': self.port_dict[lport]['speed'] = port_change_event.port_dict['speed'] - if port_change_event.port_dict is not None and 'lanes' in port_change_event.port_dict: + if 'lanes' in port_change_event.port_dict: self.port_dict[lport]['lanes'] = port_change_event.port_dict['lanes'] - self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_INSERTED - self.reset_cmis_init(lport, 0) + if 'host_tx_ready' in port_change_event.port_dict: + self.port_dict[lport]['host_tx_ready'] = port_change_event.port_dict['host_tx_ready'] + if 'admin_status' in port_change_event.port_dict and 'oper_status' in port_change_event.port_dict: + # At times 'admin_status' is NOT the same in the PORT_TABLE of APPL_DB and STATE_DB + # We dont have better way to check if 'admin_status' is from APPL_DB or STATE_DB so this + # check is put temporarily to listen only to APPL_DB's admin_status and ignore that of STATE_DB + self.port_dict[lport]['admin_status'] = port_change_event.port_dict['admin_status'] + self.force_cmis_reinit(lport, 0) else: self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_REMOVED @@ -1081,7 +1093,6 @@ def is_cmis_application_update_required(self, api, channel, speed): app_new = self.get_cmis_application_desired(api, channel, speed) if app_new != 1: self.log_notice("Non-default application is not supported") - return False app_old = 0 for lane in range(self.CMIS_NUM_CHANNELS): @@ -1113,12 +1124,15 @@ def is_cmis_application_update_required(self, api, channel, speed): return True - def reset_cmis_init(self, lport, retries=0): + def force_cmis_reinit(self, lport, retries=0): + """ + Try to force the restart of CMIS state machine + """ self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_INSERTED self.port_dict[lport]['cmis_retries'] = retries self.port_dict[lport]['cmis_expired'] = None # No expiration - def test_module_state(self, api, states): + def check_module_state(self, api, states): """ Check if the CMIS module is in the specified state @@ -1133,7 +1147,7 @@ def test_module_state(self, api, states): """ return api.get_module_state() in states - def test_config_error(self, api, channel, states): + def check_config_error(self, api, channel, states): """ Check if the CMIS configuration states are in the specified state @@ -1161,7 +1175,7 @@ def test_config_error(self, api, channel, states): return done - def test_datapath_state(self, api, channel, states): + def check_datapath_state(self, api, channel, states): """ Check if the CMIS datapath states are in the specified state @@ -1189,13 +1203,37 @@ def test_datapath_state(self, api, channel, states): return done + def get_host_tx_status(self, lport): + host_tx_ready = 'false' + + asic_index = self.port_mapping.get_asic_id_for_logical_port(lport) + state_port_tbl = self.xcvr_table_helper.get_state_port_tbl(asic_index) + + found, port_info = state_port_tbl.get(lport) + if found and 'host_tx_ready' in dict(port_info): + host_tx_ready = dict(port_info)['host_tx_ready'] + return host_tx_ready + + def get_port_admin_status(self, lport): + admin_status = 'down' + + asic_index = self.port_mapping.get_asic_id_for_logical_port(lport) + cfg_port_tbl = self.xcvr_table_helper.get_cfg_port_tbl(asic_index) + + found, port_info = cfg_port_tbl.get(lport) + if found: + # Check admin_status too ...just in case + admin_status = dict(port_info)['admin_status'] + return admin_status + def task_worker(self): - self.xcvr_table_helper = XcvrTableHelper() + self.xcvr_table_helper = XcvrTableHelper(self.namespaces) self.log_notice("Starting...") + print("Starting") # APPL_DB for CONFIG updates, and STATE_DB for insertion/removal - sel, asic_context = port_mapping.subscribe_port_update_event(['APPL_DB', 'STATE_DB']) + sel, asic_context = port_mapping.subscribe_port_update_event(self.namespaces) while not self.task_stopping_event.is_set(): # Handle port change event from main thread port_mapping.handle_port_update_event(sel, @@ -1221,6 +1259,14 @@ def task_worker(self): self.CMIS_STATE_REMOVED]: continue + # Handle the case when Xcvrd was NOT running when 'host_tx_ready' or 'admin_status' + # was updated or this is the first run so reconcile the above two attributes + if 'host_tx_ready' not in self.port_dict[lport]: + self.port_dict[lport]['host_tx_ready'] = self.get_host_tx_status(lport) + + if 'admin_status' not in self.port_dict[lport]: + self.port_dict[lport]['admin_status'] = self.get_port_admin_status(lport) + pport = int(info.get('index', "-1")) speed = int(info.get('speed', "0")) lanes = info.get('lanes', "").strip() @@ -1248,11 +1294,13 @@ def task_worker(self): # Skip if XcvrApi is not supported api = sfp.get_xcvr_api() if api is None: + self.log_error("{}: skipping CMIS state machine since no xcvr api!!!".format(lport)) self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_READY continue # Skip if it's not a paged memory device if api.is_flat_memory(): + self.log_notice("{}: skipping CMIS state machine for flat memory xcvr".format(lport)) self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_READY continue @@ -1284,6 +1332,13 @@ def task_worker(self): try: # CMIS state transitions if state == self.CMIS_STATE_INSERTED: + if self.port_dict[lport]['host_tx_ready'] != 'true' or \ + self.port_dict[lport]['admin_status'] != 'up': + self.log_notice("{} Forcing Tx laser OFF".format(lport)) + # Force DataPath re-init + api.tx_disable_channel(host_lanes, True) + self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_READY + continue appl = self.get_cmis_application_desired(api, host_lanes, host_speed) if appl < 1: @@ -1294,38 +1349,37 @@ def task_worker(self): has_update = self.is_cmis_application_update_required(api, host_lanes, host_speed) if not has_update: # No application updates - self.log_notice("{}: READY".format(lport)) + self.log_notice("{}: no CMIS application update required...READY".format(lport)) self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_READY continue + self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_DP_DEINIT + elif state == self.CMIS_STATE_DP_DEINIT: # D.2.2 Software Deinitialization api.set_datapath_deinit(host_lanes) - api.set_lpmode(True) - if not self.test_module_state(api, ['ModuleReady', 'ModuleLowPwr']): - self.log_notice("{}: unable to enter low-power mode".format(lport)) - self.port_dict[lport]['cmis_retries'] = retries + 1 - continue # D.1.3 Software Configuration and Initialization if not api.tx_disable_channel(host_lanes, True): self.log_notice("{}: unable to turn off tx power".format(lport)) self.port_dict[lport]['cmis_retries'] = retries + 1 continue - api.set_lpmode(False) - # TODO: Use fine grained time when the CMIS memory map is available + # TODO: Make sure this doesn't impact other datapaths + api.set_lpmode(False) + self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_AP_CONF self.port_dict[lport]['cmis_expired'] = now + datetime.timedelta(seconds=self.CMIS_DEF_EXPIRED) - self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_DP_DEINIT - elif state == self.CMIS_STATE_DP_DEINIT: - if not self.test_module_state(api, ['ModuleReady']): + elif state == self.CMIS_STATE_AP_CONF: + # TODO: Use fine grained time when the CMIS memory map is available + if not self.check_module_state(api, ['ModuleReady']): if (expired is not None) and (expired <= now): self.log_notice("{}: timeout for 'ModuleReady'".format(lport)) - self.reset_cmis_init(lport, retries + 1) + self.force_cmis_reinit(lport, retries + 1) continue - if not self.test_datapath_state(api, host_lanes, ['DataPathDeinit', 'DataPathDeactivated']): + + if not self.check_datapath_state(api, host_lanes, ['DataPathDeactivated']): if (expired is not None) and (expired <= now): - self.log_notice("{}: timeout for 'DataPathDeinit'".format(lport)) - self.reset_cmis_init(lport, retries + 1) + self.log_notice("{}: timeout for 'DataPathDeactivated state'".format(lport)) + self.force_cmis_reinit(lport, retries + 1) continue # D.1.3 Software Configuration and Initialization @@ -1337,47 +1391,51 @@ def task_worker(self): if not api.set_application(host_lanes, appl): self.log_notice("{}: unable to set application".format(lport)) - self.reset_cmis_init(lport, retries + 1) - continue - - # TODO: Use fine grained time when the CMIS memory map is available - self.port_dict[lport]['cmis_expired'] = now + datetime.timedelta(seconds=self.CMIS_DEF_EXPIRED) - self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_AP_CONF - elif state == self.CMIS_STATE_AP_CONF: - if not self.test_config_error(api, host_lanes, ['ConfigSuccess']): - if (expired is not None) and (expired <= now): - self.log_notice("{}: timeout for 'ConfigSuccess'".format(lport)) - self.reset_cmis_init(lport, retries + 1) + self.force_cmis_reinit(lport, retries + 1) continue - # D.1.3 Software Configuration and Initialization - api.set_datapath_init(host_lanes) - # TODO: Use fine grained time when the CMIS memory map is available self.port_dict[lport]['cmis_expired'] = now + datetime.timedelta(seconds=self.CMIS_DEF_EXPIRED) self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_DP_INIT elif state == self.CMIS_STATE_DP_INIT: - if not self.test_datapath_state(api, host_lanes, ['DataPathInitialized']): + if not self.check_config_error(api, host_lanes, ['ConfigSuccess']): if (expired is not None) and (expired <= now): - self.log_notice("{}: timeout for 'DataPathInitialized'".format(lport)) - self.reset_cmis_init(lport, retries + 1) + self.log_notice("{}: timeout for 'ConfigSuccess'".format(lport)) + self.force_cmis_reinit(lport, retries + 1) continue - # D.1.3 Software Configuration and Initialization - if not api.tx_disable_channel(host_lanes, False): - self.log_notice("{}: unable to turn on tx power".format(lport)) - self.reset_cmis_init(lport, retries + 1) + # Ensure the Datapath is NOT Activated unless the host Tx siganl is good. + # NOTE: Some CMIS compliant modules may have 'auto-squelch' feature where + # the module won't take datapaths to Activated state if host tries to enable + # the datapaths while there is no good Tx signal from the host-side. + if self.port_dict[lport]['admin_status'] != 'up' or \ + self.port_dict[lport]['host_tx_ready'] != 'true': + self.log_notice("{} waiting for host tx ready...".format(lport)) continue + # D.1.3 Software Configuration and Initialization + api.set_datapath_init(host_lanes) # TODO: Use fine grained timeout when the CMIS memory map is available self.port_dict[lport]['cmis_expired'] = now + datetime.timedelta(seconds=self.CMIS_DEF_EXPIRED) self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_DP_TXON elif state == self.CMIS_STATE_DP_TXON: - if not self.test_datapath_state(api, host_lanes, ['DataPathActivated']): + if not self.check_datapath_state(api, host_lanes, ['DataPathInitialized']): + if (expired is not None) and (expired <= now): + self.log_notice("{}: timeout for 'DataPathInitialized'".format(lport)) + self.force_cmis_reinit(lport, retries + 1) + continue + + # Turn ON the laser + api.tx_disable_channel(host_lanes, False) + self.log_notice("{}: Turning ON tx power".format(lport)) + self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_DP_ACTIVATE + elif state == self.CMIS_STATE_DP_ACTIVATE: + if not self.check_datapath_state(api, host_lanes, ['DataPathActivated']): if (expired is not None) and (expired <= now): self.log_notice("{}: timeout for 'DataPathActivated'".format(lport)) - self.reset_cmis_init(lport, retries + 1) + self.force_cmis_reinit(lport, retries + 1) continue + self.log_notice("{}: READY".format(lport)) self.port_dict[lport]['cmis_state'] = self.CMIS_STATE_READY @@ -1392,6 +1450,10 @@ def task_run(self): self.log_notice("Platform chassis is not available, stopping...") return + if self.skip_cmis_mgr: + self.log_notice("Skipping CMIS Task Manager") + return + self.task_process = multiprocessing.Process(target=self.task_worker) if self.task_process is not None: self.task_process.start() @@ -1406,17 +1468,18 @@ def task_stop(self): class DomInfoUpdateTask(object): - def __init__(self, port_mapping): + def __init__(self, namespaces, port_mapping): self.task_thread = None self.task_stopping_event = threading.Event() self.port_mapping = copy.deepcopy(port_mapping) + self.namespaces = namespaces def task_worker(self): - self.xcvr_table_helper = XcvrTableHelper() + self.xcvr_table_helper = XcvrTableHelper(self.namespaces) helper_logger.log_info("Start DOM monitoring loop") dom_info_cache = {} dom_th_info_cache = {} - sel, asic_context = port_mapping.subscribe_port_config_change() + sel, asic_context = port_mapping.subscribe_port_config_change(self.namespaces) # Start loop to update dom info in DB periodically while not self.task_stopping_event.wait(DOM_INFO_UPDATE_PERIOD_SECS): @@ -1465,18 +1528,18 @@ def on_remove_logical_port(self, port_change_event): # To avoid race condition, remove the entry TRANSCEIVER_DOM_INFO table. # This thread only update TRANSCEIVER_DOM_INFO table, so we don't have to remove entries from # TRANSCEIVER_INFO and TRANSCEIVER_STATUS_INFO - del_port_sfp_dom_info_from_db(port_change_event.port_name, - self.port_mapping, + del_port_sfp_dom_info_from_db(port_change_event.port_name, + self.port_mapping, None, self.xcvr_table_helper.get_dom_tbl(port_change_event.asic_id)) - + # Process wrapper class to update sfp state info periodically class SfpStateUpdateTask(object): RETRY_EEPROM_READING_INTERVAL = 60 - def __init__(self, port_mapping, retry_eeprom_set): + def __init__(self, namespaces, port_mapping, retry_eeprom_set): self.task_process = None self.task_stopping_event = multiprocessing.Event() self.port_mapping = copy.deepcopy(port_mapping) @@ -1488,6 +1551,7 @@ def __init__(self, port_mapping, retry_eeprom_set): # because _wrapper_get_presence returns the SFP presence status self.sfp_error_dict = {} self.sfp_insert_events = {} + self.namespaces = namespaces def _mapping_event_from_change_event(self, status, port_dict): """ @@ -1514,7 +1578,7 @@ def _mapping_event_from_change_event(self, status, port_dict): return event def task_worker(self, stopping_event, sfp_error_event): - self.xcvr_table_helper = XcvrTableHelper() + self.xcvr_table_helper = XcvrTableHelper(self.namespaces) helper_logger.log_info("Start SFP monitoring loop") @@ -1588,7 +1652,7 @@ def task_worker(self, stopping_event, sfp_error_event): retry = 0 timeout = RETRY_PERIOD_FOR_SYSTEM_READY_MSECS state = STATE_INIT - sel, asic_context = port_mapping.subscribe_port_config_change() + sel, asic_context = port_mapping.subscribe_port_config_change(self.namespaces) port_change_event_handler = functools.partial(self.on_port_config_change, stopping_event) while not stopping_event.is_set(): port_mapping.handle_port_config_change(sel, asic_context, stopping_event, self.port_mapping, helper_logger, port_change_event_handler) @@ -1652,8 +1716,8 @@ def task_worker(self, stopping_event, sfp_error_event): # this is for the vendors who don't implement "system_not_ready/system_becom_ready" logic logical_port_dict = {} for key, value in port_dict.items(): - # SFP error event should be cached because: when a logical port is created, there is no way to - # detect the SFP error by platform API. + # SFP error event should be cached because: when a logical port is created, there is no way to + # detect the SFP error by platform API. if value != sfp_status_helper.SFP_STATUS_INSERTED and value != sfp_status_helper.SFP_STATUS_REMOVED: self.sfp_error_dict[key] = (value, error_dict) else: @@ -1764,6 +1828,7 @@ def task_run(self, sfp_error_event): if self.task_stopping_event.is_set(): return + self.task_process = multiprocessing.Process(target=self.task_worker, args=( self.task_stopping_event, sfp_error_event)) self.task_process.start() @@ -1788,9 +1853,9 @@ def on_remove_logical_port(self, port_change_event): # To avoid race condition, remove the entry TRANSCEIVER_DOM_INFO, TRANSCEIVER_STATUS_INFO and TRANSCEIVER_INFO table. # The operation to remove entry from TRANSCEIVER_DOM_INFO is duplicate with DomInfoUpdateTask.on_remove_logical_port, # but it is necessary because TRANSCEIVER_DOM_INFO is also updated in this sub process when a new SFP is inserted. - del_port_sfp_dom_info_from_db(port_change_event.port_name, - self.port_mapping, - self.xcvr_table_helper.get_intf_tbl(port_change_event.asic_id), + del_port_sfp_dom_info_from_db(port_change_event.port_name, + self.port_mapping, + self.xcvr_table_helper.get_intf_tbl(port_change_event.asic_id), self.xcvr_table_helper.get_dom_tbl(port_change_event.asic_id)) delete_port_from_status_table(port_change_event.port_name, self.xcvr_table_helper.get_status_tbl(port_change_event.asic_id)) @@ -1809,12 +1874,12 @@ def on_add_logical_port(self, port_change_event): """ # A logical port is created. There could be 3 cases: # 1. SFP information is already in DB, which means that a logical port with the same physical index is in DB before. - # Need copy the data from existing logical port and insert it into TRANSCEIVER_DOM_INFO, TRANSCEIVER_STATUS_INFO + # Need copy the data from existing logical port and insert it into TRANSCEIVER_DOM_INFO, TRANSCEIVER_STATUS_INFO # and TRANSCEIVER_INFO table. - # 2. SFP information is not in DB and SFP is present with no SFP error. Need query the SFP status by platform API and + # 2. SFP information is not in DB and SFP is present with no SFP error. Need query the SFP status by platform API and # insert the data to DB. - # 3. SFP information is not in DB and SFP is present with SFP error. If the SFP error does not block EEPROM reading, - # just query transceiver information and DOM sensor information via platform API and update the data to DB; otherwise, + # 3. SFP information is not in DB and SFP is present with SFP error. If the SFP error does not block EEPROM reading, + # just query transceiver information and DOM sensor information via platform API and update the data to DB; otherwise, # just update TRANSCEIVER_STATUS table with the error. # 4. SFP information is not in DB and SFP is not present. Only update TRANSCEIVER_STATUS_INFO table. logical_port_event_dict = {} @@ -1824,7 +1889,7 @@ def on_add_logical_port(self, port_change_event): int_tbl = self.xcvr_table_helper.get_intf_tbl(port_change_event.asic_id) dom_tbl = self.xcvr_table_helper.get_dom_tbl(port_change_event.asic_id) physical_port_list = self.port_mapping.logical_port_name_to_physical_port_list(port_change_event.port_name) - + # Try to find a logical port with same physical index in DB for physical_port in physical_port_list: logical_port_list = self.port_mapping.get_physical_to_logical(physical_port) @@ -1836,10 +1901,10 @@ def on_add_logical_port(self, port_change_event): if found: sibling_port = logical_port break - + if sfp_status: break - + if sfp_status: # SFP information is in DB status_tbl.set(port_change_event.port_name, sfp_status) @@ -1899,7 +1964,7 @@ def retry_eeprom_reading(self): if not self.retry_eeprom_set: return - # Retry eeprom with an interval RETRY_EEPROM_READING_INTERVAL. No need to put sleep here + # Retry eeprom with an interval RETRY_EEPROM_READING_INTERVAL. No need to put sleep here # because _wrapper_get_transceiver_change_event has a timeout argument. now = time.time() if now - self.last_retry_eeprom_time < self.RETRY_EEPROM_READING_INTERVAL: @@ -1928,10 +1993,12 @@ def retry_eeprom_reading(self): class DaemonXcvrd(daemon_base.DaemonBase): - def __init__(self, log_identifier): + def __init__(self, log_identifier, skip_cmis_mgr=False): super(DaemonXcvrd, self).__init__(log_identifier) self.stop_event = threading.Event() self.sfp_error_event = multiprocessing.Event() + self.skip_cmis_mgr = skip_cmis_mgr + self.namespaces = [''] # Signal handler def signal_handler(self, sig, frame): @@ -1979,7 +2046,7 @@ def load_media_settings(self): with open(media_settings_file_path, "r") as media_file: g_dict = json.load(media_file) - + # Initialize daemon def init(self): global platform_sfputil @@ -2013,9 +2080,12 @@ def init(self): if multi_asic.is_multi_asic(): # Load the namespace details first from the database_global.json file. swsscommon.SonicDBConfig.initializeGlobalConfig() + # To prevent race condition in get_all_namespaces() we cache the namespaces before + # creating any worker threads + self.namespaces = multi_asic.get_front_end_namespaces() # Initialize xcvr table helper - self.xcvr_table_helper = XcvrTableHelper() + self.xcvr_table_helper = XcvrTableHelper(self.namespaces) if is_fast_reboot_enabled(): self.log_info("Skip loading media_settings.json in case of fast-reboot") @@ -2029,11 +2099,10 @@ def init(self): # Make sure this daemon started after all port configured self.log_info("Wait for port config is done") - for namespace in self.xcvr_table_helper.namespaces: + for namespace in self.namespaces: self.wait_for_port_config_done(namespace) - - port_mapping_data = port_mapping.get_port_mapping() + port_mapping_data = port_mapping.get_port_mapping(self.namespaces) # Post all the current interface dom/sfp info to STATE_DB self.log_info("Post all port DOM/SFP info to DB") @@ -2050,7 +2119,7 @@ def deinit(self): self.log_info("Start daemon deinit...") # Delete all the information from DB and then exit - port_mapping_data = port_mapping.get_port_mapping() + port_mapping_data = port_mapping.get_port_mapping(self.namespaces) logical_port_list = port_mapping_data.logical_port_list for logical_port_name in logical_port_list: # Get the asic to which this port belongs @@ -2074,15 +2143,15 @@ def run(self): port_mapping_data, retry_eeprom_set = self.init() # Start the CMIS manager - cmis_manager = CmisManagerTask(port_mapping_data) + cmis_manager = CmisManagerTask(self.namespaces, port_mapping_data, self.skip_cmis_mgr) cmis_manager.task_run() # Start the dom sensor info update thread - dom_info_update = DomInfoUpdateTask(port_mapping_data) + dom_info_update = DomInfoUpdateTask(self.namespaces, port_mapping_data) dom_info_update.task_run() # Start the sfp state info update process - sfp_state_update = SfpStateUpdateTask(port_mapping_data, retry_eeprom_set) + sfp_state_update = SfpStateUpdateTask(self.namespaces, port_mapping_data, retry_eeprom_set) sfp_state_update.task_run(self.sfp_error_event) # Start the Y-cable state info update process if Y cable presence established @@ -2094,6 +2163,10 @@ def run(self): self.log_info("Stop daemon main loop") + # Stop the CMIS manager + if cmis_manager is not None: + cmis_manager.task_stop() + # Stop the dom sensor info update thread dom_info_update.task_stop() @@ -2111,19 +2184,23 @@ def run(self): class XcvrTableHelper: - def __init__(self): - self.int_tbl, self.dom_tbl, self.status_tbl, self.app_port_tbl = {}, {}, {}, {} + def __init__(self, namespaces): + self.int_tbl, self.dom_tbl, self.status_tbl, self.app_port_tbl, \ + self.cfg_port_tbl, self.state_port_tbl = {}, {}, {}, {}, {}, {} self.state_db = {} - self.namespaces = multi_asic.get_front_end_namespaces() - for namespace in self.namespaces: + self.cfg_db = {} + for namespace in namespaces: asic_id = multi_asic.get_asic_index_from_namespace(namespace) self.state_db[asic_id] = daemon_base.db_connect("STATE_DB", namespace) self.int_tbl[asic_id] = swsscommon.Table(self.state_db[asic_id], TRANSCEIVER_INFO_TABLE) self.dom_tbl[asic_id] = swsscommon.Table(self.state_db[asic_id], TRANSCEIVER_DOM_SENSOR_TABLE) self.status_tbl[asic_id] = swsscommon.Table(self.state_db[asic_id], TRANSCEIVER_STATUS_TABLE) + self.state_port_tbl[asic_id] = swsscommon.Table(self.state_db[asic_id], swsscommon.STATE_PORT_TABLE_NAME) appl_db = daemon_base.db_connect("APPL_DB", namespace) self.app_port_tbl[asic_id] = swsscommon.ProducerStateTable(appl_db, swsscommon.APP_PORT_TABLE_NAME) - + self.cfg_db[asic_id] = daemon_base.db_connect("CONFIG_DB", namespace) + self.cfg_port_tbl[asic_id] = swsscommon.Table(self.cfg_db[asic_id], swsscommon.CFG_PORT_TABLE_NAME) + def get_intf_tbl(self, asic_id): return self.int_tbl[asic_id] @@ -2139,6 +2216,11 @@ def get_app_port_tbl(self, asic_id): def get_state_db(self, asic_id): return self.state_db[asic_id] + def get_cfg_port_tbl(self, asic_id): + return self.cfg_port_tbl[asic_id] + + def get_state_port_tbl(self, asic_id): + return self.state_port_tbl[asic_id] # # Main ========================================================================= @@ -2148,7 +2230,11 @@ def get_state_db(self, asic_id): def main(): - xcvrd = DaemonXcvrd(SYSLOG_IDENTIFIER) + parser = argparse.ArgumentParser() + parser.add_argument('--skip_cmis_mgr', action='store_true') + + args = parser.parse_args() + xcvrd = DaemonXcvrd(SYSLOG_IDENTIFIER, args.skip_cmis_mgr) xcvrd.run() diff --git a/sonic-xcvrd/xcvrd/xcvrd_utilities/port_mapping.py b/sonic-xcvrd/xcvrd/xcvrd_utilities/port_mapping.py index 201bc33f6be6..3c24415a4b3a 100644 --- a/sonic-xcvrd/xcvrd/xcvrd_utilities/port_mapping.py +++ b/sonic-xcvrd/xcvrd/xcvrd_utilities/port_mapping.py @@ -94,10 +94,9 @@ def validate_port(port): return False return True -def subscribe_port_config_change(): +def subscribe_port_config_change(namespaces): sel = swsscommon.Select() asic_context = {} - namespaces = multi_asic.get_front_end_namespaces() for namespace in namespaces: config_db = daemon_base.db_connect("CONFIG_DB", namespace=namespace) asic_id = multi_asic.get_asic_index_from_namespace(namespace) @@ -106,23 +105,20 @@ def subscribe_port_config_change(): sel.addSelectable(port_tbl) return sel, asic_context -def subscribe_port_update_event(db_list=['APPL_DB', 'STATE_DB']): - port_tbl_map = { - 'APPL_DB': swsscommon.APP_PORT_TABLE_NAME, - 'CONFIG_DB': swsscommon.CFG_PORT_TABLE_NAME, - 'STATE_DB': 'TRANSCEIVER_INFO' - } +def subscribe_port_update_event(namespaces): + port_tbl_map = [ + {'APPL_DB': swsscommon.APP_PORT_TABLE_NAME}, + {'STATE_DB': 'TRANSCEIVER_INFO'}, + {'STATE_DB': 'PORT_TABLE'}, + ] sel = swsscommon.Select() asic_context = {} - namespaces = multi_asic.get_front_end_namespaces() - for db_name in db_list: - if db_name not in port_tbl_map: - continue + for d in port_tbl_map: for namespace in namespaces: - db = daemon_base.db_connect(db_name, namespace=namespace) + db = daemon_base.db_connect(list(d.keys())[0], namespace=namespace) asic_id = multi_asic.get_asic_index_from_namespace(namespace) - port_tbl = swsscommon.SubscriberStateTable(db, port_tbl_map[db_name]) + port_tbl = swsscommon.SubscriberStateTable(db, list(d.values())[0]) asic_context[port_tbl] = asic_id sel.addSelectable(port_tbl) return sel, asic_context @@ -186,7 +182,7 @@ def read_port_config_change(asic_context, port_mapping, logger, port_change_even if not key: break if not validate_port(key): - continue + continue if op == swsscommon.SET_COMMAND: fvp = dict(fvp) if 'index' not in fvp: @@ -218,11 +214,10 @@ def read_port_config_change(asic_context, port_mapping, logger, port_change_even else: logger.log_warning('Invalid DB operation: {}'.format(op)) -def get_port_mapping(): +def get_port_mapping(namespaces): """Get port mapping from CONFIG_DB """ port_mapping = PortMapping() - namespaces = multi_asic.get_front_end_namespaces() for namespace in namespaces: asic_id = multi_asic.get_asic_index_from_namespace(namespace) config_db = daemon_base.db_connect("CONFIG_DB", namespace=namespace)