Skip to content

Commit

Permalink
Add implementation of DomainParticipant::find_topic (#2716)
Browse files Browse the repository at this point in the history
* Refs #14615. Added basic TopicProxy class.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #14615. Topic holds TopicProxy, which is created by DomainParticipantImpl.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #14615. TopicProxy constructs and owns Topic.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #14615. TopicImpl is not TopicDescriptionImpl and does not hold user topic pointer.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #14615. TopicImpl constructor is public.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #14615. TopicImpl header cleanup.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #14615. Added TopicProxyFactory class with basic interface.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #14615. TopicProxyFactory empty implementation.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #14615. TopicProxyFactory holds TopicImpl by composition.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #14615. TopicProxyFactory holds list of created proxy objects.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #14615. Implementation of TopicProxyFactory::can_be_deleted.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #14615. Implementation of TopicProxyFactory::delete_topic.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #14615. Implementation of TopicProxyFactory::create_topic.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #14615. Adding enable_topic and get_topic to TopicProxyFactory.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #14615. DomainParticipantImpl uses TopicProxyFactory.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #14615. Move find_topic to DomainParticipantImpl.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #14615. Notify a condition when a topic is created.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #14615. Implementation of DomainParticipantImpl::find_topic.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #14615. Fixed segfault on test.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #14615. Fixed return value on delete_topic.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #14615. Avoid using input topic on delete_topic.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #14615. Always use input listener.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #14615. Internal set_listener methods return void.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #14615. TopicProxyFactory::for_each.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #14615. TopicImpl holds pointer to TopicProxyFactory.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #14615. DomainParticipantImpl::set_topic_listener.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #14615. TopicImpl::set_listener with mask.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #14615. Topic::set_listener propagates mask.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #14615. Correctly handling infinite timeout.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #14615. Uncrustify.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #14615. Fixed non-c++11 code.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #14615. Improve doxydoc.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs #14615. Assertion on dynamic_cast result.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>
  • Loading branch information
MiguelCompany authored Jun 21, 2022
1 parent 8ba2e66 commit 4e44ea9
Show file tree
Hide file tree
Showing 21 changed files with 533 additions and 93 deletions.
5 changes: 2 additions & 3 deletions include/dds/topic/Topic.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,17 @@ class TopicListener;

class Topic : public dds::core::TEntity<detail::Topic>
{
friend class TopicImpl;
friend class DomainParticipantImpl;

public:

OMG_DDS_REF_TYPE_PROTECTED_DC(
Topic,
Topic,
dds::core::TEntity,
detail::Topic)

OMG_DDS_IMPLICIT_REF_BASE(
Topic)
Topic)

/**
* Create a new Topic.
Expand Down
9 changes: 7 additions & 2 deletions include/fastdds/dds/domain/DomainParticipant.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -354,11 +354,16 @@ class DomainParticipant : public Entity

/**
* Gives access to an existing (or ready to exist) enabled Topic.
* Topics obtained by this method must be destroyed by delete_topic.
* It should be noted that the returned Topic is a local object that acts as a proxy to designate the global
* concept of topic.
* Topics obtained by means of find_topic, must also be deleted by means of delete_topic so that the local
* resources can be released.
* If a Topic is obtained multiple times by means of find_topic or create_topic, it must also be deleted that same
* number of times using delete_topic.
*
* @param topic_name Topic name
* @param timeout Maximum time to wait for the Topic
* @return Pointer to the existing Topic, nullptr in error case
* @return Pointer to the existing Topic, nullptr in case of error or timeout
*/
RTPS_DllAPI Topic* find_topic(
const std::string& topic_name,
Expand Down
8 changes: 4 additions & 4 deletions include/fastdds/dds/topic/Topic.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ namespace dds {
class DomainParticipant;
class TopicListener;
class DomainParticipantImpl;
class TopicImpl;
class TopicProxy;

/**
* Class Topic, represents the fact that both publications
Expand All @@ -52,7 +52,7 @@ class TopicImpl;
*/
class Topic : public DomainEntity, public TopicDescription
{
friend class TopicImpl;
friend class TopicProxy;
friend class DomainParticipantImpl;

/**
Expand All @@ -62,7 +62,7 @@ class Topic : public DomainEntity, public TopicDescription
Topic(
const std::string& topic_name,
const std::string& type_name,
TopicImpl* p,
TopicProxy* p,
const StatusMask& mask = StatusMask::all());

Topic(
Expand Down Expand Up @@ -143,7 +143,7 @@ class Topic : public DomainEntity, public TopicDescription

protected:

TopicImpl* impl_;
TopicProxy* impl_;

friend class ::dds::topic::Topic;

Expand Down
1 change: 1 addition & 0 deletions src/cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ set(${PROJECT_NAME}_source_files
fastdds/topic/ContentFilteredTopicImpl.cpp
fastdds/topic/Topic.cpp
fastdds/topic/TopicImpl.cpp
fastdds/topic/TopicProxyFactory.cpp
fastdds/topic/TypeSupport.cpp
fastdds/topic/qos/TopicQos.cpp
fastdds/publisher/qos/DataWriterQos.cpp
Expand Down
5 changes: 1 addition & 4 deletions src/cpp/fastdds/domain/DomainParticipant.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -238,10 +238,7 @@ Topic* DomainParticipant::find_topic(
const std::string& topic_name,
const fastrtps::Duration_t& timeout)
{
static_cast<void> (topic_name);
static_cast<void> (timeout);
logWarning(DOMAIN_PARTICIPANT, "find_topic method not implemented");
return nullptr;
return impl_->find_topic(topic_name, timeout);
}

TopicDescription* DomainParticipant::lookup_topicdescription(
Expand Down
111 changes: 83 additions & 28 deletions src/cpp/fastdds/domain/DomainParticipantImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@
#include <fastdds/subscriber/SubscriberImpl.hpp>
#include <fastdds/topic/ContentFilteredTopicImpl.hpp>
#include <fastdds/topic/TopicImpl.hpp>
#include <fastdds/topic/TopicProxy.hpp>
#include <fastdds/topic/TopicProxyFactory.hpp>
#include <rtps/RTPSDomainImpl.hpp>
#include <utils/SystemInfo.hpp>

Expand Down Expand Up @@ -327,7 +329,7 @@ ReturnCode_t DomainParticipantImpl::enable()

for (auto topic : topics_)
{
topic.second->user_topic_->enable();
topic.second->enable_topic();
}
}

Expand Down Expand Up @@ -467,6 +469,53 @@ ReturnCode_t DomainParticipantImpl::delete_subscriber(
return ReturnCode_t::RETCODE_ERROR;
}

Topic* DomainParticipantImpl::find_topic(
const std::string& topic_name,
const fastrtps::Duration_t& timeout)
{
auto find_fn = [this, &topic_name]()
{
return topics_.count(topic_name) > 0;
};

std::unique_lock<std::mutex> lock(mtx_topics_);
if (timeout.is_infinite())
{
cond_topics_.wait(lock, find_fn);
}
else
{
auto duration = std::chrono::seconds(timeout.seconds) + std::chrono::nanoseconds(timeout.nanosec);
if (!cond_topics_.wait_for(lock, duration, find_fn))
{
return nullptr;
}
}

Topic* ret_val = topics_[topic_name]->create_topic()->get_topic();

InstanceHandle_t topic_handle;
create_instance_handle(topic_handle);
ret_val->set_instance_handle(topic_handle);
topics_by_handle_[topic_handle] = ret_val;

return ret_val;
}

void DomainParticipantImpl::set_topic_listener(
const TopicProxyFactory* factory,
TopicImpl* impl,
TopicListener* listener,
const StatusMask& mask)
{
std::lock_guard<std::mutex> lock(mtx_topics_);
impl->set_listener(listener);
factory->for_each([mask](const std::unique_ptr<TopicProxy>& proxy)
{
proxy->get_topic()->status_mask_ = mask;
});
}

ReturnCode_t DomainParticipantImpl::delete_topic(
const Topic* topic)
{
Expand All @@ -475,30 +524,36 @@ ReturnCode_t DomainParticipantImpl::delete_topic(
return ReturnCode_t::RETCODE_BAD_PARAMETER;
}

if (participant_ != topic->get_participant())
{
return ReturnCode_t::RETCODE_PRECONDITION_NOT_MET;
}

std::lock_guard<std::mutex> lock(mtx_topics_);
auto it = topics_.find(topic->get_name());

if (it != topics_.end())
auto handle_it = std::find_if(topics_by_handle_.begin(), topics_by_handle_.end(),
[topic](const decltype(topics_by_handle_)::value_type& item)
{
return item.second == topic;
});
if (handle_it != topics_by_handle_.end())
{
assert(topic->get_instance_handle() == it->second->get_topic()->get_instance_handle()
&& "The topic instance handle does not match the topic implementation instance handle");
if (it->second->is_referenced())
auto it = topics_.find(topic->get_name());
assert(it != topics_.end() && "Topic found by handle but factory not found");

TopicProxy* proxy = dynamic_cast<TopicProxy*>(topic->get_impl());
assert(nullptr != proxy);
auto ret_code = it->second->delete_topic(proxy);
if (ReturnCode_t::RETCODE_OK == ret_code)
{
return ReturnCode_t::RETCODE_PRECONDITION_NOT_MET;
InstanceHandle_t handle = topic->get_instance_handle();
topics_by_handle_.erase(handle);

if (it->second->can_be_deleted())
{
auto factory = it->second;
topics_.erase(it);
delete factory;
}
}
it->second->set_listener(nullptr);
topics_by_handle_.erase(topic->get_instance_handle());
delete it->second;
topics_.erase(it);
return ReturnCode_t::RETCODE_OK;
return ret_code;
}

return ReturnCode_t::RETCODE_ERROR;
return ReturnCode_t::RETCODE_PRECONDITION_NOT_MET;
}

ContentFilteredTopic* DomainParticipantImpl::create_contentfilteredtopic(
Expand Down Expand Up @@ -537,7 +592,7 @@ ContentFilteredTopic* DomainParticipantImpl::create_contentfilteredtopic(
return nullptr;
}

TopicImpl* topic_impl = dynamic_cast<TopicImpl*>(related_topic->get_impl());
TopicProxy* topic_impl = dynamic_cast<TopicProxy*>(related_topic->get_impl());
assert(nullptr != topic_impl);
const TypeSupport& type = topic_impl->get_type();
LoanableSequence<const char*>::size_type n_params;
Expand Down Expand Up @@ -904,12 +959,11 @@ ReturnCode_t DomainParticipantImpl::delete_contained_entities()
std::lock_guard<std::mutex> lock_topics(mtx_topics_);

filtered_topics_.clear();
topics_by_handle_.clear();

auto it_topics = topics_.begin();
while (it_topics != topics_.end())
{
it_topics->second->set_listener(nullptr);
topics_by_handle_.erase(it_topics->second->get_topic()->get_instance_handle());
delete it_topics->second;
it_topics = topics_.erase(it_topics);
}
Expand Down Expand Up @@ -1300,15 +1354,14 @@ Topic* DomainParticipantImpl::create_topic(
InstanceHandle_t topic_handle;
create_instance_handle(topic_handle);

//TODO CONSTRUIR LA IMPLEMENTACION DENTRO DEL OBJETO DEL USUARIO.
TopicImpl* topic_impl = new TopicImpl(this, type_support, qos, listener);
Topic* topic = new Topic(topic_name, type_name, topic_impl, mask);
topic_impl->user_topic_ = topic;
TopicProxyFactory* factory = new TopicProxyFactory(this, topic_name, mask, type_support, qos, listener);
TopicProxy* proxy = factory->create_topic();
Topic* topic = proxy->get_topic();
topic->set_instance_handle(topic_handle);

//SAVE THE TOPIC INTO MAPS
topics_by_handle_[topic_handle] = topic;
topics_[topic_name] = topic_impl;
topics_[topic_name] = factory;

// Enable topic if appropriate
if (enabled && qos_.entity_factory().autoenable_created_entities)
Expand All @@ -1318,6 +1371,8 @@ Topic* DomainParticipantImpl::create_topic(
(void)ret_topic_enable;
}

cond_topics_.notify_all();

return topic;
}

Expand Down Expand Up @@ -1348,7 +1403,7 @@ TopicDescription* DomainParticipantImpl::lookup_topicdescription(
auto it = topics_.find(topic_name);
if (it != topics_.end())
{
return it->second->user_topic_;
return it->second->get_topic()->get_topic();
}

auto filtered_it = filtered_topics_.find(topic_name);
Expand Down
39 changes: 38 additions & 1 deletion src/cpp/fastdds/domain/DomainParticipantImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
#ifndef _FASTDDS_PARTICIPANTIMPL_HPP_
#define _FASTDDS_PARTICIPANTIMPL_HPP_
#ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC

#include <mutex>
#include <condition_variable>

#include <fastdds/rtps/common/Guid.h>
#include <fastdds/rtps/participant/RTPSParticipantListener.h>
#include <fastdds/rtps/reader/StatefulReader.h>
Expand All @@ -37,6 +41,7 @@
#include <fastrtps/types/TypesBase.h>

#include "fastdds/topic/DDSSQLFilter/DDSFilterFactory.hpp"
#include <fastdds/topic/TopicProxyFactory.hpp>

using eprosima::fastrtps::types::ReturnCode_t;

Expand Down Expand Up @@ -213,6 +218,37 @@ class DomainParticipantImpl
TopicListener* listener = nullptr,
const StatusMask& mask = StatusMask::all());

/**
* Gives access to an existing (or ready to exist) enabled Topic.
* It should be noted that the returned Topic is a local object that acts as a proxy to designate the global
* concept of topic.
* Topics obtained by means of find_topic, must also be deleted by means of delete_topic so that the local
* resources can be released.
* If a Topic is obtained multiple times by means of find_topic or create_topic, it must also be deleted that same
* number of times using delete_topic.
*
* @param topic_name Topic name
* @param timeout Maximum time to wait for the Topic
* @return Pointer to the existing Topic, nullptr in case of error or timeout
*/
Topic* find_topic(
const std::string& topic_name,
const fastrtps::Duration_t& timeout);

/**
* Implementation of Topic::set_listener that propagates the listener and mask to all the TopicProxy
* objects held by the same TopicProxy factory in a thread-safe way.
*
* @param factory TopicProxyFactory managing the topic on which the listener should be changed.
* @param listener Listener to assign to all the TopicProxy objects owned by the factory.
* @param mask StatusMask to assign to all the TopicProxy objects owned by the factory.
*/
void set_topic_listener(
const TopicProxyFactory* factory,
TopicImpl* topic,
TopicListener* listener,
const StatusMask& mask);

ReturnCode_t delete_topic(
const Topic* topic);

Expand Down Expand Up @@ -480,12 +516,13 @@ class DomainParticipantImpl
mutable std::mutex mtx_types_;

//!Topic map
std::map<std::string, TopicImpl*> topics_;
std::map<std::string, TopicProxyFactory*> topics_;
std::map<InstanceHandle_t, Topic*> topics_by_handle_;
std::map<std::string, std::unique_ptr<ContentFilteredTopic>> filtered_topics_;
std::map<std::string, IContentFilterFactory*> filter_factories_;
DDSSQLFilter::DDSFilterFactory dds_sql_filter_factory_;
mutable std::mutex mtx_topics_;
std::condition_variable cond_topics_;

TopicQos default_topic_qos_;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
#include <fastdds/domain/DomainParticipantImpl.hpp>
#include <fastdds/publisher/filtering/DataWriterFilteredChange.hpp>
#include <fastdds/publisher/filtering/ReaderFilterInformation.hpp>
#include <fastdds/topic/TopicImpl.hpp>
#include <fastdds/topic/TopicProxy.hpp>
#include <fastdds/topic/ContentFilterInfo.hpp>
#include <fastdds/topic/ContentFilterUtils.hpp>

Expand Down Expand Up @@ -202,7 +202,7 @@ class ReaderFilterCollection
DomainParticipantImpl* participant,
Topic* topic)
{
TopicImpl* writer_topic = static_cast<TopicImpl*>(topic->get_impl());
TopicProxy* writer_topic = static_cast<TopicProxy*>(topic->get_impl());

if (0 == filter_info.filter_class_name.size() ||
0 != writer_topic->get_rtps_topic_name().compare(filter_info.related_topic_name.c_str()))
Expand Down
3 changes: 2 additions & 1 deletion src/cpp/fastdds/topic/ContentFilteredTopicImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <fastdds/rtps/messages/CDRMessage.h>
#include <fastdds/subscriber/DataReaderImpl.hpp>
#include <fastdds/topic/ContentFilterUtils.hpp>
#include <fastdds/topic/TopicProxy.hpp>

namespace eprosima {
namespace fastdds {
Expand Down Expand Up @@ -55,7 +56,7 @@ ReturnCode_t ContentFilteredTopicImpl::set_expression_parameters(
const char* new_expression,
const std::vector<std::string>& new_expression_parameters)
{
TopicImpl* topic_impl = dynamic_cast<TopicImpl*>(related_topic->get_impl());
TopicProxy* topic_impl = dynamic_cast<TopicProxy*>(related_topic->get_impl());
assert(nullptr != topic_impl);
const TypeSupport& type = topic_impl->get_type();

Expand Down
1 change: 0 additions & 1 deletion src/cpp/fastdds/topic/ContentFilteredTopicImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@

#include <fastdds/subscriber/DataReaderImpl.hpp>
#include <fastdds/topic/TopicDescriptionImpl.hpp>
#include <fastdds/topic/TopicImpl.hpp>

namespace eprosima {
namespace fastdds {
Expand Down
Loading

0 comments on commit 4e44ea9

Please sign in to comment.