diff --git a/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp b/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp index b989db21d44..72d341c5262 100644 --- a/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp +++ b/src/cpp/fastdds/subscriber/history/DataReaderHistory.cpp @@ -139,14 +139,14 @@ DataReaderHistory::DataReaderHistory( compute_key_for_change_fn_ = [this](CacheChange_t* a_change) { - if (a_change->instanceHandle.isDefined()) + if (!a_change->is_fully_assembled()) { - return true; + return false; } - if (!a_change->is_fully_assembled()) + if (a_change->instanceHandle.isDefined()) { - return false; + return true; } if (type_ != nullptr) @@ -635,11 +635,26 @@ ReaderHistory::iterator DataReaderHistory::remove_change_nts( bool DataReaderHistory::completed_change( CacheChange_t* change) { +<<<<<<< HEAD bool ret_value = true; +======= + SampleRejectedStatusKind reason; + return completed_change(change, 0, reason); +} - if (!change->instanceHandle.isDefined()) +bool DataReaderHistory::completed_change( + CacheChange_t* change, + size_t unknown_missing_changes_up_to, + SampleRejectedStatusKind& rejection_reason) +{ + bool ret_value = false; + rejection_reason = REJECTED_BY_INSTANCES_LIMIT; +>>>>>>> 9558ce436 (Add a keyed fragmented change to the reader data instance only when its completed (#4261)) + + if (compute_key_for_change_fn_(change)) { InstanceCollection::iterator vit; +<<<<<<< HEAD ret_value = compute_key_for_change_fn_(change) && find_key(change->instanceHandle, vit); if (ret_value) { @@ -658,9 +673,20 @@ bool DataReaderHistory::completed_change( { logError(SUBSCRIBER, "Change should exist but didn't find it"); } +======= + if (find_key(change->instanceHandle, vit)) + { + ret_value = !change->instanceHandle.isDefined() || + complete_fn_(change, *vit->second, unknown_missing_changes_up_to, rejection_reason); +>>>>>>> 9558ce436 (Add a keyed fragmented change to the reader data instance only when its completed (#4261)) } } + if (ret_value) + { + rejection_reason = NOT_REJECTED; + } + return ret_value; } diff --git a/src/cpp/rtps/reader/StatefulReader.cpp b/src/cpp/rtps/reader/StatefulReader.cpp index e58b1dd92eb..5f488c89780 100644 --- a/src/cpp/rtps/reader/StatefulReader.cpp +++ b/src/cpp/rtps/reader/StatefulReader.cpp @@ -673,6 +673,7 @@ bool StatefulReader::processDataFragMsg( { work_change->copy_not_memcpy(change_to_add); work_change->serializedPayload.length = sampleSize; + work_change->instanceHandle.clear(); work_change->setFragmentSize(change_to_add->getFragmentSize(), true); change_created = work_change; } @@ -681,6 +682,12 @@ bool StatefulReader::processDataFragMsg( if (work_change != nullptr) { + // Set the instanceHandle only when fragment number 1 is received + if (!work_change->instanceHandle.isDefined() && fragmentStartingNum == 1) + { + work_change->instanceHandle = change_to_add->instanceHandle; + } + work_change->add_fragments(change_to_add->serializedPayload, fragmentStartingNum, fragmentsInSubmessage); } diff --git a/src/cpp/rtps/reader/StatelessReader.cpp b/src/cpp/rtps/reader/StatelessReader.cpp index 7b057501987..971f62950fb 100644 --- a/src/cpp/rtps/reader/StatelessReader.cpp +++ b/src/cpp/rtps/reader/StatelessReader.cpp @@ -606,6 +606,7 @@ bool StatelessReader::processDataFragMsg( // Sample fits inside pending change. Reuse it. work_change->copy_not_memcpy(change_to_add); work_change->serializedPayload.length = sampleSize; + work_change->instanceHandle.clear(); work_change->setFragmentSize(change_to_add->getFragmentSize(), true); } else @@ -631,6 +632,7 @@ bool StatelessReader::processDataFragMsg( { work_change->copy_not_memcpy(change_to_add); work_change->serializedPayload.length = sampleSize; + work_change->instanceHandle.clear(); work_change->setFragmentSize(change_to_add->getFragmentSize(), true); } } @@ -640,6 +642,12 @@ bool StatelessReader::processDataFragMsg( CacheChange_t* change_completed = nullptr; if (work_change != nullptr) { + // Set the instanceHandle only when fragment number 1 is received + if (!work_change->instanceHandle.isDefined() && fragmentStartingNum == 1) + { + work_change->instanceHandle = change_to_add->instanceHandle; + } + if (work_change->add_fragments(change_to_add->serializedPayload, fragmentStartingNum, fragmentsInSubmessage)) { diff --git a/test/blackbox/api/dds-pim/PubSubReader.hpp b/test/blackbox/api/dds-pim/PubSubReader.hpp index 2f94b832992..d9a16d75e5c 100644 --- a/test/blackbox/api/dds-pim/PubSubReader.hpp +++ b/test/blackbox/api/dds-pim/PubSubReader.hpp @@ -991,6 +991,13 @@ class PubSubReader return *this; } + PubSubReader& expect_inline_qos( + bool expect) + { + datareader_qos_.expects_inline_qos(expect); + return *this; + } + PubSubReader& heartbeatResponseDelay( const int32_t secs, const int32_t frac) diff --git a/test/blackbox/api/fastrtps_deprecated/PubSubReader.hpp b/test/blackbox/api/fastrtps_deprecated/PubSubReader.hpp index 54ad64aaa1d..19bd9ad35b7 100644 --- a/test/blackbox/api/fastrtps_deprecated/PubSubReader.hpp +++ b/test/blackbox/api/fastrtps_deprecated/PubSubReader.hpp @@ -793,6 +793,13 @@ class PubSubReader return *this; } + PubSubReader& expect_inline_qos( + bool expect) + { + subscriber_attr_.expectsInlineQos = expect; + return *this; + } + PubSubReader& heartbeatResponseDelay( const int32_t secs, const int32_t frac) diff --git a/test/blackbox/common/BlackboxTestsPubSubFragments.cpp b/test/blackbox/common/BlackboxTestsPubSubFragments.cpp index 9df48268914..4200871f920 100644 --- a/test/blackbox/common/BlackboxTestsPubSubFragments.cpp +++ b/test/blackbox/common/BlackboxTestsPubSubFragments.cpp @@ -649,6 +649,66 @@ TEST_P(PubSubFragmentsLimited, AsyncPubSubAsReliableKeyedData300kbKeepLast1InLos testTransport->dropLogLength); } +// Regression test for 20257 +// When a non existing change is removed, the change is also removed from the data instance changes sequence +TEST(PubSubFragmentsLimited, + AsyncPubSubAsReliableKeyedData300kbKeepLast1LoosyConditionsSmallFragmentsCorrectlyBehavesWhenInlineQoSAreForced) +{ + PubSubReader reader(TEST_TOPIC_NAME); + PubSubWriter writer(TEST_TOPIC_NAME); + + reader.history_depth(2) + .expect_inline_qos(true) + .reliability(eprosima::fastrtps::RELIABLE_RELIABILITY_QOS) + .init(); + + ASSERT_TRUE(reader.isInitialized()); + + // To simulate lossy conditions, we are going to remove the default + // builtin transport, and instead use a lossy shim layer variant. + auto testTransport = std::make_shared(); + testTransport->maxMessageSize = 1024; + // We drop 20% of all data frags + testTransport->dropDataFragMessagesPercentage = 20; + testTransport->dropLogLength = 1; + writer.disable_builtin_transport(); + writer.add_user_transport_to_pparams(testTransport); + + // When doing fragmentation, it is necessary to have some degree of + // flow control not to overrun the receive buffer. + uint32_t bytesPerPeriod = 153601; + uint32_t periodInMs = 100; + writer.add_throughput_controller_descriptor_to_pparams( + eprosima::fastdds::rtps::FlowControllerSchedulerPolicy::HIGH_PRIORITY, bytesPerPeriod, periodInMs) + .heartbeat_period_seconds(0) + .heartbeat_period_nanosec(1000000) + .history_depth(1) + .asynchronously(eprosima::fastrtps::ASYNCHRONOUS_PUBLISH_MODE).init(); + + ASSERT_TRUE(writer.isInitialized()); + + // Because its volatile the durability + // Wait for discovery. + writer.wait_discovery(); + reader.wait_discovery(); + + auto data = default_keyeddata300kb_data_generator(5); + + reader.startReception(data); + + // Send data + writer.send(data, 100); + // In this test all data should be sent. + ASSERT_TRUE(data.empty()); + // Block reader until reception finished or timeout. + reader.block_for_seq({ 0, 5 }); + + // Sanity check. Make sure we have dropped a few packets + ASSERT_EQ( + test_UDPv4Transport::test_UDPv4Transport_DropLog.size(), + testTransport->dropLogLength); +} + TEST_P(PubSubFragmentsLimited, AsyncPubSubAsReliableVolatileData300kbInLossyConditionsSmallFragments) { PubSubReader reader(TEST_TOPIC_NAME);