From 63bd9d8b7cbbdfec039aa236079c3b5d0c01112c Mon Sep 17 00:00:00 2001 From: vdahiya12 <67608553+vdahiya12@users.noreply.github.com> Date: Fri, 7 Jul 2023 13:38:09 -0700 Subject: [PATCH] [ycabled][active-active] no initialize Async Client, when no active-active cable type; fix names for all ycabled threads (#373) This PR intends to not start sonic-ycabled's Asynchronous client, when there is no active-active cable type. The Problem that we encounter is ycabled expects all threads to be working/running state all the time. Since there are no active-active cable_type Async client thread exits gracefully, but by design this needs to be working if added to monitor loop thread. So this PR fixes this problem by runnig the Async Client only if configuration is active-active for atleast a cable. This PR also has all the infrastructure changes UT for all Classes having infinite loops in ycabled, thus helping in picking up cases if changes break the code. Signed-off-by: vaibhav-dahiya --- sonic-ycabled/tests/test_y_cable_helper.py | 323 +++++++++++++++++- sonic-ycabled/tests/test_ycable.py | 60 +++- sonic-ycabled/ycable/ycable.py | 38 ++- .../ycable/ycable_utilities/y_cable_helper.py | 41 ++- 4 files changed, 428 insertions(+), 34 deletions(-) diff --git a/sonic-ycabled/tests/test_y_cable_helper.py b/sonic-ycabled/tests/test_y_cable_helper.py index 04ac6f7b5..84630eb48 100644 --- a/sonic-ycabled/tests/test_y_cable_helper.py +++ b/sonic-ycabled/tests/test_y_cable_helper.py @@ -1730,6 +1730,7 @@ def test_change_ports_status_for_y_cable_change_event(self, mock_swsscommon_tabl def mock_get_asic_id(mock_logical_port_name): return 0 + state_db = {} y_cable_presence = [True] logical_port_dict = {'Ethernet0': '1'} @@ -1745,7 +1746,7 @@ def mock_get_asic_id(mock_logical_port_name): patched_util.get_asic_id_for_logical_port.return_value = 0 rc = change_ports_status_for_y_cable_change_event( - logical_port_dict, y_cable_presence, port_tbl, port_table_keys, loopback_tbl, loopback_keys, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, y_cable_tbl, static_tbl, mux_tbl, grpc_client, fwd_state_response_tbl, stop_event=threading.Event()) + logical_port_dict, y_cable_presence, port_tbl, port_table_keys, loopback_tbl, loopback_keys, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, y_cable_tbl, static_tbl, mux_tbl, grpc_client, fwd_state_response_tbl, state_db, stop_event=threading.Event()) assert(rc == None) @@ -1764,6 +1765,7 @@ def mock_get_asic_id(mock_logical_port_name): y_cable_presence = [True] logical_port_dict = {'Ethernet0': '1'} + state_db = {} mock_table = MagicMock() mock_table.getKeys = MagicMock(return_value=['Ethernet0', 'Ethernet4']) @@ -1777,7 +1779,7 @@ def mock_get_asic_id(mock_logical_port_name): patched_util.get_asic_id_for_logical_port.return_value = 0 rc = change_ports_status_for_y_cable_change_event( - logical_port_dict, y_cable_presence, port_tbl, port_table_keys, loopback_tbl, loopback_keys, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, y_cable_tbl, static_tbl, mux_tbl, grpc_client, fwd_state_response_tbl,stop_event=threading.Event()) + logical_port_dict, y_cable_presence, port_tbl, port_table_keys, loopback_tbl, loopback_keys, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, y_cable_tbl, static_tbl, mux_tbl, grpc_client, fwd_state_response_tbl, state_db, stop_event=threading.Event()) assert(rc == None) @@ -1794,6 +1796,7 @@ def mock_get_asic_id(mock_logical_port_name): y_cable_presence = [True] logical_port_dict = {'Ethernet0': '2'} + state_db = {} mock_table = MagicMock() mock_table.getKeys = MagicMock(return_value=['Ethernet0', 'Ethernet4']) @@ -1806,7 +1809,7 @@ def mock_get_asic_id(mock_logical_port_name): patched_util.get_asic_id_for_logical_port.return_value = 0 rc = change_ports_status_for_y_cable_change_event( - logical_port_dict, y_cable_presence,port_tbl, port_table_keys, loopback_tbl, loopback_keys, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, y_cable_tbl, static_tbl, mux_tbl, grpc_client, fwd_state_response_tbl, stop_event=threading.Event()) + logical_port_dict, y_cable_presence,port_tbl, port_table_keys, loopback_tbl, loopback_keys, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, y_cable_tbl, static_tbl, mux_tbl, grpc_client, fwd_state_response_tbl, state_db, stop_event=threading.Event()) assert(rc == None) @@ -7141,3 +7144,317 @@ def test_ycable_graceful_client(self, channel, stub): read_side = 1 Y_cable_restart_client = GracefulRestartClient("Ethernet48", None, read_side) + +class TestYcableScriptExecution(object): + + @patch('swsscommon.swsscommon.Select.addSelectable', MagicMock()) + @patch('swsscommon.swsscommon.Select.TIMEOUT', MagicMock(return_value=None)) + @patch('swsscommon.swsscommon.CastSelectableToRedisSelectObj', MagicMock()) + #@patch('swsscommon.swsscommon.CastSelectableToRedisSelectObj.getDbConnector', MagicMock()) + @patch('swsscommon.swsscommon.SubscriberStateTable') + @patch('swsscommon.swsscommon.Select.select') + def test_ycable_helper_cli_worker(self, mock_select, mock_sub_table): + + mock_selectable = MagicMock() + mock_selectable.pop = MagicMock( + side_effect=[('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), )), ('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), )), ('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), )), ('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), )), ('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), )), ('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), )), ('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), )), ('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), )), ('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), )), ('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), )), ('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), )), ('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), )), ('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), )), ('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), )), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False)]) + mock_select.return_value = (swsscommon.Select.OBJECT, mock_selectable) + mock_sub_table.return_value = mock_selectable + + stop_event = threading.Event() + + asic_index = 0 + Y_cable_cli_task = YCableCliUpdateTask() + Y_cable_cli_task.task_stopping_event.is_set = MagicMock(side_effect=[False, True]) + + #Y_cable_cli_task.task_stopping_event.is_set = MagicMock(side_effect=False) + + expected_exception_start = None + expected_exception_join = None + #Y_cable_cli_task.start() + Y_cable_cli_task.task_cli_worker() + Y_cable_cli_task.task_stopping_event.clear() + + assert swsscommon.Select.select.call_count == 1 + #y_cable_helper.handle_show_hwmode_state_cmd_arg_tbl_notification.assert_called() + Y_cable_cli_task_n = YCableCliUpdateTask() + Y_cable_cli_task_n.task_stopping_event.is_set = MagicMock(side_effect=[False, True]) + + mock_selectable.pop = MagicMock( + side_effect=[('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), )), ('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), )), ('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), )), ('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), )), ('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), )), ('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), )), ('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), )), ('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), )), ('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), )), ('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), )), ('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), )), ('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), )), ('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), )), ('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), )), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False)]) + mock_select.return_value = (swsscommon.Select.OBJECT, mock_selectable) + + Y_cable_cli_task_n.task_cli_worker() + assert swsscommon.Select.select.call_count == 2 + + + + @patch('swsscommon.swsscommon.Select.addSelectable', MagicMock()) + @patch('swsscommon.swsscommon.Select.TIMEOUT', MagicMock(return_value=None)) + @patch('swsscommon.swsscommon.CastSelectableToRedisSelectObj', MagicMock()) + #@patch('swsscommon.swsscommon.CastSelectableToRedisSelectObj.getDbConnector', MagicMock()) + @patch('swsscommon.swsscommon.SubscriberStateTable') + @patch('swsscommon.swsscommon.Select.select') + def test_ycable_helper_cli_worker_execution(self, mock_select, mock_sub_table): + + mock_selectable = MagicMock() + mock_selectable.pop = MagicMock( + side_effect=[(False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False) ,('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), )), (None, None, None), (None, None, None)]) + mock_select.return_value = (swsscommon.Select.OBJECT, mock_selectable) + mock_sub_table.return_value = mock_selectable + + stop_event = threading.Event() + + asic_index = 0 + Y_cable_cli_task = YCableCliUpdateTask() + Y_cable_cli_task.task_stopping_event.is_set = MagicMock(side_effect=[False, True]) + Y_cable_cli_task.cli_table_helper.xcvrd_show_hwmode_dir_cmd_tbl[asic_index].return_value = mock_selectable + + #Y_cable_cli_task.task_stopping_event.is_set = MagicMock(side_effect=False) + + expected_exception_start = None + expected_exception_join = None + trace = None + """ + try: + #Y_cable_cli_task.start() + Y_cable_cli_task.task_cli_worker() + time.sleep(5) + Y_cable_cli_task.task_stopping_event.clear() + except Exception as e1: + expected_exception_start = e1 + trace = traceback.format_exc() + """ + + + @patch('swsscommon.swsscommon.Select.addSelectable', MagicMock()) + @patch('swsscommon.swsscommon.Select.TIMEOUT', MagicMock(return_value=None)) + @patch('swsscommon.swsscommon.CastSelectableToRedisSelectObj', MagicMock()) + #@patch('swsscommon.swsscommon.CastSelectableToRedisSelectObj.getDbConnector', MagicMock()) + @patch('swsscommon.swsscommon.SubscriberStateTable') + @patch('swsscommon.swsscommon.Select.select') + #@patch('swsscommon.swsscommon.Table') + def test_ycable_helper_table_worker(self, mock_select, mock_sub_table): + + mock_selectable = MagicMock() + mock_selectable.pop = MagicMock( + side_effect=[('Ethernet0', swsscommon.SET_COMMAND, (('state', 'active'), )), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False)]) + mock_select.return_value = (swsscommon.Select.OBJECT, mock_selectable) + mock_sub_table.return_value = mock_selectable + + + Y_cable_task = YCableTableUpdateTask() + Y_cable_task.task_stopping_event.is_set = MagicMock(side_effect=[False, True]) + mock_table = MagicMock() + """mock_table.getKeys = MagicMock(return_value=['Ethernet0', 'Ethernet4']) + mock_table.get = MagicMock( + side_effect=[(True, (('index', 1), )), (True, (('index', 2), ))]) + mock_swsscommon_table.return_value = mock_table + """ + Y_cable_task.hw_mux_cable_tbl_keys = MagicMock(side_effect={0:["Ethernet0", "Ethernet4"]}) + Y_cable_task.task_worker() + assert swsscommon.Select.select.call_count == 1 + + + @patch('swsscommon.swsscommon.Select.addSelectable', MagicMock()) + @patch('swsscommon.swsscommon.Select.TIMEOUT', MagicMock(return_value=None)) + @patch('swsscommon.swsscommon.CastSelectableToRedisSelectObj', MagicMock()) + #@patch('swsscommon.swsscommon.CastSelectableToRedisSelectObj.getDbConnector', MagicMock()) + @patch('swsscommon.swsscommon.SubscriberStateTable') + @patch('swsscommon.swsscommon.Select.select') + @patch('ycable.ycable_utilities.y_cable_helper.check_mux_cable_port_type', MagicMock(return_value=(True,"active-active"))) + def test_ycable_helper_table_worker_active_active(self, mock_select, mock_sub_table): + + mock_selectable = MagicMock() + mock_selectable.pop = MagicMock( + side_effect=[('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), )), ('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), )), ('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), )), ('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), )), ('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), )), ('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), )), ('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), )), ('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), )), ('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), )), ('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), )), ('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), )), ('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), )), ('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), )), ('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), )), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False)]) + mock_select.return_value = (swsscommon.Select.OBJECT, mock_selectable) + mock_sub_table.return_value = mock_selectable + + + Y_cable_task = YCableTableUpdateTask() + Y_cable_task.task_stopping_event.is_set = MagicMock(side_effect=[False, True]) + Y_cable_task.task_worker() + assert swsscommon.Select.select.call_count == 1 + + + + @patch('ycable.ycable_utilities.y_cable_helper.check_mux_cable_port_type', MagicMock(return_value=(True,"active-active"))) + @patch('swsscommon.swsscommon.Select.addSelectable', MagicMock()) + @patch('swsscommon.swsscommon.Select.TIMEOUT', MagicMock(return_value=None)) + @patch('swsscommon.swsscommon.CastSelectableToRedisSelectObj', MagicMock()) + @patch('ycable.ycable_utilities.y_cable_helper.grpc_port_stubs', MagicMock(return_value={})) + @patch('ycable.ycable_utilities.y_cable_helper.grpc_port_channels', MagicMock(return_value={})) + #@patch('swsscommon.swsscommon.CastSelectableToRedisSelectObj.getDbConnector', MagicMock()) + @patch('swsscommon.swsscommon.FieldValuePairs', MagicMock()) + @patch('swsscommon.swsscommon.SubscriberStateTable') + @patch('swsscommon.swsscommon.Select.select') + #@patch('swsscommon.swsscommon.Table') + def test_ycable_helper_table_worker_probe_active_active(self, mock_select, mock_sub_table): + + mock_selectable = MagicMock() + mock_selectable.pop = MagicMock( + side_effect=[(False, False, False), (False, False, False), (False, False, False), ('Ethernet0', swsscommon.SET_COMMAND, (('state', 'active'), )), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False)]) + mock_select.return_value = (swsscommon.Select.OBJECT, mock_selectable) + mock_sub_table.return_value = mock_selectable + + + Y_cable_task = YCableTableUpdateTask() + Y_cable_task.task_stopping_event.is_set = MagicMock(side_effect=[False, True]) + """ + mock_table = MagicMock() + mock_table.getKeys = MagicMock(return_value=['Ethernet0', 'Ethernet4']) + mock_table.get = MagicMock( + side_effect=[(True, (('index', 1), )), (True, (('index', 2), ))]) + mock_swsscommon_table.return_value = mock_table + """ + + Y_cable_task.hw_mux_cable_tbl_keys = MagicMock(side_effect={0:["Ethernet0", "Ethernet4"]}) + Y_cable_task.task_worker() + assert swsscommon.Select.select.call_count == 1 + + + + + + + + @patch('ycable.ycable_utilities.y_cable_helper.check_mux_cable_port_type', MagicMock(return_value=(True,"active-active"))) + @patch('swsscommon.swsscommon.Select.addSelectable', MagicMock()) + @patch('swsscommon.swsscommon.Select.TIMEOUT', MagicMock(return_value=None)) + @patch('swsscommon.swsscommon.CastSelectableToRedisSelectObj', MagicMock()) + @patch('ycable.ycable_utilities.y_cable_helper.grpc_port_stubs', MagicMock(return_value={})) + @patch('ycable.ycable_utilities.y_cable_helper.grpc_port_channels', MagicMock(return_value={})) + @patch('swsscommon.swsscommon.FieldValuePairs', MagicMock()) + #@patch('swsscommon.swsscommon.CastSelectableToRedisSelectObj.getDbConnector', MagicMock()) + @patch('swsscommon.swsscommon.SubscriberStateTable') + @patch('swsscommon.swsscommon.Select.select') + #@patch('swsscommon.swsscommon.Table') + def test_ycable_helper_table_worker_probe_active(self, mock_select, mock_sub_table): + + mock_selectable = MagicMock() + mock_selectable.pop = MagicMock( + side_effect=[(False, False, False), (False, False, False), ('Ethernet0', swsscommon.SET_COMMAND, (('state', 'active'), ('command', 'probe'),)), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False)]) + mock_select.return_value = (swsscommon.Select.OBJECT, mock_selectable) + mock_sub_table.return_value = mock_selectable + + + Y_cable_task = YCableTableUpdateTask() + Y_cable_task.task_stopping_event.is_set = MagicMock(side_effect=[False, True]) + """ + mock_table = MagicMock() + mock_table.getKeys = MagicMock(return_value=['Ethernet0', 'Ethernet4']) + mock_table.get = MagicMock( + side_effect=[(True, (('index', 1), )), (True, (('index', 2), ))]) + mock_swsscommon_table.return_value = mock_table + """ + + Y_cable_task.hw_mux_cable_tbl_keys = MagicMock(side_effect={0:["Ethernet0", "Ethernet4"]}) + Y_cable_task.task_worker() + assert swsscommon.Select.select.call_count == 1 + + + + + + + @patch('swsscommon.swsscommon.Select.addSelectable', MagicMock()) + @patch('swsscommon.swsscommon.Select.TIMEOUT', MagicMock(return_value=None)) + @patch('swsscommon.swsscommon.CastSelectableToRedisSelectObj', MagicMock()) + #@patch('swsscommon.swsscommon.CastSelectableToRedisSelectObj.getDbConnector', MagicMock()) + @patch('swsscommon.swsscommon.SubscriberStateTable') + @patch('swsscommon.swsscommon.Select.select') + #@patch('swsscommon.swsscommon.Table') + def test_ycable_helper_table_worker_probe(self, mock_select, mock_sub_table): + + mock_selectable = MagicMock() + mock_selectable.pop = MagicMock( + side_effect=[(False, False, False), ('Ethernet0', swsscommon.SET_COMMAND, (('state', 'active'), )), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False)]) + mock_select.return_value = (swsscommon.Select.OBJECT, mock_selectable) + mock_sub_table.return_value = mock_selectable + + + Y_cable_task = YCableTableUpdateTask() + Y_cable_task.task_stopping_event.is_set = MagicMock(side_effect=[False, True]) + #mock_table = MagicMock() + #mock_table.getKeys = MagicMock(return_value=['Ethernet0', 'Ethernet4']) + swsscommon.Table.return_value.get.return_value = ( + True, {"read_side": "1", "state":"active"}) + #mock_table.get = MagicMock( + # side_effect=[(True, (('index', 1), )), (True, (('index', 2), ))]) + swsscommon.Table.return_value.getKeys.return_value = ( + ['Ethernet0', 'Ethernet4']) + #mock_swsscommon_table.return_value = mock_table + + Y_cable_task.task_worker() + assert swsscommon.Select.select.call_count == 1 + + + @patch('swsscommon.swsscommon.Select.addSelectable', MagicMock()) + @patch('swsscommon.swsscommon.Select.TIMEOUT', MagicMock(return_value=None)) + @patch('swsscommon.swsscommon.CastSelectableToRedisSelectObj', MagicMock()) + #@patch('swsscommon.swsscommon.CastSelectableToRedisSelectObj.getDbConnector', MagicMock()) + @patch('swsscommon.swsscommon.SubscriberStateTable') + @patch('swsscommon.swsscommon.Select.select') + #@patch('swsscommon.swsscommon.Table') + def test_ycable_helper_table_worker_toggle(self, mock_select, mock_sub_table): + + mock_selectable = MagicMock() + mock_selectable.pop = MagicMock( + side_effect=[('Ethernet0', swsscommon.SET_COMMAND, (('state', 'active'), )), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False)]) + mock_select.return_value = (swsscommon.Select.OBJECT, mock_selectable) + mock_sub_table.return_value = mock_selectable + + + Y_cable_task = YCableTableUpdateTask() + Y_cable_task.task_stopping_event.is_set = MagicMock(side_effect=[False, True]) + #mock_table = MagicMock() + #mock_table.getKeys = MagicMock(return_value=['Ethernet0', 'Ethernet4']) + swsscommon.Table.return_value.get.return_value = ( + True, {"read_side": "1", "state":"active"}) + #mock_table.get = MagicMock( + # side_effect=[(True, (('index', 1), )), (True, (('index', 2), ))]) + swsscommon.Table.return_value.getKeys.return_value = ( + ['Ethernet0', 'Ethernet4']) + #mock_swsscommon_table.return_value = mock_table + + Y_cable_task.task_worker() + assert swsscommon.Select.select.call_count == 1 + + + @patch('ycable.ycable_utilities.y_cable_helper.check_mux_cable_port_type', MagicMock(return_value=(True,"active-active"))) + @patch('swsscommon.swsscommon.Select.addSelectable', MagicMock()) + @patch('swsscommon.swsscommon.Select.TIMEOUT', MagicMock(return_value=None)) + @patch('swsscommon.swsscommon.CastSelectableToRedisSelectObj', MagicMock()) + @patch('ycable.ycable_utilities.y_cable_helper.grpc_port_stubs', MagicMock(return_value={})) + @patch('ycable.ycable_utilities.y_cable_helper.grpc_port_channels', MagicMock(return_value={})) + @patch('swsscommon.swsscommon.FieldValuePairs', MagicMock()) + #@patch('swsscommon.swsscommon.CastSelectableToRedisSelectObj.getDbConnector', MagicMock()) + @patch('swsscommon.swsscommon.SubscriberStateTable') + @patch('swsscommon.swsscommon.Select.select') + #@patch('swsscommon.swsscommon.Table') + def test_ycable_helper_table_worker_toggle_active_active(self, mock_select, mock_sub_table): + + mock_selectable = MagicMock() + mock_selectable.pop = MagicMock( + side_effect=[('Ethernet0', swsscommon.SET_COMMAND, (('state', 'active'), ("command", "probe"),)), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False)]) + mock_select.return_value = (swsscommon.Select.OBJECT, mock_selectable) + mock_sub_table.return_value = mock_selectable + + + Y_cable_task = YCableTableUpdateTask() + Y_cable_task.task_stopping_event.is_set = MagicMock(side_effect=[False, True]) + #mock_table = MagicMock() + #mock_table.getKeys = MagicMock(return_value=['Ethernet0', 'Ethernet4']) + swsscommon.Table.return_value.get.return_value = ( + True, {"read_side": "1", "state":"active"}) + #mock_table.get = MagicMock( + # side_effect=[(True, (('index', 1), )), (True, (('index', 2), ))]) + swsscommon.Table.return_value.getKeys.return_value = ( + ['Ethernet0', 'Ethernet4']) + #mock_swsscommon_table.return_value = mock_table + + Y_cable_task.task_worker() + assert swsscommon.Select.select.call_count == 1 + + diff --git a/sonic-ycabled/tests/test_ycable.py b/sonic-ycabled/tests/test_ycable.py index b45fea22e..1847ffd8d 100644 --- a/sonic-ycabled/tests/test_ycable.py +++ b/sonic-ycabled/tests/test_ycable.py @@ -83,6 +83,7 @@ def test_ycable_helper_class_run_loop(self): Y_cable_cli_task.start() Y_cable_cli_task.join() + @patch("swsscommon.swsscommon.Select", MagicMock()) @patch("swsscommon.swsscommon.Select.addSelectable", MagicMock()) def test_ycable_helper_class_run(self): @@ -307,13 +308,15 @@ def test_handle_state_update_task(self): port = "Ethernet0" fvp_dict = {} + state_db = {} y_cable_presence = False stopping_event = None port_tbl, port_tbl_keys, loopback_tbl, loopback_keys, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, y_cable_tbl, static_tbl, mux_tbl, grpc_client, fwd_state_response_tbl = {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {} - rc = handle_state_update_task(port, fvp_dict, y_cable_presence, port_tbl, port_tbl_keys, loopback_tbl, loopback_keys, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, y_cable_tbl, static_tbl, mux_tbl, grpc_client, fwd_state_response_tbl, stopping_event) + rc = handle_state_update_task(port, fvp_dict, y_cable_presence, port_tbl, port_tbl_keys, loopback_tbl, loopback_keys, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, y_cable_tbl, static_tbl, mux_tbl, grpc_client, fwd_state_response_tbl, state_db, stopping_event) assert(rc == None) + def wait_until(total_wait_time, interval, call_back, *args, **kwargs): wait_time = 0 while wait_time <= total_wait_time: @@ -353,20 +356,13 @@ def test_ycable_helper_class_run_loop_with_exception(self): except Exception as e2: expected_exception_join = e2 - """ - #Handy debug Helpers or else use import logging - #f = open("newfile", "w") - #f.write(format(e2)) - #f.write(format(m1)) - #f.write(trace) - """ - assert(type(expected_exception_start) == type(expected_exception_join)) assert(expected_exception_start.args == expected_exception_join.args) assert("NotImplementedError" in str(trace) and "effect" in str(trace)) assert("sonic-ycabled/ycable/ycable_utilities/y_cable_helper.py" in str(trace)) assert("swsscommon.Select" in str(trace)) + class TestYcableAsyncScript(object): @patch("swsscommon.swsscommon.Select", MagicMock(side_effect=NotImplementedError)) @@ -400,3 +396,49 @@ def test_ycable_helper_async_client_run_loop_with_exception(self, sfputil): assert("sonic-ycabled/ycable/ycable_utilities/y_cable_helper.py" in str(trace)) assert("setup_grpc_channel_for_port" in str(trace)) + +class TestYcableActiveActiveHelper(object): + + @patch("ycable.ycable.platform_sfputil") + def test_check_presence_for_active_active_cable_type(self, sfputil): + + + y_cable_tbl = {} + test_db = "TEST_DB" + status = True + asic_index = 0 + fvs = [('state', "auto"), ('read_side', 1), ('soc_ipv4', '192.168.0.1/32')] + y_cable_tbl[asic_index] = swsscommon.Table( + test_db[asic_index], "Y_CABLE_TABLE") + y_cable_tbl[asic_index] = swsscommon.Table( + test_db[asic_index], "Y_CABLE_TABLE") + y_cable_tbl[asic_index].get.return_value = (status, fvs) + + sfputil.logical = ["Ethernet0", "Ethernet4"] + sfputil.get_asic_id_for_logical_port = MagicMock(return_value=0) + rc = check_presence_for_active_active_cable_type(y_cable_tbl) + + assert(rc == False) + + @patch('ycable.ycable_utilities.y_cable_helper.check_mux_cable_port_type', MagicMock(return_value=(True,"active-active"))) + @patch("ycable.ycable.platform_sfputil") + def test_check_presence_for_active_active_cable_type(self, sfputil): + + + y_cable_tbl = {} + test_db = "TEST_DB" + status = True + asic_index = 0 + fvs = [('state', "auto"), ('read_side', 1), ('soc_ipv4', '192.168.0.1/32')] + y_cable_tbl[asic_index] = swsscommon.Table( + test_db[asic_index], "Y_CABLE_TABLE") + y_cable_tbl[asic_index] = swsscommon.Table( + test_db[asic_index], "Y_CABLE_TABLE") + y_cable_tbl[asic_index].get.return_value = (status, fvs) + + sfputil.logical = ["Ethernet0", "Ethernet4"] + sfputil.get_asic_id_for_logical_port = MagicMock(return_value=0) + rc = check_presence_for_active_active_cable_type(y_cable_tbl) + + assert(rc == True) + diff --git a/sonic-ycabled/ycable/ycable.py b/sonic-ycabled/ycable/ycable.py index 3618b56f2..bc85b04d9 100644 --- a/sonic-ycabled/ycable/ycable.py +++ b/sonic-ycabled/ycable/ycable.py @@ -70,6 +70,26 @@ # Helper functions ============================================================= # + +def check_presence_for_active_active_cable_type(port_tbl): + + + logical_port_list = platform_sfputil.logical + for logical_port_name in logical_port_list: + # Get the asic to which this port belongs + asic_index = platform_sfputil.get_asic_id_for_logical_port(logical_port_name) + if asic_index is None: + continue + + (status, cable_type) = y_cable_helper.check_mux_cable_port_type(logical_port_name, port_tbl, asic_index) + + if status and cable_type == "active-active": + return True + + return False + + + def detect_port_in_error_status(logical_port_name, status_tbl): rec, fvp = status_tbl.get(logical_port_name) if rec: @@ -81,13 +101,13 @@ def detect_port_in_error_status(logical_port_name, status_tbl): else: return False -def handle_state_update_task(port, fvp_dict, y_cable_presence, port_tbl, port_tbl_keys, loopback_tbl, loopback_keys, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, y_cable_tbl, static_tbl, mux_tbl, grpc_client, fwd_state_response_tbl, stopping_event): +def handle_state_update_task(port, fvp_dict, y_cable_presence, port_tbl, port_tbl_keys, loopback_tbl, loopback_keys, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, y_cable_tbl, static_tbl, mux_tbl, grpc_client, fwd_state_response_tbl, state_db, stopping_event): port_dict = {} port_dict[port] = fvp_dict.get('status', None) y_cable_helper.change_ports_status_for_y_cable_change_event( - port_dict, y_cable_presence, port_tbl, port_tbl_keys, loopback_tbl, loopback_keys, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, y_cable_tbl, static_tbl, mux_tbl, grpc_client, fwd_state_response_tbl, stopping_event) + port_dict, y_cable_presence, port_tbl, port_tbl_keys, loopback_tbl, loopback_keys, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, y_cable_tbl, static_tbl, mux_tbl, grpc_client, fwd_state_response_tbl, state_db, stopping_event) # # Helper classes =============================================================== @@ -105,6 +125,7 @@ def __init__(self, y_cable_presence): self.task_stopping_event = threading.Event() self.y_cable_presence = y_cable_presence self.table_helper = y_cable_table_helper.YcableInfoUpdateTableHelper() + self.name = "YcableInfoUpdateTask" def task_worker(self, y_cable_presence): @@ -159,6 +180,7 @@ def __init__(self, sfp_error_event, y_cable_presence): self.sfp_error_event = sfp_error_event self.y_cable_presence = y_cable_presence self.table_helper = y_cable_table_helper.YcableStateUpdateTableHelper() + self.name = "YcableStateUpdateTask" def task_worker(self, stopping_event, sfp_error_event, y_cable_presence): @@ -208,7 +230,7 @@ def task_worker(self, stopping_event, sfp_error_event, y_cable_presence): continue # Check if all tables are created in table_helper - handle_state_update_task(port, fvp_dict, y_cable_presence, self.table_helper.get_port_tbl(), self.table_helper.port_table_keys, self.table_helper.get_loopback_tbl(), self.table_helper.loopback_keys, self.table_helper.get_hw_mux_cable_tbl(), self.table_helper.get_hw_mux_cable_tbl_peer(), self.table_helper.get_y_cable_tbl(), self.table_helper.get_static_tbl(), self.table_helper.get_mux_tbl(), self.table_helper.get_grpc_config_tbl(), self.table_helper.get_fwd_state_response_tbl(), stopping_event) + handle_state_update_task(port, fvp_dict, y_cable_presence, self.table_helper.get_port_tbl(), self.table_helper.port_table_keys, self.table_helper.get_loopback_tbl(), self.table_helper.loopback_keys, self.table_helper.get_hw_mux_cable_tbl(), self.table_helper.get_hw_mux_cable_tbl_peer(), self.table_helper.get_y_cable_tbl(), self.table_helper.get_static_tbl(), self.table_helper.get_mux_tbl(), self.table_helper.get_grpc_config_tbl(), self.table_helper.get_fwd_state_response_tbl(), self.table_helper.state_db, stopping_event) def run(self): if self.task_stopping_event.is_set(): @@ -242,6 +264,7 @@ def __init__(self, log_identifier): self.y_cable_presence = [False] self.table_helper = y_cable_table_helper.DaemonYcableTableHelper() self.threads = [] + self.name = "DaemonYcable" # Signal handler def signal_handler(self, sig, frame): @@ -389,9 +412,12 @@ def run(self): y_cable_cli_worker_update = y_cable_helper.YCableCliUpdateTask() y_cable_cli_worker_update.start() self.threads.append(y_cable_cli_worker_update) - y_cable_async_noti_worker = y_cable_helper.YCableAsyncNotificationTask() - y_cable_async_noti_worker.start() - self.threads.append(y_cable_async_noti_worker) + # enable async client only if there are active-active cables + active_active_cable_presence = check_presence_for_active_active_cable_type(self.table_helper.get_port_tbl()) + if active_active_cable_presence is True: + y_cable_async_noti_worker = y_cable_helper.YCableAsyncNotificationTask() + y_cable_async_noti_worker.start() + self.threads.append(y_cable_async_noti_worker) # Start main loop self.log_info("Start daemon main loop") diff --git a/sonic-ycabled/ycable/ycable_utilities/y_cable_helper.py b/sonic-ycabled/ycable/ycable_utilities/y_cable_helper.py index cfd6cadc3..f7a916bab 100644 --- a/sonic-ycabled/ycable/ycable_utilities/y_cable_helper.py +++ b/sonic-ycabled/ycable/ycable_utilities/y_cable_helper.py @@ -387,10 +387,10 @@ def apply_grpc_secrets_configuration(SECRETS_PATH, grpc_config): if grpc_client_config is not None: config = grpc_client_config.get("config", None) if config is not None: - type = config.get("type",None) + type_chan = config.get("type",None) auth_level = config.get("auth_level",None) log_level = config.get("log_level", None) - fvs_updated = swsscommon.FieldValuePairs([('type', type), + fvs_updated = swsscommon.FieldValuePairs([('type', type_chan), ('auth_level',auth_level ), ('log_level',log_level)]) grpc_config[asic_index].set('config', fvs_updated) @@ -407,7 +407,7 @@ def apply_grpc_secrets_configuration(SECRETS_PATH, grpc_config): grpc_config[asic_index].set('certs', fvs_updated) -def get_grpc_credentials(type, kvp): +def get_grpc_credentials(type_chan, kvp): root_file = kvp.get("ca_crt", None) if root_file is not None and os.path.isfile(root_file): @@ -416,7 +416,7 @@ def get_grpc_credentials(type, kvp): helper_logger.log_error("grpc credential channel setup no root file in config_db") return None - if type == "mutual": + if type_chan == "mutual": cert_file = kvp.get("client_crt", None) if cert_file is not None and os.path.isfile(cert_file): cert_chain = open(cert_file, 'rb').read() @@ -435,7 +435,7 @@ def get_grpc_credentials(type, kvp): root_certificates=root_cert, private_key=key, certificate_chain=cert_chain) - elif type == "server": + elif type_chan == "server": credential = grpc.ssl_channel_credentials( root_certificates=root_cert) else: @@ -458,7 +458,7 @@ def connect_channel(channel, stub, port): else: break -def create_channel(type, level, kvp, soc_ip, port, asic_index, fwd_state_response_tbl, is_async): +def create_channel(type_chan, level, kvp, soc_ip, port, asic_index, fwd_state_response_tbl, is_async): # Helper callback to get an channel connectivity state def wait_for_state_change(channel_connectivity): @@ -487,28 +487,33 @@ def wait_for_state_change(channel_connectivity): grpc_port_connectivity[port] = "SHUTDOWN" - if type == "secure": + if type_chan == "secure": credential = get_grpc_credentials(level, kvp) target_name = kvp.get("grpc_ssl_credential", None) if credential is None or target_name is None: return (None, None) - GRPC_CLIENT_OPTIONS.append(('grpc.ssl_target_name_override', '{}'.format(target_name))) if is_async: - channel = grpc.aio.secure_channel("{}:{}".format(soc_ip, GRPC_PORT), credential, options=GRPC_CLIENT_OPTIONS) + ASYNC_GRPC_CLIENT_OPTIONS = [] + ASYNC_GRPC_CLIENT_OPTIONS.append(('grpc.ssl_target_name_override', '{}'.format(target_name))) + channel = grpc.aio.secure_channel("{}:{}".format(soc_ip, GRPC_PORT), credential, options=ASYNC_GRPC_CLIENT_OPTIONS) + stub = linkmgr_grpc_driver_pb2_grpc.DualToRActiveStub(channel) else: + GRPC_CLIENT_OPTIONS.append(('grpc.ssl_target_name_override', '{}'.format(target_name))) channel = grpc.secure_channel("{}:{}".format(soc_ip, GRPC_PORT), credential, options=GRPC_CLIENT_OPTIONS) + stub = linkmgr_grpc_driver_pb2_grpc.DualToRActiveStub(channel) else: if is_async: - channel = grpc.aio.insecure_channel("{}:{}".format(soc_ip, GRPC_PORT), options=GRPC_CLIENT_OPTIONS) + channel = grpc.aio.insecure_channel("{}:{}".format(soc_ip, GRPC_PORT)) + stub = linkmgr_grpc_driver_pb2_grpc.DualToRActiveStub(channel) else: channel = grpc.insecure_channel("{}:{}".format(soc_ip, GRPC_PORT), options=GRPC_CLIENT_OPTIONS) + stub = linkmgr_grpc_driver_pb2_grpc.DualToRActiveStub(channel) - stub = linkmgr_grpc_driver_pb2_grpc.DualToRActiveStub(channel) if not is_async and channel is not None: @@ -541,7 +546,7 @@ def setup_grpc_channel_for_port(port, soc_ip, asic_index, grpc_config, fwd_state #if no config from config DB, treat channel to be as insecure - type = "insecure" + type_chan = "insecure" level = "server" (status, fvs) = grpc_config[asic_index].get("config") @@ -550,12 +555,12 @@ def setup_grpc_channel_for_port(port, soc_ip, asic_index, grpc_config, fwd_state "Could not retreive fieldvalue pairs for {}, inside config_db table kvp config for {} for setting up channel type".format(port, grpc_config[asic_index].getTableName())) else: grpc_config_dict = dict(fvs) - type = grpc_config_dict.get("type", None) + type_chan = grpc_config_dict.get("type", None) level = grpc_config_dict.get("auth_level", None) kvp = {} - if type == "secure": + if type_chan == "secure": (status, fvs) = grpc_config[asic_index].get("certs") if status is False: helper_logger.log_warning( @@ -565,7 +570,7 @@ def setup_grpc_channel_for_port(port, soc_ip, asic_index, grpc_config, fwd_state kvp = dict(fvs) - channel, stub = create_channel(type, level, kvp, soc_ip, port, asic_index, fwd_state_response_tbl, is_async) + channel, stub = create_channel(type_chan, level, kvp, soc_ip, port, asic_index, fwd_state_response_tbl, is_async) if stub is None: helper_logger.log_warning("stub was not setup for gRPC soc ip {} port {}, no gRPC soc server running ?".format(soc_ip, port)) @@ -1384,7 +1389,7 @@ def init_ports_status_for_y_cable(platform_sfp, platform_chassis, y_cable_presen "Could not retreive port inside config_db PORT table {} for Y-Cable initiation".format(logical_port_name)) -def change_ports_status_for_y_cable_change_event(port_dict, y_cable_presence, port_tbl, port_table_keys, loopback_tbl, loopback_keys, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, y_cable_tbl, static_tbl, mux_tbl, grpc_client, fwd_state_response_tbl, stop_event=threading.Event()): +def change_ports_status_for_y_cable_change_event(port_dict, y_cable_presence, port_tbl, port_table_keys, loopback_tbl, loopback_keys, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, y_cable_tbl, static_tbl, mux_tbl, grpc_client, fwd_state_response_tbl, state_db, stop_event=threading.Event()): global read_side delete_change_event = [False] @@ -3713,6 +3718,7 @@ def task_worker(self): handle_hw_mux_cable_table_grpc_notification( fvp, self.table_helper.get_hw_mux_cable_tbl(), asic_index, self.table_helper.get_mux_metrics_tbl(), False, port, self.table_helper.get_port_tbl(), self.table_helper.get_grpc_config_tbl(), self.table_helper.get_fwd_state_response_tbl()) + while True: (port_m, op_m, fvp_m) = self.table_helper.get_mux_cable_command_tbl()[asic_index].pop() @@ -3788,6 +3794,7 @@ def __init__(self): self.task_download_firmware_thread = {} self.task_stopping_event = threading.Event() self.cli_table_helper = y_cable_table_helper.YcableCliUpdateTableHelper() + self.name = "YCableCliUpdateTask" def task_cli_worker(self): @@ -3842,6 +3849,7 @@ def task_cli_worker(self): namespace = redisSelectObj.getDbConnector().getNamespace() asic_index = multi_asic.get_asic_index_from_namespace(namespace) + while True: (key, op_m, fvp_m) = self.cli_table_helper.xcvrd_log_tbl[asic_index].pop() @@ -4070,6 +4078,7 @@ def __init__(self): self.task_stopping_event = threading.Event() self.table_helper = y_cable_table_helper.YcableAsyncNotificationTableHelper() self.read_side = process_loopback_interface_and_get_read_side(self.table_helper.loopback_keys) + self.name = "YCableAsyncNotificationTask" async def task_worker(self):