Skip to content

Commit

Permalink
Discard already processed samples on PDPListener (#4268) (#4283)
Browse files Browse the repository at this point in the history
* Refs #20276: Discard processing already processed samples on PDPListener

Signed-off-by: EduPonz <eduardoponz@eprosima.com>

* Refs #20276: Fix failing tests

Signed-off-by: EduPonz <eduardoponz@eprosima.com>

* Refs #20276: Address Miguel's comments

Signed-off-by: EduPonz <eduardoponz@eprosima.com>

---------

Signed-off-by: EduPonz <eduardoponz@eprosima.com>
(cherry picked from commit 4864393)

Co-authored-by: Eduardo Ponz Segrelles <eduardoponz@eprosima.com>
  • Loading branch information
mergify[bot] and EduPonz authored Mar 26, 2024
1 parent 2c67f2e commit 1b7eaea
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 1 deletion.
2 changes: 2 additions & 0 deletions include/fastdds/rtps/builtin/data/ParticipantProxyData.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ class ParticipantProxyData
//!
ProxyHashTable<WriterProxyData>* m_writers = nullptr;

SampleIdentity m_sample_identity;

/**
* Update the data.
* @param pdata Object to copy the data from
Expand Down
3 changes: 3 additions & 0 deletions src/cpp/rtps/builtin/data/ParticipantProxyData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ ParticipantProxyData::ParticipantProxyData(
, should_check_lease_duration(false)
, m_readers(new ProxyHashTable<ReaderProxyData>(allocation.readers))
, m_writers(new ProxyHashTable<WriterProxyData>(allocation.writers))
, m_sample_identity()
{
m_userData.set_max_size(static_cast<uint32_t>(allocation.data_limits.max_user_data));
}
Expand Down Expand Up @@ -99,6 +100,7 @@ ParticipantProxyData::ParticipantProxyData(
// so there is no need to copy m_readers and m_writers
, m_readers(nullptr)
, m_writers(nullptr)
, m_sample_identity(pdata.m_sample_identity)
, lease_duration_(pdata.lease_duration_)
{
}
Expand Down Expand Up @@ -717,6 +719,7 @@ void ParticipantProxyData::copy(
isAlive = pdata.isAlive;
m_userData = pdata.m_userData;
m_properties = pdata.m_properties;
m_sample_identity = pdata.m_sample_identity;

// This method is only called when a new participant is discovered.The destination of the copy
// will always be a new ParticipantProxyData or one from the pool, so there is no need for
Expand Down
19 changes: 18 additions & 1 deletion src/cpp/rtps/builtin/discovery/participant/PDPListener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,16 +110,33 @@ void PDPListener::onNewCacheChangeAdded(

// Check if participant already exists (updated info)
ParticipantProxyData* pdata = nullptr;
bool already_processed = false;
for (ParticipantProxyData* it : parent_pdp_->participant_proxies_)
{
if (guid == it->m_guid)
{
pdata = it;

// This means this is the same DATA(p) that we have already processed.
// We do not compare sample_identity directly because it is not properly filled
// in the change during desearialization.
if (it->m_sample_identity.writer_guid() == change->writerGUID &&
it->m_sample_identity.sequence_number() == change->sequenceNumber)
{
already_processed = true;
}

break;
}
}

process_alive_data(pdata, temp_participant_data_, writer_guid, reader, lock);
// Only process the DATA(p) if it is not a repeated one
if (!already_processed)
{
temp_participant_data_.m_sample_identity.writer_guid(change->writerGUID);
temp_participant_data_.m_sample_identity.sequence_number(change->sequenceNumber);
process_alive_data(pdata, temp_participant_data_, writer_guid, reader, lock);
}
}
}
else if (reader->matched_writer_is_matched(writer_guid))
Expand Down

0 comments on commit 1b7eaea

Please sign in to comment.