Skip to content

Commit

Permalink
[fpmsyncd] Implement pending route suppression feature (#2551)
Browse files Browse the repository at this point in the history
DEPENDS: #2512

What I did

I implemented support to enable pending routes suppression feature. When this feature is enabled, fpmsyncd will wait for reply from orchagent before sending offload status message to zebra.

Why I did it

This is done to not announce routes which aren't yet offloaded in HW.

How I verified it

UT and manual tests.
  • Loading branch information
stepanblyschak authored Mar 19, 2023
1 parent a2bd92f commit 840fe1d
Show file tree
Hide file tree
Showing 13 changed files with 864 additions and 10 deletions.
27 changes: 27 additions & 0 deletions fpmsyncd/fpminterface.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#pragma once

#include <swss/selectable.h>
#include <libnl3/netlink/netlink.h>

#include "fpm/fpm.h"

namespace swss
{

/**
* @brief FPM zebra communication interface
*/
class FpmInterface : public Selectable
{
public:
virtual ~FpmInterface() = default;

/**
* @brief Send netlink message through FPM socket
* @param msg Netlink message
* @return True on success, otherwise false is returned
*/
virtual bool send(nlmsghdr* nl_hdr) = 0;
};

}
39 changes: 39 additions & 0 deletions fpmsyncd/fpmlink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,11 +158,17 @@ FpmLink::FpmLink(RouteSync *rsync, unsigned short port) :

m_server_up = true;
m_messageBuffer = new char[m_bufSize];
m_sendBuffer = new char[m_bufSize];

m_routesync->onFpmConnected(*this);
}

FpmLink::~FpmLink()
{
m_routesync->onFpmDisconnected();

delete[] m_messageBuffer;
delete[] m_sendBuffer;
if (m_connected)
close(m_connection_socket);
if (m_server_up)
Expand Down Expand Up @@ -277,3 +283,36 @@ void FpmLink::processFpmMessage(fpm_msg_hdr_t* hdr)
nlmsg_free(msg);
}
}

bool FpmLink::send(nlmsghdr* nl_hdr)
{
fpm_msg_hdr_t hdr{};

size_t len = fpm_msg_align(sizeof(hdr) + nl_hdr->nlmsg_len);

if (len > m_bufSize)
{
SWSS_LOG_THROW("Message length %zu is greater than the send buffer size %d", len, m_bufSize);
}

hdr.version = FPM_PROTO_VERSION;
hdr.msg_type = FPM_MSG_TYPE_NETLINK;
hdr.msg_len = htons(static_cast<uint16_t>(len));

memcpy(m_sendBuffer, &hdr, sizeof(hdr));
memcpy(m_sendBuffer + sizeof(hdr), nl_hdr, nl_hdr->nlmsg_len);

size_t sent = 0;
while (sent != len)
{
auto rc = ::send(m_connection_socket, m_sendBuffer + sent, len - sent, 0);
if (rc == -1)
{
SWSS_LOG_ERROR("Failed to send FPM message: %s", strerror(errno));
return false;
}
sent += rc;
}

return true;
}
7 changes: 5 additions & 2 deletions fpmsyncd/fpmlink.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@
#include <unistd.h>
#include <exception>

#include "selectable.h"
#include "fpm/fpm.h"
#include "fpmsyncd/fpminterface.h"
#include "fpmsyncd/routesync.h"

