Skip to content

Commit

Permalink
add missing unittest for CMIS
Browse files Browse the repository at this point in the history
Signed-off-by: Dante Su <dante.su@broadcom.com>
  • Loading branch information
ds952811 committed Nov 18, 2021
1 parent ceea223 commit 6dd3608
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 11 deletions.
82 changes: 82 additions & 0 deletions sonic-xcvrd/tests/test_xcvrd.py
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,88 @@ def test_DaemonXcvrd_run(self, mock_task_stop1, mock_task_stop2, mock_task_run1,
assert mock_deinit.call_count == 1
assert mock_init.call_count == 1

@patch('xcvrd.xcvrd.xcvr_table_helper', MagicMock())
def test_CmisManagerTask_handle_port_change_event(self):
port_mapping = PortMapping()
task = CmisManagerTask(port_mapping)

port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_ADD)
task.on_port_config_change(port_change_event)
assert task.task_queue.empty()

port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_REMOVE)
task.on_port_config_change(port_change_event)
assert task.task_queue.empty()

port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_DEL)
task.on_port_config_change(port_change_event)
assert task.task_queue.empty()

port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_SET)
task.on_port_config_change(port_change_event)
assert not task.task_queue.empty()

@patch('xcvrd.xcvrd.platform_chassis')
@patch('xcvrd.xcvrd_utilities.port_mapping.subscribe_port_config_change', MagicMock(return_value=(None, None)))
@patch('xcvrd.xcvrd_utilities.port_mapping.handle_port_config_change', MagicMock())
def test_CmisManagerTask_task_run_stop(self, mock_chassis):
mock_object = MagicMock()
mock_object.get_presence = MagicMock(return_value=True)
mock_object.get_port_type = MagicMock(return_value="QSFP_DD")
mock_chassis.get_all_sfps = MagicMock(return_value=[mock_object, mock_object])

port_mapping = PortMapping()
task = CmisManagerTask(port_mapping)
task.task_run([False])
task.task_stop()
for worker in task.task_workers:
assert not worker.is_alive()

@patch('xcvrd.xcvrd.platform_chassis')
@patch('xcvrd.xcvrd_utilities.port_mapping.subscribe_port_config_change', MagicMock(return_value=(None, None)))
@patch('xcvrd.xcvrd_utilities.port_mapping.handle_port_config_change', MagicMock())
def test_CmisManagerTask_task_worker(self, mock_chassis):
mock_object = MagicMock()
mock_object.get_presence = MagicMock(return_value=True)
mock_object.get_port_type = MagicMock(return_value="QSFP_DD")
mock_object.set_cmis_application = MagicMock()
mock_chassis.get_all_sfps = MagicMock(return_value=[mock_object])
mock_chassis.get_sfp = MagicMock(return_value=mock_object)

port_mapping = PortMapping()
port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_SET)

# Case 1: Both port speed and lanes are unset
task = CmisManagerTask(port_mapping)
task.task_stopping_event.is_set = MagicMock(side_effect=[False, True])
task.on_port_config_change(port_change_event)
task.task_worker(False)
assert mock_object.set_cmis_application.call_count == 0

# Case 2: Invalid port speed while lanes is valid
port_change_event.port_dict = {'speed': 0, 'lanes': "1,2,3,4,5,6,7,8"}
task = CmisManagerTask(port_mapping)
task.task_stopping_event.is_set = MagicMock(side_effect=[False, True])
task.on_port_config_change(port_change_event)
task.task_worker(False)
assert mock_object.set_cmis_application.call_count == 0

# Case 3: Valid port speed with invalid lanes
port_change_event.port_dict = {'speed': 400000, 'lanes': None}
task = CmisManagerTask(port_mapping)
task.task_stopping_event.is_set = MagicMock(side_effect=[False, True])
task.on_port_config_change(port_change_event)
task.task_worker(False)
assert mock_object.set_cmis_application.call_count == 0

# Case 4: valid port speed and lanes
port_change_event.port_dict = {'speed': 400000, 'lanes': "1,2,3,4,5,6,7,8"}
task = CmisManagerTask(port_mapping)
task.task_stopping_event.is_set = MagicMock(side_effect=[False, True])
task.on_port_config_change(port_change_event)
task.task_worker(False)
assert mock_object.set_cmis_application.call_count == 1

@patch('xcvrd.xcvrd.xcvr_table_helper', MagicMock())
def test_DomInfoUpdateTask_handle_port_change_event(self):
port_mapping = PortMapping()
Expand Down
38 changes: 27 additions & 11 deletions sonic-xcvrd/xcvrd/xcvrd.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,13 @@ def _wrapper_get_transceiver_change_event(timeout):

def _wrapper_get_sfp_type(physical_port):
if platform_chassis:
sfp = platform_chassis.get_sfp(physical_port)
try:
return platform_chassis.get_sfp(physical_port).sfp_type
return sfp.sfp_type
except (NotImplementedError, AttributeError):
pass
try:
return sfp.get_port_type()
except (NotImplementedError, AttributeError):
pass
return None
Expand Down Expand Up @@ -861,17 +866,23 @@ def log_notice(self, message):
def on_port_config_change(self, port_change_event):
if port_change_event.event_type != port_change_event.PORT_SET:
return
logical_port = port_change_event.port_name
physical_port = port_change_event.port_dict.get('index')
speed = port_change_event.port_dict.get('speed')
lanes = port_change_event.port_dict.get('lanes')
if (physical_port is None) or (speed is None) or (lanes is None):

lport = port_change_event.port_name
pport = port_change_event.port_index
if port_change_event.port_dict is None:
speed = "0"
lanes = "0"
else:
speed = port_change_event.port_dict.get('speed')
lanes = port_change_event.port_dict.get('lanes')

if (pport is None) or (speed is None) or (lanes is None):
return

try:
msg = {
'logical_port': logical_port,
'physical_port': int(physical_port),
'lport': lport,
'pport': pport,
'speed': int(speed),
'lanes': lanes.split(',')
}
Expand All @@ -892,15 +903,20 @@ def task_worker(self, commander):
port_mapping.handle_port_config_change(sel, asic_context, self.task_stopping_event, self.port_mapping, helper_logger, self.on_port_config_change)
else:
while not self.task_stopping_event.is_set():
msg = self.task_queue.get()
try:
msg = self.task_queue.get(False, 0.1)
except:
msg = None
if msg is None:
time.sleep(1)
continue

lport = msg['logical_port']
pport = msg['physical_port']
lport = msg['lport']
pport = msg['pport']
lanes = msg['lanes']
speed = msg['speed']
if speed == 0 or len(lanes) < 1:
continue
sfp = platform_chassis.get_sfp(pport)
if not sfp.get_presence():
continue
Expand Down

0 comments on commit 6dd3608

Please sign in to comment.