Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Iox #615 unblock app shutdown with blocked publisher #708

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions iceoryx_binding_c/include/iceoryx_binding_c/runtime.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// Copyright (c) 2020 by Robert Bosch GmbH. All rights reserved.
// Copyright (c) 2021 by Apex.AI Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -32,4 +33,8 @@ void iox_runtime_init(const char* const name);
/// If name is a nullptr, 0 will be returned.
uint64_t iox_runtime_get_instance_name(char* const name, const uint64_t nameLength);

/// @brief initiates the shutdown of the runtime to unblock all potentially blocking publisher
/// with the iox_SubscriberTooSlowPolicy::SubscriberTooSlowPolicy_WAIT_FOR_SUBSCRIBER option set
void iox_runtime_shutdown();

#endif
5 changes: 5 additions & 0 deletions iceoryx_binding_c/source/c_runtime.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,8 @@ uint64_t iox_runtime_get_instance_name(char* const name, const uint64_t nameLeng

return instanceName.size();
}

void iox_runtime_shutdown()
{
PoshRuntime::getInstance().shutdown();
}
7 changes: 7 additions & 0 deletions iceoryx_examples/iceoptions/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,13 @@ Both publisher and subscriber have to request compatible policies (`SubscriberTo
publisherOptions.subscriberTooSlowPolicy = iox::popo::SubscriberTooSlowPolicy::WAIT_FOR_SUBSCRIBER;
```

With this option set, it is possible that a slow subscriber blocks a publisher indefinitely due to the busy waiting loop.
In order to be able to gracefully shutdown the application with `Ctrl+C`, the publisher needs to be unblocked.
This is done by placing the following code in the signal handler.
```
iox::runtime::PoshRuntime::getInstance().shutdown();
```

### Subscriber

To configure a subscriber, we have to supply a struct of the type `iox::popo::SubscriberOptions` as a second parameter.
Expand Down
3 changes: 3 additions & 0 deletions iceoryx_examples/iceoptions/iox_publisher_with_options.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ static void sigHandler(int f_sig [[gnu::unused]])
{
// caught SIGINT or SIGTERM, now exit gracefully
killswitch = true;
// this is optional, but since the iox::popo::SubscriberTooSlowPolicy::WAIT_FOR_SUBSCRIBER option is used,
// a slow subscriber might block the shutdown and this call unblocks the publisher
iox::runtime::PoshRuntime::getInstance().shutdown();
}

int main()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,12 @@ class PortManager
cxx::expected<popo::ConditionVariableData*, PortPoolError>
acquireConditionVariableData(const RuntimeName_t& runtimeName) noexcept;

/// @brief Used to unblock potential locks in the shutdown phase
void unblockShutdown() noexcept;
/// @brief Used to unblock potential locks in the shutdown phase of a process
/// @param [in] name of the process runtime which is about to shut down
void unblockProcessShutdown(const RuntimeName_t& runtimeName) noexcept;

/// @brief Used to unblock potential locks in the shutdown phase of RouDi
void unblockRouDiShutdown() noexcept;

void deletePortsOfProcess(const RuntimeName_t& runtimeName) noexcept;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ class ProcessManager : public ProcessManagerInterface
/// @return true if one or more of the registered processes is running, false otherwise
bool isAnyRegisteredProcessStillRunning() noexcept;

/// @brief A process is about to shut down and needs to be unblock by a potentially block publisher
/// @param [in] name of the process runtime which is about to shut down
void handleProcessShutdownPreparationRequest(const RuntimeName_t& name) noexcept;

/// @brief Tries to gracefully terminate all registered processes
void requestShutdownOfAllProcesses() noexcept;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ enum class IpcMessageType : int32_t
KEEPALIVE,
TERMINATION,
TERMINATION_ACK,
PREPARE_APP_TERMINATION,
PREPARE_APP_TERMINATION_ACK,
ERROR,
APP_WAIT,
WAKEUP_TRIGGER,
Expand Down
18 changes: 12 additions & 6 deletions iceoryx_posh/include/iceoryx_posh/runtime/posh_runtime.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ class PoshRuntime
/// @return name of the registered application
RuntimeName_t getInstanceName() const noexcept;

/// @brief initiates the shutdown of the runtime to unblock all potentially blocking publisher
/// with the SubscriberTooSlowPolicy::WAIT_FOR_SUBSCRIBER option set
void shutdown() noexcept;

/// @brief find all services that match the provided service description
/// @param[in] serviceDescription service to search for
/// @return cxx::expected<InstanceContainer, FindServiceError>
Expand Down Expand Up @@ -207,15 +211,17 @@ class PoshRuntime
SharedMemoryUser m_ShmInterface;
popo::ApplicationPort m_applicationPort;

void sendKeepAlive() noexcept;
std::atomic<bool> m_shutdownRequested{false};
void sendKeepAliveAndHandleShutdownPreparation() noexcept;
static_assert(PROCESS_KEEP_ALIVE_INTERVAL > roudi::DISCOVERY_INTERVAL, "Keep alive interval too small");

/// @note the m_keepAliveTask should always be the last member, so that it will be the first member to be destroyed
concurrent::PeriodicTask<cxx::MethodCallback<void>> m_keepAliveTask{concurrent::PeriodicTaskAutoStart,
PROCESS_KEEP_ALIVE_INTERVAL,
"KeepAlive",
*this,
&PoshRuntime::sendKeepAlive};
concurrent::PeriodicTask<cxx::MethodCallback<void>> m_keepAliveTask{
concurrent::PeriodicTaskAutoStart,
PROCESS_KEEP_ALIVE_INTERVAL,
"KeepAlive",
*this,
&PoshRuntime::sendKeepAliveAndHandleShutdownPreparation};
};

} // namespace runtime
Expand Down
15 changes: 14 additions & 1 deletion iceoryx_posh/source/roudi/port_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,20 @@ void PortManager::sendToAllMatchingInterfacePorts(const capro::CaproMessage& mes
}
}

