Skip to content

Commit

Permalink
Add a keyed fragmented change to the reader data instance only when i…
Browse files Browse the repository at this point in the history
…ts completed (#4261) (#4308)

* Add a keyed fragmented change to the reader data instance only when its completed (#4261)

* Refs #20257: Add regression test

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* #Refs #20257: Fix

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #20239: Second rev suggestions

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #20257: Linter

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #20257: Retrieve instance handle condition before for avoid being nullptr

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

---------

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>
(cherry picked from commit 9558ce4)

# Conflicts:
#	src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp

* Refs #20257: Correct test indentation

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

* Refs #20257: Resolve conflicts

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>

---------

Signed-off-by: Mario Dominguez <mariodominguez@eprosima.com>
Co-authored-by: Mario Domínguez López <116071334+Mario-DL@users.noreply.github.com>
Co-authored-by: Mario Dominguez <mariodominguez@eprosima.com>
  • Loading branch information
3 people authored Mar 4, 2024
1 parent 4c2b721 commit 3f203ec
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 9 deletions.
23 changes: 14 additions & 9 deletions src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,14 +139,14 @@ DataReaderHistory::DataReaderHistory(
compute_key_for_change_fn_ =
[this](CacheChange_t* a_change)
{
if (a_change->instanceHandle.isDefined())
if (!a_change->is_fully_assembled())
{
return true;
return false;
}

if (!a_change->is_fully_assembled())
if (a_change->instanceHandle.isDefined())
{
return false;
return true;
}

if (type_ != nullptr)
Expand Down Expand Up @@ -635,15 +635,16 @@ ReaderHistory::iterator DataReaderHistory::remove_change_nts(
bool DataReaderHistory::completed_change(
CacheChange_t* change)
{
bool ret_value = true;
bool ret_value = false;

if (!change->instanceHandle.isDefined())
if (compute_key_for_change_fn_(change))
{
InstanceCollection::iterator vit;
ret_value = compute_key_for_change_fn_(change) && find_key(change->instanceHandle, vit);
if (ret_value)

if (find_key(change->instanceHandle, vit))
{
ret_value = !change->instanceHandle.isDefined() || complete_fn_(change, *vit->second);
ret_value = !change->instanceHandle.isDefined() ||
complete_fn_(change, *vit->second);
}

if (!ret_value)
Expand All @@ -660,6 +661,10 @@ bool DataReaderHistory::completed_change(
}
}
}
else
{
logError(SUBSCRIBER, "Could not compute key from change");
}

return ret_value;
}
Expand Down
7 changes: 7 additions & 0 deletions src/cpp/rtps/reader/StatefulReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -673,6 +673,7 @@ bool StatefulReader::processDataFragMsg(
{
work_change->copy_not_memcpy(change_to_add);
work_change->serializedPayload.length = sampleSize;
work_change->instanceHandle.clear();
work_change->setFragmentSize(change_to_add->getFragmentSize(), true);
change_created = work_change;
}
Expand All @@ -681,6 +682,12 @@ bool StatefulReader::processDataFragMsg(

if (work_change != nullptr)
{
// Set the instanceHandle only when fragment number 1 is received
if (!work_change->instanceHandle.isDefined() && fragmentStartingNum == 1)
{
work_change->instanceHandle = change_to_add->instanceHandle;
}

work_change->add_fragments(change_to_add->serializedPayload, fragmentStartingNum,
fragmentsInSubmessage);
}
Expand Down
8 changes: 8 additions & 0 deletions src/cpp/rtps/reader/StatelessReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -606,6 +606,7 @@ bool StatelessReader::processDataFragMsg(
// Sample fits inside pending change. Reuse it.
work_change->copy_not_memcpy(change_to_add);
work_change->serializedPayload.length = sampleSize;
work_change->instanceHandle.clear();
work_change->setFragmentSize(change_to_add->getFragmentSize(), true);
}
else
Expand All @@ -631,6 +632,7 @@ bool StatelessReader::processDataFragMsg(
{
work_change->copy_not_memcpy(change_to_add);
work_change->serializedPayload.length = sampleSize;
work_change->instanceHandle.clear();
work_change->setFragmentSize(change_to_add->getFragmentSize(), true);
}
}
Expand All @@ -640,6 +642,12 @@ bool StatelessReader::processDataFragMsg(
CacheChange_t* change_completed = nullptr;
if (work_change != nullptr)
{
// Set the instanceHandle only when fragment number 1 is received
if (!work_change->instanceHandle.isDefined() && fragmentStartingNum == 1)
{
work_change->instanceHandle = change_to_add->instanceHandle;
}

if (work_change->add_fragments(change_to_add->serializedPayload, fragmentStartingNum,
fragmentsInSubmessage))
{
Expand Down
7 changes: 7 additions & 0 deletions test/blackbox/api/dds-pim/PubSubReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -991,6 +991,13 @@ class PubSubReader
return *this;
}

PubSubReader& expect_inline_qos(
bool expect)
{
datareader_qos_.expects_inline_qos(expect);
return *this;
}

PubSubReader& heartbeatResponseDelay(
const int32_t secs,
const int32_t frac)
Expand Down
7 changes: 7 additions & 0 deletions test/blackbox/api/fastrtps_deprecated/PubSubReader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -793,6 +793,13 @@ class PubSubReader
return *this;
}

PubSubReader& expect_inline_qos(
bool expect)
{
subscriber_attr_.expectsInlineQos = expect;
return *this;
}

PubSubReader& heartbeatResponseDelay(
const int32_t secs,
const int32_t frac)
Expand Down
61 changes: 61 additions & 0 deletions test/blackbox/common/BlackboxTestsPubSubFragments.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -649,6 +649,67 @@ TEST_P(PubSubFragmentsLimited, AsyncPubSubAsReliableKeyedData300kbKeepLast1InLos
testTransport->dropLogLength);
}

// Regression test for 20257
// When a non existing change is removed, the change is also removed from the data instance changes sequence
// For uncrustify sake *INDENT-OFF*
TEST(PubSubFragmentsLimited, AsyncPubSubAsReliableKeyedData300kbKeepLast1LoosyConditionsSmallFragmentsCorrectlyBehavesWhenInlineQoSAreForced)
// *INDENT-ON*
{
PubSubReader<KeyedData1mbPubSubType> reader(TEST_TOPIC_NAME);
PubSubWriter<KeyedData1mbPubSubType> writer(TEST_TOPIC_NAME);

reader.history_depth(2)
.expect_inline_qos(true)
.reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS)
.init();

ASSERT_TRUE(reader.isInitialized());

// To simulate lossy conditions, we are going to remove the default
// builtin transport, and instead use a lossy shim layer variant.
auto testTransport = std::make_shared<test_UDPv4TransportDescriptor>();
testTransport->maxMessageSize = 1024;
// We drop 20% of all data frags
testTransport->dropDataFragMessagesPercentage = 20;
testTransport->dropLogLength = 1;
writer.disable_builtin_transport();
writer.add_user_transport_to_pparams(testTransport);

// When doing fragmentation, it is necessary to have some degree of
// flow control not to overrun the receive buffer.
uint32_t bytesPerPeriod = 153601;
uint32_t periodInMs = 100;
writer.add_throughput_controller_descriptor_to_pparams(
eprosima::fastdds::rtps::FlowControllerSchedulerPolicy::HIGH_PRIORITY, bytesPerPeriod, periodInMs)
.heartbeat_period_seconds(0)
.heartbeat_period_nanosec(1000000)
.history_depth(1)
.asynchronously(eprosima::fastrtps::ASYNCHRONOUS_PUBLISH_MODE).init();

ASSERT_TRUE(writer.isInitialized());

// Because its volatile the durability
// Wait for discovery.
writer.wait_discovery();
reader.wait_discovery();

auto data = default_keyeddata300kb_data_generator(5);

reader.startReception(data);

// Send data
writer.send(data, 100);
// In this test all data should be sent.
ASSERT_TRUE(data.empty());
// Block reader until reception finished or timeout.
reader.block_for_seq({ 0, 5 });

// Sanity check. Make sure we have dropped a few packets
ASSERT_EQ(
test_UDPv4Transport::test_UDPv4Transport_DropLog.size(),
testTransport->dropLogLength);
}

TEST_P(PubSubFragmentsLimited, AsyncPubSubAsReliableVolatileData300kbInLossyConditionsSmallFragments)
{
PubSubReader<Data1mbPubSubType> reader(TEST_TOPIC_NAME);
Expand Down

0 comments on commit 3f203ec

Please sign in to comment.