From a9e49cda9a1913a30a40952495e8072e7a9f2ecb Mon Sep 17 00:00:00 2001 From: Chen Lihui Date: Fri, 6 Jan 2023 21:31:18 +0800 Subject: [PATCH] Fix waitset notification (#3087) * fix notification lost Signed-off-by: Chen Lihui * add a regression test Signed-off-by: Chen Lihui * rename a variable name and update comments Signed-off-by: Chen Lihui * fix uncrustify issue Signed-off-by: Chen Lihui * make the regression test better Signed-off-by: Chen Lihui * fix uncrustify Signed-off-by: Chen Lihui * Refs #16192. Fix deadlock on WaitSetImpl. Signed-off-by: Miguel Company * Refs 16192. Workaround for deadlock and notify pessimization Signed-off-by: Miguel Barro * Refs 16192. Mandatory logic fix Signed-off-by: Miguel Barro * Refs 16192. linter Signed-off-by: Miguel Barro * Refs #16192. Catch up any notification lost Signed-off-by: Miguel Barro * Refs 16192. Avoid predicate evaluation on spurious wakeups. Signed-off-by: Miguel Barro * Refs 16192. Fixing gcc warnings Signed-off-by: Miguel Barro * Refs 16192. 
Allowing wait without previous notification Signed-off-by: Miguel Barro Signed-off-by: Chen Lihui Signed-off-by: Miguel Company Signed-off-by: Miguel Barro Co-authored-by: Miguel Company Co-authored-by: Miguel Barro --- .../fastdds/core/condition/WaitSetImpl.cpp | 54 ++++++++++---- .../fastdds/core/condition/WaitSetImpl.hpp | 2 + .../dds/core/condition/WaitSetImplTests.cpp | 71 ++++++++++++++++++- 3 files changed, 112 insertions(+), 15 deletions(-) diff --git a/src/cpp/fastdds/core/condition/WaitSetImpl.cpp b/src/cpp/fastdds/core/condition/WaitSetImpl.cpp index 73390e35de6..013bd383c95 100644 --- a/src/cpp/fastdds/core/condition/WaitSetImpl.cpp +++ b/src/cpp/fastdds/core/condition/WaitSetImpl.cpp @@ -103,6 +103,8 @@ ReturnCode_t WaitSetImpl::wait( const fastrtps::Duration_t& timeout) { std::unique_lock lock(mutex_); + // last notification processed + unsigned int old_counter = notifications_ - 1; if (is_waiting_) { @@ -111,31 +113,54 @@ ReturnCode_t WaitSetImpl::wait( auto fill_active_conditions = [&]() { - bool ret_val = false; - active_conditions.clear(); - for (const Condition* c : entries_) + bool ret_val; + + if ( old_counter == notifications_ ) + { + // spurious wakeup + return false; + } + + // Loop if predicate may be outdated + do { - if (c->get_trigger_value()) + ret_val = false; + old_counter = notifications_; + active_conditions.clear(); + + for (const Condition* c : entries_) { - ret_val = true; - active_conditions.push_back(const_cast(c)); + if (c->get_trigger_value()) + { + ret_val = true; + active_conditions.push_back(const_cast(c)); + } } } + while (old_counter != notifications_ + && active_conditions.size() != entries_.size()); + return ret_val; }; bool condition_value = false; is_waiting_ = true; - if (fastrtps::c_TimeInfinite == timeout) - { - cond_.wait(lock, fill_active_conditions); - condition_value = true; - } - else + auto missing_notification_outage = std::chrono::milliseconds(500); + auto now = std::chrono::steady_clock::now(); + auto 
deadline = fastrtps::c_TimeInfinite == timeout ? + std::chrono::steady_clock::time_point::max() : + now + std::chrono::nanoseconds(timeout.to_ns()); + + do { - auto ns = timeout.to_ns(); - condition_value = cond_.wait_for(lock, std::chrono::nanoseconds(ns), fill_active_conditions); + now = std::chrono::steady_clock::now(); + auto next_outage_timeout = now + missing_notification_outage; + auto ctimeout = std::min(next_outage_timeout, deadline); + + condition_value = cond_.wait_until(lock, ctimeout, fill_active_conditions); } + while (!condition_value && ( old_counter != notifications_ || deadline > now)); + is_waiting_ = false; return condition_value ? ReturnCode_t::RETCODE_OK : ReturnCode_t::RETCODE_TIMEOUT; @@ -156,6 +181,7 @@ ReturnCode_t WaitSetImpl::get_conditions( void WaitSetImpl::wake_up() { + ++notifications_; cond_.notify_one(); } diff --git a/src/cpp/fastdds/core/condition/WaitSetImpl.hpp b/src/cpp/fastdds/core/condition/WaitSetImpl.hpp index d3e5fc187b0..96c5da5dc11 100644 --- a/src/cpp/fastdds/core/condition/WaitSetImpl.hpp +++ b/src/cpp/fastdds/core/condition/WaitSetImpl.hpp @@ -19,6 +19,7 @@ #ifndef _FASTDDS_CORE_CONDITION_WAITSETIMPL_HPP_ #define _FASTDDS_CORE_CONDITION_WAITSETIMPL_HPP_ +#include #include #include @@ -113,6 +114,7 @@ struct WaitSetImpl std::condition_variable cond_; eprosima::utilities::collections::unordered_vector entries_; bool is_waiting_ = false; + std::atomic_uint notifications_ = {1}; }; } // namespace detail diff --git a/test/unittest/dds/core/condition/WaitSetImplTests.cpp b/test/unittest/dds/core/condition/WaitSetImplTests.cpp index 2d13a7a3f8d..798a6355103 100644 --- a/test/unittest/dds/core/condition/WaitSetImplTests.cpp +++ b/test/unittest/dds/core/condition/WaitSetImplTests.cpp @@ -13,6 +13,8 @@ // limitations under the License. 
#include +#include +#include #include @@ -33,7 +35,7 @@ class TestCondition : public Condition { public: - bool trigger_value = false; + volatile bool trigger_value = false; bool get_trigger_value() const override { @@ -197,6 +199,73 @@ TEST(WaitSetImplTests, wait) } } +TEST(WaitSetImplTests, fix_wait_notification_lost) +{ + ConditionSeq conditions; + WaitSetImpl wait_set; + + // Waiting should return the added condition after the trigger value is updated and the wait_set is woken up. + { + TestCondition triggered_condition; + + // Expecting calls on the notifier of triggered_condition. + auto notifier = triggered_condition.get_notifier(); + EXPECT_CALL(*notifier, attach_to(_)).Times(1); + EXPECT_CALL(*notifier, will_be_deleted(_)).Times(1); + + class AnotherTestCondition : public Condition + { + public: + + bool get_trigger_value() const override + { + // Sleep to simulate a thread context switch or some other delay + std::this_thread::sleep_for(std::chrono::seconds(2)); + return false; + } + + } + second_simulator_condition; + + // Expecting calls on the notifier of second_simulator_condition. + notifier = second_simulator_condition.get_notifier(); + EXPECT_CALL(*notifier, attach_to(_)).Times(1); + EXPECT_CALL(*notifier, will_be_deleted(_)).Times(1); + + wait_set.attach_condition(triggered_condition); + wait_set.attach_condition(second_simulator_condition); + + std::promise promise; + std::future future = promise.get_future(); + ReturnCode_t ret = ReturnCode_t::RETCODE_ERROR; + std::thread wait_conditions([&]() + { + // Do not use `WaitSetImpl::wait` with a timeout value, because + // `condition_variable::wait_for` could call the predicate function again. 
+ ret = wait_set.wait(conditions, eprosima::fastrtps::c_TimeInfinite); + promise.set_value(); + }); + + // Sleep one second so that `wait_set.wait` has already checked `triggered_condition` in the above thread + std::this_thread::sleep_for(std::chrono::seconds(1)); + triggered_condition.trigger_value = true; + wait_set.wake_up(); + + // Expect the notification to arrive after wake_up; otherwise report an error after waiting up to 5 seconds. + future.wait_for(std::chrono::seconds(5)); + EXPECT_EQ(ReturnCode_t::RETCODE_OK, ret); + EXPECT_EQ(1u, conditions.size()); + EXPECT_NE(conditions.cend(), std::find(conditions.cbegin(), conditions.cend(), &triggered_condition)); + + // Wake up the `wait_set` to make sure the thread exits + wait_set.wake_up(); + wait_conditions.join(); + + wait_set.will_be_deleted(triggered_condition); + wait_set.will_be_deleted(second_simulator_condition); + } +} + int main( int argc, char** argv)