From 2765e8020f718973a9f7c7bc2f2fc79a8e0710c5 Mon Sep 17 00:00:00 2001 From: Lawrence Lee Date: Wed, 15 Nov 2023 11:02:10 -0800 Subject: [PATCH] [tph]: Detect LAG flaps from APPL_DB (#16879) (#17156) Why I did it A race condition exists while the TPH is processing a netlink message - if a second netlink message arrives during processing it will be missed since TPH is not listening for other messages. Another bug was found where TPH was unnecessarily restarting since it was checking admin status instead of operational status of portchannels. How I did it Subscribe to APPL_DB for updates on LAG operational state Track currently sniffed interfaces How to verify it Send tunnel packets with destination IP of an unresolved neighbor, verify that ping commands are run Shut down a portchannel interface, verify that sniffer does not restart Send tunnel packets, verify ping commands are still run Bring up portchannel interface, verify that sniffer restarts Signed-off-by: Lawrence Lee --- .../docker-orchagent/tunnel_packet_handler.py | 150 ++++++++++-------- 1 file changed, 87 insertions(+), 63 deletions(-) diff --git a/dockers/docker-orchagent/tunnel_packet_handler.py b/dockers/docker-orchagent/tunnel_packet_handler.py index 44dbb19fda65..8d1b775ec9ce 100755 --- a/dockers/docker-orchagent/tunnel_packet_handler.py +++ b/dockers/docker-orchagent/tunnel_packet_handler.py @@ -8,11 +8,14 @@ destination IP to trigger the process of obtaining neighbor information """ import subprocess +import sys import time from datetime import datetime from ipaddress import ip_interface +from queue import Queue -from swsssdk import ConfigDBConnector, SonicV2Connector +from swsscommon.swsscommon import ConfigDBConnector, SonicV2Connector, \ + DBConnector, Select, SubscriberStateTable from sonic_py_common import logger as log from pyroute2 import IPRoute @@ -25,18 +28,35 @@ logger = log.Logger() STATE_DB = 'STATE_DB' +APPL_DB = 'APPL_DB' PORTCHANNEL_INTERFACE_TABLE = 'PORTCHANNEL_INTERFACE' TUNNEL_TABLE = 'TUNNEL' PEER_SWITCH_TABLE = 'PEER_SWITCH' INTF_TABLE_TEMPLATE = 'INTERFACE_TABLE|{}|{}' +LAG_TABLE = 'LAG_TABLE' STATE_KEY = 'state' TUNNEL_TYPE_KEY = 'tunnel_type' DST_IP_KEY = 'dst_ip' ADDRESS_IPV4_KEY = 'address_ipv4' +OPER_STATUS_KEY = 'oper_status' IPINIP_TUNNEL = 'ipinip' - RTM_NEWLINK = 'RTM_NEWLINK' +SELECT_TIMEOUT = 1000 + +nl_msgs = Queue() +portchannel_intfs = None + +def add_msg_to_queue(target, msg): + """ + Adds a netlink message to a queue + Args: + target: unused, needed by NDB API + msg: a netlink message + """ + + if msg.get_attr('IFLA_IFNAME') in portchannel_intfs: + nl_msgs.put(msg) class TunnelPacketHandler(object): """ @@ -55,7 +75,10 @@ def __init__(self): self.sniffer = None self.self_ip = '' self.packet_filter = '' - self.sniff_intfs = [] + self.sniff_intfs = set() + + global portchannel_intfs + portchannel_intfs = [name for name, _ in self.portchannel_intfs] @property def portchannel_intfs(self): @@ -95,17 +118,6 @@ def get_intf_name(self, msg): return '' - def netlink_msg_is_for_portchannel(self, msg): - """ - Determines if a netlink message is about a PortChannel interface - - Returns: - (list) integers representing kernel indices - """ - ifname = self.get_intf_name(msg) - - return ifname in [name for name, _ in self.portchannel_intfs] - def get_up_portchannels(self): """ Returns the portchannels which are operationally up @@ -125,11 +137,11 @@ def get_up_portchannels(self): logger.log_notice("Skipping non-existent interface {}".format(intf)) continue link_statuses.append(status[0]) - up_portchannels = [] + up_portchannels = set() for status in link_statuses: - if status['state'] == 'up': - up_portchannels.append(self.get_intf_name(status)) + if status.get_attr('IFLA_OPERSTATE').lower() == 'up': + up_portchannels.add(status.get_attr('IFLA_IFNAME')) return up_portchannels @@ -242,52 +254,47 @@ def get_inner_pkt_type(self, packet): return IPv6 return False - def wait_for_netlink_msgs(self): - """ - Gathers any RTM_NEWLINK messages - - Returns: - (list) containing any received messages - """ - msgs = [] - with IPRoute() as ipr: - ipr.bind() - for msg in ipr.get(): - if msg['event'] == RTM_NEWLINK: - msgs.append(msg) - - return msgs - - def sniffer_restart_required(self, messages): + def sniffer_restart_required(self, lag, fvs): """ Determines if the packet sniffer needs to be restarted - A restart is required if all of the following conditions are met: - 1. A netlink message of type RTM_NEWLINK is received - (this is checked by `wait_for_netlink_msgs`) - 2. The interface index of the message corresponds to a portchannel - interface - 3. The state of the interface in the message is 'up' - Here, we do not care about an interface going down since - the sniffer is able to continue sniffing on the other - interfaces. However, if an interface has gone down and - come back up, we need to restart the sniffer to be able - to sniff traffic on the interface that has come back up. + The sniffer needs to be restarted when a portchannel interface transitions + from down to up. When a portchannel interface goes down, the sniffer is + able to continue sniffing on other portchannels. """ - for msg in messages: - if self.netlink_msg_is_for_portchannel(msg): - if msg['state'] == 'up': - logger.log_info('{} came back up, sniffer restart required' - .format(self.get_intf_name(msg))) - return True - return False + oper_status = dict(fvs).get(OPER_STATUS_KEY) + if lag not in self.sniff_intfs and oper_status == 'up': + logger.log_info('{} came back up, sniffer restart required' + .format(lag)) + # Don't need to modify self.sniff_intfs here since it is repopulated + # by self.get_up_portchannels() + return True + elif lag in self.sniff_intfs and oper_status == 'down': + # A portchannel interface went down, remove it from the list of + # sniffed interfaces so we can detect when it comes back up + self.sniff_intfs.remove(lag) + return False + else: + return False def start_sniffer(self): """ Starts an AsyncSniffer and waits for it to inititalize fully """ + start = datetime.now() + + self.sniff_intfs = self.get_up_portchannels() + + while not self.sniff_intfs: + logger.log_info('No portchannels are up yet...') + if (datetime.now() - start).seconds > 180: + logger.log_error('All portchannels failed to come up within 3 minutes, exiting...') + sys.exit(1) + self.sniff_intfs = self.get_up_portchannels() + time.sleep(10) + self.sniffer = AsyncSniffer( - iface=self.sniff_intfs, + iface=list(self.sniff_intfs), filter=self.packet_filter, prn=self.ping_inner_dst, store=0 @@ -332,18 +339,35 @@ def listen_for_tunnel_pkts(self): logger.log_notice('Starting tunnel packet handler for {}' .format(self.packet_filter)) - self.sniff_intfs = self.get_up_portchannels() - logger.log_info("Listening on interfaces {}".format(self.sniff_intfs)) + + app_db = DBConnector(APPL_DB, 0) + lag_table = SubscriberStateTable(app_db, LAG_TABLE) + sel = Select() + sel.addSelectable(lag_table) self.start_sniffer() + logger.log_info("Listening on interfaces {}".format(self.sniff_intfs)) while True: - msgs = self.wait_for_netlink_msgs() - if self.sniffer_restart_required(msgs): - self.sniffer.stop() - sniff_intfs = self.get_up_portchannels() - logger.log_notice('Restarting tunnel packet handler on ' - 'interfaces {}'.format(sniff_intfs)) - self.start_sniffer() + rc, _ = sel.select(SELECT_TIMEOUT) + + if rc == Select.TIMEOUT: + continue + elif rc == Select.ERROR: + raise Exception("Select() error") + else: + lag, op, fvs = lag_table.pop() + if self.sniffer_restart_required(lag, fvs): + self.sniffer.stop() + start = datetime.now() + # wait up to 3 seconds for the kernel interface to be synced with APPL_DB status + while (datetime.now() - start).seconds < 3: + self.sniff_intfs = self.get_up_portchannels() + if lag in self.sniff_intfs: + break + time.sleep(0.1) + logger.log_notice('Restarting tunnel packet handler on ' + 'interfaces {}'.format(self.sniff_intfs)) + self.start_sniffer() def run(self): """ @@ -360,4 +384,4 @@ def main(): if __name__ == "__main__": - main() + main() \ No newline at end of file