From c3485985f110151742fbc4284ec505934bdc0cde Mon Sep 17 00:00:00 2001 From: Mathias Kraus Date: Thu, 8 Apr 2021 21:41:35 +0200 Subject: [PATCH 1/7] iox-#615 make ProcessManager and PortManager handle a process shutdown preparation request Signed-off-by: Mathias Kraus --- .../internal/roudi/port_manager.hpp | 8 +- .../internal/roudi/process_manager.hpp | 4 + iceoryx_posh/source/roudi/port_manager.cpp | 15 +++- iceoryx_posh/source/roudi/process_manager.cpp | 14 +++- .../moduletests/test_roudi_portmanager.cpp | 76 +++++++++++++------ .../test_roudi_process_manager.cpp | 27 +++++++ 6 files changed, 118 insertions(+), 26 deletions(-) diff --git a/iceoryx_posh/include/iceoryx_posh/internal/roudi/port_manager.hpp b/iceoryx_posh/include/iceoryx_posh/internal/roudi/port_manager.hpp index 08da84eff2..d159b88db1 100644 --- a/iceoryx_posh/include/iceoryx_posh/internal/roudi/port_manager.hpp +++ b/iceoryx_posh/include/iceoryx_posh/internal/roudi/port_manager.hpp @@ -89,8 +89,12 @@ class PortManager cxx::expected acquireConditionVariableData(const RuntimeName_t& runtimeName) noexcept; - /// @brief Used to unblock potential locks in the shutdown phase - void unblockShutdown() noexcept; + /// @brief Used to unblock potential locks in the shutdown phase of a process + /// @param [in] name of the process runtime which is about to shut down + void unblockProcessShutdown(const RuntimeName_t& runtimeName) noexcept; + + /// @brief Used to unblock potential locks in the shutdown phase of RouDi + void unblockRouDiShutdown() noexcept; void deletePortsOfProcess(const RuntimeName_t& runtimeName) noexcept; diff --git a/iceoryx_posh/include/iceoryx_posh/internal/roudi/process_manager.hpp b/iceoryx_posh/include/iceoryx_posh/internal/roudi/process_manager.hpp index 007e4aff69..d1c7b1da2e 100644 --- a/iceoryx_posh/include/iceoryx_posh/internal/roudi/process_manager.hpp +++ b/iceoryx_posh/include/iceoryx_posh/internal/roudi/process_manager.hpp @@ -98,6 +98,10 @@ class ProcessManager : public 
ProcessManagerInterface /// @return true if one or more of the registered processes is running, false otherwise bool isAnyRegisteredProcessStillRunning() noexcept; + /// @brief A process is about to shut down and needs to be unblocked from a potentially blocking publisher + /// @param [in] name of the process runtime which is about to shut down + void handleProcessShutdownPreparationRequest(const RuntimeName_t& name) noexcept; + /// @brief Tries to gracefully terminate all registered processes void requestShutdownOfAllProcesses() noexcept; diff --git a/iceoryx_posh/source/roudi/port_manager.cpp b/iceoryx_posh/source/roudi/port_manager.cpp index 676e8ec1c5..f613c21ceb 100644 --- a/iceoryx_posh/source/roudi/port_manager.cpp +++ b/iceoryx_posh/source/roudi/port_manager.cpp @@ -446,7 +446,20 @@ void PortManager::sendToAllMatchingInterfacePorts(const capro::CaproMessage& mes } } -void PortManager::unblockShutdown() noexcept +void PortManager::unblockProcessShutdown(const RuntimeName_t& runtimeName) noexcept +{ + for (auto port : m_portPool->getPublisherPortDataList()) + { + PublisherPortRouDiType publisherPort(port); + if (runtimeName == publisherPort.getRuntimeName()) + { + port->m_offeringRequested.store(false, std::memory_order_relaxed); + doDiscoveryForPublisherPort(publisherPort); + } + } +} + +void PortManager::unblockRouDiShutdown() noexcept { makeAllPublisherPortsToStopOffer(); } diff --git a/iceoryx_posh/source/roudi/process_manager.cpp b/iceoryx_posh/source/roudi/process_manager.cpp index f81a3d4fa8..d7a31ebae4 100644 --- a/iceoryx_posh/source/roudi/process_manager.cpp +++ b/iceoryx_posh/source/roudi/process_manager.cpp @@ -73,6 +73,18 @@ ProcessManager::ProcessManager(RouDiMemoryInterface& roudiMemoryInterface, auto m_segmentInfo = m_segmentManager->getSegmentInformationForUser(currentUser); m_memoryManagerOfCurrentProcess = m_segmentInfo.m_memoryManager; } + +void ProcessManager::handleProcessShutdownPreparationRequest(const RuntimeName_t& name) noexcept +{ + 
searchForProcessAndThen( + name, + [&](Process&) { + m_portManager.unblockProcessShutdown(name); + // TODO send response + }, + [&]() { LogWarn() << "Unknown application " << name << " requested shutdown preparation."; }); +} + void ProcessManager::requestShutdownOfAllProcesses() noexcept { // send SIG_TERM to all running applications and wait for processes to answer with TERMINATION @@ -82,7 +94,7 @@ void ProcessManager::requestShutdownOfAllProcesses() noexcept } // this unblocks the RouDi shutdown if a publisher port is blocked by a full subscriber queue - m_portManager.unblockShutdown(); + m_portManager.unblockRouDiShutdown(); } bool ProcessManager::isAnyRegisteredProcessStillRunning() noexcept diff --git a/iceoryx_posh/test/moduletests/test_roudi_portmanager.cpp b/iceoryx_posh/test/moduletests/test_roudi_portmanager.cpp index be8218f89d..8f28e88f3a 100644 --- a/iceoryx_posh/test/moduletests/test_roudi_portmanager.cpp +++ b/iceoryx_posh/test/moduletests/test_roudi_portmanager.cpp @@ -198,6 +198,9 @@ class PortManager_test : public Test } } } + + void setupAndTestBlockingPublisher(const iox::RuntimeName_t& publisherRuntimeName, + std::function testHook) noexcept; }; template @@ -770,16 +773,12 @@ TEST_F(PortManager_test, AcquireNodeDataAfterDestroyingPreviouslyAcquiredOnesIsS acquireMaxNumberOfNodes(nodeName, runtimeName); } -TEST_F(PortManager_test, UnblockShutdownMakesAllPublisherStopOffer) +TEST_F(PortManager_test, UnblockRouDiShutdownMakesAllPublisherStopOffer) { PublisherOptions publisherOptions{1U, iox::NodeName_t("node"), true}; - SubscriberOptions subscriberOptions{1U, 1U, iox::NodeName_t("node"), true}; + iox::cxx::vector publisher; - constexpr uint64_t MAX_PUB_SUB = iox::algorithm::min(iox::MAX_PUBLISHERS, iox::MAX_SUBSCRIBERS); - iox::cxx::vector publisher; - iox::cxx::vector subscriber; - - for (unsigned int i = 0; i < MAX_PUB_SUB; i++) + for (unsigned int i = 0; i < iox::MAX_PUBLISHERS; i++) { auto servideDescription = getUniqueSD(); auto 
publisherRuntimeName = iox::RuntimeName_t(iox::cxx::TruncateToCapacity, "pub_" + std::to_string(i)); @@ -791,17 +790,10 @@ TEST_F(PortManager_test, UnblockShutdownMakesAllPublisherStopOffer) ASSERT_FALSE(publisherPortDataResult.has_error()); publisher.emplace_back(publisherPortDataResult.value()); - auto subscriberRuntimeName = iox::RuntimeName_t(iox::cxx::TruncateToCapacity, "sub_" + std::to_string(i)); - auto subscriberPortDataResult = m_portManager->acquireSubscriberPortData( - servideDescription, subscriberOptions, subscriberRuntimeName, PortConfigInfo()); - ASSERT_FALSE(subscriberPortDataResult.has_error()); - subscriber.emplace_back(subscriberPortDataResult.value()); - EXPECT_TRUE(publisher.back().isOffered()); - EXPECT_EQ(subscriber.back().getSubscriptionState(), iox::SubscribeState::SUBSCRIBED); } - m_portManager->unblockShutdown(); + m_portManager->unblockRouDiShutdown(); for (const auto& pub : publisher) { @@ -809,17 +801,43 @@ TEST_F(PortManager_test, UnblockShutdownMakesAllPublisherStopOffer) } } -TEST_F(PortManager_test, UnblockShutdownUnblocksBlockedPublisher) +TEST_F(PortManager_test, UnblockProcessShutdownMakesPublisherStopOffer) +{ + const iox::RuntimeName_t publisherRuntimeName{"guiseppe"}; + + // get publisher and subscriber + PublisherOptions publisherOptions{ + 0U, iox::NodeName_t("node"), true, iox::popo::SubscriberTooSlowPolicy::WAIT_FOR_SUBSCRIBER}; + PublisherPortUser publisher(m_portManager + ->acquirePublisherPortData({1U, 1U, 1U}, + publisherOptions, + publisherRuntimeName, + m_payloadDataSegmentMemoryManager, + PortConfigInfo()) + .value()); + + EXPECT_TRUE(publisher.isOffered()); + + m_portManager->unblockProcessShutdown(publisherRuntimeName); + + EXPECT_FALSE(publisher.isOffered()); +} + +void PortManager_test::setupAndTestBlockingPublisher(const iox::RuntimeName_t& publisherRuntimeName, + std::function testHook) noexcept { + // get publisher and subscriber PublisherOptions publisherOptions{ 0U, iox::NodeName_t("node"), true, 
iox::popo::SubscriberTooSlowPolicy::WAIT_FOR_SUBSCRIBER}; SubscriberOptions subscriberOptions{ 1U, 0U, iox::NodeName_t("node"), true, iox::popo::QueueFullPolicy::BLOCK_PUBLISHER}; - PublisherPortUser publisher( - m_portManager - ->acquirePublisherPortData( - {1U, 1U, 1U}, publisherOptions, "guiseppe", m_payloadDataSegmentMemoryManager, PortConfigInfo()) - .value()); + PublisherPortUser publisher(m_portManager + ->acquirePublisherPortData({1U, 1U, 1U}, + publisherOptions, + publisherRuntimeName, + m_payloadDataSegmentMemoryManager, + PortConfigInfo()) + .value()); SubscriberPortUser subscriber( m_portManager->acquireSubscriberPortData({1U, 1U, 1U}, subscriberOptions, "schlomo", PortConfigInfo()).value()); @@ -854,12 +872,26 @@ TEST_F(PortManager_test, UnblockShutdownUnblocksBlockedPublisher) std::this_thread::sleep_for(std::chrono::milliseconds(SLEEP_IN_MS)); EXPECT_THAT(wasChunkSent.load(), Eq(false)); - m_portManager->unblockShutdown(); + ASSERT_TRUE(testHook); + testHook(); blockingPublisher.join(); // ensure the wasChunkSent store happens before the read EXPECT_THAT(wasChunkSent.load(), Eq(true)); } +TEST_F(PortManager_test, UnblockRouDiShutdownUnblocksBlockedPublisher) +{ + const iox::RuntimeName_t publisherRuntimeName{"guiseppe"}; + setupAndTestBlockingPublisher(publisherRuntimeName, [&] { m_portManager->unblockRouDiShutdown(); }); +} + +TEST_F(PortManager_test, UnblockProcessShutdownUnblocksBlockedPublisher) +{ + const iox::RuntimeName_t publisherRuntimeName{"guiseppe"}; + setupAndTestBlockingPublisher(publisherRuntimeName, + [&] { m_portManager->unblockProcessShutdown(publisherRuntimeName); }); +} + TEST_F(PortManager_test, PortsDestroyInProcess2ChangeStatesOfPortsInProcess1) { iox::RuntimeName_t runtimeName1 = "myApp1"; diff --git a/iceoryx_posh/test/moduletests/test_roudi_process_manager.cpp b/iceoryx_posh/test/moduletests/test_roudi_process_manager.cpp index 5fee8d4413..3fda800162 100644 --- a/iceoryx_posh/test/moduletests/test_roudi_process_manager.cpp 
+++ b/iceoryx_posh/test/moduletests/test_roudi_process_manager.cpp @@ -23,6 +23,7 @@ #include "iceoryx_posh/version/compatibility_check_level.hpp" #include "iceoryx_utils/cxx/string.hpp" #include "iceoryx_utils/platform/types.hpp" +#include "iceoryx_utils/testing/watch_dog.hpp" #include "test.hpp" using namespace ::testing; @@ -114,3 +115,29 @@ TEST_F(ProcessManager_test, RegisterAndUnregisterWorks) EXPECT_TRUE(unregisterResult); } + +TEST_F(ProcessManager_test, HandleProcessShutdownPreparationRequestWorks) +{ + m_sut->registerProcess(m_processname, m_pid, m_user, m_isMonitored, 1U, 1U, m_versionInfo); + + auto user = iox::posix::PosixUser::getUserOfCurrentProcess().getName(); + auto payloadDataSegmentMemoryManager = + m_roudiMemoryManager->segmentManager().value()->getSegmentInformationForUser(user).m_memoryManager; + + // get publisher and subscriber + PublisherOptions publisherOptions{ + 0U, iox::NodeName_t("node"), true, iox::popo::SubscriberTooSlowPolicy::WAIT_FOR_SUBSCRIBER}; + PublisherPortUser publisher( + m_portManager + ->acquirePublisherPortData( + {1U, 1U, 1U}, publisherOptions, m_processname, payloadDataSegmentMemoryManager, PortConfigInfo()) + .value()); + + ASSERT_TRUE(publisher.isOffered()); + + m_sut->handleProcessShutdownPreparationRequest(m_processname); + + // we just check if handleProcessShutdownPreparationRequest calls PortManager::unblockProcessShutdown + // ideally this should be checked by a mock, but since there isn't one for PortManager we just check the side effect + ASSERT_FALSE(publisher.isOffered()); +} From a1e4c4fafe73ab46631339ff34c9c17831101cce Mon Sep 17 00:00:00 2001 From: Mathias Kraus Date: Thu, 8 Apr 2021 21:50:33 +0200 Subject: [PATCH 2/7] iox-#615 handle PREPARE_APP_TERMINATION in RouDi Signed-off-by: Mathias Kraus --- .../internal/runtime/ipc_interface_base.hpp | 2 ++ iceoryx_posh/source/roudi/process_manager.cpp | 7 +++++-- iceoryx_posh/source/roudi/roudi.cpp | 14 ++++++++++++++ 3 files changed, 21 insertions(+), 2 
deletions(-) diff --git a/iceoryx_posh/include/iceoryx_posh/internal/runtime/ipc_interface_base.hpp b/iceoryx_posh/include/iceoryx_posh/internal/runtime/ipc_interface_base.hpp index 39da6d4e07..818b6166e6 100644 --- a/iceoryx_posh/include/iceoryx_posh/internal/runtime/ipc_interface_base.hpp +++ b/iceoryx_posh/include/iceoryx_posh/internal/runtime/ipc_interface_base.hpp @@ -67,6 +67,8 @@ enum class IpcMessageType : int32_t KEEPALIVE, TERMINATION, TERMINATION_ACK, + PREPARE_APP_TERMINATION, + PREPARE_APP_TERMINATION_ACK, ERROR, APP_WAIT, WAKEUP_TRIGGER, diff --git a/iceoryx_posh/source/roudi/process_manager.cpp b/iceoryx_posh/source/roudi/process_manager.cpp index d7a31ebae4..14f9bba0cd 100644 --- a/iceoryx_posh/source/roudi/process_manager.cpp +++ b/iceoryx_posh/source/roudi/process_manager.cpp @@ -78,9 +78,12 @@ void ProcessManager::handleProcessShutdownPreparationRequest(const RuntimeName_t { searchForProcessAndThen( name, - [&](Process&) { + [&](Process& process) { m_portManager.unblockProcessShutdown(name); - // TODO send response + // Reply with PREPARE_APP_TERMINATION_ACK and let process shutdown + runtime::IpcMessage sendBuffer; + sendBuffer << runtime::IpcMessageTypeToString(runtime::IpcMessageType::PREPARE_APP_TERMINATION_ACK); + process.sendViaIpcChannel(sendBuffer); }, [&]() { LogWarn() << "Unknown application " << name << " requested shutdown preparation."; }); } diff --git a/iceoryx_posh/source/roudi/roudi.cpp b/iceoryx_posh/source/roudi/roudi.cpp index bafcb87e8e..b3b69011bd 100644 --- a/iceoryx_posh/source/roudi/roudi.cpp +++ b/iceoryx_posh/source/roudi/roudi.cpp @@ -337,6 +337,20 @@ void RouDi::processMessage(const runtime::IpcMessage& message, m_prcMgr->updateLivelinessOfProcess(runtimeName); break; } + case runtime::IpcMessageType::PREPARE_APP_TERMINATION: + { + if (message.getNumberOfElements() != 2) + { + LogError() << "Wrong number of parameters for \"IpcMessageType::PREPARE_APP_TERMINATION\" from \"" + << runtimeName << "\"received!"; + } + 
else + { + // this is used to unblock an application which is potentially blocked by a blocking publisher + m_prcMgr->handleProcessShutdownPreparationRequest(runtimeName); + } + break; + } case runtime::IpcMessageType::TERMINATION: { if (message.getNumberOfElements() != 2) From 0b2b481ffbda3bfdeb655f3517fdf8a27fd81ffa Mon Sep 17 00:00:00 2001 From: Mathias Kraus Date: Thu, 8 Apr 2021 22:24:35 +0200 Subject: [PATCH 3/7] iox-#615 add PoshRuntime::shutdown to unblock app shutdown there are blocking publisher Signed-off-by: Mathias Kraus --- .../iceoptions/iox_publisher_with_options.cpp | 1 + .../iceoryx_posh/runtime/posh_runtime.hpp | 18 ++++--- iceoryx_posh/source/runtime/posh_runtime.cpp | 47 ++++++++++++++++++- 3 files changed, 58 insertions(+), 8 deletions(-) diff --git a/iceoryx_examples/iceoptions/iox_publisher_with_options.cpp b/iceoryx_examples/iceoptions/iox_publisher_with_options.cpp index 991faeb507..5030af98af 100644 --- a/iceoryx_examples/iceoptions/iox_publisher_with_options.cpp +++ b/iceoryx_examples/iceoptions/iox_publisher_with_options.cpp @@ -29,6 +29,7 @@ static void sigHandler(int f_sig [[gnu::unused]]) { // caught SIGINT or SIGTERM, now exit gracefully killswitch = true; + iox::runtime::PoshRuntime::getInstance().shutdown(); } int main() diff --git a/iceoryx_posh/include/iceoryx_posh/runtime/posh_runtime.hpp b/iceoryx_posh/include/iceoryx_posh/runtime/posh_runtime.hpp index ce50bf4d0b..5bcd1fd1b7 100644 --- a/iceoryx_posh/include/iceoryx_posh/runtime/posh_runtime.hpp +++ b/iceoryx_posh/include/iceoryx_posh/runtime/posh_runtime.hpp @@ -78,6 +78,10 @@ class PoshRuntime /// @return name of the registered application RuntimeName_t getInstanceName() const noexcept; + /// @brief initiates the shutdown of the runtime to unblock all potentially blocking publisher with the + /// with the SubscriberTooSlowPolicy::WAIT_FOR_SUBSCRIBER option set + void shutdown() noexcept; + /// @brief find all services that match the provided service description /// @param[in] 
serviceDescription service to search for /// @return cxx::expected @@ -207,15 +211,17 @@ class PoshRuntime SharedMemoryUser m_ShmInterface; popo::ApplicationPort m_applicationPort; - void sendKeepAlive() noexcept; + std::atomic m_shutdownRequested{false}; + void sendKeepAliveAndHandleShutdownPreparation() noexcept; static_assert(PROCESS_KEEP_ALIVE_INTERVAL > roudi::DISCOVERY_INTERVAL, "Keep alive interval too small"); /// @note the m_keepAliveTask should always be the last member, so that it will be the first member to be destroyed - concurrent::PeriodicTask> m_keepAliveTask{concurrent::PeriodicTaskAutoStart, - PROCESS_KEEP_ALIVE_INTERVAL, - "KeepAlive", - *this, - &PoshRuntime::sendKeepAlive}; + concurrent::PeriodicTask> m_keepAliveTask{ + concurrent::PeriodicTaskAutoStart, + PROCESS_KEEP_ALIVE_INTERVAL, + "KeepAlive", + *this, + &PoshRuntime::sendKeepAliveAndHandleShutdownPreparation}; }; } // namespace runtime diff --git a/iceoryx_posh/source/runtime/posh_runtime.cpp b/iceoryx_posh/source/runtime/posh_runtime.cpp index 97fb7eaf30..a6f02dddfe 100644 --- a/iceoryx_posh/source/runtime/posh_runtime.cpp +++ b/iceoryx_posh/source/runtime/posh_runtime.cpp @@ -109,9 +109,14 @@ PoshRuntime::~PoshRuntime() noexcept } else { - LogError() << "Got wrong response from IPC channel :'" << receiveBuffer.getMessage() << "'"; + LogError() << "Got wrong response from IPC channel for IpcMessageType::TERMINATION:'" + << receiveBuffer.getMessage() << "'"; } } + else + { + LogError() << "Sending IpcMessageType::TERMINATION to RouDi failed:'" << receiveBuffer.getMessage() << "'"; + } } @@ -141,6 +146,11 @@ RuntimeName_t PoshRuntime::getInstanceName() const noexcept return m_appName; } +void PoshRuntime::shutdown() noexcept +{ + m_shutdownRequested.store(true, std::memory_order_relaxed); +} + const std::atomic* PoshRuntime::getServiceRegistryChangeCounter() noexcept { IpcMessage sendBuffer; @@ -589,12 +599,45 @@ bool PoshRuntime::sendRequestToRouDi(const IpcMessage& msg, IpcMessage& 
answer) } // this is the callback for the m_keepAliveTimer -void PoshRuntime::sendKeepAlive() noexcept +void PoshRuntime::sendKeepAliveAndHandleShutdownPreparation() noexcept { if (!m_ipcChannelInterface.sendKeepalive()) { LogWarn() << "Error in sending keep alive"; } + + // this is not the nicest solution, but we cannot send this in the signal handler where m_shutdownRequested is + // usually set; luckily the runtime already has a thread running and therefore this thread is used to unblock the + // application shutdown from a potentially blocking publisher with the with the + // SubscriberTooSlowPolicy::WAIT_FOR_SUBSCRIBER option set + if (m_shutdownRequested.exchange(false, std::memory_order_relaxed)) + { + // Inform RouDi to prepare for app shutdown + IpcMessage sendBuffer; + sendBuffer << IpcMessageTypeToString(IpcMessageType::PREPARE_APP_TERMINATION) << m_appName; + IpcMessage receiveBuffer; + + if (m_ipcChannelInterface.sendRequestToRouDi(sendBuffer, receiveBuffer) + && (1U == receiveBuffer.getNumberOfElements())) + { + std::string IpcMessage = receiveBuffer.getElementAtIndex(0U); + + if (stringToIpcMessageType(IpcMessage.c_str()) == IpcMessageType::PREPARE_APP_TERMINATION_ACK) + { + LogVerbose() << "RouDi unblocked shutdown of " << m_appName << "."; + } + else + { + LogError() << "Got wrong response from IPC channel for PREPARE_APP_TERMINATION:'" + << receiveBuffer.getMessage() << "'"; + } + } + else + { + LogError() << "Sending IpcMessageType::PREPARE_APP_TERMINATION to RouDi failed:'" + << receiveBuffer.getMessage() << "'"; + } + } } } // namespace runtime From db4be23fa94f046bf5bd40382da782ca438a78c6 Mon Sep 17 00:00:00 2001 From: Mathias Kraus Date: Thu, 8 Apr 2021 22:39:07 +0200 Subject: [PATCH 4/7] iox-#615 add test for PoshRuntime to check if shutdown unblocks a blocked publisher Signed-off-by: Mathias Kraus --- .../test/moduletests/test_posh_runtime.cpp | 48 +++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git 
a/iceoryx_posh/test/moduletests/test_posh_runtime.cpp b/iceoryx_posh/test/moduletests/test_posh_runtime.cpp index f4c8041264..d67ab57fe3 100644 --- a/iceoryx_posh/test/moduletests/test_posh_runtime.cpp +++ b/iceoryx_posh/test/moduletests/test_posh_runtime.cpp @@ -16,9 +16,12 @@ // SPDX-License-Identifier: Apache-2.0 #include "iceoryx_posh/iceoryx_posh_types.hpp" +#include "iceoryx_posh/popo/publisher.hpp" +#include "iceoryx_posh/popo/subscriber.hpp" #include "iceoryx_posh/runtime/posh_runtime.hpp" #include "iceoryx_posh/testing/roudi_environment/roudi_environment.hpp" #include "iceoryx_utils/testing/timing_test.hpp" +#include "iceoryx_utils/testing/watch_dog.hpp" #include "test.hpp" #include @@ -656,6 +659,51 @@ TEST_F(PoshRuntime_test, FindServiceReturnsNoInstanceForDefaultDescription) EXPECT_THAT(0u, instanceContainer.value().size()); } +TEST_F(PoshRuntime_test, ShutdownUnblocksBlockingPublisher) +{ + // get publisher and subscriber + iox::capro::ServiceDescription serviceDescription{"don't", "stop", "me"}; + + iox::popo::PublisherOptions publisherOptions{ + 0U, iox::NodeName_t("node"), true, iox::popo::SubscriberTooSlowPolicy::WAIT_FOR_SUBSCRIBER}; + iox::popo::SubscriberOptions subscriberOptions{ + 1U, 0U, iox::NodeName_t("node"), true, iox::popo::QueueFullPolicy::BLOCK_PUBLISHER}; + + iox::popo::Publisher publisher{serviceDescription, publisherOptions}; + iox::popo::Subscriber subscriber{serviceDescription, subscriberOptions}; + + ASSERT_TRUE(publisher.hasSubscribers()); + ASSERT_THAT(subscriber.getSubscriptionState(), Eq(iox::SubscribeState::SUBSCRIBED)); + + // send samples to fill subscriber queue + ASSERT_FALSE(publisher.publishCopyOf(42U).has_error()); + + auto threadSyncSemaphore = iox::posix::Semaphore::create(iox::posix::CreateUnnamedSingleProcessSemaphore, 0U); + std::atomic_bool wasSampleSent{false}; + + constexpr iox::units::Duration DEADLOCK_TIMEOUT{5_s}; + Watchdog deadlockWatchdog{DEADLOCK_TIMEOUT}; + deadlockWatchdog.watchAndActOnFailure([] { 
std::terminate(); }); + + // block in a separate thread + std::thread blockingPublisher([&] { + ASSERT_FALSE(threadSyncSemaphore->post().has_error()); + ASSERT_FALSE(publisher.publishCopyOf(42U).has_error()); + wasSampleSent = true; + }); + + // wait some time to check if the publisher is blocked + constexpr int64_t SLEEP_IN_MS = 100; + ASSERT_FALSE(threadSyncSemaphore->wait().has_error()); + std::this_thread::sleep_for(std::chrono::milliseconds(SLEEP_IN_MS)); + EXPECT_THAT(wasSampleSent.load(), Eq(false)); + + m_runtime->shutdown(); + + blockingPublisher.join(); // ensure the wasSampleSent store happens before the read + EXPECT_THAT(wasSampleSent.load(), Eq(true)); +} + // disabled because we cannot use the RouDiEnvironment but need a RouDi for this test // will be re-enabled with the PoshRuntime Mock from #449 TEST(PoshRuntimeFactory_test, DISABLED_SetValidRuntimeFactorySucceeds) From 9a347c988ead7681d9b084f7b3af70ab0a2f9077 Mon Sep 17 00:00:00 2001 From: Mathias Kraus Date: Fri, 9 Apr 2021 11:23:14 +0200 Subject: [PATCH 5/7] iox-#615 fix documentation Signed-off-by: Mathias Kraus --- iceoryx_examples/iceoptions/iox_publisher_with_options.cpp | 2 ++ iceoryx_posh/include/iceoryx_posh/runtime/posh_runtime.hpp | 2 +- iceoryx_posh/source/runtime/posh_runtime.cpp | 2 +- 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/iceoryx_examples/iceoptions/iox_publisher_with_options.cpp b/iceoryx_examples/iceoptions/iox_publisher_with_options.cpp index 5030af98af..e09a3564b4 100644 --- a/iceoryx_examples/iceoptions/iox_publisher_with_options.cpp +++ b/iceoryx_examples/iceoptions/iox_publisher_with_options.cpp @@ -29,6 +29,8 @@ static void sigHandler(int f_sig [[gnu::unused]]) { // caught SIGINT or SIGTERM, now exit gracefully killswitch = true; + // this is optional, but since the iox::popo::SubscriberTooSlowPolicy::WAIT_FOR_SUBSCRIBER option is used, + // a slow subscriber might block the shutdown and this call unblocks the publisher 
iox::runtime::PoshRuntime::getInstance().shutdown(); } diff --git a/iceoryx_posh/include/iceoryx_posh/runtime/posh_runtime.hpp b/iceoryx_posh/include/iceoryx_posh/runtime/posh_runtime.hpp index 5bcd1fd1b7..f9978c7ad2 100644 --- a/iceoryx_posh/include/iceoryx_posh/runtime/posh_runtime.hpp +++ b/iceoryx_posh/include/iceoryx_posh/runtime/posh_runtime.hpp @@ -78,7 +78,7 @@ class PoshRuntime /// @return name of the registered application RuntimeName_t getInstanceName() const noexcept; - /// @brief initiates the shutdown of the runtime to unblock all potentially blocking publisher with the + /// @brief initiates the shutdown of the runtime to unblock all potentially blocking publisher /// with the SubscriberTooSlowPolicy::WAIT_FOR_SUBSCRIBER option set void shutdown() noexcept; diff --git a/iceoryx_posh/source/runtime/posh_runtime.cpp b/iceoryx_posh/source/runtime/posh_runtime.cpp index a6f02dddfe..58bae8e182 100644 --- a/iceoryx_posh/source/runtime/posh_runtime.cpp +++ b/iceoryx_posh/source/runtime/posh_runtime.cpp @@ -608,7 +608,7 @@ void PoshRuntime::sendKeepAliveAndHandleShutdownPreparation() noexcept // this is not the nicest solution, but we cannot send this in the signal handler where m_shutdownRequested is // usually set; luckily the runtime already has a thread running and therefore this thread is used to unblock the - // application shutdown from a potentially blocking publisher with the with the + // application shutdown from a potentially blocking publisher with the // SubscriberTooSlowPolicy::WAIT_FOR_SUBSCRIBER option set if (m_shutdownRequested.exchange(false, std::memory_order_relaxed)) { From eb9c97ba1fc27d5f1b23db1dbf394c14e2c0e956 Mon Sep 17 00:00:00 2001 From: Mathias Kraus Date: Fri, 9 Apr 2021 11:52:04 +0200 Subject: [PATCH 6/7] iox-#615 update example readme Signed-off-by: Mathias Kraus --- iceoryx_examples/iceoptions/README.md | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/iceoryx_examples/iceoptions/README.md 
b/iceoryx_examples/iceoptions/README.md index 6164a96809..ad723fab1e 100644 --- a/iceoryx_examples/iceoptions/README.md +++ b/iceoryx_examples/iceoptions/README.md @@ -47,6 +47,13 @@ Both publisher and subscriber have to request compatible policies (`SubscriberTo publisherOptions.subscriberTooSlowPolicy = iox::popo::SubscriberTooSlowPolicy::WAIT_FOR_SUBSCRIBER; ``` +With this option set, it is possible that a slow subscriber blocks a publisher indefinitely due to the busy waiting loop. +In order to be able to gracefully shut down the application with `Ctrl+C`, the publisher needs to be unblocked. +This is done by placing the following code in the signal handler. +``` +iox::runtime::PoshRuntime::getInstance().shutdown(); +``` + ### Subscriber To configure a subscriber, we have to supply a struct of the type `iox::popo::SubscriberOptions` as a second parameter. From 4bb9cbe64d962f3ba5804599a0b93185514c88c8 Mon Sep 17 00:00:00 2001 From: Mathias Kraus Date: Fri, 9 Apr 2021 16:34:27 +0200 Subject: [PATCH 7/7] iox-#615 extend C API with the runtime shutdown method Signed-off-by: Mathias Kraus --- iceoryx_binding_c/include/iceoryx_binding_c/runtime.h | 5 +++++ iceoryx_binding_c/source/c_runtime.cpp | 5 +++++ 2 files changed, 10 insertions(+) diff --git a/iceoryx_binding_c/include/iceoryx_binding_c/runtime.h b/iceoryx_binding_c/include/iceoryx_binding_c/runtime.h index 09080fb4c1..a1450de46e 100644 --- a/iceoryx_binding_c/include/iceoryx_binding_c/runtime.h +++ b/iceoryx_binding_c/include/iceoryx_binding_c/runtime.h @@ -1,4 +1,5 @@ // Copyright (c) 2020 by Robert Bosch GmbH. All rights reserved. +// Copyright (c) 2021 by Apex.AI Inc. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -32,4 +33,8 @@ void iox_runtime_init(const char* const name); /// If name is a nullptr, 0 will be returned. 
uint64_t iox_runtime_get_instance_name(char* const name, const uint64_t nameLength); +/// @brief initiates the shutdown of the runtime to unblock all potentially blocking publisher +/// with the iox_SubscriberTooSlowPolicy::SubscriberTooSlowPolicy_WAIT_FOR_SUBSCRIBER option set +void iox_runtime_shutdown(); + #endif diff --git a/iceoryx_binding_c/source/c_runtime.cpp b/iceoryx_binding_c/source/c_runtime.cpp index ea30f7cfd0..5120c14c0b 100644 --- a/iceoryx_binding_c/source/c_runtime.cpp +++ b/iceoryx_binding_c/source/c_runtime.cpp @@ -53,3 +53,8 @@ uint64_t iox_runtime_get_instance_name(char* const name, const uint64_t nameLeng return instanceName.size(); } + +void iox_runtime_shutdown() +{ + PoshRuntime::getInstance().shutdown(); +}