Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

iox-#615 unblock RouDi shutdown with blocked publisher #681

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,17 @@ class PortManager
cxx::expected<popo::ConditionVariableData*, PortPoolError>
acquireConditionVariableData(const RuntimeName_t& runtimeName) noexcept;

/// @brief Used to unblock potential locks in the shutdown phase
void unblockShutdown() noexcept;

void deletePortsOfProcess(const RuntimeName_t& runtimeName) noexcept;

const std::atomic<uint64_t>* serviceRegistryChangeCounter() noexcept;
runtime::IpcMessage findService(const capro::ServiceDescription& service) noexcept;

protected:
void makeAllPublisherPortsToStopOffer() noexcept;

void destroyPublisherPort(PublisherPortRouDiType::MemberType_t* const publisherPortData) noexcept;

void destroySubscriberPort(SubscriberPortType::MemberType_t* const subscriberPortData) noexcept;
Expand Down Expand Up @@ -127,8 +132,8 @@ class PortManager
void removeEntryFromServiceRegistry(const capro::IdString_t& service, const capro::IdString_t& instance) noexcept;

template <typename T, std::enable_if_t<std::is_same<T, iox::build::OneToManyPolicy>::value>* = nullptr>
cxx::optional<RuntimeName_t> doesViolateCommunicationPolicy(const capro::ServiceDescription& service) const
noexcept;
cxx::optional<RuntimeName_t>
doesViolateCommunicationPolicy(const capro::ServiceDescription& service) const noexcept;

template <typename T, std::enable_if_t<std::is_same<T, iox::build::ManyToManyPolicy>::value>* = nullptr>
cxx::optional<RuntimeName_t> doesViolateCommunicationPolicy(const capro::ServiceDescription& service
Expand Down
16 changes: 16 additions & 0 deletions iceoryx_posh/source/roudi/port_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,22 @@ void PortManager::sendToAllMatchingInterfacePorts(const capro::CaproMessage& mes
}
}

void PortManager::unblockShutdown() noexcept
{
makeAllPublisherPortsToStopOffer();
}

void PortManager::makeAllPublisherPortsToStopOffer() noexcept
{
for (auto port : m_portPool->getPublisherPortDataList())
{
port->m_offeringRequested.store(false, std::memory_order_relaxed);

PublisherPortRouDiType publisherPort(port);
doDiscoveryForPublisherPort(publisherPort);
}
}

void PortManager::deletePortsOfProcess(const RuntimeName_t& runtimeName) noexcept
{
for (auto port : m_portPool->getPublisherPortDataList())
Expand Down
3 changes: 3 additions & 0 deletions iceoryx_posh/source/roudi/process_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ void ProcessManager::requestShutdownOfAllProcesses() noexcept
{
requestShutdownOfProcess(process, ShutdownPolicy::SIG_TERM);
}

// this unblocks the RouDi shutdown if a publisher port is blocked by a full subscriber queue
m_portManager.unblockShutdown();
}

bool ProcessManager::isAnyRegisteredProcessStillRunning() noexcept
Expand Down
22 changes: 12 additions & 10 deletions iceoryx_posh/source/roudi/roudi.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,16 @@ void RouDi::shutdown()
{
m_processIntrospection.stop();
m_portManager->stopPortIntrospection();

// stop the process management thread in order to prevent application to register while shutting down
m_runDiscoveryThread = false;
if (m_processManagementThread.joinable())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we please rename this thread to something saner at some point? I just mixed up m_processMan.. and m_processRun.. :P

Maybe m_monitorAndDiscoveryThread and m_handleRuntimeMessageThread?

{
LogDebug() << "Joining 'ProcessMgmt' thread...";
m_processManagementThread.join();
LogDebug() << "...'ProcessMgmt' thread joined.";
}

if (m_killProcessesInDestructor)
{
cxx::DeadlineTimer finalKillTimer(m_processKillDelay);
Expand All @@ -111,16 +120,10 @@ void RouDi::shutdown()
m_prcMgr->printWarningForRegisteredProcessesAndClearProcessList();
}
}

// Postpone the IpcChannelThread in order to receive TERMINATION
m_runIpcChannelThread = false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be renamed:

Suggested change
m_runIpcChannelThread = false;
m_runRuntimeMessagesThread = false;

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, will do it in the follow up



if (m_processManagementThread.joinable())
{
LogDebug() << "Joining 'ProcessMgmt' thread...";
m_processManagementThread.join();
LogDebug() << "...'ProcessMgmt' thread joined.";
}
if (m_processRuntimeMessagesThread.joinable())
{
LogDebug() << "Joining 'IPC-msg-process' thread...";
Expand Down Expand Up @@ -251,9 +254,8 @@ void RouDi::processMessage(const runtime::IpcMessage& message,
options.queueCapacity = std::stoull(message.getElementAtIndex(4));
options.nodeName = NodeName_t(cxx::TruncateToCapacity, message.getElementAtIndex(5));
options.subscribeOnCreate = (0U == std::stoull(message.getElementAtIndex(6))) ? false : true;
options.queueFullPolicy =
static_cast<popo::QueueFullPolicy>(std::stoul(message.getElementAtIndex(7)));

options.queueFullPolicy = static_cast<popo::QueueFullPolicy>(std::stoul(message.getElementAtIndex(7)));

m_prcMgr->addSubscriberForProcess(
runtimeName, service, options, iox::runtime::PortConfigInfo(portConfigInfoSerialization));
}
Expand Down