From c31314ab83b27df82bd1ee48572546a3724264c2 Mon Sep 17 00:00:00 2001 From: Mathias Kraus Date: Tue, 6 Apr 2021 17:28:15 +0200 Subject: [PATCH] iox-#615 unblock RouDi shutdown with blocked publisher by full subscriber queue Signed-off-by: Mathias Kraus --- .../internal/roudi/port_manager.hpp | 9 ++++++-- iceoryx_posh/source/roudi/port_manager.cpp | 16 ++++++++++++++ iceoryx_posh/source/roudi/process_manager.cpp | 3 +++ iceoryx_posh/source/roudi/roudi.cpp | 22 ++++++++++--------- 4 files changed, 38 insertions(+), 12 deletions(-) diff --git a/iceoryx_posh/include/iceoryx_posh/internal/roudi/port_manager.hpp b/iceoryx_posh/include/iceoryx_posh/internal/roudi/port_manager.hpp index 2000f332725..08da84eff23 100644 --- a/iceoryx_posh/include/iceoryx_posh/internal/roudi/port_manager.hpp +++ b/iceoryx_posh/include/iceoryx_posh/internal/roudi/port_manager.hpp @@ -89,12 +89,17 @@ class PortManager cxx::expected acquireConditionVariableData(const RuntimeName_t& runtimeName) noexcept; + /// @brief Used to unblock potential locks in the shutdown phase + void unblockShutdown() noexcept; + void deletePortsOfProcess(const RuntimeName_t& runtimeName) noexcept; const std::atomic* serviceRegistryChangeCounter() noexcept; runtime::IpcMessage findService(const capro::ServiceDescription& service) noexcept; protected: + void makeAllPublisherPortsToStopOffer() noexcept; + void destroyPublisherPort(PublisherPortRouDiType::MemberType_t* const publisherPortData) noexcept; void destroySubscriberPort(SubscriberPortType::MemberType_t* const subscriberPortData) noexcept; @@ -127,8 +132,8 @@ class PortManager void removeEntryFromServiceRegistry(const capro::IdString_t& service, const capro::IdString_t& instance) noexcept; template ::value>* = nullptr> - cxx::optional doesViolateCommunicationPolicy(const capro::ServiceDescription& service) const - noexcept; + cxx::optional + doesViolateCommunicationPolicy(const capro::ServiceDescription& service) const noexcept; template ::value>* = nullptr> cxx::optional doesViolateCommunicationPolicy(const capro::ServiceDescription& service diff --git a/iceoryx_posh/source/roudi/port_manager.cpp b/iceoryx_posh/source/roudi/port_manager.cpp index b4f133b54bd..676e8ec1c5b 100644 --- a/iceoryx_posh/source/roudi/port_manager.cpp +++ b/iceoryx_posh/source/roudi/port_manager.cpp @@ -446,6 +446,22 @@ void PortManager::sendToAllMatchingInterfacePorts(const capro::CaproMessage& mes } } +void PortManager::unblockShutdown() noexcept +{ + makeAllPublisherPortsToStopOffer(); +} + +void PortManager::makeAllPublisherPortsToStopOffer() noexcept +{ + for (auto port : m_portPool->getPublisherPortDataList()) + { + port->m_offeringRequested.store(false, std::memory_order_relaxed); + + PublisherPortRouDiType publisherPort(port); + doDiscoveryForPublisherPort(publisherPort); + } +} + void PortManager::deletePortsOfProcess(const RuntimeName_t& runtimeName) noexcept { for (auto port : m_portPool->getPublisherPortDataList()) diff --git a/iceoryx_posh/source/roudi/process_manager.cpp b/iceoryx_posh/source/roudi/process_manager.cpp index c31822e5a52..271f49b7770 100644 --- a/iceoryx_posh/source/roudi/process_manager.cpp +++ b/iceoryx_posh/source/roudi/process_manager.cpp @@ -80,6 +80,9 @@ void ProcessManager::requestShutdownOfAllProcesses() noexcept { requestShutdownOfProcess(process, ShutdownPolicy::SIG_TERM); } + + // this unblocks the RouDi shutdown if a publisher port is blocked by a full subscriber queue + m_portManager.unblockShutdown(); } bool ProcessManager::isAnyRegisteredProcessStillRunning() noexcept diff --git a/iceoryx_posh/source/roudi/roudi.cpp b/iceoryx_posh/source/roudi/roudi.cpp index e734d88d045..80ac7033dfd 100644 --- a/iceoryx_posh/source/roudi/roudi.cpp +++ b/iceoryx_posh/source/roudi/roudi.cpp @@ -86,7 +86,16 @@ void RouDi::shutdown() { m_processIntrospection.stop(); m_portManager->stopPortIntrospection(); + + // stop the process management thread in order to prevent application to register while shutting down m_runDiscoveryThread = false; + if (m_processManagementThread.joinable()) + { + LogDebug() << "Joining 'ProcessMgmt' thread..."; + m_processManagementThread.join(); + LogDebug() << "...'ProcessMgmt' thread joined."; + } + if (m_killProcessesInDestructor) { cxx::DeadlineTimer finalKillTimer(m_processKillDelay); @@ -111,16 +120,10 @@ void RouDi::shutdown() m_prcMgr->printWarningForRegisteredProcessesAndClearProcessList(); } } + // Postpone the IpcChannelThread in order to receive TERMINATION m_runIpcChannelThread = false; - - if (m_processManagementThread.joinable()) - { - LogDebug() << "Joining 'ProcessMgmt' thread..."; - m_processManagementThread.join(); - LogDebug() << "...'ProcessMgmt' thread joined."; - } if (m_processRuntimeMessagesThread.joinable()) { LogDebug() << "Joining 'IPC-msg-process' thread..."; @@ -251,9 +254,8 @@ void RouDi::processMessage(const runtime::IpcMessage& message, options.queueCapacity = std::stoull(message.getElementAtIndex(4)); options.nodeName = NodeName_t(cxx::TruncateToCapacity, message.getElementAtIndex(5)); options.subscribeOnCreate = (0U == std::stoull(message.getElementAtIndex(6))) ? false : true; - options.queueFullPolicy = - static_cast(std::stoul(message.getElementAtIndex(7))); - + options.queueFullPolicy = static_cast(std::stoul(message.getElementAtIndex(7))); + m_prcMgr->addSubscriberForProcess( runtimeName, service, options, iox::runtime::PortConfigInfo(portConfigInfoSerialization)); }