void PortManager::unblockShutdown() noexcept
void PortManager::unblockProcessShutdown(const RuntimeName_t& runtimeName) noexcept
{
for (auto port : m_portPool->getPublisherPortDataList())
{
PublisherPortRouDiType publisherPort(port);
if (runtimeName == publisherPort.getRuntimeName())
{
port->m_offeringRequested.store(false, std::memory_order_relaxed);
doDiscoveryForPublisherPort(publisherPort);
}
}
}

void PortManager::unblockRouDiShutdown() noexcept
{
makeAllPublisherPortsToStopOffer();
}
Expand Down
17 changes: 16 additions & 1 deletion iceoryx_posh/source/roudi/process_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,21 @@ ProcessManager::ProcessManager(RouDiMemoryInterface& roudiMemoryInterface,
auto m_segmentInfo = m_segmentManager->getSegmentInformationForUser(currentUser);
m_memoryManagerOfCurrentProcess = m_segmentInfo.m_memoryManager;
}

void ProcessManager::handleProcessShutdownPreparationRequest(const RuntimeName_t& name) noexcept
{
searchForProcessAndThen(
name,
[&](Process& process) {
m_portManager.unblockProcessShutdown(name);
// Reply with PREPARE_APP_TERMINATION_ACK and let process shutdown
runtime::IpcMessage sendBuffer;
sendBuffer << runtime::IpcMessageTypeToString(runtime::IpcMessageType::PREPARE_APP_TERMINATION_ACK);
process.sendViaIpcChannel(sendBuffer);
},
[&]() { LogWarn() << "Unknown application " << name << " requested shutdown preparation."; });
}

void ProcessManager::requestShutdownOfAllProcesses() noexcept
{
// send SIG_TERM to all running applications and wait for processes to answer with TERMINATION
Expand All @@ -82,7 +97,7 @@ void ProcessManager::requestShutdownOfAllProcesses() noexcept
}

