Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[15915] Send GAPs correctly when using separate sending #3012

Merged
merged 8 commits into from
Oct 25, 2022
8 changes: 8 additions & 0 deletions include/fastdds/rtps/writer/StatefulWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,14 @@ class StatefulWriter : public RTPSWriter
void prepare_datasharing_delivery(
CacheChange_t* change);

/**
* Check the StatefulWriter's sequence numbers and add the required GAP messages to the provided message group.
*
* @param group Reference to the Message Group to which the GAP messages are to be added.
*/
void add_gaps_for_holes_in_history_(
MiguelCompany marked this conversation as resolved.
Show resolved Hide resolved
RTPSMessageGroup& group);

//! True to disable piggyback heartbeats
bool disable_heartbeat_piggyback_;
//! True to disable positive ACKs
Expand Down
66 changes: 41 additions & 25 deletions src/cpp/rtps/writer/StatefulWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -600,29 +600,7 @@ void StatefulWriter::send_heartbeat_to_all_readers()
assert((SequenceNumber_t::unknown() == first_seq && SequenceNumber_t::unknown() == last_seq) ||
(SequenceNumber_t::unknown() != first_seq && SequenceNumber_t::unknown() != last_seq));

if (SequenceNumber_t::unknown() != first_seq &&
last_seq.to64long() - first_seq.to64long() + 1 != mp_history->getHistorySize())
{
RTPSGapBuilder gaps(group);

// There are holes in the history.
History::const_iterator cit = mp_history->changesBegin();
SequenceNumber_t prev = (*cit)->sequenceNumber + 1;
++cit;
while (cit != mp_history->changesEnd())
{
while (prev != (*cit)->sequenceNumber)
{
gaps.add(prev);
++prev;
}

++prev;
++cit;
}

gaps.flush();
}
add_gaps_for_holes_in_history_(group);

send_heartbeat_nts_(locator_selector_general_.all_remote_readers.size(), group, disable_positive_acks_);
}
Expand Down Expand Up @@ -1751,6 +1729,18 @@ void StatefulWriter::send_heartbeat_to_nts(
try
{
RTPSMessageGroup group(mp_RTPSParticipant, this, remoteReaderProxy.message_sender());
SequenceNumber_t firstSeq = get_seq_num_min();
SequenceNumber_t lastSeq = get_seq_num_max();

if (firstSeq != c_SequenceNumber_Unknown && lastSeq != c_SequenceNumber_Unknown)
{
assert(firstSeq <= lastSeq);
if (!liveliness)
{
add_gaps_for_holes_in_history_(group);
}
}

send_heartbeat_nts_(1u, group, disable_positive_acks_, liveliness);
}
catch (const RTPSMessageGroup::timeout&)
Expand Down Expand Up @@ -1792,8 +1782,6 @@ void StatefulWriter::send_heartbeat_nts_(
else
{
assert(firstSeq <= lastSeq);

// Check if it has to be sent a GAP with the gaps in the history
}

incrementHBCount();
Expand Down Expand Up @@ -2099,6 +2087,34 @@ DeliveryRetCode StatefulWriter::deliver_sample_nts(
return ret_code;
}

void StatefulWriter::add_gaps_for_holes_in_history_(
RTPSMessageGroup& group)
{
SequenceNumber_t firstSeq = get_seq_num_min();
SequenceNumber_t lastSeq = get_seq_num_max();

if (SequenceNumber_t::unknown() != firstSeq &&
lastSeq.to64long() - firstSeq.to64long() + 1 != mp_history->getHistorySize())
{
RTPSGapBuilder gaps(group);
// There are holes in the history.
History::const_iterator cit = mp_history->changesBegin();
SequenceNumber_t prev = (*cit)->sequenceNumber + 1;
++cit;
while (cit != mp_history->changesEnd())
{
while (prev != (*cit)->sequenceNumber)
{
gaps.add(prev);
++prev;
}
++prev;
++cit;
}
gaps.flush();
}
}

} // namespace rtps
} // namespace fastrtps
} // namespace eprosima
90 changes: 90 additions & 0 deletions test/blackbox/common/RTPSBlackboxTestsBasic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "BlackboxTests.hpp"

