From 42d1bde478208a774b255338a15333d25f8a0fbb Mon Sep 17 00:00:00 2001 From: Christian Eltzschig Date: Thu, 16 Jun 2022 19:20:43 +0200 Subject: [PATCH] iox-#751 Replace ShutdownManager with SignalWatcher in dds gateway, replace semaphores in test with adaptive wait Signed-off-by: Christian Eltzschig --- .../test/moduletests/test_listener.cpp | 4 +- .../test/moduletests/test_wait_set.cpp | 6 +-- iceoryx_dds/source/gateway/main.cpp | 37 +------------------ ...ice_callbacks_listener_as_class_member.cpp | 1 - .../callbacks/ice_callbacks_subscriber.cpp | 1 - .../integrationtests/test_client_server.cpp | 12 +++--- ...est_publisher_subscriber_communication.cpp | 22 +++++------ .../test/moduletests/test_popo_listener.cpp | 13 ++++--- .../test/moduletests/test_posh_runtime.cpp | 19 +++++----- .../moduletests/test_roudi_portmanager.cpp | 6 +-- 10 files changed, 43 insertions(+), 78 deletions(-) diff --git a/iceoryx_binding_c/test/moduletests/test_listener.cpp b/iceoryx_binding_c/test/moduletests/test_listener.cpp index 1c37c1b205..b6ba72b3ed 100644 --- a/iceoryx_binding_c/test/moduletests/test_listener.cpp +++ b/iceoryx_binding_c/test/moduletests/test_listener.cpp @@ -441,7 +441,7 @@ void notifyClient(ClientPortData& portData) portData.m_connectionState = iox::ConnectionState::CONNECTED; iox::popo::ChunkQueuePusher pusher{&portData.m_chunkReceiverData}; pusher.push(iox::mepoo::SharedChunk()); - EXPECT_FALSE(portData.m_chunkReceiverData.m_conditionVariableDataPtr->m_semaphore.post().has_error()); + EXPECT_FALSE(portData.m_chunkReceiverData.m_conditionVariableDataPtr->semaphore->post().has_error()); } TIMING_TEST_F(iox_listener_test, NotifyingClientEventWorks, Repeat(5), [&] { @@ -491,7 +491,7 @@ void notifyServer(ServerPortData& portData) { iox::popo::ChunkQueuePusher pusher{&portData.m_chunkReceiverData}; pusher.push(iox::mepoo::SharedChunk()); - EXPECT_FALSE(portData.m_chunkReceiverData.m_conditionVariableDataPtr->m_semaphore.post().has_error()); + EXPECT_FALSE(portData.m_chunkReceiverData.m_conditionVariableDataPtr->semaphore->post().has_error()); } TEST_F(iox_listener_test, AttachingServerWorks) diff --git a/iceoryx_binding_c/test/moduletests/test_wait_set.cpp b/iceoryx_binding_c/test/moduletests/test_wait_set.cpp index 7e5754edc8..87b0a83004 100644 --- a/iceoryx_binding_c/test/moduletests/test_wait_set.cpp +++ b/iceoryx_binding_c/test/moduletests/test_wait_set.cpp @@ -796,7 +796,7 @@ void notifyClient(ClientPortData& portData) portData.m_connectionState = iox::ConnectionState::CONNECTED; iox::popo::ChunkQueuePusher pusher{&portData.m_chunkReceiverData}; pusher.push(iox::mepoo::SharedChunk()); - EXPECT_FALSE(portData.m_chunkReceiverData.m_conditionVariableDataPtr->m_semaphore.post().has_error()); + EXPECT_FALSE(portData.m_chunkReceiverData.m_conditionVariableDataPtr->semaphore->post().has_error()); } TEST_F(iox_ws_test, NotifyingClientEventWorks) @@ -928,7 +928,7 @@ void notifyServer(ServerPortData& portData) { iox::popo::ChunkQueuePusher pusher{&portData.m_chunkReceiverData}; pusher.push(iox::mepoo::SharedChunk()); - EXPECT_FALSE(portData.m_chunkReceiverData.m_conditionVariableDataPtr->m_semaphore.post().has_error()); + EXPECT_FALSE(portData.m_chunkReceiverData.m_conditionVariableDataPtr->semaphore->post().has_error()); } TEST_F(iox_ws_test, AttachingServerEventWorks) @@ -1140,7 +1140,7 @@ void notifyServiceDiscovery(SubscriberPortData& portData) { iox::popo::ChunkQueuePusher pusher{&portData.m_chunkReceiverData}; pusher.push(iox::mepoo::SharedChunk()); - EXPECT_FALSE(portData.m_chunkReceiverData.m_conditionVariableDataPtr->m_semaphore.post().has_error()); + EXPECT_FALSE(portData.m_chunkReceiverData.m_conditionVariableDataPtr->semaphore->post().has_error()); } TEST_F(iox_ws_test, NotifyingServiceDiscoveryEventWorks) diff --git a/iceoryx_dds/source/gateway/main.cpp b/iceoryx_dds/source/gateway/main.cpp index f3f49fbb7d..9ad5fa16aa 100644 --- a/iceoryx_dds/source/gateway/main.cpp +++ b/iceoryx_dds/source/gateway/main.cpp @@ -21,46 +21,13 @@ #include "iceoryx_hoofs/cxx/helplets.hpp" #include "iceoryx_hoofs/cxx/optional.hpp" #include "iceoryx_hoofs/platform/signal.hpp" -#include "iceoryx_hoofs/posix_wrapper/semaphore.hpp" -#include "iceoryx_hoofs/posix_wrapper/signal_handler.hpp" +#include "iceoryx_hoofs/posix_wrapper/signal_watcher.hpp" #include "iceoryx_posh/gateway/gateway_config.hpp" #include "iceoryx_posh/gateway/toml_gateway_config_parser.hpp" #include "iceoryx_posh/runtime/posh_runtime.hpp" -class ShutdownManager -{ - public: - static void scheduleShutdown(int num) - { - char reason = '\0'; - psignal(num, &reason); - s_semaphore.post().or_else([](auto) { - std::cerr << "failed to call post on shutdown semaphore" << std::endl; - std::terminate(); - }); - } - static void waitUntilShutdown() - { - s_semaphore.wait().or_else([](auto) { - std::cerr << "failed to call wait on shutdown semaphore" << std::endl; - std::terminate(); - }); - } - - private: - static iox::posix::Semaphore s_semaphore; - ShutdownManager() = default; -}; -iox::posix::Semaphore ShutdownManager::s_semaphore = - iox::posix::Semaphore::create(iox::posix::CreateUnnamedSingleProcessSemaphore, 0u).value(); - int main() { - // Set OS signal handlers - auto signalGuardInt = iox::posix::registerSignalHandler(iox::posix::Signal::INT, ShutdownManager::scheduleShutdown); - auto signalGuardTerm = - iox::posix::registerSignalHandler(iox::posix::Signal::TERM, ShutdownManager::scheduleShutdown); - // Start application iox::runtime::PoshRuntime::initRuntime("iox-dds-gateway"); @@ -84,7 +51,7 @@ int main() dds2ioxGateway.runMultithreaded(); // Run until SIGINT or SIGTERM - ShutdownManager::waitUntilShutdown(); + iox::posix::waitForTerminationRequest(); return 0; } diff --git a/iceoryx_examples/callbacks/ice_callbacks_listener_as_class_member.cpp b/iceoryx_examples/callbacks/ice_callbacks_listener_as_class_member.cpp index 07e4248a43..5d85c8612d 100644 --- a/iceoryx_examples/callbacks/ice_callbacks_listener_as_class_member.cpp +++ b/iceoryx_examples/callbacks/ice_callbacks_listener_as_class_member.cpp @@ -15,7 +15,6 @@ // SPDX-License-Identifier: Apache-2.0 #include "iceoryx_hoofs/cxx/optional.hpp" -#include "iceoryx_hoofs/posix_wrapper/semaphore.hpp" #include "iceoryx_hoofs/posix_wrapper/signal_watcher.hpp" #include "iceoryx_posh/popo/listener.hpp" #include "iceoryx_posh/popo/subscriber.hpp" diff --git a/iceoryx_examples/callbacks/ice_callbacks_subscriber.cpp b/iceoryx_examples/callbacks/ice_callbacks_subscriber.cpp index e2cd85e78b..a969f34a08 100644 --- a/iceoryx_examples/callbacks/ice_callbacks_subscriber.cpp +++ b/iceoryx_examples/callbacks/ice_callbacks_subscriber.cpp @@ -15,7 +15,6 @@ // SPDX-License-Identifier: Apache-2.0 #include "iceoryx_hoofs/cxx/optional.hpp" -#include "iceoryx_hoofs/posix_wrapper/semaphore.hpp" #include "iceoryx_hoofs/posix_wrapper/signal_watcher.hpp" #include "iceoryx_posh/popo/listener.hpp" #include "iceoryx_posh/popo/subscriber.hpp" diff --git a/iceoryx_posh/test/integrationtests/test_client_server.cpp b/iceoryx_posh/test/integrationtests/test_client_server.cpp index 1b9852aa2d..318bb82bb1 100644 --- a/iceoryx_posh/test/integrationtests/test_client_server.cpp +++ b/iceoryx_posh/test/integrationtests/test_client_server.cpp @@ -345,10 +345,10 @@ TEST_F(ClientServer_test, ServerTakeRequestUnblocksClientSendingRequest) ASSERT_TRUE(server.hasClients()); ASSERT_THAT(client.getConnectionState(), Eq(iox::ConnectionState::CONNECTED)); - auto threadSyncSemaphore = iox::posix::Semaphore::create(iox::posix::CreateUnnamedSingleProcessSemaphore, 0U); std::atomic_bool wasRequestSent{false}; // block in a separate thread + std::atomic_bool isThreadStarted{false}; std::thread blockingClient([&] { auto sendRequest = [&]() { auto loanResult = client.loan(); @@ -363,14 +363,14 @@ TEST_F(ClientServer_test, ServerTakeRequestUnblocksClientSendingRequest) } // signal that an blocking send is expected - ASSERT_FALSE(threadSyncSemaphore->post().has_error()); + isThreadStarted = true; sendRequest(); wasRequestSent = true; }); // wait some time to check if the client is blocked constexpr std::chrono::milliseconds SLEEP_TIME{100U}; - ASSERT_FALSE(threadSyncSemaphore->wait().has_error()); + iox::cxx::internal::adaptive_wait().wait_loop([&] { return !isThreadStarted.load(); }); std::this_thread::sleep_for(SLEEP_TIME); EXPECT_THAT(wasRequestSent.load(), Eq(false)); @@ -406,10 +406,10 @@ TEST_F(ClientServer_test, ClientTakesResponseUnblocksServerSendingResponse) EXPECT_FALSE(clientLoanResult.value().send().has_error()); } - auto threadSyncSemaphore = iox::posix::Semaphore::create(iox::posix::CreateUnnamedSingleProcessSemaphore, 0U); std::atomic_bool wasResponseSent{false}; // block in a separate thread + std::atomic_bool isThreadStarted{false}; std::thread blockingServer([&] { auto processRequest = [&]() { auto takeResult = server.take(); @@ -424,14 +424,14 @@ TEST_F(ClientServer_test, ClientTakesResponseUnblocksServerSendingResponse) processRequest(); } - ASSERT_FALSE(threadSyncSemaphore->post().has_error()); + isThreadStarted = true; processRequest(); wasResponseSent = true; }); // wait some time to check if the server is blocked constexpr std::chrono::milliseconds SLEEP_TIME{100U}; - ASSERT_FALSE(threadSyncSemaphore->wait().has_error()); + iox::cxx::internal::adaptive_wait().wait_loop([&] { return !isThreadStarted.load(); }); std::this_thread::sleep_for(SLEEP_TIME); EXPECT_THAT(wasResponseSent.load(), Eq(false)); diff --git a/iceoryx_posh/test/integrationtests/test_publisher_subscriber_communication.cpp b/iceoryx_posh/test/integrationtests/test_publisher_subscriber_communication.cpp index 57b6508c71..d07ee37da4 100644 --- a/iceoryx_posh/test/integrationtests/test_publisher_subscriber_communication.cpp +++ b/iceoryx_posh/test/integrationtests/test_publisher_subscriber_communication.cpp @@ -21,7 +21,6 @@ #include "iceoryx_hoofs/cxx/string.hpp" #include "iceoryx_hoofs/cxx/variant.hpp" #include "iceoryx_hoofs/cxx/vector.hpp" -#include "iceoryx_hoofs/posix_wrapper/semaphore.hpp" #include "iceoryx_hoofs/testing/watch_dog.hpp" #include "iceoryx_posh/popo/publisher.hpp" #include "iceoryx_posh/popo/subscriber.hpp" @@ -551,18 +550,17 @@ TEST_F(PublisherSubscriberCommunication_test, PublisherBlocksWhenBlockingActivat EXPECT_FALSE(publisher->publishCopyOf("start your day with a smile").has_error()); EXPECT_FALSE(publisher->publishCopyOf("and hypnotoad will smile back").has_error()); - auto threadSyncSemaphore = posix::Semaphore::create(posix::CreateUnnamedSingleProcessSemaphore, 0U); - std::atomic_bool wasSampleDelivered{false}; + std::atomic_bool isThreadStarted{false}; std::thread t1([&] { - ASSERT_FALSE(threadSyncSemaphore->post().has_error()); + isThreadStarted = true; EXPECT_FALSE(publisher->publishCopyOf("oh no hypnotoad is staring at me").has_error()); wasSampleDelivered.store(true); }); constexpr int64_t TIMEOUT_IN_MS = 100; - ASSERT_FALSE(threadSyncSemaphore->wait().has_error()); + iox::cxx::internal::adaptive_wait().wait_loop([&] { return !isThreadStarted.load(); }); std::this_thread::sleep_for(std::chrono::milliseconds(TIMEOUT_IN_MS)); EXPECT_FALSE(wasSampleDelivered.load()); @@ -595,16 +593,15 @@ TEST_F(PublisherSubscriberCommunication_test, PublisherDoesNotBlockAndDiscardsSa EXPECT_FALSE(publisher->publishCopyOf("first there was a blubb named mantua").has_error()); EXPECT_FALSE(publisher->publishCopyOf("second hypnotoad ate it").has_error()); - auto threadSyncSemaphore = posix::Semaphore::create(posix::CreateUnnamedSingleProcessSemaphore, 0U); - std::atomic_bool wasSampleDelivered{false}; + std::atomic_bool isThreadStarted{false}; std::thread t1([&] { - ASSERT_FALSE(threadSyncSemaphore->post().has_error()); + isThreadStarted = true; EXPECT_FALSE(publisher->publishCopyOf("third a tiny black hole smells like butter").has_error()); wasSampleDelivered.store(true); }); - ASSERT_FALSE(threadSyncSemaphore->wait().has_error()); + iox::cxx::internal::adaptive_wait().wait_loop([&] { return !isThreadStarted.load(); }); t1.join(); EXPECT_TRUE(wasSampleDelivered.load()); @@ -666,18 +663,17 @@ TEST_F(PublisherSubscriberCommunication_test, MixedOptionsSetupWorksWithBlocking EXPECT_FALSE(publisherBlocking->publishCopyOf("hypnotoad wants a cookie").has_error()); EXPECT_FALSE(publisherNonBlocking->publishCopyOf("hypnotoad has a sister named hypnoodle").has_error()); - auto threadSyncSemaphore = posix::Semaphore::create(posix::CreateUnnamedSingleProcessSemaphore, 0U); - std::atomic_bool wasSampleDelivered{false}; + std::atomic_bool isThreadStarted{false}; std::thread t1([&] { - ASSERT_FALSE(threadSyncSemaphore->post().has_error()); + isThreadStarted = true; EXPECT_FALSE(publisherBlocking->publishCopyOf("chucky is the only one who can ride the hypnotoad").has_error()); wasSampleDelivered.store(true); }); constexpr int64_t TIMEOUT_IN_MS = 100; - ASSERT_FALSE(threadSyncSemaphore->wait().has_error()); + iox::cxx::internal::adaptive_wait().wait_loop([&] { return !isThreadStarted.load(); }); std::this_thread::sleep_for(std::chrono::milliseconds(TIMEOUT_IN_MS)); EXPECT_FALSE(wasSampleDelivered.load()); diff --git a/iceoryx_posh/test/moduletests/test_popo_listener.cpp b/iceoryx_posh/test/moduletests/test_popo_listener.cpp index d3d260f008..1b982216ca 100644 --- a/iceoryx_posh/test/moduletests/test_popo_listener.cpp +++ b/iceoryx_posh/test/moduletests/test_popo_listener.cpp @@ -18,7 +18,7 @@ #include "iceoryx_hoofs/cxx/optional.hpp" #include "iceoryx_hoofs/cxx/vector.hpp" #include "iceoryx_hoofs/internal/concurrent/smart_lock.hpp" -#include "iceoryx_hoofs/posix_wrapper/semaphore.hpp" +#include "iceoryx_hoofs/posix_wrapper/unnamed_semaphore.hpp" #include "iceoryx_hoofs/testing/timing_test.hpp" #include "iceoryx_hoofs/testing/watch_dog.hpp" #include "iceoryx_posh/iceoryx_posh_types.hpp" @@ -156,7 +156,7 @@ iox::concurrent::smart_lock> g_toBeAttached; iox::concurrent::smart_lock> g_toBeDetached; std::array g_triggerCallbackArg; uint64_t g_triggerCallbackRuntimeInMs = 0U; -iox::cxx::optional g_callbackBlocker; +iox::cxx::optional g_callbackBlocker; class Listener_test : public Test { @@ -227,8 +227,11 @@ class Listener_test : public Test void activateTriggerCallbackBlocker() noexcept { - g_callbackBlocker.emplace( - iox::posix::Semaphore::create(iox::posix::CreateUnnamedSingleProcessSemaphore, 0U).value()); + iox::posix::UnnamedSemaphoreBuilder() + .initialValue(0U) + .isInterProcessCapable(false) + .create(g_callbackBlocker) + .expect("Unable to create callback blocker semaphore"); } void unblockTriggerCallback(const uint64_t numberOfUnblocks) noexcept @@ -1213,4 +1216,4 @@ TIMING_TEST_F(Listener_test, AttachingInCallbackWorks, Repeat(5), [&] { // END ////////////////////////////////// -} // namespace \ No newline at end of file +} // namespace diff --git a/iceoryx_posh/test/moduletests/test_posh_runtime.cpp b/iceoryx_posh/test/moduletests/test_posh_runtime.cpp index b7d31fd18a..684ad92089 100644 --- a/iceoryx_posh/test/moduletests/test_posh_runtime.cpp +++ b/iceoryx_posh/test/moduletests/test_posh_runtime.cpp @@ -16,6 +16,7 @@ // SPDX-License-Identifier: Apache-2.0 #include "iceoryx_hoofs/cxx/convert.hpp" +#include "iceoryx_hoofs/internal/cxx/adaptive_wait.hpp" #include "iceoryx_hoofs/testing/timing_test.hpp" #include "iceoryx_hoofs/testing/watch_dog.hpp" #include "iceoryx_posh/iceoryx_posh_types.hpp" @@ -949,7 +950,6 @@ TEST_F(PoshRuntime_test, ShutdownUnblocksBlockingPublisher) // send samples to fill subscriber queue ASSERT_FALSE(publisher.publishCopyOf(42U).has_error()); - auto threadSyncSemaphore = iox::posix::Semaphore::create(iox::posix::CreateUnnamedSingleProcessSemaphore, 0U); std::atomic_bool wasSampleSent{false}; constexpr iox::units::Duration DEADLOCK_TIMEOUT{5_s}; @@ -957,15 +957,16 @@ TEST_F(PoshRuntime_test, ShutdownUnblocksBlockingPublisher) deadlockWatchdog.watchAndActOnFailure([] { std::terminate(); }); // block in a separate thread + std::atomic_bool isThreadStarted{false}; std::thread blockingPublisher([&] { - ASSERT_FALSE(threadSyncSemaphore->post().has_error()); + isThreadStarted = true; ASSERT_FALSE(publisher.publishCopyOf(42U).has_error()); wasSampleSent = true; }); // wait some time to check if the publisher is blocked + iox::cxx::internal::adaptive_wait().wait_loop([&] { return !isThreadStarted.load(); }); constexpr std::chrono::milliseconds SLEEP_TIME{100U}; - ASSERT_FALSE(threadSyncSemaphore->wait().has_error()); std::this_thread::sleep_for(SLEEP_TIME); EXPECT_THAT(wasSampleSent.load(), Eq(false)); @@ -997,7 +998,6 @@ TEST_F(PoshRuntime_test, ShutdownUnblocksBlockingClient) ASSERT_TRUE(server.hasClients()); ASSERT_THAT(client.getConnectionState(), Eq(iox::ConnectionState::CONNECTED)); - auto threadSyncSemaphore = iox::posix::Semaphore::create(iox::posix::CreateUnnamedSingleProcessSemaphore, 0U); std::atomic_bool wasRequestSent{false}; constexpr iox::units::Duration DEADLOCK_TIMEOUT{5_s}; @@ -1005,6 +1005,7 @@ TEST_F(PoshRuntime_test, ShutdownUnblocksBlockingClient) deadlockWatchdog.watchAndActOnFailure([] { std::terminate(); }); // block in a separate thread + std::atomic_bool isThreadStarted{false}; std::thread blockingClient([&] { auto sendRequest = [&](bool expectError) { auto clientLoanResult = client.loan(sizeof(uint64_t), alignof(uint64_t)); @@ -1025,7 +1026,7 @@ TEST_F(PoshRuntime_test, ShutdownUnblocksBlockingClient) } // signal that an blocking send is expected - ASSERT_FALSE(threadSyncSemaphore->post().has_error()); + isThreadStarted = true; constexpr bool EXPECT_ERROR_INDICATOR{true}; sendRequest(EXPECT_ERROR_INDICATOR); wasRequestSent = true; @@ -1033,7 +1034,7 @@ TEST_F(PoshRuntime_test, ShutdownUnblocksBlockingClient) // wait some time to check if the client is blocked constexpr std::chrono::milliseconds SLEEP_TIME{100U}; - ASSERT_FALSE(threadSyncSemaphore->wait().has_error()); + iox::cxx::internal::adaptive_wait().wait_loop([&] { return !isThreadStarted.load(); }); std::this_thread::sleep_for(SLEEP_TIME); EXPECT_THAT(wasRequestSent.load(), Eq(false)); @@ -1073,7 +1074,6 @@ TEST_F(PoshRuntime_test, ShutdownUnblocksBlockingServer) EXPECT_FALSE(client.send(clientLoanResult.value()).has_error()); } - auto threadSyncSemaphore = iox::posix::Semaphore::create(iox::posix::CreateUnnamedSingleProcessSemaphore, 0U); std::atomic_bool wasResponseSent{false}; constexpr iox::units::Duration DEADLOCK_TIMEOUT{5_s}; @@ -1081,6 +1081,7 @@ TEST_F(PoshRuntime_test, ShutdownUnblocksBlockingServer) deadlockWatchdog.watchAndActOnFailure([] { std::terminate(); }); // block in a separate thread + std::atomic_bool isThreadStarted{false}; std::thread blockingServer([&] { auto processRequest = [&](bool expectError) { auto takeResult = server.take(); @@ -1102,7 +1103,7 @@ TEST_F(PoshRuntime_test, ShutdownUnblocksBlockingServer) processRequest(EXPECT_ERROR_INDICATOR); } - ASSERT_FALSE(threadSyncSemaphore->post().has_error()); + isThreadStarted = true; constexpr bool EXPECT_ERROR_INDICATOR{true}; processRequest(EXPECT_ERROR_INDICATOR); wasResponseSent = true; @@ -1110,7 +1111,7 @@ TEST_F(PoshRuntime_test, ShutdownUnblocksBlockingServer) // wait some time to check if the server is blocked constexpr std::chrono::milliseconds SLEEP_TIME{100U}; - ASSERT_FALSE(threadSyncSemaphore->wait().has_error()); + iox::cxx::internal::adaptive_wait().wait_loop([&] { return !isThreadStarted.load(); }); std::this_thread::sleep_for(SLEEP_TIME); EXPECT_THAT(wasResponseSent.load(), Eq(false)); diff --git a/iceoryx_posh/test/moduletests/test_roudi_portmanager.cpp b/iceoryx_posh/test/moduletests/test_roudi_portmanager.cpp index 4b897f54b1..0f99b35f4f 100644 --- a/iceoryx_posh/test/moduletests/test_roudi_portmanager.cpp +++ b/iceoryx_posh/test/moduletests/test_roudi_portmanager.cpp @@ -835,7 +835,6 @@ void PortManager_test::setupAndTestBlockingPublisher(const iox::RuntimeName_t& p ASSERT_FALSE(maybeChunk.has_error()); publisher.sendChunk(maybeChunk.value()); - auto threadSyncSemaphore = iox::posix::Semaphore::create(iox::posix::CreateUnnamedSingleProcessSemaphore, 0U); std::atomic_bool wasChunkSent{false}; constexpr iox::units::Duration DEADLOCK_TIMEOUT{5_s}; @@ -843,17 +842,18 @@ void PortManager_test::setupAndTestBlockingPublisher(const iox::RuntimeName_t& p deadlockWatchdog.watchAndActOnFailure([] { std::terminate(); }); // block in a separate thread + std::atomic_bool isThreadStarted{false}; std::thread blockingPublisher([&] { auto maybeChunk = publisher.tryAllocateChunk(42U, 8U); ASSERT_FALSE(maybeChunk.has_error()); - ASSERT_FALSE(threadSyncSemaphore->post().has_error()); + isThreadStarted = true; publisher.sendChunk(maybeChunk.value()); wasChunkSent = true; }); // wait some time to check if the publisher is blocked constexpr int64_t SLEEP_IN_MS = 100; - ASSERT_FALSE(threadSyncSemaphore->wait().has_error()); + iox::cxx::internal::adaptive_wait().wait_loop([&] { return !isThreadStarted.load(); }); std::this_thread::sleep_for(std::chrono::milliseconds(SLEEP_IN_MS)); EXPECT_THAT(wasChunkSent.load(), Eq(false));