From 1b7eaea94542c05e3bd9a953bd3c00fb3bb41b14 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Tue, 26 Mar 2024 08:19:43 +0100 Subject: [PATCH] Discard already processed samples on PDPListener (#4268) (#4283) * Refs #20276: Discard processing already processed samples on PDPListener Signed-off-by: EduPonz * Refs #20276: Fix failing tests Signed-off-by: EduPonz * Refs #20276: Address Miguel's comments Signed-off-by: EduPonz --------- Signed-off-by: EduPonz (cherry picked from commit 4864393b1e77d6a7d41ea9aafb1c83f686a21fc8) Co-authored-by: Eduardo Ponz Segrelles --- .../rtps/builtin/data/ParticipantProxyData.h | 2 ++ .../builtin/data/ParticipantProxyData.cpp | 3 +++ .../discovery/participant/PDPListener.cpp | 19 ++++++++++++++++++- 3 files changed, 23 insertions(+), 1 deletion(-) diff --git a/include/fastdds/rtps/builtin/data/ParticipantProxyData.h b/include/fastdds/rtps/builtin/data/ParticipantProxyData.h index b3d1462875d..57b36f7cc73 100644 --- a/include/fastdds/rtps/builtin/data/ParticipantProxyData.h +++ b/include/fastdds/rtps/builtin/data/ParticipantProxyData.h @@ -118,6 +118,8 @@ class ParticipantProxyData //! ProxyHashTable* m_writers = nullptr; + SampleIdentity m_sample_identity; + /** * Update the data. * @param pdata Object to copy the data from diff --git a/src/cpp/rtps/builtin/data/ParticipantProxyData.cpp b/src/cpp/rtps/builtin/data/ParticipantProxyData.cpp index f980689f53a..1f5d038e862 100644 --- a/src/cpp/rtps/builtin/data/ParticipantProxyData.cpp +++ b/src/cpp/rtps/builtin/data/ParticipantProxyData.cpp @@ -66,6 +66,7 @@ ParticipantProxyData::ParticipantProxyData( , should_check_lease_duration(false) , m_readers(new ProxyHashTable(allocation.readers)) , m_writers(new ProxyHashTable(allocation.writers)) + , m_sample_identity() { m_userData.set_max_size(static_cast(allocation.data_limits.max_user_data)); } @@ -99,6 +100,7 @@ ParticipantProxyData::ParticipantProxyData( // so there is no need to copy m_readers and m_writers , m_readers(nullptr) , m_writers(nullptr) + , m_sample_identity(pdata.m_sample_identity) , lease_duration_(pdata.lease_duration_) { } @@ -717,6 +719,7 @@ void ParticipantProxyData::copy( isAlive = pdata.isAlive; m_userData = pdata.m_userData; m_properties = pdata.m_properties; + m_sample_identity = pdata.m_sample_identity; // This method is only called when a new participant is discovered.The destination of the copy // will always be a new ParticipantProxyData or one from the pool, so there is no need for diff --git a/src/cpp/rtps/builtin/discovery/participant/PDPListener.cpp b/src/cpp/rtps/builtin/discovery/participant/PDPListener.cpp index 6eecef90021..420f4466838 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDPListener.cpp +++ b/src/cpp/rtps/builtin/discovery/participant/PDPListener.cpp @@ -110,16 +110,33 @@ void PDPListener::onNewCacheChangeAdded( // Check if participant already exists (updated info) ParticipantProxyData* pdata = nullptr; + bool already_processed = false; for (ParticipantProxyData* it : parent_pdp_->participant_proxies_) { if (guid == it->m_guid) { pdata = it; + + // This means this is the same DATA(p) that we have already processed. + // We do not compare sample_identity directly because it is not properly filled + // in the change during desearialization. + if (it->m_sample_identity.writer_guid() == change->writerGUID && + it->m_sample_identity.sequence_number() == change->sequenceNumber) + { + already_processed = true; + } + break; } } - process_alive_data(pdata, temp_participant_data_, writer_guid, reader, lock); + // Only process the DATA(p) if it is not a repeated one + if (!already_processed) + { + temp_participant_data_.m_sample_identity.writer_guid(change->writerGUID); + temp_participant_data_.m_sample_identity.sequence_number(change->sequenceNumber); + process_alive_data(pdata, temp_participant_data_, writer_guid, reader, lock); + } } } else if (reader->matched_writer_is_matched(writer_guid))