Skip to content

Commit

Permalink
Send GAPs correctly when using separate sending (#3012)
Browse files Browse the repository at this point in the history
* Refs #15915: Added test and error trace

Signed-off-by: Javier Santiago <javiersantiago@eprosima.com>

* Refs #15915: Disabled for intraprocess

Signed-off-by: Javier Santiago <javiersantiago@eprosima.com>

* Refs #15915: Added new gap requirement check when using separate sending

Signed-off-by: Javier Santiago <javiersantiago@eprosima.com>

* Refs #15915: Changed the test from a parametrized test to a standalone test

Signed-off-by: Javier Santiago <javiersantiago@eprosima.com>

* Refs #15915: Added review suggestions

Signed-off-by: Javier Santiago <javiersantiago@eprosima.com>

* Refs #15915: Fixed Windows warnings

Signed-off-by: Javier Santiago <javiersantiago@eprosima.com>

* Refs #15915: Added doxygen. Re-added removed assertion

Signed-off-by: Javier Santiago <javiersantiago@eprosima.com>

* Refs #15915: Removed unused variables

Signed-off-by: Javier Santiago <javiersantiago@eprosima.com>

Signed-off-by: Javier Santiago <javiersantiago@eprosima.com>
  • Loading branch information
jsan-rt authored and MiguelCompany committed Oct 25, 2022
1 parent 7f7c909 commit 48ff66a
Show file tree
Hide file tree
Showing 5 changed files with 156 additions and 31 deletions.
8 changes: 8 additions & 0 deletions include/fastdds/rtps/writer/StatefulWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,14 @@ class StatefulWriter : public RTPSWriter
void prepare_datasharing_delivery(
CacheChange_t* change);

/**
* Check the StatefulWriter's sequence numbers and add the required GAP messages to the provided message group.
*
* @param group Reference to the Message Group to which the GAP messages are to be added.
*/
void add_gaps_for_holes_in_history_(
RTPSMessageGroup& group);

//! True to disable piggyback heartbeats
bool disable_heartbeat_piggyback_;
//! True to disable positive ACKs
Expand Down
76 changes: 45 additions & 31 deletions src/cpp/rtps/writer/StatefulWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -592,36 +592,12 @@ void StatefulWriter::send_heartbeat_to_all_readers()
RTPSMessageGroup group(mp_RTPSParticipant, this, &locator_selector_general_);
select_all_readers_nts(group, locator_selector_general_);

// Send a GAP with holes in the history.
SequenceNumber_t first_seq = get_seq_num_min();
SequenceNumber_t last_seq = get_seq_num_max();
assert(
(SequenceNumber_t::unknown() == get_seq_num_min() && SequenceNumber_t::unknown() == get_seq_num_max()) ||
(SequenceNumber_t::unknown() != get_seq_num_min() &&
SequenceNumber_t::unknown() != get_seq_num_max()));

assert((SequenceNumber_t::unknown() == first_seq && SequenceNumber_t::unknown() == last_seq) ||
(SequenceNumber_t::unknown() != first_seq && SequenceNumber_t::unknown() != last_seq));

if (SequenceNumber_t::unknown() != first_seq &&
last_seq.to64long() - first_seq.to64long() + 1 != mp_history->getHistorySize())
{
RTPSGapBuilder gaps(group);

// There are holes in the history.
History::const_iterator cit = mp_history->changesBegin();
SequenceNumber_t prev = (*cit)->sequenceNumber + 1;
++cit;
while (cit != mp_history->changesEnd())
{
while (prev != (*cit)->sequenceNumber)
{
gaps.add(prev);
++prev;
}

++prev;
++cit;
}

gaps.flush();
}
add_gaps_for_holes_in_history_(group);

send_heartbeat_nts_(locator_selector_general_.all_remote_readers.size(), group, disable_positive_acks_);
}
Expand Down Expand Up @@ -1740,6 +1716,18 @@ void StatefulWriter::send_heartbeat_to_nts(
try
{
RTPSMessageGroup group(mp_RTPSParticipant, this, remoteReaderProxy.message_sender());
SequenceNumber_t firstSeq = get_seq_num_min();
SequenceNumber_t lastSeq = get_seq_num_max();

if (firstSeq != c_SequenceNumber_Unknown && lastSeq != c_SequenceNumber_Unknown)
{
assert(firstSeq <= lastSeq);
if (!liveliness)
{
add_gaps_for_holes_in_history_(group);
}
}

send_heartbeat_nts_(1u, group, disable_positive_acks_, liveliness);
}
catch (const RTPSMessageGroup::timeout&)
Expand Down Expand Up @@ -1781,8 +1769,6 @@ void StatefulWriter::send_heartbeat_nts_(
else
{
assert(firstSeq <= lastSeq);

// Check if it has to be sent a GAP with the gaps in the history
}

incrementHBCount();
Expand Down Expand Up @@ -2088,6 +2074,34 @@ DeliveryRetCode StatefulWriter::deliver_sample_nts(
return ret_code;
}

void StatefulWriter::add_gaps_for_holes_in_history_(
RTPSMessageGroup& group)
{
SequenceNumber_t firstSeq = get_seq_num_min();
SequenceNumber_t lastSeq = get_seq_num_max();

if (SequenceNumber_t::unknown() != firstSeq &&
lastSeq.to64long() - firstSeq.to64long() + 1 != mp_history->getHistorySize())
{
RTPSGapBuilder gaps(group);
// There are holes in the history.
History::const_iterator cit = mp_history->changesBegin();
SequenceNumber_t prev = (*cit)->sequenceNumber + 1;
++cit;
while (cit != mp_history->changesEnd())
{
while (prev != (*cit)->sequenceNumber)
{
gaps.add(prev);
++prev;
}
++prev;
++cit;
}
gaps.flush();
}
}

} // namespace rtps
} // namespace fastrtps
} // namespace eprosima
90 changes: 90 additions & 0 deletions test/blackbox/common/RTPSBlackboxTestsBasic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "BlackboxTests.hpp"

