diff --git a/include/fastrtps/types/DynamicLoanableSequence.hpp b/include/fastrtps/types/DynamicLoanableSequence.hpp new file mode 100644 index 00000000000..b467c90cdf6 --- /dev/null +++ b/include/fastrtps/types/DynamicLoanableSequence.hpp @@ -0,0 +1,184 @@ +// Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef _TYPES_DYNAMIC_LOANABLE_SEQUENCE_HPP_ +#define _TYPES_DYNAMIC_LOANABLE_SEQUENCE_HPP_ + +#include +#include + +#include +#include +#include +#include +#include + +namespace eprosima { +namespace fastdds { +namespace dds { + +/** + * @brief LoanableSequence specialization for DynamicData. + * + * This class provides a sequence container for handling loanable collections + * of DynamicData. + * + * @tparam _NonConstEnabler to enable data access through [] operator on non-const sequences. + */ +template +class LoanableSequence + : public LoanableTypedCollection +{ +public: + + /// Type for the size of the sequence. + using size_type = LoanableCollection::size_type; + + /// Type for the elements in the sequence. + using element_type = LoanableCollection::element_type; + + /** + * @brief Construct a LoanableSequence with a specified dynamic type. + * + * @param[in] dyn_type Pointer to the DynamicType. + */ + LoanableSequence( + fastrtps::types::DynamicType_ptr dyn_type) + : dynamic_type_support_(new fastrtps::types::DynamicPubSubType(dyn_type)) + { + } + + /** + * @brief Construct a LoanableSequence with a specified maximum size. + * + * @param[in] max Maximum size of the sequence. + */ + LoanableSequence( + size_type max) + { + if (max <= 0) + { + return; + } + + resize(max); + } + + /** + * @brief Destructor for LoanableSequence. + */ + ~LoanableSequence() + { + if (elements_ && !has_ownership_) + { + logWarning(SUBSCRIBER, "Sequence destroyed with active loan"); + return; + } + + release(); + } + + /// Deleted copy constructor for LoanableSequence. + LoanableSequence( + const LoanableSequence& other) = delete; + + /// Deleted copy assignment operator for LoanableSequence. + LoanableSequence& operator =( + const LoanableSequence& other) = delete; + + /** + * @brief Move constructor for LoanableSequence. + * + * @param[in] other The other LoanableSequence to move from. + */ + LoanableSequence( + LoanableSequence&&) = default; + + /** + * @brief Move assignment operator for LoanableSequence. + * + * @param[in] other The other LoanableSequence to move from. + * + * @return A reference to this LoanableSequence. + */ + LoanableSequence& operator =( + LoanableSequence&&) = default; + +protected: + + using LoanableCollection::maximum_; + using LoanableCollection::length_; + using LoanableCollection::elements_; + using LoanableCollection::has_ownership_; + +private: + + /** + * @brief Resize the sequence to a new maximum size. + * + * @param[in] maximum The new maximum size. + */ + void resize( + size_type maximum) override + { + assert(has_ownership_); + + // Resize collection and get new pointer + data_.reserve(maximum); + data_.resize(maximum); + elements_ = reinterpret_cast(data_.data()); + + // Allocate individual elements + while (maximum_ < maximum) + { + data_[maximum_++] = static_cast(dynamic_type_support_->createData()); + } + } + + /** + * @brief Release all elements and clear the sequence. + */ + void release() + { + if (has_ownership_ && elements_) + { + for (size_type n = 0; n < maximum_; ++n) + { + fastrtps::types::DynamicData* elem = data_[n]; + dynamic_type_support_->deleteData(elem); + } + std::vector().swap(data_); + } + + maximum_ = 0u; + length_ = 0u; + elements_ = nullptr; + has_ownership_ = true; + } + + /// Container for holding the DynamicData elements. + std::vector data_; + + /// Pointer to the DynamicPubSubType type support. + std::unique_ptr dynamic_type_support_; +}; + +/// Alias for LoanableSequence with DynamicData and true_type. +using DynamicLoanableSequence = LoanableSequence; + +} // namespace dds +} // namespace fastdds +} // namespace eprosima + +#endif // _TYPES_DYNAMIC_LOANABLE_SEQUENCE_HPP_ diff --git a/include/fastrtps/types/DynamicPubSubType.h b/include/fastrtps/types/DynamicPubSubType.h index f0d2d1b0a74..98cb02f18c0 100644 --- a/include/fastrtps/types/DynamicPubSubType.h +++ b/include/fastrtps/types/DynamicPubSubType.h @@ -15,10 +15,10 @@ #ifndef TYPES_DYNAMIC_PUB_SUB_TYPE_H #define TYPES_DYNAMIC_PUB_SUB_TYPE_H -#include #include +#include #include -#include +#include #include namespace eprosima { @@ -37,6 +37,8 @@ class DynamicPubSubType : public eprosima::fastdds::dds::TopicDataType public: + typedef DynamicData type; + RTPS_DllAPI DynamicPubSubType(); RTPS_DllAPI DynamicPubSubType( diff --git a/src/cpp/CMakeLists.txt b/src/cpp/CMakeLists.txt index 6f66a18dd8e..fbda6027ec1 100644 --- a/src/cpp/CMakeLists.txt +++ b/src/cpp/CMakeLists.txt @@ -63,6 +63,7 @@ set(${PROJECT_NAME}_source_files rtps/DataSharing/DataSharingListener.cpp rtps/DataSharing/DataSharingNotification.cpp rtps/reader/WriterProxy.cpp + rtps/reader/reader_utils.cpp rtps/reader/StatefulReader.cpp rtps/reader/StatelessReader.cpp rtps/reader/RTPSReader.cpp diff --git a/src/cpp/fastdds/publisher/filtering/ReaderFilterCollection.hpp b/src/cpp/fastdds/publisher/filtering/ReaderFilterCollection.hpp index 2cac63dd551..5dea0d2f4dc 100644 --- a/src/cpp/fastdds/publisher/filtering/ReaderFilterCollection.hpp +++ b/src/cpp/fastdds/publisher/filtering/ReaderFilterCollection.hpp @@ -133,13 +133,17 @@ class ReaderFilterCollection // Copy the signature std::copy(entry.filter_signature.begin(), entry.filter_signature.end(), signature); - // Evaluate filter and update filtered_out_readers - bool filter_result = entry.filter->evaluate(change.serializedPayload, info, it->first); - if (!filter_result) + // Only evaluate filter on ALIVE changes, as UNREGISTERED and DISPOSED are always relevant + bool filter_result = true; + if (fastrtps::rtps::ALIVE == change.kind) { - change.filtered_out_readers.emplace_back(it->first); + // Evaluate filter and update filtered_out_readers + filter_result = entry.filter->evaluate(change.serializedPayload, info, it->first); + if (!filter_result) + { + change.filtered_out_readers.emplace_back(it->first); + } } - return filter_result; }; diff --git a/src/cpp/rtps/reader/StatefulReader.cpp b/src/cpp/rtps/reader/StatefulReader.cpp index 1e295892433..92df1b678ca 100644 --- a/src/cpp/rtps/reader/StatefulReader.cpp +++ b/src/cpp/rtps/reader/StatefulReader.cpp @@ -18,26 +18,29 @@ */ #include -#include -#include + +#include +#include +#include + #include -#include -#include -#include -#include -#include -#include -#include #include #include +#include +#include +#include +#include +#include #include +#include -#include "rtps/RTPSDomainImpl.hpp" - -#include -#include - -#include +#include "reader_utils.hpp" +#include +#include +#include +#include +#include +#include #define IDSTRING "(ID:" << std::this_thread::get_id() << ") " << @@ -544,7 +547,7 @@ bool StatefulReader::processDataMsg( return false; } - if (data_filter_ && !data_filter_->is_relevant(*change, m_guid)) + if (!fastdds::rtps::change_is_relevant_for_filter(*change, m_guid, data_filter_)) { if (pWP) { @@ -552,6 +555,7 @@ bool StatefulReader::processDataMsg( NotifyChanges(pWP); send_ack_if_datasharing(this, mp_history, pWP, change->sequenceNumber); } + // Change was filtered out, so there isn't anything else to do return true; } @@ -726,7 +730,7 @@ bool StatefulReader::processDataFragMsg( // Temporarilly assign the inline qos while evaluating the data filter work_change->inline_qos = incomingChange->inline_qos; - bool filtered_out = data_filter_ && !data_filter_->is_relevant(*work_change, m_guid); + bool filtered_out = !fastdds::rtps::change_is_relevant_for_filter(*work_change, m_guid, data_filter_); work_change->inline_qos = SerializedPayload_t(); if (filtered_out) diff --git a/src/cpp/rtps/reader/StatelessReader.cpp b/src/cpp/rtps/reader/StatelessReader.cpp index 7c327ecee05..f340da0f858 100644 --- a/src/cpp/rtps/reader/StatelessReader.cpp +++ b/src/cpp/rtps/reader/StatelessReader.cpp @@ -18,23 +18,23 @@ */ #include -#include -#include + +#include +#include +#include + +#include "reader_utils.hpp" #include -#include #include #include +#include +#include +#include #include -#include #include #include - -#include "rtps/RTPSDomainImpl.hpp" - -#include -#include - -#include +#include +#include #define IDSTRING "(ID:" << std::this_thread::get_id() << ") " << @@ -481,9 +481,10 @@ bool StatelessReader::processDataMsg( return false; } - if (data_filter_ && !data_filter_->is_relevant(*change, m_guid)) + if (!fastdds::rtps::change_is_relevant_for_filter(*change, m_guid, data_filter_)) { update_last_notified(change->writerGUID, change->sequenceNumber); + // Change was filtered out, so there isn't anything else to do return true; } @@ -674,7 +675,8 @@ bool StatelessReader::processDataFragMsg( { // Temporarilly assign the inline qos while evaluating the data filter change_completed->inline_qos = incomingChange->inline_qos; - bool filtered_out = data_filter_ && !data_filter_->is_relevant(*change_completed, m_guid); + bool filtered_out = !fastdds::rtps::change_is_relevant_for_filter(*change_completed, m_guid, + data_filter_); change_completed->inline_qos = SerializedPayload_t(); if (filtered_out) diff --git a/src/cpp/rtps/reader/reader_utils.cpp b/src/cpp/rtps/reader/reader_utils.cpp new file mode 100644 index 00000000000..ed7a024f498 --- /dev/null +++ b/src/cpp/rtps/reader/reader_utils.cpp @@ -0,0 +1,45 @@ +// Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file reader_utils.cpp + */ + +#include "reader_utils.hpp" + +#include + +namespace eprosima { +namespace fastdds { +namespace rtps { + +bool change_is_relevant_for_filter( + const CacheChange& change, + const GUID& reader_guid, + const IReaderDataFilter* filter) +{ + bool ret = true; + + // Only evaluate filter on ALIVE changes, as UNREGISTERED and DISPOSED are always relevant + if ((nullptr != filter) && (fastrtps::rtps::ALIVE == change.kind) && (!filter->is_relevant(change, reader_guid))) + { + ret = false; + } + + return ret; +} + +} // namespace rtps +} // namespace fastdds +} // namespace eprosima diff --git a/src/cpp/rtps/reader/reader_utils.hpp b/src/cpp/rtps/reader/reader_utils.hpp new file mode 100644 index 00000000000..bd2d2f9bad9 --- /dev/null +++ b/src/cpp/rtps/reader/reader_utils.hpp @@ -0,0 +1,53 @@ +// Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file reader_utils.hpp + */ + +#ifndef _FASTDDS_RTPS_READER_READERUTILS_H_ +#define _FASTDDS_RTPS_READER_READERUTILS_H_ + +#include +#include +#include +#include + +namespace eprosima { +namespace fastdds { +namespace rtps { + +using CacheChange = fastrtps::rtps::CacheChange_t; +using GUID = fastrtps::rtps::GUID_t; + +/** + * @brief Check if a change is relevant for a reader. + * + * @param change The CacheChange_t to be evaluated. + * @param reader_guid Reader's GUID_t. + * @param filter The IReaderDataFilter to be used. + * + * @return true if relevant, false otherwise. + */ +bool change_is_relevant_for_filter( + const CacheChange& change, + const GUID& reader_guid, + const IReaderDataFilter* filter); + +} // namespace rtps +} // namespace fastdds +} // namespace eprosima + + +#endif // _FASTDDS_RTPS_READER_READERUTILS_H_ diff --git a/test/blackbox/api/dds-pim/PubSubParticipant.hpp b/test/blackbox/api/dds-pim/PubSubParticipant.hpp index d6022fc7d3a..87a63893129 100644 --- a/test/blackbox/api/dds-pim/PubSubParticipant.hpp +++ b/test/blackbox/api/dds-pim/PubSubParticipant.hpp @@ -27,7 +27,9 @@ #include #include + #include + #include #include #include @@ -38,14 +40,16 @@ #include #include #include -#include #include +#include #include +#include "PubSubTypeTraits.hpp" + /** * @brief A class with one participant that can have multiple publishers and subscribers */ -template +template> class PubSubParticipant { public: @@ -129,13 +133,7 @@ class PubSubParticipant void on_data_available( eprosima::fastdds::dds::DataReader* reader) override { - type data; - eprosima::fastdds::dds::SampleInfo info; - - while (ReturnCode_t::RETCODE_OK == reader->take_next_sample(&data, &info)) - { - participant_->data_received(); - } + participant_->data_received(reader); } private: @@ -315,7 +313,7 @@ class PubSubParticipant if (participant_ != nullptr) { participant_qos_ = participant_->get_qos(); - type_.reset(new type_support()); + TypeTraits::build_type_support(type_); participant_->register_type(type_); return true; } @@ -413,6 +411,7 @@ class PubSubParticipant type& msg, unsigned int index = 0) { + TypeTraits::print_sent_data(msg); return std::get<2>(publishers_[index])->write((void*)&msg); } @@ -814,11 +813,21 @@ class PubSubParticipant sub_liveliness_cv_.notify_one(); } - void data_received() + void data_received( + eprosima::fastdds::dds::DataReader* reader) { - std::unique_lock lock(sub_data_mutex_); - sub_times_data_received_++; - sub_data_cv_.notify_one(); + type* data = static_cast(type_.create_data()); + eprosima::fastdds::dds::SampleInfo info; + + while (ReturnCode_t::RETCODE_OK == reader->take_next_sample(data, &info)) + { + TypeTraits::print_received_data(*data); + std::unique_lock lock(sub_data_mutex_); + sub_times_data_received_++; + sub_data_cv_.notify_one(); + } + + type_.delete_data(data); } unsigned int pub_times_liveliness_lost() diff --git a/test/blackbox/api/dds-pim/PubSubReader.hpp b/test/blackbox/api/dds-pim/PubSubReader.hpp index 51d72642ca3..d64ff9d8b51 100644 --- a/test/blackbox/api/dds-pim/PubSubReader.hpp +++ b/test/blackbox/api/dds-pim/PubSubReader.hpp @@ -20,40 +20,51 @@ #ifndef _TEST_BLACKBOX_PUBSUBREADER_HPP_ #define _TEST_BLACKBOX_PUBSUBREADER_HPP_ +#include #include #include #include #include +#include #include +#include #include + #include + #if _MSC_VER #include #endif // _MSC_VER -#include + #include +#include #include -#include #include +#include +#include #include #include #include #include #include #include +#include #include #include -#include +#include #include +#include +#include +#include #include #include #include -#include -#include +#include #include #include -#include + +#include "PubSubTypeTraits.hpp" using DomainParticipantFactory = eprosima::fastdds::dds::DomainParticipantFactory; using eprosima::fastrtps::rtps::IPLocator; @@ -63,13 +74,14 @@ using eprosima::fastdds::rtps::UDPv6TransportDescriptor; using SampleLostStatusFunctor = std::function; -template +template> class PubSubReader { public: typedef TypeSupport type_support; typedef typename type_support::type type; + typedef typename TypeTraits::DataListType datalist_type; protected: @@ -279,6 +291,7 @@ class PubSubReader , listener_(*this) , participant_(nullptr) , topic_(nullptr) + , cf_topic_(nullptr) , subscriber_(nullptr) , datareader_(nullptr) , status_mask_(eprosima::fastdds::dds::StatusMask::all()) @@ -307,6 +320,8 @@ class PubSubReader , times_incompatible_qos_(0) , last_incompatible_qos_(eprosima::fastdds::dds::INVALID_QOS_POLICY_ID) , message_receive_count_(0) + , filter_expression_("") + , expression_parameters_({}) { // Load default QoS to permit testing with external XML profile files. DomainParticipantFactory::get_instance()->load_profiles(); @@ -340,6 +355,19 @@ class PubSubReader loan_sample_validation(false); } + PubSubReader( + const std::string& topic_name, + const std::string& filter_expression, + const std::vector& expression_parameters, + bool take = true, + bool statistics = false, + bool read = true) + : PubSubReader(topic_name, take, statistics, read) + { + filter_expression_ = filter_expression; + expression_parameters_ = expression_parameters; + } + virtual ~PubSubReader() { destroy(); @@ -382,7 +410,7 @@ class PubSubReader { participant_guid_ = participant_->guid(); - type_.reset(new type_support()); + TypeTraits::build_type_support(type_); // Register type ASSERT_EQ(participant_->register_type(type_), ReturnCode_t::RETCODE_OK); @@ -394,6 +422,17 @@ class PubSubReader ASSERT_NE(topic_, nullptr); ASSERT_TRUE(topic_->is_enabled()); + // Create CFT if needed + if (!filter_expression_.empty()) + { + cf_topic_ = participant_->create_contentfilteredtopic( + topic_name_ + "_cft", + topic_, + filter_expression_, + expression_parameters_); + ASSERT_NE(cf_topic_, nullptr); + } + // Create publisher createSubscriber(); } @@ -407,11 +446,17 @@ class PubSubReader ASSERT_NE(subscriber_, nullptr); ASSERT_TRUE(subscriber_->is_enabled()); + using TopicDescriptionPtr = eprosima::fastdds::dds::TopicDescription*; + TopicDescriptionPtr topic_desc {(nullptr != cf_topic_) ? + static_cast(cf_topic_) : + static_cast(topic_)}; + if (!xml_file_.empty()) { if (!datareader_profile_.empty()) { - datareader_ = subscriber_->create_datareader_with_profile(topic_, datareader_profile_, &listener_, + datareader_ = subscriber_->create_datareader_with_profile(topic_desc, datareader_profile_, + &listener_, status_mask_); ASSERT_NE(datareader_, nullptr); ASSERT_TRUE(datareader_->is_enabled()); @@ -419,7 +464,7 @@ class PubSubReader } if (datareader_ == nullptr) { - datareader_ = subscriber_->create_datareader(topic_, datareader_qos_, &listener_, status_mask_); + datareader_ = subscriber_->create_datareader(topic_desc, datareader_qos_, &listener_, status_mask_); } if (datareader_ != nullptr) @@ -442,36 +487,28 @@ class PubSubReader { if (participant_ != nullptr) { - if (datareader_) - { - subscriber_->delete_datareader(datareader_); - datareader_ = nullptr; - } - if (subscriber_) - { - participant_->delete_subscriber(subscriber_); - subscriber_ = nullptr; - } - if (topic_) - { - participant_->delete_topic(topic_); - topic_ = nullptr; - } - eprosima::fastdds::dds::DomainParticipantFactory::get_instance()->delete_participant(participant_); + ASSERT_EQ(ReturnCode_t::RETCODE_OK, participant_->delete_contained_entities()); + datareader_ = nullptr; + subscriber_ = nullptr; + cf_topic_ = nullptr; + topic_ = nullptr; + + ASSERT_EQ(ReturnCode_t::RETCODE_OK, + eprosima::fastdds::dds::DomainParticipantFactory::get_instance()->delete_participant(participant_)); participant_ = nullptr; } initialized_ = false; } - std::list data_not_received() + std::list data_not_received() { std::unique_lock lock(mutex_); return total_msgs_; } eprosima::fastrtps::rtps::SequenceNumber_t startReception( - std::list& msgs) + const std::list& msgs) { mutex_.lock(); total_msgs_ = msgs; @@ -491,6 +528,18 @@ class PubSubReader return get_last_sequence_received(); } + void startReception( + size_t expected_samples) + { + { + std::unique_lock lock(mutex_); + current_processed_count_ = 0; + number_samples_expected_ = expected_samples; + last_seq.clear(); + } + receiving_.store(true); + } + void stopReception() { receiving_.store(false); @@ -593,7 +642,11 @@ class PubSubReader { if (info_seq[n].valid_data) { - auto it = std::find(expected_messages.begin(), expected_messages.end(), data_seq[n]); + auto it = std::find_if(expected_messages.begin(), expected_messages.end(), + [&](const datalist_type& elem) + { + return TypeTraits::compare_data(data_seq[n], elem); + }); ASSERT_NE(it, expected_messages.end()); expected_messages.erase(it); } @@ -1684,6 +1737,13 @@ class PubSubReader return status; } + eprosima::fastdds::dds::SampleLostStatus get_sample_lost_status() const + { + eprosima::fastdds::dds::SampleLostStatus status; + datareader_->get_sample_lost_status(status); + return status; + } + bool is_matched() const { return matched_ > 0; @@ -1762,8 +1822,6 @@ class PubSubReader bool& returnedValue) { returnedValue = false; - type data; - eprosima::fastdds::dds::SampleInfo info; if (!take_ && !read_) { @@ -1773,9 +1831,14 @@ class PubSubReader return; } + // DynamicData ctor is protected so we create data using the TypeSupport. This way we can use both static + // and dynamic types + type* data = static_cast(type_.create_data()); + eprosima::fastdds::dds::SampleInfo info; + ReturnCode_t success = take_ ? - datareader->take_next_sample((void*)&data, &info) : - datareader->read_next_sample((void*)&data, &info); + datareader->take_next_sample(data, &info) : + datareader->read_next_sample(data, &info); if (ReturnCode_t::RETCODE_OK == success) { returnedValue = true; @@ -1790,21 +1853,34 @@ class PubSubReader if (info.valid_data && info.instance_state == eprosima::fastdds::dds::ALIVE_INSTANCE_STATE) { - auto it = std::find(total_msgs_.begin(), total_msgs_.end(), data); - ASSERT_NE(it, total_msgs_.end()); - total_msgs_.erase(it); + if (!total_msgs_.empty()) + { + auto it = std::find_if(total_msgs_.begin(), total_msgs_.end(), + [&](const datalist_type& elem) + { + return TypeTraits::compare_data(*data, elem); + }); + ASSERT_NE(it, total_msgs_.end()); + total_msgs_.erase(it); + } ++current_processed_count_; - default_receive_print(data); + TypeTraits::print_received_data(*data); cv_.notify_one(); } + + postprocess_sample(*data, info); + } + + // Delete the free-storage allocated data sample + type_.delete_data(data); } void receive_samples( eprosima::fastdds::dds::DataReader* datareader, bool& returnedValue) { - eprosima::fastdds::dds::LoanableSequence datas; + auto datas = TypeTraits::build_loanable_sequence(); eprosima::fastdds::dds::SampleInfoSeq infos; returnedValue = true; @@ -1841,13 +1917,22 @@ class PubSubReader if (valid_sample) { - auto it = std::find(total_msgs_.begin(), total_msgs_.end(), data); - ASSERT_NE(it, total_msgs_.end()); - total_msgs_.erase(it); + if (!total_msgs_.empty()) + { + auto it = std::find_if(total_msgs_.begin(), total_msgs_.end(), + [&data](const datalist_type& elem) + { + return TypeTraits::compare_data(data, elem); + }); + ASSERT_NE(it, total_msgs_.end()); + total_msgs_.erase(it); + } ++current_processed_count_; - default_receive_print(data); + TypeTraits::print_received_data(data); cv_.notify_one(); } + + postprocess_sample(data, info); } } @@ -1866,6 +1951,14 @@ class PubSubReader receive_(datareader, std::ref(returnedValue)); } + virtual void postprocess_sample( + const type& data, + const eprosima::fastdds::dds::SampleInfo& info) + { + static_cast(data); + static_cast(info); + } + void participant_matched() { std::unique_lock lock(mutexDiscovery_); @@ -1919,6 +2012,7 @@ class PubSubReader eprosima::fastdds::dds::DomainParticipant* participant_; eprosima::fastdds::dds::DomainParticipantQos participant_qos_; eprosima::fastdds::dds::Topic* topic_; + eprosima::fastdds::dds::ContentFilteredTopic* cf_topic_; eprosima::fastdds::dds::Subscriber* subscriber_; eprosima::fastdds::dds::SubscriberQos subscriber_qos_; eprosima::fastdds::dds::DataReader* datareader_; @@ -1928,7 +2022,7 @@ class PubSubReader eprosima::fastrtps::rtps::GUID_t participant_guid_; eprosima::fastrtps::rtps::GUID_t datareader_guid_; bool initialized_; - std::list total_msgs_; + std::list total_msgs_; std::mutex mutex_; std::condition_variable cv_; std::mutex mutexDiscovery_; @@ -1996,6 +2090,11 @@ class PubSubReader //! Functor called when called SampleLostStatus listener. SampleLostStatusFunctor sample_lost_status_functor_; + + //! Expression for CFT + std::string filter_expression_; + //! Parameters for CFT expression + std::vector expression_parameters_; }; template diff --git a/test/blackbox/api/dds-pim/PubSubTypeTraits.hpp b/test/blackbox/api/dds-pim/PubSubTypeTraits.hpp new file mode 100644 index 00000000000..8c12b502edf --- /dev/null +++ b/test/blackbox/api/dds-pim/PubSubTypeTraits.hpp @@ -0,0 +1,102 @@ +// Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file PubSubTypeTraits.hpp + * + */ + +#include +#include + +#include "../../common/BlackboxTests.hpp" + +#ifndef _TEST_BLACKBOX_PUBSUBTYPETRAITS_HPP_ +#define _TEST_BLACKBOX_PUBSUBTYPETRAITS_HPP_ + +/** + * @brief This class provides TypeSupport specific traits for the PubSub wrapper classes + * + * Provides all the type specific functionality needed by the PubSub wrapper classes so that + * they can be agnostic to the actual TypeSupport class used, Fast DDS Gen generated or Dynamic. + * + * @tparam _TypeSupport TypeSupport class for which the traits are provided + */ +template +struct PubSubTypeTraits +{ + //! PubSub readers and writers have a list of datas of the type they are handling + using DataListType = typename _TypeSupport::type; + + /** + * @brief Create a new instance of the specific TypeSupport + * + * @param[out] typesupport TypeSupport reference to be populated with the new concrete instance + */ + static void build_type_support( + eprosima::fastdds::dds::TypeSupport& typesupport) + { + return typesupport.reset(new _TypeSupport()); + } + + /** + * @brief Build a LoanableSequence of the specific type + * + * @return The type specific LoanableSequence + */ + static eprosima::fastdds::dds::LoanableSequence build_loanable_sequence() + { + return eprosima::fastdds::dds::LoanableSequence(); + } + + /** + * @brief Compare two DataListType instances + * + * @param data1 First DataListType instance + * @param data2 Second DataListType instance + * + * @return True if the two instances are equal, false otherwise + */ + static bool compare_data( + const DataListType& data1, + const DataListType& data2) + { + return data1 == data2; + } + + /** + * @brief Print the received data + * + * @param data DataListType instance to be printed + */ + static void print_received_data( + const DataListType& data) + { + default_receive_print(data); + } + + /** + * @brief Print the sent data + * + * @param data DataListType instance to be printed + */ + static void print_sent_data( + const DataListType& data) + { + default_send_print(data); + } + +}; + +#endif // _TEST_BLACKBOX_PUBSUBTYPETRAITS_HPP_ diff --git a/test/blackbox/api/dds-pim/PubSubWriter.hpp b/test/blackbox/api/dds-pim/PubSubWriter.hpp index 0a0d29395bb..ed104c33ac4 100644 --- a/test/blackbox/api/dds-pim/PubSubWriter.hpp +++ b/test/blackbox/api/dds-pim/PubSubWriter.hpp @@ -32,30 +32,31 @@ #include #endif // _MSC_VER -#include +#include #include +#include #include -#include +#include #include +#include #include #include -#include -#include #include #include +#include #include -#include +#include +#include +#include +#include #include #include #include -#include -#include #include #include #include -#include -#include +#include "PubSubTypeTraits.hpp" using DomainParticipantFactory = eprosima::fastdds::dds::DomainParticipantFactory; using eprosima::fastrtps::rtps::IPLocator; @@ -63,7 +64,7 @@ using eprosima::fastdds::rtps::UDPTransportDescriptor; using eprosima::fastdds::rtps::UDPv4TransportDescriptor; using eprosima::fastdds::rtps::UDPv6TransportDescriptor; -template +template> class PubSubWriter { class ParticipantListener : public eprosima::fastdds::dds::DomainParticipantListener @@ -254,6 +255,7 @@ class PubSubWriter typedef TypeSupport type_support; typedef typename type_support::type type; + typedef typename TypeTraits::DataListType datalist_type; PubSubWriter( const std::string& topic_name) @@ -359,7 +361,7 @@ class PubSubWriter { participant_guid_ = participant_->guid(); - type_.reset(new type_support()); + TypeTraits::build_type_support(type_); // Register type ASSERT_EQ(participant_->register_type(type_), ReturnCode_t::RETCODE_OK); @@ -467,7 +469,7 @@ class PubSubWriter } void send( - std::list& msgs, + std::list& msgs, uint32_t milliseconds = 0) { auto it = msgs.begin(); @@ -476,7 +478,7 @@ class PubSubWriter { if (datawriter_->write((void*)&(*it))) { - default_send_print(*it); + TypeTraits::print_sent_data(*it); it = msgs.erase(it); if (milliseconds > 0) { @@ -513,10 +515,18 @@ class PubSubWriter bool send_sample( type& msg) { - default_send_print(msg); + TypeTraits::print_sent_data(msg); return datawriter_->write((void*)&msg); } + ReturnCode_t send_sample( + type& msg, + const eprosima::fastdds::dds::InstanceHandle_t& instance_handle) + { + TypeTraits::print_sent_data(msg); + return datawriter_->write((void*)&msg, instance_handle); + } + void assert_liveliness() { datawriter_->assert_liveliness(); @@ -1601,6 +1611,11 @@ class PubSubWriter return *this; } + eprosima::fastdds::dds::TypeSupport get_type_support() + { + return type_; + } + protected: void participant_matched() diff --git a/test/blackbox/api/dds-pim/PubSubWriterReader.hpp b/test/blackbox/api/dds-pim/PubSubWriterReader.hpp index 3fb648b1dba..ed69ff394d0 100644 --- a/test/blackbox/api/dds-pim/PubSubWriterReader.hpp +++ b/test/blackbox/api/dds-pim/PubSubWriterReader.hpp @@ -20,34 +20,37 @@ #ifndef _TEST_BLACKBOX_PUBSUBWRITERREADER_HPP_ #define _TEST_BLACKBOX_PUBSUBWRITERREADER_HPP_ -#include +#include +#include +#include +#include +#include +#include +#include + +#include + #include +#include #include #include -#include -#include #include #include +#include #include -#include #include #include #include #include +#include +#include #include -#include -#include -#include -#include -#include -#include -#include -#include +#include "PubSubTypeTraits.hpp" using DomainParticipantFactory = eprosima::fastdds::dds::DomainParticipantFactory; -template +template> class PubSubWriterReader { class ParticipantListener : public eprosima::fastdds::dds::DomainParticipantListener @@ -303,6 +306,7 @@ class PubSubWriterReader typedef TypeSupport type_support; typedef typename type_support::type type; + typedef typename TypeTraits::DataListType datalist_type; PubSubWriterReader( const std::string& topic_name) @@ -388,7 +392,7 @@ class PubSubWriterReader ASSERT_NE(participant_, nullptr); ASSERT_TRUE(participant_->is_enabled()); - type_.reset(new type_support()); + TypeTraits::build_type_support(type_); // Register type ASSERT_EQ(participant_->register_type(type_), ReturnCode_t::RETCODE_OK); @@ -535,7 +539,7 @@ class PubSubWriterReader } void send( - std::list& msgs) + std::list& msgs) { auto it = msgs.begin(); @@ -548,7 +552,7 @@ class PubSubWriterReader std::get<1>(tuple)->write((void*)&(*it)); } - default_send_print(*it); + TypeTraits::print_sent_data(*it); it = msgs.erase(it); } @@ -559,14 +563,14 @@ class PubSubWriterReader } } - std::list data_not_received() + std::list data_not_received() { std::unique_lock lock(mutex_); return total_msgs_; } void startReception( - std::list& msgs) + std::list& msgs) { mutex_.lock(); total_msgs_ = msgs; @@ -864,10 +868,10 @@ class PubSubWriterReader bool& returnedValue) { returnedValue = false; - type data; + type* data = static_cast(type_.create_data()); eprosima::fastdds::dds::SampleInfo info; - if ((ReturnCode_t::RETCODE_OK == datareader->take_next_sample((void*)&data, &info))) + if ((ReturnCode_t::RETCODE_OK == datareader->take_next_sample(data, &info))) { returnedValue = true; @@ -881,7 +885,11 @@ class PubSubWriterReader if (info.instance_state == eprosima::fastdds::dds::ALIVE_INSTANCE_STATE) { - auto it = std::find(total_msgs_.begin(), total_msgs_.end(), data); + auto it = std::find_if(total_msgs_.begin(), total_msgs_.end(), + [&](const datalist_type& elem) + { + return TypeTraits::compare_data(elem, *data); + }); ASSERT_NE(it, total_msgs_.end()); total_msgs_.erase(it); } @@ -889,10 +897,13 @@ class PubSubWriterReader if (info.instance_state == eprosima::fastdds::dds::ALIVE_INSTANCE_STATE) { ++current_received_count_; - default_receive_print(data); + TypeTraits::print_received_data(*data); cv_.notify_one(); } } + + // Delete the free-storage allocated data sample + type_.delete_data(data); } void publication_matched( @@ -967,7 +978,7 @@ class PubSubWriterReader std::string topic_name_; bool initialized_; - std::list total_msgs_; + std::list total_msgs_; std::mutex mutex_; std::condition_variable cv_; std::mutex mutexDiscovery_; diff --git a/test/blackbox/common/DDSBlackboxTestsContentFilter.cpp b/test/blackbox/common/DDSBlackboxTestsContentFilter.cpp index cbf730495a8..230ba89fe17 100644 --- a/test/blackbox/common/DDSBlackboxTestsContentFilter.cpp +++ b/test/blackbox/common/DDSBlackboxTestsContentFilter.cpp @@ -13,17 +13,33 @@ // limitations under the License. #include +#include +#include +#include #include +#include #include +#include +#include +#include #include #include +#include +#include #include +#include +#include +#include #include #include +#include +#include +#include #include +#include "../types/dynamic_types_traits.hpp" #include "../types/HelloWorldTypeObject.h" #include "../types/TestRegression3361PubSubTypes.h" #include "../types/TestRegression3361TypeObject.h" @@ -613,6 +629,130 @@ TEST(DDSContentFilter, CorrectlyHandleAliasOtherHeader) dpf->delete_participant(participant); } +/** + * @brief PubSubReader specialization for using a DynamicPubSubType with content filtering, + * overriding postprocess_sample to count valid and invalid samples. + * + * @tparam TypeTraits TypeTraits to be used. + */ +template +class CFTDynamicPubSubReader : public PubSubReader +{ +public: + + /** + * @brief Construct a new CFTDynamicPubSubReader object + * + * @param topic_name Name of the topic to be used + * @param filter_expression Content filter expression + * @param expression_parameters Content filter expression parameters + */ + CFTDynamicPubSubReader( + const std::string& topic_name, + const std::string& filter_expression, + const std::vector& expression_parameters) + : PubSubReader( + topic_name, + filter_expression, + expression_parameters) + { + } + + /** + * @brief Count of valid samples received + */ + std::atomic valid_samples{0}; + + /** + * @brief Count of invalid samples received + */ + std::atomic invalid_samples{0}; + +private: + + /** + * @brief Postprocess the received samples, counting valid and invalid ones + * + * @param sample Sample to be postprocessed + * @param info SampleInfo of the sample + */ + void postprocess_sample( + const fastrtps::types::DynamicPubSubType::type& /* sample */, + const SampleInfo& info) override final + { + if (info.valid_data) + { + ++valid_samples; + } + else + { + ++invalid_samples; + } + } + +}; + +/* + * Regression test for https://eprosima.easyredmine.com/issues/20815 + * Check that the content filter is only applied to alive changes. + * The test creates a reliable writer and a reader with a content filter that only accepts messages with a specific + * string. After discovery, the writer sends 10 samples which pass the filter in 10 different instances, with the + * particularity that after each write, the instance is unregistered. + * The DATA(u) generated would not pass the filter if it was applied. To check that the filter is only applied to + * ALIVE changes (not unregister or disposed), the test checks that the reader receives 10 valid samples (one per + * sample sent) and 10 invalid samples (one per unregister). Furthermore, it also checks that no samples are lost. + */ +TEST(DDSContentFilter, OnlyFilterAliveChanges) +{ + + /* Create reader with CFT */ + std::string expression = "index = 1"; + CFTDynamicPubSubReader reader("TestTopic", expression, {}); + reader.reliability(RELIABLE_RELIABILITY_QOS).history_depth(2).init(); + ASSERT_TRUE(reader.isInitialized()); + + /* Create writer */ + PubSubWriter writer("TestTopic"); + writer.reliability(RELIABLE_RELIABILITY_QOS).history_depth(2).init(); + ASSERT_TRUE(writer.isInitialized()); + + /* Wait for discovery */ + writer.wait_discovery(); + reader.wait_discovery(); + + /* Send 10 samples, each on a different instance, unregistering instances after writing */ + const size_t num_samples = 10; + reader.startReception(num_samples); + + for (size_t i = 0; i < num_samples; ++i) + { + DynamicData* data = static_cast(writer.get_type_support().create_data()); + + data->set_uint16_value( + static_cast(i), + static_cast(DynamicKeyedHelloworldTypeTraits::KeyedHelloWorldMembers::KEY)); + + data->set_uint16_value( + 1u, + static_cast(DynamicKeyedHelloworldTypeTraits::KeyedHelloWorldMembers::INDEX)); + + InstanceHandle_t handle = writer.register_instance(*data); + ASSERT_NE(HANDLE_NIL, handle); + ASSERT_EQ(ReturnCode_t::RETCODE_OK, writer.send_sample(*data, handle)); + ASSERT_EQ(true, writer.unregister_instance(*data, handle)); + + writer.get_type_support().delete_data(data); + } + + // Wait until all samples are acknowledged + writer.waitForAllAcked(std::chrono::seconds(3)); + + /* Check that both samples and unregisters are received */ + ASSERT_EQ(reader.valid_samples.load(), 10u); + ASSERT_EQ(reader.invalid_samples.load(), 10u); + ASSERT_EQ(reader.get_sample_lost_status().total_count, 0); +} + #ifdef INSTANTIATE_TEST_SUITE_P #define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w) #else diff --git a/test/blackbox/types/dynamic_types_traits.hpp b/test/blackbox/types/dynamic_types_traits.hpp new file mode 100644 index 00000000000..257015129cb --- /dev/null +++ b/test/blackbox/types/dynamic_types_traits.hpp @@ -0,0 +1,238 @@ +// Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file dynamic_types_traits.hpp + * + */ + +#ifndef _FASTDDS_TEST_DYNAMIC_TYPES_TRAITS_HPP_ +#define _FASTDDS_TEST_DYNAMIC_TYPES_TRAITS_HPP_ + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace eprosima { +namespace fastdds { +namespace dds { + +/** + * @brief PubSubTypeTraits for the PubSubReader and PubSubWriter + * + * This struct enables instantiation of a PubSubReader and PubSubWriter with a DynamicTypeSupport + * for a type equivalent to that of KeyedHelloWorld.idl. + * That and the fact that this builder sets the auto_fill_type_object flag to true allow for instance to create a + * PubSubReader with a content filter, which right now is not possible to do with the Fast DDS generated + * TypeSupport, as it does not contain the TypeObject code. + * + * To use it, do one of: + * - PubSubReader reader; + * - PubSubWriter writer; + */ +struct DynamicKeyedHelloworldTypeTraits +{ + using DataListType = fastrtps::types::DynamicData*; + + /** + * Define the members ids of the struct for readability + */ + enum class KeyedHelloWorldMembers : fastrtps::types::MemberId + { + KEY = 0, + INDEX = 1, + MESSAGE = 2 + }; + + /** + * @brief Build the DynamicTypeSupport for the KeyedHelloWorld type + * + * @param[out] type_support The type support to be reset + */ + static void build_type_support( + TypeSupport& type_support) + { + // Create type support + type_support.reset(new fastrtps::types::DynamicPubSubType(get_type())); + + // Set the autofill type object to true so that TypeObject is registered upon type registration + type_support->auto_fill_type_object(true); + } + + /** + * @brief Build a DynamicLoanableSequence of the specific type + * + * @return The DynamicLoanableSequence + */ + static DynamicLoanableSequence build_loanable_sequence() + { + return DynamicLoanableSequence(get_type()); + } + + /** + * @brief Compare two data instances + * + * PubSubReader and PubSubWriter compare data within std::find_if calls to remove elements from + * their message lists. For Fast DDS Gen generated types, the TypeSupport::type and the type used + * for data list elements is the same. However, for the case of DynamicData, the DataListType needs + * to be a DynamicData*, as the DynamicData ctor and dtor are protected and so they cannot be called to remove elements from the message lists. + * This leads to the need to compare datas of different types when finding elements, as the sent or received data would be a DynamicDaa, whereas the list elements are DynamicData* (a.k.a DataListType). + * + * @param data1 The compared element (DynamicData) + * @param data2 The list element (DataListType) + * + * @return True if equal, false otherwise + */ + static bool compare_data( + const fastrtps::types::DynamicData& data1, + const DataListType& data2) + { + return data1.equals(data2); + } + + /** + * @brief Print the received data + * + * @param data DynamicData to be printed + */ + static void print_received_data( + const fastrtps::types::DynamicData* data) + { + print_received_data(*data); + } + + /** + * @brief Print the received data + * + * @param data DynamicData to be printed + */ + static void print_received_data( + const fastrtps::types::DynamicData& data) + { + print_data_msg("Received", data); + } + + /** + * @brief Print the sent data + * + * @param data DynamicData to be printed + */ + static void print_sent_data( + const fastrtps::types::DynamicData* data) + { + print_sent_data(*data); + } + + /** + * @brief Print the sent data + * + * @param data DynamicData to be printed + */ + static void print_sent_data( + const fastrtps::types::DynamicData& data) + { + print_data_msg("Sent", data); + } + +private: + + /** + * @brief Get the specific DynamicType equivalent to the KeyedHelloWorld.idl type + * + * @return The DynamicType_ptr + */ + static fastrtps::types::DynamicType_ptr get_type() + { + // Using statics here in combination with std::call_once to avoid rebuilding the type every time. + // As the DynamicTypes API is shared_ptr based, we don't need to delete these references manually. + static std::once_flag once_flag; + static fastrtps::types::DynamicTypeBuilder_ptr struct_builder; + static fastrtps::types::DynamicTypeBuilder_ptr key_member_builder; + static fastrtps::types::DynamicType_ptr key_member; + static fastrtps::types::DynamicType_ptr struct_type; + + std::call_once(once_flag, [&]() + { + // Create the struct type builder + const std::string topic_type_name = "keyed_hello_world"; + struct_builder = fastrtps::types::DynamicTypeBuilderFactory::get_instance()->create_struct_builder(); + struct_builder->set_name(topic_type_name); + + // Create the key member with the @key annotation + key_member_builder = fastrtps::types::DynamicTypeBuilderFactory::get_instance()->create_uint16_builder(); + key_member_builder->apply_annotation(fastrtps::types::ANNOTATION_KEY_ID, "value", "true"); + key_member = key_member_builder->build(); + + // Add members to the struct builder + struct_builder->add_member( + static_cast(KeyedHelloWorldMembers::KEY), + "key", + key_member); + + struct_builder->add_member( + static_cast(KeyedHelloWorldMembers::INDEX), + "index", + fastrtps::types::DynamicTypeBuilderFactory::get_instance()->create_uint16_type()); + + struct_builder->add_member( + static_cast(KeyedHelloWorldMembers::MESSAGE), + "message", + fastrtps::types::DynamicTypeBuilderFactory::get_instance()->create_string_type(128)); + + // Build the type + struct_type = struct_builder->build(); + }); + + return struct_type; + } + + /** + * @brief Print a data message + * + * @param intro Introduction message + * @param data DynamicData to be printed. + * + * @pre data must have been built using the type returned by get_type(), that is through the DynamicTypeSupport + * created by build_type_support(). + */ + static void print_data_msg( + const std::string& intro, + const fastrtps::types::DynamicData& data) + { + auto key_value = + data.get_uint16_value(static_cast(KeyedHelloWorldMembers::KEY)); + auto index_value = + data.get_uint16_value(static_cast(KeyedHelloWorldMembers::INDEX)); + auto message_value = + data.get_string_value(static_cast(KeyedHelloWorldMembers::MESSAGE)); + + std::cout << intro << " data: (" << key_value << ", " << index_value << ", \"" << message_value << "\")" << + std::endl; + } + +}; + +} // namespace dds +} // namespace fastdds +} // namespace eprosima + +#endif // _FASTDDS_TEST_DYNAMIC_TYPES_TRAITS_HPP_ diff --git a/test/unittest/dds/publisher/CMakeLists.txt b/test/unittest/dds/publisher/CMakeLists.txt index 2dbcb8244a9..523cf6f50ff 100644 --- a/test/unittest/dds/publisher/CMakeLists.txt +++ b/test/unittest/dds/publisher/CMakeLists.txt @@ -149,6 +149,7 @@ set(DATAWRITERTESTS_SOURCE DataWriterTests.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/persistence/PersistenceFactory.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/persistence/sqlite3.c ${PROJECT_SOURCE_DIR}/src/cpp/rtps/persistence/SQLite3PersistenceService.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/reader/reader_utils.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/reader/RTPSReader.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/reader/StatefulPersistentReader.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/reader/StatefulReader.cpp diff --git a/test/unittest/statistics/dds/CMakeLists.txt b/test/unittest/statistics/dds/CMakeLists.txt index baa471606fe..c22a5912d0b 100644 --- a/test/unittest/statistics/dds/CMakeLists.txt +++ b/test/unittest/statistics/dds/CMakeLists.txt @@ -198,6 +198,7 @@ if (SQLITE3_SUPPORT AND FASTDDS_STATISTICS) ${PROJECT_SOURCE_DIR}/src/cpp/rtps/persistence/PersistenceFactory.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/persistence/sqlite3.c ${PROJECT_SOURCE_DIR}/src/cpp/rtps/persistence/SQLite3PersistenceService.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/rtps/reader/reader_utils.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/reader/RTPSReader.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/reader/StatefulPersistentReader.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/reader/StatefulReader.cpp