Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix StatelessWriter locators filtering [18950] #3655

Merged
merged 11 commits into from
Jul 11, 2023
19 changes: 19 additions & 0 deletions src/cpp/rtps/participant/RTPSParticipantImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -734,6 +734,9 @@ bool RTPSParticipantImpl::create_writer(
return false;
}

// Use participant's external locators if writer has none
setupExternalLocators(SWriter);

#if HAVE_SECURITY
if (!is_builtin)
{
Expand Down Expand Up @@ -864,6 +867,11 @@ bool RTPSParticipantImpl::create_reader(
return false;
}

// Use participant's external locators if reader has none
// WARNING: call before createAndAssociateReceiverswithEndpoint, as the latter intentionally clears external
// locators list when using unique_flows feature
MiguelCompany marked this conversation as resolved.
Show resolved Hide resolved
setupExternalLocators(SReader);

#if HAVE_SECURITY

if (!is_builtin)
Expand Down Expand Up @@ -1658,6 +1666,17 @@ bool RTPSParticipantImpl::createSendResources(
return true;
}

void RTPSParticipantImpl::setupExternalLocators(
MiguelCompany marked this conversation as resolved.
Show resolved Hide resolved
Endpoint* pend)
MiguelCompany marked this conversation as resolved.
Show resolved Hide resolved
{
auto& attributes = pend->getAttributes();
if (attributes.external_unicast_locators.empty())
{
// Take external locators from the participant.
attributes.external_unicast_locators = m_att.default_external_unicast_locators;
}
}

bool RTPSParticipantImpl::createReceiverResources(
LocatorList_t& Locator_list,
bool ApplyMutation,
Expand Down
6 changes: 6 additions & 0 deletions src/cpp/rtps/participant/RTPSParticipantImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,12 @@ class RTPSParticipantImpl
bool createSendResources(
Endpoint* pend);

/** Add participant's external locators to endpoint's when none available
@param pend - Pointer to the endpoint whose external locators are to be set
*/
void setupExternalLocators(
Endpoint* pend);

/** When we want to create a new Resource but the physical channel specified by the Locator
can not be opened, we want to mutate the Locator to open a more or less equivalent channel.
@param loc - Locator we want to change
Expand Down
8 changes: 8 additions & 0 deletions src/cpp/rtps/transport/test_UDPv4Transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ test_UDPv4TransportDescriptor::DestinationLocatorFilter test_UDPv4Transport::loc
{
return false;
});
std::map<uint32_t, uint32_t> test_UDPv4Transport::messages_sent{};

test_UDPv4Transport::test_UDPv4Transport(
const test_UDPv4TransportDescriptor& descriptor)
Expand Down Expand Up @@ -201,6 +202,12 @@ bool test_UDPv4Transport::send(

while (it != *destination_locators_end)
{
if (!IsLocatorSupported(*it))
{
++it;
continue;
}

auto now = std::chrono::steady_clock::now();

if (now < max_blocking_time_point)
Expand Down Expand Up @@ -245,6 +252,7 @@ bool test_UDPv4Transport::send(
}
else
{
messages_sent[remote_locator.port]++;
MiguelCompany marked this conversation as resolved.
Show resolved Hide resolved
return UDPv4Transport::send(send_buffer, send_buffer_size, socket, remote_locator, only_multicast_purpose,
whitelisted, timeout);
}
Expand Down
3 changes: 3 additions & 0 deletions src/cpp/rtps/transport/test_UDPv4Transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ class test_UDPv4Transport : public UDPv4Transport

RTPS_DllAPI static test_UDPv4TransportDescriptor::DestinationLocatorFilter locator_filter;

// Record the number of packages sent to the different ports (key)
RTPS_DllAPI static std::map<uint32_t, uint32_t> messages_sent;

protected:

virtual void get_ips(
Expand Down
187 changes: 187 additions & 0 deletions test/blackbox/common/DDSBlackboxTestsTransportSHMUDP.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
// Copyright 2023 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "BlackboxTests.hpp"

#include <chrono>
#include <cstdint>
#include <memory>
#include <thread>

#include <gtest/gtest.h>

#include <fastdds/rtps/transport/shared_mem/SharedMemTransportDescriptor.h>
#include <fastdds/rtps/transport/test_UDPv4TransportDescriptor.h>
#include <fastrtps/log/Log.h>
#include <fastrtps/xmlparser/XMLProfileManager.h>

#include "../api/dds-pim/PubSubReader.hpp"
#include "../api/dds-pim/PubSubWriter.hpp"
#include <rtps/transport/test_UDPv4Transport.h>

using namespace eprosima::fastrtps;
using namespace eprosima::fastrtps::rtps;
using test_UDPv4Transport = eprosima::fastdds::rtps::test_UDPv4Transport;

enum communication_type
{
TRANSPORT,
INTRAPROCESS,
DATASHARING
};

class SHMUDP : public testing::TestWithParam<communication_type>
{
public:

void SetUp() override
{
LibrarySettingsAttributes library_settings;
switch (GetParam())
{
case INTRAPROCESS:
library_settings.intraprocess_delivery = IntraprocessDeliveryType::INTRAPROCESS_FULL;
xmlparser::XMLProfileManager::library_settings(library_settings);
break;
case DATASHARING:
enable_datasharing = true;
break;
case TRANSPORT:
default:
break;
}
}

void TearDown() override
{
LibrarySettingsAttributes library_settings;
switch (GetParam())
{
case INTRAPROCESS:
library_settings.intraprocess_delivery = IntraprocessDeliveryType::INTRAPROCESS_OFF;
xmlparser::XMLProfileManager::library_settings(library_settings);
break;
case DATASHARING:
enable_datasharing = false;
break;
case TRANSPORT:
default:
break;
}
}

};

TEST_P(SHMUDP, Transport_SHM_UDP_test)
{
static struct test_conditions
{
uint32_t sub_unicast_port = 7527;
}
conditions;

MiguelCompany marked this conversation as resolved.
Show resolved Hide resolved
// Set up
PubSubReader<HelloWorldPubSubType> reader(TEST_TOPIC_NAME);
PubSubWriter<HelloWorldPubSubType> writer(TEST_TOPIC_NAME);

auto sub_shm_descriptor = std::make_shared<eprosima::fastdds::rtps::SharedMemTransportDescriptor>();
sub_shm_descriptor->segment_size(2 * 1024 * 1024);
std::shared_ptr<eprosima::fastdds::rtps::test_UDPv4TransportDescriptor> sub_udp_descriptor =
std::make_shared<eprosima::fastdds::rtps::test_UDPv4TransportDescriptor>();

reader.disable_builtin_transport()
.add_user_transport_to_pparams(sub_shm_descriptor)
.add_user_transport_to_pparams(sub_udp_descriptor)
.reliability(BEST_EFFORT_RELIABILITY_QOS)
.durability_kind(VOLATILE_DURABILITY_QOS)
.history_kind(KEEP_ALL_HISTORY_QOS)
// .add_to_default_unicast_locator_list("127.0.0.1", conditions.sub_unicast_port)
// .add_to_default_unicast_locator_list("127.0.0.1", conditions.sub_unicast_port, true) // SHM (extend method)
// .add_to_unicast_locator_list("127.0.0.1", conditions.sub_unicast_port)
// .add_to_unicast_locator_list("127.0.0.1", conditions.sub_unicast_port, true) // SHM (extend method)
MiguelCompany marked this conversation as resolved.
Show resolved Hide resolved
.init();
ASSERT_TRUE(reader.isInitialized());

auto pub_shm_descriptor = std::make_shared<eprosima::fastdds::rtps::SharedMemTransportDescriptor>();
pub_shm_descriptor->segment_size(2 * 1024 * 1024);
auto pub_udp_descriptor = std::make_shared<eprosima::fastdds::rtps::test_UDPv4TransportDescriptor>();

writer.disable_builtin_transport()
.add_user_transport_to_pparams(pub_shm_descriptor)
.add_user_transport_to_pparams(pub_udp_descriptor)
.reliability(BEST_EFFORT_RELIABILITY_QOS)
.durability_kind(VOLATILE_DURABILITY_QOS)
.history_kind(KEEP_ALL_HISTORY_QOS)
.asynchronously(SYNCHRONOUS_PUBLISH_MODE)
.init();
ASSERT_TRUE(writer.isInitialized());

// Because its volatile the durability, wait for discovery
writer.wait_discovery();
reader.wait_discovery();

// Send some data.
auto data = default_helloworld_data_generator();
reader.startReception(data);
writer.send(data);
// In this test all data should be sent.
ASSERT_TRUE(data.empty());

// Check that reader receives the unmatched.
reader.block_for_all();

// check that no (user) data has been sent via UDP transport
// TODO: check no data is sent for a specific port (set with add_to_default_unicast_locator_list or
// add_to_unicast_locator_list). Currently this cannot be achieved, as adding a non-default UDP locator makes it
// necessary to also add a non-default SHM one (if SHM communication is desired, as it is the case), but this cannot
// be done until the creation of SHM locators is exposed (currently available in internal SHMLocator::create_locator).
// As a workaround, it is checked that no user data is sent at any port, knowing that metatraffic ports are always
// even and user ones odd.
// uint32_t n_packages_sent = test_UDPv4Transport::messages_sent[conditions.sub_unicast_port];
uint32_t n_packages_sent = 0;
for (std::map<uint32_t, uint32_t>::iterator it = test_UDPv4Transport::messages_sent.begin();
it != test_UDPv4Transport::messages_sent.end(); ++it)
{
if (it->first % 2)
{
n_packages_sent += it->second;
}
}
MiguelCompany marked this conversation as resolved.
Show resolved Hide resolved
ASSERT_EQ(n_packages_sent, 0u);
}

#ifdef INSTANTIATE_TEST_SUITE_P
#define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w)
#else
#define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_CASE_P(x, y, z, w)
#endif // ifdef INSTANTIATE_TEST_SUITE_P

GTEST_INSTANTIATE_TEST_MACRO(SHMUDP,
SHMUDP,
testing::Values(TRANSPORT, INTRAPROCESS, DATASHARING),
[](const testing::TestParamInfo<SHMUDP::ParamType>& info)
{
switch (info.param)
{
case INTRAPROCESS:
return "Intraprocess";
break;
case DATASHARING:
return "Datasharing";
break;
case TRANSPORT:
default:
return "Transport";
}
});