#include <chrono>
#include <cstdint>
#include <memory>
#include <thread>

Expand Down Expand Up @@ -469,6 +470,95 @@ TEST_P(RTPS, RTPSAsReliableWithRegistrationAndHolesInHistory)
late_joiner.block_for_all();
}

/*
* This test checks that GAPs are properly sent when a writer is sending data to
* each reader separately.
*/

TEST(RTPS, RTPSUnavailableSampleGapWhenSeparateSending)
{
RTPSWithRegistrationReader<HelloWorldPubSubType> reader(TEST_TOPIC_NAME);
RTPSWithRegistrationWriter<HelloWorldPubSubType> writer(TEST_TOPIC_NAME);

// To simulate lossy conditions
auto testTransport = std::make_shared<rtps::test_UDPv4TransportDescriptor>();

reader.
durability(eprosima::fastrtps::rtps::DurabilityKind_t::TRANSIENT_LOCAL).
history_depth(3).
reliability(eprosima::fastrtps::rtps::ReliabilityKind_t::RELIABLE).init();

ASSERT_TRUE(reader.isInitialized());

// set_separate_sending

writer.durability(eprosima::fastrtps::rtps::DurabilityKind_t::TRANSIENT_LOCAL).
disable_builtin_transport().
reliability(eprosima::fastrtps::rtps::ReliabilityKind_t::RELIABLE).
history_depth(3).
add_user_transport_to_pparams(testTransport).init();

ASSERT_TRUE(writer.isInitialized());

writer.set_separate_sending(true);

// Wait for discovery.
writer.wait_discovery();
reader.wait_discovery();

HelloWorld message;
message.message("HelloWorld");

std::list<HelloWorld> data;
std::list<HelloWorld> expected;

reader.startReception();

// Send data
uint16_t index = 0;
message.index(++index);

data.push_back(message);
expected.push_back(message);
reader.expected_data(expected);
writer.send(data);

test_UDPv4Transport::test_UDPv4Transport_ShutdownAllNetwork = true;

std::this_thread::sleep_for(std::chrono::seconds(1));

message.index(++index);
data.push_back(message);
writer.send(data);

message.index(++index);
data.push_back(message);
expected.push_back(message);
reader.expected_data(expected);
writer.send(data);

writer.remove_change({0, 2});

std::this_thread::sleep_for(std::chrono::seconds(1));

test_UDPv4Transport::test_UDPv4Transport_ShutdownAllNetwork = false;

std::this_thread::sleep_for(std::chrono::seconds(1));

message.index(++index);
data.push_back(message);
expected.push_back(message);
reader.expected_data(expected);

writer.send(data);

// Block reader until reception finished or timeout.
reader.block_for_all(std::chrono::seconds(1));
// Block until all data is ACK'd
writer.waitForAllAcked(std::chrono::seconds(1));

EXPECT_EQ(reader.getReceivedCount(), static_cast<unsigned int>(expected.size()));
}

TEST_P(RTPS, RTPSAsReliableVolatileTwoWritersConsecutives)
{
Expand Down
7 changes: 7 additions & 0 deletions test/blackbox/common/RTPSWithRegistrationReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,13 @@ class RTPSWithRegistrationReader
return *this;
}

RTPSWithRegistrationReader& history_depth(
const int32_t depth)
{
topic_attr_.historyQos.depth = depth;
return *this;
}

RTPSWithRegistrationReader& reliability(
const eprosima::fastrtps::rtps::ReliabilityKind_t kind)
{
Expand Down
6 changes: 6 additions & 0 deletions test/blackbox/common/RTPSWithRegistrationWriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,12 @@ class RTPSWithRegistrationWriter
return *this;
}

void set_separate_sending(
bool separate_sending)
{
writer_->set_separate_sending(separate_sending);
}

uint32_t get_matched() const
{
return matched_;
Expand Down

0 comments on commit 48ff66a

Please sign in to comment.