Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert changes from #3046 #3083

Merged
merged 1 commit into from
Nov 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions include/fastdds/rtps/resources/TimedEvent.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,12 +132,6 @@ class TimedEvent
void restart_timer(
const std::chrono::steady_clock::time_point& timeout);

/*!
* @brief Unregisters the event, sets its state to INACTIVE, and re-registers it.
* It may be seen as a blocking version of \c cancel_timer
*/
void recreate_timer();

/**
* Update event interval.
* When updating the interval, the timer is not restarted and the new interval will only be used the next time you call restart_timer().
Expand Down
23 changes: 1 addition & 22 deletions src/cpp/rtps/reader/WriterProxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ WriterProxy::WriterProxy(
, locators_entry_(loc_alloc.max_unicast_locators, loc_alloc.max_multicast_locators)
, is_datasharing_writer_(false)
, received_at_least_one_heartbeat_(false)
, state_(StateCode::STOPPED)
{
//Create Events
ResourceEvent& event_manager = reader_->getRTPSParticipant()->getEventResource();
Expand Down Expand Up @@ -145,7 +144,6 @@ void WriterProxy::start(
filter_remote_locators(locators_entry_,
reader_->getAttributes().external_unicast_locators, reader_->getAttributes().ignore_non_matching_locators);
is_datasharing_writer_ = is_datasharing;
state_.store(StateCode::IDLE);
initial_acknack_->restart_timer();
loaded_from_storage(initial_sequence);
received_at_least_one_heartbeat_ = false;
Expand All @@ -170,16 +168,7 @@ void WriterProxy::update(

void WriterProxy::stop()
{
StateCode prev_code;
if ((prev_code = state_.exchange(StateCode::STOPPED)) == StateCode::BUSY)
{
// Initial ack_nack being performed, wait for it to finish
initial_acknack_->recreate_timer();
}
else
{
initial_acknack_->cancel_timer();
}
initial_acknack_->cancel_timer();
heartbeat_response_->cancel_timer();

clear();
Expand Down Expand Up @@ -488,13 +477,6 @@ bool WriterProxy::perform_initial_ack_nack()
{
bool ret_value = false;

StateCode expected = StateCode::IDLE;
if (!state_.compare_exchange_strong(expected, StateCode::BUSY))
{
// Stopped from another thread -> abort
return ret_value;
}

if (!is_datasharing_writer_)
{
// Send initial NACK.
Expand All @@ -518,9 +500,6 @@ bool WriterProxy::perform_initial_ack_nack()
}
}

expected = StateCode::BUSY;
state_.compare_exchange_strong(expected, StateCode::IDLE);

return ret_value;
}

Expand Down
9 changes: 0 additions & 9 deletions src/cpp/rtps/reader/WriterProxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -349,13 +349,6 @@ class WriterProxy : public RTPSMessageSenderInterface

private:

enum StateCode
{
IDLE = 0, //! Writer Proxy is not performing any critical operations.
BUSY, //! Writer Proxy is performing a critical operation. Some actions (e.g. stop) should wait for its completion.
STOPPED, //! Writer Proxy has been requested to \c stop.
};

/**
* Set initial value for last acked sequence number.
* @param[in] seq_num last acked sequence number.
Expand Down Expand Up @@ -415,8 +408,6 @@ class WriterProxy : public RTPSMessageSenderInterface
bool is_datasharing_writer_;
//! Wether at least one heartbeat was recevied.
bool received_at_least_one_heartbeat_;
//! Current state of this Writer Proxy
std::atomic<StateCode> state_;

using ChangeIterator = decltype(changes_received_)::iterator;

Expand Down
7 changes: 0 additions & 7 deletions src/cpp/rtps/resources/TimedEvent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,6 @@ void TimedEvent::restart_timer(
}
}

void TimedEvent::recreate_timer()
{
service_.unregister_timer(impl_);
impl_->go_cancel();
service_.register_timer(impl_);
}

bool TimedEvent::update_interval(
const Duration_t& inter)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ class TimedEvent
MOCK_METHOD0(restart_timer, void());
MOCK_METHOD1(restart_timer, void(const std::chrono::steady_clock::time_point& timeout));
MOCK_METHOD0(cancel_timer, void());
MOCK_METHOD0(recreate_timer, void());
MOCK_METHOD1(update_interval, bool(const Duration_t&));
MOCK_METHOD1(update_interval_millisec, bool(double));
};
Expand Down
37 changes: 0 additions & 37 deletions test/unittest/rtps/resources/timedevent/TimedEventTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,43 +110,6 @@ TEST(TimedEvent, Event_RestartEvents)
ASSERT_EQ(successed, 1);
}

/*!
* @fn TEST(TimedEvent, Event_RecreateEvents)
* @brief This test checks the correct behavior of recreating events.
* First it is checked that recreating and restarting an event multiple times is possible.
* The event is then recreated (blocking cancel), and an object shared with the callback is modified in the main thread.
* A data race would be reported by thread sanitizer if cancelling (\c cancel_timer) the event instead.
*/
TEST(TimedEvent, Event_RecreateEvents)
{
int num = 0;
auto callback = [&num]() -> void
{
std::this_thread::sleep_for(std::chrono::milliseconds(1));
num++;
};

MockEvent event(*env->service_, 100, false, callback);

for (int i = 0; i < 10; ++i)
{
event.event().recreate_timer();
event.event().restart_timer();
event.wait();
}

// Recreate timer (blocking cancel) and modify object shared with callback
// A data race would be reported by thread sanitizer if using cancel_timer instead
event.event().recreate_timer();
num = 10;

ASSERT_FALSE(event.wait(120));

int successed = event.successed_.load(std::memory_order_relaxed);

ASSERT_EQ(successed, 10);
}

