Skip to content

Commit

Permalink
Options-struct interfaces for creating publishers/subscribers (pre-Qo…
Browse files Browse the repository at this point in the history
…S, standalone) (ros2#673)

* Options-based create_publisher and create_subscription interfaces

Introduce new Options structs for creating publishers and subscribers. Deprecate existing interfaces for checking in CI how often they are used.

Signed-off-by: Emerson Knapp <eknapp@amazon.com>

* Remove default params that resulted in ambiguous declarations.

Signed-off-by: Emerson Knapp <eknapp@amazon.com>

* Remove one deprecation to limit upstream impact, add documentation on pub/sub options, slim down test lambdas character count

Signed-off-by: Emerson Knapp <eknapp@amazon.com>

* Un-abbreviate Allocator in new interfaces/types, define a common Options specialization that doesn't need empty brackets

Signed-off-by: Emerson Knapp <eknapp@amazon.com>

* Suppress cppcheck syntaxError for the one function

Signed-off-by: Emerson Knapp <eknapp@amazon.com>
  • Loading branch information
emersonknapp authored and christopherho-ApexAI committed Jun 3, 2019
1 parent c0b8c47 commit 86551d8
Show file tree
Hide file tree
Showing 7 changed files with 358 additions and 62 deletions.
9 changes: 9 additions & 0 deletions rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,15 @@ if(BUILD_TESTING)
)
target_link_libraries(test_publisher ${PROJECT_NAME})
endif()
ament_add_gtest(test_pub_sub_option_interface test/test_pub_sub_option_interface.cpp)
if(TARGET test_pub_sub_option_interface)
ament_target_dependencies(test_pub_sub_option_interface
test_msgs
)
target_link_libraries(test_pub_sub_option_interface
${PROJECT_NAME}
)
endif()
ament_add_gtest(test_publisher_subscription_count_api test/test_publisher_subscription_count_api.cpp)
if(TARGET test_publisher_subscription_count_api)
ament_target_dependencies(test_publisher_subscription_count_api
Expand Down
34 changes: 34 additions & 0 deletions rclcpp/include/rclcpp/intra_process_setting.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright 2019 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef RCLCPP__INTRA_PROCESS_SETTING_HPP_
#define RCLCPP__INTRA_PROCESS_SETTING_HPP_

namespace rclcpp
{

/// Used as argument in create_publisher and create_subscriber.
enum class IntraProcessSetting
{
/// Explicitly enable intraprocess comm at publisher/subscription level.
Enable,
/// Explicitly disable intraprocess comm at publisher/subscription level.
Disable,
/// Take intraprocess configuration from the node.
NodeDefault
};

} // namespace rclcpp

#endif // RCLCPP__INTRA_PROCESS_SETTING_HPP_
76 changes: 62 additions & 14 deletions rclcpp/include/rclcpp/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,10 @@
#include "rclcpp/node_interfaces/node_waitables_interface.hpp"
#include "rclcpp/parameter.hpp"
#include "rclcpp/publisher.hpp"
#include "rclcpp/publisher_options.hpp"
#include "rclcpp/service.hpp"
#include "rclcpp/subscription.hpp"
#include "rclcpp/subscription_options.hpp"
#include "rclcpp/subscription_traits.hpp"
#include "rclcpp/time.hpp"
#include "rclcpp/timer.hpp"
Expand All @@ -64,17 +66,6 @@
namespace rclcpp
{

/// Used as argument in create_publisher and create_subscriber.
enum class IntraProcessSetting
{
/// Explicitly enable intraprocess comm at publisher/subscription level.
Enable,
/// Explicitly disable intraprocess comm at publisher/subscription level.
Disable,
/// Take intraprocess configuration from the node.
NodeDefault
};

/// Node is the single point of entry for creating publishers and subscribers.
class Node : public std::enable_shared_from_this<Node>
{
Expand Down Expand Up @@ -150,6 +141,24 @@ class Node : public std::enable_shared_from_this<Node>
const std::vector<rclcpp::callback_group::CallbackGroup::WeakPtr> &
get_callback_groups() const;

/// Create and return a Publisher.
/**
* \param[in] topic_name The topic for this publisher to publish on.
* \param[in] qos_history_depth The depth of the publisher message queue.
* \param[in] options Additional options for the created Publisher.
* \return Shared pointer to the created publisher.
*/
template<
typename MessageT,
typename AllocatorT = std::allocator<void>,
typename PublisherT = ::rclcpp::Publisher<MessageT, AllocatorT>>
std::shared_ptr<PublisherT>
create_publisher(
const std::string & topic_name,
size_t qos_history_depth,
const PublisherOptionsWithAllocator<AllocatorT> &
options = PublisherOptionsWithAllocator<AllocatorT>());

/// Create and return a Publisher.
/**
* \param[in] topic_name The topic for this publisher to publish on.
Expand All @@ -160,10 +169,15 @@ class Node : public std::enable_shared_from_this<Node>
template<
typename MessageT, typename Alloc = std::allocator<void>,
typename PublisherT = ::rclcpp::Publisher<MessageT, Alloc>>
// cppcheck-suppress syntaxError // bug in cppcheck 1.82 for [[deprecated]] on templated function
[[deprecated(
"use the create_publisher(const std::string &, size_t, const PublisherOptions<Alloc> & = "
"PublisherOptions<Alloc>()) signature instead")]]
std::shared_ptr<PublisherT>
create_publisher(
const std::string & topic_name, size_t qos_history_depth,
std::shared_ptr<Alloc> allocator = nullptr,
const std::string & topic_name,
size_t qos_history_depth,
std::shared_ptr<Alloc> allocator,
IntraProcessSetting use_intra_process_comm = IntraProcessSetting::NodeDefault);

/// Create and return a Publisher.
Expand All @@ -183,6 +197,37 @@ class Node : public std::enable_shared_from_this<Node>
std::shared_ptr<Alloc> allocator = nullptr,
IntraProcessSetting use_intra_process_comm = IntraProcessSetting::NodeDefault);

/// Create and return a Subscription.
/**
* \param[in] topic_name The topic to subscribe on.
* \param[in] callback The user-defined callback function to receive a message
* \param[in] qos_history_depth The depth of the subscription's incoming message queue.
* \param[in] options Additional options for the creation of the Subscription.
* \param[in] msg_mem_strat The message memory strategy to use for allocating messages.
* \return Shared pointer to the created subscription.
*/
/* TODO(jacquelinekay):
Windows build breaks when static member function passed as default
argument to msg_mem_strat, nullptr is a workaround.
*/
template<
typename MessageT,
typename CallbackT,
typename AllocatorT = std::allocator<void>,
typename SubscriptionT = rclcpp::Subscription<
typename rclcpp::subscription_traits::has_message_type<CallbackT>::type, AllocatorT>>
std::shared_ptr<SubscriptionT>
create_subscription(
const std::string & topic_name,
CallbackT && callback,
size_t qos_history_depth,
const SubscriptionOptionsWithAllocator<AllocatorT> &
options = SubscriptionOptionsWithAllocator<AllocatorT>(),
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<
typename rclcpp::subscription_traits::has_message_type<CallbackT>::type, AllocatorT
>::SharedPtr
msg_mem_strat = nullptr);

/// Create and return a Subscription.
/**
* \param[in] topic_name The topic to subscribe on.
Expand Down Expand Up @@ -238,12 +283,15 @@ class Node : public std::enable_shared_from_this<Node>
typename Alloc = std::allocator<void>,
typename SubscriptionT = rclcpp::Subscription<
typename rclcpp::subscription_traits::has_message_type<CallbackT>::type, Alloc>>
[[deprecated(
"use the create_subscription(const std::string &, CallbackT &&, size_t, "
"const SubscriptionOptions<Alloc> & = SubscriptionOptions<Alloc>(), ...) signature instead")]]
std::shared_ptr<SubscriptionT>
create_subscription(
const std::string & topic_name,
CallbackT && callback,
size_t qos_history_depth,
rclcpp::callback_group::CallbackGroup::SharedPtr group = nullptr,
rclcpp::callback_group::CallbackGroup::SharedPtr group,
bool ignore_local_publications = false,
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<
typename rclcpp::subscription_traits::has_message_type<CallbackT>::type, Alloc>::SharedPtr
Expand Down
139 changes: 91 additions & 48 deletions rclcpp/include/rclcpp/node_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,22 +51,6 @@
namespace rclcpp
{

template<typename MessageT, typename Alloc, typename PublisherT>
std::shared_ptr<PublisherT>
Node::create_publisher(
const std::string & topic_name, size_t qos_history_depth,
std::shared_ptr<Alloc> allocator,
IntraProcessSetting use_intra_process_comm)
{
if (!allocator) {
allocator = std::make_shared<Alloc>();
}
rmw_qos_profile_t qos = rmw_qos_profile_default;
qos.depth = qos_history_depth;
return this->create_publisher<MessageT, Alloc, PublisherT>(topic_name, qos,
allocator, use_intra_process_comm);
}

RCLCPP_LOCAL
inline
std::string
Expand All @@ -79,18 +63,22 @@ extend_name_with_sub_namespace(const std::string & name, const std::string & sub
return name_with_sub_namespace;
}

template<typename MessageT, typename Alloc, typename PublisherT>
template<typename MessageT, typename AllocatorT, typename PublisherT>
std::shared_ptr<PublisherT>
Node::create_publisher(
const std::string & topic_name, const rmw_qos_profile_t & qos_profile,
std::shared_ptr<Alloc> allocator, IntraProcessSetting use_intra_process_comm)
const std::string & topic_name,
size_t qos_history_depth,
const PublisherOptionsWithAllocator<AllocatorT> & options)
{
std::shared_ptr<AllocatorT> allocator = options.allocator;
if (!allocator) {
allocator = std::make_shared<Alloc>();
allocator = std::make_shared<AllocatorT>();
}
rmw_qos_profile_t qos_profile = options.qos_profile;
qos_profile.depth = qos_history_depth;

bool use_intra_process;
switch (use_intra_process_comm) {
switch (options.use_intra_process_comm) {
case IntraProcessSetting::Enable:
use_intra_process = true;
break;
Expand All @@ -105,45 +93,74 @@ Node::create_publisher(
break;
}

return rclcpp::create_publisher<MessageT, Alloc, PublisherT>(
return rclcpp::create_publisher<MessageT, AllocatorT, PublisherT>(
this->node_topics_.get(),
extend_name_with_sub_namespace(topic_name, this->get_sub_namespace()),
qos_profile,
use_intra_process,
allocator);
}

template<typename MessageT, typename Alloc, typename PublisherT>
std::shared_ptr<PublisherT>
Node::create_publisher(
const std::string & topic_name, size_t qos_history_depth,
std::shared_ptr<Alloc> allocator,
IntraProcessSetting use_intra_process_comm)
{
PublisherOptionsWithAllocator<Alloc> pub_options;
pub_options.allocator = allocator;
pub_options.use_intra_process_comm = use_intra_process_comm;
return this->create_publisher<MessageT, Alloc, PublisherT>(
topic_name, qos_history_depth, pub_options);
}

template<typename MessageT, typename Alloc, typename PublisherT>
std::shared_ptr<PublisherT>
Node::create_publisher(
const std::string & topic_name, const rmw_qos_profile_t & qos_profile,
std::shared_ptr<Alloc> allocator, IntraProcessSetting use_intra_process_comm)
{
PublisherOptionsWithAllocator<Alloc> pub_options;
pub_options.qos_profile = qos_profile;
pub_options.allocator = allocator;
pub_options.use_intra_process_comm = use_intra_process_comm;
return this->create_publisher<MessageT, Alloc, PublisherT>(
topic_name, qos_profile.depth, pub_options);
}

template<
typename MessageT,
typename CallbackT,
typename Alloc,
typename AllocatorT,
typename SubscriptionT>
std::shared_ptr<SubscriptionT>
Node::create_subscription(
const std::string & topic_name,
CallbackT && callback,
const rmw_qos_profile_t & qos_profile,
rclcpp::callback_group::CallbackGroup::SharedPtr group,
bool ignore_local_publications,
size_t qos_history_depth,
const SubscriptionOptionsWithAllocator<AllocatorT> & options,
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<
typename rclcpp::subscription_traits::has_message_type<CallbackT>::type, Alloc>::SharedPtr
msg_mem_strat,
std::shared_ptr<Alloc> allocator,
IntraProcessSetting use_intra_process_comm)
typename rclcpp::subscription_traits::has_message_type<CallbackT>::type, AllocatorT>::SharedPtr
msg_mem_strat)
{
using CallbackMessageT = typename rclcpp::subscription_traits::has_message_type<CallbackT>::type;

std::shared_ptr<AllocatorT> allocator = options.allocator;
if (!allocator) {
allocator = std::make_shared<Alloc>();
allocator = std::make_shared<AllocatorT>();
}

rmw_qos_profile_t qos_profile = options.qos_profile;
qos_profile.depth = qos_history_depth;

if (!msg_mem_strat) {
using rclcpp::message_memory_strategy::MessageMemoryStrategy;
msg_mem_strat = MessageMemoryStrategy<CallbackMessageT, Alloc>::create_default();
msg_mem_strat = MessageMemoryStrategy<CallbackMessageT, AllocatorT>::create_default();
}

bool use_intra_process;
switch (use_intra_process_comm) {
switch (options.use_intra_process_comm) {
case IntraProcessSetting::Enable:
use_intra_process = true;
break;
Expand All @@ -158,13 +175,14 @@ Node::create_subscription(
break;
}

return rclcpp::create_subscription<MessageT, CallbackT, Alloc, CallbackMessageT, SubscriptionT>(
return rclcpp::create_subscription<
MessageT, CallbackT, AllocatorT, CallbackMessageT, SubscriptionT>(
this->node_topics_.get(),
extend_name_with_sub_namespace(topic_name, this->get_sub_namespace()),
std::forward<CallbackT>(callback),
qos_profile,
group,
ignore_local_publications,
options.callback_group,
options.ignore_local_publications,
use_intra_process,
msg_mem_strat,
allocator);
Expand All @@ -179,7 +197,7 @@ std::shared_ptr<SubscriptionT>
Node::create_subscription(
const std::string & topic_name,
CallbackT && callback,
size_t qos_history_depth,
const rmw_qos_profile_t & qos_profile,
rclcpp::callback_group::CallbackGroup::SharedPtr group,
bool ignore_local_publications,
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<
Expand All @@ -188,18 +206,43 @@ Node::create_subscription(
std::shared_ptr<Alloc> allocator,
IntraProcessSetting use_intra_process_comm)
{
rmw_qos_profile_t qos = rmw_qos_profile_default;
qos.depth = qos_history_depth;
SubscriptionOptionsWithAllocator<Alloc> sub_options;
sub_options.qos_profile = qos_profile;
sub_options.callback_group = group;
sub_options.ignore_local_publications = ignore_local_publications;
sub_options.allocator = allocator;
sub_options.use_intra_process_comm = use_intra_process_comm;

return this->create_subscription<MessageT, CallbackT, Alloc, SubscriptionT>(
topic_name, std::forward<CallbackT>(callback), qos_profile.depth, sub_options, msg_mem_strat);
}

return this->create_subscription<MessageT>(
topic_name,
std::forward<CallbackT>(callback),
qos,
group,
ignore_local_publications,
msg_mem_strat,
allocator,
use_intra_process_comm);
template<
typename MessageT,
typename CallbackT,
typename Alloc,
typename SubscriptionT>
std::shared_ptr<SubscriptionT>
Node::create_subscription(
const std::string & topic_name,
CallbackT && callback,
size_t qos_history_depth,
rclcpp::callback_group::CallbackGroup::SharedPtr group,
bool ignore_local_publications,
typename rclcpp::message_memory_strategy::MessageMemoryStrategy<
typename rclcpp::subscription_traits::has_message_type<CallbackT>::type, Alloc>::SharedPtr
msg_mem_strat,
std::shared_ptr<Alloc> allocator,
IntraProcessSetting use_intra_process_comm)
{
SubscriptionOptionsWithAllocator<Alloc> sub_options;
sub_options.callback_group = group;
sub_options.ignore_local_publications = ignore_local_publications;
sub_options.allocator = allocator;
sub_options.use_intra_process_comm = use_intra_process_comm;

return this->create_subscription<MessageT, CallbackT, Alloc, SubscriptionT>(
topic_name, std::forward<CallbackT>(callback), qos_history_depth, sub_options, msg_mem_strat);
}

template<typename DurationRepT, typename DurationT, typename CallbackT>
Expand Down
Loading

0 comments on commit 86551d8

Please sign in to comment.