diff --git a/syncd/Makefile.am b/syncd/Makefile.am index d34131156..4982ea16c 100644 --- a/syncd/Makefile.am +++ b/syncd/Makefile.am @@ -34,6 +34,7 @@ libSyncd_a_SOURCES = \ NotificationQueue.cpp \ PortMap.cpp \ PortMapParser.cpp \ + PortStateChangeHandler.cpp \ RedisClient.cpp \ RedisNotificationProducer.cpp \ RequestShutdownCommandLineOptions.cpp \ diff --git a/syncd/PortStateChangeHandler.cpp b/syncd/PortStateChangeHandler.cpp new file mode 100644 index 000000000..909649390 --- /dev/null +++ b/syncd/PortStateChangeHandler.cpp @@ -0,0 +1,51 @@ +#include "PortStateChangeHandler.h" + +#include + +#include "swss/logger.h" + +using namespace syncd; + +PortStateChangeHandler::PortStateChangeHandler( + _In_ std::shared_ptr portStateChangeEvent) +{ + SWSS_LOG_ENTER(); + + if (portStateChangeEvent == nullptr) + { + SWSS_LOG_THROW("Unexpected error: port state change event is null."); + } + + m_portStateChangeEvent = portStateChangeEvent; + + m_portStateChangeQueue = std::make_shared( + PORT_STATE_CHANGE_QUEUE_SIZE); +} + +void PortStateChangeHandler::handlePortStateChangeNotification( + _In_ uint32_t count, + _In_ const sai_port_oper_status_notification_t *data) +{ + SWSS_LOG_ENTER(); + + for (uint32_t idx = 0; idx < count; ++idx) + { + if (m_portStateChangeQueue->enqueue(data[idx]) == false) + { + SWSS_LOG_ERROR( + "Unexpected error: failed to enqueue the port state change " + "notification."); + + return; + } + } + + m_portStateChangeEvent->notify(); +} + +std::shared_ptr PortStateChangeHandler::getQueue() const +{ + SWSS_LOG_ENTER(); + + return m_portStateChangeQueue; +} diff --git a/syncd/PortStateChangeHandler.h b/syncd/PortStateChangeHandler.h new file mode 100644 index 000000000..60b0c33a3 --- /dev/null +++ b/syncd/PortStateChangeHandler.h @@ -0,0 +1,52 @@ +#pragma once + +extern "C" { +#include "saimetadata.h" +} + +#include + +#include "ConcurrentQueue.h" +#include "swss/selectableevent.h" + +namespace syncd +{ + using PortOperStatusNotificationQueue = + ConcurrentQueue; + + // Class to handle the port state change callback from SAI. This consists a + // selectable event that will be used to send notification from producer thread + // to consumer thread, and a mutex protected concurrent queue to share the port + // state change notification data between producer and consumer threads. + class PortStateChangeHandler + { + public: + + PortStateChangeHandler( + _In_ std::shared_ptr portStateChangeEvent); + + virtual ~PortStateChangeHandler() = default; + + // Adds the port operational status notification data to a queue and generates a + // notification event. + void handlePortStateChangeNotification( + _In_ uint32_t count, + _In_ const sai_port_oper_status_notification_t *data); + + // Returns the shared pointer of the queue. + std::shared_ptr getQueue() const; + + private: + + // Choosing 4k max event queue size based on if we had 256 ports, it can + // accommodate on average 16 port events per ports in worst case. + static constexpr size_t PORT_STATE_CHANGE_QUEUE_SIZE = 4096; + + // SelectableEvent for producer to generate the event and for consumer to + // listen on. + std::shared_ptr m_portStateChangeEvent; + + // Mutex protected queue to share the data between producer and consumer. + std::shared_ptr m_portStateChangeQueue; + }; +} // namespace syncd diff --git a/unittest/syncd/Makefile.am b/unittest/syncd/Makefile.am index 4e02f5172..27c301e25 100644 --- a/unittest/syncd/Makefile.am +++ b/unittest/syncd/Makefile.am @@ -16,6 +16,7 @@ tests_SOURCES = main.cpp \ TestNotificationProcessor.cpp \ TestNotificationHandler.cpp \ TestMdioIpcServer.cpp \ + TestPortStateChangeHandler.cpp \ TestVendorSai.cpp tests_CXXFLAGS = $(DBGFLAGS) $(AM_CXXFLAGS) $(CXXFLAGS_COMMON) diff --git a/unittest/syncd/TestPortStateChangeHandler.cpp b/unittest/syncd/TestPortStateChangeHandler.cpp new file mode 100644 index 000000000..9d945209f --- /dev/null +++ b/unittest/syncd/TestPortStateChangeHandler.cpp @@ -0,0 +1,57 @@ +#include "PortStateChangeHandler.h" + +#include + +using namespace syncd; + +constexpr size_t portStateChangeQueueSize = 4096; + +class PortStateChangeHandlerTest : public ::testing::Test +{ + protected: + PortStateChangeHandlerTest() + : m_portStateChangeHandler(std::make_shared()) + { + SWSS_LOG_ENTER(); + } + + ~PortStateChangeHandlerTest() override = default; + + PortStateChangeHandler m_portStateChangeHandler; +}; + +TEST_F(PortStateChangeHandlerTest, VerifyGetQueue) +{ + auto queue = m_portStateChangeHandler.getQueue(); + EXPECT_EQ(queue->size(), 0); +} + +TEST_F(PortStateChangeHandlerTest, + HandlePortStateChangeNotificationFailsOnEnqueuingData) +{ + auto queue = m_portStateChangeHandler.getQueue(); + EXPECT_EQ(queue->size(), 0); + + // Insert enough data in the queue so it reaches its capacity. + sai_port_oper_status_notification_t operStatus[portStateChangeQueueSize]; + m_portStateChangeHandler.handlePortStateChangeNotification( + portStateChangeQueueSize, &operStatus[0]); + EXPECT_EQ(queue->size(), portStateChangeQueueSize); + + // Since queue is at its maximum capacity, adding a new element should cause + // insert failure and new element should not get added. + m_portStateChangeHandler.handlePortStateChangeNotification(/*count=*/1, + &operStatus[0]); + EXPECT_EQ(queue->size(), portStateChangeQueueSize); +} + +TEST_F(PortStateChangeHandlerTest, HandlePortStateChangeNotificationSucceeds) +{ + auto queue = m_portStateChangeHandler.getQueue(); + EXPECT_EQ(queue->size(), 0); + + sai_port_oper_status_notification_t operStatus; + m_portStateChangeHandler.handlePortStateChangeNotification(/*count=*/1, + &operStatus); + EXPECT_EQ(queue->size(), 1); +}