Skip to content

Commit

Permalink
Discovery Server fix reconnection (#2246)
Browse files Browse the repository at this point in the history
* Refs #12522: Fix minor ds errors

Signed-off-by: jparisu <javierparis@eprosima.com>

* Refs #12522: fix typo

Signed-off-by: jparisu <javierparis@eprosima.com>

* Refs #12522: apply suggestions

Signed-off-by: jparisu <javierparis@eprosima.com>

* Refs #12522: Client reconnection fix

Signed-off-by: jparisu <javierparis@eprosima.com>

* Refs #12522: apply suggestions to fix comment

Signed-off-by: jparisu <javierparis@eprosima.com>

* Refs #12522: uncrustify

Signed-off-by: jparisu <javierparis@eprosima.com>

* Refs #12522: Apply new fix

Signed-off-by: jparisu <javierparis@eprosima.com>
  • Loading branch information
jparisu authored Nov 22, 2021
1 parent 35910ca commit 4368260
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 36 deletions.
53 changes: 24 additions & 29 deletions src/cpp/rtps/builtin/discovery/database/DiscoveryDataBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -870,7 +870,7 @@ void DiscoveryDataBase::create_writers_from_change_(
}
else
{
logError(DISCOVERY_DATABASE, "Writer " << writer_guid << " as no associated participant. Skipping");
logError(DISCOVERY_DATABASE, "Writer " << writer_guid << " has no associated participant. Skipping");
return;
}

Expand Down Expand Up @@ -987,7 +987,7 @@ void DiscoveryDataBase::create_readers_from_change_(
}
else
{
logError(DISCOVERY_DATABASE, "Writer " << reader_guid << " as no associated participant. Skipping");
logError(DISCOVERY_DATABASE, "Reader " << reader_guid << " has no associated participant. Skipping");
return;
}

Expand Down Expand Up @@ -1721,17 +1721,24 @@ void DiscoveryDataBase::AckedFunctor::operator () (
{
if (reader_proxy->guid().guidPrefix == *it)
{
// If the participant is already in the DB it means it has answered to the pinging
// or that is pinging us and we have already received its DATA(p)
// If neither of both has happenned we should not wait for it to ack this data, so we
// skip it and leave it as acked
/*
* If the participant is already in the DB it means it has answered to the pinging
* or that is pinging us and we have already received its DATA(p)
* If neither has happenned (participant is not in DB)
* we should not wait for it to ack this data, or it could get stucked in an endless loop
* (this Remote Server could not exist and/or never be discovered)
* Nevertheless, the ack is still pending for this participant and once it is discovered this
* data will be sent again
*/
auto remote_server_it = db_->participants_.find(*it);
if (remote_server_it == db_->participants_.end())
{
logInfo(DISCOVERY_DATABASE, "Change " << change_->instanceHandle <<
"check as acked for " << reader_proxy->guid() << " as it has not answered pinging yet");
return;
}

break;
}
}

Expand All @@ -1748,32 +1755,20 @@ void DiscoveryDataBase::unmatch_participant_(
{
logInfo(DISCOVERY_DATABASE, "unmatching participant: " << guid_prefix);

auto pit = participants_.find(guid_prefix);
if (pit == participants_.end())
// For each participant remove it
// IMPORTANT: This is not for every relevant participant, as participant A could be in other participant's B info
// and B not be relevant for A. So it must be done for every Participant.
for (auto& participant_it : participants_)
{
logWarning(DISCOVERY_DATABASE,
"Attempting to unmatch an unexisting participant: " << guid_prefix);
participant_it.second.remove_participant(guid_prefix);
}

// For each relevant participant make not relevant
for (eprosima::fastrtps::rtps::GuidPrefix_t relevant_participant : pit->second.relevant_participants())
for (auto& writer_it : writers_)
{
if (relevant_participant != guid_prefix)
{
auto rpit = participants_.find(relevant_participant);
if (rpit == participants_.end())
{
// This is not an error. Remote participants will try to unmatch with participants even
// when the match is not reciprocal
logInfo(DISCOVERY_DATABASE,
"Participant " << relevant_participant << " matched with an unexisting participant: " <<
guid_prefix);
}
else
{
rpit->second.remove_participant(guid_prefix);
}
}
writer_it.second.remove_participant(guid_prefix);
}
for (auto& reader_it : readers_)
{
reader_it.second.remove_participant(guid_prefix);
}
}

Expand Down
26 changes: 26 additions & 0 deletions src/cpp/rtps/builtin/discovery/participant/PDPClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -801,6 +801,32 @@ bool get_server_client_default_guidPrefix(
return false;
}

bool PDPClient::remove_remote_participant(
const GUID_t& partGUID,
ParticipantDiscoveryInfo::DISCOVERY_STATUS reason)
{
if (PDP::remove_remote_participant(partGUID, reason))
{
// If it works fine, return
return true;
}

// Erase Proxies created before having the Participant
GUID_t wguid;
wguid.guidPrefix = partGUID.guidPrefix;
wguid.entityId = c_EntityId_SPDPWriter;
mp_PDPReader->matched_writer_remove(wguid);

GUID_t rguid;
rguid.guidPrefix = partGUID.guidPrefix;
rguid.entityId = c_EntityId_SPDPReader;
mp_PDPWriter->matched_reader_remove(rguid);

update_remote_servers_list();

return false;
}

} /* namespace rtps */
} /* namespace fastdds */
} /* namespace eprosima */
10 changes: 10 additions & 0 deletions src/cpp/rtps/builtin/discovery/participant/PDPClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,16 @@ class PDPClient : public PDP
void notifyAboveRemoteEndpoints(
const ParticipantProxyData& pdata) override;

