Skip to content

Commit

Permalink
[20815] Only apply content filter to ALIVE changes (#4876)
Browse files Browse the repository at this point in the history
* Refs #20815: Add regression test

Signed-off-by: eduponz <eduardoponz@eprosima.com>

* Refs #20815: Only apply filter to ALIVE changes

Signed-off-by: eduponz <eduardoponz@eprosima.com>

* Refs #20815: Rename change_is_relevant_for_filter argument

Signed-off-by: eduponz <eduardoponz@eprosima.com>

* Refs #20815: Refactor test

Signed-off-by: eduponz <eduardoponz@eprosima.com>

* Refs #20815: Cast loop index to uint16_t for assigning it to the key field

Signed-off-by: eduponz <eduardoponz@eprosima.com>

* Refs #20815: Refactor test so PubSubWriter can be used directly

Signed-off-by: eduponz <eduardoponz@eprosima.com>

* Refs #20815: Fix memory leak

Signed-off-by: eduponz <eduardoponz@eprosima.com>

* Refs #20815: Apply Mario's suggestions

Signed-off-by: eduponz <eduardoponz@eprosima.com>

* Refs #20815: Add type traits to PubSubWriterReader and PubSubParticipant

Signed-off-by: eduponz <eduardoponz@eprosima.com>

* Refs #20815: Default move ctor and assignment in DynamicLoanableSequence

Signed-off-by: eduponz <eduardoponz@eprosima.com>

* Refs #20815: Add doxygen in DynamicLoanableSequence

Signed-off-by: eduponz <eduardoponz@eprosima.com>

* Refs #20815: Delete copy semantic from DynamicLoanableSequence

Signed-off-by: eduponz <eduardoponz@eprosima.com>

* Refs #20815: Use alias for TypeTraits::DataListType in PubSub* classes

Signed-off-by: eduponz <eduardoponz@eprosima.com>

---------

Signed-off-by: eduponz <eduardoponz@eprosima.com>
(cherry picked from commit 9a64956)

Signed-off-by: eduponz <eduardoponz@eprosima.com>
  • Loading branch information
EduPonz committed Jun 8, 2024
1 parent 0f649cd commit fdce426
Show file tree
Hide file tree
Showing 17 changed files with 1,042 additions and 131 deletions.
184 changes: 184 additions & 0 deletions include/fastrtps/types/DynamicLoanableSequence.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
// Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef _TYPES_DYNAMIC_LOANABLE_SEQUENCE_HPP_
#define _TYPES_DYNAMIC_LOANABLE_SEQUENCE_HPP_

#include <cassert>
#include <memory>

#include <fastdds/dds/core/LoanableSequence.hpp>
#include <fastdds/dds/log/Log.hpp>
#include <fastrtps/types/DynamicData.h>
#include <fastrtps/types/DynamicPubSubType.h>
#include <fastrtps/types/DynamicTypePtr.h>

namespace eprosima {
namespace fastdds {
namespace dds {

/**
* @brief LoanableSequence specialization for DynamicData.
*
* This class provides a sequence container for handling loanable collections
* of DynamicData.
*
* @tparam _NonConstEnabler to enable data access through [] operator on non-const sequences.
*/
template<typename _NonConstEnabler>
class LoanableSequence<fastrtps::types::DynamicData, _NonConstEnabler>
: public LoanableTypedCollection<fastrtps::types::DynamicData, _NonConstEnabler>
{
public:

/// Type for the size of the sequence.
using size_type = LoanableCollection::size_type;

/// Type for the elements in the sequence.
using element_type = LoanableCollection::element_type;

/**
* @brief Construct a LoanableSequence with a specified dynamic type.
*
* @param[in] dyn_type Pointer to the DynamicType.
*/
LoanableSequence(
fastrtps::types::DynamicType_ptr dyn_type)
: dynamic_type_support_(new fastrtps::types::DynamicPubSubType(dyn_type))
{
}

/**
* @brief Construct a LoanableSequence with a specified maximum size.
*
* @param[in] max Maximum size of the sequence.
*/
LoanableSequence(
size_type max)
{
if (max <= 0)
{
return;
}

resize(max);
}

/**
* @brief Destructor for LoanableSequence.
*/
~LoanableSequence()
{
if (elements_ && !has_ownership_)
{
logWarning(SUBSCRIBER, "Sequence destroyed with active loan");
return;
}

release();
}

/// Deleted copy constructor for LoanableSequence.
LoanableSequence(
const LoanableSequence& other) = delete;

/// Deleted copy assignment operator for LoanableSequence.
LoanableSequence& operator =(
const LoanableSequence& other) = delete;

/**
* @brief Move constructor for LoanableSequence.
*
* @param[in] other The other LoanableSequence to move from.
*/
LoanableSequence(
LoanableSequence&&) = default;

/**
* @brief Move assignment operator for LoanableSequence.
*
* @param[in] other The other LoanableSequence to move from.
*
* @return A reference to this LoanableSequence.
*/
LoanableSequence& operator =(
LoanableSequence&&) = default;

protected:

using LoanableCollection::maximum_;
using LoanableCollection::length_;
using LoanableCollection::elements_;
using LoanableCollection::has_ownership_;

private:

/**
* @brief Resize the sequence to a new maximum size.
*
* @param[in] maximum The new maximum size.
*/
void resize(
size_type maximum) override
{
assert(has_ownership_);

// Resize collection and get new pointer
data_.reserve(maximum);
data_.resize(maximum);
elements_ = reinterpret_cast<element_type*>(data_.data());

// Allocate individual elements
while (maximum_ < maximum)
{
data_[maximum_++] = static_cast<fastrtps::types::DynamicData*>(dynamic_type_support_->createData());
}
}

/**
* @brief Release all elements and clear the sequence.
*/
void release()
{
if (has_ownership_ && elements_)
{
for (size_type n = 0; n < maximum_; ++n)
{
fastrtps::types::DynamicData* elem = data_[n];
dynamic_type_support_->deleteData(elem);
}
std::vector<fastrtps::types::DynamicData*>().swap(data_);
}

maximum_ = 0u;
length_ = 0u;
elements_ = nullptr;
has_ownership_ = true;
}

/// Container for holding the DynamicData elements.
std::vector<fastrtps::types::DynamicData*> data_;

/// Pointer to the DynamicPubSubType type support.
std::unique_ptr<fastrtps::types::DynamicPubSubType> dynamic_type_support_;
};

/// Alias for LoanableSequence with DynamicData and true_type.
using DynamicLoanableSequence = LoanableSequence<fastrtps::types::DynamicData, std::true_type>;

} // namespace dds
} // namespace fastdds
} // namespace eprosima

#endif // _TYPES_DYNAMIC_LOANABLE_SEQUENCE_HPP_
6 changes: 4 additions & 2 deletions include/fastrtps/types/DynamicPubSubType.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@
#ifndef TYPES_DYNAMIC_PUB_SUB_TYPE_H
#define TYPES_DYNAMIC_PUB_SUB_TYPE_H

#include <fastrtps/types/TypesBase.h>
#include <fastdds/dds/topic/TopicDataType.hpp>
#include <fastrtps/types/DynamicData.h>
#include <fastrtps/types/DynamicTypePtr.h>
#include <fastrtps/types/DynamicDataPtr.h>
#include <fastrtps/types/TypesBase.h>
#include <fastrtps/utils/md5.h>

namespace eprosima {
Expand All @@ -37,6 +37,8 @@ class DynamicPubSubType : public eprosima::fastdds::dds::TopicDataType

public:

typedef DynamicData type;

RTPS_DllAPI DynamicPubSubType();

RTPS_DllAPI DynamicPubSubType(
Expand Down
1 change: 1 addition & 0 deletions src/cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ set(${PROJECT_NAME}_source_files
rtps/DataSharing/DataSharingListener.cpp
rtps/DataSharing/DataSharingNotification.cpp
rtps/reader/WriterProxy.cpp
rtps/reader/reader_utils.cpp
rtps/reader/StatefulReader.cpp
rtps/reader/StatelessReader.cpp
rtps/reader/RTPSReader.cpp
Expand Down
14 changes: 9 additions & 5 deletions src/cpp/fastdds/publisher/filtering/ReaderFilterCollection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,13 +133,17 @@ class ReaderFilterCollection
// Copy the signature
std::copy(entry.filter_signature.begin(), entry.filter_signature.end(), signature);

// Evaluate filter and update filtered_out_readers
bool filter_result = entry.filter->evaluate(change.serializedPayload, info, it->first);
if (!filter_result)
// Only evaluate filter on ALIVE changes, as UNREGISTERED and DISPOSED are always relevant
bool filter_result = true;
if (fastrtps::rtps::ALIVE == change.kind)
{
change.filtered_out_readers.emplace_back(it->first);
// Evaluate filter and update filtered_out_readers
filter_result = entry.filter->evaluate(change.serializedPayload, info, it->first);
if (!filter_result)
{
change.filtered_out_readers.emplace_back(it->first);
}
}

return filter_result;
};

Expand Down
38 changes: 21 additions & 17 deletions src/cpp/rtps/reader/StatefulReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,29 @@
*/

#include <fastdds/rtps/reader/StatefulReader.h>
#include <fastdds/rtps/reader/ReaderListener.h>
#include <fastdds/rtps/history/ReaderHistory.h>

#include <mutex>
#include <thread>
#include <cassert>

#include <fastdds/dds/log/Log.hpp>
#include <fastdds/rtps/messages/RTPSMessageCreator.h>
#include <rtps/participant/RTPSParticipantImpl.h>
#include <rtps/reader/WriterProxy.h>
#include <fastrtps/utils/TimeConversion.h>
#include <rtps/history/HistoryAttributesExtension.hpp>
#include <rtps/DataSharing/DataSharingListener.hpp>
#include <rtps/DataSharing/ReaderPool.hpp>
#include <fastdds/rtps/builtin/BuiltinProtocols.h>
#include <fastdds/rtps/builtin/liveliness/WLP.h>
#include <fastdds/rtps/common/VendorId_t.hpp>
#include <fastdds/rtps/history/ReaderHistory.h>
#include <fastdds/rtps/messages/RTPSMessageCreator.h>
#include <fastdds/rtps/reader/ReaderListener.h>
#include <fastdds/rtps/reader/StatefulReader.h>
#include <fastdds/rtps/writer/LivelinessManager.h>
#include <fastrtps/utils/TimeConversion.h>

#include "rtps/RTPSDomainImpl.hpp"

#include <mutex>
#include <thread>

#include <cassert>
#include "reader_utils.hpp"
#include <rtps/DataSharing/DataSharingListener.hpp>
#include <rtps/DataSharing/ReaderPool.hpp>
#include <rtps/history/HistoryAttributesExtension.hpp>
#include <rtps/participant/RTPSParticipantImpl.h>
#include <rtps/reader/WriterProxy.h>
#include <rtps/RTPSDomainImpl.hpp>

#define IDSTRING "(ID:" << std::this_thread::get_id() << ") " <<

Expand Down Expand Up @@ -544,14 +547,15 @@ bool StatefulReader::processDataMsg(
return false;
}

if (data_filter_ && !data_filter_->is_relevant(*change, m_guid))
if (!fastdds::rtps::change_is_relevant_for_filter(*change, m_guid, data_filter_))
{
if (pWP)
{
pWP->irrelevant_change_set(change->sequenceNumber);
NotifyChanges(pWP);
send_ack_if_datasharing(this, mp_history, pWP, change->sequenceNumber);
}
// Change was filtered out, so there isn't anything else to do
return true;
}

Expand Down Expand Up @@ -726,7 +730,7 @@ bool StatefulReader::processDataFragMsg(

// Temporarilly assign the inline qos while evaluating the data filter
work_change->inline_qos = incomingChange->inline_qos;
bool filtered_out = data_filter_ && !data_filter_->is_relevant(*work_change, m_guid);
bool filtered_out = !fastdds::rtps::change_is_relevant_for_filter(*work_change, m_guid, data_filter_);
work_change->inline_qos = SerializedPayload_t();

if (filtered_out)
Expand Down
28 changes: 15 additions & 13 deletions src/cpp/rtps/reader/StatelessReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,23 @@
*/

#include <fastdds/rtps/reader/StatelessReader.h>
#include <fastdds/rtps/history/ReaderHistory.h>
#include <fastdds/rtps/reader/ReaderListener.h>

#include <cassert>
#include <mutex>
#include <thread>

#include "reader_utils.hpp"
#include <fastdds/dds/log/Log.hpp>
#include <fastdds/rtps/common/CacheChange.h>
#include <fastdds/rtps/builtin/BuiltinProtocols.h>
#include <fastdds/rtps/builtin/liveliness/WLP.h>
#include <fastdds/rtps/common/CacheChange.h>
#include <fastdds/rtps/history/ReaderHistory.h>
#include <fastdds/rtps/reader/ReaderListener.h>
#include <fastdds/rtps/writer/LivelinessManager.h>
#include <rtps/participant/RTPSParticipantImpl.h>
#include <rtps/DataSharing/DataSharingListener.hpp>
#include <rtps/DataSharing/ReaderPool.hpp>

#include "rtps/RTPSDomainImpl.hpp"

#include <mutex>
#include <thread>

#include <cassert>
#include <rtps/participant/RTPSParticipantImpl.h>
#include <rtps/RTPSDomainImpl.hpp>

#define IDSTRING "(ID:" << std::this_thread::get_id() << ") " <<

Expand Down Expand Up @@ -481,9 +481,10 @@ bool StatelessReader::processDataMsg(
return false;
}

if (data_filter_ && !data_filter_->is_relevant(*change, m_guid))
if (!fastdds::rtps::change_is_relevant_for_filter(*change, m_guid, data_filter_))
{
update_last_notified(change->writerGUID, change->sequenceNumber);
// Change was filtered out, so there isn't anything else to do
return true;
}

Expand Down Expand Up @@ -674,7 +675,8 @@ bool StatelessReader::processDataFragMsg(
{
// Temporarilly assign the inline qos while evaluating the data filter
change_completed->inline_qos = incomingChange->inline_qos;
bool filtered_out = data_filter_ && !data_filter_->is_relevant(*change_completed, m_guid);
bool filtered_out = !fastdds::rtps::change_is_relevant_for_filter(*change_completed, m_guid,
data_filter_);
change_completed->inline_qos = SerializedPayload_t();

if (filtered_out)
Expand Down
Loading

0 comments on commit fdce426

Please sign in to comment.