Skip to content

Commit

Permalink
iox-eclipse-iceoryx#751 Add wait_loop to adaptive_wait and use it in …
Browse files Browse the repository at this point in the history
…the tests for a more efficient busy loop

Signed-off-by: Christian Eltzschig <me@elchris.org>
  • Loading branch information
elfenpiff committed Jun 23, 2022
1 parent 208586b commit 803f501
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
#ifndef IOX_HOOFS_CXX_ADAPTIVE_WAIT_HPP
#define IOX_HOOFS_CXX_ADAPTIVE_WAIT_HPP

#include "iceoryx_hoofs/cxx/function_ref.hpp"
#include "iceoryx_hoofs/internal/units/duration.hpp"

#include <atomic>
#include <cstdint>

namespace iox
Expand Down Expand Up @@ -53,6 +55,10 @@ class adaptive_wait
/// with exponential waiting times is pursued.
void wait() noexcept;

/// @brief Waits in a loop in a smart wait until continueToWait returns false.
/// @param[in] continueToWait callable which returns if the wait should continue
void wait_loop(const function_ref<bool()>& continueToWait) noexcept;

protected:
/// @note All numbers are not accurate and are just rough estimates
/// acquired by the experiments described below.
Expand Down
13 changes: 13 additions & 0 deletions iceoryx_hoofs/source/cxx/adaptive_wait.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,19 @@ void adaptive_wait::wait() noexcept
std::this_thread::sleep_for(FINAL_WAITING_TIME);
}
}

void adaptive_wait::wait_loop(const function_ref<bool()>& continueToWait) noexcept
{
if (!continueToWait)
{
return;
}

while (continueToWait())
{
wait();
}
}
} // namespace internal
} // namespace cxx
} // namespace iox
35 changes: 35 additions & 0 deletions iceoryx_hoofs/test/moduletests/test_cxx_adaptive_wait.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ using namespace ::testing;

#include "iceoryx_hoofs/internal/cxx/adaptive_wait.hpp"

#include <thread>

using namespace iox::cxx::internal;

namespace
Expand Down Expand Up @@ -74,4 +76,37 @@ TEST(AdaptiveWaitTest, waitWaitsAtLeastFINAL_WAITING_TIMEafterINITIAL_REPETITION
std::chrono::nanoseconds(end - start).count(),
Ge(iox::units::Duration::fromMilliseconds(AdaptiveWaitSut::FINAL_WAITING_TIME.count()).toNanoseconds()));
}

TEST(AdaptiveWaitTest, wait_loopWaitsAtLeastAsLongAsTheConditionsReturnsTrue)
{
::testing::Test::RecordProperty("TEST_ID", "c44e9315-fed4-4681-ba0c-2d25bce4459b");
class AdaptiveWaitSut : public adaptive_wait
{
public:
using adaptive_wait::FINAL_WAITING_TIME;
using adaptive_wait::INITIAL_REPETITIONS;
};

std::atomic_bool continueToWait{true};
std::atomic_bool threadIsStarted{false};
std::thread waitThread{[&] {
threadIsStarted = true;
AdaptiveWaitSut().wait_loop([&] { return continueToWait.load(); });
}};

while (!threadIsStarted.load())
{
std::this_thread::yield();
}

auto start = std::chrono::steady_clock::now();
const std::chrono::milliseconds waitTime(100);
std::this_thread::sleep_for(waitTime);
auto end = std::chrono::steady_clock::now();

continueToWait.store(false);

EXPECT_THAT(std::chrono::nanoseconds(end - start).count(),
Ge(iox::units::Duration::fromMilliseconds(waitTime.count()).toNanoseconds()));
}
} // namespace
6 changes: 2 additions & 4 deletions iceoryx_hoofs/test/moduletests/test_posix_signal_watcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
//
// SPDX-License-Identifier: Apache-2.0

#include "iceoryx_hoofs/internal/cxx/adaptive_wait.hpp"
#include "iceoryx_hoofs/posix_wrapper/signal_watcher.hpp"
#include "iceoryx_hoofs/testing/watch_dog.hpp"
#include "test.hpp"
Expand Down Expand Up @@ -98,10 +99,7 @@ void unblocksWhenSignalWasRaisedForWaiters(SignalWatcher_test& test,
});
}

while (isThreadStarted.load() != numberOfWaiters)
{
std::this_thread::yield();
}
iox::cxx::internal::adaptive_wait().wait_loop([&] { return !isThreadStarted; });

std::this_thread::sleep_for(test.waitingTime);

Expand Down
21 changes: 12 additions & 9 deletions iceoryx_posh/test/moduletests/test_popo_chunk_distributor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -620,16 +620,17 @@ TYPED_TEST(ChunkDistributor_test, DeliverToQueueWithBlockingOptionBlocksDelivery
ASSERT_FALSE(sut.deliverToQueue(queueData->m_uniqueId, EXPECTED_QUEUE_INDEX, chunk).has_error());
}

