Skip to content

Commit

Permalink
iox-eclipse-iceoryx#1030 Add manual trigger for the RouDi discovery loop
Browse files Browse the repository at this point in the history
  • Loading branch information
elBoberido committed Sep 5, 2023
1 parent 7299cd1 commit a5dffbd
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 8 deletions.
9 changes: 9 additions & 0 deletions iceoryx_posh/include/iceoryx_posh/internal/roudi/roudi.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "iceoryx_posh/internal/roudi/introspection/mempool_introspection.hpp"
#include "iceoryx_posh/internal/roudi/process_manager.hpp"
#include "iceoryx_posh/internal/runtime/ipc_interface_creator.hpp"
#include "iceoryx_posh/popo/user_trigger.hpp"
#include "iceoryx_posh/roudi/memory/roudi_memory_interface.hpp"
#include "iceoryx_posh/roudi/memory/roudi_memory_manager.hpp"
#include "iceoryx_posh/roudi/roudi_app.hpp"
Expand Down Expand Up @@ -82,6 +83,11 @@ class RouDi

virtual ~RouDi() noexcept;

/// @brief Triggers the discovery loop to run immediately instead of waiting for the next tick interval
/// @param[in] timeout is the time to wait to unblock the function call in case the discovery loop never signals to
/// have finished the run
void triggerDiscoveryLoopAndWaitToFinish(units::Duration timeout) noexcept;

protected:
/// @brief Starts the thread processing messages from the runtimes
/// Once this is done, applications can register and Roudi is fully operational.
Expand Down Expand Up @@ -131,6 +137,9 @@ class RouDi
std::atomic_bool m_runMonitoringAndDiscoveryThread;
std::atomic_bool m_runHandleRuntimeMessageThread;

popo::UserTrigger m_discoveryLoopTrigger;
optional<iox::posix::UnnamedSemaphore> m_discoveryFinishedSemaphore;

const units::Duration m_runtimeMessagesThreadTimeout{100_ms};

