Skip to content

Commit

Permalink
Implementation of unique locators request feature for readers (#1768)
Browse files Browse the repository at this point in the history
* Refs 10497. Update sub_unique_network_flows blackbox.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 10497. Added PubSubBasic test with unique flows feature.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 10497. Added method to parse property value.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 10497. createReceiverResources now returns bool.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 10497. Try to create specific resources when requested.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 10497. PubSubParticipant takes data when received.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 10497. Uncrustify.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>

* Refs 10497. Fix build error on non-windows platforms.

Signed-off-by: Miguel Company <MiguelCompany@eprosima.com>
  • Loading branch information
MiguelCompany committed Feb 26, 2021
1 parent 0dc3287 commit d2f3965
Show file tree
Hide file tree
Showing 6 changed files with 248 additions and 33 deletions.
106 changes: 85 additions & 21 deletions src/cpp/rtps/participant/RTPSParticipantImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,26 @@ static bool should_be_intraprocess_only(
(ParticipantFilteringFlags::FILTER_DIFFERENT_HOST | ParticipantFilteringFlags::FILTER_DIFFERENT_PROCESS);
}

static bool get_unique_flows_parameters(
const RTPSParticipantAttributes& part_att,
const EndpointAttributes& att,
bool& unique_flows,
uint16_t& initial_port,
uint16_t& final_port)
{
const std::string* value = PropertyPolicyHelper::find_property(att.properties, "fastdds.unique_network_flows");

unique_flows = (nullptr != value);
if (unique_flows)
{
// TODO (Miguel C): parse value to get port range
final_port = part_att.port.portBase;
initial_port = part_att.port.portBase - 400;
}

return true;
}

Locator_t& RTPSParticipantImpl::applyLocatorAdaptRule(
Locator_t& loc)
{
Expand Down Expand Up @@ -681,13 +701,6 @@ bool RTPSParticipantImpl::create_reader(
return false;
}

// Check for unique_network_flows feature
if (nullptr != PropertyPolicyHelper::find_property(param.endpoint.properties, "fastdds.unique_network_flows"))
{
logError(RTPS_PARTICIPANT, "Unique network flows not supported on readers");
return false;
}

// Special case for DiscoveryProtocol::BACKUP, which abuses persistence guid
GUID_t former_persistence_guid = param.endpoint.persistence_guid;
if (param.endpoint.persistence_guid == c_Guid_Unknown)
Expand All @@ -708,6 +721,15 @@ bool RTPSParticipantImpl::create_reader(
return false;
}

// Check for unique_network_flows feature
bool request_unique_flows = false;
uint16_t initial_port = 0;
uint16_t final_port = 0;
if (!get_unique_flows_parameters(m_att, param.endpoint, request_unique_flows, initial_port, final_port))
{
return false;
}

normalize_endpoint_locators(param.endpoint);

RTPSReader* SReader = nullptr;
Expand Down Expand Up @@ -756,7 +778,7 @@ bool RTPSParticipantImpl::create_reader(

if (enable)
{
if (!createAndAssociateReceiverswithEndpoint(SReader))
if (!createAndAssociateReceiverswithEndpoint(SReader, request_unique_flows, initial_port, final_port))
{
delete(SReader);
return false;
Expand Down Expand Up @@ -1097,27 +1119,64 @@ bool RTPSParticipantImpl::assignEndpointListenResources(
}

bool RTPSParticipantImpl::createAndAssociateReceiverswithEndpoint(
Endpoint* pend)
Endpoint* pend,
bool unique_flows,
uint16_t initial_unique_port,
uint16_t final_unique_port)
{
/* This function...
- Asks the network factory for new resources
- Encapsulates the new resources within the ReceiverControlBlock list
- Associated the endpoint to the new elements in the list
- Launches the listener thread
*/
// 1 - Ask the network factory to generate the elements that do still not exist
std::vector<ReceiverResource> newItems; //Store the newly created elements
std::vector<ReceiverResource> newItemsBuffer; //Store intermediate results
//Iterate through the list of unicast and multicast locators the endpoint has... unless its empty
//In that case, just use the standard
if (pend->getAttributes().unicastLocatorList.empty() && pend->getAttributes().multicastLocatorList.empty())
{
// Take default locators from the participant.

if (unique_flows)
{
pend->getAttributes().multicastLocatorList.clear();
pend->getAttributes().unicastLocatorList = m_att.defaultUnicastLocatorList;
pend->getAttributes().multicastLocatorList = m_att.defaultMulticastLocatorList;

uint16_t port = initial_unique_port;
while (port < final_unique_port)
{
// Set port on unicast locators
for (Locator_t& loc : pend->getAttributes().unicastLocatorList)
{
loc.port = port;
}

// Try creating receiver resources
if (createReceiverResources(pend->getAttributes().unicastLocatorList, false, true))
{
break;
}

// Try with next port
++port;
}

// Fail when unique ports are exhausted
if (port >= final_unique_port)
{
logError(RTPS_PARTICIPANT, "Unique flows requested but exhausted. Port range: "
<< initial_unique_port << "-" << final_unique_port);
return false;
}
}
else
{
// 1 - Ask the network factory to generate the elements that do still not exist
//Iterate through the list of unicast and multicast locators the endpoint has... unless its empty
//In that case, just use the standard
if (pend->getAttributes().unicastLocatorList.empty() && pend->getAttributes().multicastLocatorList.empty())
{
// Take default locators from the participant.
pend->getAttributes().unicastLocatorList = m_att.defaultUnicastLocatorList;
pend->getAttributes().multicastLocatorList = m_att.defaultMulticastLocatorList;
}
createReceiverResources(pend->getAttributes().unicastLocatorList, false, true);
createReceiverResources(pend->getAttributes().multicastLocatorList, false, true);
}
createReceiverResources(pend->getAttributes().unicastLocatorList, false, true);
createReceiverResources(pend->getAttributes().multicastLocatorList, false, true);

// Associate the Endpoint with ReceiverControlBlock
assignEndpointListenResources(pend);
Expand Down Expand Up @@ -1184,12 +1243,13 @@ bool RTPSParticipantImpl::createSendResources(
return true;
}

void RTPSParticipantImpl::createReceiverResources(
bool RTPSParticipantImpl::createReceiverResources(
LocatorList_t& Locator_list,
bool ApplyMutation,
bool RegisterReceiver)
{
std::vector<std::shared_ptr<ReceiverResource>> newItemsBuffer;
bool ret_val = Locator_list.empty();

#if HAVE_SECURITY
// An auxilary buffer is needed in the ReceiverResource to to decrypt the message,
Expand All @@ -1214,6 +1274,8 @@ void RTPSParticipantImpl::createReceiverResources(
}
}

ret_val |= !newItemsBuffer.empty();

for (auto it_buffer = newItemsBuffer.begin(); it_buffer != newItemsBuffer.end(); ++it_buffer)
{
std::lock_guard<std::mutex> lock(m_receiverResourcelistMutex);
Expand All @@ -1230,6 +1292,8 @@ void RTPSParticipantImpl::createReceiverResources(
}
newItemsBuffer.clear();
}

return ret_val;
}

void RTPSParticipantImpl::createSenderResources(
Expand Down
12 changes: 9 additions & 3 deletions src/cpp/rtps/participant/RTPSParticipantImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -562,10 +562,16 @@ class RTPSParticipantImpl

/** Create the new ReceiverResources needed for a new Locator, contains the calls to assignEndpointListenResources
and consequently assignEndpoint2LocatorList
@param pend - Pointer to the endpoint which triggered the creation of the Receivers
@param pend - Pointer to the endpoint which triggered the creation of the Receivers.
@param unique_flows - Whether unique listening ports should be created for this endpoint.
@param initial_unique_port - First unique listening port to try.
@param final_unique_port - Unique listening port that will not be tried.
*/
bool createAndAssociateReceiverswithEndpoint(
Endpoint* pend);
Endpoint* pend,
bool unique_flows = false,
uint16_t initial_unique_port = 0,
uint16_t final_unique_port = 0);

/** Create non-existent SendResources based on the Locator list of the entity
@param pend - Pointer to the endpoint whose SenderResources are to be created
Expand Down Expand Up @@ -859,7 +865,7 @@ class RTPSParticipantImpl
* @param ApplyMutation - True if we want to create a Resource with a "similar" locator if the one we provide is unavailable
* @param RegisterReceiver - True if we want the receiver to be registered. Useful for receivers created after participant is enabled.
*/
void createReceiverResources(
bool createReceiverResources(
LocatorList_t& Locator_list,
bool ApplyMutation,
bool RegisterReceiver);
Expand Down
62 changes: 59 additions & 3 deletions test/blackbox/api/dds-pim/PubSubParticipant.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@
template<class TypeSupport>
class PubSubParticipant
{
public:

typedef TypeSupport type_support;
typedef typename type_support::type type;

private:

class PubListener : public eprosima::fastdds::dds::DataWriterListener
{
friend class PubSubParticipant;
Expand Down Expand Up @@ -117,6 +124,18 @@ class PubSubParticipant

}

void on_data_available(
eprosima::fastdds::dds::DataReader* reader) override
{
type data;
eprosima::fastdds::dds::SampleInfo info;

while (ReturnCode_t::RETCODE_OK == reader->take_next_sample(&data, &info))
{
participant_->data_received();
}
}

private:

SubListener& operator =(
Expand All @@ -127,9 +146,6 @@ class PubSubParticipant

public:

typedef TypeSupport type_support;
typedef typename type_support::type type;

PubSubParticipant(
unsigned int num_publishers,
unsigned int num_subscribers,
Expand Down Expand Up @@ -310,6 +326,18 @@ class PubSubParticipant
return false;
}

eprosima::fastdds::dds::DataWriter& get_native_writer(
unsigned int index)
{
return *(std::get<2>(publishers_[index]));
}

eprosima::fastdds::dds::DataReader& get_native_reader(
unsigned int index)
{
return *(std::get<2>(subscribers_[index]));
}

bool send_sample(
type& msg,
unsigned int index = 0)
Expand Down Expand Up @@ -467,6 +495,20 @@ class PubSubParticipant
return *this;
}

PubSubParticipant& pub_property_policy(
const eprosima::fastrtps::rtps::PropertyPolicy property_policy)
{
datawriter_qos_.properties() = property_policy;
return *this;
}

PubSubParticipant& sub_property_policy(
const eprosima::fastrtps::rtps::PropertyPolicy property_policy)
{
datareader_qos_.properties() = property_policy;
return *this;
}

PubSubParticipant& pub_topic_name(
std::string topicName)
{
Expand Down Expand Up @@ -585,6 +627,13 @@ class PubSubParticipant
sub_liveliness_cv_.notify_one();
}

void data_received()
{
std::unique_lock<std::mutex> lock(sub_data_mutex_);
sub_times_data_received_++;
sub_data_cv_.notify_one();
}

unsigned int pub_times_liveliness_lost()
{
std::unique_lock<std::mutex> lock(pub_liveliness_mutex_);
Expand Down Expand Up @@ -691,6 +740,13 @@ class PubSubParticipant
//! A condition variable for liveliness of publisher
std::condition_variable pub_liveliness_cv_;

//! A mutex protecting received data
std::mutex sub_data_mutex_;
//! A condition variable for received data
std::condition_variable sub_data_cv_;
//! Number of times a subscriber received data
size_t sub_times_data_received_ = 0;

eprosima::fastdds::dds::TypeSupport type_;
};

Expand Down
26 changes: 26 additions & 0 deletions test/blackbox/api/fastrtps_deprecated/PubSubParticipant.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,18 @@ class PubSubParticipant
return false;
}

eprosima::fastrtps::Publisher& get_native_writer(
unsigned int index)
{
return *(publishers_[index]);
}

eprosima::fastrtps::Subscriber& get_native_reader(
unsigned int index)
{
return *(subscribers_[index]);
}

bool send_sample(
type& msg,
unsigned int index = 0)
Expand Down Expand Up @@ -395,6 +407,20 @@ class PubSubParticipant
return *this;
}

PubSubParticipant& pub_property_policy(
const eprosima::fastrtps::rtps::PropertyPolicy property_policy)
{
publisher_attr_.properties = property_policy;
return *this;
}

PubSubParticipant& sub_property_policy(
const eprosima::fastrtps::rtps::PropertyPolicy property_policy)
{
subscriber_attr_.properties = property_policy;
return *this;
}

PubSubParticipant& pub_topic_name(
std::string topicName)
{
Expand Down
Loading

0 comments on commit d2f3965

Please sign in to comment.