// this unblocks the RouDi shutdown if a publisher port is blocked by a full subscriber queue
m_portManager.unblockShutdown();
m_portManager.unblockRouDiShutdown();
}

bool ProcessManager::isAnyRegisteredProcessStillRunning() noexcept
Expand Down
14 changes: 14 additions & 0 deletions iceoryx_posh/source/roudi/roudi.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,20 @@ void RouDi::processMessage(const runtime::IpcMessage& message,
m_prcMgr->updateLivelinessOfProcess(runtimeName);
break;
}
case runtime::IpcMessageType::PREPARE_APP_TERMINATION:
{
if (message.getNumberOfElements() != 2)
{
LogError() << "Wrong number of parameters for \"IpcMessageType::PREPARE_APP_TERMINATION\" from \""
<< runtimeName << "\"received!";
}
else
{
// this is used to unblock a potentially block application by blocking publisher
m_prcMgr->handleProcessShutdownPreparationRequest(runtimeName);
}
break;
}
case runtime::IpcMessageType::TERMINATION:
{
if (message.getNumberOfElements() != 2)
Expand Down
47 changes: 45 additions & 2 deletions iceoryx_posh/source/runtime/posh_runtime.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,14 @@ PoshRuntime::~PoshRuntime() noexcept
}
else
{
LogError() << "Got wrong response from IPC channel :'" << receiveBuffer.getMessage() << "'";
LogError() << "Got wrong response from IPC channel for IpcMessageType::TERMINATION:'"
<< receiveBuffer.getMessage() << "'";
}
}
else
{
LogError() << "Sending IpcMessageType::TERMINATION to RouDi failed:'" << receiveBuffer.getMessage() << "'";
}
}


Expand Down Expand Up @@ -141,6 +146,11 @@ RuntimeName_t PoshRuntime::getInstanceName() const noexcept
return m_appName;
}

void PoshRuntime::shutdown() noexcept
{
m_shutdownRequested.store(true, std::memory_order_relaxed);
}

const std::atomic<uint64_t>* PoshRuntime::getServiceRegistryChangeCounter() noexcept
{
IpcMessage sendBuffer;
Expand Down Expand Up @@ -589,12 +599,45 @@ bool PoshRuntime::sendRequestToRouDi(const IpcMessage& msg, IpcMessage& answer)
}

// this is the callback for the m_keepAliveTimer
void PoshRuntime::sendKeepAlive() noexcept
void PoshRuntime::sendKeepAliveAndHandleShutdownPreparation() noexcept
{
if (!m_ipcChannelInterface.sendKeepalive())
{
LogWarn() << "Error in sending keep alive";
}

// this is not the nicest solution, but we cannot send this in the signal handler where m_shutdownRequested is
// usually set; luckily the runtime already has a thread running and therefore this thread is used to unblock the
// application shutdown from a potentially blocking publisher with the
// SubscriberTooSlowPolicy::WAIT_FOR_SUBSCRIBER option set
if (m_shutdownRequested.exchange(false, std::memory_order_relaxed))
{
// Inform RouDi to prepare for app shutdown
IpcMessage sendBuffer;
sendBuffer << IpcMessageTypeToString(IpcMessageType::PREPARE_APP_TERMINATION) << m_appName;
IpcMessage receiveBuffer;

if (m_ipcChannelInterface.sendRequestToRouDi(sendBuffer, receiveBuffer)
&& (1U == receiveBuffer.getNumberOfElements()))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Uuuhh Yoda notation you use.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

c&p I used :)

{
std::string IpcMessage = receiveBuffer.getElementAtIndex(0U);

if (stringToIpcMessageType(IpcMessage.c_str()) == IpcMessageType::PREPARE_APP_TERMINATION_ACK)
{
LogVerbose() << "RouDi unblocked shutdown of " << m_appName << ".";
}
else
{
LogError() << "Got wrong response from IPC channel for PREPARE_APP_TERMINATION:'"
<< receiveBuffer.getMessage() << "'";
}
}
else
{
LogError() << "Sending IpcMessageType::PREPARE_APP_TERMINATION to RouDi failed:'"
<< receiveBuffer.getMessage() << "'";
}
}
}

} // namespace runtime
Expand Down
48 changes: 48 additions & 0 deletions iceoryx_posh/test/moduletests/test_posh_runtime.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@
// SPDX-License-Identifier: Apache-2.0