protected:
Expand Down
72 changes: 68 additions & 4 deletions iceoryx_posh/source/roudi/roudi.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// Copyright (c) 2019, 2021 by Robert Bosch GmbH. All rights reserved.
// Copyright (c) 2021 - 2022 by Apex.AI Inc. All rights reserved.
// Copyright (c) 2023 by Mathias Kraus <elboberido@m-hias.de>. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -23,6 +24,7 @@
#include "iceoryx_hoofs/posix_wrapper/thread.hpp"
#include "iceoryx_posh/internal/runtime/node_property.hpp"
#include "iceoryx_posh/popo/subscriber_options.hpp"
#include "iceoryx_posh/popo/wait_set.hpp"
#include "iceoryx_posh/roudi/introspection_types.hpp"
#include "iceoryx_posh/runtime/port_config_info.hpp"
#include "iox/logging.hpp"
Expand Down Expand Up @@ -63,6 +65,13 @@ RouDi::RouDi(RouDiMemoryInterface& roudiMemoryInterface,
// since RouDi offers the introspection services, also add it to the list of processes
m_processIntrospection.addProcess(getpid(), IPC_CHANNEL_ROUDI_NAME);

// initialize semaphore for discovery loop finish indicator
iox::posix::UnnamedSemaphoreBuilder()
.initialValue(0U)
.isInterProcessCapable(false)
.create(m_discoveryFinishedSemaphore)
.expect("Valid Semaphore");

// run the threads
m_monitoringAndDiscoveryThread = std::thread(&RouDi::monitorAndDiscoveryUpdate, this);
posix::setThreadName(m_monitoringAndDiscoveryThread.native_handle(), "Mon+Discover");
Expand All @@ -86,18 +95,25 @@ void RouDi::startProcessRuntimeMessagesThread() noexcept

void RouDi::shutdown() noexcept
{
// trigger the shutdown of the monitoring and discovery thread in order to prevent application to register while
// shutting down
m_runMonitoringAndDiscoveryThread = false;
m_discoveryLoopTrigger.trigger();

// stopp the introspection
m_processIntrospection.stop();
m_mempoolIntrospection.stop();
m_portManager->stopPortIntrospection();

// stop the process management thread in order to prevent application to register while shutting down
m_runMonitoringAndDiscoveryThread = false;
// wait for the monitoring and discovery thread to stop
if (m_monitoringAndDiscoveryThread.joinable())
{
IOX_LOG(DEBUG) << "Joining 'Mon+Discover' thread...";
m_monitoringAndDiscoveryThread.join();
IOX_LOG(DEBUG) << "...'Mon+Discover' thread joined.";
}


if (m_killProcessesInDestructor)
{
deadline_timer finalKillTimer(m_processKillDelay);
Expand Down Expand Up @@ -147,15 +163,63 @@ void RouDi::cyclicUpdateHook() noexcept
// default implementation; do nothing
}

void RouDi::triggerDiscoveryLoopAndWaitToFinish(units::Duration timeout) noexcept
{
bool decrementSemaphoreCount{true};
while (decrementSemaphoreCount)
{
m_discoveryFinishedSemaphore->tryWait()
.and_then([&decrementSemaphoreCount](const auto& countNonZero) { decrementSemaphoreCount = countNonZero; })
.or_else([&decrementSemaphoreCount](const auto& error) {
decrementSemaphoreCount = false;
IOX_LOG(ERROR) << "Could not decrement count of the semaphore which signals a finished run of the "
"discoery loop! Error: "
<< static_cast<uint32_t>(error);
});
}
m_discoveryLoopTrigger.trigger();
m_discoveryFinishedSemaphore->timedWait(timeout);
}

void RouDi::monitorAndDiscoveryUpdate() noexcept
{
class DiscoveryWaitSet : public popo::WaitSet<1>
{
public:
DiscoveryWaitSet(popo::ConditionVariableData& condVarData) noexcept
: WaitSet(condVarData)
{
}
};

popo::ConditionVariableData conditionVariableData;
DiscoveryWaitSet discoveryLoopWaitset{conditionVariableData};
discoveryLoopWaitset.attachEvent(m_discoveryLoopTrigger);
bool manuallyTriggered{false};

while (m_runMonitoringAndDiscoveryThread)
{
m_prcMgr->run();

cyclicUpdateHook();

std::this_thread::sleep_for(std::chrono::milliseconds(DISCOVERY_INTERVAL.toMilliseconds()));
if (manuallyTriggered)
{
m_discoveryFinishedSemaphore->post().or_else([](const auto& error) {
IOX_LOG(ERROR) << "Could not trigger semaphore to signal a finished run of the discoery loop! Error: "
<< static_cast<uint32_t>(error);
});
}

manuallyTriggered = false;
for (const auto& notification : discoveryLoopWaitset.timedWait(DISCOVERY_INTERVAL))
{
if (notification->doesOriginateFrom(&m_discoveryLoopTrigger))
{
manuallyTriggered = true;
break;
}
}
}
}

Expand All @@ -170,7 +234,7 @@ void RouDi::processRuntimeMessages() noexcept
{
// read RouDi's IPC channel
runtime::IpcMessage message;
if (roudiIpcInterface.timedReceive(m_runtimeMessagesThreadTimeout, message))
if (roudiIpcInterface.timedReceive(10_ms /*m_runtimeMessagesThreadTimeout*/, message))
{
auto cmd = runtime::stringToIpcMessageType(message.getElementAtIndex(0).c_str());
RuntimeName_t runtimeName{into<lossy<RuntimeName_t>>(message.getElementAtIndex(1))};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "iceoryx_posh/roudi/iceoryx_roudi_components.hpp"
#include "iceoryx_posh/roudi/memory/iceoryx_roudi_memory_manager.hpp"
#include "iceoryx_posh/testing/roudi_environment/runtime_test_interface.hpp"
#include "iox/duration.hpp"

#include <atomic>
#include <map>
Expand Down Expand Up @@ -67,9 +68,9 @@ class RouDiEnvironment
private:
RuntimeTestInterface m_runtimes;
#if defined(__APPLE__)
std::chrono::milliseconds m_interOpWaitingTime = std::chrono::milliseconds(1000);
iox::units::Duration m_interOpWaitingTimeout{iox::units::Duration::fromMilliseconds(1000)};
#else
std::chrono::milliseconds m_interOpWaitingTime = std::chrono::milliseconds(200);
iox::units::Duration m_interOpWaitingTimeout{iox::units::Duration::fromMilliseconds(200)};
#endif
std::unique_ptr<IceOryxRouDiComponents> m_roudiComponents;
std::unique_ptr<RouDi> m_roudiApp;
Expand Down
4 changes: 2 additions & 2 deletions iceoryx_posh/testing/roudi_environment/roudi_environment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,12 @@ RouDiEnvironment::~RouDiEnvironment()

void RouDiEnvironment::SetInterOpWaitingTime(const std::chrono::milliseconds& v)
{
m_interOpWaitingTime = v;
m_interOpWaitingTimeout = units::Duration::fromMilliseconds(v.count());
}

void RouDiEnvironment::InterOpWait()
{
std::this_thread::sleep_for(m_interOpWaitingTime);
m_roudiApp->triggerDiscoveryLoopAndWaitToFinish(m_interOpWaitingTimeout);
}

void RouDiEnvironment::CleanupAppResources(const RuntimeName_t& name)
Expand Down

0 comments on commit a5dffbd

Please sign in to comment.