Skip to content

Commit

Permalink
[Link event damping] Add generic concurrent queue for link event damp…
Browse files Browse the repository at this point in the history
…ing.

- This queue will be used to enqueue the port state change events by
  NotificationHandler and dequeued by syncd main thread and processed in
  link event damping logic.

HLD: sonic-net/SONiC#1071
  • Loading branch information
Ashish Singh committed Oct 23, 2023
1 parent eaa2bda commit 5a9fe8c
Show file tree
Hide file tree
Showing 4 changed files with 186 additions and 0 deletions.
107 changes: 107 additions & 0 deletions syncd/ConcurrentQueue.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
#pragma once

#include <mutex>
#include <queue>

#include "swss/logger.h"
#include "swss/sal.h"

namespace syncd
{
template <class T>
class ConcurrentQueue
{
public:

explicit ConcurrentQueue(
_In_ size_t queueSizeLimit = UNLIMITED);

virtual ~ConcurrentQueue() = default;

bool enqueue(
_In_ const T& val);

bool dequeue(
_Out_ T* valOut);

size_t size();

bool empty();

private:

// Queue size = 0 means there is no limit on queue size.
static constexpr size_t UNLIMITED = 0;

std::mutex m_mutex;
std::queue<T> m_queue;
size_t m_queueSizeLimit;

ConcurrentQueue<T>(const ConcurrentQueue<T>&) = delete;
ConcurrentQueue<T>& operator=(const ConcurrentQueue<T>&) = delete;
};

template <class T>
ConcurrentQueue<T>::ConcurrentQueue(
_In_ size_t queueSizeLimit)
: m_queueSizeLimit(queueSizeLimit)
{
SWSS_LOG_ENTER();
}

template <class T>
bool ConcurrentQueue<T>::enqueue(
_In_ const T& val)
{
SWSS_LOG_ENTER();

std::lock_guard<std::mutex> mutex_lock(m_mutex);

// If the queue exceeds the limit, return false.
if ((m_queueSizeLimit == UNLIMITED) || (m_queue.size() < m_queueSizeLimit))
{
m_queue.push(val);
return true;
}

return false;
}

template <class T>
bool ConcurrentQueue<T>::dequeue(
_Out_ T* valOut)
{
SWSS_LOG_ENTER();

std::lock_guard<std::mutex> mutex_lock(m_mutex);
if (m_queue.empty())
{
return false;
}

*valOut = m_queue.front();
m_queue.pop();

return true;
}

template <class T>
size_t ConcurrentQueue<T>::size()
{
SWSS_LOG_ENTER();

std::lock_guard<std::mutex> mutex_lock(m_mutex);

return m_queue.size();
}

template <class T>
bool ConcurrentQueue<T>::empty()
{
SWSS_LOG_ENTER();

std::lock_guard<std::mutex> mutex_lock(m_mutex);

return m_queue.empty();
}
} // namespace syncd
1 change: 1 addition & 0 deletions tests/aspell.en.pws
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ IPGs
IPv
Inseg
KEYs
LLC
LOGLEVEL
LOOPBACK
MACsec
Expand Down
1 change: 1 addition & 0 deletions unittest/syncd/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ tests_SOURCES = main.cpp \
MockableSaiInterface.cpp \
MockHelper.cpp \
TestCommandLineOptions.cpp \
TestConcurrentQueue.cpp \
TestFlexCounter.cpp \
TestVirtualOidTranslator.cpp \
TestNotificationQueue.cpp \
Expand Down
77 changes: 77 additions & 0 deletions unittest/syncd/TestConcurrentQueue.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
#include <gtest/gtest.h>

#include "ConcurrentQueue.h"

using namespace syncd;

class ConcurrentQueueTest : public ::testing::Test
{};

TEST_F(ConcurrentQueueTest, QueueIsEmpty)
{
constexpr size_t queueSize = 5;
ConcurrentQueue<int> testQueue(queueSize);

EXPECT_TRUE(testQueue.empty());
EXPECT_EQ(testQueue.size(), 0);
}

TEST_F(ConcurrentQueueTest, EnqueueSucceeds)
{
constexpr size_t queueSize = 5;
ConcurrentQueue<int> testQueue(queueSize);

EXPECT_TRUE(testQueue.empty());

EXPECT_TRUE(testQueue.enqueue(1));
EXPECT_FALSE(testQueue.empty());
EXPECT_EQ(testQueue.size(), 1);
}

TEST_F(ConcurrentQueueTest, EnqueueFailsIfQueueSizeLimitIsReached)
{
constexpr size_t queueSize = 5;
ConcurrentQueue<int> testQueue(queueSize);

EXPECT_TRUE(testQueue.empty());

int data = 1;
for (size_t idx = 0; idx < queueSize; ++idx)
{
SCOPED_TRACE(::testing::Message() << "Inserting data at index: " << idx);
EXPECT_TRUE(testQueue.enqueue(data++));
EXPECT_EQ(testQueue.size(), idx + 1);
}

EXPECT_EQ(testQueue.size(), queueSize);

// Once queue is at maximum capacity, en-queuing next element will fail.
EXPECT_FALSE(testQueue.enqueue(data));
EXPECT_EQ(testQueue.size(), queueSize);
}

TEST_F(ConcurrentQueueTest, DequeueFailsIfQueueIsEmpty)
{
constexpr size_t queueSize = 5;
ConcurrentQueue<int> testQueue(queueSize);

EXPECT_TRUE(testQueue.empty());

int val;
EXPECT_FALSE(testQueue.dequeue(&val));
}

TEST_F(ConcurrentQueueTest, DequeueSucceeds)
{
ConcurrentQueue<int> testQueue;
EXPECT_TRUE(testQueue.empty());

constexpr int testValue = 56;
EXPECT_TRUE(testQueue.enqueue(testValue));
EXPECT_EQ(testQueue.size(), 1);

int val;
EXPECT_TRUE(testQueue.dequeue(&val));
EXPECT_EQ(val, testValue);
EXPECT_TRUE(testQueue.empty());
}

0 comments on commit 5a9fe8c

Please sign in to comment.