/*!
* @fn TEST(TimedEvent, EventOnSuccessAutoDestruc_QuickCancelEvents)
* @brief This test checks the event is not destroyed when it is canceled.
Expand Down
31 changes: 7 additions & 24 deletions test/unittest/rtps/resources/timedevent/mock/MockEvent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,10 @@ using namespace eprosima::fastrtps::rtps;
MockEvent::MockEvent(
eprosima::fastrtps::rtps::ResourceEvent& service,
double milliseconds,
bool autorestart,
std::function<void()> inner_callback)
bool autorestart)
: successed_(0)
, sem_count_(0)
, autorestart_(autorestart)
, inner_callback_(inner_callback)
, event_(service, std::bind(&MockEvent::callback, this), milliseconds)
{
}
Expand All @@ -39,7 +37,7 @@ bool MockEvent::callback()

successed_.fetch_add(1, std::memory_order_relaxed);

if (autorestart_)
if(autorestart_)
{
restart = true;
}
Expand All @@ -49,22 +47,14 @@ bool MockEvent::callback()
sem_mutex_.unlock();
sem_cond_.notify_one();

if (inner_callback_)
{
inner_callback_();
}

return restart;
}

void MockEvent::wait()
{
std::unique_lock<std::mutex> lock(sem_mutex_);

sem_cond_.wait(lock, [&]() -> bool
{
return sem_count_ != 0;
} );
sem_cond_.wait(lock, [&]() -> bool { return sem_count_ != 0; } );

--sem_count_;
}
Expand All @@ -75,24 +65,17 @@ void MockEvent::wait_success()

while (successed_.load(std::memory_order_relaxed) == 0)
{
sem_cond_.wait(lock, [&]() -> bool
{
return sem_count_ != 0;
} );
sem_cond_.wait(lock, [&]() -> bool { return sem_count_ != 0; } );
--sem_count_;
}
}

bool MockEvent::wait(
unsigned int milliseconds)
bool MockEvent::wait(unsigned int milliseconds)
{
std::unique_lock<std::mutex> lock(sem_mutex_);

if (!sem_cond_.wait_for(lock, std::chrono::milliseconds(milliseconds),
[&]() -> bool
{
return sem_count_ != 0;
} ))
if(!sem_cond_.wait_for(lock, std::chrono::milliseconds(milliseconds),
[&]() -> bool { return sem_count_ != 0; } ))
{
return false;
}
Expand Down
42 changes: 18 additions & 24 deletions test/unittest/rtps/resources/timedevent/mock/MockEvent.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,40 +24,34 @@

class MockEvent
{
public:
public:

MockEvent(
eprosima::fastrtps::rtps::ResourceEvent& service,
double milliseconds,
bool autorestart,
std::function<void()> inner_callback = {});
MockEvent(
eprosima::fastrtps::rtps::ResourceEvent& service,
double milliseconds,
bool autorestart);

virtual ~MockEvent();
virtual ~MockEvent();

eprosima::fastrtps::rtps::TimedEvent& event()
{
return event_;
}
eprosima::fastrtps::rtps::TimedEvent& event() { return event_; }

bool callback();
bool callback();

void wait();
void wait();

void wait_success();
void wait_success();

bool wait(
unsigned int milliseconds);
bool wait(unsigned int milliseconds);

std::atomic<int> successed_;
std::atomic<int> successed_;

private:
private:

int sem_count_;
std::mutex sem_mutex_;
std::condition_variable sem_cond_;
bool autorestart_;
std::function<void()> inner_callback_;
eprosima::fastrtps::rtps::TimedEvent event_;
int sem_count_;
std::mutex sem_mutex_;
std::condition_variable sem_cond_;
bool autorestart_;
eprosima::fastrtps::rtps::TimedEvent event_;
};

#endif // _TEST_RTPS_RESOURCES_TIMEDEVENT_MOCKEVENT_H_