Skip to content

Commit

Permalink
iox-eclipse-iceoryx#615 integrated trigger queue
Browse files Browse the repository at this point in the history
Signed-off-by: Christian Eltzschig <me@elchris.org>
  • Loading branch information
elfenpiff committed Mar 25, 2021
1 parent 29e6cce commit 1adac39
Show file tree
Hide file tree
Showing 7 changed files with 147 additions and 284 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <functional>
#include <thread>

#include "iceoryx_utils/internal/concurrent/fifo.hpp"
#include "iceoryx_utils/internal/concurrent/trigger_queue.hpp"

namespace iox
Expand All @@ -32,17 +33,15 @@ class ActiveObject
virtual ~ActiveObject();
void addTask(const std::function<void()> f);
void mainLoop();
bool isInitialized() const;
void stopRunning();

friend class cxx::optional<ActiveObject>;

private:
static constexpr uint32_t taskQueueSize = 128;
using taskQueue_t = concurrent::TriggerQueue<std::function<void()>, taskQueueSize>;
using taskQueue_t = concurrent::TriggerQueue<std::function<void()>, taskQueueSize, concurrent::FiFo>;

cxx::optional<taskQueue_t> m_tasks = taskQueue_t::CreateTriggerQueue();
bool m_isInitialized = m_tasks.has_value();
taskQueue_t m_tasks;

bool m_keepRunning{true};
std::thread m_mainLoopThread;
Expand Down
11 changes: 7 additions & 4 deletions iceoryx_utils/include/iceoryx_utils/internal/concurrent/fifo.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,21 @@ class FiFo
/// @brief pushes a value into the fifo
/// @return if the values was pushed successfully into the fifo it returns
/// true, otherwise false
bool push(const ValueType& f_value);
bool push(const ValueType& f_value) noexcept;

/// @brief returns the oldest value from the fifo and removes it
/// @return if the fifo was not empty the optional contains the value,
/// otherwise it contains a nullopt
cxx::optional<ValueType> pop();
cxx::optional<ValueType> pop() noexcept;

/// @brief returns true when the fifo is empty, otherwise false
bool empty() const;
bool empty() const noexcept;

/// @brief returns the size of the fifo
uint64_t size() const noexcept;

private:
bool is_full() const;
bool is_full() const noexcept;

