Skip to content

Commit

Permalink
port_mapping: drop the hacks for CMIS
Browse files Browse the repository at this point in the history
Signed-off-by: Dante Su <dante.su@broadcom.com>
  • Loading branch information
ds952811 committed Nov 18, 2021
1 parent 501134b commit 9be5135
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 58 deletions.
69 changes: 31 additions & 38 deletions sonic-xcvrd/xcvrd/xcvrd.py
Original file line number Diff line number Diff line change
Expand Up @@ -847,82 +847,75 @@ def is_fast_reboot_enabled():

class CmisManagerTask:

def __init__(self):
def __init__(self, port_mapping):
# Max. number of the worker process/thread
self.worker_max = 5
self.task_workers = []
self.task_stopping_event = multiprocessing.Event()
self.task_queue = multiprocessing.Manager().Queue()
self.port_mapping = port_mapping.get_port_mapping()
self.port_mapping = copy.deepcopy(port_mapping)

def log_notice(self, message):
helper_logger.log_notice("CMIS (pid {0}): {1}".format(os.getpid(), message))

def on_port_config_change(self, port_change_event):
if port_change_event.event_type != port_change_event.PORT_ADD:
if port_change_event.event_type != port_change_event.PORT_SET:
return
logical_port = port_change_event.port_dict.get('name')
logical_port = port_change_event.port_name
physical_port = port_change_event.port_dict.get('index')
speed = port_change_event.port_dict.get('speed')
lanes = port_change_event.port_dict.get('lanes')
if (physical_port is None) or (speed is None) or (lanes is None):
return

try:
msg = {
'logical_port': logical_port,
'physical_port': int(physical_port),
'speed': int(speed),
'lanes': len(lanes.split(','))
'lanes': lanes.split(',')
}
except Exception as ex:
helper_logger.log_notice("CMIS (pid {0}): {1}".format(os.getpid(), ex))
except ValueError as ex:
self.log_notice("{}".format(ex))
return

self.task_queue.put(msg)
time.sleep(0.001)

def task_worker(self, dispatcher):
helper_logger.log_notice("Start CMIS task (pid {}): {}".format(os.getpid(), dispatcher))
def task_worker(self, commander):
self.log_notice("Starting...{}".format(commander))

if dispatcher:
if commander:
sel, asic_context = port_mapping.subscribe_port_config_change()
while not self.task_stopping_event.is_set():
if dispatcher:
while not self.task_stopping_event.is_set():
# Handle port change event from main thread
port_mapping.handle_port_config_change(sel, asic_context, self.task_stopping_event, self.port_mapping, helper_logger, self.on_port_config_change, True)
else:

try:
msg = self.task_queue.get(block=True)
except:
msg = None
port_mapping.handle_port_config_change(sel, asic_context, self.task_stopping_event, self.port_mapping, helper_logger, self.on_port_config_change)
else:
while not self.task_stopping_event.is_set():
msg = self.task_queue.get()
if msg is None:
time.sleep(0.5)
time.sleep(1)
continue

logical_port = msg['logical_port']
physical_port = msg['physical_port']
lport = msg['logical_port']
pport = msg['physical_port']
lanes = msg['lanes']
speed = msg['speed']
try:
sfp = platform_chassis.get_sfp(physical_port)
except Exception as ex:
helper_logger.log_notice("CMIS (pid {0}): {1}".format(os.getpid(), ex))
continue

sfp = platform_chassis.get_sfp(pport)
if not sfp.get_presence():
continue

helper_logger.log_notice("CMIS (pid {0}): {1}: i={2},speed={3},lanes={4}".format(os.getpid(), logical_port, physical_port, speed, lanes))

try:
ret = sfp.set_cmis_application(speed, lanes)
helper_logger.log_notice("CMIS (pid {0}): {1}: {2}".format(os.getpid(), logical_port, ("succeeded" if ret else "failed")))
except Exception as ex:
helper_logger.log_notice("CMIS (pid {0}): {1}: {2}".format(os.getpid(), logical_port, ex))
ret = sfp.set_cmis_application(speed, len(lanes))
except (NotImplementedError, AttributeError):
ret = False
self.log_notice("{0}: {1}".format(lport, "succeeded" if ret else "failed"))

helper_logger.log_notice("Stop CMIS task (pid {0})".format(os.getpid()))
self.log_notice("Stopped")

def task_run(self, y_cable_presence):
if platform_chassis is None:
helper_logger.log_notice("CMIS: Platform chassis is not available, stopping...")
self.log_notice("Platform chassis is not available, stopping...")
return

has_cmis = False
Expand All @@ -932,7 +925,7 @@ def task_run(self, y_cable_presence):
break

if not has_cmis:
helper_logger.log_notice("CMIS: None of QSFP-DD cage is detected, stopping...")
self.log_notice("None of QSFP-DD cage is detected, stopping...")
return

