Skip to content

Commit

Permalink
iox-eclipse-iceoryx#751 Replace ShutdownManager with SignalWatcher in…
Browse files Browse the repository at this point in the history
… dds gateway, replace semaphores in test with adaptive wait

Signed-off-by: Christian Eltzschig <me@elchris.org>
  • Loading branch information
elfenpiff committed Jun 23, 2022
1 parent 803f501 commit 42d1bde
Show file tree
Hide file tree
Showing 10 changed files with 43 additions and 78 deletions.
4 changes: 2 additions & 2 deletions iceoryx_binding_c/test/moduletests/test_listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ void notifyClient(ClientPortData& portData)
portData.m_connectionState = iox::ConnectionState::CONNECTED;
iox::popo::ChunkQueuePusher<ClientChunkQueueData_t> pusher{&portData.m_chunkReceiverData};
pusher.push(iox::mepoo::SharedChunk());
EXPECT_FALSE(portData.m_chunkReceiverData.m_conditionVariableDataPtr->m_semaphore.post().has_error());
EXPECT_FALSE(portData.m_chunkReceiverData.m_conditionVariableDataPtr->semaphore->post().has_error());
}

TIMING_TEST_F(iox_listener_test, NotifyingClientEventWorks, Repeat(5), [&] {
Expand Down Expand Up @@ -491,7 +491,7 @@ void notifyServer(ServerPortData& portData)
{
iox::popo::ChunkQueuePusher<ServerChunkQueueData_t> pusher{&portData.m_chunkReceiverData};
pusher.push(iox::mepoo::SharedChunk());
EXPECT_FALSE(portData.m_chunkReceiverData.m_conditionVariableDataPtr->m_semaphore.post().has_error());
EXPECT_FALSE(portData.m_chunkReceiverData.m_conditionVariableDataPtr->semaphore->post().has_error());
}

TEST_F(iox_listener_test, AttachingServerWorks)
Expand Down
6 changes: 3 additions & 3 deletions iceoryx_binding_c/test/moduletests/test_wait_set.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -796,7 +796,7 @@ void notifyClient(ClientPortData& portData)
portData.m_connectionState = iox::ConnectionState::CONNECTED;
iox::popo::ChunkQueuePusher<ClientChunkQueueData_t> pusher{&portData.m_chunkReceiverData};
pusher.push(iox::mepoo::SharedChunk());
EXPECT_FALSE(portData.m_chunkReceiverData.m_conditionVariableDataPtr->m_semaphore.post().has_error());
EXPECT_FALSE(portData.m_chunkReceiverData.m_conditionVariableDataPtr->semaphore->post().has_error());
}

TEST_F(iox_ws_test, NotifyingClientEventWorks)
Expand Down Expand Up @@ -928,7 +928,7 @@ void notifyServer(ServerPortData& portData)
{
iox::popo::ChunkQueuePusher<ServerChunkQueueData_t> pusher{&portData.m_chunkReceiverData};
pusher.push(iox::mepoo::SharedChunk());
EXPECT_FALSE(portData.m_chunkReceiverData.m_conditionVariableDataPtr->m_semaphore.post().has_error());
EXPECT_FALSE(portData.m_chunkReceiverData.m_conditionVariableDataPtr->semaphore->post().has_error());
}

TEST_F(iox_ws_test, AttachingServerEventWorks)
Expand Down Expand Up @@ -1140,7 +1140,7 @@ void notifyServiceDiscovery(SubscriberPortData& portData)
{
iox::popo::ChunkQueuePusher<SubscriberChunkReceiverData_t> pusher{&portData.m_chunkReceiverData};
pusher.push(iox::mepoo::SharedChunk());
EXPECT_FALSE(portData.m_chunkReceiverData.m_conditionVariableDataPtr->m_semaphore.post().has_error());
EXPECT_FALSE(portData.m_chunkReceiverData.m_conditionVariableDataPtr->semaphore->post().has_error());
}

TEST_F(iox_ws_test, NotifyingServiceDiscoveryEventWorks)
Expand Down
37 changes: 2 additions & 35 deletions iceoryx_dds/source/gateway/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,46 +21,13 @@
#include "iceoryx_hoofs/cxx/helplets.hpp"
#include "iceoryx_hoofs/cxx/optional.hpp"
#include "iceoryx_hoofs/platform/signal.hpp"
#include "iceoryx_hoofs/posix_wrapper/semaphore.hpp"
#include "iceoryx_hoofs/posix_wrapper/signal_handler.hpp"
#include "iceoryx_hoofs/posix_wrapper/signal_watcher.hpp"
#include "iceoryx_posh/gateway/gateway_config.hpp"
#include "iceoryx_posh/gateway/toml_gateway_config_parser.hpp"
#include "iceoryx_posh/runtime/posh_runtime.hpp"

class ShutdownManager
{
public:
static void scheduleShutdown(int num)
{
char reason = '\0';
psignal(num, &reason);
s_semaphore.post().or_else([](auto) {
std::cerr << "failed to call post on shutdown semaphore" << std::endl;
std::terminate();
});
}
static void waitUntilShutdown()
{
s_semaphore.wait().or_else([](auto) {
std::cerr << "failed to call wait on shutdown semaphore" << std::endl;
std::terminate();
});
}

private:
static iox::posix::Semaphore s_semaphore;
ShutdownManager() = default;
};
iox::posix::Semaphore ShutdownManager::s_semaphore =
iox::posix::Semaphore::create(iox::posix::CreateUnnamedSingleProcessSemaphore, 0u).value();

int main()
{
// Set OS signal handlers
auto signalGuardInt = iox::posix::registerSignalHandler(iox::posix::Signal::INT, ShutdownManager::scheduleShutdown);
auto signalGuardTerm =
iox::posix::registerSignalHandler(iox::posix::Signal::TERM, ShutdownManager::scheduleShutdown);

// Start application
iox::runtime::PoshRuntime::initRuntime("iox-dds-gateway");

Expand All @@ -84,7 +51,7 @@ int main()
dds2ioxGateway.runMultithreaded();

// Run until SIGINT or SIGTERM
ShutdownManager::waitUntilShutdown();
iox::posix::waitForTerminationRequest();

return 0;
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
// SPDX-License-Identifier: Apache-2.0

#include "iceoryx_hoofs/cxx/optional.hpp"
#include "iceoryx_hoofs/posix_wrapper/semaphore.hpp"
#include "iceoryx_hoofs/posix_wrapper/signal_watcher.hpp"
#include "iceoryx_posh/popo/listener.hpp"
#include "iceoryx_posh/popo/subscriber.hpp"
Expand Down
1 change: 0 additions & 1 deletion iceoryx_examples/callbacks/ice_callbacks_subscriber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
// SPDX-License-Identifier: Apache-2.0

#include "iceoryx_hoofs/cxx/optional.hpp"
#include "iceoryx_hoofs/posix_wrapper/semaphore.hpp"
#include "iceoryx_hoofs/posix_wrapper/signal_watcher.hpp"
#include "iceoryx_posh/popo/listener.hpp"
#include "iceoryx_posh/popo/subscriber.hpp"
Expand Down
12 changes: 6 additions & 6 deletions iceoryx_posh/test/integrationtests/test_client_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -345,10 +345,10 @@ TEST_F(ClientServer_test, ServerTakeRequestUnblocksClientSendingRequest)
ASSERT_TRUE(server.hasClients());
ASSERT_THAT(client.getConnectionState(), Eq(iox::ConnectionState::CONNECTED));

auto threadSyncSemaphore = iox::posix::Semaphore::create(iox::posix::CreateUnnamedSingleProcessSemaphore, 0U);
std::atomic_bool wasRequestSent{false};

// block in a separate thread
std::atomic_bool isThreadStarted{false};
std::thread blockingClient([&] {
auto sendRequest = [&]() {
auto loanResult = client.loan();
Expand All @@ -363,14 +363,14 @@ TEST_F(ClientServer_test, ServerTakeRequestUnblocksClientSendingRequest)
}

// signal that an blocking send is expected
ASSERT_FALSE(threadSyncSemaphore->post().has_error());
isThreadStarted = true;
sendRequest();
wasRequestSent = true;
});

// wait some time to check if the client is blocked
constexpr std::chrono::milliseconds SLEEP_TIME{100U};
ASSERT_FALSE(threadSyncSemaphore->wait().has_error());
iox::cxx::internal::adaptive_wait().wait_loop([&] { return !isThreadStarted.load(); });
std::this_thread::sleep_for(SLEEP_TIME);
EXPECT_THAT(wasRequestSent.load(), Eq(false));

Expand Down Expand Up @@ -406,10 +406,10 @@ TEST_F(ClientServer_test, ClientTakesResponseUnblocksServerSendingResponse)
EXPECT_FALSE(clientLoanResult.value().send().has_error());
}

auto threadSyncSemaphore = iox::posix::Semaphore::create(iox::posix::CreateUnnamedSingleProcessSemaphore, 0U);
std::atomic_bool wasResponseSent{false};

// block in a separate thread
std::atomic_bool isThreadStarted{false};
std::thread blockingServer([&] {
auto processRequest = [&]() {
auto takeResult = server.take();
Expand All @@ -424,14 +424,14 @@ TEST_F(ClientServer_test, ClientTakesResponseUnblocksServerSendingResponse)
processRequest();
}

ASSERT_FALSE(threadSyncSemaphore->post().has_error());
isThreadStarted = true;
processRequest();
wasResponseSent = true;
});

// wait some time to check if the server is blocked
constexpr std::chrono::milliseconds SLEEP_TIME{100U};
ASSERT_FALSE(threadSyncSemaphore->wait().has_error());
iox::cxx::internal::adaptive_wait().wait_loop([&] { return !isThreadStarted.load(); });
std::this_thread::sleep_for(SLEEP_TIME);
EXPECT_THAT(wasResponseSent.load(), Eq(false));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
#include "iceoryx_hoofs/cxx/string.hpp"
#include "iceoryx_hoofs/cxx/variant.hpp"
#include "iceoryx_hoofs/cxx/vector.hpp"
#include "iceoryx_hoofs/posix_wrapper/semaphore.hpp"
#include "iceoryx_hoofs/testing/watch_dog.hpp"
#include "iceoryx_posh/popo/publisher.hpp"
#include "iceoryx_posh/popo/subscriber.hpp"
Expand Down Expand Up @@ -551,18 +550,17 @@ TEST_F(PublisherSubscriberCommunication_test, PublisherBlocksWhenBlockingActivat
EXPECT_FALSE(publisher->publishCopyOf("start your day with a smile").has_error());
EXPECT_FALSE(publisher->publishCopyOf("and hypnotoad will smile back").has_error());

auto threadSyncSemaphore = posix::Semaphore::create(posix::CreateUnnamedSingleProcessSemaphore, 0U);

std::atomic_bool wasSampleDelivered{false};
std::atomic_bool isThreadStarted{false};
std::thread t1([&] {
ASSERT_FALSE(threadSyncSemaphore->post().has_error());
isThreadStarted = true;
EXPECT_FALSE(publisher->publishCopyOf("oh no hypnotoad is staring at me").has_error());
wasSampleDelivered.store(true);
});

constexpr int64_t TIMEOUT_IN_MS = 100;

ASSERT_FALSE(threadSyncSemaphore->wait().has_error());
iox::cxx::internal::adaptive_wait().wait_loop([&] { return !isThreadStarted.load(); });
std::this_thread::sleep_for(std::chrono::milliseconds(TIMEOUT_IN_MS));
EXPECT_FALSE(wasSampleDelivered.load());

Expand Down Expand Up @@ -595,16 +593,15 @@ TEST_F(PublisherSubscriberCommunication_test, PublisherDoesNotBlockAndDiscardsSa
EXPECT_FALSE(publisher->publishCopyOf("first there was a blubb named mantua").has_error());
EXPECT_FALSE(publisher->publishCopyOf("second hypnotoad ate it").has_error());

auto threadSyncSemaphore = posix::Semaphore::create(posix::CreateUnnamedSingleProcessSemaphore, 0U);

std::atomic_bool wasSampleDelivered{false};
std::atomic_bool isThreadStarted{false};
std::thread t1([&] {
ASSERT_FALSE(threadSyncSemaphore->post().has_error());
isThreadStarted = true;
EXPECT_FALSE(publisher->publishCopyOf("third a tiny black hole smells like butter").has_error());
wasSampleDelivered.store(true);
});

ASSERT_FALSE(threadSyncSemaphore->wait().has_error());
iox::cxx::internal::adaptive_wait().wait_loop([&] { return !isThreadStarted.load(); });
t1.join();
EXPECT_TRUE(wasSampleDelivered.load());

Expand Down Expand Up @@ -666,18 +663,17 @@ TEST_F(PublisherSubscriberCommunication_test, MixedOptionsSetupWorksWithBlocking
EXPECT_FALSE(publisherBlocking->publishCopyOf("hypnotoad wants a cookie").has_error());
EXPECT_FALSE(publisherNonBlocking->publishCopyOf("hypnotoad has a sister named hypnoodle").has_error());

auto threadSyncSemaphore = posix::Semaphore::create(posix::CreateUnnamedSingleProcessSemaphore, 0U);

std::atomic_bool wasSampleDelivered{false};
std::atomic_bool isThreadStarted{false};
std::thread t1([&] {
ASSERT_FALSE(threadSyncSemaphore->post().has_error());
isThreadStarted = true;
EXPECT_FALSE(publisherBlocking->publishCopyOf("chucky is the only one who can ride the hypnotoad").has_error());
wasSampleDelivered.store(true);
});

constexpr int64_t TIMEOUT_IN_MS = 100;

ASSERT_FALSE(threadSyncSemaphore->wait().has_error());
iox::cxx::internal::adaptive_wait().wait_loop([&] { return !isThreadStarted.load(); });
std::this_thread::sleep_for(std::chrono::milliseconds(TIMEOUT_IN_MS));
EXPECT_FALSE(wasSampleDelivered.load());

Expand Down
13 changes: 8 additions & 5 deletions iceoryx_posh/test/moduletests/test_popo_listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
#include "iceoryx_hoofs/cxx/optional.hpp"
#include "iceoryx_hoofs/cxx/vector.hpp"
#include "iceoryx_hoofs/internal/concurrent/smart_lock.hpp"
#include "iceoryx_hoofs/posix_wrapper/semaphore.hpp"
#include "iceoryx_hoofs/posix_wrapper/unnamed_semaphore.hpp"
#include "iceoryx_hoofs/testing/timing_test.hpp"
#include "iceoryx_hoofs/testing/watch_dog.hpp"
#include "iceoryx_posh/iceoryx_posh_types.hpp"
Expand Down Expand Up @@ -156,7 +156,7 @@ iox::concurrent::smart_lock<std::vector<EventAndSutPair_t>> g_toBeAttached;
iox::concurrent::smart_lock<std::vector<EventAndSutPair_t>> g_toBeDetached;
std::array<TriggerSourceAndCount, iox::MAX_NUMBER_OF_EVENTS_PER_LISTENER> g_triggerCallbackArg;
uint64_t g_triggerCallbackRuntimeInMs = 0U;
iox::cxx::optional<iox::posix::Semaphore> g_callbackBlocker;
iox::cxx::optional<iox::posix::UnnamedSemaphore> g_callbackBlocker;

class Listener_test : public Test
{
Expand Down Expand Up @@ -227,8 +227,11 @@ class Listener_test : public Test

void activateTriggerCallbackBlocker() noexcept
{
g_callbackBlocker.emplace(
iox::posix::Semaphore::create(iox::posix::CreateUnnamedSingleProcessSemaphore, 0U).value());
iox::posix::UnnamedSemaphoreBuilder()
.initialValue(0U)
.isInterProcessCapable(false)
.create(g_callbackBlocker)
.expect("Unable to create callback blocker semaphore");
}

void unblockTriggerCallback(const uint64_t numberOfUnblocks) noexcept
Expand Down Expand Up @@ -1213,4 +1216,4 @@ TIMING_TEST_F(Listener_test, AttachingInCallbackWorks, Repeat(5), [&] {
// END
//////////////////////////////////

} // namespace
} // namespace
19 changes: 10 additions & 9 deletions iceoryx_posh/test/moduletests/test_posh_runtime.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// SPDX-License-Identifier: Apache-2.0

#include "iceoryx_hoofs/cxx/convert.hpp"
#include "iceoryx_hoofs/internal/cxx/adaptive_wait.hpp"
#include "iceoryx_hoofs/testing/timing_test.hpp"
#include "iceoryx_hoofs/testing/watch_dog.hpp"
#include "iceoryx_posh/iceoryx_posh_types.hpp"
Expand Down Expand Up @@ -949,23 +950,23 @@ TEST_F(PoshRuntime_test, ShutdownUnblocksBlockingPublisher)
// send samples to fill subscriber queue
ASSERT_FALSE(publisher.publishCopyOf(42U).has_error());

auto threadSyncSemaphore = iox::posix::Semaphore::create(iox::posix::CreateUnnamedSingleProcessSemaphore, 0U);
std::atomic_bool wasSampleSent{false};

constexpr iox::units::Duration DEADLOCK_TIMEOUT{5_s};
Watchdog deadlockWatchdog{DEADLOCK_TIMEOUT};
deadlockWatchdog.watchAndActOnFailure([] { std::terminate(); });

// block in a separate thread
std::atomic_bool isThreadStarted{false};
std::thread blockingPublisher([&] {
ASSERT_FALSE(threadSyncSemaphore->post().has_error());
isThreadStarted = true;
ASSERT_FALSE(publisher.publishCopyOf(42U).has_error());
wasSampleSent = true;
});

// wait some time to check if the publisher is blocked
iox::cxx::internal::adaptive_wait().wait_loop([&] { return !isThreadStarted.load(); });
constexpr std::chrono::milliseconds SLEEP_TIME{100U};
ASSERT_FALSE(threadSyncSemaphore->wait().has_error());
std::this_thread::sleep_for(SLEEP_TIME);
EXPECT_THAT(wasSampleSent.load(), Eq(false));

Expand Down Expand Up @@ -997,14 +998,14 @@ TEST_F(PoshRuntime_test, ShutdownUnblocksBlockingClient)
ASSERT_TRUE(server.hasClients());
ASSERT_THAT(client.getConnectionState(), Eq(iox::ConnectionState::CONNECTED));

auto threadSyncSemaphore = iox::posix::Semaphore::create(iox::posix::CreateUnnamedSingleProcessSemaphore, 0U);
std::atomic_bool wasRequestSent{false};

constexpr iox::units::Duration DEADLOCK_TIMEOUT{5_s};
Watchdog deadlockWatchdog{DEADLOCK_TIMEOUT};
deadlockWatchdog.watchAndActOnFailure([] { std::terminate(); });

// block in a separate thread
std::atomic_bool isThreadStarted{false};
std::thread blockingClient([&] {
auto sendRequest = [&](bool expectError) {
auto clientLoanResult = client.loan(sizeof(uint64_t), alignof(uint64_t));
Expand All @@ -1025,15 +1026,15 @@ TEST_F(PoshRuntime_test, ShutdownUnblocksBlockingClient)
}

// signal that an blocking send is expected
ASSERT_FALSE(threadSyncSemaphore->post().has_error());
isThreadStarted = true;
constexpr bool EXPECT_ERROR_INDICATOR{true};
sendRequest(EXPECT_ERROR_INDICATOR);
wasRequestSent = true;
});

// wait some time to check if the client is blocked
constexpr std::chrono::milliseconds SLEEP_TIME{100U};
ASSERT_FALSE(threadSyncSemaphore->wait().has_error());
iox::cxx::internal::adaptive_wait().wait_loop([&] { return !isThreadStarted.load(); });
std::this_thread::sleep_for(SLEEP_TIME);
EXPECT_THAT(wasRequestSent.load(), Eq(false));

Expand Down Expand Up @@ -1073,14 +1074,14 @@ TEST_F(PoshRuntime_test, ShutdownUnblocksBlockingServer)
EXPECT_FALSE(client.send(clientLoanResult.value()).has_error());
}

auto threadSyncSemaphore = iox::posix::Semaphore::create(iox::posix::CreateUnnamedSingleProcessSemaphore, 0U);
std::atomic_bool wasResponseSent{false};

constexpr iox::units::Duration DEADLOCK_TIMEOUT{5_s};
Watchdog deadlockWatchdog{DEADLOCK_TIMEOUT};
deadlockWatchdog.watchAndActOnFailure([] { std::terminate(); });

// block in a separate thread
std::atomic_bool isThreadStarted{false};
std::thread blockingServer([&] {
auto processRequest = [&](bool expectError) {
auto takeResult = server.take();
Expand All @@ -1102,15 +1103,15 @@ TEST_F(PoshRuntime_test, ShutdownUnblocksBlockingServer)
processRequest(EXPECT_ERROR_INDICATOR);
}

ASSERT_FALSE(threadSyncSemaphore->post().has_error());
isThreadStarted = true;
constexpr bool EXPECT_ERROR_INDICATOR{true};
processRequest(EXPECT_ERROR_INDICATOR);
wasResponseSent = true;
});

// wait some time to check if the server is blocked
constexpr std::chrono::milliseconds SLEEP_TIME{100U};
ASSERT_FALSE(threadSyncSemaphore->wait().has_error());
iox::cxx::internal::adaptive_wait().wait_loop([&] { return !isThreadStarted.load(); });
std::this_thread::sleep_for(SLEEP_TIME);
EXPECT_THAT(wasResponseSent.load(), Eq(false));

Expand Down
Loading

0 comments on commit 42d1bde

Please sign in to comment.