diff --git a/src/cpp/rtps/builtin/discovery/database/DiscoveryDataBase.cpp b/src/cpp/rtps/builtin/discovery/database/DiscoveryDataBase.cpp index e9f40f3b8de..51cf8030956 100644 --- a/src/cpp/rtps/builtin/discovery/database/DiscoveryDataBase.cpp +++ b/src/cpp/rtps/builtin/discovery/database/DiscoveryDataBase.cpp @@ -863,7 +863,7 @@ void DiscoveryDataBase::create_writers_from_change_( } else { - logError(DISCOVERY_DATABASE, "Writer " << writer_guid << " as no associated participant. Skipping"); + logError(DISCOVERY_DATABASE, "Writer " << writer_guid << " has no associated participant. Skipping"); return; } @@ -980,7 +980,7 @@ void DiscoveryDataBase::create_readers_from_change_( } else { - logError(DISCOVERY_DATABASE, "Writer " << reader_guid << " as no associated participant. Skipping"); + logError(DISCOVERY_DATABASE, "Reader " << reader_guid << " has no associated participant. Skipping"); return; } @@ -1714,10 +1714,15 @@ void DiscoveryDataBase::AckedFunctor::operator () ( { if (reader_proxy->guid().guidPrefix == *it) { - // If the participant is already in the DB it means it has answered to the pinging - // or that is pinging us and we have already received its DATA(p) - // If neither of both has happenned we should not wait for it to ack this data, so we - // skip it and leave it as acked + /* + * If the participant is already in the DB it means it has answered to the pinging + * or that is pinging us and we have already received its DATA(p) + * If neither has happenned (participant is not in DB) + * we should not wait for it to ack this data, or it could get stucked in an endless loop + * (this Remote Server could not exist and/or never be discovered) + * Nevertheless, the ack is still pending for this participant and once it is discovered this + * data will be sent again + */ auto remote_server_it = db_->participants_.find(*it); if (remote_server_it == db_->participants_.end()) { @@ -1725,6 +1730,8 @@ void DiscoveryDataBase::AckedFunctor::operator () ( "check as acked for " << reader_proxy->guid() << " as it has not answered pinging yet"); return; } + + break; } } @@ -1741,32 +1748,20 @@ void DiscoveryDataBase::unmatch_participant_( { logInfo(DISCOVERY_DATABASE, "unmatching participant: " << guid_prefix); - auto pit = participants_.find(guid_prefix); - if (pit == participants_.end()) + // For each participant remove it + // IMPORTANT: This is not for every relevant participant, as participant A could be in other participant's B info + // and B not be relevant for A. So it must be done for every Participant. + for (auto& participant_it : participants_) { - logWarning(DISCOVERY_DATABASE, - "Attempting to unmatch an unexisting participant: " << guid_prefix); + participant_it.second.remove_participant(guid_prefix); } - - // For each relevant participant make not relevant - for (eprosima::fastrtps::rtps::GuidPrefix_t relevant_participant : pit->second.relevant_participants()) + for (auto& writer_it : writers_) { - if (relevant_participant != guid_prefix) - { - auto rpit = participants_.find(relevant_participant); - if (rpit == participants_.end()) - { - // This is not an error. Remote participants will try to unmatch with participants even - // when the match is not reciprocal - logInfo(DISCOVERY_DATABASE, - "Participant " << relevant_participant << " matched with an unexisting participant: " << - guid_prefix); - } - else - { - rpit->second.remove_participant(guid_prefix); - } - } + writer_it.second.remove_participant(guid_prefix); + } + for (auto& reader_it : readers_) + { + reader_it.second.remove_participant(guid_prefix); } } diff --git a/src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp b/src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp index 0be70655591..c9918be430a 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp +++ b/src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp @@ -755,6 +755,30 @@ bool get_server_client_default_guidPrefix( return false; } +bool PDPClient::remove_remote_participant( + const GUID_t& partGUID, + ParticipantDiscoveryInfo::DISCOVERY_STATUS reason) +{ + if (PDP::remove_remote_participant(partGUID, reason)) + { + // If it works fine, return + return true; + } + + // Erase Proxies created before having the Participant + GUID_t wguid; + wguid.guidPrefix = partGUID.guidPrefix; + wguid.entityId = c_EntityId_SPDPWriter; + mp_PDPReader->matched_writer_remove(wguid); + + GUID_t rguid; + rguid.guidPrefix = partGUID.guidPrefix; + rguid.entityId = c_EntityId_SPDPReader; + mp_PDPWriter->matched_reader_remove(rguid); + + return false; +} + } /* namespace rtps */ } /* namespace fastdds */ } /* namespace eprosima */ diff --git a/src/cpp/rtps/builtin/discovery/participant/PDPClient.h b/src/cpp/rtps/builtin/discovery/participant/PDPClient.h index d2620444b67..8e8c28ae5ff 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDPClient.h +++ b/src/cpp/rtps/builtin/discovery/participant/PDPClient.h @@ -122,6 +122,16 @@ class PDPClient : public PDP void notifyAboveRemoteEndpoints( const ParticipantProxyData& pdata) override; + /** + * This method removes a remote RTPSParticipant and all its writers and readers. + * @param participant_guid GUID_t of the remote RTPSParticipant. + * @param reason Why the participant is being removed (dropped vs removed) + * @return true if correct. + */ + bool remove_remote_participant( + const GUID_t& participant_guid, + ParticipantDiscoveryInfo::DISCOVERY_STATUS reason) override; + /** * Matching server EDP endpoints * @return true if all servers have been discovered diff --git a/src/cpp/rtps/builtin/discovery/participant/PDPServerListener.cpp b/src/cpp/rtps/builtin/discovery/participant/PDPServerListener.cpp index 98cdbd409a8..3d6f5b17742 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDPServerListener.cpp +++ b/src/cpp/rtps/builtin/discovery/participant/PDPServerListener.cpp @@ -63,6 +63,8 @@ void PDPServerListener::onNewCacheChangeAdded( // Get PDP reader to release change auto pdp_reader = pdp_server()->mp_PDPReader; + bool routine_should_be_awake = false; + // Create a delete function to clear the data associated with the unique pointer in case the change is not passed // to the database. auto deleter = [pdp_history](CacheChange_t* p) @@ -258,7 +260,7 @@ void PDPServerListener::onNewCacheChangeAdded( // The server does not have to postpone the execution of the routine if a change is received, i.e. // the server routine is triggered instantly as the default value of the interval that the server has // to wait is 0. - pdp_server()->awake_routine_thread(); + routine_should_be_awake = true; // TODO: when the DiscoveryDataBase allows updating capabilities we can dismissed old PDP processing } @@ -376,7 +378,7 @@ void PDPServerListener::onNewCacheChangeAdded( // The server does not have to postpone the execution of the routine if a change is received, i.e. // the server routine is triggered instantly as the default value of the interval that the server has // to wait is 0. - pdp_server()->awake_routine_thread(); + routine_should_be_awake = true; // From here on, the discovery database takes ownership of the CacheChange_t. Henceforth there are no // references to the change. Take change ownership away from the unique pointer, so that its destruction @@ -386,13 +388,23 @@ void PDPServerListener::onNewCacheChangeAdded( // Remove participant from proxies reader->getMutex().unlock(); - if (pdp_server()->remove_remote_participant(guid, ParticipantDiscoveryInfo::REMOVED_PARTICIPANT)) - { - reader->getMutex().lock(); - return; - } + pdp_server()->remove_remote_participant(guid, ParticipantDiscoveryInfo::REMOVED_PARTICIPANT); reader->getMutex().lock(); } + + /* + * Awake routine thread if needed. + * Thread is awaken at the end of the listener as it is required to have created the Proxies before + * the data is processed and the new messages added to history. + * If not, could happen that a message is added to history in order to be sent to a relevant participant, and + * this Participant still not have a ReaderProxy associated, and so it will miss the message and it wont be + * sent again (because if there are no changes PDP is no sent again). + */ + if (routine_should_be_awake) + { + pdp_server()->awake_routine_thread(); + } + // cache is removed from history (if it's still there) and returned to the pool on leaving the scope, since the // unique pointer destruction grants it. If the ownership has been taken away from the unique pointer, then nothing // happens at this point