Skip to content

Commit

Permalink
Fix waitset notification (#3087)
Browse files Browse the repository at this point in the history
* fix notification lost

Signed-off-by: Chen Lihui <lihui.chen@sony.com>

* add a regression test

Signed-off-by: Chen Lihui <lihui.chen@sony.com>

* rename a variable name and update comments

Signed-off-by: Chen Lihui <lihui.chen@sony.com>

* fix uncrustify issue

Signed-off-by: Chen Lihui <lihui.chen@sony.com>

* make the regression test better

Signed-off-by: Chen Lihui <lihui.chen@sony.com>

* fix uncrustify

Signed-off-by: Chen Lihui <lihui.chen@sony.com>

* Refs #16192. Fix deadlock on WaitSetImpl.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 16192. Workaround for deadlock and notify pessimization

Signed-off-by: Miguel Barro <miguelbarro@eprosima.com>

* Refs 16192. Mandatory logic fix

Signed-off-by: Miguel Barro <miguelbarro@eprosima.com>

* Refs 16192. linter

Signed-off-by: Miguel Barro <miguelbarro@eprosima.com>

* Refs #16192. Catch up any notification lost

Signed-off-by: Miguel Barro <miguelbarro@eprosima.com>

* Refs 16192. Avoid predicate evaluation on spurious wakeups.

Signed-off-by: Miguel Barro <miguelbarro@eprosima.com>

* Refs 16192. Fixing gcc warnings

Signed-off-by: Miguel Barro <miguelbarro@eprosima.com>

* Refs 16192. Allowing wait without previous notification

Signed-off-by: Miguel Barro <miguelbarro@eprosima.com>

Signed-off-by: Chen Lihui <lihui.chen@sony.com>
Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>
Signed-off-by: Miguel Barro <miguelbarro@eprosima.com>
Co-authored-by: Miguel Company <miguelcompany@eprosima.com>
Co-authored-by: Miguel Barro <miguelbarro@eprosima.com>
  • Loading branch information
3 people authored Jan 6, 2023
1 parent 5c589de commit a9e49cd
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 15 deletions.
54 changes: 40 additions & 14 deletions src/cpp/fastdds/core/condition/WaitSetImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ ReturnCode_t WaitSetImpl::wait(
const fastrtps::Duration_t& timeout)
{
std::unique_lock<std::mutex> lock(mutex_);
// last notification processed
unsigned int old_counter = notifications_ - 1;

if (is_waiting_)
{
Expand All @@ -111,31 +113,54 @@ ReturnCode_t WaitSetImpl::wait(

auto fill_active_conditions = [&]()
{
bool ret_val = false;
active_conditions.clear();
for (const Condition* c : entries_)
bool ret_val;

if ( old_counter == notifications_ )
{
// spurious wakeup
return false;
}

// Loop if predicate may be outdated
do
{
if (c->get_trigger_value())
ret_val = false;
old_counter = notifications_;
active_conditions.clear();

for (const Condition* c : entries_)
{
ret_val = true;
active_conditions.push_back(const_cast<Condition*>(c));
if (c->get_trigger_value())
{
ret_val = true;
active_conditions.push_back(const_cast<Condition*>(c));
}
}
}
while (old_counter != notifications_
&& active_conditions.size() != entries_.size());

return ret_val;
};

bool condition_value = false;
is_waiting_ = true;
if (fastrtps::c_TimeInfinite == timeout)
{
cond_.wait(lock, fill_active_conditions);
condition_value = true;
}
else
auto missing_notification_outage = std::chrono::milliseconds(500);
auto now = std::chrono::steady_clock::now();
auto deadline = fastrtps::c_TimeInfinite == timeout ?
std::chrono::steady_clock::time_point::max() :
now + std::chrono::nanoseconds(timeout.to_ns());

do
{
auto ns = timeout.to_ns();
condition_value = cond_.wait_for(lock, std::chrono::nanoseconds(ns), fill_active_conditions);
now = std::chrono::steady_clock::now();
auto next_outage_timeout = now + missing_notification_outage;
auto ctimeout = std::min(next_outage_timeout, deadline);

condition_value = cond_.wait_until(lock, ctimeout, fill_active_conditions);
}
while (!condition_value && ( old_counter != notifications_ || deadline > now));

is_waiting_ = false;

return condition_value ? ReturnCode_t::RETCODE_OK : ReturnCode_t::RETCODE_TIMEOUT;
Expand All @@ -156,6 +181,7 @@ ReturnCode_t WaitSetImpl::get_conditions(

void WaitSetImpl::wake_up()
{
++notifications_;
cond_.notify_one();
}

Expand Down
2 changes: 2 additions & 0 deletions src/cpp/fastdds/core/condition/WaitSetImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#ifndef _FASTDDS_CORE_CONDITION_WAITSETIMPL_HPP_
#define _FASTDDS_CORE_CONDITION_WAITSETIMPL_HPP_

#include <atomic>
#include <condition_variable>
#include <mutex>

Expand Down Expand Up @@ -113,6 +114,7 @@ struct WaitSetImpl
std::condition_variable cond_;
eprosima::utilities::collections::unordered_vector<const Condition*> entries_;
bool is_waiting_ = false;
std::atomic_uint notifications_ = {1};
};

} // namespace detail
Expand Down
71 changes: 70 additions & 1 deletion test/unittest/dds/core/condition/WaitSetImplTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
// limitations under the License.

#include <algorithm>
#include <future>
#include <thread>

#include <gtest/gtest.h>

Expand All @@ -33,7 +35,7 @@ class TestCondition : public Condition
{
public:

bool trigger_value = false;
volatile bool trigger_value = false;

bool get_trigger_value() const override
{
Expand Down Expand Up @@ -197,6 +199,73 @@ TEST(WaitSetImplTests, wait)
}
}

TEST(WaitSetImplTests, fix_wait_notification_lost)
{
ConditionSeq conditions;
WaitSetImpl wait_set;

// Waiting should return the added connection after the trigger value is updated and the wait_set waken.
{
TestCondition triggered_condition;

// Expecting calls on the notifier of triggered_condition.
auto notifier = triggered_condition.get_notifier();
EXPECT_CALL(*notifier, attach_to(_)).Times(1);
EXPECT_CALL(*notifier, will_be_deleted(_)).Times(1);

class AnotherTestCondition : public Condition
{
public:

bool get_trigger_value() const override
{
// Time to simulate thread context switch or something else
std::this_thread::sleep_for(std::chrono::seconds(2));
return false;
}

}
second_simulator_condition;

// Expecting calls on the notifier of second_simulator_condition.
notifier = second_simulator_condition.get_notifier();
EXPECT_CALL(*notifier, attach_to(_)).Times(1);
EXPECT_CALL(*notifier, will_be_deleted(_)).Times(1);

wait_set.attach_condition(triggered_condition);
wait_set.attach_condition(second_simulator_condition);

std::promise<void> promise;
std::future<void> future = promise.get_future();
ReturnCode_t ret = ReturnCode_t::RETCODE_ERROR;
std::thread wait_conditions([&]()
{
// Not to use `WaitSetImpl::wait` with a timeout value, because the
// `condition_variable::wait_for` could call _Predicate function again.
ret = wait_set.wait(conditions, eprosima::fastrtps::c_TimeInfinite);
promise.set_value();
});

// One second sleep to make the `wait_set.wait` check `triggered_condition` in the above thread
std::this_thread::sleep_for(std::chrono::seconds(1));
triggered_condition.trigger_value = true;
wait_set.wake_up();

// Expecting get notification after wake_up, otherwise output error within 5 seconds.
future.wait_for(std::chrono::seconds(5));
EXPECT_EQ(ReturnCode_t::RETCODE_OK, ret);
EXPECT_EQ(1u, conditions.size());
EXPECT_NE(conditions.cend(), std::find(conditions.cbegin(), conditions.cend(), &triggered_condition));

// Wake up the `wait_set` to make sure the thread exit
wait_set.wake_up();
wait_conditions.join();

wait_set.will_be_deleted(triggered_condition);
wait_set.will_be_deleted(second_simulator_condition);
}
}

int main(
int argc,
char** argv)
Expand Down

0 comments on commit a9e49cd

Please sign in to comment.