diff --git a/cmake/common/gtest.cmake b/cmake/common/gtest.cmake index 5a901480593..facc8d4ad22 100644 --- a/cmake/common/gtest.cmake +++ b/cmake/common/gtest.cmake @@ -59,6 +59,11 @@ macro(add_gtest) # Normal tests file(STRINGS ${GTEST_SOURCE_FILE} GTEST_TEST_NAMES REGEX "^([T][Y][P][E][D][_])?TEST") foreach(GTEST_TEST_NAME ${GTEST_TEST_NAMES}) + + if(GTEST_TEST_NAME MATCHES "TYPED_TEST_SUITE") + continue() + endif() + string(REGEX REPLACE ["\) \(,"] ";" GTEST_TEST_NAME ${GTEST_TEST_NAME}) list(GET GTEST_TEST_NAME 1 GTEST_GROUP_NAME) list(GET GTEST_TEST_NAME 3 GTEST_TEST_NAME) diff --git a/cmake/modules/check_shared_mutex_priority.cpp b/cmake/modules/check_shared_mutex_priority.cpp new file mode 100644 index 00000000000..7dab4e2a4e7 --- /dev/null +++ b/cmake/modules/check_shared_mutex_priority.cpp @@ -0,0 +1,69 @@ +// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file check_shared_mutex_priority.cpp + * + */ + +#include +#include +#include +#include +#include +#include + +using namespace std; + +int main() +{ + shared_mutex sm; + atomic_bool mark = false; + + // take first shared lock + sm.lock_shared(); + + // signal is taken + thread exclusive([&]() + { + mark = true; + lock_guard guard(sm); + }); + + // Wait till the thread takes the lock + do + { + this_thread::sleep_for(chrono::milliseconds(100)); + } + while (!mark); + + // try take the second shared lock + bool success = sm.try_lock_shared(); + if (success) + { + sm.unlock_shared(); + cout << "PTHREAD_RWLOCK_PREFER_READER_NP" << endl; + } + else + { + cout << "PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP" << endl; + } + + // release first lock + sm.unlock_shared(); + // wait for the main thread + exclusive.join(); + + return 0; +} diff --git a/include/fastrtps/utils/ProxyPool.hpp b/include/fastrtps/utils/ProxyPool.hpp index f4d3b31c0df..6d812623d6e 100644 --- a/include/fastrtps/utils/ProxyPool.hpp +++ b/include/fastrtps/utils/ProxyPool.hpp @@ -154,6 +154,9 @@ class ProxyPool // return the resource mask_.set(idx); + + // notify the resource is free + cv_.notify_one(); } public: diff --git a/include/fastrtps/utils/shared_mutex.hpp b/include/fastrtps/utils/shared_mutex.hpp index d5dd91d2e28..d6924144eb7 100644 --- a/include/fastrtps/utils/shared_mutex.hpp +++ b/include/fastrtps/utils/shared_mutex.hpp @@ -1,5 +1,29 @@ -// Copyright Howard Hinnant 2007-2010. Distributed under the Boost -// Software License, Version 1.0. (see http://www.boost.org/LICENSE_1_0.txt) +/* + Copyright Howard Hinnant 2007-2010. Distributed under the Boost + Software License, Version 1.0. (see http://www.boost.org/LICENSE_1_0.txt) + The original implementation has been modified to support the POSIX priorities: + + PTHREAD_RWLOCK_PREFER_READER_NP + This is the default. A thread may hold multiple read + locks; that is, read locks are recursive. 
According to + The Single Unix Specification, the behavior is unspecified + when a reader tries to place a lock, and there is no write + lock but writers are waiting. Giving preference to the + reader, as is set by PTHREAD_RWLOCK_PREFER_READER_NP, + implies that the reader will receive the requested lock, + even if a writer is waiting. As long as there are + readers, the writer will be starved. + + PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP + Setting the lock kind to this avoids writer starvation as + long as any read locking is not done in a recursive + fashion. + + The C++ Standard has not yet (C++20) imposed any requirements on shared_mutex implementation thus + each platform made its own choices: + Windows & Boost defaults to PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP. + Linux & Mac defaults to PTHREAD_RWLOCK_PREFER_READER_NP. + */ /** * @file shared_mutex.hpp @@ -8,32 +32,33 @@ #ifndef _UTILS_SHARED_MUTEX_HPP_ #define _UTILS_SHARED_MUTEX_HPP_ -#if defined(__has_include) && __has_include() -# include -#endif // if defined(__has_include) && __has_include() - -// Detect if the share_mutex feature is available -#if defined(__has_include) && __has_include() && !defined(__cpp_lib_shared_mutex) || \ - /* deprecated procedure if the good one is not available*/ \ - ( !(defined(__has_include) && __has_include()) && \ - !(defined(HAVE_CXX17) && HAVE_CXX17) && __cplusplus < 201703 ) - -#include -#include #include +#include +#include +#include #include +#include namespace eprosima { +namespace detail { + +// mimic POSIX Read-Write lock syntax +enum class shared_mutex_type +{ + PTHREAD_RWLOCK_PREFER_READER_NP, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP +}; -class shared_mutex +class shared_mutex_base { + +protected: + typedef std::mutex mutex_t; typedef std::condition_variable cond_t; typedef unsigned count_t; mutex_t mut_; cond_t gate1_; - cond_t gate2_; count_t state_; static const count_t write_entered_ = 1U << (sizeof(count_t) * CHAR_BIT - 1); @@ -41,40 +66,26 @@ class shared_mutex public: - shared_mutex() + shared_mutex_base() : state_(0) { } - ~shared_mutex() + ~shared_mutex_base() { std::lock_guard _(mut_); } - shared_mutex( - const shared_mutex&) = delete; - shared_mutex& operator =( - const shared_mutex&) = delete; + shared_mutex_base( + const shared_mutex_base&) = delete; + shared_mutex_base& operator =( + const shared_mutex_base&) = delete; // Exclusive ownership - void lock() - { - std::unique_lock lk(mut_); - while (state_ & write_entered_) - { - gate1_.wait(lk); - } - state_ |= write_entered_; - while (state_ & n_readers_) - { - gate2_.wait(lk); - } - } - bool try_lock() { - std::unique_lock lk(mut_); + std::lock_guard _(mut_); if (state_ == 0) { state_ = write_entered_; @@ -106,7 +117,7 @@ class shared_mutex bool try_lock_shared() { - std::unique_lock lk(mut_); + std::lock_guard _(mut_); count_t num_readers = state_ & n_readers_; if (!(state_ & write_entered_) && num_readers != n_readers_) { @@ -118,6 +129,35 @@ class shared_mutex return false; } +}; + +template +class shared_mutex; + +// original Hinnant implementation prioritizing writers + +template<> +class shared_mutex + : public shared_mutex_base +{ + cond_t gate2_; + +public: + + void lock() + { + std::unique_lock lk(mut_); + while (state_ & write_entered_) + { + gate1_.wait(lk); + } + state_ |= write_entered_; + while (state_ & n_readers_) + { + gate2_.wait(lk); + } + } + void unlock_shared() { std::lock_guard _(mut_); @@ -131,17 +171,144 @@ class shared_mutex gate2_.notify_one(); } } - else + else if (num_readers == 
n_readers_ - 1) { - if (num_readers == n_readers_ - 1) - { - gate1_.notify_one(); - } + gate1_.notify_one(); } } }; +// implementation not locking readers on behalf of writers + +template<> +class shared_mutex + : public shared_mutex_base +{ + count_t writer_waiting_ = 0; + +public: + + void lock() + { + std::unique_lock lk(mut_); + ++writer_waiting_; + while (state_ & n_readers_ || state_ & write_entered_) + { + gate1_.wait(lk); + } + state_ |= write_entered_; + --writer_waiting_; + } + + void unlock_shared() + { + std::lock_guard _(mut_); + count_t num_readers = (state_ & n_readers_) - 1; + state_ &= ~n_readers_; + state_ |= num_readers; + + if ((writer_waiting_ && num_readers == 0) + || (num_readers == n_readers_ - 1)) + { + gate1_.notify_one(); + } + } + +}; + +// Debugger wrapper class that provides insight +template +class debug_wrapper : public sm +{ + std::mutex wm_; + // Identity of the exclusive owner if any + std::thread::id exclusive_owner_ = {}; + // key_type thread_id, mapped_type number of locks + std::map shared_owners_; + +public: + + ~debug_wrapper() + { + std::lock_guard _(wm_); + } + + // Exclusive ownership + + void lock() + { + sm::lock(); + std::lock_guard _(wm_); + exclusive_owner_ = std::this_thread::get_id(); + } + + bool try_lock() + { + bool res = sm::try_lock(); + std::lock_guard _(wm_); + if (res) + { + exclusive_owner_ = std::this_thread::get_id(); + } + return res; + } + + void unlock() + { + sm::unlock(); + std::lock_guard _(wm_); + exclusive_owner_ = std::thread::id(); + } + + // Shared ownership + + void lock_shared() + { + sm::lock_shared(); + std::lock_guard _(wm_); + ++shared_owners_[std::this_thread::get_id()]; + } + + bool try_lock_shared() + { + bool res = sm::try_lock_shared(); + std::lock_guard _(wm_); + if (res) + { + ++shared_owners_[std::this_thread::get_id()]; + } + return res; + } + + void unlock_shared() + { + sm::unlock_shared(); + std::lock_guard _(wm_); + auto owner = shared_owners_.find(std::this_thread::get_id()); + if ( owner != shared_owners_.end() && 0 == --owner->second ) + { + shared_owners_.erase(owner); + } + } + +}; + +} // namespace detail +} // namespace eprosima + +#if defined(__has_include) && __has_include() +# include +#endif // if defined(__has_include) && __has_include() + +// Detect if the shared_lock feature is available +#if defined(__has_include) && __has_include() && !defined(__cpp_lib_shared_mutex) || \ + /* deprecated procedure if the good one is not available*/ \ + ( !(defined(__has_include) && __has_include()) && \ + !(defined(HAVE_CXX17) && HAVE_CXX17) && __cplusplus < 201703 ) + +namespace eprosima { + template class shared_lock { @@ -345,6 +512,132 @@ shared_lock::try_lock() return owns_; } +template +template +bool +shared_lock::try_lock_until( + const std::chrono::time_point& abs_time) +{ + if (m_ == nullptr) + { + throw std::system_error(std::error_code(EPERM, std::system_category()), + "shared_lock::try_lock_until: references null mutex"); + } + if (owns_) + { + throw std::system_error(std::error_code(EDEADLK, std::system_category()), + "shared_lock::try_lock_until: already locked"); + } + owns_ = m_->try_lock_shared_until(abs_time); + return owns_; +} + +template +void +shared_lock::unlock() +{ + if (!owns_) + { + throw std::system_error(std::error_code(EPERM, std::system_category()), + "shared_lock::unlock: not locked"); + } + m_->unlock_shared(); + owns_ = false; +} + +template +inline +void +swap( + shared_lock& x, + shared_lock& y) +{ + x.swap(y); +} + +} //namespace eprosima + +#else // fallback to 
STL + +#include + +namespace eprosima { + +using std::shared_lock; +using std::swap; + +} //namespace eprosima + +#endif // shared_lock selection + +#ifndef USE_THIRDPARTY_SHARED_MUTEX +# if defined(_MSC_VER) && _MSVC_LANG < 202302L +# pragma message("warning: USE_THIRDPARTY_SHARED_MUTEX not defined. By default use framework version.") +# else +# warning "USE_THIRDPARTY_SHARED_MUTEX not defined. By default use framework version." +# endif // if defined(_MSC_VER) && _MSVC_LANG < 202302L +# define USE_THIRDPARTY_SHARED_MUTEX 0 +#endif // ifndef USE_THIRDPARTY_SHARED_MUTEX + +// Detect if the share_mutex feature is available or if the user forces it +#if defined(__has_include) && __has_include() && !defined(__cpp_lib_shared_mutex) || \ + /* allow users to ignore shared_mutex framework implementation */ \ + (~USE_THIRDPARTY_SHARED_MUTEX + 1) || \ + /* deprecated procedure if the good one is not available*/ \ + ( !(defined(__has_include) && __has_include()) && \ + !(defined(HAVE_CXX17) && HAVE_CXX17) && __cplusplus < 201703 ) + +/* + Fast-DDS defaults to PTHREAD_RWLOCK_PREFER_READER_NP for two main reasons: + + - It allows reader side recursiveness. If we have two threads (T1, T2) and + called S a shared lock and E and exclusive one. + + T1: S -> S + T2: E + + PTHREAD_RWLOCK_PREFER_READER_NP will never deadlock. The S locks are not + influenced by the E locks. + + PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP will deadlock. before T1 + takes S twice. That happens because: + + T1's second S will wait for E (writer is prioritized) + + E will wait for T1's first S lock (writer needs atomic access) + + T1's first S cannot unlock because is blocked in the second S. + + Thus, shared_mutex is + non-recursive. + + - It prevents ABBA deadlocks with other mutexes. If we have three threads + (Ti) and P is an ordinary mutex: + + T1: P -> S + T2: S -> P + T3: E + + PTHREAD_RWLOCK_PREFER_READER_NP will never deadlock. The S locks are not + influenced by the E locks. Starvation issues can be managed in the user + code. + + PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP will deadlock if T3 takes E + before T1 takes S. That happens because: + + T1's S will wait for E (writer is prioritized) + + E will wait for T2's S lock (writer needs atomic access) + + T2's S cannot unlock because is blocked in P (owned by T1). + + Thus, shared_mutex must be + managed like an ordinary mutex in deadlock sense. 
+ */ + +namespace eprosima { + +#ifdef NDEBUG +using shared_mutex = detail::shared_mutex; +#else +using shared_mutex = + detail::debug_wrapper>; +#endif // NDEBUG + } //namespace eprosima #else // fallback to STL @@ -353,12 +646,10 @@ shared_lock::try_lock() namespace eprosima { -using shared_mutex = std::shared_mutex; -template< class Mutex > -using shared_lock = std::shared_lock; +using std::shared_mutex; } //namespace eprosima -#endif // if (__cplusplus < 201402) || (defined(_MSC_VER) && _MSC_VER < 1900 ) +#endif // shared_mutex selection #endif // _UTILS_SHARED_MUTEX_HPP_ diff --git a/src/cpp/CMakeLists.txt b/src/cpp/CMakeLists.txt index e44c9f2e798..a5213c25954 100644 --- a/src/cpp/CMakeLists.txt +++ b/src/cpp/CMakeLists.txt @@ -364,9 +364,6 @@ else() set(HAVE_STRICT_REALTIME 0) endif() -configure_file(${PROJECT_SOURCE_DIR}/include/${PROJECT_NAME}/config.h.in - ${PROJECT_BINARY_DIR}/include/${PROJECT_NAME}/config.h) - if(NOT ANDROID) find_package(Threads REQUIRED) endif() @@ -379,6 +376,58 @@ if(APPLE) set(CMAKE_INSTALL_RPATH_USE_LINK_PATH FALSE) endif() +# Find out if libatomic link is required in this platform +find_package(Atomic MODULE) + +# Check if the shared_mutex provided by the platform STL library +# prioritizes writes + +# try_run cannot manage targets yet +get_target_property(CMAKE_ATOMIC_LIB eProsima_atomic INTERFACE_LINK_LIBRARIES) +if(NOT CMAKE_ATOMIC_LIB) + set(CMAKE_ATOMIC_LIB) +endif() + +try_run(SM_RUN_RESULT SM_COMPILE_RESULT + "${CMAKE_CURRENT_BINARY_DIR}/shmtest" + "${CMAKE_SOURCE_DIR}/cmake/modules/check_shared_mutex_priority.cpp" + LINK_LIBRARIES ${CMAKE_THREAD_LIBS_INIT} ${CMAKE_ATOMIC_LIB} + RUN_OUTPUT_VARIABLE SM_RUN_OUTPUT) + +if(SM_COMPILE_RESULT AND NOT SM_RUN_RESULT) + string(STRIP ${SM_RUN_OUTPUT} SM_RUN_OUTPUT) + message(STATUS "Framework's shared_mutex is ${SM_RUN_OUTPUT}") +endif() + +if(SM_RUN_OUTPUT STREQUAL "PTHREAD_RWLOCK_PREFER_READER_NP") + set(USER_CAN_CHOOSE_SHARED_MEMORY_THIRDPARTY ON) +else() + message(STATUS "Forcing third party shared_mutex") + set(USE_THIRDPARTY_SHARED_MUTEX ON) +endif() + +cmake_dependent_option( + USE_THIRDPARTY_SHARED_MUTEX [=[ +Forces the use of a Boost-based shared_mutex implementation +instead of the framework one. Useful to cope with issues on +framework implementations like misguided sanitizer reports. +This implementation will be used by default on frameworks +lacking the shared_mutex feature like those not fulfilling +C++17. +]=] OFF + "USER_CAN_CHOOSE_SHARED_MEMORY_THIRDPARTY" + ON) + +unset(USER_CAN_CHOOSE_SHARED_MEMORY_THIRDPARTY) +unset(SM_RUN_RESULT) +unset(SM_COMPILE_RESULT) +unset(SM_RUN_OUTPUT) +unset(CMAKE_ATOMIC_LIB) + +# Generate the proper configure file +configure_file(${PROJECT_SOURCE_DIR}/include/${PROJECT_NAME}/config.h.in + ${PROJECT_BINARY_DIR}/include/${PROJECT_NAME}/config.h) + #Create library add_library(${PROJECT_NAME} ${${PROJECT_NAME}_source_files}) set_target_properties(${PROJECT_NAME} PROPERTIES VERSION ${PROJECT_VERSION}) @@ -426,9 +475,6 @@ else() set(PRIVACY "PUBLIC") endif() -# Find out if libatomic link is required in this platform -find_package(Atomic MODULE) - # Link library to external libraries. 
target_link_libraries(${PROJECT_NAME} ${PRIVACY} fastcdr foonathan_memory ${CMAKE_THREAD_LIBS_INIT} ${CMAKE_DL_LIBS} diff --git a/src/cpp/rtps/history/TopicPayloadPool.cpp b/src/cpp/rtps/history/TopicPayloadPool.cpp index 014d57f80f4..d337bcc7dcd 100644 --- a/src/cpp/rtps/history/TopicPayloadPool.cpp +++ b/src/cpp/rtps/history/TopicPayloadPool.cpp @@ -189,20 +189,18 @@ TopicPayloadPool::PayloadNode* TopicPayloadPool::allocate( TopicPayloadPool::PayloadNode* TopicPayloadPool::do_allocate( uint32_t size) { - PayloadNode* payload = nullptr; + PayloadNode* payload = new (std::nothrow) PayloadNode(size); - try + if (payload != nullptr) { - payload = new PayloadNode(size); + payload->data_index(static_cast(all_payloads_.size())); + all_payloads_.push_back(payload); } - catch (std::bad_alloc& exception) + else { - logWarning(RTPS_HISTORY, "Failure to create a new payload " << exception.what()); - return nullptr; + logWarning(RTPS_HISTORY, "Failure to create a new payload "); } - payload->data_index(static_cast(all_payloads_.size())); - all_payloads_.push_back(payload); return payload; } @@ -253,7 +251,11 @@ void TopicPayloadPool::reserve ( for (size_t i = all_payloads_.size(); i < min_num_payloads; ++i) { PayloadNode* payload = do_allocate(size); - free_payloads_.push_back(payload); + + if (payload != nullptr) + { + free_payloads_.push_back(payload); + } } } diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp index 099c2e47211..e6671509396 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.cpp +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.cpp @@ -67,8 +67,6 @@ using UDPv4TransportDescriptor = fastdds::rtps::UDPv4TransportDescriptor; using TCPTransportDescriptor = fastdds::rtps::TCPTransportDescriptor; using SharedMemTransportDescriptor = fastdds::rtps::SharedMemTransportDescriptor; -thread_local RTPSParticipantImpl* RTPSParticipantImpl::collections_mutex_owner_ = nullptr; - static EntityId_t TrustedWriter( const EntityId_t& reader) { diff --git a/src/cpp/rtps/participant/RTPSParticipantImpl.h b/src/cpp/rtps/participant/RTPSParticipantImpl.h index cfeba1f424b..10677a9f4fb 100644 --- a/src/cpp/rtps/participant/RTPSParticipantImpl.h +++ b/src/cpp/rtps/participant/RTPSParticipantImpl.h @@ -534,8 +534,6 @@ class RTPSParticipantImpl uint32_t IdCounter; //! Mutex to safely access endpoints collections mutable shared_mutex endpoints_list_mutex; - //! This member avoids shared_mutex reentrancy by tracking last participant into traversing the endpoints collections - static thread_local RTPSParticipantImpl* collections_mutex_owner_; //!Writer List. 
std::vector m_allWriterList; //!Reader List @@ -931,15 +929,7 @@ class RTPSParticipantImpl Functor f) { // check if we are reentrying - shared_lock may_lock; - RTPSParticipantImpl* previous_owner = collections_mutex_owner_; - - if (collections_mutex_owner_ != this) - { - shared_lock lock(endpoints_list_mutex); - may_lock = std::move(lock); - collections_mutex_owner_ = this; - } + shared_lock _(endpoints_list_mutex); // traverse the list for ( RTPSWriter* pw : m_userWriterList) @@ -950,9 +940,6 @@ class RTPSParticipantImpl } } - // restore tls former value - std::swap(collections_mutex_owner_, previous_owner); - return f; } @@ -965,15 +952,7 @@ class RTPSParticipantImpl Functor f) { // check if we are reentrying - shared_lock may_lock; - RTPSParticipantImpl* previous_owner = collections_mutex_owner_; - - if (collections_mutex_owner_ != this) - { - shared_lock lock(endpoints_list_mutex); - may_lock = std::move(lock); - collections_mutex_owner_ = this; - } + shared_lock _(endpoints_list_mutex); for ( RTPSReader* pr : m_userReaderList) { @@ -983,9 +962,6 @@ class RTPSParticipantImpl } } - // restore tls former value - std::swap(collections_mutex_owner_, previous_owner); - return f; } diff --git a/test/blackbox/common/DDSBlackboxTestsBasic.cpp b/test/blackbox/common/DDSBlackboxTestsBasic.cpp index 887aab4e08a..f9e6110a383 100644 --- a/test/blackbox/common/DDSBlackboxTestsBasic.cpp +++ b/test/blackbox/common/DDSBlackboxTestsBasic.cpp @@ -184,6 +184,93 @@ TEST(DDSBasic, MultithreadedPublisherCreation) ASSERT_EQ(ReturnCode_t::RETCODE_OK, factory->delete_participant(participant)); } +TEST(DDSBasic, MultithreadedReaderCreationDoesNotDeadlock) +{ + // Get factory + DomainParticipantFactory* factory = DomainParticipantFactory::get_instance(); + ASSERT_NE(nullptr, factory); + + // Create participant + DomainParticipant* participant = factory->create_participant((uint32_t)GET_PID() % 230, PARTICIPANT_QOS_DEFAULT); + ASSERT_NE(nullptr, participant); + + // Register type + TypeSupport type_support; + type_support.reset(new FixedSizedPubSubType()); + type_support.register_type(participant); + ASSERT_NE(nullptr, type_support); + + // Create subscriber + Subscriber* subscriber = participant->create_subscriber(SUBSCRIBER_QOS_DEFAULT); + ASSERT_NE(nullptr, subscriber); + + // Create publisher + Publisher* publisher = participant->create_publisher(PUBLISHER_QOS_DEFAULT); + ASSERT_NE(nullptr, publisher); + + // Create Topic + Topic* topic = participant->create_topic(TEST_TOPIC_NAME, type_support.get_type_name(), TOPIC_QOS_DEFAULT); + ASSERT_NE(nullptr, topic); + + // Set QoS + DataSharingQosPolicy dsp; + dsp.off(); + + DataWriterQos dw_qos; + DataReaderQos dr_qos; + dw_qos.data_sharing(dsp); + dr_qos.data_sharing(dsp); + + // Create DataWriter + DataWriter* writer = publisher->create_datawriter(topic, dw_qos); + ASSERT_NE(nullptr, writer); + + std::mutex mtx; + std::condition_variable cv; + bool should_finish = false; + + auto thread_run = [subscriber, topic, &mtx, &cv, &should_finish, &dr_qos]() + { + // Create reader + DataReader* reader = subscriber->create_datareader(topic, dr_qos); + ASSERT_NE(nullptr, reader); + + // Wait for test completion request + std::unique_lock lock(mtx); + cv.wait(lock, [&should_finish]() + { + return should_finish; + }); + + ASSERT_EQ(ReturnCode_t::RETCODE_OK, subscriber->delete_datareader(reader)); + }; + + { + std::vector threads; + for (size_t i = 0; i < 10; ++i) + { + threads.push_back(std::thread(thread_run)); + } + + { + std::lock_guard guard(mtx); + should_finish = true; + 
cv.notify_all(); + } + + for (std::thread& thr : threads) + { + thr.join(); + } + } + + ASSERT_EQ(ReturnCode_t::RETCODE_OK, publisher->delete_datawriter(writer)); + ASSERT_EQ(ReturnCode_t::RETCODE_OK, participant->delete_publisher(publisher)); + ASSERT_EQ(ReturnCode_t::RETCODE_OK, participant->delete_subscriber(subscriber)); + ASSERT_EQ(ReturnCode_t::RETCODE_OK, participant->delete_topic(topic)); + ASSERT_EQ(ReturnCode_t::RETCODE_OK, factory->delete_participant(participant)); +} + } // namespace dds } // namespace fastdds } // namespace eprosima diff --git a/test/unittest/utils/CMakeLists.txt b/test/unittest/utils/CMakeLists.txt index f56191c71bb..7e1d5aede31 100644 --- a/test/unittest/utils/CMakeLists.txt +++ b/test/unittest/utils/CMakeLists.txt @@ -132,6 +132,12 @@ target_include_directories(SystemInfoTests PRIVATE target_link_libraries(SystemInfoTests GTest::gtest) add_gtest(SystemInfoTests SOURCES ${SYSTEMINFOTESTS_SOURCE}) +add_executable(SharedMutexTests shared_mutex_tests.cpp) +target_compile_definitions(SharedMutexTests PUBLIC USE_THIRDPARTY_SHARED_MUTEX=1) +target_include_directories(SharedMutexTests PUBLIC ${PROJECT_SOURCE_DIR}/include) +target_link_libraries(SharedMutexTests PUBLIC GTest::gtest) +add_gtest(SharedMutexTests SOURCES shared_mutex_tests.cpp) + ############################################################################### # Necessary files ############################################################################### diff --git a/test/unittest/utils/shared_mutex_tests.cpp b/test/unittest/utils/shared_mutex_tests.cpp new file mode 100644 index 00000000000..eba86e73e46 --- /dev/null +++ b/test/unittest/utils/shared_mutex_tests.cpp @@ -0,0 +1,279 @@ +// Copyright (c) Microsoft Corporation. +// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception +// Excerpts from: STL/main/tests/std/tests/Dev11_1150223_shared_mutex/test.cpp + +#include +#include +#include +#include +#include + +#include +#include + +using namespace std; +using namespace std::chrono; +using namespace eprosima; + +template +detail::shared_mutex_type get_mutex_priority( + const Mutex& m); + +template +detail::shared_mutex_type get_mutex_priority( + const detail::shared_mutex&) +{ + return mt; +} + +template +detail::shared_mutex_type get_mutex_priority( + const detail::debug_wrapper>&) +{ + return mt; +} + +template +class SharedMutexTest : public testing::Test +{ +public: + + void join_and_clear( + vector& threads) + { + for (auto& t : threads) + { + t.join(); + } + + threads.clear(); + } + +}; + +using SharedMutexTypes = ::testing::Types< + detail::shared_mutex, + detail::shared_mutex, + detail::debug_wrapper>, + detail::debug_wrapper>>; + +TYPED_TEST_SUITE(SharedMutexTest, SharedMutexTypes, ); + +TYPED_TEST(SharedMutexTest, test_one_writer) +{ + // One simultaneous writer. + atomic atom(-1); + TypeParam mut; + vector threads; + + for (int i = 0; i < 4; ++i) + { + threads.emplace_back([&atom, &mut] + { + while (atom == -1) + { + } + lock_guard ExclusiveLock(mut); + const int val = ++atom; + this_thread::sleep_for(milliseconds(25)); // Not a timing assumption. + ASSERT_EQ(atom, val); + }); + } + + ASSERT_EQ(atom.exchange(0), -1); + this->join_and_clear(threads); + ASSERT_EQ(atom, 4); +} + +TYPED_TEST(SharedMutexTest, test_multiple_readers) +{ + // Many simultaneous readers. 
+ atomic atom(-1); + TypeParam mut; + vector threads; + + for (int i = 0; i < 4; ++i) + { + threads.emplace_back([&atom, &mut] + { + while (atom == -1) + { + } + shared_lock SharedLock(mut); + ++atom; + while (atom < 4) + { + } + }); + } + + ASSERT_EQ(atom.exchange(0), -1); + this->join_and_clear(threads); + ASSERT_EQ(atom, 4); +} + +TYPED_TEST(SharedMutexTest, test_writer_blocking_readers) +{ + // One writer blocking many readers. + atomic atom(-4); + TypeParam mut; + vector threads; + + threads.emplace_back([&atom, &mut] + { + while (atom < 0) + { + } + lock_guard ExclusiveLock(mut); + ASSERT_EQ(atom.exchange(1000), 0); + this_thread::sleep_for(milliseconds(50)); // Not a timing assumption. + ASSERT_EQ(atom.exchange(1729), 1000); + }); + + for (int i = 0; i < 4; ++i) + { + threads.emplace_back([&atom, &mut] + { + ++atom; + while (atom < 1000) + { + } + shared_lock SharedLock(mut); + ASSERT_EQ(atom, 1729); + }); + } + + this->join_and_clear(threads); + ASSERT_EQ(atom, 1729); +} + +TYPED_TEST(SharedMutexTest, test_readers_blocking_writer) +{ + // Many readers blocking one writer. + atomic atom(-5); + TypeParam mut; + vector threads; + + for (int i = 0; i < 4; ++i) + { + threads.emplace_back([&atom, &mut] + { + shared_lock SharedLock(mut); + ++atom; + while (atom < 0) + { + } + this_thread::sleep_for(milliseconds(50)); // Not a timing assumption. + atom += 10; + }); + } + + threads.emplace_back([&atom, &mut] + { + ++atom; + while (atom < 0) + { + } + lock_guard ExclusiveLock(mut); + ASSERT_EQ(atom, 40); + }); + + this->join_and_clear(threads); + ASSERT_EQ(atom, 40); +} + +TYPED_TEST(SharedMutexTest, test_try_lock_and_try_lock_shared) +{ + // Test try_lock() and try_lock_shared(). + TypeParam mut; + + { + unique_lock MainExclusive(mut, try_to_lock); + ASSERT_TRUE(MainExclusive.owns_lock()); + + thread t([&mut] + { + { + unique_lock ExclusiveLock(mut, try_to_lock); + ASSERT_FALSE(ExclusiveLock.owns_lock()); + } + + { + shared_lock SharedLock(mut, try_to_lock); + ASSERT_FALSE(SharedLock.owns_lock()); + } + }); + + t.join(); + } + + { + shared_lock MainShared(mut, try_to_lock); + ASSERT_TRUE(MainShared.owns_lock()); + + thread t([&mut] + { + { + unique_lock ExclusiveLock(mut, try_to_lock); + ASSERT_FALSE(ExclusiveLock.owns_lock()); + } + + { + shared_lock SharedLock(mut, try_to_lock); + ASSERT_TRUE(SharedLock.owns_lock()); + } + }); + + t.join(); + } +} + +TYPED_TEST(SharedMutexTest, test_mutex_priority) +{ + TypeParam sm; + atomic_bool mark{false}; + + // take first shared lock + sm.lock_shared(); + + // signal is taken + thread exclusive([&] + { + mark.store(true); + lock_guard guard(sm); + }); + + // Wait till the thread takes the lock + do + { + this_thread::sleep_for(chrono::milliseconds(100)); + } + while (!mark); + + // try take the second shared lock + bool success = sm.try_lock_shared(); + if (success) + { + sm.unlock_shared(); + ASSERT_EQ(get_mutex_priority(sm), + detail::shared_mutex_type::PTHREAD_RWLOCK_PREFER_READER_NP); + } + else + { + ASSERT_EQ(get_mutex_priority(sm), + detail::shared_mutex_type::PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP); + } + + // release first lock + sm.unlock_shared(); + // wait for the main thread + exclusive.join(); +} + +int main( + int argc, + char** argv) +{ + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +}
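
The two comment blocks introduced by this patch (the POSIX priority description at the top of shared_mutex.hpp and the "Fast-DDS defaults to PTHREAD_RWLOCK_PREFER_READER_NP" rationale) describe the behavioural difference in prose. The following standalone sketch, which is not part of the patch, exercises the nested shared-lock case they discuss (T1: S -> S while T2 requests E). It instantiates the detail:: specialization directly, as the new shared_mutex_tests.cpp does, so the outcome does not depend on which implementation the CMake probe selects; the include path follows the patch, the exact instantiation syntax is reconstructed from the test file, and the 100 ms sleep only gives the writer time to start waiting (it is not a correctness requirement).

#include <chrono>
#include <mutex>
#include <thread>

#include <fastrtps/utils/shared_mutex.hpp>

int main()
{
    using eprosima::detail::shared_mutex;
    using eprosima::detail::shared_mutex_type;

    // Reader-priority specialization: a waiting writer does not block new readers.
    shared_mutex<shared_mutex_type::PTHREAD_RWLOCK_PREFER_READER_NP> sm;

    // T1 takes its first shared lock.
    sm.lock_shared();

    // T2 requests the exclusive lock while T1 still holds a shared one.
    std::thread writer([&sm]()
            {
                std::lock_guard<decltype(sm)> guard(sm);
            });

    // Give the writer a chance to start waiting (illustration only).
    std::this_thread::sleep_for(std::chrono::milliseconds(100));

    // With reader priority this nested shared lock is granted even though a
    // writer is waiting. With PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP it
    // would block behind the writer and produce the deadlock described in the
    // header comment.
    sm.lock_shared();
    sm.unlock_shared();

    // Release the first shared lock; the writer can now proceed.
    sm.unlock_shared();
    writer.join();

    return 0;
}

Compiling a snippet like this with USE_THIRDPARTY_SHARED_MUTEX=1 defined, as the SharedMutexTests target does through target_compile_definitions, also silences the selection warning the header emits when the macro is left undefined.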
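
The one-line ProxyPool change pairs the bit that marks a proxy as free with a condition-variable notification. The waiting side of ProxyPool is not shown in the diff, so the following is only a hedged, self-contained sketch of the general pattern with hypothetical names (slot_pool, acquire and release are not ProxyPool's API): without the notify_one() on release, a thread blocked waiting for a free entry could sleep indefinitely.

#include <bitset>
#include <condition_variable>
#include <cstddef>
#include <mutex>

template <std::size_t N>
class slot_pool
{
    std::mutex mtx_;
    std::condition_variable cv_;
    std::bitset<N> free_slots_;           // a set bit marks a free slot

public:

    slot_pool()
    {
        free_slots_.set();                // all slots start free
    }

    std::size_t acquire()
    {
        std::unique_lock<std::mutex> lock(mtx_);
        // block until at least one slot is free
        cv_.wait(lock, [this]()
                {
                    return free_slots_.any();
                });
        // take the first free slot
        for (std::size_t i = 0; i < N; ++i)
        {
            if (free_slots_.test(i))
            {
                free_slots_.reset(i);
                return i;
            }
        }
        return N; // unreachable: the predicate guarantees a free slot
    }

    void release(
            std::size_t idx)
    {
        std::lock_guard<std::mutex> lock(mtx_);
        free_slots_.set(idx);             // return the resource
        cv_.notify_one();                 // wake a blocked acquire()
    }
};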