Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[14615] Add implementation of DomainParticipant::find_topic #2716

Merged
merged 33 commits into from
Jun 21, 2022
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
0998819
Refs #14615. Added basic TopicProxy class.
MiguelCompany May 27, 2022
322e302
Refs #14615. Topic holds TopicProxy, which is created by DomainPartic…
MiguelCompany May 27, 2022
c976bc0
Refs #14615. TopicProxy constructs and owns Topic.
MiguelCompany May 31, 2022
637807f
Refs #14615. TopicImpl is not TopicDescriptionImpl and does not hold …
MiguelCompany May 31, 2022
8ad653a
Refs #14615. TopicImpl constructor is public.
MiguelCompany May 31, 2022
9eb8c55
Refs #14615. TopicImpl header cleanup.
MiguelCompany May 31, 2022
b0149c7
Refs #14615. Added TopicProxyFactory class with basic interface.
MiguelCompany May 27, 2022
d6dde52
Refs #14615. TopicProxyFactory empty implementation.
MiguelCompany May 31, 2022
f264f66
Refs #14615. TopicProxyFactory holds TopicImpl by composition.
MiguelCompany May 31, 2022
8f4654a
Refs #14615. TopicProxyFactory holds list of created proxy objects.
MiguelCompany May 31, 2022
520220f
Refs #14615. Implementation of TopicProxyFactory::can_be_deleted.
MiguelCompany May 31, 2022
b414a93
Refs #14615. Implementation of TopicProxyFactory::delete_topic.
MiguelCompany May 31, 2022
652ce67
Refs #14615. Implementation of TopicProxyFactory::create_topic.
MiguelCompany May 31, 2022
3655203
Refs #14615. Adding enable_topic and get_topic to TopicProxyFactory.
MiguelCompany May 31, 2022
dc5cb1c
Refs #14615. DomainParticipantImpl uses TopicProxyFactory.
MiguelCompany May 31, 2022
5381372
Refs #14615. Move find_topic to DomainParticipantImpl.
MiguelCompany Jun 1, 2022
49ea325
Refs #14615. Notify a condition when a topic is created.
MiguelCompany Jun 1, 2022
c019872
Refs #14615. Implementation of DomainParticipantImpl::find_topic.
MiguelCompany Jun 1, 2022
3132215
Refs #14615. Fixed segfault on test.
MiguelCompany Jun 1, 2022
7932671
Refs #14615. Fixed return value on delete_topic.
MiguelCompany Jun 1, 2022
41c0783
Refs #14615. Avoid using input topic on delete_topic.
MiguelCompany Jun 1, 2022
47f16a1
Refs #14615. Always use input listener.
MiguelCompany Jun 1, 2022
b72309c
Refs #14615. Internal set_listener methods return void.
MiguelCompany Jun 2, 2022
157d836
Refs #14615. TopicProxyFactory::for_each.
MiguelCompany Jun 2, 2022
afc7f3c
Refs #14615. TopicImpl holds pointer to TopicProxyFactory.
MiguelCompany Jun 2, 2022
44a3425
Refs #14615. DomainParticipantImpl::set_topic_listener.
MiguelCompany Jun 2, 2022
5785f76
Refs #14615. TopicImpl::set_listener with mask.
MiguelCompany Jun 2, 2022
9262a42
Refs #14615. Topic::set_listener propagates mask.
MiguelCompany Jun 2, 2022
fe4dee4
Refs #14615. Correctly handling infinite timeout.
MiguelCompany Jun 2, 2022
0e9e5c8
Refs #14615. Uncrustify.
MiguelCompany Jun 2, 2022
eadf8af
Refs #14615. Fixed non-c++11 code.
MiguelCompany Jun 2, 2022
8af4751
Refs #14615. Improve doxydoc.
MiguelCompany Jun 21, 2022
9ab899a
Refs #14615. Assertion on dynamic_cast result.
MiguelCompany Jun 21, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions include/dds/topic/Topic.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,17 @@ class TopicListener;

class Topic : public dds::core::TEntity<detail::Topic>
{
friend class TopicImpl;
friend class DomainParticipantImpl;

public:

OMG_DDS_REF_TYPE_PROTECTED_DC(
Topic,
Topic,
dds::core::TEntity,
detail::Topic)

OMG_DDS_IMPLICIT_REF_BASE(
Topic)
Topic)

