Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ycabled][active-active] no initialize Async Client, when no active-active cable type; fix names for all ycabled threads #373

Merged
merged 15 commits into from
Jul 7, 2023
84 changes: 81 additions & 3 deletions sonic-ycabled/tests/test_y_cable_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -1730,6 +1730,7 @@ def test_change_ports_status_for_y_cable_change_event(self, mock_swsscommon_tabl
def mock_get_asic_id(mock_logical_port_name):
return 0

state_db = {}
y_cable_presence = [True]
logical_port_dict = {'Ethernet0': '1'}

Expand All @@ -1745,7 +1746,7 @@ def mock_get_asic_id(mock_logical_port_name):
patched_util.get_asic_id_for_logical_port.return_value = 0

rc = change_ports_status_for_y_cable_change_event(
logical_port_dict, y_cable_presence, port_tbl, port_table_keys, loopback_tbl, loopback_keys, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, y_cable_tbl, static_tbl, mux_tbl, grpc_client, fwd_state_response_tbl, stop_event=threading.Event())
logical_port_dict, y_cable_presence, port_tbl, port_table_keys, loopback_tbl, loopback_keys, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, y_cable_tbl, static_tbl, mux_tbl, grpc_client, fwd_state_response_tbl, state_db, stop_event=threading.Event())

assert(rc == None)

Expand All @@ -1764,6 +1765,7 @@ def mock_get_asic_id(mock_logical_port_name):

y_cable_presence = [True]
logical_port_dict = {'Ethernet0': '1'}
state_db = {}

mock_table = MagicMock()
mock_table.getKeys = MagicMock(return_value=['Ethernet0', 'Ethernet4'])
Expand All @@ -1777,7 +1779,7 @@ def mock_get_asic_id(mock_logical_port_name):

patched_util.get_asic_id_for_logical_port.return_value = 0
rc = change_ports_status_for_y_cable_change_event(
logical_port_dict, y_cable_presence, port_tbl, port_table_keys, loopback_tbl, loopback_keys, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, y_cable_tbl, static_tbl, mux_tbl, grpc_client, fwd_state_response_tbl,stop_event=threading.Event())
logical_port_dict, y_cable_presence, port_tbl, port_table_keys, loopback_tbl, loopback_keys, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, y_cable_tbl, static_tbl, mux_tbl, grpc_client, fwd_state_response_tbl, state_db, stop_event=threading.Event())

assert(rc == None)

Expand All @@ -1794,6 +1796,7 @@ def mock_get_asic_id(mock_logical_port_name):

y_cable_presence = [True]
logical_port_dict = {'Ethernet0': '2'}
state_db = {}

mock_table = MagicMock()
mock_table.getKeys = MagicMock(return_value=['Ethernet0', 'Ethernet4'])
Expand All @@ -1806,7 +1809,7 @@ def mock_get_asic_id(mock_logical_port_name):

patched_util.get_asic_id_for_logical_port.return_value = 0
rc = change_ports_status_for_y_cable_change_event(
logical_port_dict, y_cable_presence,port_tbl, port_table_keys, loopback_tbl, loopback_keys, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, y_cable_tbl, static_tbl, mux_tbl, grpc_client, fwd_state_response_tbl, stop_event=threading.Event())
logical_port_dict, y_cable_presence,port_tbl, port_table_keys, loopback_tbl, loopback_keys, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, y_cable_tbl, static_tbl, mux_tbl, grpc_client, fwd_state_response_tbl, state_db, stop_event=threading.Event())

assert(rc == None)

Expand Down Expand Up @@ -7141,3 +7144,78 @@ def test_ycable_graceful_client(self, channel, stub):
read_side = 1
Y_cable_restart_client = GracefulRestartClient("Ethernet48", None, read_side)


class TestYcableScriptExecution(object):