private:
ValueType m_data[Capacity];
Expand Down
14 changes: 10 additions & 4 deletions iceoryx_utils/include/iceoryx_utils/internal/concurrent/fifo.inl
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ namespace iox
namespace concurrent
{
template <class ValueType, uint64_t Capacity>
inline bool FiFo<ValueType, Capacity>::push(const ValueType& f_param_r)
inline bool FiFo<ValueType, Capacity>::push(const ValueType& f_param_r) noexcept
{
if (is_full())
{
Expand All @@ -44,19 +44,25 @@ inline bool FiFo<ValueType, Capacity>::push(const ValueType& f_param_r)
}

template <class ValueType, uint64_t Capacity>
inline bool FiFo<ValueType, Capacity>::is_full() const
inline bool FiFo<ValueType, Capacity>::is_full() const noexcept
{
return m_write_pos.load(std::memory_order_relaxed) == m_read_pos.load(std::memory_order_relaxed) + Capacity;
}

template <class ValueType, uint64_t Capacity>
inline bool FiFo<ValueType, Capacity>::empty() const
inline uint64_t FiFo<ValueType, Capacity>::size() const noexcept
{
return m_write_pos.load(std::memory_order_relaxed) - m_read_pos.load(std::memory_order_relaxed);
}

template <class ValueType, uint64_t Capacity>
inline bool FiFo<ValueType, Capacity>::empty() const noexcept
{
return m_read_pos.load(std::memory_order_relaxed) == m_write_pos.load(std::memory_order_relaxed);
}

template <class ValueType, uint64_t Capacity>
inline cxx::optional<ValueType> FiFo<ValueType, Capacity>::pop()
inline cxx::optional<ValueType> FiFo<ValueType, Capacity>::pop() noexcept
{
auto currentReadPos = m_read_pos.load(std::memory_order_relaxed);
bool isEmpty = (currentReadPos ==
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,152 +17,67 @@
#define IOX_UTILS_CONCURRENT_TRIGGER_QUEUE_HPP

#include "iceoryx_utils/cxx/optional.hpp"
#include "iceoryx_utils/internal/concurrent/smart_lock.hpp"
#include "iceoryx_utils/posix_wrapper/semaphore.hpp"

#include <atomic>
#include <cstdint>
#include <queue>
#include <thread>

namespace iox
{
namespace concurrent
{
template <typename ElementType, uint64_t Capacity>
class LockFreeQueue;

template <typename T, uint64_t Capacity, template <typename, uint64_t> class Queue>
struct QueueAdapter
{
static bool push(Queue<T, Capacity>& queue, const T& in) noexcept;
};

template <typename T, uint64_t Capacity>
struct QueueAdapter<T, Capacity, LockFreeQueue>
{
static bool push(LockFreeQueue<T, Capacity>& queue, const T& in) noexcept;
};


/// @brief TriggerQueue is behaves exactly like a normal queue (fifo) except that
/// this queue is threadsafe and offers a blocking pop which blocks the
/// the caller until the queue contains at least one element which can
/// be pop'ed.
///
/// @code
/// #include "iceoryx_utils/internal/concurrent/trigger_queue.hpp"
/// #include <atomic>
/// #include <iostream>
/// #include <thread>
/// #include <vector>
///
/// concurrent::TriggerQueue<int, 10> trigger;
///
/// std::atomic_bool keepRunning{true};
/// std::vector<int> outputVector;
///
/// void OutputToConsoleThread() {
/// while(keepRunning) {
/// int value;
/// // if this returns false then it is caused by the wakeup trigger
/// // otherwise an element would be in the queue
/// if ( trigger.blocking_pop(value) )
/// std::cout << value << std::endl;
/// }
/// }
///
/// void OutputToVectorThread() {
/// while(keepRunning) {
/// int value;
/// // if this returns false then it is caused by the wakeup trigger
/// // otherwise an element would be in the queue
/// if ( trigger.blocking_pop(value) )
/// outputVector.push_back(value);
/// }
/// }
///
/// int main() {
/// std::thread outputConsole(OutputToConsoleThread);
/// std::thread outputVector(OutputToVectorThread);
///
/// if ( trigger.is_initialized() == false ) {
/// // semaphore of the trigger could not be initialized
/// }
///
/// trigger.push(1);
/// std::this_thread::sleep_for(std::chrono::milliseconds(1000));
/// trigger.push(2);
/// std::this_thread::sleep_for(std::chrono::milliseconds(1000));
/// keepRunning = false;
///
/// // exit push to ensure that the output thread really terminates
/// // we need to send 2 wakeup_trigger since 2 threads are running and
/// // wakeup_trigger sends only a single trigger
/// trigger.send_wakeup_trigger();
/// trigger.send_wakeup_trigger();
///
/// outputConsole.join();
/// outputVector.join();
/// }
/// @endcode
template <typename T, uint64_t CAPACITY>
template <typename T, uint64_t Capacity, template <typename, uint64_t> class QueueType>
class TriggerQueue
{
/// To gain the advantages of the rule of zero (no explicit definition of
/// methods which can be autogenerated by the compiler, ctor, dtor, copy,
/// move) we check CAPACITY > 0 by declaring a member variable via a
/// lambda which itselfs contains the check.
static_assert(CAPACITY > 0, "The trigger queue must have at least one element!");

public:
/// @todo replace this with a multi push / multi pop lockfree fifo
using queue_t = concurrent::smart_lock<std::queue<T>>;

/// @brief Creates a TriggerQueue. If the TriggerQueue could not be
/// initialized, which only happens when the semaphore could not
/// be created, then the optional contains nothing.
/// Before using a TriggerQueue created by this factory you have to
/// verify the success of CreateTriggerQueue by calling
/// the optional method has_value().
static cxx::optional<TriggerQueue> CreateTriggerQueue();
using ValueType = T;
static constexpr uint64_t CAPACITY = Capacity;

/// @brief Pushs an element into the trigger queue and notifies one thread
/// which is waiting in blocking_pop().
/// If the queue is full it returns false and no element is inserted
/// and nothing is notified. If the push was successfull, it returns
/// true.
bool push(const T& in);

/// @brief This is a blocking pop. If the queue is empty it blocks until
/// an element is push'ed into the queue otherwise it returns the
/// last element instantly.
/// It returns false when the queue was empty. This can happen when
/// another thread called send_wakeup_trigger to notify an arbitrary
/// thread.
/// It returns true if an element could be pop'ed from the queue.
bool blocking_pop(T& out);
bool push(const T& in) noexcept;

/// @brief If the queue already contains an element it writes the contents
/// of that element in out and returns true, otherwise false.
bool try_pop(T& out);
cxx::optional<T> pop() noexcept;

/// @brief Returns true if the queue is empty, otherwise false.
bool empty();
bool empty() const noexcept;

/// @brief Returns the number of elements which are currently in the queue.
uint64_t size();
uint64_t size() const noexcept;

/// @brief Returns the capacity of the trigger queue.
uint64_t capacity();

/// @brief Sends a single wakeup trigger to an arbitrary blocking_pop. This
/// is needed when shutting down a thread which works in a while
/// loop with blocking_pop.
/// You have the responsibility to ensure that every thread which
/// is waiting in blocking_pop is notified when needed!
void send_wakeup_trigger();
static constexpr uint64_t capacity() noexcept;

friend class cxx::optional<TriggerQueue<T, CAPACITY>>;
void destroy() noexcept;

private:
/// The default constructor needs to be private since we should use the
/// factory methode CreateTriggerQueue to create a new trigger queue. A
/// trigger queue constructor can fail if the semaphore construction fails
/// and the factory method handles that case with an optional
TriggerQueue() = default;

cxx::expected<posix::Semaphore, posix::SemaphoreError> m_semaphore =
posix::Semaphore::create(posix::CreateUnnamedSingleProcessSemaphore, 0U);
bool m_isInitialized = !m_semaphore.has_error();

/// @todo remove with lockfree fifo implementation
/// this methods are helper to make the transition to a lockfree fifo easier
bool stl_queue_pop(T& out);
bool stl_queue_push(const T& in);
queue_t m_queue;
QueueType<T, Capacity> m_queue;
std::atomic_bool m_toBeDestroyed{false};
};
} // namespace concurrent
} // namespace iox
Expand Down
Loading

0 comments on commit 1adac39

Please sign in to comment.