/**
* This method removes a remote RTPSParticipant and all its writers and readers.
* @param participant_guid GUID_t of the remote RTPSParticipant.
* @param reason Why the participant is being removed (dropped vs removed)
* @return true if correct.
*/
bool remove_remote_participant(
const GUID_t& participant_guid,
ParticipantDiscoveryInfo::DISCOVERY_STATUS reason) override;

/**
* Matching server EDP endpoints
* @return true if all servers have been discovered
Expand Down
26 changes: 19 additions & 7 deletions src/cpp/rtps/builtin/discovery/participant/PDPServerListener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ void PDPServerListener::onNewCacheChangeAdded(
// Get PDP reader to release change
auto pdp_reader = pdp_server()->mp_PDPReader;

bool routine_should_be_awake = false;

// Create a delete function to clear the data associated with the unique pointer in case the change is not passed
// to the database.
auto deleter = [pdp_history](CacheChange_t* p)
Expand Down Expand Up @@ -262,7 +264,7 @@ void PDPServerListener::onNewCacheChangeAdded(
// The server does not have to postpone the execution of the routine if a change is received, i.e.
// the server routine is triggered instantly as the default value of the interval that the server has
// to wait is 0.
pdp_server()->awake_routine_thread();
routine_should_be_awake = true;

// TODO: when the DiscoveryDataBase allows updating capabilities we can dismissed old PDP processing
}
Expand Down Expand Up @@ -380,7 +382,7 @@ void PDPServerListener::onNewCacheChangeAdded(
// The server does not have to postpone the execution of the routine if a change is received, i.e.
// the server routine is triggered instantly as the default value of the interval that the server has
// to wait is 0.
pdp_server()->awake_routine_thread();
routine_should_be_awake = true;

// From here on, the discovery database takes ownership of the CacheChange_t. Henceforth there are no
// references to the change. Take change ownership away from the unique pointer, so that its destruction
Expand All @@ -390,13 +392,23 @@ void PDPServerListener::onNewCacheChangeAdded(

// Remove participant from proxies
reader->getMutex().unlock();
if (pdp_server()->remove_remote_participant(guid, ParticipantDiscoveryInfo::REMOVED_PARTICIPANT))
{
reader->getMutex().lock();
return;
}
pdp_server()->remove_remote_participant(guid, ParticipantDiscoveryInfo::REMOVED_PARTICIPANT);
reader->getMutex().lock();
}

/*
* Awake routine thread if needed.
* Thread is awaken at the end of the listener as it is required to have created the Proxies before
* the data is processed and the new messages added to history.
* If not, could happen that a message is added to history in order to be sent to a relevant participant, and
* this Participant still not have a ReaderProxy associated, and so it will miss the message and it wont be
* sent again (because if there are no changes PDP is no sent again).
*/
if (routine_should_be_awake)
{
pdp_server()->awake_routine_thread();
}

// cache is removed from history (if it's still there) and returned to the pool on leaving the scope, since the
// unique pointer destruction grants it. If the ownership has been taken away from the unique pointer, then nothing
// happens at this point
Expand Down

0 comments on commit 4368260

Please sign in to comment.