Skip to content

Commit

Permalink
Tests fix (#15)
Browse files Browse the repository at this point in the history
* Tests fix

Signed-off-by: Gonzalo de Pedro <gonzalo@depedro.com.ar>

* Tests fix

Signed-off-by: Gonzalo de Pedro <gonzalo@depedro.com.ar>

* Fixed some tests

Signed-off-by: Gonzalo de Pedro <gonzalo@depedro.com.ar>

* Fix test_intra_process_manager test.

Signed-off-by: Chris Lalancette <clalancette@openrobotics.org>

* Style fixups.

Signed-off-by: Chris Lalancette <clalancette@openrobotics.org>

* One more small fix.

Signed-off-by: Chris Lalancette <clalancette@openrobotics.org>

* Fixed issues with allocators

Signed-off-by: Gonzalo de Pedro <gonzalo@depedro.com.ar>

* Small fix for lint_cmake.

Signed-off-by: Chris Lalancette <clalancette@openrobotics.org>

Co-authored-by: Chris Lalancette <clalancette@openrobotics.org>

Add additional tests for the TypeAdapt conversions (#16)

* Implement skipping ROS conversion when using the const ref publish.

Signed-off-by: Chris Lalancette <clalancette@openrobotics.org>

* Add tests to ensure that type adaptation doesn't unnecessarily convert.

Signed-off-by: Chris Lalancette <clalancette@openrobotics.org>

fixed allocator allocate usage on intra process manager (#17)

Signed-off-by: Gonzalo de Pedro <gonzalo@depedro.com.ar>

Removed TestPublisher.conversion_exception_is_passed_up test

Signed-off-by: Gonzalo de Pedro <gonzalo@depedro.com.ar>
  • Loading branch information
Gonzalo de Pedro authored and clalancette committed Jan 4, 2022
1 parent a5c5504 commit 89c70d1
Show file tree
Hide file tree
Showing 9 changed files with 607 additions and 213 deletions.
118 changes: 79 additions & 39 deletions rclcpp/include/rclcpp/experimental/intra_process_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,10 @@ class IntraProcessManager
*/
template<
typename MessageT,
typename ROSMessageType,
typename Alloc = std::allocator<void>,
typename Deleter = std::default_delete<MessageT>>
typename Deleter = std::default_delete<MessageT>
>
void
do_intra_process_publish(
uint64_t intra_process_publisher_id,
Expand All @@ -206,7 +208,7 @@ class IntraProcessManager
// None of the buffers require ownership, so we promote the pointer
std::shared_ptr<MessageT> msg = std::move(message);

this->template add_shared_msg_to_buffers<MessageT, Alloc, Deleter>(
this->template add_shared_msg_to_buffers<MessageT, Alloc, Deleter, ROSMessageType>(
msg, sub_ids.take_shared_subscriptions);
} else if (!sub_ids.take_ownership_subscriptions.empty() && // NOLINT
sub_ids.take_shared_subscriptions.size() <= 1)
Expand All @@ -217,8 +219,7 @@ class IntraProcessManager
concatenated_vector.end(),
sub_ids.take_ownership_subscriptions.begin(),
sub_ids.take_ownership_subscriptions.end());

this->template add_owned_msg_to_buffers<MessageT, Alloc, Deleter>(
this->template add_owned_msg_to_buffers<MessageT, Alloc, Deleter, ROSMessageType>(
std::move(message),
concatenated_vector,
allocator);
Expand All @@ -229,17 +230,19 @@ class IntraProcessManager
// for the buffers that do not require ownership
auto shared_msg = std::allocate_shared<MessageT, MessageAllocatorT>(allocator, *message);

this->template add_shared_msg_to_buffers<MessageT, Alloc, Deleter>(
this->template add_shared_msg_to_buffers<MessageT, Alloc, Deleter, ROSMessageType>(
shared_msg, sub_ids.take_shared_subscriptions);
this->template add_owned_msg_to_buffers<MessageT, Alloc, Deleter>(
this->template add_owned_msg_to_buffers<MessageT, Alloc, Deleter, ROSMessageType>(
std::move(message), sub_ids.take_ownership_subscriptions, allocator);
}
}

template<
typename MessageT,
typename ROSMessageType,
typename Alloc = std::allocator<void>,
typename Deleter = std::default_delete<MessageT>>
typename Deleter = std::default_delete<MessageT>
>
std::shared_ptr<const MessageT>
do_intra_process_publish_and_return_shared(
uint64_t intra_process_publisher_id,
Expand All @@ -265,7 +268,7 @@ class IntraProcessManager
// If there are no owning, just convert to shared.
std::shared_ptr<MessageT> shared_msg = std::move(message);
if (!sub_ids.take_shared_subscriptions.empty()) {
this->template add_shared_msg_to_buffers<MessageT, Alloc, Deleter>(
this->template add_shared_msg_to_buffers<MessageT, Alloc, Deleter, ROSMessageType>(
shared_msg, sub_ids.take_shared_subscriptions);
}
return shared_msg;
Expand All @@ -275,12 +278,12 @@ class IntraProcessManager
auto shared_msg = std::allocate_shared<MessageT, MessageAllocatorT>(allocator, *message);

if (!sub_ids.take_shared_subscriptions.empty()) {
this->template add_shared_msg_to_buffers<MessageT, Alloc, Deleter>(
this->template add_shared_msg_to_buffers<MessageT, Alloc, Deleter, ROSMessageType>(
shared_msg,
sub_ids.take_shared_subscriptions);
}
if (!sub_ids.take_ownership_subscriptions.empty()) {
this->template add_owned_msg_to_buffers<MessageT, Alloc, Deleter>(
this->template add_owned_msg_to_buffers<MessageT, Alloc, Deleter, ROSMessageType>(
std::move(message),
sub_ids.take_ownership_subscriptions,
allocator);
Expand Down Expand Up @@ -337,14 +340,22 @@ class IntraProcessManager
template<
typename MessageT,
typename Alloc,
typename Deleter>
typename Deleter,
typename ROSMessageType>
void
add_shared_msg_to_buffers(
std::shared_ptr<const MessageT> message,
std::vector<uint64_t> subscription_ids)
{
using PublishedType = typename rclcpp::TypeAdapter<MessageT>::custom_type;
using ROSMessageType = typename rclcpp::TypeAdapter<MessageT>::ros_message_type;
using ROSMessageTypeAllocatorTraits = allocator::AllocRebind<ROSMessageType, Alloc>;
using ROSMessageTypeAllocator = typename ROSMessageTypeAllocatorTraits::allocator_type;
using ROSMessageTypeDeleter = allocator::Deleter<ROSMessageTypeAllocator, ROSMessageType>;

using PublishedTypeAllocatorTraits = allocator::AllocRebind<PublishedType, Alloc>;
using PublishedTypeAllocator = typename PublishedTypeAllocatorTraits::allocator_type;
using PublishedTypeDeleter = allocator::Deleter<PublishedTypeAllocator, PublishedType>;


for (auto id : subscription_ids) {
auto subscription_it = subscriptions_.find(id);
Expand All @@ -354,11 +365,13 @@ class IntraProcessManager
auto subscription_base = subscription_it->second.lock();
if (subscription_base) {
auto subscription = std::dynamic_pointer_cast<
rclcpp::experimental::SubscriptionIntraProcessBuffer<PublishedType, Alloc, Deleter>
rclcpp::experimental::SubscriptionIntraProcessBuffer<PublishedType,
PublishedTypeAllocator, PublishedTypeDeleter, ROSMessageType>
>(subscription_base);
if (nullptr == subscription) {
auto ros_message_subscription = std::dynamic_pointer_cast<
rclcpp::experimental::ROSMessageIntraProcessBuffer<ROSMessageType, Alloc, Deleter>
rclcpp::experimental::ROSMessageIntraProcessBuffer<ROSMessageType,
ROSMessageTypeAllocator, ROSMessageTypeDeleter>
>(subscription_base);

if (nullptr == ros_message_subscription) {
Expand All @@ -371,9 +384,22 @@ class IntraProcessManager
if constexpr (rclcpp::TypeAdapter<MessageT>::is_specialized::value) {
ROSMessageType ros_msg;
rclcpp::TypeAdapter<MessageT>::convert_to_ros_message(*message, ros_msg);
ros_message_subscription->provide_intra_process_message(ros_msg);
ros_message_subscription->provide_intra_process_message(
std::make_shared<ROSMessageType>(ros_msg));
} else {
ros_message_subscription->provide_intra_process_message(message);
if constexpr (std::is_same<MessageT, ROSMessageType>::value) {
ros_message_subscription->provide_intra_process_message(message);
} else {
if constexpr (std::is_same<typename rclcpp::TypeAdapter<MessageT,
ROSMessageType>::ros_message_type, ROSMessageType>::value)
{
ROSMessageType ros_msg;
rclcpp::TypeAdapter<MessageT, ROSMessageType>::convert_to_ros_message(
*message, ros_msg);
ros_message_subscription->provide_intra_process_message(
std::make_shared<ROSMessageType>(ros_msg));
}
}
}
}
} else {
Expand All @@ -388,7 +414,8 @@ class IntraProcessManager
template<
typename MessageT,
typename Alloc = std::allocator<void>,
typename Deleter = std::default_delete<MessageT>>
typename Deleter = std::default_delete<MessageT>,
typename ROSMessageType = MessageT>
void
add_owned_msg_to_buffers(
std::unique_ptr<MessageT, Deleter> message,
Expand All @@ -398,7 +425,14 @@ class IntraProcessManager
using MessageAllocTraits = allocator::AllocRebind<MessageT, Alloc>;
using MessageUniquePtr = std::unique_ptr<MessageT, Deleter>;
using PublishedType = typename rclcpp::TypeAdapter<MessageT>::custom_type;
using ROSMessageType = typename rclcpp::TypeAdapter<MessageT>::ros_message_type;

using ROSMessageTypeAllocatorTraits = allocator::AllocRebind<ROSMessageType, Alloc>;
using ROSMessageTypeAllocator = typename ROSMessageTypeAllocatorTraits::allocator_type;
using ROSMessageTypeDeleter = allocator::Deleter<ROSMessageTypeAllocator, ROSMessageType>;

using PublishedTypeAllocatorTraits = allocator::AllocRebind<PublishedType, Alloc>;
using PublishedTypeAllocator = typename PublishedTypeAllocatorTraits::allocator_type;
using PublishedTypeDeleter = allocator::Deleter<PublishedTypeAllocator, PublishedType>;

for (auto it = subscription_ids.begin(); it != subscription_ids.end(); it++) {
auto subscription_it = subscriptions_.find(*it);
Expand All @@ -408,41 +442,47 @@ class IntraProcessManager
auto subscription_base = subscription_it->second.lock();
if (subscription_base) {
auto subscription = std::dynamic_pointer_cast<
rclcpp::experimental::SubscriptionIntraProcessBuffer<PublishedType, Alloc, Deleter>
rclcpp::experimental::SubscriptionIntraProcessBuffer<PublishedType,
PublishedTypeAllocator, PublishedTypeDeleter, ROSMessageType>
>(subscription_base);
if (nullptr == subscription) {
auto ros_message_subscription = std::dynamic_pointer_cast<
rclcpp::experimental::ROSMessageIntraProcessBuffer<ROSMessageType, Alloc, Deleter>
rclcpp::experimental::ROSMessageIntraProcessBuffer<ROSMessageType,
ROSMessageTypeAllocator, ROSMessageTypeDeleter>
>(subscription_base);

if (nullptr == ros_message_subscription) {
throw std::runtime_error(
"failed to dynamic cast SubscriptionIntraProcessBase to "
"--failed to dynamic cast SubscriptionIntraProcessBase to "
"SubscriptionIntraProcessBuffer<MessageT, Alloc, Deleter>, which "
"can happen when the publisher and subscription use different "
"allocator types, which is not supported");
} else {
if constexpr (rclcpp::TypeAdapter<MessageT>::is_specialized::value) {
using ROSMessageTypeAllocatorTraits = allocator::AllocRebind<ROSMessageType, Alloc>;
auto ptr = ROSMessageTypeAllocatorTraits::allocate(allocator, 1);
ROSMessageTypeAllocatorTraits::construct(allocator, ptr);
Deleter deleter = message.get_deleter();
auto ros_msg = std::unique_ptr<ROSMessageType, Deleter>(ptr, deleter);
rclcpp::TypeAdapter<MessageT>::convert_to_ros_message(message, ros_msg);
ROSMessageTypeAllocator ros_message_alloc(allocator);
auto ptr = ros_message_alloc.allocate(1);
ros_message_alloc.construct(ptr);
ROSMessageTypeDeleter deleter;
allocator::set_allocator_for_deleter(&deleter, &allocator);
rclcpp::TypeAdapter<MessageT>::convert_to_ros_message(*message, *ptr);
auto ros_msg = std::unique_ptr<ROSMessageType, ROSMessageTypeDeleter>(ptr, deleter);
ros_message_subscription->provide_intra_process_message(std::move(ros_msg));
} else {
if (std::next(it) == subscription_ids.end()) {
// If this is the last subscription, give up ownership
ros_message_subscription->provide_intra_process_message(std::move(message));
} else {
// Copy the message since we have additional subscriptions to serve
MessageUniquePtr copy_message;
Deleter deleter = message.get_deleter();
auto ptr = MessageAllocTraits::allocate(allocator, 1);
MessageAllocTraits::construct(allocator, ptr, *message);
copy_message = MessageUniquePtr(ptr, deleter);

ros_message_subscription->provide_intra_process_message(std::move(copy_message));
if constexpr (std::is_same<MessageT, ROSMessageType>::value) {
if (std::next(it) == subscription_ids.end()) {
// If this is the last subscription, give up ownership
ros_message_subscription->provide_intra_process_message(std::move(message));
} else {
// Copy the message since we have additional subscriptions to serve
MessageUniquePtr copy_message;
Deleter deleter = message.get_deleter();
allocator::set_allocator_for_deleter(&deleter, &allocator);
auto ptr = MessageAllocTraits::allocate(allocator, 1);
MessageAllocTraits::construct(allocator, ptr, *message);
copy_message = MessageUniquePtr(ptr, deleter);

ros_message_subscription->provide_intra_process_message(std::move(copy_message));
}
}
}
}
Expand Down
22 changes: 13 additions & 9 deletions rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,20 +44,24 @@ namespace experimental
template<
typename MessageT,
typename SubscribedType,
typename SubscribedTypeAlloc = std::allocator<void>,
typename SubscribedTypeDeleter = std::default_delete<SubscribedType>
typename SubscribedTypeAlloc = std::allocator<SubscribedType>,
typename SubscribedTypeDeleter = std::default_delete<SubscribedType>,
typename ROSMessageType = SubscribedType,
typename Alloc = std::allocator<void>
>
class SubscriptionIntraProcess
: public SubscriptionIntraProcessBuffer<
SubscribedType,
SubscribedTypeAlloc,
SubscribedTypeDeleter
SubscribedTypeDeleter,
ROSMessageType
>
{
using SubscriptionIntraProcessBufferT = SubscriptionIntraProcessBuffer<
SubscribedType,
SubscribedTypeAlloc,
SubscribedTypeDeleter
SubscribedTypeDeleter,
ROSMessageType
>;

public:
Expand All @@ -71,15 +75,15 @@ class SubscriptionIntraProcess
using BufferUniquePtr = typename SubscriptionIntraProcessBufferT::BufferUniquePtr;

SubscriptionIntraProcess(
AnySubscriptionCallback<MessageT, SubscribedTypeAlloc> callback,
std::shared_ptr<SubscribedTypeAlloc> allocator,
AnySubscriptionCallback<MessageT, Alloc> callback,
std::shared_ptr<Alloc> allocator,
rclcpp::Context::SharedPtr context,
const std::string & topic_name,
const rclcpp::QoS & qos_profile,
rclcpp::IntraProcessBufferType buffer_type)
: SubscriptionIntraProcessBuffer<SubscribedType, SubscribedTypeAlloc,
SubscribedTypeDeleter>(
allocator,
SubscribedTypeDeleter, ROSMessageType>(
std::make_shared<SubscribedTypeAlloc>(*allocator),
context,
topic_name,
qos_profile,
Expand Down Expand Up @@ -157,7 +161,7 @@ class SubscriptionIntraProcess
shared_ptr.reset();
}

AnySubscriptionCallback<MessageT, SubscribedTypeAlloc> any_callback_;
AnySubscriptionCallback<MessageT, Alloc> any_callback_;
};

} // namespace experimental
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,16 @@ namespace experimental

template<
typename SubscribedType,
typename Alloc = std::allocator<void>,
typename Alloc = std::allocator<SubscribedType>,
typename Deleter = std::default_delete<SubscribedType>,
/// MessageT::ros_message_type if MessageT is a TypeAdapter,
/// otherwise just MessageT.
typename ROSMessageType = typename rclcpp::TypeAdapter<SubscribedType>::ros_message_type
typename ROSMessageType = SubscribedType
>
class SubscriptionIntraProcessBuffer : public ROSMessageIntraProcessBuffer<ROSMessageType, Alloc,
Deleter>
class SubscriptionIntraProcessBuffer : public ROSMessageIntraProcessBuffer<ROSMessageType,
typename allocator::AllocRebind<ROSMessageType, Alloc>::allocator_type,
allocator::Deleter<typename allocator::AllocRebind<ROSMessageType, Alloc>::allocator_type,
ROSMessageType>>
{
public:
RCLCPP_SMART_PTR_DEFINITIONS(SubscriptionIntraProcessBuffer)
Expand Down Expand Up @@ -81,8 +83,8 @@ class SubscriptionIntraProcessBuffer : public ROSMessageIntraProcessBuffer<ROSMe
const std::string & topic_name,
const rclcpp::QoS & qos_profile,
rclcpp::IntraProcessBufferType buffer_type)
: ROSMessageIntraProcessBuffer<ROSMessageType, Alloc, Deleter>(context, topic_name,
qos_profile),
: ROSMessageIntraProcessBuffer<ROSMessageType, ROSMessageTypeAllocator, ROSMessageTypeDeleter>(
context, topic_name, qos_profile),
subscribed_type_allocator_(*allocator)
{
allocator::set_allocator_for_deleter(&subscribed_type_deleter_, &subscribed_type_allocator_);
Expand All @@ -92,7 +94,7 @@ class SubscriptionIntraProcessBuffer : public ROSMessageIntraProcessBuffer<ROSMe
SubscribedTypeDeleter>(
buffer_type,
qos_profile,
allocator);
std::make_shared<Alloc>(subscribed_type_allocator_));
}

bool
Expand Down
Loading

0 comments on commit 89c70d1

Please sign in to comment.