Skip to content

Commit

Permalink
iox-eclipse-iceoryx#615 unblock RouDi shutdown with blocked publisher…
Browse files Browse the repository at this point in the history
… by full subscriber queue

Signed-off-by: Mathias Kraus <mathias.kraus@apex.ai>
  • Loading branch information
elBoberido authored and marthtz committed May 12, 2021
1 parent 5644b4c commit c31314a
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,17 @@ class PortManager
cxx::expected<popo::ConditionVariableData*, PortPoolError>
acquireConditionVariableData(const RuntimeName_t& runtimeName) noexcept;

/// @brief Used to unblock potential locks in the shutdown phase
void unblockShutdown() noexcept;

void deletePortsOfProcess(const RuntimeName_t& runtimeName) noexcept;

const std::atomic<uint64_t>* serviceRegistryChangeCounter() noexcept;
runtime::IpcMessage findService(const capro::ServiceDescription& service) noexcept;

protected:
void makeAllPublisherPortsToStopOffer() noexcept;

void destroyPublisherPort(PublisherPortRouDiType::MemberType_t* const publisherPortData) noexcept;

void destroySubscriberPort(SubscriberPortType::MemberType_t* const subscriberPortData) noexcept;
Expand Down Expand Up @@ -127,8 +132,8 @@ class PortManager
void removeEntryFromServiceRegistry(const capro::IdString_t& service, const capro::IdString_t& instance) noexcept;

template <typename T, std::enable_if_t<std::is_same<T, iox::build::OneToManyPolicy>::value>* = nullptr>
cxx::optional<RuntimeName_t> doesViolateCommunicationPolicy(const capro::ServiceDescription& service) const
noexcept;
cxx::optional<RuntimeName_t>
doesViolateCommunicationPolicy(const capro::ServiceDescription& service) const noexcept;

template <typename T, std::enable_if_t<std::is_same<T, iox::build::ManyToManyPolicy>::value>* = nullptr>
cxx::optional<RuntimeName_t> doesViolateCommunicationPolicy(const capro::ServiceDescription& service
Expand Down
16 changes: 16 additions & 0 deletions iceoryx_posh/source/roudi/port_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,22 @@ void PortManager::sendToAllMatchingInterfacePorts(const capro::CaproMessage& mes
}
}

void PortManager::unblockShutdown() noexcept
{
makeAllPublisherPortsToStopOffer();
}

void PortManager::makeAllPublisherPortsToStopOffer() noexcept
{
for (auto port : m_portPool->getPublisherPortDataList())
{
port->m_offeringRequested.store(false, std::memory_order_relaxed);

PublisherPortRouDiType publisherPort(port);
doDiscoveryForPublisherPort(publisherPort);
}
}

void PortManager::deletePortsOfProcess(const RuntimeName_t& runtimeName) noexcept
{
for (auto port : m_portPool->getPublisherPortDataList())
Expand Down
3 changes: 3 additions & 0 deletions iceoryx_posh/source/roudi/process_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ void ProcessManager::requestShutdownOfAllProcesses() noexcept
{
requestShutdownOfProcess(process, ShutdownPolicy::SIG_TERM);
}

// this unblocks the RouDi shutdown if a publisher port is blocked by a full subscriber queue
m_portManager.unblockShutdown();
}

bool ProcessManager::isAnyRegisteredProcessStillRunning() noexcept
Expand Down
22 changes: 12 additions & 10 deletions iceoryx_posh/source/roudi/roudi.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,16 @@ void RouDi::shutdown()
{
m_processIntrospection.stop();
m_portManager->stopPortIntrospection();

// stop the process management thread in order to prevent application to register while shutting down
m_runDiscoveryThread = false;
if (m_processManagementThread.joinable())
{
LogDebug() << "Joining 'ProcessMgmt' thread...";
m_processManagementThread.join();
LogDebug() << "...'ProcessMgmt' thread joined.";
}

if (m_killProcessesInDestructor)
{
cxx::DeadlineTimer finalKillTimer(m_processKillDelay);
Expand All @@ -111,16 +120,10 @@ void RouDi::shutdown()
m_prcMgr->printWarningForRegisteredProcessesAndClearProcessList();
}
}

// Postpone the IpcChannelThread in order to receive TERMINATION
m_runIpcChannelThread = false;


if (m_processManagementThread.joinable())
{
LogDebug() << "Joining 'ProcessMgmt' thread...";
m_processManagementThread.join();
LogDebug() << "...'ProcessMgmt' thread joined.";
}
if (m_processRuntimeMessagesThread.joinable())
{
LogDebug() << "Joining 'IPC-msg-process' thread...";
Expand Down Expand Up @@ -251,9 +254,8 @@ void RouDi::processMessage(const runtime::IpcMessage& message,
options.queueCapacity = std::stoull(message.getElementAtIndex(4));
options.nodeName = NodeName_t(cxx::TruncateToCapacity, message.getElementAtIndex(5));
options.subscribeOnCreate = (0U == std::stoull(message.getElementAtIndex(6))) ? false : true;
options.queueFullPolicy =
static_cast<popo::QueueFullPolicy>(std::stoul(message.getElementAtIndex(7)));

options.queueFullPolicy = static_cast<popo::QueueFullPolicy>(std::stoul(message.getElementAtIndex(7)));

m_prcMgr->addSubscriberForProcess(
runtimeName, service, options, iox::runtime::PortConfigInfo(portConfigInfoSerialization));
}
Expand Down

0 comments on commit c31314a

Please sign in to comment.