From 7591bcf641ad146bad27d098acf675aada10a527 Mon Sep 17 00:00:00 2001 From: emerson Date: Wed, 27 Mar 2019 15:14:29 -0700 Subject: [PATCH] New interfaces for incoming QoS features Adds new PublisherOptions and SubscriptionOptions classes, with new Publisher and Subscriber constructors to accept them. Adds the liveliness assertion callbacks that will be needed for the new Liveliness QoS policy Signed-off-by: Emerson Knapp --- rclcpp/include/rclcpp/callback_group.hpp | 10 +++ rclcpp/include/rclcpp/create_publisher.hpp | 8 ++- rclcpp/include/rclcpp/create_subscription.hpp | 6 +- rclcpp/include/rclcpp/node.hpp | 65 +++++++++++++++-- rclcpp/include/rclcpp/node_impl.hpp | 71 ++++++++++++++++++- .../rclcpp/node_interfaces/node_topics.hpp | 3 +- .../node_interfaces/node_topics_interface.hpp | 3 +- rclcpp/include/rclcpp/parameter_client.hpp | 1 + rclcpp/include/rclcpp/publisher.hpp | 19 ++++- rclcpp/include/rclcpp/publisher_factory.hpp | 9 ++- rclcpp/include/rclcpp/publisher_options.hpp | 44 ++++++++++++ rclcpp/include/rclcpp/subscription.hpp | 2 + .../include/rclcpp/subscription_factory.hpp | 4 +- .../include/rclcpp/subscription_options.hpp | 44 ++++++++++++ rclcpp/src/rclcpp/callback_group.cpp | 15 ++++ .../node_interfaces/node_parameters.cpp | 2 + .../rclcpp/node_interfaces/node_topics.cpp | 16 +++-- rclcpp/src/rclcpp/time_source.cpp | 1 + .../rclcpp_lifecycle/lifecycle_node_impl.hpp | 3 + 19 files changed, 304 insertions(+), 22 deletions(-) create mode 100644 rclcpp/include/rclcpp/publisher_options.hpp create mode 100644 rclcpp/include/rclcpp/subscription_options.hpp diff --git a/rclcpp/include/rclcpp/callback_group.hpp b/rclcpp/include/rclcpp/callback_group.hpp index c53bffbd7d..0a5c49cbb7 100644 --- a/rclcpp/include/rclcpp/callback_group.hpp +++ b/rclcpp/include/rclcpp/callback_group.hpp @@ -22,6 +22,7 @@ #include "rclcpp/client.hpp" #include "rclcpp/service.hpp" +#include "rclcpp/publisher.hpp" #include "rclcpp/subscription.hpp" #include "rclcpp/timer.hpp" #include "rclcpp/visibility_control.hpp" @@ -61,6 +62,10 @@ class CallbackGroup RCLCPP_PUBLIC explicit CallbackGroup(CallbackGroupType group_type); + RCLCPP_PUBLIC + const std::vector & + get_publisher_ptrs() const; + RCLCPP_PUBLIC const std::vector & get_subscription_ptrs() const; @@ -92,6 +97,10 @@ class CallbackGroup protected: RCLCPP_DISABLE_COPY(CallbackGroup) + RCLCPP_PUBLIC + void + add_publisher(const rclcpp::PublisherBase::SharedPtr publisher_ptr); + RCLCPP_PUBLIC void add_subscription(const rclcpp::SubscriptionBase::SharedPtr subscription_ptr); @@ -119,6 +128,7 @@ class CallbackGroup CallbackGroupType type_; // Mutex to protect the subsequent vectors of pointers. mutable std::mutex mutex_; + std::vector publisher_ptrs_; std::vector subscription_ptrs_; std::vector timer_ptrs_; std::vector service_ptrs_; diff --git a/rclcpp/include/rclcpp/create_publisher.hpp b/rclcpp/include/rclcpp/create_publisher.hpp index f136f33653..e21b00687b 100644 --- a/rclcpp/include/rclcpp/create_publisher.hpp +++ b/rclcpp/include/rclcpp/create_publisher.hpp @@ -31,6 +31,8 @@ create_publisher( rclcpp::node_interfaces::NodeTopicsInterface * node_topics, const std::string & topic_name, const rmw_qos_profile_t & qos_profile, + const PublisherEventCallbacks & event_callbacks, + rclcpp::callback_group::CallbackGroup::SharedPtr group, bool use_intra_process_comms, std::shared_ptr allocator) { @@ -39,10 +41,12 @@ create_publisher( auto pub = node_topics->create_publisher( topic_name, - rclcpp::create_publisher_factory(allocator), + rclcpp::create_publisher_factory(event_callbacks, allocator), publisher_options, use_intra_process_comms); - node_topics->add_publisher(pub); + + node_topics->add_publisher(pub, group); + return std::dynamic_pointer_cast(pub); } diff --git a/rclcpp/include/rclcpp/create_subscription.hpp b/rclcpp/include/rclcpp/create_subscription.hpp index a4ea581092..e765d376e2 100644 --- a/rclcpp/include/rclcpp/create_subscription.hpp +++ b/rclcpp/include/rclcpp/create_subscription.hpp @@ -38,6 +38,7 @@ create_subscription( const std::string & topic_name, CallbackT && callback, const rmw_qos_profile_t & qos_profile, + const SubscriptionEventCallbacks & event_callbacks, rclcpp::callback_group::CallbackGroup::SharedPtr group, bool ignore_local_publications, bool use_intra_process_comms, @@ -52,7 +53,10 @@ create_subscription( auto factory = rclcpp::create_subscription_factory ( - std::forward(callback), msg_mem_strat, allocator); + std::forward(callback), + event_callbacks, + msg_mem_strat, + allocator); auto sub = node_topics->create_subscription( topic_name, diff --git a/rclcpp/include/rclcpp/node.hpp b/rclcpp/include/rclcpp/node.hpp index a3fe7eb472..21aca82966 100644 --- a/rclcpp/include/rclcpp/node.hpp +++ b/rclcpp/include/rclcpp/node.hpp @@ -139,7 +139,7 @@ class Node : public std::enable_shared_from_this const std::vector & get_callback_groups() const; - /// Create and return a Publisher. + /// Create and return a Publisher. Note: This constructor is deprecated /** * \param[in] topic_name The topic for this publisher to publish on. * \param[in] qos_history_depth The depth of the publisher message queue. @@ -151,10 +151,11 @@ class Node : public std::enable_shared_from_this typename PublisherT = ::rclcpp::Publisher> std::shared_ptr create_publisher( - const std::string & topic_name, size_t qos_history_depth, + const std::string & topic_name, + size_t qos_history_depth, std::shared_ptr allocator = nullptr); - /// Create and return a Publisher. + /// Create and return a Publisher. Note: this constructor is deprecated /** * \param[in] topic_name The topic for this publisher to publish on. * \param[in] qos_profile The quality of service profile to pass on to the rmw implementation. @@ -170,7 +171,23 @@ class Node : public std::enable_shared_from_this const rmw_qos_profile_t & qos_profile = rmw_qos_profile_default, std::shared_ptr allocator = nullptr); - /// Create and return a Subscription. + /// Create and return a Publisher. + /** + * \param[in] topic_name The topic for this publisher to publish on. + * \param[in] options Additional options for the created Publisher + * \return Shared pointer to the created publisher. + */ + template< + typename MessageT, + typename Alloc = std::allocator, + typename PublisherT = ::rclcpp::Publisher> + std::shared_ptr + create_publisher( + const std::string & topic_name, + const PublisherOptions & options, + rclcpp::callback_group::CallbackGroup::SharedPtr callback_group = nullptr); + + /// Create and return a Subscription. Note: this constructor is deprecated /** * \param[in] topic_name The topic to subscribe on. * \param[in] callback The user-defined callback function. @@ -203,7 +220,7 @@ class Node : public std::enable_shared_from_this msg_mem_strat = nullptr, std::shared_ptr allocator = nullptr); - /// Create and return a Subscription. + /// Create and return a Subscription. Note: this constructor is deprecated /** * \param[in] topic_name The topic to subscribe on. * \param[in] qos_history_depth The depth of the subscription's incoming message queue. @@ -236,6 +253,34 @@ class Node : public std::enable_shared_from_this msg_mem_strat = nullptr, std::shared_ptr allocator = nullptr); + /// Create and return a Subscription. + /** + * \param[in] topic_name The topic to subscribe on. + * \param[in] callback The user-defined callback function to receive a message + * \param[in] options Additional options for the creation of the Subscription + * \param[in] msg_mem_strat The message memory strategy to use for allocating messages. + * \return Shared pointer to the created subscription. + */ + /* TODO(jacquelinekay): + Windows build breaks when static member function passed as default + argument to msg_mem_strat, nullptr is a workaround. + */ + template< + typename MessageT, + typename CallbackT, + typename Alloc = std::allocator, + typename SubscriptionT = rclcpp::Subscription< + typename rclcpp::subscription_traits::has_message_type::type, Alloc>> + std::shared_ptr + create_subscription( + const std::string & topic_name, + CallbackT && callback, + const SubscriptionOptions & options, + rclcpp::callback_group::CallbackGroup::SharedPtr callback_group = nullptr, + typename rclcpp::message_memory_strategy::MessageMemoryStrategy< + typename rclcpp::subscription_traits::has_message_type::type, Alloc>::SharedPtr + msg_mem_strat = nullptr); + /// Create a timer. /** * \param[in] period Time interval between triggers of the callback. @@ -600,6 +645,16 @@ class Node : public std::enable_shared_from_this const NodeOptions & get_node_options() const; + /// Manually assert that this Node is alive (for RMW_QOS_POLICY_MANUAL_BY_NODE) + /** + * If the rmw Liveliness policy is set to RMW_QOS_POLICY_MANUAL_BY_NODE, the creator of this + * Node must manually call `assert_liveliness` periodically to signal that this Node + * is still alive. Must be called at least as often as qos_profile's Liveliness lease_duration + */ + RCLCPP_PUBLIC + void + assert_liveliness() {} + protected: /// Construct a sub-node, which will extend the namespace of all entities created with it. /** diff --git a/rclcpp/include/rclcpp/node_impl.hpp b/rclcpp/include/rclcpp/node_impl.hpp index a2ef35b9bb..49cb322da4 100644 --- a/rclcpp/include/rclcpp/node_impl.hpp +++ b/rclcpp/include/rclcpp/node_impl.hpp @@ -54,7 +54,8 @@ namespace rclcpp template std::shared_ptr Node::create_publisher( - const std::string & topic_name, size_t qos_history_depth, + const std::string & topic_name, + size_t qos_history_depth, std::shared_ptr allocator) { if (!allocator) { @@ -80,7 +81,8 @@ extend_name_with_sub_namespace(const std::string & name, const std::string & sub template std::shared_ptr Node::create_publisher( - const std::string & topic_name, const rmw_qos_profile_t & qos_profile, + const std::string & topic_name, + const rmw_qos_profile_t & qos_profile, std::shared_ptr allocator) { if (!allocator) { @@ -91,6 +93,30 @@ Node::create_publisher( this->node_topics_.get(), extend_name_with_sub_namespace(topic_name, this->get_sub_namespace()), qos_profile, + PublisherEventCallbacks(), + nullptr, + this->get_node_options().use_intra_process_comms(), + allocator); +} + +template +std::shared_ptr +Node::create_publisher( + const std::string & topic_name, + const PublisherOptions & options, + rclcpp::callback_group::CallbackGroup::SharedPtr group) +{ + std::shared_ptr allocator = options.allocator(); + if (!allocator) { + allocator = std::make_shared(); + } + + return rclcpp::create_publisher( + this->node_topics_.get(), + extend_name_with_sub_namespace(topic_name, this->get_sub_namespace()), + options.qos_profile(), + options.event_callbacks(), + group, this->get_node_options().use_intra_process_comms(), allocator); } @@ -128,6 +154,7 @@ Node::create_subscription( extend_name_with_sub_namespace(topic_name, this->get_sub_namespace()), std::forward(callback), qos_profile, + SubscriptionEventCallbacks(), group, ignore_local_publications, this->get_node_options().use_intra_process_comms(), @@ -135,6 +162,46 @@ Node::create_subscription( allocator); } +template< + typename MessageT, + typename CallbackT, + typename Alloc, + typename SubscriptionT> +std::shared_ptr +Node::create_subscription( + const std::string & topic_name, + CallbackT && callback, + const SubscriptionOptions & options, + rclcpp::callback_group::CallbackGroup::SharedPtr group, + typename rclcpp::message_memory_strategy::MessageMemoryStrategy< + typename rclcpp::subscription_traits::has_message_type::type, Alloc>::SharedPtr + msg_mem_strat) +{ + using CallbackMessageT = typename rclcpp::subscription_traits::has_message_type::type; + + std::shared_ptr allocator = options.allocator(); + if (!allocator) { + allocator = std::make_shared(); + } + + if (!msg_mem_strat) { + using rclcpp::message_memory_strategy::MessageMemoryStrategy; + msg_mem_strat = MessageMemoryStrategy::create_default(); + } + + return rclcpp::create_subscription( + this->node_topics_.get(), + extend_name_with_sub_namespace(topic_name, this->get_sub_namespace()), + std::forward(callback), + options.qos_profile(), + options.event_callbacks(), + group, + options.ignore_local_publications(), + this->get_node_options().use_intra_process_comms(), + msg_mem_strat, + allocator); +} + template< typename MessageT, typename CallbackT, diff --git a/rclcpp/include/rclcpp/node_interfaces/node_topics.hpp b/rclcpp/include/rclcpp/node_interfaces/node_topics.hpp index 31cf62e54c..40beb3aacb 100644 --- a/rclcpp/include/rclcpp/node_interfaces/node_topics.hpp +++ b/rclcpp/include/rclcpp/node_interfaces/node_topics.hpp @@ -58,7 +58,8 @@ class NodeTopics : public NodeTopicsInterface virtual void add_publisher( - rclcpp::PublisherBase::SharedPtr publisher); + rclcpp::PublisherBase::SharedPtr publisher, + rclcpp::callback_group::CallbackGroup::SharedPtr callback_group); RCLCPP_PUBLIC virtual diff --git a/rclcpp/include/rclcpp/node_interfaces/node_topics_interface.hpp b/rclcpp/include/rclcpp/node_interfaces/node_topics_interface.hpp index b3d46ff334..d888e3ccd2 100644 --- a/rclcpp/include/rclcpp/node_interfaces/node_topics_interface.hpp +++ b/rclcpp/include/rclcpp/node_interfaces/node_topics_interface.hpp @@ -57,7 +57,8 @@ class NodeTopicsInterface virtual void add_publisher( - rclcpp::PublisherBase::SharedPtr publisher) = 0; + rclcpp::PublisherBase::SharedPtr publisher, + rclcpp::callback_group::CallbackGroup::SharedPtr callback_group) = 0; RCLCPP_PUBLIC virtual diff --git a/rclcpp/include/rclcpp/parameter_client.hpp b/rclcpp/include/rclcpp/parameter_client.hpp index 879f261836..6b68a793fa 100644 --- a/rclcpp/include/rclcpp/parameter_client.hpp +++ b/rclcpp/include/rclcpp/parameter_client.hpp @@ -127,6 +127,7 @@ class AsyncParametersClient "parameter_events", std::forward(callback), rmw_qos_profile_default, + SubscriptionEventCallbacks(), nullptr, // group, false, // ignore_local_publications, false, // use_intra_process_comms_, diff --git a/rclcpp/include/rclcpp/publisher.hpp b/rclcpp/include/rclcpp/publisher.hpp index 4500fedab5..6b95429c8d 100644 --- a/rclcpp/include/rclcpp/publisher.hpp +++ b/rclcpp/include/rclcpp/publisher.hpp @@ -31,17 +31,21 @@ #include "rclcpp/allocator/allocator_common.hpp" #include "rclcpp/allocator/allocator_deleter.hpp" +#include "rclcpp/exceptions.hpp" #include "rclcpp/macros.hpp" -#include "rclcpp/node_interfaces/node_base_interface.hpp" +#include "rclcpp/publisher_options.hpp" #include "rclcpp/type_support_decl.hpp" #include "rclcpp/visibility_control.hpp" namespace rclcpp { -// Forward declaration is used for friend statement. namespace node_interfaces { +// NOTE(emersonknapp) Forward declaration avoids including node_base_interface.hpp which causes +// circular inclusion from callback_group.hpp +class NodeBaseInterface; +// Forward declaration is used for friend statement. class NodeTopicsInterface; } @@ -128,6 +132,16 @@ class PublisherBase size_t get_intra_process_subscription_count() const; + /// Manually assert that this Publisher is alive (for RMW_QOS_POLICY_MANUAL_BY_TOPIC) + /** + * If the rmw Liveliness policy is set to RMW_QOS_POLICY_MANUAL_BY_TOPIC, the creator of this + * Publisher must manually call `assert_liveliness` periodically to signal that this Publisher + * is still alive. Must be called at least as often as qos_profile's Liveliness lease_duration + */ + RCLCPP_PUBLIC + void + assert_liveliness() {} + /// Compare this publisher to a gid. /** * Note that this function calls the next function. @@ -194,6 +208,7 @@ class Publisher : public PublisherBase rclcpp::node_interfaces::NodeBaseInterface * node_base, const std::string & topic, const rcl_publisher_options_t & publisher_options, + const PublisherEventCallbacks & /* event_callbacks */, const std::shared_ptr & allocator) : PublisherBase( node_base, diff --git a/rclcpp/include/rclcpp/publisher_factory.hpp b/rclcpp/include/rclcpp/publisher_factory.hpp index 4044d1a6cc..98ab04dc6a 100644 --- a/rclcpp/include/rclcpp/publisher_factory.hpp +++ b/rclcpp/include/rclcpp/publisher_factory.hpp @@ -75,13 +75,15 @@ struct PublisherFactory /// Return a PublisherFactory with functions setup for creating a PublisherT. template PublisherFactory -create_publisher_factory(std::shared_ptr allocator) +create_publisher_factory( + const PublisherEventCallbacks & event_callbacks, + std::shared_ptr allocator) { PublisherFactory factory; // factory function that creates a MessageT specific PublisherT factory.create_typed_publisher = - [allocator]( + [event_callbacks, allocator]( rclcpp::node_interfaces::NodeBaseInterface * node_base, const std::string & topic_name, rcl_publisher_options_t & publisher_options) -> std::shared_ptr @@ -89,7 +91,8 @@ create_publisher_factory(std::shared_ptr allocator) auto message_alloc = std::make_shared(*allocator.get()); publisher_options.allocator = allocator::get_rcl_allocator(*message_alloc.get()); - return std::make_shared(node_base, topic_name, publisher_options, message_alloc); + return std::make_shared(node_base, topic_name, publisher_options, + event_callbacks, message_alloc); }; // function to add a publisher to the intra process manager diff --git a/rclcpp/include/rclcpp/publisher_options.hpp b/rclcpp/include/rclcpp/publisher_options.hpp new file mode 100644 index 0000000000..d45a2ec1a7 --- /dev/null +++ b/rclcpp/include/rclcpp/publisher_options.hpp @@ -0,0 +1,44 @@ +// Copyright 2019 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__PUBLISHER_OPTIONS_HPP_ +#define RCLCPP__PUBLISHER_OPTIONS_HPP_ + +#include +#include +#include + +#include "rclcpp/visibility_control.hpp" + +namespace rclcpp +{ + +/// Contains callbacks for various types of events a Publisher can receive from the middleware +struct PublisherEventCallbacks +{ + // NOTE: empty placeholder for addition of QoS event callbacks +}; + +/// Structure containing configuration options for Publishers/Subscribers +template> +struct PublisherOptions +{ + PublisherEventCallbacks event_callbacks; + rmw_qos_profile_t qos_profile = rmw_qos_profile_default; + std::shared_ptr allocator = nullptr; +}; + +} // namespace rclcpp + +#endif // RCLCPP__PUBLISHER_OPTIONS_HPP_ diff --git a/rclcpp/include/rclcpp/subscription.hpp b/rclcpp/include/rclcpp/subscription.hpp index 2a2a3e4eef..edc3f2fb70 100644 --- a/rclcpp/include/rclcpp/subscription.hpp +++ b/rclcpp/include/rclcpp/subscription.hpp @@ -34,6 +34,7 @@ #include "rclcpp/expand_topic_or_service_name.hpp" #include "rclcpp/macros.hpp" #include "rclcpp/message_memory_strategy.hpp" +#include "rclcpp/subscription_options.hpp" #include "rclcpp/subscription_traits.hpp" #include "rclcpp/type_support_decl.hpp" #include "rclcpp/visibility_control.hpp" @@ -195,6 +196,7 @@ class Subscription : public SubscriptionBase const std::string & topic_name, const rcl_subscription_options_t & subscription_options, AnySubscriptionCallback callback, + const SubscriptionEventCallbacks & /* event_callbacks */, typename message_memory_strategy::MessageMemoryStrategy::SharedPtr memory_strategy = message_memory_strategy::MessageMemoryStrategy::create_default()) diff --git a/rclcpp/include/rclcpp/subscription_factory.hpp b/rclcpp/include/rclcpp/subscription_factory.hpp index e2bdec7ef2..905e10d3de 100644 --- a/rclcpp/include/rclcpp/subscription_factory.hpp +++ b/rclcpp/include/rclcpp/subscription_factory.hpp @@ -75,6 +75,7 @@ template< SubscriptionFactory create_subscription_factory( CallbackT && callback, + const SubscriptionEventCallbacks & event_callbacks, typename rclcpp::message_memory_strategy::MessageMemoryStrategy< CallbackMessageT, Alloc>::SharedPtr msg_mem_strat, @@ -91,7 +92,7 @@ create_subscription_factory( // factory function that creates a MessageT specific SubscriptionT factory.create_typed_subscription = - [allocator, msg_mem_strat, any_subscription_callback, message_alloc]( + [allocator, msg_mem_strat, any_subscription_callback, event_callbacks, message_alloc]( rclcpp::node_interfaces::NodeBaseInterface * node_base, const std::string & topic_name, rcl_subscription_options_t & subscription_options @@ -109,6 +110,7 @@ create_subscription_factory( topic_name, subscription_options, any_subscription_callback, + event_callbacks, msg_mem_strat); auto sub_base_ptr = std::dynamic_pointer_cast(sub); return sub_base_ptr; diff --git a/rclcpp/include/rclcpp/subscription_options.hpp b/rclcpp/include/rclcpp/subscription_options.hpp new file mode 100644 index 0000000000..df9495b07c --- /dev/null +++ b/rclcpp/include/rclcpp/subscription_options.hpp @@ -0,0 +1,44 @@ +// Copyright 2019 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__SUBSCRIPTION_OPTIONS_HPP_ +#define RCLCPP__SUBSCRIPTION_OPTIONS_HPP_ + +#include +#include +#include + +#include "rclcpp/visibility_control.hpp" + +namespace rclcpp +{ + +/// Contains callbacks for non-message events that a Subscriber can receive from the middleware +struct SubscriptionEventCallbacks +{ + // NOTE: placeholder for addition of QoS Event callbacks +}; + +template> +struct SubscriptionOptions +{ + SubscriptionEventCallbacks event_callbacks; + rmw_qos_profile_t qos_profile = rmw_qos_profile_default; + bool ignore_local_publications = false; + std::shared_ptr allocator = nullptr; +}; + +} // namespace rclcpp + +#endif // RCLCPP__SUBSCRIPTION_OPTIONS_HPP_ diff --git a/rclcpp/src/rclcpp/callback_group.cpp b/rclcpp/src/rclcpp/callback_group.cpp index b812afc516..e269a6120b 100644 --- a/rclcpp/src/rclcpp/callback_group.cpp +++ b/rclcpp/src/rclcpp/callback_group.cpp @@ -23,6 +23,13 @@ CallbackGroup::CallbackGroup(CallbackGroupType group_type) : type_(group_type), can_be_taken_from_(true) {} +const std::vector & +CallbackGroup::get_publisher_ptrs() const +{ + std::lock_guard lock(mutex_); + return publisher_ptrs_; +} + const std::vector & CallbackGroup::get_subscription_ptrs() const { @@ -70,6 +77,14 @@ CallbackGroup::type() const return type_; } +void +CallbackGroup::add_publisher( + const rclcpp::PublisherBase::SharedPtr publisher_ptr) +{ + std::lock_guard lock(mutex_); + publisher_ptrs_.push_back(publisher_ptr); +} + void CallbackGroup::add_subscription( const rclcpp::SubscriptionBase::SharedPtr subscription_ptr) diff --git a/rclcpp/src/rclcpp/node_interfaces/node_parameters.cpp b/rclcpp/src/rclcpp/node_interfaces/node_parameters.cpp index 8c4d14b94c..345715bd6f 100644 --- a/rclcpp/src/rclcpp/node_interfaces/node_parameters.cpp +++ b/rclcpp/src/rclcpp/node_interfaces/node_parameters.cpp @@ -59,6 +59,8 @@ NodeParameters::NodeParameters( node_topics.get(), "parameter_events", parameter_event_qos_profile, + PublisherEventCallbacks(), + nullptr, use_intra_process, allocator); } diff --git a/rclcpp/src/rclcpp/node_interfaces/node_topics.cpp b/rclcpp/src/rclcpp/node_interfaces/node_topics.cpp index b380e3c2ec..99e73b586a 100644 --- a/rclcpp/src/rclcpp/node_interfaces/node_topics.cpp +++ b/rclcpp/src/rclcpp/node_interfaces/node_topics.cpp @@ -64,11 +64,19 @@ NodeTopics::create_publisher( void NodeTopics::add_publisher( - rclcpp::PublisherBase::SharedPtr publisher) + rclcpp::PublisherBase::SharedPtr publisher, + rclcpp::callback_group::CallbackGroup::SharedPtr callback_group) { - // The publisher is not added to a callback group or anthing like that for now. - // It may be stored within the NodeTopics class or the NodeBase class in the future. - (void)publisher; + // Assign to a group. + if (callback_group) { + if (!node_base_->callback_group_in_node(callback_group)) { + throw std::runtime_error("Cannot create publisher, callback group not in node."); + } + callback_group->add_publisher(publisher); + } else { + node_base_->get_default_callback_group()->add_publisher(publisher); + } + // Notify the executor that a new publisher was created using the parent Node. { auto notify_guard_condition_lock = node_base_->acquire_notify_guard_condition_lock(); diff --git a/rclcpp/src/rclcpp/time_source.cpp b/rclcpp/src/rclcpp/time_source.cpp index 173ccf054f..8ea80664da 100644 --- a/rclcpp/src/rclcpp/time_source.cpp +++ b/rclcpp/src/rclcpp/time_source.cpp @@ -214,6 +214,7 @@ void TimeSource::create_clock_sub() topic_name, std::move(cb), rmw_qos_profile_default, + rclcpp::SubscriptionEventCallbacks(), group, false, false, diff --git a/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_node_impl.hpp b/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_node_impl.hpp index 69d8a48e96..3a79b4486d 100644 --- a/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_node_impl.hpp +++ b/rclcpp_lifecycle/include/rclcpp_lifecycle/lifecycle_node_impl.hpp @@ -25,6 +25,7 @@ #include "rclcpp/create_publisher.hpp" #include "rclcpp/create_service.hpp" #include "rclcpp/create_subscription.hpp" +#include "rclcpp/subscription_options.hpp" #include "rclcpp/type_support_decl.hpp" #include "lifecycle_publisher.hpp" @@ -97,6 +98,7 @@ LifecycleNode::create_subscription( topic_name, std::forward(callback), qos_profile, + rclcpp::SubscriptionEventCallbacks(), group, ignore_local_publications, use_intra_process_comms_, @@ -126,6 +128,7 @@ LifecycleNode::create_subscription( topic_name, std::forward(callback), qos, + rclcpp::SubscriptionEventCallbacks(), group, ignore_local_publications, msg_mem_strat,