namespace swss {

class FpmLink : public Selectable {
class FpmLink : public FpmInterface {
public:
const int MSG_BATCH_SIZE;
FpmLink(RouteSync *rsync, unsigned short port = FPM_DEFAULT_PORT);
Expand All @@ -41,10 +41,13 @@ class FpmLink : public Selectable {

void processFpmMessage(fpm_msg_hdr_t* hdr);

bool send(nlmsghdr* nl_hdr) override;

private:
RouteSync *m_routesync;
unsigned int m_bufSize;
char *m_messageBuffer;
char *m_sendBuffer;
unsigned int m_pos;

bool m_connected;
Expand Down
112 changes: 107 additions & 5 deletions fpmsyncd/fpmsyncd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,14 @@
#include "select.h"
#include "selectabletimer.h"
#include "netdispatcher.h"
#include "netlink.h"
#include "notificationconsumer.h"
#include "subscriberstatetable.h"
#include "warmRestartHelper.h"
#include "fpmsyncd/fpmlink.h"
#include "fpmsyncd/routesync.h"

#include <netlink/route/route.h>

using namespace std;
using namespace swss;
Expand Down Expand Up @@ -47,21 +51,47 @@ static bool eoiuFlagsSet(Table &bgpStateTable)
int main(int argc, char **argv)
{
swss::Logger::linkToDbNative("fpmsyncd");

const auto routeResponseChannelName = std::string("APPL_DB_") + APP_ROUTE_TABLE_NAME + "_RESPONSE_CHANNEL";

DBConnector db("APPL_DB", 0);
DBConnector cfgDb("CONFIG_DB", 0);
SubscriberStateTable deviceMetadataTableSubscriber(&cfgDb, CFG_DEVICE_METADATA_TABLE_NAME);
Table deviceMetadataTable(&cfgDb, CFG_DEVICE_METADATA_TABLE_NAME);
DBConnector applStateDb("APPL_STATE_DB", 0);
std::unique_ptr<NotificationConsumer> routeResponseChannel;

RedisPipeline pipeline(&db);
RouteSync sync(&pipeline);

DBConnector stateDb("STATE_DB", 0);
Table bgpStateTable(&stateDb, STATE_BGP_TABLE_NAME);

NetLink netlink;

netlink.registerGroup(RTNLGRP_LINK);

NetDispatcher::getInstance().registerMessageHandler(RTM_NEWROUTE, &sync);
NetDispatcher::getInstance().registerMessageHandler(RTM_DELROUTE, &sync);
NetDispatcher::getInstance().registerMessageHandler(RTM_NEWLINK, &sync);
NetDispatcher::getInstance().registerMessageHandler(RTM_DELLINK, &sync);

rtnl_route_read_protocol_names(DefaultRtProtoPath);

std::string suppressionEnabledStr;
deviceMetadataTable.hget("localhost", "suppress-fib-pending", suppressionEnabledStr);
if (suppressionEnabledStr == "enabled")
{
routeResponseChannel = std::make_unique<NotificationConsumer>(&applStateDb, routeResponseChannelName);
sync.setSuppressionEnabled(true);
}

while (true)
{
try
{
FpmLink fpm(&sync);

Select s;
SelectableTimer warmStartTimer(timespec{0, 0});
// Before eoiu flags detected, check them periodically. It also stop upon detection of reconciliation done.
Expand All @@ -80,6 +110,13 @@ int main(int argc, char **argv)
cout << "Connected!" << endl;

s.addSelectable(&fpm);
s.addSelectable(&netlink);
s.addSelectable(&deviceMetadataTableSubscriber);

if (sync.isSuppressionEnabled())
{
s.addSelectable(routeResponseChannel.get());
}

/* If warm-restart feature is enabled, execute 'restoration' logic */
bool warmStartEnabled = sync.m_warmStartHelper.checkAndStart();
Expand Down Expand Up @@ -139,11 +176,8 @@ int main(int argc, char **argv)
SWSS_LOG_NOTICE("Warm-Restart EOIU hold timer expired.");
}

if (sync.m_warmStartHelper.inProgress())
{
sync.m_warmStartHelper.reconcile();
SWSS_LOG_NOTICE("Warm-Restart reconciliation processed.");
}
sync.onWarmStartEnd(applStateDb);

// remove the one-shot timer.
s.removeSelectable(temps);
pipeline.flush();
Expand Down Expand Up @@ -182,6 +216,74 @@ int main(int argc, char **argv)
s.removeSelectable(&eoiuCheckTimer);
}
}
else if (temps == &deviceMetadataTableSubscriber)
{
std::deque<KeyOpFieldsValuesTuple> keyOpFvsQueue;
deviceMetadataTableSubscriber.pops(keyOpFvsQueue);

for (const auto& keyOpFvs: keyOpFvsQueue)
{
const auto& key = kfvKey(keyOpFvs);
const auto& op = kfvOp(keyOpFvs);
const auto& fvs = kfvFieldsValues(keyOpFvs);

if (op != SET_COMMAND)
{
continue;
}

if (key != "localhost")
{
continue;
}

for (const auto& fv: fvs)
{
const auto& field = fvField(fv);
const auto& value = fvValue(fv);

if (field != "suppress-fib-pending")
{
continue;
}

bool shouldEnable = (value == "enabled");

if (shouldEnable && !sync.isSuppressionEnabled())
{
routeResponseChannel = std::make_unique<NotificationConsumer>(&applStateDb, routeResponseChannelName);
sync.setSuppressionEnabled(true);
s.addSelectable(routeResponseChannel.get());
}
else if (!shouldEnable && sync.isSuppressionEnabled())
{
/* When disabling suppression we mark all existing routes offloaded in zebra
* as there could be some transient routes which are pending response from
* orchagent, thus such updates might be missing. Since we are disabling suppression
* we no longer care about real HW offload status and can mark all routes as offloaded
* to avoid routes stuck in suppressed state after transition. */
sync.markRoutesOffloaded(db);

sync.setSuppressionEnabled(false);
s.removeSelectable(routeResponseChannel.get());
routeResponseChannel.reset();
}
} // end for fvs
} // end for keyOpFvsQueue
}
else if (routeResponseChannel && (temps == routeResponseChannel.get()))
{
std::deque<KeyOpFieldsValuesTuple> notifications;
routeResponseChannel->pops(notifications);

for (const auto& notification: notifications)
{
const auto& key = kfvKey(notification);
const auto& fieldValues = kfvFieldsValues(notification);

sync.onRouteResponse(key, fieldValues);
}
}
else if (!warmStartEnabled || sync.m_warmStartHelper.isReconciled())
{
pipeline.flush();
Expand Down
Loading

0 comments on commit 840fe1d

Please sign in to comment.