diff --git a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp index b6dc0684b0..e5d7fa39b1 100644 --- a/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp +++ b/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp @@ -19,7 +19,8 @@ #include -#include +#include +#include #include #include #include @@ -28,14 +29,15 @@ #include #include "rclcpp/allocator/allocator_deleter.hpp" -#include "rclcpp/experimental/subscription_intra_process.hpp" #include "rclcpp/experimental/ros_message_intra_process_buffer.hpp" +#include "rclcpp/experimental/subscription_intra_process.hpp" #include "rclcpp/experimental/subscription_intra_process_base.hpp" #include "rclcpp/experimental/subscription_intra_process_buffer.hpp" #include "rclcpp/logger.hpp" #include "rclcpp/logging.hpp" #include "rclcpp/macros.hpp" #include "rclcpp/publisher_base.hpp" +#include "rclcpp/type_adapter.hpp" #include "rclcpp/visibility_control.hpp" namespace rclcpp @@ -174,24 +176,16 @@ class IntraProcessManager */ template< typename MessageT, - typename PublishedType, - typename ROSMessageType, - typename Alloc, - typename Deleter, - typename ROSMessageTypeAllocatorTraits, - typename ROSMessageTypeAllocator, - typename ROSMessageTypeDeleter, - typename PublishedTypeAllocator - > + typename Alloc = std::allocator, + typename Deleter = std::default_delete> void do_intra_process_publish( uint64_t intra_process_publisher_id, - std::unique_ptr message, - PublishedTypeAllocator & published_type_allocator, - ROSMessageTypeAllocator & ros_message_type_allocator, - ROSMessageTypeDeleter & ros_message_type_deleter) + std::unique_ptr message, + typename allocator::AllocRebind::allocator_type & allocator) { - std::cout << "do_intra_process_publish --- " << std::endl; + using MessageAllocTraits = allocator::AllocRebind; + using MessageAllocatorT = typename MessageAllocTraits::allocator_type; std::shared_lock lock(mutex_); @@ -207,166 +201,50 @@ class IntraProcessManager if (sub_ids.take_ownership_subscriptions.empty()) { // None of the buffers require ownership, so we promote the pointer - std::shared_ptr msg = std::move(message); - - this->template add_shared_msg_to_buffers( - msg, - sub_ids.take_shared_subscriptions, - ros_message_type_allocator); - } else { - if (sub_ids.take_shared_subscriptions.size() <= 1) { - // There is at maximum 1 buffer that does not require ownership. - // So this case is equivalent to all the buffers requiring ownership - - std::cout << " to all the buffers requiring ownership --- " << std::endl; - - std::cout << "message has type : " << typeid(message).name() << std::endl; - - // Merge the two vector of ids into a unique one - std::vector concatenated_vector(sub_ids.take_shared_subscriptions); - concatenated_vector.insert( - concatenated_vector.end(), - sub_ids.take_ownership_subscriptions.begin(), - sub_ids.take_ownership_subscriptions.end()); - - this->template add_owned_msg_to_buffers( - std::move(message), - concatenated_vector, - published_type_allocator, - ros_message_type_allocator, - ros_message_type_deleter); - } else { - // Construct a new shared pointer from the message - // for the buffers that do not require ownership - auto shared_msg = std::allocate_shared( - published_type_allocator, - *message); - - this->template add_shared_msg_to_buffers( - shared_msg, - sub_ids.take_shared_subscriptions, - ros_message_type_allocator); - this->template add_owned_msg_to_buffers( - std::move(message), - sub_ids.take_ownership_subscriptions, - published_type_allocator, - ros_message_type_allocator, - ros_message_type_deleter); - } - } - } - - template< - typename MessageT, - typename T, - typename PublishedType, - typename ROSMessageType, - typename Alloc, - typename Deleter, - typename ROSMessageTypeAllocatorTraits, - typename ROSMessageTypeAllocator, - typename ROSMessageTypeDeleter, - typename PublishedTypeAllocator - > - typename - std::enable_if_t< - rosidl_generator_traits::is_message::value && - std::is_same::value, std::shared_ptr - > - do_intra_process_publish_and_return_shared( - uint64_t intra_process_publisher_id, - std::unique_ptr message, - PublishedTypeAllocator & published_type_allocator, - ROSMessageTypeAllocator & ros_message_type_allocator, - ROSMessageTypeDeleter & ros_message_type_deleter) - { - std::shared_lock lock(mutex_); - - auto publisher_it = pub_to_subs_.find(intra_process_publisher_id); - if (publisher_it == pub_to_subs_.end()) { - // Publisher is either invalid or no longer exists. - RCLCPP_WARN( - rclcpp::get_logger("rclcpp"), - "Calling do_intra_process_publish for invalid or no longer existing publisher id"); - return nullptr; - } - const auto & sub_ids = publisher_it->second; - - if (sub_ids.take_ownership_subscriptions.empty()) { - // If there are no owning, just convert to shared. - std::shared_ptr shared_msg = std::move(message); - if (!sub_ids.take_shared_subscriptions.empty()) { - this->template add_shared_msg_to_buffers( - shared_msg, - sub_ids.take_shared_subscriptions, - ros_message_type_allocator); - } - return shared_msg; - } else { - // Construct a new shared pointer from the message for the buffers that - // do not require ownership and to return. - auto shared_msg = std::allocate_shared( - published_type_allocator, - *message); - - if (!sub_ids.take_shared_subscriptions.empty()) { - this->template add_shared_msg_to_buffers( - shared_msg, - sub_ids.take_shared_subscriptions, - ros_message_type_allocator); - } - - this->template add_owned_msg_to_buffers( + std::shared_ptr msg = std::move(message); + + this->template add_shared_msg_to_buffers( + msg, sub_ids.take_shared_subscriptions); + } else if (!sub_ids.take_ownership_subscriptions.empty() && // NOLINT + sub_ids.take_shared_subscriptions.size() <= 1) + { + // Merge the two vector of ids into a unique one + std::vector concatenated_vector(sub_ids.take_shared_subscriptions); + concatenated_vector.insert( + concatenated_vector.end(), + sub_ids.take_ownership_subscriptions.begin(), + sub_ids.take_ownership_subscriptions.end()); + + this->template add_owned_msg_to_buffers( std::move(message), - sub_ids.take_ownership_subscriptions, - published_type_allocator, - ros_message_type_allocator, - ros_message_type_deleter); - - return shared_msg; + concatenated_vector, + allocator); + } else if (!sub_ids.take_ownership_subscriptions.empty() && // NOLINT + sub_ids.take_shared_subscriptions.size() > 1) + { + // Construct a new shared pointer from the message + // for the buffers that do not require ownership + auto shared_msg = std::allocate_shared(allocator, *message); + + this->template add_shared_msg_to_buffers( + shared_msg, sub_ids.take_shared_subscriptions); + this->template add_owned_msg_to_buffers( + std::move(message), sub_ids.take_ownership_subscriptions, allocator); } } template< typename MessageT, - typename T, - typename PublishedType, - typename ROSMessageType, - typename Alloc, - typename Deleter, - typename ROSMessageTypeAllocatorTraits, - typename ROSMessageTypeAllocator, - typename ROSMessageTypeDeleter, - typename PublishedTypeAllocator - > - typename - std::enable_if_t< - rclcpp::TypeAdapter::is_specialized::value && - std::is_same::value, std::shared_ptr - > + typename Alloc = std::allocator, + typename Deleter = std::default_delete> + std::shared_ptr do_intra_process_publish_and_return_shared( uint64_t intra_process_publisher_id, - std::unique_ptr message, - PublishedTypeAllocator & published_type_allocator, - ROSMessageTypeAllocator & ros_message_type_allocator, - ROSMessageTypeDeleter & ros_message_type_deleter) + std::unique_ptr message, + typename allocator::AllocRebind::allocator_type & allocator) { - auto ptr = ROSMessageTypeAllocatorTraits::allocate(ros_message_type_allocator, 1); - ROSMessageTypeAllocatorTraits::construct(ros_message_type_allocator, ptr); - auto unique_ros_msg = std::unique_ptr( - ptr, - ros_message_type_deleter); - rclcpp::TypeAdapter::convert_to_ros_message(*message, *unique_ros_msg); + using MessageAllocTraits = allocator::AllocRebind; + using MessageAllocatorT = typename MessageAllocTraits::allocator_type; std::shared_lock lock(mutex_); @@ -382,44 +260,30 @@ class IntraProcessManager if (sub_ids.take_ownership_subscriptions.empty()) { // If there are no owning, just convert to shared. - std::shared_ptr shared_msg = std::move(message); + std::shared_ptr shared_msg = std::move(message); if (!sub_ids.take_shared_subscriptions.empty()) { - this->template add_shared_msg_to_buffers( - shared_msg, - sub_ids.take_shared_subscriptions, - ros_message_type_allocator); + this->template add_shared_msg_to_buffers( + shared_msg, sub_ids.take_shared_subscriptions); } + return shared_msg; } else { // Construct a new shared pointer from the message for the buffers that // do not require ownership and to return. - auto shared_msg = std::allocate_shared( - published_type_allocator, - *message); + auto shared_msg = std::allocate_shared(allocator, *message); if (!sub_ids.take_shared_subscriptions.empty()) { - this->template add_shared_msg_to_buffers( + this->template add_shared_msg_to_buffers( shared_msg, - sub_ids.take_shared_subscriptions, - ros_message_type_allocator); + sub_ids.take_shared_subscriptions); } - - this->template add_owned_msg_to_buffers( - std::move(message), - sub_ids.take_ownership_subscriptions, - published_type_allocator, - ros_message_type_allocator, - ros_message_type_deleter); + if (!sub_ids.take_ownership_subscriptions.empty()) { + this->template add_owned_msg_to_buffers( + std::move(message), + sub_ids.take_ownership_subscriptions, + allocator); + } + return shared_msg; } - - // TODO(clalancette): depending on how the publisher and subscriber are setup, we may end - // up doing a conversion more than once; in this function and down in - // add_{shared,owned}_msg_to_buffers(). We should probably push this down further to avoid - // that double conversion. - return unique_ros_msg; } /// Return true if the given rmw_gid_t matches any stored Publishers. @@ -469,19 +333,16 @@ class IntraProcessManager template< typename MessageT, - typename PublishedType, - typename ROSMessageType, typename Alloc, - typename Deleter, - typename ROSMessageTypeAllocatorTraits, - typename ROSMessageTypeAllocator - > + typename Deleter> void add_shared_msg_to_buffers( - std::shared_ptr message, - std::vector subscription_ids, - ROSMessageTypeAllocator & ros_message_type_allocator) + std::shared_ptr message, + std::vector subscription_ids) { + using PublishedType = typename rclcpp::TypeAdapter::custom_type; + using ROSMessageType = typename rclcpp::TypeAdapter::ros_message_type; + for (auto id : subscription_ids) { auto subscription_it = subscriptions_.find(id); if (subscription_it == subscriptions_.end()) { @@ -490,8 +351,8 @@ class IntraProcessManager auto subscription_base = subscription_it->second.lock(); if (subscription_base) { auto subscription = std::dynamic_pointer_cast< - rclcpp::experimental::SubscriptionIntraProcessBuffer>(subscription_base); + rclcpp::experimental::SubscriptionIntraProcessBuffer + >(subscription_base); if (nullptr == subscription) { auto ros_message_subscription = std::dynamic_pointer_cast< rclcpp::experimental::ROSMessageIntraProcessBuffer @@ -500,15 +361,13 @@ class IntraProcessManager if (nullptr == ros_message_subscription) { throw std::runtime_error( "failed to dynamic cast SubscriptionIntraProcessBase to " - "SubscriptionIntraProcessBuffer, which " + "SubscriptionIntraProcessBuffer, which " "can happen when the publisher and subscription use different " "allocator types, which is not supported"); } else { if constexpr (rclcpp::TypeAdapter::is_specialized::value) { - auto ptr = ROSMessageTypeAllocatorTraits::allocate(ros_message_type_allocator, 1); - ROSMessageTypeAllocatorTraits::construct(ros_message_type_allocator, ptr); - auto ros_msg = std::shared_ptr(ptr); - rclcpp::TypeAdapter::convert_to_ros_message(*message, *ros_msg); + ROSMessageType ros_msg; + rclcpp::TypeAdapter::convert_to_ros_message(*message, ros_msg); ros_message_subscription->provide_intra_process_message(ros_msg); } else { ros_message_subscription->provide_intra_process_message(message); @@ -525,29 +384,18 @@ class IntraProcessManager template< typename MessageT, - typename PublishedType, - typename ROSMessageType, - typename Alloc, - typename Deleter, - typename ROSMessageTypeAllocatorTraits, - typename ROSMessageTypeAllocator, - typename ROSMessageTypeDeleter, - typename PublishedTypeAllocator - > + typename Alloc = std::allocator, + typename Deleter = std::default_delete> void add_owned_msg_to_buffers( - std::unique_ptr message, + std::unique_ptr message, std::vector subscription_ids, - PublishedTypeAllocator & published_type_allocator, - ROSMessageTypeAllocator & ros_message_type_allocator, - ROSMessageTypeDeleter & ros_message_type_deleter) + typename allocator::AllocRebind::allocator_type & allocator) { - std::cout << "add owned msg to buffers" << std::endl; - - std::cout << "message has type : " << typeid(message).name() << std::endl; - - using MessageAllocTraits = allocator::AllocRebind; - using MessageUniquePtr = std::unique_ptr; + using MessageAllocTraits = allocator::AllocRebind; + using MessageUniquePtr = std::unique_ptr; + using PublishedType = typename rclcpp::TypeAdapter::custom_type; + using ROSMessageType = typename rclcpp::TypeAdapter::ros_message_type; for (auto it = subscription_ids.begin(); it != subscription_ids.end(); it++) { auto subscription_it = subscriptions_.find(*it); @@ -555,16 +403,11 @@ class IntraProcessManager throw std::runtime_error("subscription has unexpectedly gone out of scope"); } auto subscription_base = subscription_it->second.lock(); - std::cout << "Subscription Base?" << std::endl; if (subscription_base) { - std::cout << "Typed Subscription" << std::endl; - auto subscription = std::dynamic_pointer_cast< - rclcpp::experimental::SubscriptionIntraProcessBuffer>(subscription_base); + rclcpp::experimental::SubscriptionIntraProcessBuffer + >(subscription_base); if (nullptr == subscription) { - std::cout << "ROSMessage Subscription" << std::endl; - auto ros_message_subscription = std::dynamic_pointer_cast< rclcpp::experimental::ROSMessageIntraProcessBuffer >(subscription_base); @@ -572,31 +415,28 @@ class IntraProcessManager if (nullptr == ros_message_subscription) { throw std::runtime_error( "failed to dynamic cast SubscriptionIntraProcessBase to " - "SubscriptionIntraProcessBuffer, which " + "SubscriptionIntraProcessBuffer, which " "can happen when the publisher and subscription use different " "allocator types, which is not supported"); } else { - std::cout << "ROSMessage TypeAdapted Subscription" << std::endl; - if constexpr (rclcpp::TypeAdapter::is_specialized::value) { - auto ptr = ROSMessageTypeAllocatorTraits::allocate(ros_message_type_allocator, 1); - ROSMessageTypeAllocatorTraits::construct(ros_message_type_allocator, ptr); - auto ros_msg = std::unique_ptr( - ptr, - ros_message_type_deleter); - rclcpp::TypeAdapter::convert_to_ros_message(*message, *ros_msg); + using ROSMessageTypeAllocatorTraits = allocator::AllocRebind; + auto ptr = ROSMessageTypeAllocatorTraits::allocate(allocator, 1); + ROSMessageTypeAllocatorTraits::construct(allocator, ptr); + Deleter deleter = message.get_deleter(); + auto ros_msg = std::unique_ptr(ptr, deleter); + rclcpp::TypeAdapter::convert_to_ros_message(message, ros_msg); ros_message_subscription->provide_intra_process_message(std::move(ros_msg)); } else { if (std::next(it) == subscription_ids.end()) { // If this is the last subscription, give up ownership - ros_message_subscription->provide_intra_process_message(std::move(message)); } else { // Copy the message since we have additional subscriptions to serve MessageUniquePtr copy_message; Deleter deleter = message.get_deleter(); - auto ptr = MessageAllocTraits::allocate(published_type_allocator, 1); - MessageAllocTraits::construct(published_type_allocator, ptr, *message); + auto ptr = MessageAllocTraits::allocate(allocator, 1); + MessageAllocTraits::construct(allocator, ptr, *message); copy_message = MessageUniquePtr(ptr, deleter); ros_message_subscription->provide_intra_process_message(std::move(copy_message)); @@ -604,7 +444,6 @@ class IntraProcessManager } } } else { - std::cout << "Typed Subscription" << std::endl; if (std::next(it) == subscription_ids.end()) { // If this is the last subscription, give up ownership subscription->provide_intra_process_data(std::move(message)); @@ -612,15 +451,14 @@ class IntraProcessManager // Copy the message since we have additional subscriptions to serve MessageUniquePtr copy_message; Deleter deleter = message.get_deleter(); - auto ptr = MessageAllocTraits::allocate(published_type_allocator, 1); - MessageAllocTraits::construct(published_type_allocator, ptr, *message); + auto ptr = MessageAllocTraits::allocate(allocator, 1); + MessageAllocTraits::construct(allocator, ptr, *message); copy_message = MessageUniquePtr(ptr, deleter); subscription->provide_intra_process_data(std::move(copy_message)); } } } else { - std::cout << "Erasing subscription" << std::endl; subscriptions_.erase(subscription_it); } } diff --git a/rclcpp/include/rclcpp/experimental/ros_message_intra_process_buffer.hpp b/rclcpp/include/rclcpp/experimental/ros_message_intra_process_buffer.hpp index 3a3b1ed81b..82088fb39a 100644 --- a/rclcpp/include/rclcpp/experimental/ros_message_intra_process_buffer.hpp +++ b/rclcpp/include/rclcpp/experimental/ros_message_intra_process_buffer.hpp @@ -15,16 +15,13 @@ #ifndef RCLCPP__EXPERIMENTAL__ROS_MESSAGE_INTRA_PROCESS_BUFFER_HPP_ #define RCLCPP__EXPERIMENTAL__ROS_MESSAGE_INTRA_PROCESS_BUFFER_HPP_ -#include -#include #include -#include #include -#include #include "rcl/error_handling.h" #include "rclcpp/any_subscription_callback.hpp" +#include "rclcpp/context.hpp" #include "rclcpp/experimental/subscription_intra_process_base.hpp" #include "tracetools/tracetools.h" @@ -36,15 +33,19 @@ namespace experimental template< typename RosMessageT, typename Alloc = std::allocator, - typename Deleter = std::default_delete + typename Deleter = std::default_delete > class ROSMessageIntraProcessBuffer : public SubscriptionIntraProcessBase { public: RCLCPP_SMART_PTR_DEFINITIONS(ROSMessageIntraProcessBuffer) + using ROSMessageTypeAllocatorTraits = allocator::AllocRebind; + using ROSMessageTypeAllocator = typename ROSMessageTypeAllocatorTraits::allocator_type; + using ROSMessageTypeDeleter = allocator::Deleter; + using ConstMessageSharedPtr = std::shared_ptr; - using MessageUniquePtr = std::unique_ptr; + using MessageUniquePtr = std::unique_ptr; ROSMessageIntraProcessBuffer( rclcpp::Context::SharedPtr context, diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp index d02302a7c5..ee0bd8f163 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp @@ -46,14 +46,12 @@ template< > class SubscriptionIntraProcess : public SubscriptionIntraProcessBuffer< - MessageT, SubscribedType, SubscribedTypeAlloc, SubscribedTypeDeleter > { using SubscriptionIntraProcessBufferT = SubscriptionIntraProcessBuffer< - MessageT, SubscribedType, SubscribedTypeAlloc, SubscribedTypeDeleter @@ -65,8 +63,8 @@ class SubscriptionIntraProcess using MessageAllocTraits = typename SubscriptionIntraProcessBufferT::SubscribedTypeAllocatorTraits; using MessageAlloc = typename SubscriptionIntraProcessBufferT::SubscribedTypeAllocator; - using ConstMessageSharedPtr = typename SubscriptionIntraProcessBufferT::ConstMessageSharedPtr; - using MessageUniquePtr = typename SubscriptionIntraProcessBufferT::MessageUniquePtr; + using ConstMessageSharedPtr = typename SubscriptionIntraProcessBufferT::ConstDataSharedPtr; + using MessageUniquePtr = typename SubscriptionIntraProcessBufferT::SubscribedTypeUniquePtr; using BufferUniquePtr = typename SubscriptionIntraProcessBufferT::BufferUniquePtr; SubscriptionIntraProcess( @@ -76,7 +74,7 @@ class SubscriptionIntraProcess const std::string & topic_name, const rclcpp::QoS & qos_profile, rclcpp::IntraProcessBufferType buffer_type) - : SubscriptionIntraProcessBuffer( allocator, context, diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process_buffer.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process_buffer.hpp index abb891d1d4..43a834503d 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process_buffer.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process_buffer.hpp @@ -15,7 +15,6 @@ #ifndef RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_BUFFER_HPP_ #define RCLCPP__EXPERIMENTAL__SUBSCRIPTION_INTRA_PROCESS_BUFFER_HPP_ -#include // TODO(anyone) remove #include #include #include @@ -38,7 +37,6 @@ namespace experimental { template< - typename MessageT, typename SubscribedType, typename Alloc = std::allocator, typename Deleter = std::default_delete, @@ -64,7 +62,7 @@ class SubscriptionIntraProcessBuffer : public ROSMessageIntraProcessBuffer; using ConstDataSharedPtr = std::shared_ptr; - using DataUniquePtr = std::unique_ptr; + using SubscribedTypeUniquePtr = std::unique_ptr; using BufferUniquePtr = typename rclcpp::experimental::buffers::IntraProcessBuffer< SubscribedType, @@ -78,9 +76,12 @@ class SubscriptionIntraProcessBuffer : public ROSMessageIntraProcessBuffer(context, topic_name, - qos_profile) + : ROSMessageIntraProcessBuffer(context, topic_name, + qos_profile), + subscribed_type_allocator_(*allocator) { + allocator::set_allocator_for_deleter(&subscribed_type_deleter_, &subscribed_type_allocator_); + // Create the intra-process buffer. buffer_ = rclcpp::experimental::create_intra_process_buffer( @@ -96,52 +97,55 @@ class SubscriptionIntraProcessBuffer : public ROSMessageIntraProcessBufferhas_data(); } + SubscribedTypeUniquePtr + convert_ros_message_to_subscribed_type_unique_ptr(const ROSMessageType & msg) + { + if constexpr (!std::is_same::value) { + auto ptr = SubscribedTypeAllocatorTraits::allocate(subscribed_type_allocator_, 1); + SubscribedTypeAllocatorTraits::construct(subscribed_type_allocator_, ptr); + rclcpp::TypeAdapter::convert_to_custom(msg, *ptr); + return SubscribedTypeUniquePtr(ptr, subscribed_type_deleter_); + } else { + throw std::runtime_error( + "convert_ros_message_to_subscribed_type_unique_ptr " + "unexpectedly called without TypeAdapter"); + } + } + void provide_intra_process_message(ConstMessageSharedPtr message) { - std::cout << "--------------Provide Intra Process Message (ConstMessageSharedPtr)" << std::endl; - - if constexpr (!rclcpp::TypeAdapter::is_specialized::value) { + if constexpr (std::is_same::value) { buffer_->add_shared(std::move(message)); trigger_guard_condition(); } else { - // auto ptr = SubscribedTypeAllocatorTraits::allocate(subscribed_type_allocator_, 1); - // SubscribedTypeAllocatorTraits::construct(subscribed_type_allocator_, ptr, *message); - // buffer_->add_shared(std::unique_ptr( - // ptr, subscribed_type_deleter_)); - // trigger_guard_condition(); + buffer_->add_shared(convert_ros_message_to_subscribed_type_unique_ptr(*message)); + trigger_guard_condition(); } } void provide_intra_process_message(MessageUniquePtr message) { - std::cout << "--------------Provide Intra Process Message (MessageUniquePtr)" << std::endl; - if constexpr (!rclcpp::TypeAdapter::is_specialized::value) { + if constexpr (std::is_same::value) { buffer_->add_unique(std::move(message)); trigger_guard_condition(); } else { - // auto ptr = SubscribedTypeAllocatorTraits::allocate(subscribed_type_allocator_, 1); - // SubscribedTypeAllocatorTraits::construct(subscribed_type_allocator_, ptr); - // rclcpp::TypeAdapter::convert_to_custom(*message, *ptr); - // buffer_->add_unique(std::unique_ptr( - // ptr, subscribed_type_deleter_)); - // trigger_guard_condition(); + buffer_->add_unique(convert_ros_message_to_subscribed_type_unique_ptr(*message)); + trigger_guard_condition(); } } void provide_intra_process_data(ConstDataSharedPtr message) { - std::cout << "--------------Provide Intra Process DATA (ConstDataSharedPtr)" << std::endl; buffer_->add_shared(std::move(message)); trigger_guard_condition(); } void - provide_intra_process_data(DataUniquePtr message) + provide_intra_process_data(SubscribedTypeUniquePtr message) { - std::cout << "--------------Provide Intra Process DATA (DataUniquePtr)" << std::endl; buffer_->add_unique(std::move(message)); trigger_guard_condition(); } @@ -160,6 +164,8 @@ class SubscriptionIntraProcessBuffer : public ROSMessageIntraProcessBuffer #include #include -#include #include #include "rcl/error_handling.h" #include "rcl/publisher.h" #include "rmw/error_handling.h" #include "rmw/rmw.h" -#include "rosidl_runtime_cpp/traits.hpp" #include "rclcpp/allocator/allocator_common.hpp" #include "rclcpp/allocator/allocator_deleter.hpp" @@ -265,10 +263,11 @@ class Publisher : public PublisherBase get_subscription_count() > get_intra_process_subscription_count(); if (inter_process_publish_needed) { - auto shared_msg = this->do_intra_process_publish_and_return_shared(std::move(msg)); + auto shared_msg = + this->do_intra_process_ros_message_publish_and_return_shared(std::move(msg)); this->do_inter_process_publish(*shared_msg); } else { - this->do_intra_process_publish(std::move(msg)); + this->do_intra_process_ros_message_publish(std::move(msg)); } } @@ -321,29 +320,11 @@ class Publisher : public PublisherBase > publish(std::unique_ptr msg) { - if (!intra_process_is_enabled_) { - // If we aren't using intraprocess at all, do the type conversion - // immediately and publish interprocess. - auto unique_ros_msg = this->create_ros_message_unique_ptr(); - rclcpp::TypeAdapter::convert_to_ros_message(*msg, *unique_ros_msg); - this->do_inter_process_publish(*unique_ros_msg); - return; - } - // If an interprocess subscription exists, then the unique_ptr is promoted - // to a shared_ptr and published. - // This allows doing the intraprocess publish first and then doing the - // interprocess publish, resulting in lower publish-to-subscribe latency. - // It's not possible to do that with an unique_ptr, - // as do_intra_process_publish takes the ownership of the message. - bool inter_process_publish_needed = - get_subscription_count() > get_intra_process_subscription_count(); - - if (inter_process_publish_needed) { - auto shared_msg = this->do_intra_process_publish_and_return_shared(std::move(msg)); - this->do_inter_process_publish(*shared_msg); - } else { - this->do_intra_process_publish(std::move(msg)); - } + // TODO(wjwwood): later update this to give the unique_ptr to the intra + // process manager and let it decide if it needs to be converted or not. + // For now, convert it unconditionally and pass it the ROSMessageType + // publish function specialization. + this->do_intra_process_publish(std::move(msg)); } /// Publish a message on the topic. @@ -364,18 +345,26 @@ class Publisher : public PublisherBase > publish(const T & msg) { - // Avoid double allocation when not using intra process. + // TODO(wjwwood): later update this to give the unique_ptr to the intra + // process manager and let it decide if it needs to be converted or not. + // For now, convert it unconditionally and pass it the ROSMessageType + // publish function specialization. + + // Avoid allocating when not using intra process. if (!intra_process_is_enabled_) { // Convert to the ROS message equivalent and publish it. ROSMessageType ros_msg; rclcpp::TypeAdapter::convert_to_ros_message(msg, ros_msg); // In this case we're not using intra process. - this->do_inter_process_publish(ros_msg); - return; + return this->do_inter_process_publish(ros_msg); } - // Otherwise we have to allocate memory in a unique_ptr and pass it along. - auto unique_msg = this->duplicate_type_adapt_message_as_unique_ptr(msg); - this->publish(std::move(unique_msg)); + // Otherwise we have to allocate memory in a unique_ptr, convert it, + // and pass it along. + // As the message is not const, a copy should be made. + // A shared_ptr could also be constructed here. + auto unique_ros_msg = this->create_ros_message_unique_ptr(); + rclcpp::TypeAdapter::convert_to_ros_message(msg, *unique_ros_msg); + this->publish(std::move(unique_ros_msg)); } void @@ -512,19 +501,32 @@ class Publisher : public PublisherBase throw std::runtime_error("cannot publish msg which is a null pointer"); } - ipm->template do_intra_process_publish, ROSMessageTypeAllocatorTraits, ROSMessageTypeAllocator, - ROSMessageTypeDeleter, PublishedTypeAllocator>( + ipm->template do_intra_process_publish( + intra_process_publisher_id_, + std::move(msg), + published_type_allocator_); + } + + void + do_intra_process_ros_message_publish(std::unique_ptr msg) + { + auto ipm = weak_ipm_.lock(); + if (!ipm) { + throw std::runtime_error( + "intra process publish called after destruction of intra process manager"); + } + if (!msg) { + throw std::runtime_error("cannot publish msg which is a null pointer"); + } + + ipm->template do_intra_process_publish( intra_process_publisher_id_, std::move(msg), - published_type_allocator_, - ros_message_type_allocator_, - ros_message_type_deleter_); + ros_message_type_allocator_); } - template - std::shared_ptr + std::shared_ptr do_intra_process_publish_and_return_shared( std::unique_ptr msg) { @@ -537,17 +539,34 @@ class Publisher : public PublisherBase throw std::runtime_error("cannot publish msg which is a null pointer"); } - return ipm->template do_intra_process_publish_and_return_shared, - ROSMessageTypeAllocatorTraits, ROSMessageTypeAllocator, ROSMessageTypeDeleter, - PublishedTypeAllocator>( + return ipm->template do_intra_process_publish_and_return_shared( intra_process_publisher_id_, std::move(msg), - published_type_allocator_, - ros_message_type_allocator_, - ros_message_type_deleter_); + published_type_allocator_); } + std::shared_ptr + do_intra_process_ros_message_publish_and_return_shared( + std::unique_ptr msg) + { + auto ipm = weak_ipm_.lock(); + if (!ipm) { + throw std::runtime_error( + "intra process publish called after destruction of intra process manager"); + } + if (!msg) { + throw std::runtime_error("cannot publish msg which is a null pointer"); + } + + return ipm->template do_intra_process_publish_and_return_shared( + intra_process_publisher_id_, + std::move(msg), + ros_message_type_allocator_); + } + + /// Return a new unique_ptr using the ROSMessageType of the publisher. std::unique_ptr create_ros_message_unique_ptr() @@ -557,7 +576,7 @@ class Publisher : public PublisherBase return std::unique_ptr(ptr, ros_message_type_deleter_); } - /// Duplicate a given ROS message as a unique_ptr. + /// Duplicate a given ros message as a unique_ptr. std::unique_ptr duplicate_ros_message_as_unique_ptr(const ROSMessageType & msg) { @@ -566,15 +585,6 @@ class Publisher : public PublisherBase return std::unique_ptr(ptr, ros_message_type_deleter_); } - /// Duplicate a given type adapted message as a unique_ptr. - std::unique_ptr - duplicate_type_adapt_message_as_unique_ptr(const PublishedType & msg) - { - auto ptr = PublishedTypeAllocatorTraits::allocate(published_type_allocator_, 1); - PublishedTypeAllocatorTraits::construct(published_type_allocator_, ptr, msg); - return std::unique_ptr(ptr, published_type_deleter_); - } - /// Copy of original options passed during construction. /** * It is important to save a copy of this so that the rmw payload which it diff --git a/rclcpp/include/rclcpp/subscription.hpp b/rclcpp/include/rclcpp/subscription.hpp index ad23d60882..c238c9581f 100644 --- a/rclcpp/include/rclcpp/subscription.hpp +++ b/rclcpp/include/rclcpp/subscription.hpp @@ -276,7 +276,7 @@ class Subscription : public SubscriptionBase /// Take the next message from the inter-process subscription. /** - * This verison takes a SubscribedType which is different frmo the + * This version takes a SubscribedType which is different from the * ROSMessageType when a rclcpp::TypeAdapter is in used. * * \sa take(ROSMessageType &, rclcpp::MessageInfo &) diff --git a/rclcpp/test/rclcpp/CMakeLists.txt b/rclcpp/test/rclcpp/CMakeLists.txt index 29d7b92901..e60b1636b8 100644 --- a/rclcpp/test/rclcpp/CMakeLists.txt +++ b/rclcpp/test/rclcpp/CMakeLists.txt @@ -63,13 +63,13 @@ if(TARGET test_any_service_callback) ) target_link_libraries(test_any_service_callback ${PROJECT_NAME}) endif() -# ament_add_gtest(test_any_subscription_callback test_any_subscription_callback.cpp) -# if(TARGET test_any_subscription_callback) -# ament_target_dependencies(test_any_subscription_callback -# "test_msgs" -# ) -# target_link_libraries(test_any_subscription_callback ${PROJECT_NAME}) -# endif() +ament_add_gtest(test_any_subscription_callback test_any_subscription_callback.cpp) +if(TARGET test_any_subscription_callback) + ament_target_dependencies(test_any_subscription_callback + "test_msgs" + ) + target_link_libraries(test_any_subscription_callback ${PROJECT_NAME}) +endif() ament_add_gtest(test_client test_client.cpp) if(TARGET test_client) ament_target_dependencies(test_client @@ -135,24 +135,24 @@ ament_add_gtest( if(TARGET test_future_return_code) target_link_libraries(test_future_return_code ${PROJECT_NAME}) endif() -# ament_add_gmock(test_intra_process_manager test_intra_process_manager.cpp) -# if(TARGET test_intra_process_manager) -# ament_target_dependencies(test_intra_process_manager -# "rcl" -# "rcl_interfaces" -# "rmw" -# "rosidl_runtime_cpp" -# "rosidl_typesupport_cpp" -# ) -# target_link_libraries(test_intra_process_manager ${PROJECT_NAME}) -# endif() -# ament_add_gmock(test_intra_process_manager_with_allocators test_intra_process_manager_with_allocators.cpp) -# if(TARGET test_intra_process_manager_with_allocators) -# ament_target_dependencies(test_intra_process_manager_with_allocators -# "test_msgs" -# ) -# target_link_libraries(test_intra_process_manager_with_allocators ${PROJECT_NAME}) -# endif() +ament_add_gmock(test_intra_process_manager test_intra_process_manager.cpp) +if(TARGET test_intra_process_manager) + ament_target_dependencies(test_intra_process_manager + "rcl" + "rcl_interfaces" + "rmw" + "rosidl_runtime_cpp" + "rosidl_typesupport_cpp" + ) + target_link_libraries(test_intra_process_manager ${PROJECT_NAME}) +endif() +ament_add_gmock(test_intra_process_manager_with_allocators test_intra_process_manager_with_allocators.cpp) +if(TARGET test_intra_process_manager_with_allocators) + ament_target_dependencies(test_intra_process_manager_with_allocators + "test_msgs" + ) + target_link_libraries(test_intra_process_manager_with_allocators ${PROJECT_NAME}) +endif() ament_add_gtest(test_ring_buffer_implementation test_ring_buffer_implementation.cpp) if(TARGET test_ring_buffer_implementation) ament_target_dependencies(test_ring_buffer_implementation diff --git a/rclcpp/test/rclcpp/test_any_subscription_callback.cpp b/rclcpp/test/rclcpp/test_any_subscription_callback.cpp index 32abdf2934..02daee010b 100644 --- a/rclcpp/test/rclcpp/test_any_subscription_callback.cpp +++ b/rclcpp/test/rclcpp/test_any_subscription_callback.cpp @@ -25,6 +25,29 @@ #include "test_msgs/msg/empty.hpp" #include "test_msgs/msg/empty.h" +// Type adapter to be used in tests. +struct MyEmpty {}; + +template<> +struct rclcpp::TypeAdapter +{ + using is_specialized = std::true_type; + using custom_type = MyEmpty; + using ros_message_type = test_msgs::msg::Empty; + + static + void + convert_to_ros_message(const custom_type &, ros_message_type &) + {} + + static + void + convert_to_custom(const ros_message_type &, custom_type &) + {} +}; + +using MyTA = rclcpp::TypeAdapter; + class TestAnySubscriptionCallback : public ::testing::Test { public: @@ -43,6 +66,24 @@ class TestAnySubscriptionCallback : public ::testing::Test rclcpp::MessageInfo message_info_; }; +class TestAnySubscriptionCallbackTA : public ::testing::Test +{ +public: + TestAnySubscriptionCallbackTA() {} + + static + std::unique_ptr + get_unique_ptr_msg() + { + return std::make_unique(); + } + +protected: + rclcpp::AnySubscriptionCallback any_subscription_callback_; + std::shared_ptr msg_shared_ptr_{std::make_shared()}; + rclcpp::MessageInfo message_info_; +}; + void construct_with_null_allocator() { // suppress deprecated function warning @@ -97,29 +138,6 @@ TEST_F(TestAnySubscriptionCallback, unset_dispatch_throw) { // Parameterized test to test across all callback types and dispatch types. // -// Type adapter to be used in tests. -struct MyEmpty {}; - -template<> -struct rclcpp::TypeAdapter -{ - using is_specialized = std::true_type; - using custom_type = MyEmpty; - using ros_message_type = test_msgs::msg::Empty; - - static - void - convert_to_ros_message(const custom_type &, ros_message_type &) - {} - - static - void - convert_to_custom(const ros_message_type &, custom_type &) - {} -}; - -using MyTA = rclcpp::TypeAdapter; - template class InstanceContextImpl { @@ -177,7 +195,7 @@ class DispatchTests {}; class DispatchTestsWithTA - : public TestAnySubscriptionCallback, + : public TestAnySubscriptionCallbackTA, public ::testing::WithParamInterface> {}; @@ -193,27 +211,35 @@ format_parameter_with_ta(const ::testing::TestParamInfo as input */ \ - TEST_P(DispatchTests_name, test_inter_shared_dispatch) { \ - auto any_subscription_callback_to_test = GetParam().get_any_subscription_callback_to_test(); \ - any_subscription_callback_to_test.dispatch(msg_shared_ptr_, message_info_); \ - } \ - \ - /* Testing dispatch with shared_ptr as input */ \ - TEST_P(DispatchTests_name, test_intra_shared_dispatch) { \ - auto any_subscription_callback_to_test = GetParam().get_any_subscription_callback_to_test(); \ - any_subscription_callback_to_test.dispatch_intra_process(msg_shared_ptr_, message_info_); \ - } \ - \ - /* Testing dispatch with unique_ptr as input */ \ - TEST_P(DispatchTests_name, test_intra_unique_dispatch) { \ - auto any_subscription_callback_to_test = GetParam().get_any_subscription_callback_to_test(); \ - any_subscription_callback_to_test.dispatch_intra_process(get_unique_ptr_msg(), message_info_); \ - } +/* Testing dispatch with shared_ptr as input */ +TEST_P(DispatchTests, test_inter_shared_dispatch) { + auto any_subscription_callback_to_test = GetParam().get_any_subscription_callback_to_test(); + any_subscription_callback_to_test.dispatch(msg_shared_ptr_, message_info_); +} + +/* Testing dispatch with shared_ptr as input */ +TEST_P(DispatchTests, test_intra_shared_dispatch) { + auto any_subscription_callback_to_test = GetParam().get_any_subscription_callback_to_test(); + any_subscription_callback_to_test.dispatch_intra_process(msg_shared_ptr_, message_info_); +} + +/* Testing dispatch with unique_ptr as input */ +TEST_P(DispatchTests, test_intra_unique_dispatch) { + auto any_subscription_callback_to_test = GetParam().get_any_subscription_callback_to_test(); + any_subscription_callback_to_test.dispatch_intra_process(get_unique_ptr_msg(), message_info_); +} + +/* Testing dispatch with shared_ptr as input */ +TEST_P(DispatchTestsWithTA, test_intra_shared_dispatch) { + auto any_subscription_callback_to_test = GetParam().get_any_subscription_callback_to_test(); + any_subscription_callback_to_test.dispatch_intra_process(msg_shared_ptr_, message_info_); +} -PARAMETERIZED_TESTS(DispatchTests) -PARAMETERIZED_TESTS(DispatchTestsWithTA) +/* Testing dispatch with unique_ptr as input */ +TEST_P(DispatchTestsWithTA, test_intra_unique_dispatch) { + auto any_subscription_callback_to_test = GetParam().get_any_subscription_callback_to_test(); + any_subscription_callback_to_test.dispatch_intra_process(get_unique_ptr_msg(), message_info_); +} // Generic classes for testing callbacks using std::bind to class methods. template diff --git a/rclcpp/test/rclcpp/test_intra_process_manager.cpp b/rclcpp/test/rclcpp/test_intra_process_manager.cpp index 59437aa560..45b781995d 100644 --- a/rclcpp/test/rclcpp/test_intra_process_manager.cpp +++ b/rclcpp/test/rclcpp/test_intra_process_manager.cpp @@ -246,6 +246,18 @@ class SubscriptionIntraProcessBuffer : public SubscriptionIntraProcessBase buffer->add(std::move(msg)); } + void + provide_intra_process_data(std::shared_ptr msg) + { + buffer->add(msg); + } + + void + provide_intra_process_data(std::unique_ptr msg) + { + buffer->add(std::move(msg)); + } + std::uintptr_t pop() {