From bda2327d5407aa61a2b7754a6df3622b19b46679 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mario=20Dom=C3=ADnguez=20L=C3=B3pez?= <116071334+Mario-DL@users.noreply.github.com> Date: Sat, 27 Jan 2024 09:16:07 +0100 Subject: [PATCH] Add a keyed fragmented change to the reader data instance only when its completed (#4261) * Refs #20257: Add regression test Signed-off-by: Mario Dominguez * #Refs #20257: Fix Signed-off-by: Mario Dominguez * Refs #20239: Second rev suggestions Signed-off-by: Mario Dominguez * Refs #20257: Linter Signed-off-by: Mario Dominguez * Refs #20257: Retrieve instance handle condition before for avoid being nullptr Signed-off-by: Mario Dominguez --------- Signed-off-by: Mario Dominguez (cherry picked from commit 9558ce436628bb0ce0bd76d44400efce1eed8378) --- .../subscriber/history/DataReaderHistory.cpp | 35 +++++------ src/cpp/rtps/reader/StatefulReader.cpp | 7 +++ src/cpp/rtps/reader/StatelessReader.cpp | 8 +++ test/blackbox/api/dds-pim/PubSubReader.hpp | 7 +++ .../api/fastrtps_deprecated/PubSubReader.hpp | 7 +++ .../common/BlackboxTestsPubSubFragments.cpp | 60 +++++++++++++++++++ 6 files changed, 105 insertions(+), 19 deletions(-) diff --git a/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp b/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp index e73b1043b6d..729a8b86f31 100644 --- a/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp +++ b/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp @@ -132,14 +132,14 @@ DataReaderHistory::DataReaderHistory( compute_key_for_change_fn_ = [this](CacheChange_t* a_change) { - if (a_change->instanceHandle.isDefined()) + if (!a_change->is_fully_assembled()) { - return true; + return false; } - if (!a_change->is_fully_assembled()) + if (a_change->instanceHandle.isDefined()) { - return false; + return true; } if (type_ != nullptr) @@ -708,27 +708,24 @@ bool DataReaderHistory::completed_change( size_t unknown_missing_changes_up_to, SampleRejectedStatusKind& rejection_reason) { - bool ret_value = true; - rejection_reason = NOT_REJECTED; + bool ret_value = false; + rejection_reason = REJECTED_BY_INSTANCES_LIMIT; - if (!change->instanceHandle.isDefined()) + if (compute_key_for_change_fn_(change)) { - ret_value = false; - if (compute_key_for_change_fn_(change)) + InstanceCollection::iterator vit; + if (find_key(change->instanceHandle, vit)) { - InstanceCollection::iterator vit; - if (find_key(change->instanceHandle, vit)) - { - ret_value = !change->instanceHandle.isDefined() || - complete_fn_(change, *vit->second, unknown_missing_changes_up_to, rejection_reason); - } - else - { - rejection_reason = REJECTED_BY_INSTANCES_LIMIT; - } + ret_value = !change->instanceHandle.isDefined() || + complete_fn_(change, *vit->second, unknown_missing_changes_up_to, rejection_reason); } } + if (ret_value) + { + rejection_reason = NOT_REJECTED; + } + return ret_value; } diff --git a/src/cpp/rtps/reader/StatefulReader.cpp b/src/cpp/rtps/reader/StatefulReader.cpp index 91684f7a780..472aa464073 100644 --- a/src/cpp/rtps/reader/StatefulReader.cpp +++ b/src/cpp/rtps/reader/StatefulReader.cpp @@ -695,6 +695,7 @@ bool StatefulReader::processDataFragMsg( { work_change->copy_not_memcpy(change_to_add); work_change->serializedPayload.length = sampleSize; + work_change->instanceHandle.clear(); work_change->setFragmentSize(change_to_add->getFragmentSize(), true); change_created = work_change; } @@ -703,6 +704,12 @@ bool StatefulReader::processDataFragMsg( if (work_change != nullptr) { + // Set the instanceHandle only when fragment number 1 is received + if (!work_change->instanceHandle.isDefined() && fragmentStartingNum == 1) + { + work_change->instanceHandle = change_to_add->instanceHandle; + } + work_change->add_fragments(change_to_add->serializedPayload, fragmentStartingNum, fragmentsInSubmessage); } diff --git a/src/cpp/rtps/reader/StatelessReader.cpp b/src/cpp/rtps/reader/StatelessReader.cpp index db058d1959e..ee3703184fb 100644 --- a/src/cpp/rtps/reader/StatelessReader.cpp +++ b/src/cpp/rtps/reader/StatelessReader.cpp @@ -645,6 +645,7 @@ bool StatelessReader::processDataFragMsg( // Sample fits inside pending change. Reuse it. work_change->copy_not_memcpy(change_to_add); work_change->serializedPayload.length = sampleSize; + work_change->instanceHandle.clear(); work_change->setFragmentSize(change_to_add->getFragmentSize(), true); } else @@ -670,6 +671,7 @@ bool StatelessReader::processDataFragMsg( { work_change->copy_not_memcpy(change_to_add); work_change->serializedPayload.length = sampleSize; + work_change->instanceHandle.clear(); work_change->setFragmentSize(change_to_add->getFragmentSize(), true); } } @@ -679,6 +681,12 @@ bool StatelessReader::processDataFragMsg( CacheChange_t* change_completed = nullptr; if (work_change != nullptr) { + // Set the instanceHandle only when fragment number 1 is received + if (!work_change->instanceHandle.isDefined() && fragmentStartingNum == 1) + { + work_change->instanceHandle = change_to_add->instanceHandle; + } + if (work_change->add_fragments(change_to_add->serializedPayload, fragmentStartingNum, fragmentsInSubmessage)) { diff --git a/test/blackbox/api/dds-pim/PubSubReader.hpp b/test/blackbox/api/dds-pim/PubSubReader.hpp index 29b8257bb36..19f6ddccf2f 100644 --- a/test/blackbox/api/dds-pim/PubSubReader.hpp +++ b/test/blackbox/api/dds-pim/PubSubReader.hpp @@ -1021,6 +1021,13 @@ class PubSubReader return *this; } + PubSubReader& expect_inline_qos( + bool expect) + { + datareader_qos_.expects_inline_qos(expect); + return *this; + } + PubSubReader& heartbeatResponseDelay( const int32_t secs, const int32_t frac) diff --git a/test/blackbox/api/fastrtps_deprecated/PubSubReader.hpp b/test/blackbox/api/fastrtps_deprecated/PubSubReader.hpp index aa7233b4736..cf23095bb9b 100644 --- a/test/blackbox/api/fastrtps_deprecated/PubSubReader.hpp +++ b/test/blackbox/api/fastrtps_deprecated/PubSubReader.hpp @@ -769,6 +769,13 @@ class PubSubReader return *this; } + PubSubReader& expect_inline_qos( + bool expect) + { + subscriber_attr_.expectsInlineQos = expect; + return *this; + } + PubSubReader& heartbeatResponseDelay( const int32_t secs, const int32_t frac) diff --git a/test/blackbox/common/BlackboxTestsPubSubFragments.cpp b/test/blackbox/common/BlackboxTestsPubSubFragments.cpp index 9df48268914..4200871f920 100644 --- a/test/blackbox/common/BlackboxTestsPubSubFragments.cpp +++ b/test/blackbox/common/BlackboxTestsPubSubFragments.cpp @@ -649,6 +649,66 @@ TEST_P(PubSubFragmentsLimited, AsyncPubSubAsReliableKeyedData300kbKeepLast1InLos testTransport->dropLogLength); } +// Regression test for 20257 +// When a non existing change is removed, the change is also removed from the data instance changes sequence +TEST(PubSubFragmentsLimited, + AsyncPubSubAsReliableKeyedData300kbKeepLast1LoosyConditionsSmallFragmentsCorrectlyBehavesWhenInlineQoSAreForced) +{ + PubSubReader reader(TEST_TOPIC_NAME); + PubSubWriter writer(TEST_TOPIC_NAME); + + reader.history_depth(2) + .expect_inline_qos(true) + .reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS) + .init(); + + ASSERT_TRUE(reader.isInitialized()); + + // To simulate lossy conditions, we are going to remove the default + // builtin transport, and instead use a lossy shim layer variant. + auto testTransport = std::make_shared(); + testTransport->maxMessageSize = 1024; + // We drop 20% of all data frags + testTransport->dropDataFragMessagesPercentage = 20; + testTransport->dropLogLength = 1; + writer.disable_builtin_transport(); + writer.add_user_transport_to_pparams(testTransport); + + // When doing fragmentation, it is necessary to have some degree of + // flow control not to overrun the receive buffer. + uint32_t bytesPerPeriod = 153601; + uint32_t periodInMs = 100; + writer.add_throughput_controller_descriptor_to_pparams( + eprosima::fastdds::rtps::FlowControllerSchedulerPolicy::HIGH_PRIORITY, bytesPerPeriod, periodInMs) + .heartbeat_period_seconds(0) + .heartbeat_period_nanosec(1000000) + .history_depth(1) + .asynchronously(eprosima::fastrtps::ASYNCHRONOUS_PUBLISH_MODE).init(); + + ASSERT_TRUE(writer.isInitialized()); + + // Because its volatile the durability + // Wait for discovery. + writer.wait_discovery(); + reader.wait_discovery(); + + auto data = default_keyeddata300kb_data_generator(5); + + reader.startReception(data); + + // Send data + writer.send(data, 100); + // In this test all data should be sent. + ASSERT_TRUE(data.empty()); + // Block reader until reception finished or timeout. + reader.block_for_seq({ 0, 5 }); + + // Sanity check. Make sure we have dropped a few packets + ASSERT_EQ( + test_UDPv4Transport::test_UDPv4Transport_DropLog.size(), + testTransport->dropLogLength); +} + TEST_P(PubSubFragmentsLimited, AsyncPubSubAsReliableVolatileData300kbInLossyConditionsSmallFragments) { PubSubReader reader(TEST_TOPIC_NAME);