diff --git a/sfputil/main.py b/sfputil/main.py index 96653bb622..71c0e7fda8 100644 --- a/sfputil/main.py +++ b/sfputil/main.py @@ -1112,6 +1112,59 @@ def run_firmware(port_name, mode): return status +def is_fw_switch_done(port_name): + """ + Make sure the run_firmware cmd is done + @port_name: + Returns 1 on success, and exit_code = -1 on failure + """ + status = 0 + physical_port = logical_port_to_physical_port_index(port_name) + sfp = platform_chassis.get_sfp(physical_port) + + try: + api = sfp.get_xcvr_api() + except NotImplementedError: + click.echo("This functionality is currently not implemented for this platform") + sys.exit(ERROR_NOT_IMPLEMENTED) + + try: + MAX_WAIT = 60 # 60s timeout. + is_busy = 1 # Initial to 1 for entering while loop at least one time. + timeout_time = time.time() + MAX_WAIT + while is_busy and (time.time() < timeout_time): + fw_info = api.get_module_fw_info() + is_busy = 1 if (fw_info['status'] == False) and (fw_info['result'] is not None) else 0 + time.sleep(2) + + if fw_info['status'] == True: + (ImageA, ImageARunning, ImageACommitted, ImageAInvalid, + ImageB, ImageBRunning, ImageBCommitted, ImageBInvalid) = fw_info['result'] + + if (ImageARunning == 1) and (ImageAInvalid == 1): # ImageA is running, but also invalid. + click.echo("FW info error : ImageA shows running, but also shows invalid!") + status = -1 # Abnormal status. + elif (ImageBRunning == 1) and (ImageBInvalid == 1): # ImageB is running, but also invalid. + click.echo("FW info error : ImageB shows running, but also shows invalid!") + status = -1 # Abnormal status. + elif (ImageARunning == 1) and (ImageACommitted == 0): # ImageA is running, but not committed. + click.echo("FW images switch successful : ImageA is running") + status = 1 # run_firmware is done. + elif (ImageBRunning == 1) and (ImageBCommitted == 0): # ImageB is running, but not committed. + click.echo("FW images switch successful : ImageB is running") + status = 1 # run_firmware is done. + else: # No image is running, or running and committed image is same. + click.echo("FW info error : Failed to switch into uncommitted image!") + status = -1 # Failure for Switching images. + else: + click.echo("FW switch : Timeout!") + status = -1 # Timeout or check code error or CDB not supported. + + except NotImplementedError: + click.echo("This functionality is not applicable for this transceiver") + + return status + def commit_firmware(port_name): status = 0 physical_port = logical_port_to_physical_port_index(port_name) @@ -1280,6 +1333,10 @@ def upgrade(port_name, filepath): click.echo("Firmware run in mode 1 successful") + if is_fw_switch_done(port_name) != 1: + click.echo('Failed to switch firmware images!') + sys.exit(EXIT_FAIL) + status = commit_firmware(port_name) if status != 1: click.echo('Failed to commit firmware! CDB status: {}'.format(status)) diff --git a/tests/sfputil_test.py b/tests/sfputil_test.py index 1231ba67d7..2d87efa2e8 100644 --- a/tests/sfputil_test.py +++ b/tests/sfputil_test.py @@ -475,6 +475,30 @@ def test_run_firmwre(self, mock_chassis): status = sfputil.run_firmware("Ethernet0", 1) assert status == 1 + @patch('sfputil.main.platform_chassis') + @patch('sfputil.main.logical_port_to_physical_port_index', MagicMock(return_value=1)) + @pytest.mark.parametrize("mock_response, expected", [ + ({'status': False, 'result': None} , -1), + ({'status': True, 'result': ("1.0.1", 1, 1, 0, "1.0.2", 0, 0, 0)} , -1), + ({'status': True, 'result': ("1.0.1", 0, 0, 0, "1.0.2", 1, 1, 0)} , -1), + ({'status': True, 'result': ("1.0.1", 1, 0, 0, "1.0.2", 0, 1, 0)} , 1), + ({'status': True, 'result': ("1.0.1", 0, 1, 0, "1.0.2", 1, 0, 0)} , 1), + ({'status': True, 'result': ("1.0.1", 1, 0, 1, "1.0.2", 0, 1, 0)} , -1), + ({'status': True, 'result': ("1.0.1", 0, 1, 0, "1.0.2", 1, 0, 1)} , -1), + + # "is_fw_switch_done" function will waiting until timeout under below condition, so that this test will spend around 1min. + ({'status': False, 'result': 0} , -1), + ]) + def test_is_fw_switch_done(self, mock_chassis, mock_response, expected): + mock_sfp = MagicMock() + mock_api = MagicMock() + mock_sfp.get_xcvr_api = MagicMock(return_value=mock_api) + mock_sfp.get_presence.return_value = True + mock_chassis.get_sfp = MagicMock(return_value=mock_sfp) + mock_api.get_module_fw_info.return_value = mock_response + status = sfputil.is_fw_switch_done("Ethernet0") + assert status == expected + @patch('sfputil.main.platform_chassis') @patch('sfputil.main.logical_port_to_physical_port_index', MagicMock(return_value=1)) def test_commit_firmwre(self, mock_chassis):