diff --git a/iceoryx_posh/include/iceoryx_posh/internal/roudi/roudi.hpp b/iceoryx_posh/include/iceoryx_posh/internal/roudi/roudi.hpp index 820bcbd2d57..deb5f4905ac 100644 --- a/iceoryx_posh/include/iceoryx_posh/internal/roudi/roudi.hpp +++ b/iceoryx_posh/include/iceoryx_posh/internal/roudi/roudi.hpp @@ -24,6 +24,7 @@ #include "iceoryx_posh/internal/roudi/introspection/mempool_introspection.hpp" #include "iceoryx_posh/internal/roudi/process_manager.hpp" #include "iceoryx_posh/internal/runtime/ipc_interface_creator.hpp" +#include "iceoryx_posh/popo/user_trigger.hpp" #include "iceoryx_posh/roudi/memory/roudi_memory_interface.hpp" #include "iceoryx_posh/roudi/memory/roudi_memory_manager.hpp" #include "iceoryx_posh/roudi/roudi_app.hpp" @@ -82,6 +83,11 @@ class RouDi virtual ~RouDi() noexcept; + /// @brief Triggers the discovery loop to run immediately instead of waiting for the next tick interval + /// @param[in] timeout is the time to wait to unblock the function call in case the discovery loop never signals to + /// have finished the run + void triggerDiscoveryLoopAndWaitToFinish(units::Duration timeout) noexcept; + protected: /// @brief Starts the thread processing messages from the runtimes /// Once this is done, applications can register and Roudi is fully operational. @@ -131,6 +137,9 @@ class RouDi std::atomic_bool m_runMonitoringAndDiscoveryThread; std::atomic_bool m_runHandleRuntimeMessageThread; + popo::UserTrigger m_discoveryLoopTrigger; + optional m_discoveryFinishedSemaphore; + const units::Duration m_runtimeMessagesThreadTimeout{100_ms}; protected: diff --git a/iceoryx_posh/source/roudi/roudi.cpp b/iceoryx_posh/source/roudi/roudi.cpp index 8abc9ff9ffa..551fed06bb0 100644 --- a/iceoryx_posh/source/roudi/roudi.cpp +++ b/iceoryx_posh/source/roudi/roudi.cpp @@ -1,5 +1,6 @@ // Copyright (c) 2019, 2021 by Robert Bosch GmbH. All rights reserved. // Copyright (c) 2021 - 2022 by Apex.AI Inc. All rights reserved. +// Copyright (c) 2023 by Mathias Kraus . All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -23,6 +24,7 @@ #include "iceoryx_hoofs/posix_wrapper/thread.hpp" #include "iceoryx_posh/internal/runtime/node_property.hpp" #include "iceoryx_posh/popo/subscriber_options.hpp" +#include "iceoryx_posh/popo/wait_set.hpp" #include "iceoryx_posh/roudi/introspection_types.hpp" #include "iceoryx_posh/runtime/port_config_info.hpp" #include "iox/logging.hpp" @@ -63,6 +65,13 @@ RouDi::RouDi(RouDiMemoryInterface& roudiMemoryInterface, // since RouDi offers the introspection services, also add it to the list of processes m_processIntrospection.addProcess(getpid(), IPC_CHANNEL_ROUDI_NAME); + // initialize semaphore for discovery loop finish indicator + iox::posix::UnnamedSemaphoreBuilder() + .initialValue(0U) + .isInterProcessCapable(false) + .create(m_discoveryFinishedSemaphore) + .expect("Valid Semaphore"); + // run the threads m_monitoringAndDiscoveryThread = std::thread(&RouDi::monitorAndDiscoveryUpdate, this); posix::setThreadName(m_monitoringAndDiscoveryThread.native_handle(), "Mon+Discover"); @@ -86,11 +95,17 @@ void RouDi::startProcessRuntimeMessagesThread() noexcept void RouDi::shutdown() noexcept { + // trigger the shutdown of the monitoring and discovery thread in order to prevent application to register while + // shutting down + m_runMonitoringAndDiscoveryThread = false; + m_discoveryLoopTrigger.trigger(); + + // stopp the introspection m_processIntrospection.stop(); + m_mempoolIntrospection.stop(); m_portManager->stopPortIntrospection(); - // stop the process management thread in order to prevent application to register while shutting down - m_runMonitoringAndDiscoveryThread = false; + // wait for the monitoring and discovery thread to stop if (m_monitoringAndDiscoveryThread.joinable()) { IOX_LOG(DEBUG) << "Joining 'Mon+Discover' thread..."; @@ -98,6 +113,7 @@ void RouDi::shutdown() noexcept IOX_LOG(DEBUG) << "...'Mon+Discover' thread joined."; } + if (m_killProcessesInDestructor) { deadline_timer finalKillTimer(m_processKillDelay); @@ -147,15 +163,63 @@ void RouDi::cyclicUpdateHook() noexcept // default implementation; do nothing } +void RouDi::triggerDiscoveryLoopAndWaitToFinish(units::Duration timeout) noexcept +{ + bool decrementSemaphoreCount{true}; + while (decrementSemaphoreCount) + { + m_discoveryFinishedSemaphore->tryWait() + .and_then([&decrementSemaphoreCount](const auto& countNonZero) { decrementSemaphoreCount = countNonZero; }) + .or_else([&decrementSemaphoreCount](const auto& error) { + decrementSemaphoreCount = false; + IOX_LOG(ERROR) << "Could not decrement count of the semaphore which signals a finished run of the " + "discoery loop! Error: " + << static_cast(error); + }); + } + m_discoveryLoopTrigger.trigger(); + m_discoveryFinishedSemaphore->timedWait(timeout); +} + void RouDi::monitorAndDiscoveryUpdate() noexcept { + class DiscoveryWaitSet : public popo::WaitSet<1> + { + public: + DiscoveryWaitSet(popo::ConditionVariableData& condVarData) noexcept + : WaitSet(condVarData) + { + } + }; + + popo::ConditionVariableData conditionVariableData; + DiscoveryWaitSet discoveryLoopWaitset{conditionVariableData}; + discoveryLoopWaitset.attachEvent(m_discoveryLoopTrigger); + bool manuallyTriggered{false}; + while (m_runMonitoringAndDiscoveryThread) { m_prcMgr->run(); cyclicUpdateHook(); - std::this_thread::sleep_for(std::chrono::milliseconds(DISCOVERY_INTERVAL.toMilliseconds())); + if (manuallyTriggered) + { + m_discoveryFinishedSemaphore->post().or_else([](const auto& error) { + IOX_LOG(ERROR) << "Could not trigger semaphore to signal a finished run of the discoery loop! Error: " + << static_cast(error); + }); + } + + manuallyTriggered = false; + for (const auto& notification : discoveryLoopWaitset.timedWait(DISCOVERY_INTERVAL)) + { + if (notification->doesOriginateFrom(&m_discoveryLoopTrigger)) + { + manuallyTriggered = true; + break; + } + } } } @@ -170,7 +234,7 @@ void RouDi::processRuntimeMessages() noexcept { // read RouDi's IPC channel runtime::IpcMessage message; - if (roudiIpcInterface.timedReceive(m_runtimeMessagesThreadTimeout, message)) + if (roudiIpcInterface.timedReceive(10_ms /*m_runtimeMessagesThreadTimeout*/, message)) { auto cmd = runtime::stringToIpcMessageType(message.getElementAtIndex(0).c_str()); RuntimeName_t runtimeName{into>(message.getElementAtIndex(1))}; diff --git a/iceoryx_posh/testing/include/iceoryx_posh/testing/roudi_environment/roudi_environment.hpp b/iceoryx_posh/testing/include/iceoryx_posh/testing/roudi_environment/roudi_environment.hpp index 739cdfe2428..39f53901227 100644 --- a/iceoryx_posh/testing/include/iceoryx_posh/testing/roudi_environment/roudi_environment.hpp +++ b/iceoryx_posh/testing/include/iceoryx_posh/testing/roudi_environment/roudi_environment.hpp @@ -23,6 +23,7 @@ #include "iceoryx_posh/roudi/iceoryx_roudi_components.hpp" #include "iceoryx_posh/roudi/memory/iceoryx_roudi_memory_manager.hpp" #include "iceoryx_posh/testing/roudi_environment/runtime_test_interface.hpp" +#include "iox/duration.hpp" #include #include @@ -67,9 +68,9 @@ class RouDiEnvironment private: RuntimeTestInterface m_runtimes; #if defined(__APPLE__) - std::chrono::milliseconds m_interOpWaitingTime = std::chrono::milliseconds(1000); + iox::units::Duration m_interOpWaitingTimeout{iox::units::Duration::fromMilliseconds(1000)}; #else - std::chrono::milliseconds m_interOpWaitingTime = std::chrono::milliseconds(200); + iox::units::Duration m_interOpWaitingTimeout{iox::units::Duration::fromMilliseconds(200)}; #endif std::unique_ptr m_roudiComponents; std::unique_ptr m_roudiApp; diff --git a/iceoryx_posh/testing/roudi_environment/roudi_environment.cpp b/iceoryx_posh/testing/roudi_environment/roudi_environment.cpp index 8683b365a95..5ad459cbdd4 100644 --- a/iceoryx_posh/testing/roudi_environment/roudi_environment.cpp +++ b/iceoryx_posh/testing/roudi_environment/roudi_environment.cpp @@ -57,12 +57,12 @@ RouDiEnvironment::~RouDiEnvironment() void RouDiEnvironment::SetInterOpWaitingTime(const std::chrono::milliseconds& v) { - m_interOpWaitingTime = v; + m_interOpWaitingTimeout = units::Duration::fromMilliseconds(v.count()); } void RouDiEnvironment::InterOpWait() { - std::this_thread::sleep_for(m_interOpWaitingTime); + m_roudiApp->triggerDiscoveryLoopAndWaitToFinish(m_interOpWaitingTimeout); } void RouDiEnvironment::CleanupAppResources(const RuntimeName_t& name)