auto threadSyncSemaphore = iox::posix::Semaphore::create(iox::posix::CreateUnnamedSingleProcessSemaphore, 0U);
std::atomic_bool isThreadStarted{false};
auto chunk = this->allocateChunk(7373);
std::atomic_bool wasChunkDelivered{false};
std::thread t1([&] {
ASSERT_FALSE(threadSyncSemaphore->post().has_error());
isThreadStarted = true;
ASSERT_FALSE(sut.deliverToQueue(queueData->m_uniqueId, EXPECTED_QUEUE_INDEX, chunk).has_error());
wasChunkDelivered = true;
});

ASSERT_FALSE(threadSyncSemaphore->wait().has_error());
iox::cxx::internal::adaptive_wait().wait_loop([&] { return !isThreadStarted; });

std::this_thread::sleep_for(this->BLOCKING_DURATION);
EXPECT_THAT(wasChunkDelivered.load(), Eq(false));

Expand Down Expand Up @@ -737,15 +738,16 @@ TYPED_TEST(ChunkDistributor_test, DeliverToSingleQueueBlocksWhenOptionsAreSetToB
ASSERT_FALSE(sut.tryAddQueue(queueData.get(), 0U).has_error());
sut.deliverToAllStoredQueues(this->allocateChunk(155U));

auto threadSyncSemaphore = iox::posix::Semaphore::create(iox::posix::CreateUnnamedSingleProcessSemaphore, 0U);
std::atomic_bool isThreadStarted{false};
std::atomic_bool wasChunkDelivered{false};
std::thread t1([&] {
ASSERT_FALSE(threadSyncSemaphore->post().has_error());
isThreadStarted = true;
sut.deliverToAllStoredQueues(this->allocateChunk(152U));
wasChunkDelivered = true;
});

ASSERT_FALSE(threadSyncSemaphore->wait().has_error());
iox::cxx::internal::adaptive_wait().wait_loop([&] { return !isThreadStarted; });

std::this_thread::sleep_for(this->BLOCKING_DURATION);
EXPECT_THAT(wasChunkDelivered.load(), Eq(false));

Expand Down Expand Up @@ -783,15 +785,16 @@ TYPED_TEST(ChunkDistributor_test, MultipleBlockingQueuesWillBeFilledWhenThereBec

sut.deliverToAllStoredQueues(this->allocateChunk(425U));

auto threadSyncSemaphore = iox::posix::Semaphore::create(iox::posix::CreateUnnamedSingleProcessSemaphore, 0U);
std::atomic_bool isThreadStarted{false};
std::atomic_bool wasChunkDelivered{false};
std::thread t1([&] {
ASSERT_FALSE(threadSyncSemaphore->post().has_error());
isThreadStarted.store(true);
sut.deliverToAllStoredQueues(this->allocateChunk(1152U));
wasChunkDelivered = true;
});

ASSERT_FALSE(threadSyncSemaphore->wait().has_error());
iox::cxx::internal::adaptive_wait().wait_loop([&] { return !isThreadStarted; });

std::this_thread::sleep_for(this->BLOCKING_DURATION);
EXPECT_THAT(wasChunkDelivered.load(), Eq(false));

Expand Down
29 changes: 13 additions & 16 deletions iceoryx_posh/test/moduletests/test_popo_condition_variable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
//
// SPDX-License-Identifier: Apache-2.0

#include "iceoryx_hoofs/internal/cxx/adaptive_wait.hpp"
#include "iceoryx_hoofs/testing/timing_test.hpp"
#include "iceoryx_hoofs/testing/watch_dog.hpp"
#include "iceoryx_posh/internal/popo/building_blocks/condition_listener.hpp"
Expand Down Expand Up @@ -58,8 +59,6 @@ class ConditionVariable_test : public Test
}

Watchdog m_watchdog{m_timeToWait};
iox::posix::Semaphore m_syncSemaphore =
iox::posix::Semaphore::create(iox::posix::CreateUnnamedSingleProcessSemaphore, 0U).value();
};

