diff --git a/include/dds/topic/Topic.hpp b/include/dds/topic/Topic.hpp index de377ea4e15..068ef0643d9 100644 --- a/include/dds/topic/Topic.hpp +++ b/include/dds/topic/Topic.hpp @@ -33,18 +33,17 @@ class TopicListener; class Topic : public dds::core::TEntity { - friend class TopicImpl; friend class DomainParticipantImpl; public: OMG_DDS_REF_TYPE_PROTECTED_DC( - Topic, + Topic, dds::core::TEntity, detail::Topic) OMG_DDS_IMPLICIT_REF_BASE( - Topic) + Topic) /** * Create a new Topic. diff --git a/include/fastdds/dds/domain/DomainParticipant.hpp b/include/fastdds/dds/domain/DomainParticipant.hpp index d61e1577bfe..cf12231b05c 100644 --- a/include/fastdds/dds/domain/DomainParticipant.hpp +++ b/include/fastdds/dds/domain/DomainParticipant.hpp @@ -354,11 +354,16 @@ class DomainParticipant : public Entity /** * Gives access to an existing (or ready to exist) enabled Topic. - * Topics obtained by this method must be destroyed by delete_topic. + * It should be noted that the returned Topic is a local object that acts as a proxy to designate the global + * concept of topic. + * Topics obtained by means of find_topic, must also be deleted by means of delete_topic so that the local + * resources can be released. + * If a Topic is obtained multiple times by means of find_topic or create_topic, it must also be deleted that same + * number of times using delete_topic. * * @param topic_name Topic name * @param timeout Maximum time to wait for the Topic - * @return Pointer to the existing Topic, nullptr in error case + * @return Pointer to the existing Topic, nullptr in case of error or timeout */ RTPS_DllAPI Topic* find_topic( const std::string& topic_name, diff --git a/include/fastdds/dds/topic/Topic.hpp b/include/fastdds/dds/topic/Topic.hpp index da6acfc934f..3db760ae080 100644 --- a/include/fastdds/dds/topic/Topic.hpp +++ b/include/fastdds/dds/topic/Topic.hpp @@ -42,7 +42,7 @@ namespace dds { class DomainParticipant; class TopicListener; class DomainParticipantImpl; -class TopicImpl; +class TopicProxy; /** * Class Topic, represents the fact that both publications @@ -52,7 +52,7 @@ class TopicImpl; */ class Topic : public DomainEntity, public TopicDescription { - friend class TopicImpl; + friend class TopicProxy; friend class DomainParticipantImpl; /** @@ -62,7 +62,7 @@ class Topic : public DomainEntity, public TopicDescription Topic( const std::string& topic_name, const std::string& type_name, - TopicImpl* p, + TopicProxy* p, const StatusMask& mask = StatusMask::all()); Topic( @@ -143,7 +143,7 @@ class Topic : public DomainEntity, public TopicDescription protected: - TopicImpl* impl_; + TopicProxy* impl_; friend class ::dds::topic::Topic; diff --git a/src/cpp/CMakeLists.txt b/src/cpp/CMakeLists.txt index af6a6a836bd..15694b57db2 100644 --- a/src/cpp/CMakeLists.txt +++ b/src/cpp/CMakeLists.txt @@ -96,6 +96,7 @@ set(${PROJECT_NAME}_source_files fastdds/topic/ContentFilteredTopicImpl.cpp fastdds/topic/Topic.cpp fastdds/topic/TopicImpl.cpp + fastdds/topic/TopicProxyFactory.cpp fastdds/topic/TypeSupport.cpp fastdds/topic/qos/TopicQos.cpp fastdds/publisher/qos/DataWriterQos.cpp diff --git a/src/cpp/fastdds/domain/DomainParticipant.cpp b/src/cpp/fastdds/domain/DomainParticipant.cpp index c38ac8c6a13..12fdecf63b2 100644 --- a/src/cpp/fastdds/domain/DomainParticipant.cpp +++ b/src/cpp/fastdds/domain/DomainParticipant.cpp @@ -238,10 +238,7 @@ Topic* DomainParticipant::find_topic( const std::string& topic_name, const fastrtps::Duration_t& timeout) { - static_cast (topic_name); - static_cast (timeout); - logWarning(DOMAIN_PARTICIPANT, "find_topic method not implemented"); - return nullptr; + return impl_->find_topic(topic_name, timeout); } TopicDescription* DomainParticipant::lookup_topicdescription( diff --git a/src/cpp/fastdds/domain/DomainParticipantImpl.cpp b/src/cpp/fastdds/domain/DomainParticipantImpl.cpp index a5155098770..f5ea6a59ac4 100644 --- a/src/cpp/fastdds/domain/DomainParticipantImpl.cpp +++ b/src/cpp/fastdds/domain/DomainParticipantImpl.cpp @@ -58,6 +58,8 @@ #include #include #include +#include +#include #include #include @@ -327,7 +329,7 @@ ReturnCode_t DomainParticipantImpl::enable() for (auto topic : topics_) { - topic.second->user_topic_->enable(); + topic.second->enable_topic(); } } @@ -467,6 +469,53 @@ ReturnCode_t DomainParticipantImpl::delete_subscriber( return ReturnCode_t::RETCODE_ERROR; } +Topic* DomainParticipantImpl::find_topic( + const std::string& topic_name, + const fastrtps::Duration_t& timeout) +{ + auto find_fn = [this, &topic_name]() + { + return topics_.count(topic_name) > 0; + }; + + std::unique_lock lock(mtx_topics_); + if (timeout.is_infinite()) + { + cond_topics_.wait(lock, find_fn); + } + else + { + auto duration = std::chrono::seconds(timeout.seconds) + std::chrono::nanoseconds(timeout.nanosec); + if (!cond_topics_.wait_for(lock, duration, find_fn)) + { + return nullptr; + } + } + + Topic* ret_val = topics_[topic_name]->create_topic()->get_topic(); + + InstanceHandle_t topic_handle; + create_instance_handle(topic_handle); + ret_val->set_instance_handle(topic_handle); + topics_by_handle_[topic_handle] = ret_val; + + return ret_val; +} + +void DomainParticipantImpl::set_topic_listener( + const TopicProxyFactory* factory, + TopicImpl* impl, + TopicListener* listener, + const StatusMask& mask) +{ + std::lock_guard lock(mtx_topics_); + impl->set_listener(listener); + factory->for_each([mask](const std::unique_ptr& proxy) + { + proxy->get_topic()->status_mask_ = mask; + }); +} + ReturnCode_t DomainParticipantImpl::delete_topic( const Topic* topic) { @@ -475,30 +524,36 @@ ReturnCode_t DomainParticipantImpl::delete_topic( return ReturnCode_t::RETCODE_BAD_PARAMETER; } - if (participant_ != topic->get_participant()) - { - return ReturnCode_t::RETCODE_PRECONDITION_NOT_MET; - } - std::lock_guard lock(mtx_topics_); - auto it = topics_.find(topic->get_name()); - - if (it != topics_.end()) + auto handle_it = std::find_if(topics_by_handle_.begin(), topics_by_handle_.end(), + [topic](const decltype(topics_by_handle_)::value_type& item) + { + return item.second == topic; + }); + if (handle_it != topics_by_handle_.end()) { - assert(topic->get_instance_handle() == it->second->get_topic()->get_instance_handle() - && "The topic instance handle does not match the topic implementation instance handle"); - if (it->second->is_referenced()) + auto it = topics_.find(topic->get_name()); + assert(it != topics_.end() && "Topic found by handle but factory not found"); + + TopicProxy* proxy = dynamic_cast(topic->get_impl()); + assert(nullptr != proxy); + auto ret_code = it->second->delete_topic(proxy); + if (ReturnCode_t::RETCODE_OK == ret_code) { - return ReturnCode_t::RETCODE_PRECONDITION_NOT_MET; + InstanceHandle_t handle = topic->get_instance_handle(); + topics_by_handle_.erase(handle); + + if (it->second->can_be_deleted()) + { + auto factory = it->second; + topics_.erase(it); + delete factory; + } } - it->second->set_listener(nullptr); - topics_by_handle_.erase(topic->get_instance_handle()); - delete it->second; - topics_.erase(it); - return ReturnCode_t::RETCODE_OK; + return ret_code; } - return ReturnCode_t::RETCODE_ERROR; + return ReturnCode_t::RETCODE_PRECONDITION_NOT_MET; } ContentFilteredTopic* DomainParticipantImpl::create_contentfilteredtopic( @@ -537,7 +592,7 @@ ContentFilteredTopic* DomainParticipantImpl::create_contentfilteredtopic( return nullptr; } - TopicImpl* topic_impl = dynamic_cast(related_topic->get_impl()); + TopicProxy* topic_impl = dynamic_cast(related_topic->get_impl()); assert(nullptr != topic_impl); const TypeSupport& type = topic_impl->get_type(); LoanableSequence::size_type n_params; @@ -904,12 +959,11 @@ ReturnCode_t DomainParticipantImpl::delete_contained_entities() std::lock_guard lock_topics(mtx_topics_); filtered_topics_.clear(); + topics_by_handle_.clear(); auto it_topics = topics_.begin(); while (it_topics != topics_.end()) { - it_topics->second->set_listener(nullptr); - topics_by_handle_.erase(it_topics->second->get_topic()->get_instance_handle()); delete it_topics->second; it_topics = topics_.erase(it_topics); } @@ -1300,15 +1354,14 @@ Topic* DomainParticipantImpl::create_topic( InstanceHandle_t topic_handle; create_instance_handle(topic_handle); - //TODO CONSTRUIR LA IMPLEMENTACION DENTRO DEL OBJETO DEL USUARIO. - TopicImpl* topic_impl = new TopicImpl(this, type_support, qos, listener); - Topic* topic = new Topic(topic_name, type_name, topic_impl, mask); - topic_impl->user_topic_ = topic; + TopicProxyFactory* factory = new TopicProxyFactory(this, topic_name, mask, type_support, qos, listener); + TopicProxy* proxy = factory->create_topic(); + Topic* topic = proxy->get_topic(); topic->set_instance_handle(topic_handle); //SAVE THE TOPIC INTO MAPS topics_by_handle_[topic_handle] = topic; - topics_[topic_name] = topic_impl; + topics_[topic_name] = factory; // Enable topic if appropriate if (enabled && qos_.entity_factory().autoenable_created_entities) @@ -1318,6 +1371,8 @@ Topic* DomainParticipantImpl::create_topic( (void)ret_topic_enable; } + cond_topics_.notify_all(); + return topic; } @@ -1348,7 +1403,7 @@ TopicDescription* DomainParticipantImpl::lookup_topicdescription( auto it = topics_.find(topic_name); if (it != topics_.end()) { - return it->second->user_topic_; + return it->second->get_topic()->get_topic(); } auto filtered_it = filtered_topics_.find(topic_name); diff --git a/src/cpp/fastdds/domain/DomainParticipantImpl.hpp b/src/cpp/fastdds/domain/DomainParticipantImpl.hpp index 883bb81e437..2f1cd373977 100644 --- a/src/cpp/fastdds/domain/DomainParticipantImpl.hpp +++ b/src/cpp/fastdds/domain/DomainParticipantImpl.hpp @@ -20,6 +20,10 @@ #ifndef _FASTDDS_PARTICIPANTIMPL_HPP_ #define _FASTDDS_PARTICIPANTIMPL_HPP_ #ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC + +#include +#include + #include #include #include @@ -37,6 +41,7 @@ #include #include "fastdds/topic/DDSSQLFilter/DDSFilterFactory.hpp" +#include using eprosima::fastrtps::types::ReturnCode_t; @@ -213,6 +218,37 @@ class DomainParticipantImpl TopicListener* listener = nullptr, const StatusMask& mask = StatusMask::all()); + /** + * Gives access to an existing (or ready to exist) enabled Topic. + * It should be noted that the returned Topic is a local object that acts as a proxy to designate the global + * concept of topic. + * Topics obtained by means of find_topic, must also be deleted by means of delete_topic so that the local + * resources can be released. + * If a Topic is obtained multiple times by means of find_topic or create_topic, it must also be deleted that same + * number of times using delete_topic. + * + * @param topic_name Topic name + * @param timeout Maximum time to wait for the Topic + * @return Pointer to the existing Topic, nullptr in case of error or timeout + */ + Topic* find_topic( + const std::string& topic_name, + const fastrtps::Duration_t& timeout); + + /** + * Implementation of Topic::set_listener that propagates the listener and mask to all the TopicProxy + * objects held by the same TopicProxy factory in a thread-safe way. + * + * @param factory TopicProxyFactory managing the topic on which the listener should be changed. + * @param listener Listener to assign to all the TopicProxy objects owned by the factory. + * @param mask StatusMask to assign to all the TopicProxy objects owned by the factory. + */ + void set_topic_listener( + const TopicProxyFactory* factory, + TopicImpl* topic, + TopicListener* listener, + const StatusMask& mask); + ReturnCode_t delete_topic( const Topic* topic); @@ -480,12 +516,13 @@ class DomainParticipantImpl mutable std::mutex mtx_types_; //!Topic map - std::map topics_; + std::map topics_; std::map topics_by_handle_; std::map> filtered_topics_; std::map filter_factories_; DDSSQLFilter::DDSFilterFactory dds_sql_filter_factory_; mutable std::mutex mtx_topics_; + std::condition_variable cond_topics_; TopicQos default_topic_qos_; diff --git a/src/cpp/fastdds/publisher/filtering/ReaderFilterCollection.hpp b/src/cpp/fastdds/publisher/filtering/ReaderFilterCollection.hpp index 2cac63dd551..a1c6684f42b 100644 --- a/src/cpp/fastdds/publisher/filtering/ReaderFilterCollection.hpp +++ b/src/cpp/fastdds/publisher/filtering/ReaderFilterCollection.hpp @@ -36,7 +36,7 @@ #include #include #include -#include +#include #include #include @@ -202,7 +202,7 @@ class ReaderFilterCollection DomainParticipantImpl* participant, Topic* topic) { - TopicImpl* writer_topic = static_cast(topic->get_impl()); + TopicProxy* writer_topic = static_cast(topic->get_impl()); if (0 == filter_info.filter_class_name.size() || 0 != writer_topic->get_rtps_topic_name().compare(filter_info.related_topic_name.c_str())) diff --git a/src/cpp/fastdds/topic/ContentFilteredTopicImpl.cpp b/src/cpp/fastdds/topic/ContentFilteredTopicImpl.cpp index 63c882554d9..d032429d9e0 100644 --- a/src/cpp/fastdds/topic/ContentFilteredTopicImpl.cpp +++ b/src/cpp/fastdds/topic/ContentFilteredTopicImpl.cpp @@ -27,6 +27,7 @@ #include #include #include +#include namespace eprosima { namespace fastdds { @@ -55,7 +56,7 @@ ReturnCode_t ContentFilteredTopicImpl::set_expression_parameters( const char* new_expression, const std::vector& new_expression_parameters) { - TopicImpl* topic_impl = dynamic_cast(related_topic->get_impl()); + TopicProxy* topic_impl = dynamic_cast(related_topic->get_impl()); assert(nullptr != topic_impl); const TypeSupport& type = topic_impl->get_type(); diff --git a/src/cpp/fastdds/topic/ContentFilteredTopicImpl.hpp b/src/cpp/fastdds/topic/ContentFilteredTopicImpl.hpp index c7cdcda14b7..f76a469312a 100644 --- a/src/cpp/fastdds/topic/ContentFilteredTopicImpl.hpp +++ b/src/cpp/fastdds/topic/ContentFilteredTopicImpl.hpp @@ -32,7 +32,6 @@ #include #include -#include namespace eprosima { namespace fastdds { diff --git a/src/cpp/fastdds/topic/Topic.cpp b/src/cpp/fastdds/topic/Topic.cpp index 8b21ded3d04..ed8975170b5 100644 --- a/src/cpp/fastdds/topic/Topic.cpp +++ b/src/cpp/fastdds/topic/Topic.cpp @@ -19,7 +19,7 @@ #include #include -#include +#include #include @@ -30,7 +30,7 @@ namespace dds { Topic::Topic( const std::string& topic_name, const std::string& type_name, - TopicImpl* p, + TopicProxy* p, const StatusMask& mask) : DomainEntity(mask) , TopicDescription(topic_name, type_name) @@ -82,14 +82,8 @@ ReturnCode_t Topic::set_listener( TopicListener* listener, const StatusMask& mask) { - TopicListener* value = mask.is_active(mask.inconsistent_topic()) ? listener : nullptr; - ReturnCode_t ret_val = impl_->set_listener(value); - if (ret_val == ReturnCode_t::RETCODE_OK) - { - status_mask_ = mask; - } - - return ret_val; + impl_->set_listener(listener, mask); + return ReturnCode_t::RETCODE_OK; } DomainParticipant* Topic::get_participant() const diff --git a/src/cpp/fastdds/topic/TopicImpl.cpp b/src/cpp/fastdds/topic/TopicImpl.cpp index 7ccf492ae5d..2b8c8a66cf7 100644 --- a/src/cpp/fastdds/topic/TopicImpl.cpp +++ b/src/cpp/fastdds/topic/TopicImpl.cpp @@ -33,21 +33,21 @@ namespace fastdds { namespace dds { TopicImpl::TopicImpl( + TopicProxyFactory* factory, DomainParticipantImpl* p, TypeSupport type_support, const TopicQos& qos, TopicListener* listen) - : participant_(p) + : factory_(factory) + , participant_(p) , type_support_(type_support) , qos_(&qos == &TOPIC_QOS_DEFAULT ? participant_->get_default_topic_qos() : qos) , listener_(listen) - , user_topic_(nullptr) { } TopicImpl::~TopicImpl() { - delete user_topic_; } ReturnCode_t TopicImpl::check_qos( @@ -143,21 +143,22 @@ const TopicListener* TopicImpl::get_listener() const return listener_; } -ReturnCode_t TopicImpl::set_listener( +void TopicImpl::set_listener( TopicListener* listener) { listener_ = listener; - return ReturnCode_t::RETCODE_OK; } -DomainParticipant* TopicImpl::get_participant() const +void TopicImpl::set_listener( + TopicListener* listener, + const StatusMask& mask) { - return participant_->get_participant(); + participant_->set_topic_listener(factory_, this, listener, mask); } -const Topic* TopicImpl::get_topic() const +DomainParticipant* TopicImpl::get_participant() const { - return user_topic_; + return participant_->get_participant(); } const TypeSupport& TopicImpl::get_type() const @@ -166,10 +167,11 @@ const TypeSupport& TopicImpl::get_type() const } TopicListener* TopicImpl::get_listener_for( - const StatusMask& status) + const StatusMask& status, + const Topic* topic) { if (listener_ != nullptr && - user_topic_->get_status_mask().is_active(status)) + topic->get_status_mask().is_active(status)) { return listener_; } diff --git a/src/cpp/fastdds/topic/TopicImpl.hpp b/src/cpp/fastdds/topic/TopicImpl.hpp index 628da5f6afa..cd1de38fc79 100644 --- a/src/cpp/fastdds/topic/TopicImpl.hpp +++ b/src/cpp/fastdds/topic/TopicImpl.hpp @@ -21,14 +21,12 @@ #define _FASTDDS_TOPICIMPL_HPP_ #ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC -// #include -#include +#include +#include #include -#include +#include #include -#include - using eprosima::fastrtps::types::ReturnCode_t; namespace eprosima { @@ -39,19 +37,19 @@ class DomainParticipantImpl; class DomainParticipant; class TopicListener; class Topic; +class TopicProxyFactory; -class TopicImpl : public TopicDescriptionImpl +class TopicImpl { - friend class DomainParticipantImpl; +public: TopicImpl( + TopicProxyFactory* factory, DomainParticipantImpl* p, TypeSupport type_support, const TopicQos& qos, TopicListener* listen); -public: - static ReturnCode_t check_qos( const TopicQos& qos); @@ -73,34 +71,32 @@ class TopicImpl : public TopicDescriptionImpl const TopicListener* get_listener() const; - ReturnCode_t set_listener( + void set_listener( TopicListener* listener); - DomainParticipant* get_participant() const; + void set_listener( + TopicListener* listener, + const StatusMask& status); - const Topic* get_topic() const; + DomainParticipant* get_participant() const; const TypeSupport& get_type() const; - const std::string& get_rtps_topic_name() const override - { - return user_topic_->get_name(); - } - /** * Returns the most appropriate listener to handle the callback for the given status, * or nullptr if there is no appropriate listener. */ TopicListener* get_listener_for( - const StatusMask& status); + const StatusMask& status, + const Topic* topic); protected: + TopicProxyFactory* factory_; DomainParticipantImpl* participant_; TypeSupport type_support_; TopicQos qos_; TopicListener* listener_; - Topic* user_topic_; }; diff --git a/src/cpp/fastdds/topic/TopicProxy.hpp b/src/cpp/fastdds/topic/TopicProxy.hpp new file mode 100644 index 00000000000..1c455402fe9 --- /dev/null +++ b/src/cpp/fastdds/topic/TopicProxy.hpp @@ -0,0 +1,117 @@ +// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/* + * TopicProxy.hpp + */ + +#ifndef _FASTDDS_TOPICPROXY_HPP_ +#define _FASTDDS_TOPICPROXY_HPP_ + +#include +#include + +#include +#include +#include +#include +#include + +#include + +#include +#include + +using eprosima::fastrtps::types::ReturnCode_t; + +namespace eprosima { +namespace fastdds { +namespace dds { + +class DomainParticipant; + +class TopicProxy : public TopicDescriptionImpl +{ +public: + + TopicProxy( + const std::string& topic_name, + const std::string& type_name, + const StatusMask& mask, + TopicImpl* impl) noexcept + : impl_(impl) + , user_topic_(new Topic(topic_name, type_name, this, mask)) + { + } + + const TopicQos& get_qos() const + { + return impl_->get_qos(); + } + + ReturnCode_t set_qos( + const TopicQos& qos) + { + return impl_->set_qos(qos); + } + + const TopicListener* get_listener() const + { + return impl_->get_listener(); + } + + void set_listener( + TopicListener* listener, + const StatusMask& mask) + { + impl_->set_listener(listener, mask); + } + + DomainParticipant* get_participant() const + { + return impl_->get_participant(); + } + + const TypeSupport& get_type() const + { + return impl_->get_type(); + } + + TopicListener* get_listener_for( + const StatusMask& status) + { + return impl_->get_listener_for(status, user_topic_.get()); + } + + Topic* get_topic() const + { + return user_topic_.get(); + } + + const std::string& get_rtps_topic_name() const override + { + return user_topic_->get_name(); + } + +private: + + TopicImpl* impl_ = nullptr; + std::unique_ptr user_topic_; +}; + +} // namespace dds +} // namespace fastdds +} // namespace eprosima + +#endif /* _FASTDDS_TOPICPROXY_HPP_ */ diff --git a/src/cpp/fastdds/topic/TopicProxyFactory.cpp b/src/cpp/fastdds/topic/TopicProxyFactory.cpp new file mode 100644 index 00000000000..dd1fc2853bc --- /dev/null +++ b/src/cpp/fastdds/topic/TopicProxyFactory.cpp @@ -0,0 +1,72 @@ +// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/* + * TopicProxyFactory.cpp + */ + +#include + +#include + +#include + +namespace eprosima { +namespace fastdds { +namespace dds { + +TopicProxy* TopicProxyFactory::create_topic() +{ + TopicProxy* ret_val = new TopicProxy(topic_name_, topic_impl_.get_type()->getName(), status_mask_, &topic_impl_); + proxies_.emplace_back(ret_val); + return ret_val; +} + +ReturnCode_t TopicProxyFactory::delete_topic( + TopicProxy* proxy) +{ + auto it = std::find_if(proxies_.begin(), proxies_.end(), [proxy](const std::unique_ptr& item) + { + return item.get() == proxy; + }); + if (it != proxies_.end() && !proxy->is_referenced()) + { + proxies_.erase(it); + return ReturnCode_t::RETCODE_OK; + } + + return ReturnCode_t::RETCODE_PRECONDITION_NOT_MET; +} + +TopicProxy* TopicProxyFactory::get_topic() +{ + return proxies_.empty() ? nullptr : proxies_.front().get(); +} + +bool TopicProxyFactory::can_be_deleted() +{ + return proxies_.empty(); +} + +void TopicProxyFactory::enable_topic() +{ + for (auto& item : proxies_) + { + item->get_topic()->enable(); + } +} + +} // namespace dds +} // namespace fastdds +} // namespace eprosima diff --git a/src/cpp/fastdds/topic/TopicProxyFactory.hpp b/src/cpp/fastdds/topic/TopicProxyFactory.hpp new file mode 100644 index 00000000000..afd646be59d --- /dev/null +++ b/src/cpp/fastdds/topic/TopicProxyFactory.hpp @@ -0,0 +1,140 @@ +// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/* + * TopicProxyFactory.hpp + */ + +#ifndef _FASTDDS_TOPICPROXYFACTORY_HPP_ +#define _FASTDDS_TOPICPROXYFACTORY_HPP_ + +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include +#include + +using eprosima::fastrtps::types::ReturnCode_t; + +namespace eprosima { +namespace fastdds { +namespace dds { + +class DomainParticipantImpl; + +/** + * A factory of TopicProxy objects for a specific topic. + */ +class TopicProxyFactory +{ +public: + + /** + * Construct a TopicProxyFactory. + * + * @param participant Pointer to the DomainParticipantImpl creating this object. + * @param topic_name Name of the topic managed by this factory. + * @param status_mask Initial StatusMask of the topic managed by this factory. + * @param type_support TypeSupport to use for the topics created by this factory. + * @param qos TopicQos to use on the creation of the implementation object. + * @param listener TopicListener to use on the creation of the implementation object. + */ + TopicProxyFactory( + DomainParticipantImpl* participant, + const std::string& topic_name, + const StatusMask& status_mask, + TypeSupport type_support, + const TopicQos& qos, + TopicListener* listener) + : topic_name_(topic_name) + , status_mask_(status_mask) + , topic_impl_(this, participant, type_support, qos, listener) + { + } + + /** + * Create a new proxy object for the topic managed by the factory. + * + * @return Pointer to the created TopicProxy + */ + TopicProxy* create_topic(); + + /** + * Delete a proxy object for the topic managed by the factory. + * + * @param proxy Pointer to the TopicProxy object to be deleted. + * + * @return PRECONDITION_NOT_MET if the @c proxy was not created by this factory, or has already being deleted. + * @return PRECONDITION_NOT_MET if the @c proxy is still referenced. + * @return OK if the @c proxy is correctly deleted. + */ + ReturnCode_t delete_topic( + TopicProxy* proxy); + + /** + * Get one of the TopicProxy objects created by the factory. + * + * @return nullptr if the factory owns no proxy objects. + * @return Pointer to one of the proxies owned by the factory. + */ + TopicProxy* get_topic(); + + /** + * Return whether this factory can be deleted. + * Will disallow deletion if it still owns some proxy objects. + * + * @return true if the factory owns no proxy objects + */ + bool can_be_deleted(); + + /** + * Enable the topic managed by the factory. + */ + void enable_topic(); + + /** + * Apply the given function to all the TopicProxy objects owned by the factory. + */ + template + void for_each( + UnaryFunction f) const + { + std::for_each(proxies_.begin(), proxies_.end(), f); + } + +private: + + //! Name of the topic managed by the factory. + std::string topic_name_; + //! StatusMask of the topic managed by the factory. + StatusMask status_mask_; + //! Implementation object for the topic managed by the factory. + TopicImpl topic_impl_; + //! List of TopicProxy objects created by this factory. + std::list> proxies_; +}; + +} // namespace dds +} // namespace fastdds +} // namespace eprosima + +#endif /* _FASTDDS_TOPICPROXYFACTORY_HPP_ */ diff --git a/test/blackbox/common/DDSBlackboxTestsFindTopic.cpp b/test/blackbox/common/DDSBlackboxTestsFindTopic.cpp index aaa77def582..eb207896720 100644 --- a/test/blackbox/common/DDSBlackboxTestsFindTopic.cpp +++ b/test/blackbox/common/DDSBlackboxTestsFindTopic.cpp @@ -51,6 +51,13 @@ class DDSFindTopicTest : public testing::Test */ struct TestType : public TopicDataType { + TestType() + : TopicDataType() + { + m_isGetKeyDefined = false; + m_typeSize = 16; + } + bool serialize( void*, fastrtps::rtps::SerializedPayload_t*) override diff --git a/test/mock/dds/DomainParticipantImpl/fastdds/domain/DomainParticipantImpl.hpp b/test/mock/dds/DomainParticipantImpl/fastdds/domain/DomainParticipantImpl.hpp index f72eb85a96e..c3d2ca8b5b7 100644 --- a/test/mock/dds/DomainParticipantImpl/fastdds/domain/DomainParticipantImpl.hpp +++ b/test/mock/dds/DomainParticipantImpl/fastdds/domain/DomainParticipantImpl.hpp @@ -44,6 +44,7 @@ #include #include #include +#include using ReturnCode_t = eprosima::fastrtps::types::ReturnCode_t; @@ -263,10 +264,10 @@ class DomainParticipantImpl { return nullptr; } - TopicImpl* topic_impl = new TopicImpl(this, type_support, qos, listener); - Topic* topic = new Topic(topic_name, type_name, topic_impl, mask); - topic_impl->user_topic_ = topic; - topics_[topic_name] = topic_impl; + TopicImpl* topic_impl = new TopicImpl(nullptr, this, type_support, qos, listener); + TopicProxy* proxy = new TopicProxy(topic_name, type_name, mask, topic_impl); + Topic* topic = proxy->get_topic(); + topics_[topic_name] = proxy; topic->enable(); return topic; } @@ -281,6 +282,21 @@ class DomainParticipantImpl return create_topic(topic_name, type_name, TOPIC_QOS_DEFAULT, listener, mask); } + Topic* find_topic( + const std::string& /*topic_name*/, + const fastrtps::Duration_t& /*timeout*/) + { + return nullptr; + } + + void set_topic_listener( + const TopicProxyFactory* /*factory*/, + TopicImpl* /*impl*/, + TopicListener* /*listener*/, + const StatusMask& /*mask*/) + { + } + ReturnCode_t delete_topic( const Topic* topic) { @@ -340,7 +356,7 @@ class DomainParticipantImpl auto it = topics_.find(topic_name); if (it != topics_.end()) { - return it->second->user_topic_; + return it->second->get_topic(); } return nullptr; } @@ -643,7 +659,6 @@ class DomainParticipantImpl while (it_topics != topics_.end()) { - it_topics->second->set_listener(nullptr); delete it_topics->second; it_topics = topics_.erase(it_topics); } @@ -676,7 +691,7 @@ class DomainParticipantImpl std::map subscribers_; mutable std::mutex mtx_subs_; SubscriberQos default_sub_qos_; - std::map topics_; + std::map topics_; mutable std::mutex mtx_topics_; std::map types_; mutable std::mutex mtx_types_; diff --git a/test/unittest/dds/publisher/CMakeLists.txt b/test/unittest/dds/publisher/CMakeLists.txt index 599f3314622..b810ad82c84 100644 --- a/test/unittest/dds/publisher/CMakeLists.txt +++ b/test/unittest/dds/publisher/CMakeLists.txt @@ -90,6 +90,7 @@ set(DATAWRITERTESTS_SOURCE DataWriterTests.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/topic/ContentFilteredTopicImpl.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/topic/Topic.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/topic/TopicImpl.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/topic/TopicProxyFactory.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/topic/TypeSupport.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/topic/qos/TopicQos.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastrtps_deprecated/subscriber/SubscriberHistory.cpp diff --git a/test/unittest/dds/status/CMakeLists.txt b/test/unittest/dds/status/CMakeLists.txt index 994b0b40385..2cb872f7b78 100644 --- a/test/unittest/dds/status/CMakeLists.txt +++ b/test/unittest/dds/status/CMakeLists.txt @@ -50,6 +50,7 @@ set(LISTENERTESTS_SOURCE ListenerTests.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/topic/Topic.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/topic/qos/TopicQos.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/topic/TopicImpl.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/topic/TopicProxyFactory.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/topic/TypeSupport.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/core/policy/ParameterList.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/log/Log.cpp diff --git a/test/unittest/statistics/dds/CMakeLists.txt b/test/unittest/statistics/dds/CMakeLists.txt index 14c63f667b0..da51538e08c 100644 --- a/test/unittest/statistics/dds/CMakeLists.txt +++ b/test/unittest/statistics/dds/CMakeLists.txt @@ -140,6 +140,7 @@ if (SQLITE3_SUPPORT AND FASTDDS_STATISTICS) ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/topic/ContentFilteredTopicImpl.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/topic/Topic.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/topic/TopicImpl.cpp + ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/topic/TopicProxyFactory.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/topic/TypeSupport.cpp ${PROJECT_SOURCE_DIR}/src/cpp/fastdds/topic/qos/TopicQos.cpp ${PROJECT_SOURCE_DIR}/src/cpp/rtps/attributes/PropertyPolicy.cpp