Skip to content

Commit

Permalink
iox-eclipse-iceoryx#751 replace adaptive_wait with Barrier
Browse files Browse the repository at this point in the history
Signed-off-by: Christian Eltzschig <me@elchris.org>
  • Loading branch information
elfenpiff committed Jun 23, 2022
1 parent 3513d5f commit c6dce93
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 45 deletions.
8 changes: 4 additions & 4 deletions iceoryx_hoofs/test/moduletests/test_posix_signal_watcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
//
// SPDX-License-Identifier: Apache-2.0

#include "iceoryx_hoofs/internal/cxx/adaptive_wait.hpp"
#include "iceoryx_hoofs/posix_wrapper/signal_watcher.hpp"
#include "iceoryx_hoofs/testing/barrier.hpp"
#include "iceoryx_hoofs/testing/watch_dog.hpp"
#include "test.hpp"
#include <atomic>
Expand Down Expand Up @@ -85,21 +85,21 @@ void unblocksWhenSignalWasRaisedForWaiters(SignalWatcher_test& test,
const uint64_t numberOfWaiters,
const std::function<void()>& wait)
{
std::atomic<uint64_t> isThreadStarted{0};
Barrier isThreadStarted(numberOfWaiters);
std::atomic<uint64_t> isThreadFinished{0};

std::vector<std::thread> threads;

for (uint64_t i = 0; i < numberOfWaiters; ++i)
{
threads.emplace_back([&] {
++isThreadStarted;
isThreadStarted.notify();
wait();
++isThreadFinished;
});
}

iox::cxx::internal::adaptive_wait().wait_loop([&] { return (isThreadStarted != numberOfWaiters); });
isThreadStarted.wait();

std::this_thread::sleep_for(test.waitingTime);

Expand Down
13 changes: 7 additions & 6 deletions iceoryx_posh/test/integrationtests/test_client_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
//
// SPDX-License-Identifier: Apache-2.0

#include "iceoryx_hoofs/testing/barrier.hpp"
#include "iceoryx_hoofs/testing/watch_dog.hpp"
#include "iceoryx_posh/popo/client.hpp"
#include "iceoryx_posh/popo/server.hpp"
Expand Down Expand Up @@ -348,7 +349,7 @@ TEST_F(ClientServer_test, ServerTakeRequestUnblocksClientSendingRequest)
std::atomic_bool wasRequestSent{false};

// block in a separate thread
std::atomic_bool isThreadStarted{false};
Barrier isThreadStarted(1U);
std::thread blockingClient([&] {
auto sendRequest = [&]() {
auto loanResult = client.loan();
Expand All @@ -363,14 +364,14 @@ TEST_F(ClientServer_test, ServerTakeRequestUnblocksClientSendingRequest)
}

// signal that an blocking send is expected
isThreadStarted = true;
isThreadStarted.notify();
sendRequest();
wasRequestSent = true;
});

// wait some time to check if the client is blocked
constexpr std::chrono::milliseconds SLEEP_TIME{100U};
iox::cxx::internal::adaptive_wait().wait_loop([&] { return !isThreadStarted.load(); });
isThreadStarted.wait();
std::this_thread::sleep_for(SLEEP_TIME);
EXPECT_THAT(wasRequestSent.load(), Eq(false));

Expand Down Expand Up @@ -409,7 +410,7 @@ TEST_F(ClientServer_test, ClientTakesResponseUnblocksServerSendingResponse)
std::atomic_bool wasResponseSent{false};

// block in a separate thread
std::atomic_bool isThreadStarted{false};
Barrier isThreadStarted(1U);
std::thread blockingServer([&] {
auto processRequest = [&]() {
auto takeResult = server.take();
Expand All @@ -424,14 +425,14 @@ TEST_F(ClientServer_test, ClientTakesResponseUnblocksServerSendingResponse)
processRequest();
}

isThreadStarted = true;
isThreadStarted.notify();
processRequest();
wasResponseSent = true;
});

// wait some time to check if the server is blocked
constexpr std::chrono::milliseconds SLEEP_TIME{100U};
iox::cxx::internal::adaptive_wait().wait_loop([&] { return !isThreadStarted.load(); });
isThreadStarted.wait();
std::this_thread::sleep_for(SLEEP_TIME);
EXPECT_THAT(wasResponseSent.load(), Eq(false));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "iceoryx_hoofs/cxx/string.hpp"
#include "iceoryx_hoofs/cxx/variant.hpp"
#include "iceoryx_hoofs/cxx/vector.hpp"
#include "iceoryx_hoofs/testing/barrier.hpp"
#include "iceoryx_hoofs/testing/watch_dog.hpp"
#include "iceoryx_posh/popo/publisher.hpp"
#include "iceoryx_posh/popo/subscriber.hpp"
Expand Down Expand Up @@ -551,16 +552,16 @@ TEST_F(PublisherSubscriberCommunication_test, PublisherBlocksWhenBlockingActivat
EXPECT_FALSE(publisher->publishCopyOf("and hypnotoad will smile back").has_error());

std::atomic_bool wasSampleDelivered{false};
std::atomic_bool isThreadStarted{false};
Barrier isThreadStarted(1U);
std::thread t1([&] {
isThreadStarted = true;
isThreadStarted.notify();
EXPECT_FALSE(publisher->publishCopyOf("oh no hypnotoad is staring at me").has_error());
wasSampleDelivered.store(true);
});

constexpr int64_t TIMEOUT_IN_MS = 100;

iox::cxx::internal::adaptive_wait().wait_loop([&] { return !isThreadStarted.load(); });
isThreadStarted.wait();
std::this_thread::sleep_for(std::chrono::milliseconds(TIMEOUT_IN_MS));
EXPECT_FALSE(wasSampleDelivered.load());

Expand Down
19 changes: 10 additions & 9 deletions iceoryx_posh/test/moduletests/test_popo_chunk_distributor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// SPDX-License-Identifier: Apache-2.0

#include "iceoryx_hoofs/cxx/variant_queue.hpp"
#include "iceoryx_hoofs/testing/barrier.hpp"
#include "iceoryx_hoofs/testing/watch_dog.hpp"
#include "iceoryx_posh/internal/mepoo/shared_chunk.hpp"
#include "iceoryx_posh/internal/popo/building_blocks/chunk_distributor.hpp"
Expand Down Expand Up @@ -620,16 +621,16 @@ TYPED_TEST(ChunkDistributor_test, DeliverToQueueWithBlockingOptionBlocksDelivery
ASSERT_FALSE(sut.deliverToQueue(queueData->m_uniqueId, EXPECTED_QUEUE_INDEX, chunk).has_error());
}

std::atomic_bool isThreadStarted{false};
Barrier isThreadStarted(1U);
auto chunk = this->allocateChunk(7373);
std::atomic_bool wasChunkDelivered{false};
std::thread t1([&] {
isThreadStarted = true;
isThreadStarted.notify();
ASSERT_FALSE(sut.deliverToQueue(queueData->m_uniqueId, EXPECTED_QUEUE_INDEX, chunk).has_error());
wasChunkDelivered = true;
});

iox::cxx::internal::adaptive_wait().wait_loop([&] { return !isThreadStarted; });
isThreadStarted.wait();

std::this_thread::sleep_for(this->BLOCKING_DURATION);
EXPECT_THAT(wasChunkDelivered.load(), Eq(false));
Expand Down Expand Up @@ -738,15 +739,15 @@ TYPED_TEST(ChunkDistributor_test, DeliverToSingleQueueBlocksWhenOptionsAreSetToB
ASSERT_FALSE(sut.tryAddQueue(queueData.get(), 0U).has_error());
sut.deliverToAllStoredQueues(this->allocateChunk(155U));

std::atomic_bool isThreadStarted{false};
Barrier isThreadStarted(1U);
std::atomic_bool wasChunkDelivered{false};
std::thread t1([&] {
isThreadStarted = true;
isThreadStarted.notify();
sut.deliverToAllStoredQueues(this->allocateChunk(152U));
wasChunkDelivered = true;
});

iox::cxx::internal::adaptive_wait().wait_loop([&] { return !isThreadStarted; });
isThreadStarted.wait();

std::this_thread::sleep_for(this->BLOCKING_DURATION);
EXPECT_THAT(wasChunkDelivered.load(), Eq(false));
Expand Down Expand Up @@ -785,15 +786,15 @@ TYPED_TEST(ChunkDistributor_test, MultipleBlockingQueuesWillBeFilledWhenThereBec

sut.deliverToAllStoredQueues(this->allocateChunk(425U));

std::atomic_bool isThreadStarted{false};
Barrier isThreadStarted(1U);
std::atomic_bool wasChunkDelivered{false};
std::thread t1([&] {
isThreadStarted.store(true);
isThreadStarted.notify();
sut.deliverToAllStoredQueues(this->allocateChunk(1152U));
wasChunkDelivered = true;
});

iox::cxx::internal::adaptive_wait().wait_loop([&] { return !isThreadStarted; });
isThreadStarted.wait();

std::this_thread::sleep_for(this->BLOCKING_DURATION);
EXPECT_THAT(wasChunkDelivered.load(), Eq(false));
Expand Down
20 changes: 10 additions & 10 deletions iceoryx_posh/test/moduletests/test_popo_condition_variable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
//
// SPDX-License-Identifier: Apache-2.0

#include "iceoryx_hoofs/internal/cxx/adaptive_wait.hpp"
#include "iceoryx_hoofs/testing/barrier.hpp"
#include "iceoryx_hoofs/testing/timing_test.hpp"
#include "iceoryx_hoofs/testing/watch_dog.hpp"
#include "iceoryx_posh/internal/popo/building_blocks/condition_listener.hpp"
Expand Down Expand Up @@ -117,14 +117,14 @@ TEST_F(ConditionVariable_test, WaitAndNotifyResultsInImmediateTriggerMultiThread
{
::testing::Test::RecordProperty("TEST_ID", "39b40c73-3dcc-4af6-9682-b62816c69854");
std::atomic<int> counter{0};
std::atomic_bool isThreadStarted{false};
Barrier isThreadStarted(1U);
std::thread waiter([&] {
EXPECT_THAT(counter, Eq(0));
isThreadStarted = true;
isThreadStarted.notify();
m_waiter.wait();
EXPECT_THAT(counter, Eq(1));
});
iox::cxx::internal::adaptive_wait().wait_loop([&] { return !isThreadStarted; });
isThreadStarted.wait();

counter++;
m_signaler.notify();
Expand Down Expand Up @@ -366,18 +366,18 @@ TIMING_TEST_F(ConditionVariable_test, WaitBlocks, Repeat(5), [&] {
ConditionNotifier notifier(m_condVarData, EVENT_INDEX);
ConditionListener listener(m_condVarData);
NotificationVector_t activeNotifications;
std::atomic_bool isThreadStarted{false};
Barrier isThreadStarted(1U);
std::atomic_bool hasWaited{false};

std::thread waiter([&] {
isThreadStarted = true;
isThreadStarted.notify();
activeNotifications = listener.wait();
hasWaited.store(true, std::memory_order_relaxed);
ASSERT_THAT(activeNotifications.size(), Eq(1U));
EXPECT_THAT(activeNotifications[0], Eq(EVENT_INDEX));
});

iox::cxx::internal::adaptive_wait().wait_loop([&] { return !isThreadStarted; });
isThreadStarted.wait();

std::this_thread::sleep_for(std::chrono::milliseconds(10));
EXPECT_THAT(hasWaited, Eq(false));
Expand Down Expand Up @@ -409,9 +409,9 @@ TIMING_TEST_F(ConditionVariable_test, SecondWaitBlocksUntilNewNotification, Repe
Watchdog watchdogSecondWait(m_timeToWait);
watchdogSecondWait.watchAndActOnFailure([&] { listener.destroy(); });

std::atomic_bool isThreadStarted{false};
Barrier isThreadStarted(1U);
std::thread waiter([&] {
isThreadStarted = true;
isThreadStarted.notify();
activeNotifications = listener.wait();
hasWaited.store(true, std::memory_order_relaxed);
ASSERT_THAT(activeNotifications.size(), Eq(1U));
Expand All @@ -422,7 +422,7 @@ TIMING_TEST_F(ConditionVariable_test, SecondWaitBlocksUntilNewNotification, Repe
}
});

iox::cxx::internal::adaptive_wait().wait_loop([&] { return !isThreadStarted; });
isThreadStarted.wait();

std::this_thread::sleep_for(std::chrono::milliseconds(10));
EXPECT_THAT(hasWaited, Eq(false));
Expand Down
20 changes: 10 additions & 10 deletions iceoryx_posh/test/moduletests/test_posh_runtime.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// SPDX-License-Identifier: Apache-2.0

#include "iceoryx_hoofs/cxx/convert.hpp"
#include "iceoryx_hoofs/internal/cxx/adaptive_wait.hpp"
#include "iceoryx_hoofs/testing/barrier.hpp"
#include "iceoryx_hoofs/testing/timing_test.hpp"
#include "iceoryx_hoofs/testing/watch_dog.hpp"
#include "iceoryx_posh/iceoryx_posh_types.hpp"
Expand Down Expand Up @@ -957,15 +957,15 @@ TEST_F(PoshRuntime_test, ShutdownUnblocksBlockingPublisher)
deadlockWatchdog.watchAndActOnFailure([] { std::terminate(); });

// block in a separate thread
std::atomic_bool isThreadStarted{false};
Barrier isThreadStarted(1U);
std::thread blockingPublisher([&] {
isThreadStarted = true;
isThreadStarted.notify();
ASSERT_FALSE(publisher.publishCopyOf(42U).has_error());
wasSampleSent = true;
});

// wait some time to check if the publisher is blocked
iox::cxx::internal::adaptive_wait().wait_loop([&] { return !isThreadStarted.load(); });
isThreadStarted.wait();
constexpr std::chrono::milliseconds SLEEP_TIME{100U};
std::this_thread::sleep_for(SLEEP_TIME);
EXPECT_THAT(wasSampleSent.load(), Eq(false));
Expand Down Expand Up @@ -1005,7 +1005,7 @@ TEST_F(PoshRuntime_test, ShutdownUnblocksBlockingClient)
deadlockWatchdog.watchAndActOnFailure([] { std::terminate(); });

// block in a separate thread
std::atomic_bool isThreadStarted{false};
Barrier isThreadStarted(1U);
std::thread blockingClient([&] {
auto sendRequest = [&](bool expectError) {
auto clientLoanResult = client.loan(sizeof(uint64_t), alignof(uint64_t));
Expand All @@ -1026,15 +1026,15 @@ TEST_F(PoshRuntime_test, ShutdownUnblocksBlockingClient)
}

// signal that an blocking send is expected
isThreadStarted = true;
isThreadStarted.notify();
constexpr bool EXPECT_ERROR_INDICATOR{true};
sendRequest(EXPECT_ERROR_INDICATOR);
wasRequestSent = true;
});

// wait some time to check if the client is blocked
constexpr std::chrono::milliseconds SLEEP_TIME{100U};
iox::cxx::internal::adaptive_wait().wait_loop([&] { return !isThreadStarted.load(); });
isThreadStarted.wait();
std::this_thread::sleep_for(SLEEP_TIME);
EXPECT_THAT(wasRequestSent.load(), Eq(false));

Expand Down Expand Up @@ -1081,7 +1081,7 @@ TEST_F(PoshRuntime_test, ShutdownUnblocksBlockingServer)
deadlockWatchdog.watchAndActOnFailure([] { std::terminate(); });

// block in a separate thread
std::atomic_bool isThreadStarted{false};
Barrier isThreadStarted(1U);
std::thread blockingServer([&] {
auto processRequest = [&](bool expectError) {
auto takeResult = server.take();
Expand All @@ -1103,15 +1103,15 @@ TEST_F(PoshRuntime_test, ShutdownUnblocksBlockingServer)
processRequest(EXPECT_ERROR_INDICATOR);
}

isThreadStarted = true;
isThreadStarted.notify();
constexpr bool EXPECT_ERROR_INDICATOR{true};
processRequest(EXPECT_ERROR_INDICATOR);
wasResponseSent = true;
});

// wait some time to check if the server is blocked
constexpr std::chrono::milliseconds SLEEP_TIME{100U};
iox::cxx::internal::adaptive_wait().wait_loop([&] { return !isThreadStarted.load(); });
isThreadStarted.wait();
std::this_thread::sleep_for(SLEEP_TIME);
EXPECT_THAT(wasResponseSent.load(), Eq(false));

Expand Down
7 changes: 4 additions & 3 deletions iceoryx_posh/test/moduletests/test_roudi_portmanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
//
// SPDX-License-Identifier: Apache-2.0

#include "iceoryx_hoofs/testing/barrier.hpp"
#include "test_roudi_portmanager_fixture.hpp"

namespace iox_test_roudi_portmanager
Expand Down Expand Up @@ -842,18 +843,18 @@ void PortManager_test::setupAndTestBlockingPublisher(const iox::RuntimeName_t& p
deadlockWatchdog.watchAndActOnFailure([] { std::terminate(); });

// block in a separate thread
std::atomic_bool isThreadStarted{false};
Barrier isThreadStarted(1U);
std::thread blockingPublisher([&] {
auto maybeChunk = publisher.tryAllocateChunk(42U, 8U);
ASSERT_FALSE(maybeChunk.has_error());
isThreadStarted = true;
isThreadStarted.notify();
publisher.sendChunk(maybeChunk.value());
wasChunkSent = true;
});

// wait some time to check if the publisher is blocked
constexpr int64_t SLEEP_IN_MS = 100;
iox::cxx::internal::adaptive_wait().wait_loop([&] { return !isThreadStarted.load(); });
isThreadStarted.wait();
std::this_thread::sleep_for(std::chrono::milliseconds(SLEEP_IN_MS));
EXPECT_THAT(wasChunkSent.load(), Eq(false));

Expand Down

0 comments on commit c6dce93

Please sign in to comment.