TEST_F(ConditionVariable_test, ConditionListenerIsNeitherCopyNorMovable)
Expand Down Expand Up @@ -118,13 +117,15 @@ TEST_F(ConditionVariable_test, WaitAndNotifyResultsInImmediateTriggerMultiThread
{
::testing::Test::RecordProperty("TEST_ID", "39b40c73-3dcc-4af6-9682-b62816c69854");
std::atomic<int> counter{0};
std::atomic_bool isThreadStarted{false};
std::thread waiter([&] {
EXPECT_THAT(counter, Eq(0));
IOX_DISCARD_RESULT(m_syncSemaphore.post());
isThreadStarted = true;
m_waiter.wait();
EXPECT_THAT(counter, Eq(1));
});
IOX_DISCARD_RESULT(m_syncSemaphore.wait());
iox::cxx::internal::adaptive_wait().wait_loop([&] { return !isThreadStarted; });

counter++;
m_signaler.notify();
waiter.join();
Expand Down Expand Up @@ -235,8 +236,6 @@ TEST_F(ConditionVariable_test, TimedWaitReturnsAllNotifiedIndices)
TIMING_TEST_F(ConditionVariable_test, TimedWaitBlocksUntilTimeout, Repeat(5), [&] {
ConditionListener listener(m_condVarData);
NotificationVector_t activeNotifications;
iox::posix::Semaphore threadSetupSemaphore =
iox::posix::Semaphore::create(iox::posix::CreateUnnamedSingleProcessSemaphore, 0U).value();
std::atomic_bool hasWaited{false};

std::thread waiter([&] {
Expand All @@ -255,8 +254,6 @@ TIMING_TEST_F(ConditionVariable_test, TimedWaitBlocksUntilTimeout, Repeat(5), [&
TIMING_TEST_F(ConditionVariable_test, TimedWaitBlocksUntilNotification, Repeat(5), [&] {
ConditionListener listener(m_condVarData);
NotificationVector_t activeNotifications;
iox::posix::Semaphore threadSetupSemaphore =
iox::posix::Semaphore::create(iox::posix::CreateUnnamedSingleProcessSemaphore, 0U).value();
std::atomic_bool hasWaited{false};

std::thread waiter([&] {
Expand Down Expand Up @@ -369,19 +366,19 @@ TIMING_TEST_F(ConditionVariable_test, WaitBlocks, Repeat(5), [&] {
ConditionNotifier notifier(m_condVarData, EVENT_INDEX);
ConditionListener listener(m_condVarData);
NotificationVector_t activeNotifications;
iox::posix::Semaphore threadSetupSemaphore =
iox::posix::Semaphore::create(iox::posix::CreateUnnamedSingleProcessSemaphore, 0U).value();
std::atomic_bool isThreadStarted{false};
std::atomic_bool hasWaited{false};

std::thread waiter([&] {
IOX_DISCARD_RESULT(threadSetupSemaphore.post());
isThreadStarted = true;
activeNotifications = listener.wait();
hasWaited.store(true, std::memory_order_relaxed);
ASSERT_THAT(activeNotifications.size(), Eq(1U));
EXPECT_THAT(activeNotifications[0], Eq(EVENT_INDEX));
});

IOX_DISCARD_RESULT(threadSetupSemaphore.wait());
iox::cxx::internal::adaptive_wait().wait_loop([&] { return !isThreadStarted; });

std::this_thread::sleep_for(std::chrono::milliseconds(10));
EXPECT_THAT(hasWaited, Eq(false));
notifier.notify();
Expand All @@ -396,8 +393,6 @@ TIMING_TEST_F(ConditionVariable_test, SecondWaitBlocksUntilNewNotification, Repe
ConditionNotifier notifier1(m_condVarData, FIRST_EVENT_INDEX);
ConditionNotifier notifier2(m_condVarData, SECOND_EVENT_INDEX);
ConditionListener listener(m_condVarData);
iox::posix::Semaphore threadSetupSemaphore =
iox::posix::Semaphore::create(iox::posix::CreateUnnamedSingleProcessSemaphore, 0U).value();
std::atomic_bool hasWaited{false};

Watchdog watchdogFirstWait(m_timeToWait);
Expand All @@ -414,8 +409,9 @@ TIMING_TEST_F(ConditionVariable_test, SecondWaitBlocksUntilNewNotification, Repe
Watchdog watchdogSecondWait(m_timeToWait);
watchdogSecondWait.watchAndActOnFailure([&] { listener.destroy(); });

std::atomic_bool isThreadStarted{false};
std::thread waiter([&] {
IOX_DISCARD_RESULT(threadSetupSemaphore.post());
isThreadStarted = true;
activeNotifications = listener.wait();
hasWaited.store(true, std::memory_order_relaxed);
ASSERT_THAT(activeNotifications.size(), Eq(1U));
Expand All @@ -426,7 +422,8 @@ TIMING_TEST_F(ConditionVariable_test, SecondWaitBlocksUntilNewNotification, Repe
}
});

IOX_DISCARD_RESULT(threadSetupSemaphore.wait());
iox::cxx::internal::adaptive_wait().wait_loop([&] { return !isThreadStarted; });

std::this_thread::sleep_for(std::chrono::milliseconds(10));
EXPECT_THAT(hasWaited, Eq(false));
notifier1.notify();
Expand Down

0 comments on commit 803f501

Please sign in to comment.