#include <chrono>
#include <cstdint>
#include <memory>
#include <thread>

Expand Down Expand Up @@ -469,6 +470,95 @@ TEST_P(RTPS, RTPSAsReliableWithRegistrationAndHolesInHistory)
late_joiner.block_for_all();
}

/*
* This test checks that GAPs are properly sent when a writer is sending data to
* each reader separately.
*/

TEST(RTPS, RTPSUnavailableSampleGapWhenSeparateSending)
{
RTPSWithRegistrationReader<HelloWorldPubSubType> reader(TEST_TOPIC_NAME);
RTPSWithRegistrationWriter<HelloWorldPubSubType> writer(TEST_TOPIC_NAME);

// To simulate lossy conditions
auto testTransport = std::make_shared<rtps::test_UDPv4TransportDescriptor>();

reader.
durability(eprosima::fastrtps::rtps::DurabilityKind_t::TRANSIENT_LOCAL).
history_depth(3).
reliability(eprosima::fastrtps::rtps::ReliabilityKind_t::RELIABLE).init();

ASSERT_TRUE(reader.isInitialized());

// set_separate_sending

writer.durability(eprosima::fastrtps::rtps::DurabilityKind_t::TRANSIENT_LOCAL).
disable_builtin_transport().
reliability(eprosima::fastrtps::rtps::ReliabilityKind_t::RELIABLE).
history_depth(3).
add_user_transport_to_pparams(testTransport).init();

ASSERT_TRUE(writer.isInitialized());

writer.set_separate_sending(true);

// Wait for discovery.
writer.wait_discovery();
reader.wait_discovery();

HelloWorld message;
message.message("HelloWorld");

std::list<HelloWorld> data;
std::list<HelloWorld> expected;

reader.startReception();

// Send data
uint16_t index = 0;
message.index(++index);

data.push_back(message);
expected.push_back(message);
reader.expected_data(expected);
writer.send(data);

test_UDPv4Transport::test_UDPv4Transport_ShutdownAllNetwork = true;

std::this_thread::sleep_for(std::chrono::seconds(1));

message.index(++index);
data.push_back(message);
writer.send(data);

message.index(++index);
data.push_back(message);
expected.push_back(message);
reader.expected_data(expected);
writer.send(data);

writer.remove_change({0, 2});

std::this_thread::sleep_for(std::chrono::seconds(1));

test_UDPv4Transport::test_UDPv4Transport_ShutdownAllNetwork = false;

std::this_thread::sleep_for(std::chrono::seconds(1));

message.index(++index);
data.push_back(message);
expected.push_back(message);
reader.expected_data(expected);

writer.send(data);

// Block reader until reception finished or timeout.
reader.block_for_all(std::chrono::seconds(1));
// Block until all data is ACK'd
writer.waitForAllAcked(std::chrono::seconds(1));

EXPECT_EQ(reader.getReceivedCount(), static_cast<unsigned int>(expected.size()));
}

TEST_P(RTPS, RTPSAsReliableVolatileTwoWritersConsecutives)
{
Expand Down
7 changes: 7 additions & 0 deletions test/blackbox/common/RTPSWithRegistrationReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,13 @@ class RTPSWithRegistrationReader
return *this;
}

RTPSWithRegistrationReader& history_depth(
const int32_t depth)
{
topic_attr_.historyQos.depth = depth;
return *this;
}

RTPSWithRegistrationReader& reliability(
const eprosima::fastrtps::rtps::ReliabilityKind_t kind)
{
Expand Down
6 changes: 6 additions & 0 deletions test/blackbox/common/RTPSWithRegistrationWriter.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,12 @@ class RTPSWithRegistrationWriter
return *this;
}

void set_separate_sending(
bool separate_sending)
{
writer_->set_separate_sending(separate_sending);
}

uint32_t get_matched() const
{
return matched_;
Expand Down