From 0c6da983caa47560cd5eefe0a079549ceaa495ce Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Fri, 26 Jan 2024 07:44:37 +0100 Subject: [PATCH] Discard already processed samples on PDPListener (#4268) (#4281) * Refs #20276: Discard processing already processed samples on PDPListener Signed-off-by: EduPonz * Refs #20276: Fix failing tests Signed-off-by: EduPonz * Refs #20276: Address Miguel's comments Signed-off-by: EduPonz --------- Signed-off-by: EduPonz (cherry picked from commit 4864393b1e77d6a7d41ea9aafb1c83f686a21fc8) Co-authored-by: Eduardo Ponz Segrelles --- .../rtps/builtin/data/ParticipantProxyData.h | 2 ++ .../builtin/data/ParticipantProxyData.cpp | 3 +++ .../discovery/participant/PDPListener.cpp | 19 ++++++++++++++++++- 3 files changed, 23 insertions(+), 1 deletion(-) diff --git a/include/fastdds/rtps/builtin/data/ParticipantProxyData.h b/include/fastdds/rtps/builtin/data/ParticipantProxyData.h index 5602944ed2d..2bdc3daa413 100644 --- a/include/fastdds/rtps/builtin/data/ParticipantProxyData.h +++ b/include/fastdds/rtps/builtin/data/ParticipantProxyData.h @@ -118,6 +118,8 @@ class ParticipantProxyData //! ProxyHashTable* m_writers = nullptr; + SampleIdentity m_sample_identity; + /** * Update the data. * @param pdata Object to copy the data from diff --git a/src/cpp/rtps/builtin/data/ParticipantProxyData.cpp b/src/cpp/rtps/builtin/data/ParticipantProxyData.cpp index a5cefd53f63..45ceac2c1ef 100644 --- a/src/cpp/rtps/builtin/data/ParticipantProxyData.cpp +++ b/src/cpp/rtps/builtin/data/ParticipantProxyData.cpp @@ -64,6 +64,7 @@ ParticipantProxyData::ParticipantProxyData( , should_check_lease_duration(false) , m_readers(new ProxyHashTable(allocation.readers)) , m_writers(new ProxyHashTable(allocation.writers)) + , m_sample_identity() { m_userData.set_max_size(static_cast(allocation.data_limits.max_user_data)); } @@ -97,6 +98,7 @@ ParticipantProxyData::ParticipantProxyData( // so there is no need to copy m_readers and m_writers , m_readers(nullptr) , m_writers(nullptr) + , m_sample_identity(pdata.m_sample_identity) , lease_duration_(pdata.lease_duration_) { } @@ -702,6 +704,7 @@ void ParticipantProxyData::copy( isAlive = pdata.isAlive; m_userData = pdata.m_userData; m_properties = pdata.m_properties; + m_sample_identity = pdata.m_sample_identity; // This method is only called when a new participant is discovered.The destination of the copy // will always be a new ParticipantProxyData or one from the pool, so there is no need for diff --git a/src/cpp/rtps/builtin/discovery/participant/PDPListener.cpp b/src/cpp/rtps/builtin/discovery/participant/PDPListener.cpp index 8d04bc7b4c8..31f803f88ab 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDPListener.cpp +++ b/src/cpp/rtps/builtin/discovery/participant/PDPListener.cpp @@ -122,16 +122,33 @@ void PDPListener::onNewCacheChangeAdded( // Check if participant already exists (updated info) ParticipantProxyData* pdata = nullptr; + bool already_processed = false; for (ParticipantProxyData* it : parent_pdp_->participant_proxies_) { if (guid == it->m_guid) { pdata = it; + + // This means this is the same DATA(p) that we have already processed. + // We do not compare sample_identity directly because it is not properly filled + // in the change during desearialization. + if (it->m_sample_identity.writer_guid() == change->writerGUID && + it->m_sample_identity.sequence_number() == change->sequenceNumber) + { + already_processed = true; + } + break; } } - process_alive_data(pdata, temp_participant_data_, writer_guid, reader, lock); + // Only process the DATA(p) if it is not a repeated one + if (!already_processed) + { + temp_participant_data_.m_sample_identity.writer_guid(change->writerGUID); + temp_participant_data_.m_sample_identity.sequence_number(change->sequenceNumber); + process_alive_data(pdata, temp_participant_data_, writer_guid, reader, lock); + } } } else if (reader->matched_writer_is_matched(writer_guid))