#include "iceoryx_posh/iceoryx_posh_types.hpp"
#include "iceoryx_posh/popo/publisher.hpp"
#include "iceoryx_posh/popo/subscriber.hpp"
#include "iceoryx_posh/runtime/posh_runtime.hpp"
#include "iceoryx_posh/testing/roudi_environment/roudi_environment.hpp"
#include "iceoryx_utils/testing/timing_test.hpp"
#include "iceoryx_utils/testing/watch_dog.hpp"
#include "test.hpp"

#include <type_traits>
Expand Down Expand Up @@ -656,6 +659,51 @@ TEST_F(PoshRuntime_test, FindServiceReturnsNoInstanceForDefaultDescription)
EXPECT_THAT(0u, instanceContainer.value().size());
}

TEST_F(PoshRuntime_test, ShutdownUnblocksBlockingPublisher)
{
// get publisher and subscriber
iox::capro::ServiceDescription serviceDescription{"don't", "stop", "me"};

iox::popo::PublisherOptions publisherOptions{
0U, iox::NodeName_t("node"), true, iox::popo::SubscriberTooSlowPolicy::WAIT_FOR_SUBSCRIBER};
iox::popo::SubscriberOptions subscriberOptions{
1U, 0U, iox::NodeName_t("node"), true, iox::popo::QueueFullPolicy::BLOCK_PUBLISHER};

iox::popo::Publisher<uint8_t> publisher{serviceDescription, publisherOptions};
iox::popo::Subscriber<uint8_t> subscriber{serviceDescription, subscriberOptions};

ASSERT_TRUE(publisher.hasSubscribers());
ASSERT_THAT(subscriber.getSubscriptionState(), Eq(iox::SubscribeState::SUBSCRIBED));

// send samples to fill subscriber queue
ASSERT_FALSE(publisher.publishCopyOf(42U).has_error());

auto threadSyncSemaphore = iox::posix::Semaphore::create(iox::posix::CreateUnnamedSingleProcessSemaphore, 0U);
std::atomic_bool wasSampleSent{false};

constexpr iox::units::Duration DEADLOCK_TIMEOUT{5_s};
Watchdog deadlockWatchdog{DEADLOCK_TIMEOUT};
deadlockWatchdog.watchAndActOnFailure([] { std::terminate(); });

// block in a separate thread
std::thread blockingPublisher([&] {
ASSERT_FALSE(threadSyncSemaphore->post().has_error());
ASSERT_FALSE(publisher.publishCopyOf(42U).has_error());
wasSampleSent = true;
});

// wait some time to check if the publisher is blocked
constexpr int64_t SLEEP_IN_MS = 100;
ASSERT_FALSE(threadSyncSemaphore->wait().has_error());
std::this_thread::sleep_for(std::chrono::milliseconds(SLEEP_IN_MS));
EXPECT_THAT(wasSampleSent.load(), Eq(false));

m_runtime->shutdown();

blockingPublisher.join(); // ensure the wasChunkSent store happens before the read
EXPECT_THAT(wasSampleSent.load(), Eq(true));
}

// disabled because we cannot use the RouDiEnvironment but need a RouDi for this test
// will be re-enabled with the PoshRuntime Mock from #449
TEST(PoshRuntimeFactory_test, DISABLED_SetValidRuntimeFactorySucceeds)
Expand Down
Loading