/**
* Create a new Topic.
Expand Down
8 changes: 4 additions & 4 deletions include/fastdds/dds/topic/Topic.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ namespace dds {
class DomainParticipant;
class TopicListener;
class DomainParticipantImpl;
class TopicImpl;
class TopicProxy;

/**
* Class Topic, represents the fact that both publications
Expand All @@ -52,7 +52,7 @@ class TopicImpl;
*/
class Topic : public DomainEntity, public TopicDescription
{
friend class TopicImpl;
friend class TopicProxy;
friend class DomainParticipantImpl;

/**
Expand All @@ -62,7 +62,7 @@ class Topic : public DomainEntity, public TopicDescription
Topic(
const std::string& topic_name,
const std::string& type_name,
TopicImpl* p,
TopicProxy* p,
const StatusMask& mask = StatusMask::all());

Topic(
Expand Down Expand Up @@ -143,7 +143,7 @@ class Topic : public DomainEntity, public TopicDescription

protected:

TopicImpl* impl_;
TopicProxy* impl_;

friend class ::dds::topic::Topic;

Expand Down
1 change: 1 addition & 0 deletions src/cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ set(${PROJECT_NAME}_source_files
fastdds/topic/ContentFilteredTopicImpl.cpp
fastdds/topic/Topic.cpp
fastdds/topic/TopicImpl.cpp
fastdds/topic/TopicProxyFactory.cpp
fastdds/topic/TypeSupport.cpp
fastdds/topic/qos/TopicQos.cpp
fastdds/publisher/qos/DataWriterQos.cpp
Expand Down
5 changes: 1 addition & 4 deletions src/cpp/fastdds/domain/DomainParticipant.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -238,10 +238,7 @@ Topic* DomainParticipant::find_topic(
const std::string& topic_name,
const fastrtps::Duration_t& timeout)
{
static_cast<void> (topic_name);
static_cast<void> (timeout);
logWarning(DOMAIN_PARTICIPANT, "find_topic method not implemented");
return nullptr;
return impl_->find_topic(topic_name, timeout);
}

TopicDescription* DomainParticipant::lookup_topicdescription(
Expand Down
110 changes: 82 additions & 28 deletions src/cpp/fastdds/domain/DomainParticipantImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@
#include <fastdds/subscriber/SubscriberImpl.hpp>
#include <fastdds/topic/ContentFilteredTopicImpl.hpp>
#include <fastdds/topic/TopicImpl.hpp>
#include <fastdds/topic/TopicProxy.hpp>
#include <fastdds/topic/TopicProxyFactory.hpp>
#include <rtps/RTPSDomainImpl.hpp>
#include <utils/SystemInfo.hpp>

Expand Down Expand Up @@ -327,7 +329,7 @@ ReturnCode_t DomainParticipantImpl::enable()

for (auto topic : topics_)
{
topic.second->user_topic_->enable();
topic.second->enable_topic();
}
}

Expand Down Expand Up @@ -467,6 +469,53 @@ ReturnCode_t DomainParticipantImpl::delete_subscriber(
return ReturnCode_t::RETCODE_ERROR;
}

Topic* DomainParticipantImpl::find_topic(
const std::string& topic_name,
const fastrtps::Duration_t& timeout)
{
auto find_fn = [this, &topic_name]()
{
return topics_.count(topic_name) > 0;
};

std::unique_lock<std::mutex> lock(mtx_topics_);
if (timeout.is_infinite())
{
cond_topics_.wait(lock, find_fn);
}
else
{
auto duration = std::chrono::seconds(timeout.seconds) + std::chrono::nanoseconds(timeout.nanosec);
if (!cond_topics_.wait_for(lock, duration, find_fn))
{
return nullptr;
}
}

Topic* ret_val = topics_[topic_name]->create_topic()->get_topic();

InstanceHandle_t topic_handle;
create_instance_handle(topic_handle);
ret_val->set_instance_handle(topic_handle);
topics_by_handle_[topic_handle] = ret_val;

return ret_val;
}

void DomainParticipantImpl::set_topic_listener(
const TopicProxyFactory* factory,
TopicImpl* impl,
TopicListener* listener,
const StatusMask& mask)
{
std::lock_guard<std::mutex> lock(mtx_topics_);
impl->set_listener(listener);
factory->for_each([mask](const std::unique_ptr<TopicProxy>& proxy)
{
proxy->get_topic()->status_mask_ = mask;
});
}

ReturnCode_t DomainParticipantImpl::delete_topic(
const Topic* topic)
{
Expand All @@ -475,30 +524,35 @@ ReturnCode_t DomainParticipantImpl::delete_topic(
return ReturnCode_t::RETCODE_BAD_PARAMETER;
}

if (participant_ != topic->get_participant())
{
return ReturnCode_t::RETCODE_PRECONDITION_NOT_MET;
}

std::lock_guard<std::mutex> lock(mtx_topics_);
auto it = topics_.find(topic->get_name());

if (it != topics_.end())
auto handle_it = std::find_if(topics_by_handle_.begin(), topics_by_handle_.end(),
[topic](const decltype(topics_by_handle_)::value_type& item)
{
return item.second == topic;
});
if (handle_it != topics_by_handle_.end())
{
assert(topic->get_instance_handle() == it->second->get_topic()->get_instance_handle()
&& "The topic instance handle does not match the topic implementation instance handle");
if (it->second->is_referenced())
auto it = topics_.find(topic->get_name());
assert(it != topics_.end() && "Topic found by handle but factory not found");

TopicProxy* proxy = dynamic_cast<TopicProxy*>(topic->get_impl());
EduPonz marked this conversation as resolved.
Show resolved Hide resolved
auto ret_code = it->second->delete_topic(proxy);
if (ReturnCode_t::RETCODE_OK == ret_code)
{
return ReturnCode_t::RETCODE_PRECONDITION_NOT_MET;
InstanceHandle_t handle = topic->get_instance_handle();
topics_by_handle_.erase(handle);

if (it->second->can_be_deleted())
{
auto factory = it->second;
topics_.erase(it);
delete factory;
}
}
it->second->set_listener(nullptr);
topics_by_handle_.erase(topic->get_instance_handle());
delete it->second;
topics_.erase(it);
return ReturnCode_t::RETCODE_OK;
return ret_code;
}

return ReturnCode_t::RETCODE_ERROR;
return ReturnCode_t::RETCODE_PRECONDITION_NOT_MET;
}

ContentFilteredTopic* DomainParticipantImpl::create_contentfilteredtopic(
Expand Down Expand Up @@ -537,7 +591,7 @@ ContentFilteredTopic* DomainParticipantImpl::create_contentfilteredtopic(
return nullptr;
}

TopicImpl* topic_impl = dynamic_cast<TopicImpl*>(related_topic->get_impl());
TopicProxy* topic_impl = dynamic_cast<TopicProxy*>(related_topic->get_impl());
assert(nullptr != topic_impl);
const TypeSupport& type = topic_impl->get_type();
LoanableSequence<const char*>::size_type n_params;
Expand Down Expand Up @@ -904,12 +958,11 @@ ReturnCode_t DomainParticipantImpl::delete_contained_entities()
std::lock_guard<std::mutex> lock_topics(mtx_topics_);

filtered_topics_.clear();
topics_by_handle_.clear();

auto it_topics = topics_.begin();
while (it_topics != topics_.end())
{
it_topics->second->set_listener(nullptr);
topics_by_handle_.erase(it_topics->second->get_topic()->get_instance_handle());
delete it_topics->second;
it_topics = topics_.erase(it_topics);
}
Expand Down Expand Up @@ -1300,15 +1353,14 @@ Topic* DomainParticipantImpl::create_topic(
InstanceHandle_t topic_handle;
create_instance_handle(topic_handle);

//TODO CONSTRUIR LA IMPLEMENTACION DENTRO DEL OBJETO DEL USUARIO.
TopicImpl* topic_impl = new TopicImpl(this, type_support, qos, listener);
Topic* topic = new Topic(topic_name, type_name, topic_impl, mask);
topic_impl->user_topic_ = topic;
TopicProxyFactory* factory = new TopicProxyFactory(this, topic_name, mask, type_support, qos, listener);
TopicProxy* proxy = factory->create_topic();
Topic* topic = proxy->get_topic();
topic->set_instance_handle(topic_handle);

//SAVE THE TOPIC INTO MAPS
topics_by_handle_[topic_handle] = topic;
topics_[topic_name] = topic_impl;
topics_[topic_name] = factory;

// Enable topic if appropriate
if (enabled && qos_.entity_factory().autoenable_created_entities)
Expand All @@ -1318,6 +1370,8 @@ Topic* DomainParticipantImpl::create_topic(
(void)ret_topic_enable;
}

cond_topics_.notify_all();

return topic;
}

Expand Down Expand Up @@ -1348,7 +1402,7 @@ TopicDescription* DomainParticipantImpl::lookup_topicdescription(
auto it = topics_.find(topic_name);
if (it != topics_.end())
{
return it->second->user_topic_;
return it->second->get_topic()->get_topic();
}

auto filtered_it = filtered_topics_.find(topic_name);
Expand Down
34 changes: 33 additions & 1 deletion src/cpp/fastdds/domain/DomainParticipantImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
#ifndef _FASTDDS_PARTICIPANTIMPL_HPP_
#define _FASTDDS_PARTICIPANTIMPL_HPP_
#ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC

#include <mutex>
#include <condition_variable>

#include <fastdds/rtps/common/Guid.h>
#include <fastdds/rtps/participant/RTPSParticipantListener.h>
#include <fastdds/rtps/reader/StatefulReader.h>
Expand All @@ -37,6 +41,7 @@
#include <fastrtps/types/TypesBase.h>

#include "fastdds/topic/DDSSQLFilter/DDSFilterFactory.hpp"
#include <fastdds/topic/TopicProxyFactory.hpp>

using eprosima::fastrtps::types::ReturnCode_t;

Expand Down Expand Up @@ -213,6 +218,32 @@ class DomainParticipantImpl
TopicListener* listener = nullptr,
const StatusMask& mask = StatusMask::all());

/**
* Gives access to an existing (or ready to exist) enabled Topic.
EduPonz marked this conversation as resolved.
Show resolved Hide resolved
* Topics obtained by this method must be destroyed by delete_topic.
*
* @param topic_name Topic name
* @param timeout Maximum time to wait for the Topic
* @return Pointer to the existing Topic, nullptr in error case
*/
Topic* find_topic(
const std::string& topic_name,
const fastrtps::Duration_t& timeout);

/**
* Implementation of Topic::set_listener that propagates the listener and mask to all the TopicProxy
* objects held by the same TopicProxy factory in a thread-safe way.
*
* @param factory TopicProxyFactory managing the topic on which the listener should be changed.
* @param listener Listener to assign to all the TopicProxy objects owned by the factory.
* @param mask StatusMask to assign to all the TopicProxy objects owned by the factory.
*/
void set_topic_listener(
const TopicProxyFactory* factory,
TopicImpl* topic,
TopicListener* listener,
const StatusMask& mask);

ReturnCode_t delete_topic(
const Topic* topic);

Expand Down Expand Up @@ -480,12 +511,13 @@ class DomainParticipantImpl
mutable std::mutex mtx_types_;

//!Topic map
std::map<std::string, TopicImpl*> topics_;
std::map<std::string, TopicProxyFactory*> topics_;
std::map<InstanceHandle_t, Topic*> topics_by_handle_;
std::map<std::string, std::unique_ptr<ContentFilteredTopic>> filtered_topics_;
std::map<std::string, IContentFilterFactory*> filter_factories_;
DDSSQLFilter::DDSFilterFactory dds_sql_filter_factory_;
mutable std::mutex mtx_topics_;
std::condition_variable cond_topics_;

TopicQos default_topic_qos_;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
#include <fastdds/domain/DomainParticipantImpl.hpp>
#include <fastdds/publisher/filtering/DataWriterFilteredChange.hpp>
#include <fastdds/publisher/filtering/ReaderFilterInformation.hpp>
#include <fastdds/topic/TopicImpl.hpp>
#include <fastdds/topic/TopicProxy.hpp>
#include <fastdds/topic/ContentFilterInfo.hpp>
#include <fastdds/topic/ContentFilterUtils.hpp>

Expand Down Expand Up @@ -202,7 +202,7 @@ class ReaderFilterCollection
DomainParticipantImpl* participant,
Topic* topic)
{
TopicImpl* writer_topic = static_cast<TopicImpl*>(topic->get_impl());
TopicProxy* writer_topic = static_cast<TopicProxy*>(topic->get_impl());

if (0 == filter_info.filter_class_name.size() ||
0 != writer_topic->get_rtps_topic_name().compare(filter_info.related_topic_name.c_str()))
Expand Down
3 changes: 2 additions & 1 deletion src/cpp/fastdds/topic/ContentFilteredTopicImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <fastdds/rtps/messages/CDRMessage.h>
#include <fastdds/subscriber/DataReaderImpl.hpp>
#include <fastdds/topic/ContentFilterUtils.hpp>
#include <fastdds/topic/TopicProxy.hpp>

namespace eprosima {
namespace fastdds {
Expand Down Expand Up @@ -55,7 +56,7 @@ ReturnCode_t ContentFilteredTopicImpl::set_expression_parameters(
const char* new_expression,
const std::vector<std::string>& new_expression_parameters)
{
TopicImpl* topic_impl = dynamic_cast<TopicImpl*>(related_topic->get_impl());
TopicProxy* topic_impl = dynamic_cast<TopicProxy*>(related_topic->get_impl());
assert(nullptr != topic_impl);
EduPonz marked this conversation as resolved.
Show resolved Hide resolved
const TypeSupport& type = topic_impl->get_type();

Expand Down
1 change: 0 additions & 1 deletion src/cpp/fastdds/topic/ContentFilteredTopicImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@

#include <fastdds/subscriber/DataReaderImpl.hpp>
#include <fastdds/topic/TopicDescriptionImpl.hpp>
#include <fastdds/topic/TopicImpl.hpp>

namespace eprosima {
namespace fastdds {
Expand Down
Loading