@patch('swsscommon.swsscommon.Select.addSelectable', MagicMock())
@patch('swsscommon.swsscommon.Select.TIMEOUT', MagicMock(return_value=None))
@patch('swsscommon.swsscommon.CastSelectableToRedisSelectObj', MagicMock())
#@patch('swsscommon.swsscommon.CastSelectableToRedisSelectObj.getDbConnector', MagicMock())
@patch('swsscommon.swsscommon.SubscriberStateTable')
@patch('swsscommon.swsscommon.Select.select')
def test_ycable_helper_cli_worker(self, mock_select, mock_sub_table):

mock_selectable = MagicMock()
mock_selectable.pop = MagicMock(
side_effect=[(False, False, False), (False, False, False), ('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), )), (None, None, None), (None, None, None)])
mock_select.return_value = (swsscommon.Select.OBJECT, mock_selectable)
mock_sub_table.return_value = mock_selectable

stop_event = threading.Event()

asic_index = 0
Y_cable_cli_task = YCableCliUpdateTask()
Y_cable_cli_task.task_stopping_event.is_set = MagicMock(side_effect=[False, True])
Y_cable_cli_task.cli_table_helper.xcvrd_show_hwmode_dir_cmd_tbl[asic_index].return_value = mock_selectable

#Y_cable_cli_task.task_stopping_event.is_set = MagicMock(side_effect=False)

expected_exception_start = None
expected_exception_join = None
trace = None
try:
#Y_cable_cli_task.start()
Y_cable_cli_task.task_cli_worker()
time.sleep(5)
Y_cable_cli_task.task_stopping_event.clear()
except Exception as e1:
expected_exception_start = e1
trace = traceback.format_exc()


@patch('swsscommon.swsscommon.Select.addSelectable', MagicMock())
@patch('swsscommon.swsscommon.Select.TIMEOUT', MagicMock(return_value=None))
@patch('swsscommon.swsscommon.CastSelectableToRedisSelectObj', MagicMock())
#@patch('swsscommon.swsscommon.CastSelectableToRedisSelectObj.getDbConnector', MagicMock())
@patch('swsscommon.swsscommon.SubscriberStateTable')
@patch('swsscommon.swsscommon.Select.select')
def test_ycable_helper_cli_worker_execution(self, mock_select, mock_sub_table):

mock_selectable = MagicMock()
mock_selectable.pop = MagicMock(
side_effect=[(False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False), (False, False, False) ,('Ethernet0', swsscommon.SET_COMMAND, (('index', '1'), )), (None, None, None), (None, None, None)])
mock_select.return_value = (swsscommon.Select.OBJECT, mock_selectable)
mock_sub_table.return_value = mock_selectable

stop_event = threading.Event()

asic_index = 0
Y_cable_cli_task = YCableCliUpdateTask()
Y_cable_cli_task.task_stopping_event.is_set = MagicMock(side_effect=[False, True])
Y_cable_cli_task.cli_table_helper.xcvrd_show_hwmode_dir_cmd_tbl[asic_index].return_value = mock_selectable

#Y_cable_cli_task.task_stopping_event.is_set = MagicMock(side_effect=False)

expected_exception_start = None
expected_exception_join = None
trace = None
try:
#Y_cable_cli_task.start()
Y_cable_cli_task.task_cli_worker()
time.sleep(5)
Y_cable_cli_task.task_stopping_event.clear()
except Exception as e1:
expected_exception_start = e1
trace = traceback.format_exc()


19 changes: 9 additions & 10 deletions sonic-ycabled/tests/test_ycable.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ def test_ycable_info_helper_class_run(self, mocked_sleep):
except Exception as e:
pass

"""
@patch("swsscommon.swsscommon.Select", MagicMock())
@patch("swsscommon.swsscommon.Select.addSelectable", MagicMock())
@patch("swsscommon.swsscommon.Select.select", MagicMock())
Expand All @@ -82,6 +83,8 @@ def test_ycable_helper_class_run_loop(self):
Y_cable_cli_task.task_cli_worker()
Y_cable_cli_task.start()
Y_cable_cli_task.join()
"""


@patch("swsscommon.swsscommon.Select", MagicMock())
@patch("swsscommon.swsscommon.Select.addSelectable", MagicMock())
Expand Down Expand Up @@ -307,10 +310,11 @@ def test_handle_state_update_task(self):

port = "Ethernet0"
fvp_dict = {}
state_db = {}
y_cable_presence = False
stopping_event = None
port_tbl, port_tbl_keys, loopback_tbl, loopback_keys, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, y_cable_tbl, static_tbl, mux_tbl, grpc_client, fwd_state_response_tbl = {}, {}, {}, {}, {}, {}, {}, {}, {}, {}, {}
rc = handle_state_update_task(port, fvp_dict, y_cable_presence, port_tbl, port_tbl_keys, loopback_tbl, loopback_keys, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, y_cable_tbl, static_tbl, mux_tbl, grpc_client, fwd_state_response_tbl, stopping_event)
rc = handle_state_update_task(port, fvp_dict, y_cable_presence, port_tbl, port_tbl_keys, loopback_tbl, loopback_keys, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, y_cable_tbl, static_tbl, mux_tbl, grpc_client, fwd_state_response_tbl, state_db, stopping_event)
assert(rc == None)


Expand All @@ -327,7 +331,7 @@ def wait_until(total_wait_time, interval, call_back, *args, **kwargs):
return False


class TestYcableScriptException(object):
"""class TestYcableScriptException(object):

@patch("swsscommon.swsscommon.Select", MagicMock(side_effect=NotImplementedError))
@patch("swsscommon.swsscommon.Select.addSelectable", MagicMock(side_effect=NotImplementedError))
Expand All @@ -353,19 +357,13 @@ def test_ycable_helper_class_run_loop_with_exception(self):
except Exception as e2:
expected_exception_join = e2

"""
#Handy debug Helpers or else use import logging
#f = open("newfile", "w")
#f.write(format(e2))
#f.write(format(m1))
#f.write(trace)
"""

assert(type(expected_exception_start) == type(expected_exception_join))
assert(expected_exception_start.args == expected_exception_join.args)
assert("NotImplementedError" in str(trace) and "effect" in str(trace))
assert("sonic-ycabled/ycable/ycable_utilities/y_cable_helper.py" in str(trace))
assert("swsscommon.Select" in str(trace))
"""


class TestYcableAsyncScript(object):

Expand Down Expand Up @@ -400,3 +398,4 @@ def test_ycable_helper_async_client_run_loop_with_exception(self, sfputil):
assert("sonic-ycabled/ycable/ycable_utilities/y_cable_helper.py" in str(trace))
assert("setup_grpc_channel_for_port" in str(trace))


38 changes: 32 additions & 6 deletions sonic-ycabled/ycable/ycable.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,26 @@
# Helper functions =============================================================
#


def check_presence_for_active_active_cable_type(port_tbl):


logical_port_list = platform_sfputil.logical
for logical_port_name in logical_port_list:
# Get the asic to which this port belongs
asic_index = platform_sfputil.get_asic_id_for_logical_port(logical_port_name)
if asic_index is None:
continue

(status, cable_type) = y_cable_helper.check_mux_cable_port_type(logical_port_name, port_tbl, asic_index)

if status and cable_type == "active-active":
return True

return False



def detect_port_in_error_status(logical_port_name, status_tbl):
rec, fvp = status_tbl.get(logical_port_name)
if rec:
Expand All @@ -81,13 +101,13 @@ def detect_port_in_error_status(logical_port_name, status_tbl):
else:
return False

def handle_state_update_task(port, fvp_dict, y_cable_presence, port_tbl, port_tbl_keys, loopback_tbl, loopback_keys, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, y_cable_tbl, static_tbl, mux_tbl, grpc_client, fwd_state_response_tbl, stopping_event):
def handle_state_update_task(port, fvp_dict, y_cable_presence, port_tbl, port_tbl_keys, loopback_tbl, loopback_keys, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, y_cable_tbl, static_tbl, mux_tbl, grpc_client, fwd_state_response_tbl, state_db, stopping_event):

port_dict = {}
port_dict[port] = fvp_dict.get('status', None)

y_cable_helper.change_ports_status_for_y_cable_change_event(
port_dict, y_cable_presence, port_tbl, port_tbl_keys, loopback_tbl, loopback_keys, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, y_cable_tbl, static_tbl, mux_tbl, grpc_client, fwd_state_response_tbl, stopping_event)
port_dict, y_cable_presence, port_tbl, port_tbl_keys, loopback_tbl, loopback_keys, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, y_cable_tbl, static_tbl, mux_tbl, grpc_client, fwd_state_response_tbl, state_db, stopping_event)

#
# Helper classes ===============================================================
Expand All @@ -105,6 +125,7 @@ def __init__(self, y_cable_presence):
self.task_stopping_event = threading.Event()
self.y_cable_presence = y_cable_presence
self.table_helper = y_cable_table_helper.YcableInfoUpdateTableHelper()
self.name = "YcableInfoUpdateTask"


def task_worker(self, y_cable_presence):
Expand Down Expand Up @@ -159,6 +180,7 @@ def __init__(self, sfp_error_event, y_cable_presence):
self.sfp_error_event = sfp_error_event
self.y_cable_presence = y_cable_presence
self.table_helper = y_cable_table_helper.YcableStateUpdateTableHelper()
self.name = "YcableStateUpdateTask"


def task_worker(self, stopping_event, sfp_error_event, y_cable_presence):
Expand Down Expand Up @@ -208,7 +230,7 @@ def task_worker(self, stopping_event, sfp_error_event, y_cable_presence):
continue

# Check if all tables are created in table_helper
handle_state_update_task(port, fvp_dict, y_cable_presence, self.table_helper.get_port_tbl(), self.table_helper.port_table_keys, self.table_helper.get_loopback_tbl(), self.table_helper.loopback_keys, self.table_helper.get_hw_mux_cable_tbl(), self.table_helper.get_hw_mux_cable_tbl_peer(), self.table_helper.get_y_cable_tbl(), self.table_helper.get_static_tbl(), self.table_helper.get_mux_tbl(), self.table_helper.get_grpc_config_tbl(), self.table_helper.get_fwd_state_response_tbl(), stopping_event)
handle_state_update_task(port, fvp_dict, y_cable_presence, self.table_helper.get_port_tbl(), self.table_helper.port_table_keys, self.table_helper.get_loopback_tbl(), self.table_helper.loopback_keys, self.table_helper.get_hw_mux_cable_tbl(), self.table_helper.get_hw_mux_cable_tbl_peer(), self.table_helper.get_y_cable_tbl(), self.table_helper.get_static_tbl(), self.table_helper.get_mux_tbl(), self.table_helper.get_grpc_config_tbl(), self.table_helper.get_fwd_state_response_tbl(), self.table_helper.state_db, stopping_event)

def run(self):
if self.task_stopping_event.is_set():
Expand Down Expand Up @@ -242,6 +264,7 @@ def __init__(self, log_identifier):
self.y_cable_presence = [False]
self.table_helper = y_cable_table_helper.DaemonYcableTableHelper()
self.threads = []
self.name = "DaemonYcable"

# Signal handler
def signal_handler(self, sig, frame):
Expand Down Expand Up @@ -389,9 +412,12 @@ def run(self):
y_cable_cli_worker_update = y_cable_helper.YCableCliUpdateTask()
y_cable_cli_worker_update.start()
self.threads.append(y_cable_cli_worker_update)
y_cable_async_noti_worker = y_cable_helper.YCableAsyncNotificationTask()
y_cable_async_noti_worker.start()
self.threads.append(y_cable_async_noti_worker)
# enable async client only if there are active-active cables
active_active_cable_presence = check_presence_for_active_active_cable_type(self.table_helper.get_port_tbl())
if active_active_cable_presence is True:
y_cable_async_noti_worker = y_cable_helper.YCableAsyncNotificationTask()
y_cable_async_noti_worker.start()
self.threads.append(y_cable_async_noti_worker)

# Start main loop
self.log_info("Start daemon main loop")
Expand Down
13 changes: 8 additions & 5 deletions sonic-ycabled/ycable/ycable_utilities/y_cable_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -1384,7 +1384,7 @@ def init_ports_status_for_y_cable(platform_sfp, platform_chassis, y_cable_presen
"Could not retreive port inside config_db PORT table {} for Y-Cable initiation".format(logical_port_name))


def change_ports_status_for_y_cable_change_event(port_dict, y_cable_presence, port_tbl, port_table_keys, loopback_tbl, loopback_keys, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, y_cable_tbl, static_tbl, mux_tbl, grpc_client, fwd_state_response_tbl, stop_event=threading.Event()):
def change_ports_status_for_y_cable_change_event(port_dict, y_cable_presence, port_tbl, port_table_keys, loopback_tbl, loopback_keys, hw_mux_cable_tbl, hw_mux_cable_tbl_peer, y_cable_tbl, static_tbl, mux_tbl, grpc_client, fwd_state_response_tbl, state_db, stop_event=threading.Event()):

global read_side
delete_change_event = [False]
Expand Down Expand Up @@ -3788,6 +3788,7 @@ def __init__(self):
self.task_download_firmware_thread = {}
self.task_stopping_event = threading.Event()
self.cli_table_helper = y_cable_table_helper.YcableCliUpdateTableHelper()
self.name = "YCableCliUpdateTask"


def task_cli_worker(self):
Expand Down Expand Up @@ -3818,14 +3819,12 @@ def task_cli_worker(self):
sel.addSelectable(self.cli_table_helper.xcvrd_show_ber_cmd_tbl[asic_id])

# Listen indefinitely for changes to the XCVRD_CMD_TABLE in the Application DB's
while True:
while not self.task_stopping_event.is_set():
# Use timeout to prevent ignoring the signals we want to handle
# in signal_handler() (e.g. SIGTERM for graceful shutdown)

if self.task_stopping_event.is_set():
break

(state, selectableObj) = sel.select(SELECT_TIMEOUT)
self.exc = None

if state == swsscommon.Select.TIMEOUT:
# Do not flood log when select times out
Expand All @@ -3842,6 +3841,7 @@ def task_cli_worker(self):
namespace = redisSelectObj.getDbConnector().getNamespace()
asic_index = multi_asic.get_asic_index_from_namespace(namespace)


while True:
(key, op_m, fvp_m) = self.cli_table_helper.xcvrd_log_tbl[asic_index].pop()

Expand Down Expand Up @@ -3991,6 +3991,8 @@ def task_cli_worker(self):
handle_show_ber_cmd_arg_tbl_notification(fvp, self.cli_table_helper.xcvrd_show_ber_cmd_arg_tbl, self.cli_table_helper.xcvrd_show_ber_rsp_tbl, self.cli_table_helper.xcvrd_show_ber_cmd_sts_tbl, self.cli_table_helper.xcvrd_show_ber_res_tbl, asic_index, port)

break
"""
"""

def run(self):
if self.task_stopping_event.is_set():
Expand Down Expand Up @@ -4070,6 +4072,7 @@ def __init__(self):
self.task_stopping_event = threading.Event()
self.table_helper = y_cable_table_helper.YcableAsyncNotificationTableHelper()
self.read_side = process_loopback_interface_and_get_read_side(self.table_helper.loopback_keys)
self.name = "YCableAsyncNotificationTask"

async def task_worker(self):

Expand Down