for n in range(0, self.worker_max):
Expand Down Expand Up @@ -1642,7 +1635,7 @@ def run(self):
port_mapping_data, retry_eeprom_set = self.init()

# Start the CMIS manager
cmis_manager = CmisManagerTask()
cmis_manager = CmisManagerTask(port_mapping_data)
cmis_manager.task_run(self.y_cable_presence)

# Start the dom sensor info update thread
Expand Down
58 changes: 38 additions & 20 deletions sonic-xcvrd/xcvrd/xcvrd_utilities/port_mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
class PortChangeEvent:
PORT_ADD = 0
PORT_REMOVE = 1
PORT_SET = 2
PORT_DEL = 3

def __init__(self, port_name, port_index, asic_id, event_type, port_dict=None):
# Logical port name, e.g. Ethernet0
self.port_name = port_name
Expand Down Expand Up @@ -99,7 +102,7 @@ def subscribe_port_config_change():
return sel, asic_context


def handle_port_config_change(sel, asic_context, stop_event, port_mapping, logger, port_change_event_handler, is_cmis=False):
def handle_port_config_change(sel, asic_context, stop_event, port_mapping, logger, port_change_event_handler):
"""Select CONFIG_DB PORT table changes, once there is a port configuration add/remove, notify observers
"""
if not stop_event.is_set():
Expand All @@ -110,50 +113,65 @@ def handle_port_config_change(sel, asic_context, stop_event, port_mapping, logge
logger.log_warning('sel.select() did not return swsscommon.Select.OBJECT')
return

read_port_config_change(asic_context, port_mapping, logger, port_change_event_handler, is_cmis)
read_port_config_change(asic_context, port_mapping, logger, port_change_event_handler)

def read_port_config_change(asic_context, port_mapping, logger, port_change_event_handler, is_cmis=False):
def read_port_config_change(asic_context, port_mapping, logger, port_change_event_handler):
for port_tbl in asic_context.keys():
while True:
(key, op, fvp) = port_tbl.pop()
if not key:
break
fvp = dict(fvp) if fvp is not None else {}
if op == swsscommon.SET_COMMAND:
fvp = dict(fvp)
if 'index' not in fvp:
continue

new_physical_index = int(fvp['index'])
if is_cmis:
fvp['name'] = key
port_change_event = PortChangeEvent(key, new_physical_index, asic_context[port_tbl], PortChangeEvent.PORT_ADD, fvp)
port_change_event_handler(port_change_event)
elif not port_mapping.is_logical_port(key):

port_change_event = PortChangeEvent(key, new_physical_index, asic_context[port_tbl], PortChangeEvent.PORT_SET, fvp)
port_change_event_handler(port_change_event)

if not port_mapping.is_logical_port(key):
# New logical port created
port_change_event = PortChangeEvent(key, new_physical_index, asic_context[port_tbl], PortChangeEvent.PORT_ADD)
port_change_event = PortChangeEvent(key, new_physical_index, asic_context[port_tbl], PortChangeEvent.PORT_ADD, fvp)
port_change_event_handler(port_change_event)
else:
current_physical_index = port_mapping.get_logical_to_physical(key)[0]
if current_physical_index != new_physical_index:
port_change_event = PortChangeEvent(key,
current_physical_index,
asic_context[port_tbl],
PortChangeEvent.PORT_REMOVE)
port_change_event = PortChangeEvent(key,
current_physical_index,
asic_context[port_tbl],
PortChangeEvent.PORT_REMOVE,
fvp)
port_change_event_handler(port_change_event)

port_change_event = PortChangeEvent(key, new_physical_index, asic_context[port_tbl], PortChangeEvent.PORT_ADD)
port_change_event = PortChangeEvent(key, new_physical_index, asic_context[port_tbl], PortChangeEvent.PORT_ADD, fvp)
port_change_event_handler(port_change_event)
elif op == swsscommon.DEL_COMMAND:
if 'index' in fvp:
physical_index = int(fvp['index'])
elif port_mapping.is_logical_port(key):
physical_index = port_mapping.get_logical_to_physical(key)[0]
else:
physical_index = 0

port_change_event = PortChangeEvent(key,
physical_index,
asic_context[port_tbl],
PortChangeEvent.PORT_DEL,
fvp)
port_change_event_handler(port_change_event)

if port_mapping.is_logical_port(key):
port_change_event = PortChangeEvent(key,
port_mapping.get_logical_to_physical(key)[0],
asic_context[port_tbl],
PortChangeEvent.PORT_REMOVE)
port_change_event = PortChangeEvent(key,
physical_index,
asic_context[port_tbl],
PortChangeEvent.PORT_REMOVE,
fvp)
port_change_event_handler(port_change_event)
else:
logger.log_warning('Invalid DB operation: {}'.format(op))


def get_port_mapping():
"""Get port mapping from CONFIG_DB
"""
Expand Down

0 comments on commit 9be5135

Please sign in to comment.