From 9d0af6df5e41cddb5c0b4e44295d3d322e220d20 Mon Sep 17 00:00:00 2001 From: Emerson Knapp Date: Thu, 28 Mar 2019 17:27:02 -0700 Subject: [PATCH 1/6] Add demos for liveliness, lifespan, and deadline QoS policies Signed-off-by: Emerson Knapp --- quality_of_service_demo/CMakeLists.txt | 42 +++++ quality_of_service_demo/README.md | 45 ++++++ .../quality_of_service_demo/common_nodes.hpp | 61 +++++++ quality_of_service_demo/package.xml | 38 +++++ quality_of_service_demo/src/common_nodes.cpp | 101 ++++++++++++ quality_of_service_demo/src/deadline.cpp | 113 +++++++++++++ quality_of_service_demo/src/lifespan.cpp | 111 +++++++++++++ quality_of_service_demo/src/liveliness.cpp | 153 ++++++++++++++++++ 8 files changed, 664 insertions(+) create mode 100644 quality_of_service_demo/CMakeLists.txt create mode 100644 quality_of_service_demo/README.md create mode 100644 quality_of_service_demo/include/quality_of_service_demo/common_nodes.hpp create mode 100644 quality_of_service_demo/package.xml create mode 100644 quality_of_service_demo/src/common_nodes.cpp create mode 100644 quality_of_service_demo/src/deadline.cpp create mode 100644 quality_of_service_demo/src/lifespan.cpp create mode 100644 quality_of_service_demo/src/liveliness.cpp diff --git a/quality_of_service_demo/CMakeLists.txt b/quality_of_service_demo/CMakeLists.txt new file mode 100644 index 000000000..ee2e01ca8 --- /dev/null +++ b/quality_of_service_demo/CMakeLists.txt @@ -0,0 +1,42 @@ +cmake_minimum_required(VERSION 3.5) + +project(quality_of_service_demo) + +# Default to C++14 +if(NOT CMAKE_CXX_STANDARD) + set(CMAKE_CXX_STANDARD 14) +endif() + +if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") + add_compile_options(-Wall -Wextra -Wpedantic) +endif() + +find_package(ament_cmake REQUIRED) +find_package(rclcpp REQUIRED) +find_package(rcutils) +find_package(rmw REQUIRED) +find_package(std_msgs REQUIRED) + +include_directories(include) +ament_export_include_directories(include) + +function(qos_demo_executable target) + add_executable(${target} src/${target}.cpp src/common_nodes.cpp) + ament_target_dependencies(${target} + "rclcpp" + "rcutils" + "std_msgs" + ) + install(TARGETS ${target} DESTINATION lib/${PROJECT_NAME}) +endfunction() + +qos_demo_executable(liveliness) +qos_demo_executable(lifespan) +qos_demo_executable(deadline) + +if(BUILD_TESTING) + find_package(ament_lint_auto REQUIRED) + ament_lint_auto_find_test_dependencies() +endif() + +ament_package() diff --git a/quality_of_service_demo/README.md b/quality_of_service_demo/README.md new file mode 100644 index 000000000..23ea8d947 --- /dev/null +++ b/quality_of_service_demo/README.md @@ -0,0 +1,45 @@ +# Quality of Service Demos + +The demo applications in this package show the various Quality of Service settings available in ROS2. The intent is to provide a dedicated application for each QoS type, in order to highlight that single feature in isolation. + +## History +No History demo app exists yet in this package + +## Reliability +No Reliability demo app exists yet in this package + +## Durability +No Durability demo app exists yet in this package + +## Deadline +`quality_of_service_demo/deadline` demonstrates messages not meeting their required deadline. + +The application requires an argument a `deadline_duration` - this is the maximum acceptable time (in milliseconds) between messages published on the topic. + +The Publisher in this demo will pause publishing periodically, which will print deadline expirations from the Subscriber. + +Run `quality_of_service_demo/deadline -h` to see the usage for more configuration options. + +## Lifespan +`quality_of_service_demo/lifespan` demonstrates messages being deleted from a Publisher's outgoing queue once their configured lifespan expires. + +The application requires an argument `lifespan_duration` - this is the maximum time (in milliseconds) that a message will sit in the Publisher's outgoing queue. + +The Publisher in this demo will publish a preset number of messaegs, then stop Publishing. A Subscriber will start subscribing after a preset amount of time and should receive _fewer_ messages backfilled than were originally published, because some of them outlasted their lifespan and were removed. + +Run `quality_of_service_demo/lifespan -h` to see the usage for more configuration options. + +Examples: +* `lifespan 1000 -s 3000 -p 10` + * After a few seconds, you should see (approximately) messages 4-10 printed from the Subscriber + * The first 3 messages, with 1 second lifespan, were gone by the time the Subscriber joined at 3 seconds +* `lifespan 3000 -s 3000 -p 10` + * After a few seconds, you should see messages 0-10 printed from the Subscriber + * All messages, with their longer 3 second lifespan, survived until the subscriber joined. + +## Liveliness +`quality_of_service_demo/liveliness` demonstrates notification of liveliness changes, based on different Liveliness configurations. + +The application requires an argument `lease_duration` that specifies how often (in milliseconds) an entity must "check in" to let the system know it's alive. + +The Publisher in this demo will assert its liveliness based on passed in options, and be killed after some amount of time. When the Publisher is killed, you will see a Liveliness change event printed from the Publisher after `lease_duration` expires. diff --git a/quality_of_service_demo/include/quality_of_service_demo/common_nodes.hpp b/quality_of_service_demo/include/quality_of_service_demo/common_nodes.hpp new file mode 100644 index 000000000..f0d9656c1 --- /dev/null +++ b/quality_of_service_demo/include/quality_of_service_demo/common_nodes.hpp @@ -0,0 +1,61 @@ +// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef QUALITY_OF_SERVICE_DEMO__COMMON_NODES_HPP_ +#define QUALITY_OF_SERVICE_DEMO__COMMON_NODES_HPP_ + +#include +#include +#include + +#include "rclcpp/rclcpp.hpp" +#include "std_msgs/msg/string.hpp" + +class Talker : public rclcpp::Node +{ +public: + Talker( + const std::string & topic_name, + rclcpp::PublisherOptions<> pub_options, + size_t max_count = 0, + std::chrono::milliseconds assert_node_period = std::chrono::milliseconds(0), + std::chrono::milliseconds assert_topic_period = std::chrono::milliseconds(0)); + + void pause_for(std::chrono::milliseconds pause_length); + + rclcpp::TimerBase::SharedPtr publish_timer_; + rclcpp::TimerBase::SharedPtr pause_timer_; + rclcpp::TimerBase::SharedPtr assert_node_timer_; + rclcpp::TimerBase::SharedPtr assert_topic_timer_; + rclcpp::Publisher::SharedPtr publisher_; + size_t count_ = 0; + size_t max_count_ = 0; +}; + +class Listener : public rclcpp::Node +{ +public: + Listener( + const std::string & topic_name, + rclcpp::SubscriptionOptions<> sub_options, + bool defer_subscribe = false); + void start_listening(); + +private: + rclcpp::Subscription::SharedPtr subscription_ = nullptr; + rclcpp::SubscriptionOptions<> sub_options_; + std::string topic_; +}; + +#endif // QUALITY_OF_SERVICE_DEMO__COMMON_NODES_HPP_ diff --git a/quality_of_service_demo/package.xml b/quality_of_service_demo/package.xml new file mode 100644 index 000000000..a7097e6ff --- /dev/null +++ b/quality_of_service_demo/package.xml @@ -0,0 +1,38 @@ + + + + quality_of_service_demo + 0.1.0 + Demo applications for Quality of Service features + + Amazon ROS Contributions + + Apache License 2.0 + + Emerson Knapp + + ament_cmake + + example_interfaces + rclcpp + rcutils + rmw + rmw_implementation_cmake + std_msgs + + example_interfaces + launch_ros + rclcpp + rcutils + rmw + std_msgs + + ament_lint_auto + ament_lint_common + launch + launch_testing + + + ament_cmake + + diff --git a/quality_of_service_demo/src/common_nodes.cpp b/quality_of_service_demo/src/common_nodes.cpp new file mode 100644 index 000000000..546236fc1 --- /dev/null +++ b/quality_of_service_demo/src/common_nodes.cpp @@ -0,0 +1,101 @@ +// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +#include "quality_of_service_demo/common_nodes.hpp" + +using namespace std::chrono_literals; + +Talker::Talker( + const std::string & topic_name, + rclcpp::PublisherOptions<> pub_options, + size_t max_count, + std::chrono::milliseconds assert_node_period, + std::chrono::milliseconds assert_topic_period) +: Node("talker"), + max_count_(max_count) +{ + RCLCPP_INFO(get_logger(), "Talker starting up"); + publisher_ = create_publisher(topic_name, nullptr, pub_options); + publish_timer_ = create_wall_timer( + 500ms, + [this]() -> void { + std_msgs::msg::String msg; + msg.data = "Talker says " + std::to_string(count_++); + publisher_->publish(msg); + if (max_count_ > 0 && count_ > max_count_) { + publish_timer_->cancel(); + } + }); + // If enabled, create timer to assert liveliness at the node level + if (assert_node_period != 0ms) { + assert_node_timer_ = create_wall_timer( + assert_node_period, + [this]() -> void { + this->assert_liveliness(); + }); + } + // If enabled, create timer to assert liveliness on the topic + if (assert_topic_period != 0ms) { + assert_topic_timer_ = create_wall_timer( + assert_topic_period, + [this]() -> void { + publisher_->assert_liveliness(); + }); + } +} + +void Talker::pause_for(std::chrono::milliseconds pause_length) +{ + if (pause_timer_) { + // Already paused - just ignore + return; + } + publish_timer_->cancel(); + pause_timer_ = create_wall_timer( + pause_length, + [this]() { + publish_timer_->reset(); + pause_timer_ = nullptr; + }); +} + + +Listener::Listener( + const std::string & topic_name, + rclcpp::SubscriptionOptions<> sub_options, + bool defer_subscribe) +: Node("listener"), + sub_options_(sub_options), + topic_(topic_name) +{ + RCLCPP_INFO(get_logger(), "Listener starting up"); + if (!defer_subscribe) { + start_listening(); + } +} + +void Listener::start_listening() +{ + subscription_ = create_subscription( + topic_, + [this](const typename std_msgs::msg::String::SharedPtr msg) -> void + { + RCLCPP_INFO(get_logger(), "Listener heard: [%s]", msg->data.c_str()); + }, + nullptr, + sub_options_); +} diff --git a/quality_of_service_demo/src/deadline.cpp b/quality_of_service_demo/src/deadline.cpp new file mode 100644 index 000000000..65967d639 --- /dev/null +++ b/quality_of_service_demo/src/deadline.cpp @@ -0,0 +1,113 @@ +// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include + +#include "rcutils/cmdline_parser.h" + +#include "rclcpp/rclcpp.hpp" +#include "rclcpp/executors/single_threaded_executor.hpp" + +#include "std_msgs/msg/string.hpp" +#include "std_msgs/msg/bool.hpp" + +#include "quality_of_service_demo/common_nodes.hpp" + +static const size_t default_period_pause_talker = 5000; +static const size_t default_duration_pause_talker = 1000; + +void print_usage() +{ + printf("Usage for deadline:\n"); + printf("deadline deadline_duration [-p period_pause_talker] [-d duration_pause_talker] [-h]\n"); + printf("required arguments:\n"); + printf("deadline_duration: Duration (in ms) of the Deadline QoS setting.\n"); + printf("options:\n"); + printf("-h : Print this help function.\n"); + printf("-p period_pause_talker : How often to pause the talker (in ms). Defaults to %zu.\n", + default_period_pause_talker); + printf("-d duration_pause_talker : How long to pause the talker (in ms). Defaults to %zu.\n", + default_duration_pause_talker); +} + + +int main(int argc, char * argv[]) +{ + // Force flush of the stdout buffer. + setvbuf(stdout, NULL, _IONBF, BUFSIZ); + + // Argument parsing + if (argc < 2 || rcutils_cli_option_exist(argv, argv + argc, "-h")) { + print_usage(); + return 0; + } + + // Required arguments + size_t deadline_duration_ms = std::stoul(argv[1]); + + // Optional argument default values + std::chrono::milliseconds period_pause_talker(default_period_pause_talker); + std::chrono::milliseconds duration_pause_talker(default_duration_pause_talker); + + // Optional argument parsing + if (rcutils_cli_option_exist(argv, argv + argc, "-p")) { + period_pause_talker = std::chrono::milliseconds( + std::stoul(rcutils_cli_get_option(argv, argv + argc, "-p"))); + } + if (rcutils_cli_option_exist(argv, argv + argc, "-d")) { + duration_pause_talker = std::chrono::milliseconds( + std::stoul(rcutils_cli_get_option(argv, argv + argc, "-d"))); + } + + // Initialization and configuration + rclcpp::init(argc, argv); + rclcpp::executors::SingleThreadedExecutor exec; + + std::string topic("qos_deadline_chatter"); + + rmw_qos_profile_t qos_profile = rmw_qos_profile_default; + qos_profile.deadline.sec = deadline_duration_ms / 1000; + qos_profile.deadline.nsec = (deadline_duration_ms % 1000) * 1000000; + printf("Qos prof sec %zu nsec %zu\n", qos_profile.deadline.sec, qos_profile.deadline.nsec); + + rclcpp::SubscriptionOptions<> sub_options(qos_profile); + sub_options.deadline_callback( + [](rclcpp::QOSDeadlineRequestedInfo & /* event */) -> void + { + RCUTILS_LOG_INFO("Requested deadline missed event"); + }); + + rclcpp::PublisherOptions<> pub_options(qos_profile); + + + auto talker = std::make_shared(topic, pub_options); + auto listener = std::make_shared(topic, sub_options); + + auto pause_timer = talker->create_wall_timer( + period_pause_talker, + [talker, duration_pause_talker]() -> void { + talker->pause_for(duration_pause_talker); + }); + + // Execution + exec.add_node(talker); + exec.add_node(listener); + exec.spin(); + + // Cleanup + rclcpp::shutdown(); + return 0; +} diff --git a/quality_of_service_demo/src/lifespan.cpp b/quality_of_service_demo/src/lifespan.cpp new file mode 100644 index 000000000..e93ae546d --- /dev/null +++ b/quality_of_service_demo/src/lifespan.cpp @@ -0,0 +1,111 @@ +// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include + +#include "rcutils/cmdline_parser.h" + +#include "rclcpp/rclcpp.hpp" +#include "rclcpp/executors/single_threaded_executor.hpp" + +#include "quality_of_service_demo/common_nodes.hpp" + +static const size_t default_history = 10; +static const size_t default_publish_n_messages = 5; +static const size_t default_subscribe_after_duration_ms = 5000; + +void print_usage() +{ + printf("Usage for lifespan:\n"); + printf("lifespan lifespan_duration [--history history_depth] [-p publish_n_messages] " + "[-s subscribe_after_duration] [-h]\n"); + printf("required arguments:\n"); + printf("lifespan duration: Duration (in ms) of the Lifespan QoS setting.\n"); + printf("options:\n"); + printf("-h : Print this help function.\n"); + printf("--history : The depth of the Publisher's history queue - the maximum number of messages " + "it will store. Defaults to %zu\n", default_history); + printf("-p publish_n_messages : How many messages to publish before stopping. Defaults to %zu\n", + default_publish_n_messages); + printf("-s subscribe_after_duration : The Subscriber will be created this long (in ms) after " + "application startup. Defaults to %zu\n", default_subscribe_after_duration_ms); +} + +int main(int argc, char * argv[]) +{ + // Force flush of the stdout buffer. + setvbuf(stdout, NULL, _IONBF, BUFSIZ); + + // Argument parsing + if (argc < 2 || rcutils_cli_option_exist(argv, argv + argc, "-h")) { + print_usage(); + return 0; + } + + // Required arguments + size_t lifespan_duration_ms = std::stoul(argv[1]); + + // Optional argument default values + size_t history = default_history; + size_t publish_n_messages = default_publish_n_messages; + std::chrono::milliseconds subscribe_after_duration(default_subscribe_after_duration_ms); + + // Optional argument parsing + if (rcutils_cli_option_exist(argv, argv + argc, "--history")) { + history = std::stoul(rcutils_cli_get_option(argv, argv + argc, "--history")); + } + if (rcutils_cli_option_exist(argv, argv + argc, "-p")) { + publish_n_messages = std::stoul(rcutils_cli_get_option(argv, argv + argc, "-p")); + } + if (rcutils_cli_option_exist(argv, argv + argc, "-s")) { + subscribe_after_duration = std::chrono::milliseconds( + std::stoul(rcutils_cli_get_option(argv, argv + argc, "-s"))); + } + + // Configuration and Initialization + rclcpp::init(argc, argv); + rclcpp::executors::SingleThreadedExecutor exec; + + std::string topic("qos_lifespan_chatter"); + + rmw_qos_profile_t qos_profile = rmw_qos_profile_default; + qos_profile.depth = history; + // Need this if we keep + qos_profile.durability = RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL; + qos_profile.lifespan.sec = lifespan_duration_ms / 1000; + qos_profile.lifespan.nsec = (lifespan_duration_ms % 1000) * 1000000; + + rclcpp::SubscriptionOptions<> sub_options(qos_profile); + rclcpp::PublisherOptions<> pub_options(qos_profile); + + auto listener = std::make_shared(topic, sub_options, true); + auto talker = std::make_shared(topic, pub_options, publish_n_messages); + + exec.add_node(talker); + exec.add_node(listener); + auto timer = listener->create_wall_timer( + subscribe_after_duration, + [listener]() -> void { + listener->start_listening(); + }); + + // Execution + exec.spin(); + + // Cleanup + rclcpp::shutdown(); + return 0; +} diff --git a/quality_of_service_demo/src/liveliness.cpp b/quality_of_service_demo/src/liveliness.cpp new file mode 100644 index 000000000..f727c08c9 --- /dev/null +++ b/quality_of_service_demo/src/liveliness.cpp @@ -0,0 +1,153 @@ +// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include + +#include "rcutils/cmdline_parser.h" + +#include "rclcpp/rclcpp.hpp" +#include "rclcpp/executors/single_threaded_executor.hpp" + +#include "quality_of_service_demo/common_nodes.hpp" + +static const size_t default_kill_publisher_after_ms = 3000; + +void print_usage() +{ + printf("Usage for liveliness:\n"); + printf("liveliness lease_duration [-p liveliness_policy] [-n node_assert_liveliness_period] " + "[-t topic_assert_liveliness_period] [-h]\n"); + printf("required arguments:\n"); + printf("lease_duration: Duration (in ms) before an inactive Publisher is considered not-alive. " + "0 means infinity.\n"); + printf("options:\n"); + printf("-h : Print this help function.\n"); + printf("-p liveliness_policy : You may specify AUTOMATIC, MANUAL_BY_NODE, or MANUAL_BY_TOPIC. " + "Defaults to AUTOMATIC\n"); + printf("-n node_assert_period : How often the Publisher will assert the liveliness of its " + "Node (in ms). Defaults to 0 (never)\n"); + printf("-t topic_assert_period : How often the Publisher will assert the liveliness of its " + "Publisher (in ms). Defaults to 0 (never)\n"); + printf("-k kill_publisher_after : Kill the publisher after this amount of time (in ms). In " + "AUTOMATIC - destroy the whole node. In MANUAL_BY_NODE, stop node liveliness assertion. In " + "MANUAL_BY_TOPIC, stop topic liveliness assertion. Defaults to %zu\n", + default_kill_publisher_after_ms); +} + +int main(int argc, char * argv[]) +{ + // Argument parsing + if (argc < 2 || rcutils_cli_option_exist(argv, argv + argc, "-h")) { + print_usage(); + return 0; + } + // Required arguments + size_t liveliness_lease_duration_ms = std::stoul(argv[1]); + + // Optional argument default values + std::chrono::milliseconds node_assert_period(0); + std::chrono::milliseconds topic_assert_period(0); + std::chrono::milliseconds kill_publisher_after(default_kill_publisher_after_ms); + rmw_qos_liveliness_policy_t liveliness_policy_kind = RMW_QOS_POLICY_LIVELINESS_AUTOMATIC; + + // Optional argument parsing + if (rcutils_cli_option_exist(argv, argv + argc, "-n")) { + node_assert_period = std::chrono::milliseconds( + std::stoul(rcutils_cli_get_option(argv, argv + argc, "-n"))); + } + if (rcutils_cli_option_exist(argv, argv + argc, "-t")) { + topic_assert_period = std::chrono::milliseconds( + std::stoul(rcutils_cli_get_option(argv, argv + argc, "-t"))); + } + if (rcutils_cli_option_exist(argv, argv + argc, "-k")) { + kill_publisher_after = std::chrono::milliseconds( + std::stoul(rcutils_cli_get_option(argv, argv + argc, "-k"))); + } + if (rcutils_cli_option_exist(argv, argv + argc, "-p")) { + char * policy_str = rcutils_cli_get_option(argv, argv + argc, "-p"); + if (strcmp(policy_str, "AUTOMATIC") == 0) { + liveliness_policy_kind = RMW_QOS_POLICY_LIVELINESS_AUTOMATIC; + } else if (strcmp(policy_str, "MANUAL_BY_NODE") == 0) { + liveliness_policy_kind = RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_NODE; + } else if (strcmp(policy_str, "MANUAL_BY_TOPIC") == 0) { + liveliness_policy_kind = RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC; + } else { + printf("Unknown liveliness policy: %s\n", policy_str); + print_usage(); + return 1; + } + } + + // Configuration and Initialization + rclcpp::init(argc, argv); + rclcpp::executors::SingleThreadedExecutor exec; + + std::string topic("qos_liveliness_chatter"); + + rmw_qos_profile_t qos_profile = rmw_qos_profile_default; + qos_profile.liveliness = liveliness_policy_kind; + qos_profile.liveliness_lease_duration.sec = liveliness_lease_duration_ms / 1000; + qos_profile.liveliness_lease_duration.nsec = (liveliness_lease_duration_ms % 1000) * 1000000; + + rclcpp::SubscriptionOptions<> sub_options(qos_profile); + sub_options.liveliness_callback( + [](rclcpp::QOSLivelinessChangedInfo & /* event */) -> void + { + RCUTILS_LOG_INFO("Liveliness changed event"); + }); + + rclcpp::PublisherOptions<> pub_options(qos_profile); + + auto listener = std::make_shared(topic, sub_options); + auto talker = std::make_shared( + topic, + pub_options, + 0, + node_assert_period, + topic_assert_period); + + // Execution + exec.add_node(listener); + exec.add_node(talker); + + auto timer = listener->create_wall_timer( + kill_publisher_after, + [&exec, talker, liveliness_policy_kind]() -> void { + switch (liveliness_policy_kind) { + case RMW_QOS_POLICY_LIVELINESS_AUTOMATIC: + exec.remove_node(talker); + break; + case RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_NODE: + if (talker->assert_node_timer_) { + talker->assert_node_timer_->cancel(); + } + break; + case RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC: + if (talker->assert_topic_timer_) { + talker->assert_topic_timer_->cancel(); + } + break; + default: + break; + } + }); + + exec.spin(); + + // Cleanup + rclcpp::shutdown(); + return 0; +} From 882cce9a7d23350eb40759880e03aee749b05b87 Mon Sep 17 00:00:00 2001 From: Emerson Knapp Date: Thu, 28 Mar 2019 18:33:26 -0700 Subject: [PATCH 2/6] Comment out portions that require non-merged work Signed-off-by: Emerson Knapp --- quality_of_service_demo/README.md | 16 +++++++- quality_of_service_demo/src/common_nodes.cpp | 3 +- quality_of_service_demo/src/deadline.cpp | 41 +++++++++++++------ quality_of_service_demo/src/lifespan.cpp | 10 +++-- quality_of_service_demo/src/liveliness.cpp | 43 ++++++++++++++++---- 5 files changed, 86 insertions(+), 27 deletions(-) diff --git a/quality_of_service_demo/README.md b/quality_of_service_demo/README.md index 23ea8d947..c34ba2122 100644 --- a/quality_of_service_demo/README.md +++ b/quality_of_service_demo/README.md @@ -20,6 +20,12 @@ The Publisher in this demo will pause publishing periodically, which will print Run `quality_of_service_demo/deadline -h` to see the usage for more configuration options. +Examples: +* `deadline 600 -p 5000 -d 2000` + * The publisher will publish for 5 seconds, then pause for 2 - deadline events will start firing on both participants, until the publisher comes back +* `deadline 600 -p 5000 -d 0` + * The publisher doesn't actually pause, no deadline misses should occur + ## Lifespan `quality_of_service_demo/lifespan` demonstrates messages being deleted from a Publisher's outgoing queue once their configured lifespan expires. @@ -33,9 +39,9 @@ Examples: * `lifespan 1000 -s 3000 -p 10` * After a few seconds, you should see (approximately) messages 4-10 printed from the Subscriber * The first 3 messages, with 1 second lifespan, were gone by the time the Subscriber joined at 3 seconds -* `lifespan 3000 -s 3000 -p 10` +* `lifespan 4000 -s 3000 -p 10` * After a few seconds, you should see messages 0-10 printed from the Subscriber - * All messages, with their longer 3 second lifespan, survived until the subscriber joined. + * All messages, with their longer 4 second lifespan, survived until the subscriber joined. ## Liveliness `quality_of_service_demo/liveliness` demonstrates notification of liveliness changes, based on different Liveliness configurations. @@ -43,3 +49,9 @@ Examples: The application requires an argument `lease_duration` that specifies how often (in milliseconds) an entity must "check in" to let the system know it's alive. The Publisher in this demo will assert its liveliness based on passed in options, and be killed after some amount of time. When the Publisher is killed, you will see a Liveliness change event printed from the Publisher after `lease_duration` expires. + +Examples: +* `liveliness 1000 -k 2000` + * After 2 seconds, the publisher will be killed, and the subscriber will receive a callback 1 second after that notifying it that the liveliness has changed +* `liveliness 1000 -n 2000 -p MANUAL_BY_NODE` + * The Subscriber will receive alternating alive/not-alive events every second. The Publisher asserts its liveliness every 2 seconds, but this is not frequent enough for the liveliness lease of 1 second diff --git a/quality_of_service_demo/src/common_nodes.cpp b/quality_of_service_demo/src/common_nodes.cpp index 546236fc1..10de4ecfb 100644 --- a/quality_of_service_demo/src/common_nodes.cpp +++ b/quality_of_service_demo/src/common_nodes.cpp @@ -29,7 +29,7 @@ Talker::Talker( max_count_(max_count) { RCLCPP_INFO(get_logger(), "Talker starting up"); - publisher_ = create_publisher(topic_name, nullptr, pub_options); + publisher_ = create_publisher(topic_name, pub_options); publish_timer_ = create_wall_timer( 500ms, [this]() -> void { @@ -96,6 +96,5 @@ void Listener::start_listening() { RCLCPP_INFO(get_logger(), "Listener heard: [%s]", msg->data.c_str()); }, - nullptr, sub_options_); } diff --git a/quality_of_service_demo/src/deadline.cpp b/quality_of_service_demo/src/deadline.cpp index 65967d639..54775455c 100644 --- a/quality_of_service_demo/src/deadline.cpp +++ b/quality_of_service_demo/src/deadline.cpp @@ -26,8 +26,8 @@ #include "quality_of_service_demo/common_nodes.hpp" -static const size_t default_period_pause_talker = 5000; -static const size_t default_duration_pause_talker = 1000; +static const unsigned default_period_pause_talker = 5000; +static const unsigned default_duration_pause_talker = 1000; void print_usage() { @@ -37,9 +37,9 @@ void print_usage() printf("deadline_duration: Duration (in ms) of the Deadline QoS setting.\n"); printf("options:\n"); printf("-h : Print this help function.\n"); - printf("-p period_pause_talker : How often to pause the talker (in ms). Defaults to %zu.\n", + printf("-p period_pause_talker : How often to pause the talker (in ms). Defaults to %u.\n", default_period_pause_talker); - printf("-d duration_pause_talker : How long to pause the talker (in ms). Defaults to %zu.\n", + printf("-d duration_pause_talker : How long to pause the talker (in ms). Defaults to %u.\n", default_duration_pause_talker); } @@ -79,18 +79,35 @@ int main(int argc, char * argv[]) std::string topic("qos_deadline_chatter"); rmw_qos_profile_t qos_profile = rmw_qos_profile_default; + /* + TODO(emersonknapp) once the new types have been added qos_profile.deadline.sec = deadline_duration_ms / 1000; qos_profile.deadline.nsec = (deadline_duration_ms % 1000) * 1000000; - printf("Qos prof sec %zu nsec %zu\n", qos_profile.deadline.sec, qos_profile.deadline.nsec); - - rclcpp::SubscriptionOptions<> sub_options(qos_profile); - sub_options.deadline_callback( - [](rclcpp::QOSDeadlineRequestedInfo & /* event */) -> void + */ + + rclcpp::SubscriptionOptions<> sub_options; + sub_options.qos_profile = qos_profile; + /* + TODO(emersonknapp) once the callbacks have been added + sub_options.event_callbacks.deadline_callback = + [](rclcpp::DeadlineRequestedInfo & event) -> void { - RCUTILS_LOG_INFO("Requested deadline missed event"); + RCUTILS_LOG_INFO("Requested deadline missed - total %d delta %d", + event.total_count, event.total_count_change); }); - - rclcpp::PublisherOptions<> pub_options(qos_profile); + */ + + rclcpp::PublisherOptions<> pub_options; + pub_options.qos_profile = qos_profile; + /* + TODO(emersonknapp) once the callbacks have been added + pub_options.event_callbacks.deadline_callback = + [](rclcpp::DeadlineOfferedInfo & event) -> void + { + RCUTILS_LOG_INFO("Offered deadline missed - total %d delta %d", + event.total_count, event.total_count_change); + }); + */ auto talker = std::make_shared(topic, pub_options); diff --git a/quality_of_service_demo/src/lifespan.cpp b/quality_of_service_demo/src/lifespan.cpp index e93ae546d..fbdcc9128 100644 --- a/quality_of_service_demo/src/lifespan.cpp +++ b/quality_of_service_demo/src/lifespan.cpp @@ -83,13 +83,17 @@ int main(int argc, char * argv[]) rmw_qos_profile_t qos_profile = rmw_qos_profile_default; qos_profile.depth = history; - // Need this if we keep qos_profile.durability = RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL; + /* + TODO(emersonknapp) once new types are available qos_profile.lifespan.sec = lifespan_duration_ms / 1000; qos_profile.lifespan.nsec = (lifespan_duration_ms % 1000) * 1000000; + */ - rclcpp::SubscriptionOptions<> sub_options(qos_profile); - rclcpp::PublisherOptions<> pub_options(qos_profile); + rclcpp::SubscriptionOptions<> sub_options; + sub_options.qos_profile = qos_profile; + rclcpp::PublisherOptions<> pub_options; + pub_options.qos_profile = qos_profile; auto listener = std::make_shared(topic, sub_options, true); auto talker = std::make_shared(topic, pub_options, publish_n_messages); diff --git a/quality_of_service_demo/src/liveliness.cpp b/quality_of_service_demo/src/liveliness.cpp index f727c08c9..5ce487ee2 100644 --- a/quality_of_service_demo/src/liveliness.cpp +++ b/quality_of_service_demo/src/liveliness.cpp @@ -61,7 +61,8 @@ int main(int argc, char * argv[]) std::chrono::milliseconds node_assert_period(0); std::chrono::milliseconds topic_assert_period(0); std::chrono::milliseconds kill_publisher_after(default_kill_publisher_after_ms); - rmw_qos_liveliness_policy_t liveliness_policy_kind = RMW_QOS_POLICY_LIVELINESS_AUTOMATIC; + // TODO(emersonknapp) once new types are available + // rmw_qos_liveliness_policy_t liveliness_policy_kind = RMW_QOS_POLICY_LIVELINESS_AUTOMATIC; // Optional argument parsing if (rcutils_cli_option_exist(argv, argv + argc, "-n")) { @@ -78,6 +79,8 @@ int main(int argc, char * argv[]) } if (rcutils_cli_option_exist(argv, argv + argc, "-p")) { char * policy_str = rcutils_cli_get_option(argv, argv + argc, "-p"); + /* + TODO(emersonknapp) once new types are available if (strcmp(policy_str, "AUTOMATIC") == 0) { liveliness_policy_kind = RMW_QOS_POLICY_LIVELINESS_AUTOMATIC; } else if (strcmp(policy_str, "MANUAL_BY_NODE") == 0) { @@ -85,10 +88,11 @@ int main(int argc, char * argv[]) } else if (strcmp(policy_str, "MANUAL_BY_TOPIC") == 0) { liveliness_policy_kind = RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC; } else { + */ printf("Unknown liveliness policy: %s\n", policy_str); print_usage(); return 1; - } + // } } // Configuration and Initialization @@ -98,18 +102,30 @@ int main(int argc, char * argv[]) std::string topic("qos_liveliness_chatter"); rmw_qos_profile_t qos_profile = rmw_qos_profile_default; + /* + TODO(emersonknapp) once new types are available qos_profile.liveliness = liveliness_policy_kind; qos_profile.liveliness_lease_duration.sec = liveliness_lease_duration_ms / 1000; qos_profile.liveliness_lease_duration.nsec = (liveliness_lease_duration_ms % 1000) * 1000000; - - rclcpp::SubscriptionOptions<> sub_options(qos_profile); - sub_options.liveliness_callback( - [](rclcpp::QOSLivelinessChangedInfo & /* event */) -> void + */ + + rclcpp::SubscriptionOptions<> sub_options; + sub_options.qos_profile = qos_profile; + /* + TODO(emersonknapp) once callbacks are available + sub_options.event_callbacks.liveliness_callback = + [](rclcpp::QOSLivelinessChangedInfo & event) -> void { - RCUTILS_LOG_INFO("Liveliness changed event"); + printf("Liveliness changed event: \n"); + printf(" alive_count: %d\n", event.alive_count); + printf(" not_alive_count: %d\n", event.not_alive_count); + printf(" alive_count_change: %d\n", event.alive_count_change); + printf(" not_alive_count_change: %d\n", event.not_alive_count_change); }); + */ - rclcpp::PublisherOptions<> pub_options(qos_profile); + rclcpp::PublisherOptions<> pub_options; + pub_options.qos_profile = qos_profile; auto listener = std::make_shared(topic, sub_options); auto talker = std::make_shared( @@ -125,6 +141,16 @@ int main(int argc, char * argv[]) auto timer = listener->create_wall_timer( kill_publisher_after, + [talker]() -> void { + if (talker->assert_node_timer_) { + talker->assert_node_timer_->cancel(); + } + if (talker->assert_topic_timer_) { + talker->assert_topic_timer_->cancel(); + } + }); + /* + TODO(emersonknapp) once new types are available [&exec, talker, liveliness_policy_kind]() -> void { switch (liveliness_policy_kind) { case RMW_QOS_POLICY_LIVELINESS_AUTOMATIC: @@ -144,6 +170,7 @@ int main(int argc, char * argv[]) break; } }); + */ exec.spin(); From 3e0c64171c78385cda3378d0f0954660be5ddfe6 Mon Sep 17 00:00:00 2001 From: Emerson Knapp Date: Wed, 1 May 2019 18:21:51 -0700 Subject: [PATCH 3/6] Uncomment functionality, fix build, fix docs, clarify CLI options Signed-off-by: Emerson Knapp --- quality_of_service_demo/README.md | 55 ++++++---- .../quality_of_service_demo/common_nodes.hpp | 8 +- quality_of_service_demo/src/common_nodes.cpp | 15 +-- quality_of_service_demo/src/deadline.cpp | 53 ++++------ quality_of_service_demo/src/lifespan.cpp | 52 ++++----- quality_of_service_demo/src/liveliness.cpp | 100 ++++++++---------- 6 files changed, 138 insertions(+), 145 deletions(-) diff --git a/quality_of_service_demo/README.md b/quality_of_service_demo/README.md index c34ba2122..93ce0f476 100644 --- a/quality_of_service_demo/README.md +++ b/quality_of_service_demo/README.md @@ -1,57 +1,68 @@ # Quality of Service Demos -The demo applications in this package show the various Quality of Service settings available in ROS2. The intent is to provide a dedicated application for each QoS type, in order to highlight that single feature in isolation. +The demo applications in this package show the various Quality of Service settings available in ROS2. +The intent is to provide a dedicated application for each QoS type, in order to highlight that single feature in isolation. ## History -No History demo app exists yet in this package + +No History QoS demo app exists yet in this package. ## Reliability -No Reliability demo app exists yet in this package + +No Reliability QoS demo app exists yet in this package. ## Durability -No Durability demo app exists yet in this package + +No Durability QoS demo app exists yet in this package. ## Deadline + `quality_of_service_demo/deadline` demonstrates messages not meeting their required deadline. -The application requires an argument a `deadline_duration` - this is the maximum acceptable time (in milliseconds) between messages published on the topic. +The application requires an argument a `deadline_duration` - this is the maximum acceptable time (in positive integer milliseconds) between messages published on the topic. +A `deadline_duration` of `0` means that there is no deadline, so the application will act as a standard Talker/Listener pair. The Publisher in this demo will pause publishing periodically, which will print deadline expirations from the Subscriber. -Run `quality_of_service_demo/deadline -h` to see the usage for more configuration options. +Run `quality_of_service_demo/deadline -h` for more usage information. Examples: -* `deadline 600 -p 5000 -d 2000` - * The publisher will publish for 5 seconds, then pause for 2 - deadline events will start firing on both participants, until the publisher comes back -* `deadline 600 -p 5000 -d 0` - * The publisher doesn't actually pause, no deadline misses should occur +* `deadline 600 --publish-for 5000 --pause-for 2000` + * The publisher will publish for 5 seconds, then pause for 2 - deadline events will start firing on both participants, until the publisher comes back. +* `deadline 600 --publish-for 5000 --pause-for 0` + * The publisher doesn't actually pause, no deadline misses should occur. ## Lifespan + `quality_of_service_demo/lifespan` demonstrates messages being deleted from a Publisher's outgoing queue once their configured lifespan expires. -The application requires an argument `lifespan_duration` - this is the maximum time (in milliseconds) that a message will sit in the Publisher's outgoing queue. +The application requires an argument `lifespan_duration` - this is the maximum time (in positive integer milliseconds) that a message will sit in the Publisher's outgoing queue. -The Publisher in this demo will publish a preset number of messaegs, then stop Publishing. A Subscriber will start subscribing after a preset amount of time and should receive _fewer_ messages backfilled than were originally published, because some of them outlasted their lifespan and were removed. +The Publisher in this demo will publish a preset number of messages, then stop publishing. +A Subscriber will start subscribing after a preset amount of time and may receive _fewer_ backfilled messages than were originally published, because some message can outlast their lifespan and be removed. -Run `quality_of_service_demo/lifespan -h` to see the usage for more configuration options. +Run `quality_of_service_demo/lifespan -h` for more usage information. Examples: -* `lifespan 1000 -s 3000 -p 10` - * After a few seconds, you should see (approximately) messages 4-10 printed from the Subscriber - * The first 3 messages, with 1 second lifespan, were gone by the time the Subscriber joined at 3 seconds -* `lifespan 4000 -s 3000 -p 10` - * After a few seconds, you should see messages 0-10 printed from the Subscriber - * All messages, with their longer 4 second lifespan, survived until the subscriber joined. +* `lifespan 1000 --publish-count 10 --subscribe-after 3000` + * After a few seconds, you should see (approximately) messages 4-10 printed from the Subscriber. + * The first 3 messages, with 1 second lifespan, were gone by the time the Subscriber joined after 3 seconds. +* `lifespan 4000 --publish-count 10 --subscribe-after 3000` + * After a few seconds, you should see all of the messages (0-9) printed from the Subscriber. + * All messages, with their 4 second lifespan, survived until the subscriber joined. ## Liveliness + `quality_of_service_demo/liveliness` demonstrates notification of liveliness changes, based on different Liveliness configurations. The application requires an argument `lease_duration` that specifies how often (in milliseconds) an entity must "check in" to let the system know it's alive. The Publisher in this demo will assert its liveliness based on passed in options, and be killed after some amount of time. When the Publisher is killed, you will see a Liveliness change event printed from the Publisher after `lease_duration` expires. +Run `quality_of_service_demo/liveliness -h` for more usage information. + Examples: -* `liveliness 1000 -k 2000` +* `liveliness 1000 --kill-publisher-after 2000` * After 2 seconds, the publisher will be killed, and the subscriber will receive a callback 1 second after that notifying it that the liveliness has changed -* `liveliness 1000 -n 2000 -p MANUAL_BY_NODE` - * The Subscriber will receive alternating alive/not-alive events every second. The Publisher asserts its liveliness every 2 seconds, but this is not frequent enough for the liveliness lease of 1 second +* `liveliness 1000 --node-assert-period 2000 --policy MANUAL_BY_NODE` + * The Subscriber will receive alternating alive/not-alive events every second. The Publisher asserts its liveliness every 2 seconds, but this is not frequent enough for the liveliness lease duration of 1 second. diff --git a/quality_of_service_demo/include/quality_of_service_demo/common_nodes.hpp b/quality_of_service_demo/include/quality_of_service_demo/common_nodes.hpp index f0d9656c1..7e1d18349 100644 --- a/quality_of_service_demo/include/quality_of_service_demo/common_nodes.hpp +++ b/quality_of_service_demo/include/quality_of_service_demo/common_nodes.hpp @@ -22,12 +22,14 @@ #include "rclcpp/rclcpp.hpp" #include "std_msgs/msg/string.hpp" +static const uint32_t MILLION = 1000L * 1000L; + class Talker : public rclcpp::Node { public: Talker( const std::string & topic_name, - rclcpp::PublisherOptions<> pub_options, + rclcpp::PublisherOptions pub_options, size_t max_count = 0, std::chrono::milliseconds assert_node_period = std::chrono::milliseconds(0), std::chrono::milliseconds assert_topic_period = std::chrono::milliseconds(0)); @@ -48,13 +50,13 @@ class Listener : public rclcpp::Node public: Listener( const std::string & topic_name, - rclcpp::SubscriptionOptions<> sub_options, + rclcpp::SubscriptionOptions sub_options, bool defer_subscribe = false); void start_listening(); private: rclcpp::Subscription::SharedPtr subscription_ = nullptr; - rclcpp::SubscriptionOptions<> sub_options_; + rclcpp::SubscriptionOptions sub_options_; std::string topic_; }; diff --git a/quality_of_service_demo/src/common_nodes.cpp b/quality_of_service_demo/src/common_nodes.cpp index 10de4ecfb..dd5e1bd22 100644 --- a/quality_of_service_demo/src/common_nodes.cpp +++ b/quality_of_service_demo/src/common_nodes.cpp @@ -21,7 +21,7 @@ using namespace std::chrono_literals; Talker::Talker( const std::string & topic_name, - rclcpp::PublisherOptions<> pub_options, + rclcpp::PublisherOptions pub_options, size_t max_count, std::chrono::milliseconds assert_node_period, std::chrono::milliseconds assert_topic_period) @@ -29,7 +29,7 @@ Talker::Talker( max_count_(max_count) { RCLCPP_INFO(get_logger(), "Talker starting up"); - publisher_ = create_publisher(topic_name, pub_options); + publisher_ = create_publisher(topic_name, pub_options.qos_profile.depth, pub_options); publish_timer_ = create_wall_timer( 500ms, [this]() -> void { @@ -44,16 +44,16 @@ Talker::Talker( if (assert_node_period != 0ms) { assert_node_timer_ = create_wall_timer( assert_node_period, - [this]() -> void { - this->assert_liveliness(); + [this]() -> bool { + return this->assert_liveliness(); }); } // If enabled, create timer to assert liveliness on the topic if (assert_topic_period != 0ms) { assert_topic_timer_ = create_wall_timer( assert_topic_period, - [this]() -> void { - publisher_->assert_liveliness(); + [this]() -> bool { + return publisher_->assert_liveliness(); }); } } @@ -76,7 +76,7 @@ void Talker::pause_for(std::chrono::milliseconds pause_length) Listener::Listener( const std::string & topic_name, - rclcpp::SubscriptionOptions<> sub_options, + rclcpp::SubscriptionOptions sub_options, bool defer_subscribe) : Node("listener"), sub_options_(sub_options), @@ -96,5 +96,6 @@ void Listener::start_listening() { RCLCPP_INFO(get_logger(), "Listener heard: [%s]", msg->data.c_str()); }, + sub_options_.qos_profile.depth, sub_options_); } diff --git a/quality_of_service_demo/src/deadline.cpp b/quality_of_service_demo/src/deadline.cpp index 54775455c..d18b05257 100644 --- a/quality_of_service_demo/src/deadline.cpp +++ b/quality_of_service_demo/src/deadline.cpp @@ -26,21 +26,23 @@ #include "quality_of_service_demo/common_nodes.hpp" -static const unsigned default_period_pause_talker = 5000; -static const unsigned default_duration_pause_talker = 1000; +static const char * OPTION_PUBLISH_FOR = "--publish-for"; +static const uint32_t DEFAULT_PUBLISH_FOR = 5000; +static const char * OPTION_PAUSE_FOR = "--pause-for"; +static const uint32_t DEFAULT_PAUSE_FOR = 1000; void print_usage() { printf("Usage for deadline:\n"); printf("deadline deadline_duration [-p period_pause_talker] [-d duration_pause_talker] [-h]\n"); printf("required arguments:\n"); - printf("deadline_duration: Duration (in ms) of the Deadline QoS setting.\n"); + printf("deadline_duration: Duration (in uint milliseconds) of the Deadline QoS setting.\n"); printf("options:\n"); - printf("-h : Print this help function.\n"); - printf("-p period_pause_talker : How often to pause the talker (in ms). Defaults to %u.\n", - default_period_pause_talker); - printf("-d duration_pause_talker : How long to pause the talker (in ms). Defaults to %u.\n", - default_duration_pause_talker); + printf("-h : Print this help message.\n"); + printf("%s duration_publish_for : How long to publish (in uint milliseconds) until pausing the talker. Defaults to %lu.\n", + OPTION_PUBLISH_FOR, (unsigned long)DEFAULT_PUBLISH_FOR); + printf("%s duration_pause_for : How long to pause the talker (in uint milliseconds) before beginning to publish again. Defaults to %lu.\n", + OPTION_PAUSE_FOR, (unsigned long)DEFAULT_PAUSE_FOR); } @@ -59,17 +61,17 @@ int main(int argc, char * argv[]) size_t deadline_duration_ms = std::stoul(argv[1]); // Optional argument default values - std::chrono::milliseconds period_pause_talker(default_period_pause_talker); - std::chrono::milliseconds duration_pause_talker(default_duration_pause_talker); + std::chrono::milliseconds period_pause_talker(DEFAULT_PUBLISH_FOR); + std::chrono::milliseconds duration_pause_talker(DEFAULT_PAUSE_FOR); // Optional argument parsing - if (rcutils_cli_option_exist(argv, argv + argc, "-p")) { + if (rcutils_cli_option_exist(argv, argv + argc, OPTION_PUBLISH_FOR)) { period_pause_talker = std::chrono::milliseconds( - std::stoul(rcutils_cli_get_option(argv, argv + argc, "-p"))); + std::stoul(rcutils_cli_get_option(argv, argv + argc, OPTION_PUBLISH_FOR))); } - if (rcutils_cli_option_exist(argv, argv + argc, "-d")) { + if (rcutils_cli_option_exist(argv, argv + argc, OPTION_PAUSE_FOR)) { duration_pause_talker = std::chrono::milliseconds( - std::stoul(rcutils_cli_get_option(argv, argv + argc, "-d"))); + std::stoul(rcutils_cli_get_option(argv, argv + argc, OPTION_PAUSE_FOR))); } // Initialization and configuration @@ -79,35 +81,26 @@ int main(int argc, char * argv[]) std::string topic("qos_deadline_chatter"); rmw_qos_profile_t qos_profile = rmw_qos_profile_default; - /* - TODO(emersonknapp) once the new types have been added qos_profile.deadline.sec = deadline_duration_ms / 1000; - qos_profile.deadline.nsec = (deadline_duration_ms % 1000) * 1000000; - */ + qos_profile.deadline.nsec = (deadline_duration_ms % 1000) * MILLION; - rclcpp::SubscriptionOptions<> sub_options; + rclcpp::SubscriptionOptions sub_options; sub_options.qos_profile = qos_profile; - /* - TODO(emersonknapp) once the callbacks have been added sub_options.event_callbacks.deadline_callback = - [](rclcpp::DeadlineRequestedInfo & event) -> void + [](rclcpp::QOSDeadlineRequestedInfo & event) -> void { RCUTILS_LOG_INFO("Requested deadline missed - total %d delta %d", event.total_count, event.total_count_change); - }); - */ + }; - rclcpp::PublisherOptions<> pub_options; + rclcpp::PublisherOptions pub_options; pub_options.qos_profile = qos_profile; - /* - TODO(emersonknapp) once the callbacks have been added pub_options.event_callbacks.deadline_callback = - [](rclcpp::DeadlineOfferedInfo & event) -> void + [](rclcpp::QOSDeadlineOfferedInfo & event) -> void { RCUTILS_LOG_INFO("Offered deadline missed - total %d delta %d", event.total_count, event.total_count_change); - }); - */ + }; auto talker = std::make_shared(topic, pub_options); diff --git a/quality_of_service_demo/src/lifespan.cpp b/quality_of_service_demo/src/lifespan.cpp index fbdcc9128..3d39b310d 100644 --- a/quality_of_service_demo/src/lifespan.cpp +++ b/quality_of_service_demo/src/lifespan.cpp @@ -23,9 +23,12 @@ #include "quality_of_service_demo/common_nodes.hpp" -static const size_t default_history = 10; -static const size_t default_publish_n_messages = 5; -static const size_t default_subscribe_after_duration_ms = 5000; +static const char * OPTION_HISTORY = "--history"; +static const uint32_t DEFAULT_HISTORY = 10; +static const char * OPTION_PUBLISH_COUNT = "--publish-count"; +static const uint32_t DEFAULT_PUBLISH_COUNT = 5; +static const char * OPTION_SUBSCRIBE_AFTER = "--subscribe-after"; +static const uint32_t DEFAULT_SUBSCRIBE_AFTER = 5000; void print_usage() { @@ -35,13 +38,13 @@ void print_usage() printf("required arguments:\n"); printf("lifespan duration: Duration (in ms) of the Lifespan QoS setting.\n"); printf("options:\n"); - printf("-h : Print this help function.\n"); - printf("--history : The depth of the Publisher's history queue - the maximum number of messages " - "it will store. Defaults to %zu\n", default_history); - printf("-p publish_n_messages : How many messages to publish before stopping. Defaults to %zu\n", - default_publish_n_messages); - printf("-s subscribe_after_duration : The Subscriber will be created this long (in ms) after " - "application startup. Defaults to %zu\n", default_subscribe_after_duration_ms); + printf("-h : Print this help message.\n"); + printf("%s : The depth of the Publisher's history queue - the maximum number of messages " + "it will store. Defaults to %lu\n", OPTION_HISTORY, (unsigned long)DEFAULT_HISTORY); + printf("%s publish_n_messages : How many messages to publish before stopping. Defaults to %lu\n", + OPTION_PUBLISH_COUNT, (unsigned long)DEFAULT_PUBLISH_COUNT); + printf("%s subscribe_after_duration : The Subscriber will be created this long (in ms) after " + "application startup. Defaults to %lu\n", OPTION_SUBSCRIBE_AFTER, (unsigned long)DEFAULT_SUBSCRIBE_AFTER); } int main(int argc, char * argv[]) @@ -59,20 +62,20 @@ int main(int argc, char * argv[]) size_t lifespan_duration_ms = std::stoul(argv[1]); // Optional argument default values - size_t history = default_history; - size_t publish_n_messages = default_publish_n_messages; - std::chrono::milliseconds subscribe_after_duration(default_subscribe_after_duration_ms); + size_t history = DEFAULT_HISTORY; + size_t publish_count = DEFAULT_PUBLISH_COUNT; + std::chrono::milliseconds subscribe_after_duration(DEFAULT_SUBSCRIBE_AFTER); // Optional argument parsing - if (rcutils_cli_option_exist(argv, argv + argc, "--history")) { - history = std::stoul(rcutils_cli_get_option(argv, argv + argc, "--history")); + if (rcutils_cli_option_exist(argv, argv + argc, OPTION_HISTORY)) { + history = std::stoul(rcutils_cli_get_option(argv, argv + argc, OPTION_HISTORY)); } - if (rcutils_cli_option_exist(argv, argv + argc, "-p")) { - publish_n_messages = std::stoul(rcutils_cli_get_option(argv, argv + argc, "-p")); + if (rcutils_cli_option_exist(argv, argv + argc, OPTION_PUBLISH_COUNT)) { + publish_count = std::stoul(rcutils_cli_get_option(argv, argv + argc, OPTION_PUBLISH_COUNT)); } - if (rcutils_cli_option_exist(argv, argv + argc, "-s")) { + if (rcutils_cli_option_exist(argv, argv + argc, OPTION_SUBSCRIBE_AFTER)) { subscribe_after_duration = std::chrono::milliseconds( - std::stoul(rcutils_cli_get_option(argv, argv + argc, "-s"))); + std::stoul(rcutils_cli_get_option(argv, argv + argc, OPTION_SUBSCRIBE_AFTER))); } // Configuration and Initialization @@ -84,19 +87,16 @@ int main(int argc, char * argv[]) rmw_qos_profile_t qos_profile = rmw_qos_profile_default; qos_profile.depth = history; qos_profile.durability = RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL; - /* - TODO(emersonknapp) once new types are available qos_profile.lifespan.sec = lifespan_duration_ms / 1000; - qos_profile.lifespan.nsec = (lifespan_duration_ms % 1000) * 1000000; - */ + qos_profile.lifespan.nsec = (lifespan_duration_ms % 1000) * MILLION; - rclcpp::SubscriptionOptions<> sub_options; + rclcpp::SubscriptionOptions sub_options; sub_options.qos_profile = qos_profile; - rclcpp::PublisherOptions<> pub_options; + rclcpp::PublisherOptions pub_options; pub_options.qos_profile = qos_profile; auto listener = std::make_shared(topic, sub_options, true); - auto talker = std::make_shared(topic, pub_options, publish_n_messages); + auto talker = std::make_shared(topic, pub_options, publish_count); exec.add_node(talker); exec.add_node(listener); diff --git a/quality_of_service_demo/src/liveliness.cpp b/quality_of_service_demo/src/liveliness.cpp index 5ce487ee2..ec1c9ee82 100644 --- a/quality_of_service_demo/src/liveliness.cpp +++ b/quality_of_service_demo/src/liveliness.cpp @@ -23,7 +23,12 @@ #include "quality_of_service_demo/common_nodes.hpp" -static const size_t default_kill_publisher_after_ms = 3000; +static const char * OPTION_POLICY = "--policy"; +static const char * DEFAULT_POLICY = "AUTOMATIC"; +static const char * OPTION_NODE_ASSERT_PERIOD = "--node-assert-period"; +static const char * OPTION_TOPIC_ASSERT_PERIOD = "--topic-assert-period"; +static const char * OPTION_KILL_PUBLISHER_AFTER = "--kill-publisher-after"; +static const uint32_t DEFAULT_KILL_PUBLISHER_AFTER = 3000; void print_usage() { @@ -34,17 +39,17 @@ void print_usage() printf("lease_duration: Duration (in ms) before an inactive Publisher is considered not-alive. " "0 means infinity.\n"); printf("options:\n"); - printf("-h : Print this help function.\n"); - printf("-p liveliness_policy : You may specify AUTOMATIC, MANUAL_BY_NODE, or MANUAL_BY_TOPIC. " - "Defaults to AUTOMATIC\n"); - printf("-n node_assert_period : How often the Publisher will assert the liveliness of its " - "Node (in ms). Defaults to 0 (never)\n"); - printf("-t topic_assert_period : How often the Publisher will assert the liveliness of its " - "Publisher (in ms). Defaults to 0 (never)\n"); - printf("-k kill_publisher_after : Kill the publisher after this amount of time (in ms). In " + printf("-h : Print this help message.\n"); + printf("%s liveliness_policy : You may specify AUTOMATIC, MANUAL_BY_NODE, or MANUAL_BY_TOPIC. " + "Defaults to %s\n", OPTION_POLICY, DEFAULT_POLICY); + printf("%s node_assert_period : How often the Publisher will assert the liveliness of its " + "Node (in uint milliseconds). Defaults to 0 (never)\n", OPTION_NODE_ASSERT_PERIOD); + printf("%s topic_assert_period : How often the Publisher will assert the liveliness of its " + "Publisher (in uint milliseconds). Defaults to 0 (never)\n", OPTION_TOPIC_ASSERT_PERIOD); + printf("%s kill_publisher_after : Kill the publisher after this amount of time (in uint milliseconds). In " "AUTOMATIC - destroy the whole node. In MANUAL_BY_NODE, stop node liveliness assertion. In " - "MANUAL_BY_TOPIC, stop topic liveliness assertion. Defaults to %zu\n", - default_kill_publisher_after_ms); + "MANUAL_BY_TOPIC, stop topic liveliness assertion. Defaults to %lu\n", + OPTION_KILL_PUBLISHER_AFTER, (unsigned long)DEFAULT_KILL_PUBLISHER_AFTER); } int main(int argc, char * argv[]) @@ -60,39 +65,37 @@ int main(int argc, char * argv[]) // Optional argument default values std::chrono::milliseconds node_assert_period(0); std::chrono::milliseconds topic_assert_period(0); - std::chrono::milliseconds kill_publisher_after(default_kill_publisher_after_ms); - // TODO(emersonknapp) once new types are available - // rmw_qos_liveliness_policy_t liveliness_policy_kind = RMW_QOS_POLICY_LIVELINESS_AUTOMATIC; + std::chrono::milliseconds kill_publisher_after(DEFAULT_KILL_PUBLISHER_AFTER); + const char * policy_name = DEFAULT_POLICY; // Optional argument parsing - if (rcutils_cli_option_exist(argv, argv + argc, "-n")) { + if (rcutils_cli_option_exist(argv, argv + argc, OPTION_NODE_ASSERT_PERIOD)) { node_assert_period = std::chrono::milliseconds( - std::stoul(rcutils_cli_get_option(argv, argv + argc, "-n"))); + std::stoul(rcutils_cli_get_option(argv, argv + argc, OPTION_NODE_ASSERT_PERIOD))); } - if (rcutils_cli_option_exist(argv, argv + argc, "-t")) { + if (rcutils_cli_option_exist(argv, argv + argc, OPTION_TOPIC_ASSERT_PERIOD)) { topic_assert_period = std::chrono::milliseconds( - std::stoul(rcutils_cli_get_option(argv, argv + argc, "-t"))); + std::stoul(rcutils_cli_get_option(argv, argv + argc, OPTION_TOPIC_ASSERT_PERIOD))); } - if (rcutils_cli_option_exist(argv, argv + argc, "-k")) { + if (rcutils_cli_option_exist(argv, argv + argc, OPTION_KILL_PUBLISHER_AFTER)) { kill_publisher_after = std::chrono::milliseconds( - std::stoul(rcutils_cli_get_option(argv, argv + argc, "-k"))); + std::stoul(rcutils_cli_get_option(argv, argv + argc, OPTION_KILL_PUBLISHER_AFTER))); } - if (rcutils_cli_option_exist(argv, argv + argc, "-p")) { - char * policy_str = rcutils_cli_get_option(argv, argv + argc, "-p"); - /* - TODO(emersonknapp) once new types are available - if (strcmp(policy_str, "AUTOMATIC") == 0) { - liveliness_policy_kind = RMW_QOS_POLICY_LIVELINESS_AUTOMATIC; - } else if (strcmp(policy_str, "MANUAL_BY_NODE") == 0) { - liveliness_policy_kind = RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_NODE; - } else if (strcmp(policy_str, "MANUAL_BY_TOPIC") == 0) { - liveliness_policy_kind = RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC; - } else { - */ - printf("Unknown liveliness policy: %s\n", policy_str); - print_usage(); - return 1; - // } + if (rcutils_cli_option_exist(argv, argv + argc, OPTION_POLICY)) { + policy_name = rcutils_cli_get_option(argv, argv + argc, OPTION_POLICY); + } + + rmw_qos_liveliness_policy_t liveliness_policy_kind = RMW_QOS_POLICY_LIVELINESS_AUTOMATIC; + if (strcmp(policy_name, "AUTOMATIC") == 0) { + liveliness_policy_kind = RMW_QOS_POLICY_LIVELINESS_AUTOMATIC; + } else if (strcmp(policy_name, "MANUAL_BY_NODE") == 0) { + liveliness_policy_kind = RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_NODE; + } else if (strcmp(policy_name, "MANUAL_BY_TOPIC") == 0) { + liveliness_policy_kind = RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC; + } else { + printf("Unknown liveliness policy: %s\n", policy_name); + print_usage(); + return 1; } // Configuration and Initialization @@ -102,17 +105,12 @@ int main(int argc, char * argv[]) std::string topic("qos_liveliness_chatter"); rmw_qos_profile_t qos_profile = rmw_qos_profile_default; - /* - TODO(emersonknapp) once new types are available qos_profile.liveliness = liveliness_policy_kind; qos_profile.liveliness_lease_duration.sec = liveliness_lease_duration_ms / 1000; - qos_profile.liveliness_lease_duration.nsec = (liveliness_lease_duration_ms % 1000) * 1000000; - */ + qos_profile.liveliness_lease_duration.nsec = (liveliness_lease_duration_ms % 1000) * MILLION; - rclcpp::SubscriptionOptions<> sub_options; + rclcpp::SubscriptionOptions sub_options; sub_options.qos_profile = qos_profile; - /* - TODO(emersonknapp) once callbacks are available sub_options.event_callbacks.liveliness_callback = [](rclcpp::QOSLivelinessChangedInfo & event) -> void { @@ -121,10 +119,9 @@ int main(int argc, char * argv[]) printf(" not_alive_count: %d\n", event.not_alive_count); printf(" alive_count_change: %d\n", event.alive_count_change); printf(" not_alive_count_change: %d\n", event.not_alive_count_change); - }); - */ + }; - rclcpp::PublisherOptions<> pub_options; + rclcpp::PublisherOptions pub_options; pub_options.qos_profile = qos_profile; auto listener = std::make_shared(topic, sub_options); @@ -141,16 +138,6 @@ int main(int argc, char * argv[]) auto timer = listener->create_wall_timer( kill_publisher_after, - [talker]() -> void { - if (talker->assert_node_timer_) { - talker->assert_node_timer_->cancel(); - } - if (talker->assert_topic_timer_) { - talker->assert_topic_timer_->cancel(); - } - }); - /* - TODO(emersonknapp) once new types are available [&exec, talker, liveliness_policy_kind]() -> void { switch (liveliness_policy_kind) { case RMW_QOS_POLICY_LIVELINESS_AUTOMATIC: @@ -170,7 +157,6 @@ int main(int argc, char * argv[]) break; } }); - */ exec.spin(); From 895c1a2233ee223c0ee0bfce8534814b3b5a2ed2 Mon Sep 17 00:00:00 2001 From: Miaofei Date: Thu, 9 May 2019 15:38:06 -0700 Subject: [PATCH 4/6] update to be compatible with latest rclcpp API Signed-off-by: Miaofei --- .../quality_of_service_demo/common_nodes.hpp | 7 +++++-- quality_of_service_demo/src/common_nodes.cpp | 11 +++++++---- quality_of_service_demo/src/deadline.cpp | 14 +++++--------- quality_of_service_demo/src/lifespan.cpp | 16 ++++++---------- quality_of_service_demo/src/liveliness.cpp | 17 +++++++++-------- 5 files changed, 32 insertions(+), 33 deletions(-) diff --git a/quality_of_service_demo/include/quality_of_service_demo/common_nodes.hpp b/quality_of_service_demo/include/quality_of_service_demo/common_nodes.hpp index 7e1d18349..1451a22e8 100644 --- a/quality_of_service_demo/include/quality_of_service_demo/common_nodes.hpp +++ b/quality_of_service_demo/include/quality_of_service_demo/common_nodes.hpp @@ -29,7 +29,8 @@ class Talker : public rclcpp::Node public: Talker( const std::string & topic_name, - rclcpp::PublisherOptions pub_options, + const rclcpp::QoS & qos_profile, + const rclcpp::PublisherOptions & pub_options, size_t max_count = 0, std::chrono::milliseconds assert_node_period = std::chrono::milliseconds(0), std::chrono::milliseconds assert_topic_period = std::chrono::milliseconds(0)); @@ -50,12 +51,14 @@ class Listener : public rclcpp::Node public: Listener( const std::string & topic_name, - rclcpp::SubscriptionOptions sub_options, + const rclcpp::QoS & qos_profile, + const rclcpp::SubscriptionOptions & sub_options, bool defer_subscribe = false); void start_listening(); private: rclcpp::Subscription::SharedPtr subscription_ = nullptr; + rclcpp::QoS qos_profile_; rclcpp::SubscriptionOptions sub_options_; std::string topic_; }; diff --git a/quality_of_service_demo/src/common_nodes.cpp b/quality_of_service_demo/src/common_nodes.cpp index dd5e1bd22..4d9faa413 100644 --- a/quality_of_service_demo/src/common_nodes.cpp +++ b/quality_of_service_demo/src/common_nodes.cpp @@ -21,7 +21,8 @@ using namespace std::chrono_literals; Talker::Talker( const std::string & topic_name, - rclcpp::PublisherOptions pub_options, + const rclcpp::QoS & qos_profile, + const rclcpp::PublisherOptions & pub_options, size_t max_count, std::chrono::milliseconds assert_node_period, std::chrono::milliseconds assert_topic_period) @@ -29,7 +30,7 @@ Talker::Talker( max_count_(max_count) { RCLCPP_INFO(get_logger(), "Talker starting up"); - publisher_ = create_publisher(topic_name, pub_options.qos_profile.depth, pub_options); + publisher_ = create_publisher(topic_name, qos_profile, pub_options); publish_timer_ = create_wall_timer( 500ms, [this]() -> void { @@ -76,9 +77,11 @@ void Talker::pause_for(std::chrono::milliseconds pause_length) Listener::Listener( const std::string & topic_name, - rclcpp::SubscriptionOptions sub_options, + const rclcpp::QoS & qos_profile, + const rclcpp::SubscriptionOptions & sub_options, bool defer_subscribe) : Node("listener"), + qos_profile_(qos_profile), sub_options_(sub_options), topic_(topic_name) { @@ -92,10 +95,10 @@ void Listener::start_listening() { subscription_ = create_subscription( topic_, + qos_profile_, [this](const typename std_msgs::msg::String::SharedPtr msg) -> void { RCLCPP_INFO(get_logger(), "Listener heard: [%s]", msg->data.c_str()); }, - sub_options_.qos_profile.depth, sub_options_); } diff --git a/quality_of_service_demo/src/deadline.cpp b/quality_of_service_demo/src/deadline.cpp index d18b05257..0addbe0f2 100644 --- a/quality_of_service_demo/src/deadline.cpp +++ b/quality_of_service_demo/src/deadline.cpp @@ -58,7 +58,7 @@ int main(int argc, char * argv[]) } // Required arguments - size_t deadline_duration_ms = std::stoul(argv[1]); + std::chrono::milliseconds deadline_duration(std::stoul(argv[1])); // Optional argument default values std::chrono::milliseconds period_pause_talker(DEFAULT_PUBLISH_FOR); @@ -80,12 +80,10 @@ int main(int argc, char * argv[]) std::string topic("qos_deadline_chatter"); - rmw_qos_profile_t qos_profile = rmw_qos_profile_default; - qos_profile.deadline.sec = deadline_duration_ms / 1000; - qos_profile.deadline.nsec = (deadline_duration_ms % 1000) * MILLION; + rclcpp::QoS qos_profile(10); + qos_profile.deadline(deadline_duration); rclcpp::SubscriptionOptions sub_options; - sub_options.qos_profile = qos_profile; sub_options.event_callbacks.deadline_callback = [](rclcpp::QOSDeadlineRequestedInfo & event) -> void { @@ -94,7 +92,6 @@ int main(int argc, char * argv[]) }; rclcpp::PublisherOptions pub_options; - pub_options.qos_profile = qos_profile; pub_options.event_callbacks.deadline_callback = [](rclcpp::QOSDeadlineOfferedInfo & event) -> void { @@ -102,9 +99,8 @@ int main(int argc, char * argv[]) event.total_count, event.total_count_change); }; - - auto talker = std::make_shared(topic, pub_options); - auto listener = std::make_shared(topic, sub_options); + auto talker = std::make_shared(topic, qos_profile, pub_options); + auto listener = std::make_shared(topic, qos_profile, sub_options); auto pause_timer = talker->create_wall_timer( period_pause_talker, diff --git a/quality_of_service_demo/src/lifespan.cpp b/quality_of_service_demo/src/lifespan.cpp index 3d39b310d..492956e2d 100644 --- a/quality_of_service_demo/src/lifespan.cpp +++ b/quality_of_service_demo/src/lifespan.cpp @@ -59,7 +59,7 @@ int main(int argc, char * argv[]) } // Required arguments - size_t lifespan_duration_ms = std::stoul(argv[1]); + std::chrono::milliseconds lifespan_duration(std::stoul(argv[1])); // Optional argument default values size_t history = DEFAULT_HISTORY; @@ -84,19 +84,15 @@ int main(int argc, char * argv[]) std::string topic("qos_lifespan_chatter"); - rmw_qos_profile_t qos_profile = rmw_qos_profile_default; - qos_profile.depth = history; - qos_profile.durability = RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL; - qos_profile.lifespan.sec = lifespan_duration_ms / 1000; - qos_profile.lifespan.nsec = (lifespan_duration_ms % 1000) * MILLION; + rclcpp::QoS qos_profile(history); + qos_profile.durability(RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL); + qos_profile.lifespan(lifespan_duration); rclcpp::SubscriptionOptions sub_options; - sub_options.qos_profile = qos_profile; rclcpp::PublisherOptions pub_options; - pub_options.qos_profile = qos_profile; - auto listener = std::make_shared(topic, sub_options, true); - auto talker = std::make_shared(topic, pub_options, publish_count); + auto listener = std::make_shared(topic, qos_profile, sub_options, true); + auto talker = std::make_shared(topic, qos_profile, pub_options, publish_count); exec.add_node(talker); exec.add_node(listener); diff --git a/quality_of_service_demo/src/liveliness.cpp b/quality_of_service_demo/src/liveliness.cpp index ec1c9ee82..8018c1b92 100644 --- a/quality_of_service_demo/src/liveliness.cpp +++ b/quality_of_service_demo/src/liveliness.cpp @@ -60,7 +60,7 @@ int main(int argc, char * argv[]) return 0; } // Required arguments - size_t liveliness_lease_duration_ms = std::stoul(argv[1]); + std::chrono::milliseconds liveliness_lease_duration(std::stoul(argv[1])); // Optional argument default values std::chrono::milliseconds node_assert_period(0); @@ -104,13 +104,11 @@ int main(int argc, char * argv[]) std::string topic("qos_liveliness_chatter"); - rmw_qos_profile_t qos_profile = rmw_qos_profile_default; - qos_profile.liveliness = liveliness_policy_kind; - qos_profile.liveliness_lease_duration.sec = liveliness_lease_duration_ms / 1000; - qos_profile.liveliness_lease_duration.nsec = (liveliness_lease_duration_ms % 1000) * MILLION; + rclcpp::QoS qos_profile(10); + qos_profile.liveliness(liveliness_policy_kind); + qos_profile.liveliness_lease_duration(liveliness_lease_duration); rclcpp::SubscriptionOptions sub_options; - sub_options.qos_profile = qos_profile; sub_options.event_callbacks.liveliness_callback = [](rclcpp::QOSLivelinessChangedInfo & event) -> void { @@ -122,11 +120,14 @@ int main(int argc, char * argv[]) }; rclcpp::PublisherOptions pub_options; - pub_options.qos_profile = qos_profile; - auto listener = std::make_shared(topic, sub_options); + auto listener = std::make_shared( + topic, + qos_profile, + sub_options); auto talker = std::make_shared( topic, + qos_profile, pub_options, 0, node_assert_period, From 8ea63dacba4677021b3c345beb4603b2a47583ef Mon Sep 17 00:00:00 2001 From: Emerson Knapp Date: Tue, 14 May 2019 16:02:22 -0700 Subject: [PATCH 5/6] Address remaining PR feedback. Mostly code comments and documentation. Some code restructure and extra variable name verbosity. Signed-off-by: Emerson Knapp --- quality_of_service_demo/README.md | 8 +- .../quality_of_service_demo/common_nodes.hpp | 69 +++++-- quality_of_service_demo/src/common_nodes.cpp | 78 +++++--- quality_of_service_demo/src/deadline.cpp | 15 +- quality_of_service_demo/src/lifespan.cpp | 22 ++- quality_of_service_demo/src/liveliness.cpp | 174 ++++++++++-------- 6 files changed, 232 insertions(+), 134 deletions(-) diff --git a/quality_of_service_demo/README.md b/quality_of_service_demo/README.md index 93ce0f476..7b06e9093 100644 --- a/quality_of_service_demo/README.md +++ b/quality_of_service_demo/README.md @@ -45,8 +45,8 @@ Run `quality_of_service_demo/lifespan -h` for more usage information. Examples: * `lifespan 1000 --publish-count 10 --subscribe-after 3000` - * After a few seconds, you should see (approximately) messages 4-10 printed from the Subscriber. - * The first 3 messages, with 1 second lifespan, were gone by the time the Subscriber joined after 3 seconds. + * After a few seconds, you should see (approximately) messages 4-9 printed from the Subscriber. + * The first few messages, with 1 second lifespan, were gone by the time the Subscriber joined after 3 seconds. * `lifespan 4000 --publish-count 10 --subscribe-after 3000` * After a few seconds, you should see all of the messages (0-9) printed from the Subscriber. * All messages, with their 4 second lifespan, survived until the subscriber joined. @@ -64,5 +64,5 @@ Run `quality_of_service_demo/liveliness -h` for more usage information. Examples: * `liveliness 1000 --kill-publisher-after 2000` * After 2 seconds, the publisher will be killed, and the subscriber will receive a callback 1 second after that notifying it that the liveliness has changed -* `liveliness 1000 --node-assert-period 2000 --policy MANUAL_BY_NODE` - * The Subscriber will receive alternating alive/not-alive events every second. The Publisher asserts its liveliness every 2 seconds, but this is not frequent enough for the liveliness lease duration of 1 second. +* `liveliness 250 --node-assert-period 0 --policy MANUAL_BY_NODE` + * Publishing a message counts as implicitly asserting liveliness. Therefore, with this configuration, the node never explicitly asserts its liveliness, but it publishes less often (500ms) than the liveliness lease, so you will see continuous stream alive/not-alive messages as the publisher misses its lease and "becomes alive again" when it publishes. diff --git a/quality_of_service_demo/include/quality_of_service_demo/common_nodes.hpp b/quality_of_service_demo/include/quality_of_service_demo/common_nodes.hpp index 1451a22e8..6dcb36696 100644 --- a/quality_of_service_demo/include/quality_of_service_demo/common_nodes.hpp +++ b/quality_of_service_demo/include/quality_of_service_demo/common_nodes.hpp @@ -27,40 +27,83 @@ static const uint32_t MILLION = 1000L * 1000L; class Talker : public rclcpp::Node { public: + /** + * \param[in] topic_name Topic to publish to. + * \param[in] qos_profile QoS profile for Publisher. + * \param[in] publisher_options Additional options for Publisher. + * \param[in] publish_count (Optional) Number of messages to publish before stopping. + * 0 (default) means publish forever. + * \param[in] assert_node_period (Optional) How often to manually assert Node liveliness. + * 0 (default) means never. + * \param[in] assert_topic_period (Optional) How often to manually assert Publisher liveliness. + * 0 (default) means never. + **/ Talker( const std::string & topic_name, const rclcpp::QoS & qos_profile, - const rclcpp::PublisherOptions & pub_options, - size_t max_count = 0, + const rclcpp::PublisherOptions & publisher_options, + size_t publish_count = 0, std::chrono::milliseconds assert_node_period = std::chrono::milliseconds(0), std::chrono::milliseconds assert_topic_period = std::chrono::milliseconds(0)); - void pause_for(std::chrono::milliseconds pause_length); + /// Stop publishing for a while. + /** + * Stops the publisher for the specified amount of time. + * A message will be published immediately on the expiration of pause_duration. + * The regular publishing interval will resume at that point. + * If publishing is already paused, this call will be ignored. + * The remaining pause duration will not be affected. + * \param[in] pause_duration Amount of time to pause for. + **/ + void pause_for(std::chrono::milliseconds pause_duration); - rclcpp::TimerBase::SharedPtr publish_timer_; - rclcpp::TimerBase::SharedPtr pause_timer_; - rclcpp::TimerBase::SharedPtr assert_node_timer_; - rclcpp::TimerBase::SharedPtr assert_topic_timer_; - rclcpp::Publisher::SharedPtr publisher_; - size_t count_ = 0; - size_t max_count_ = 0; + /// Publish a single message. + /** + * Counts towards the total message count that will be published. + */ + void publish(); + + /// Cancel any manual liveliness assertions this node was configured to do. + void stop_asserting_liveliness(); + +private: + rclcpp::TimerBase::SharedPtr assert_node_timer_ = nullptr; + rclcpp::TimerBase::SharedPtr assert_topic_timer_ = nullptr; + rclcpp::TimerBase::SharedPtr publish_timer_ = nullptr; + rclcpp::TimerBase::SharedPtr pause_timer_ = nullptr; + rclcpp::Publisher::SharedPtr publisher_ = nullptr; + size_t publish_count_ = 0; + const size_t stop_at_count_ = 0; }; class Listener : public rclcpp::Node { public: + /// Standard Constructor. + /** + * \param[in] topic_name Topic to subscribe to. + * \param[in] qos_profile QoS profile for Subscription. + * \param[in] sub_options Additional options for Subscription. + * \param[in] defer_subscribe (Optional) don't create Subscription until user calls + * start_listening(). + */ Listener( const std::string & topic_name, const rclcpp::QoS & qos_profile, - const rclcpp::SubscriptionOptions & sub_options, + const rclcpp::SubscriptionOptions & subscription_options, bool defer_subscribe = false); + + /// Instantiates Subscription. + /** + * Does nothing if it has already been called. + */ void start_listening(); private: rclcpp::Subscription::SharedPtr subscription_ = nullptr; rclcpp::QoS qos_profile_; - rclcpp::SubscriptionOptions sub_options_; - std::string topic_; + rclcpp::SubscriptionOptions subscription_options_; + const std::string topic_name_; }; #endif // QUALITY_OF_SERVICE_DEMO__COMMON_NODES_HPP_ diff --git a/quality_of_service_demo/src/common_nodes.cpp b/quality_of_service_demo/src/common_nodes.cpp index 4d9faa413..c83d29696 100644 --- a/quality_of_service_demo/src/common_nodes.cpp +++ b/quality_of_service_demo/src/common_nodes.cpp @@ -22,25 +22,17 @@ using namespace std::chrono_literals; Talker::Talker( const std::string & topic_name, const rclcpp::QoS & qos_profile, - const rclcpp::PublisherOptions & pub_options, - size_t max_count, + const rclcpp::PublisherOptions & publisher_options, + size_t publish_count, std::chrono::milliseconds assert_node_period, std::chrono::milliseconds assert_topic_period) : Node("talker"), - max_count_(max_count) + stop_at_count_(publish_count) { RCLCPP_INFO(get_logger(), "Talker starting up"); - publisher_ = create_publisher(topic_name, qos_profile, pub_options); - publish_timer_ = create_wall_timer( - 500ms, - [this]() -> void { - std_msgs::msg::String msg; - msg.data = "Talker says " + std::to_string(count_++); - publisher_->publish(msg); - if (max_count_ > 0 && count_ > max_count_) { - publish_timer_->cancel(); - } - }); + publisher_ = create_publisher( + topic_name, qos_profile, publisher_options); + publish_timer_ = create_wall_timer(500ms, std::bind(&Talker::publish, this)); // If enabled, create timer to assert liveliness at the node level if (assert_node_period != 0ms) { assert_node_timer_ = create_wall_timer( @@ -59,31 +51,58 @@ Talker::Talker( } } -void Talker::pause_for(std::chrono::milliseconds pause_length) +void +Talker::pause_for(std::chrono::milliseconds pause_length) { if (pause_timer_) { - // Already paused - just ignore + // Already paused - ignoring. return; } publish_timer_->cancel(); pause_timer_ = create_wall_timer( pause_length, [this]() { + // Publishing immediately on pause expiration and resuming regular interval. + publish(); publish_timer_->reset(); pause_timer_ = nullptr; }); } +void +Talker::publish() +{ + std_msgs::msg::String msg; + msg.data = "Talker says " + std::to_string(publish_count_); + publish_count_ += 1; + publisher_->publish(msg); + if (stop_at_count_ > 0 && publish_count_ >= stop_at_count_) { + publish_timer_->cancel(); + } +} + +void +Talker::stop_asserting_liveliness() +{ + if (assert_node_timer_) { + assert_node_timer_->cancel(); + assert_node_timer_ = nullptr; + } + if (assert_topic_timer_) { + assert_topic_timer_->cancel(); + assert_topic_timer_ = nullptr; + } +} Listener::Listener( const std::string & topic_name, const rclcpp::QoS & qos_profile, - const rclcpp::SubscriptionOptions & sub_options, + const rclcpp::SubscriptionOptions & subscription_options, bool defer_subscribe) : Node("listener"), qos_profile_(qos_profile), - sub_options_(sub_options), - topic_(topic_name) + subscription_options_(subscription_options), + topic_name_(topic_name) { RCLCPP_INFO(get_logger(), "Listener starting up"); if (!defer_subscribe) { @@ -91,14 +110,17 @@ Listener::Listener( } } -void Listener::start_listening() +void +Listener::start_listening() { - subscription_ = create_subscription( - topic_, - qos_profile_, - [this](const typename std_msgs::msg::String::SharedPtr msg) -> void - { - RCLCPP_INFO(get_logger(), "Listener heard: [%s]", msg->data.c_str()); - }, - sub_options_); + if (!subscription_) { + subscription_ = create_subscription( + topic_name_, + qos_profile_, + [this](const typename std_msgs::msg::String::SharedPtr msg) -> void + { + RCLCPP_INFO(get_logger(), "Listener heard: [%s]", msg->data.c_str()); + }, + subscription_options_); + } } diff --git a/quality_of_service_demo/src/deadline.cpp b/quality_of_service_demo/src/deadline.cpp index 0addbe0f2..59e5aa6c4 100644 --- a/quality_of_service_demo/src/deadline.cpp +++ b/quality_of_service_demo/src/deadline.cpp @@ -27,9 +27,9 @@ #include "quality_of_service_demo/common_nodes.hpp" static const char * OPTION_PUBLISH_FOR = "--publish-for"; -static const uint32_t DEFAULT_PUBLISH_FOR = 5000; +static const size_t DEFAULT_PUBLISH_FOR = 5000; static const char * OPTION_PAUSE_FOR = "--pause-for"; -static const uint32_t DEFAULT_PAUSE_FOR = 1000; +static const size_t DEFAULT_PAUSE_FOR = 1000; void print_usage() { @@ -39,13 +39,14 @@ void print_usage() printf("deadline_duration: Duration (in uint milliseconds) of the Deadline QoS setting.\n"); printf("options:\n"); printf("-h : Print this help message.\n"); - printf("%s duration_publish_for : How long to publish (in uint milliseconds) until pausing the talker. Defaults to %lu.\n", - OPTION_PUBLISH_FOR, (unsigned long)DEFAULT_PUBLISH_FOR); - printf("%s duration_pause_for : How long to pause the talker (in uint milliseconds) before beginning to publish again. Defaults to %lu.\n", - OPTION_PAUSE_FOR, (unsigned long)DEFAULT_PAUSE_FOR); + printf("%s duration_publish_for : How long to publish (in uint milliseconds) until pausing " + "the talker. Defaults to %zu.\n", + OPTION_PUBLISH_FOR, DEFAULT_PUBLISH_FOR); + printf("%s duration_pause_for : How long to pause the talker (in uint milliseconds) before " + "beginning to publish again. Defaults to %zu.\n", + OPTION_PAUSE_FOR, DEFAULT_PAUSE_FOR); } - int main(int argc, char * argv[]) { // Force flush of the stdout buffer. diff --git a/quality_of_service_demo/src/lifespan.cpp b/quality_of_service_demo/src/lifespan.cpp index 492956e2d..48b7f20fc 100644 --- a/quality_of_service_demo/src/lifespan.cpp +++ b/quality_of_service_demo/src/lifespan.cpp @@ -11,8 +11,8 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. - #include +#include #include #include @@ -24,11 +24,11 @@ #include "quality_of_service_demo/common_nodes.hpp" static const char * OPTION_HISTORY = "--history"; -static const uint32_t DEFAULT_HISTORY = 10; +static const size_t DEFAULT_HISTORY = 10; static const char * OPTION_PUBLISH_COUNT = "--publish-count"; -static const uint32_t DEFAULT_PUBLISH_COUNT = 5; +static const size_t DEFAULT_PUBLISH_COUNT = 5; static const char * OPTION_SUBSCRIBE_AFTER = "--subscribe-after"; -static const uint32_t DEFAULT_SUBSCRIBE_AFTER = 5000; +static const size_t DEFAULT_SUBSCRIBE_AFTER = 5000; void print_usage() { @@ -39,12 +39,16 @@ void print_usage() printf("lifespan duration: Duration (in ms) of the Lifespan QoS setting.\n"); printf("options:\n"); printf("-h : Print this help message.\n"); - printf("%s : The depth of the Publisher's history queue - the maximum number of messages " - "it will store. Defaults to %lu\n", OPTION_HISTORY, (unsigned long)DEFAULT_HISTORY); - printf("%s publish_n_messages : How many messages to publish before stopping. Defaults to %lu\n", - OPTION_PUBLISH_COUNT, (unsigned long)DEFAULT_PUBLISH_COUNT); + printf("%s history : The depth of the Publisher's history queue - " + "the maximum number of messages it will store for late-joining subscriptions. " + "Defaults to %zu\n", + OPTION_HISTORY, DEFAULT_HISTORY); + printf("%s publish_n_messages : How many messages to publish before stopping. " + "Defaults to %zu\n", + OPTION_PUBLISH_COUNT, DEFAULT_PUBLISH_COUNT); printf("%s subscribe_after_duration : The Subscriber will be created this long (in ms) after " - "application startup. Defaults to %lu\n", OPTION_SUBSCRIBE_AFTER, (unsigned long)DEFAULT_SUBSCRIBE_AFTER); + "application startup. Defaults to %zu\n", + OPTION_SUBSCRIBE_AFTER, DEFAULT_SUBSCRIBE_AFTER); } int main(int argc, char * argv[]) diff --git a/quality_of_service_demo/src/liveliness.cpp b/quality_of_service_demo/src/liveliness.cpp index 8018c1b92..a4e0574be 100644 --- a/quality_of_service_demo/src/liveliness.cpp +++ b/quality_of_service_demo/src/liveliness.cpp @@ -28,7 +28,7 @@ static const char * DEFAULT_POLICY = "AUTOMATIC"; static const char * OPTION_NODE_ASSERT_PERIOD = "--node-assert-period"; static const char * OPTION_TOPIC_ASSERT_PERIOD = "--topic-assert-period"; static const char * OPTION_KILL_PUBLISHER_AFTER = "--kill-publisher-after"; -static const uint32_t DEFAULT_KILL_PUBLISHER_AFTER = 3000; +static const size_t DEFAULT_KILL_PUBLISHER_AFTER = 3000; void print_usage() { @@ -42,33 +42,116 @@ void print_usage() printf("-h : Print this help message.\n"); printf("%s liveliness_policy : You may specify AUTOMATIC, MANUAL_BY_NODE, or MANUAL_BY_TOPIC. " "Defaults to %s\n", OPTION_POLICY, DEFAULT_POLICY); - printf("%s node_assert_period : How often the Publisher will assert the liveliness of its " + printf("%s node_assert_period : How often the Publisher will assert the liveliness of its " "Node (in uint milliseconds). Defaults to 0 (never)\n", OPTION_NODE_ASSERT_PERIOD); printf("%s topic_assert_period : How often the Publisher will assert the liveliness of its " "Publisher (in uint milliseconds). Defaults to 0 (never)\n", OPTION_TOPIC_ASSERT_PERIOD); - printf("%s kill_publisher_after : Kill the publisher after this amount of time (in uint milliseconds). In " - "AUTOMATIC - destroy the whole node. In MANUAL_BY_NODE, stop node liveliness assertion. In " - "MANUAL_BY_TOPIC, stop topic liveliness assertion. Defaults to %lu\n", - OPTION_KILL_PUBLISHER_AFTER, (unsigned long)DEFAULT_KILL_PUBLISHER_AFTER); + printf("%s kill_publisher_after : " + "Kill the publisher after this amount of time (in uint milliseconds). " + "In AUTOMATIC - destroy the whole node. " + "In MANUAL_BY_NODE, stop node liveliness assertion. " + "In MANUAL_BY_TOPIC, stop topic liveliness assertion. " + "Defaults to %lu\n", + OPTION_KILL_PUBLISHER_AFTER, DEFAULT_KILL_PUBLISHER_AFTER); } +class LivelinessDemo +{ +public: + LivelinessDemo( + const std::string & topic, + rmw_qos_liveliness_policy_t liveliness_policy_kind, + std::chrono::milliseconds liveliness_lease_duration, + std::chrono::milliseconds node_assert_period, + std::chrono::milliseconds topic_assert_period, + std::chrono::milliseconds kill_publisher_after) + : liveliness_policy_kind_(liveliness_policy_kind) + { + rclcpp::QoS qos_profile(10); + qos_profile.liveliness(liveliness_policy_kind); + qos_profile.liveliness_lease_duration(liveliness_lease_duration); + + rclcpp::SubscriptionOptions subscription_options; + subscription_options.event_callbacks.liveliness_callback = std::bind( + &LivelinessDemo::print_liveliness_changed_event, this, std::placeholders::_1); + + rclcpp::PublisherOptions publisher_options; + + listener_ = std::make_shared( + topic, qos_profile, subscription_options); + talker_ = std::make_shared( + topic, qos_profile, publisher_options, 0, node_assert_period, topic_assert_period); + + executor.add_node(listener_); + executor.add_node(talker_); + + kill_publisher_timer_ = listener_->create_wall_timer( + kill_publisher_after, std::bind(&LivelinessDemo::kill_publisher, this)); + } + + void + spin() + { + executor.spin(); + } + +private: + void + print_liveliness_changed_event(rclcpp::QOSLivelinessChangedInfo & event) + { + printf("Liveliness changed event: \n"); + printf(" alive_count: %d\n", event.alive_count); + printf(" not_alive_count: %d\n", event.not_alive_count); + printf(" alive_count_change: %d\n", event.alive_count_change); + printf(" not_alive_count_change: %d\n", event.not_alive_count_change); + } + + void + kill_publisher() + { + if (!talker_) { + return; + } + switch (liveliness_policy_kind_) { + case RMW_QOS_POLICY_LIVELINESS_AUTOMATIC: + executor.remove_node(talker_); + talker_ = nullptr; + break; + case RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_NODE: + talker_->stop_asserting_liveliness(); + break; + case RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC: + talker_->stop_asserting_liveliness(); + break; + default: + break; + } + } + + rmw_qos_liveliness_policy_t liveliness_policy_kind_; + rclcpp::executors::SingleThreadedExecutor executor; + std::shared_ptr talker_ = nullptr; + std::shared_ptr listener_ = nullptr; + rclcpp::TimerBase::SharedPtr kill_publisher_timer_ = nullptr; +}; + int main(int argc, char * argv[]) { - // Argument parsing + // Argument count and usage if (argc < 2 || rcutils_cli_option_exist(argv, argv + argc, "-h")) { print_usage(); return 0; } - // Required arguments + // Configuration variables std::chrono::milliseconds liveliness_lease_duration(std::stoul(argv[1])); - - // Optional argument default values std::chrono::milliseconds node_assert_period(0); std::chrono::milliseconds topic_assert_period(0); std::chrono::milliseconds kill_publisher_after(DEFAULT_KILL_PUBLISHER_AFTER); const char * policy_name = DEFAULT_POLICY; + rmw_qos_liveliness_policy_t liveliness_policy_kind = RMW_QOS_POLICY_LIVELINESS_AUTOMATIC; + std::string topic("qos_liveliness_chatter"); - // Optional argument parsing + // Argument parsing if (rcutils_cli_option_exist(argv, argv + argc, OPTION_NODE_ASSERT_PERIOD)) { node_assert_period = std::chrono::milliseconds( std::stoul(rcutils_cli_get_option(argv, argv + argc, OPTION_NODE_ASSERT_PERIOD))); @@ -85,7 +168,6 @@ int main(int argc, char * argv[]) policy_name = rcutils_cli_get_option(argv, argv + argc, OPTION_POLICY); } - rmw_qos_liveliness_policy_t liveliness_policy_kind = RMW_QOS_POLICY_LIVELINESS_AUTOMATIC; if (strcmp(policy_name, "AUTOMATIC") == 0) { liveliness_policy_kind = RMW_QOS_POLICY_LIVELINESS_AUTOMATIC; } else if (strcmp(policy_name, "MANUAL_BY_NODE") == 0) { @@ -98,70 +180,16 @@ int main(int argc, char * argv[]) return 1; } - // Configuration and Initialization + // Demo execution rclcpp::init(argc, argv); - rclcpp::executors::SingleThreadedExecutor exec; - - std::string topic("qos_liveliness_chatter"); - - rclcpp::QoS qos_profile(10); - qos_profile.liveliness(liveliness_policy_kind); - qos_profile.liveliness_lease_duration(liveliness_lease_duration); - - rclcpp::SubscriptionOptions sub_options; - sub_options.event_callbacks.liveliness_callback = - [](rclcpp::QOSLivelinessChangedInfo & event) -> void - { - printf("Liveliness changed event: \n"); - printf(" alive_count: %d\n", event.alive_count); - printf(" not_alive_count: %d\n", event.not_alive_count); - printf(" alive_count_change: %d\n", event.alive_count_change); - printf(" not_alive_count_change: %d\n", event.not_alive_count_change); - }; - - rclcpp::PublisherOptions pub_options; - - auto listener = std::make_shared( - topic, - qos_profile, - sub_options); - auto talker = std::make_shared( + LivelinessDemo demo( topic, - qos_profile, - pub_options, - 0, + liveliness_policy_kind, + liveliness_lease_duration, node_assert_period, - topic_assert_period); - - // Execution - exec.add_node(listener); - exec.add_node(talker); - - auto timer = listener->create_wall_timer( - kill_publisher_after, - [&exec, talker, liveliness_policy_kind]() -> void { - switch (liveliness_policy_kind) { - case RMW_QOS_POLICY_LIVELINESS_AUTOMATIC: - exec.remove_node(talker); - break; - case RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_NODE: - if (talker->assert_node_timer_) { - talker->assert_node_timer_->cancel(); - } - break; - case RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC: - if (talker->assert_topic_timer_) { - talker->assert_topic_timer_->cancel(); - } - break; - default: - break; - } - }); - - exec.spin(); - - // Cleanup + topic_assert_period, + kill_publisher_after); + demo.spin(); rclcpp::shutdown(); return 0; } From 70cbb1aa3d646a20bf17d5d8a2046532fc210020 Mon Sep 17 00:00:00 2001 From: Emerson Knapp Date: Fri, 17 May 2019 10:47:36 -0700 Subject: [PATCH 6/6] Unwrap liveliness demo, fix argument usage messaging, make code ordering consistent across all demos Signed-off-by: Emerson Knapp --- quality_of_service_demo/README.md | 24 ++- .../quality_of_service_demo/common_nodes.hpp | 4 +- quality_of_service_demo/src/common_nodes.cpp | 7 +- quality_of_service_demo/src/deadline.cpp | 48 +++-- quality_of_service_demo/src/lifespan.cpp | 34 ++-- quality_of_service_demo/src/liveliness.cpp | 185 ++++++++---------- 6 files changed, 154 insertions(+), 148 deletions(-) diff --git a/quality_of_service_demo/README.md b/quality_of_service_demo/README.md index 7b06e9093..f402f91ed 100644 --- a/quality_of_service_demo/README.md +++ b/quality_of_service_demo/README.md @@ -27,9 +27,9 @@ The Publisher in this demo will pause publishing periodically, which will print Run `quality_of_service_demo/deadline -h` for more usage information. Examples: -* `deadline 600 --publish-for 5000 --pause-for 2000` +* `ros2 run quality_of_service_demo deadline 600 --publish-for 5000 --pause-for 2000` * The publisher will publish for 5 seconds, then pause for 2 - deadline events will start firing on both participants, until the publisher comes back. -* `deadline 600 --publish-for 5000 --pause-for 0` +* `ros2 run quality_of_service_demo deadline 600 --publish-for 5000 --pause-for 0` * The publisher doesn't actually pause, no deadline misses should occur. ## Lifespan @@ -41,13 +41,13 @@ The application requires an argument `lifespan_duration` - this is the maximum t The Publisher in this demo will publish a preset number of messages, then stop publishing. A Subscriber will start subscribing after a preset amount of time and may receive _fewer_ backfilled messages than were originally published, because some message can outlast their lifespan and be removed. -Run `quality_of_service_demo/lifespan -h` for more usage information. +Run `lifespan -h` for more usage information. Examples: -* `lifespan 1000 --publish-count 10 --subscribe-after 3000` +* `ros2 run quality_of_service_demo lifespan 1000 --publish-count 10 --subscribe-after 3000` * After a few seconds, you should see (approximately) messages 4-9 printed from the Subscriber. * The first few messages, with 1 second lifespan, were gone by the time the Subscriber joined after 3 seconds. -* `lifespan 4000 --publish-count 10 --subscribe-after 3000` +* `ros2 run quality_of_service_demo lifespan 4000 --publish-count 10 --subscribe-after 3000` * After a few seconds, you should see all of the messages (0-9) printed from the Subscriber. * All messages, with their 4 second lifespan, survived until the subscriber joined. @@ -57,12 +57,16 @@ Examples: The application requires an argument `lease_duration` that specifies how often (in milliseconds) an entity must "check in" to let the system know it's alive. -The Publisher in this demo will assert its liveliness based on passed in options, and be killed after some amount of time. When the Publisher is killed, you will see a Liveliness change event printed from the Publisher after `lease_duration` expires. +The Publisher in this demo will assert its liveliness based on passed in options, and be stopped after some amount of time. +When using `AUTOMATIC` liveliness policy, the Publisher is deleted at this time, in order to stop all automatic liveliness assertions in the rmw implementation - therefore only the Subscription will receive a Liveliness event for `AUTOMATIC`. +For `MANUAL` liveliness policies, the Publisher's assertions are stopped, as well as its message publishing. +Publishing a messages implicitly counts as asserting liveliness, so publishing is stopped in order to allow the Liveliness lease to lapse. +Therefore in the `MANUAL` policy types, you will see Liveliness events from both Publisher and Subscription (in rmw implementations that implement this feature). Run `quality_of_service_demo/liveliness -h` for more usage information. Examples: -* `liveliness 1000 --kill-publisher-after 2000` - * After 2 seconds, the publisher will be killed, and the subscriber will receive a callback 1 second after that notifying it that the liveliness has changed -* `liveliness 250 --node-assert-period 0 --policy MANUAL_BY_NODE` - * Publishing a message counts as implicitly asserting liveliness. Therefore, with this configuration, the node never explicitly asserts its liveliness, but it publishes less often (500ms) than the liveliness lease, so you will see continuous stream alive/not-alive messages as the publisher misses its lease and "becomes alive again" when it publishes. +* `ros2 run quality_of_service_demo liveliness 1000 --kill-publisher-after 2000` + * After 2 seconds, the publisher will be killed, and the subscriber will receive a callback 1 second after that notifying it that the liveliness has changed. +* `ros2 run quality_of_service_demo liveliness 250 --node-assert-period 0 --policy MANUAL_BY_NODE` + * With this configuration, the node never explicitly asserts its liveliness, but it publishes messages (implicitly asserting liveliness) less often (500ms) than the liveliness lease, so you will see alternating alive/not-alive messages as the publisher misses its lease and "becomes alive again" when it publishes, until the talker is stopped. diff --git a/quality_of_service_demo/include/quality_of_service_demo/common_nodes.hpp b/quality_of_service_demo/include/quality_of_service_demo/common_nodes.hpp index 6dcb36696..002a95f87 100644 --- a/quality_of_service_demo/include/quality_of_service_demo/common_nodes.hpp +++ b/quality_of_service_demo/include/quality_of_service_demo/common_nodes.hpp @@ -63,8 +63,8 @@ class Talker : public rclcpp::Node */ void publish(); - /// Cancel any manual liveliness assertions this node was configured to do. - void stop_asserting_liveliness(); + /// Cancel publishing, and any manual liveliness assertions this node was configured to do. + void stop(); private: rclcpp::TimerBase::SharedPtr assert_node_timer_ = nullptr; diff --git a/quality_of_service_demo/src/common_nodes.cpp b/quality_of_service_demo/src/common_nodes.cpp index c83d29696..eaf592702 100644 --- a/quality_of_service_demo/src/common_nodes.cpp +++ b/quality_of_service_demo/src/common_nodes.cpp @@ -82,16 +82,17 @@ Talker::publish() } void -Talker::stop_asserting_liveliness() +Talker::stop() { if (assert_node_timer_) { assert_node_timer_->cancel(); - assert_node_timer_ = nullptr; + assert_node_timer_.reset(); } if (assert_topic_timer_) { assert_topic_timer_->cancel(); - assert_topic_timer_ = nullptr; + assert_topic_timer_.reset(); } + publish_timer_->cancel(); } Listener::Listener( diff --git a/quality_of_service_demo/src/deadline.cpp b/quality_of_service_demo/src/deadline.cpp index 59e5aa6c4..d5290237a 100644 --- a/quality_of_service_demo/src/deadline.cpp +++ b/quality_of_service_demo/src/deadline.cpp @@ -34,16 +34,25 @@ static const size_t DEFAULT_PAUSE_FOR = 1000; void print_usage() { printf("Usage for deadline:\n"); - printf("deadline deadline_duration [-p period_pause_talker] [-d duration_pause_talker] [-h]\n"); + printf("deadline " + "deadline_duration " + "[%s publish_for] " + "[%s pause_for] " + "[-h]\n", + OPTION_PUBLISH_FOR, + OPTION_PAUSE_FOR); printf("required arguments:\n"); - printf("deadline_duration: Duration (in uint milliseconds) of the Deadline QoS setting.\n"); - printf("options:\n"); + printf("deadline_duration: " + "Duration in positive integer milliseconds of the Deadline QoS setting.\n"); + printf("optional arguments:\n"); printf("-h : Print this help message.\n"); - printf("%s duration_publish_for : How long to publish (in uint milliseconds) until pausing " - "the talker. Defaults to %zu.\n", + printf("%s publish_for : " + "Duration to publish (in positive integer milliseconds) until pausing the talker. " + "Defaults to %zu.\n", OPTION_PUBLISH_FOR, DEFAULT_PUBLISH_FOR); - printf("%s duration_pause_for : How long to pause the talker (in uint milliseconds) before " - "beginning to publish again. Defaults to %zu.\n", + printf("%s pause_for : " + "Duration to pause (in positive integer milliseconds) before starting to publish again. " + "Defaults to %zu.\n", OPTION_PAUSE_FOR, DEFAULT_PAUSE_FOR); } @@ -52,18 +61,17 @@ int main(int argc, char * argv[]) // Force flush of the stdout buffer. setvbuf(stdout, NULL, _IONBF, BUFSIZ); - // Argument parsing + // Argument count and usage if (argc < 2 || rcutils_cli_option_exist(argv, argv + argc, "-h")) { print_usage(); return 0; } - // Required arguments + // Configuration variables std::chrono::milliseconds deadline_duration(std::stoul(argv[1])); - - // Optional argument default values std::chrono::milliseconds period_pause_talker(DEFAULT_PUBLISH_FOR); std::chrono::milliseconds duration_pause_talker(DEFAULT_PAUSE_FOR); + std::string topic("qos_deadline_chatter"); // Optional argument parsing if (rcutils_cli_option_exist(argv, argv + argc, OPTION_PUBLISH_FOR)) { @@ -77,9 +85,7 @@ int main(int argc, char * argv[]) // Initialization and configuration rclcpp::init(argc, argv); - rclcpp::executors::SingleThreadedExecutor exec; - - std::string topic("qos_deadline_chatter"); + rclcpp::executors::SingleThreadedExecutor executor; rclcpp::QoS qos_profile(10); qos_profile.deadline(deadline_duration); @@ -88,7 +94,8 @@ int main(int argc, char * argv[]) sub_options.event_callbacks.deadline_callback = [](rclcpp::QOSDeadlineRequestedInfo & event) -> void { - RCUTILS_LOG_INFO("Requested deadline missed - total %d delta %d", + RCLCPP_INFO(rclcpp::get_logger("Listener"), + "Requested deadline missed - total %d delta %d", event.total_count, event.total_count_change); }; @@ -96,12 +103,13 @@ int main(int argc, char * argv[]) pub_options.event_callbacks.deadline_callback = [](rclcpp::QOSDeadlineOfferedInfo & event) -> void { - RCUTILS_LOG_INFO("Offered deadline missed - total %d delta %d", + RCLCPP_INFO(rclcpp::get_logger("Talker"), + "Offered deadline missed - total %d delta %d", event.total_count, event.total_count_change); }; - auto talker = std::make_shared(topic, qos_profile, pub_options); auto listener = std::make_shared(topic, qos_profile, sub_options); + auto talker = std::make_shared(topic, qos_profile, pub_options); auto pause_timer = talker->create_wall_timer( period_pause_talker, @@ -110,9 +118,9 @@ int main(int argc, char * argv[]) }); // Execution - exec.add_node(talker); - exec.add_node(listener); - exec.spin(); + executor.add_node(talker); + executor.add_node(listener); + executor.spin(); // Cleanup rclcpp::shutdown(); diff --git a/quality_of_service_demo/src/lifespan.cpp b/quality_of_service_demo/src/lifespan.cpp index 48b7f20fc..f00836964 100644 --- a/quality_of_service_demo/src/lifespan.cpp +++ b/quality_of_service_demo/src/lifespan.cpp @@ -33,20 +33,31 @@ static const size_t DEFAULT_SUBSCRIBE_AFTER = 5000; void print_usage() { printf("Usage for lifespan:\n"); - printf("lifespan lifespan_duration [--history history_depth] [-p publish_n_messages] " - "[-s subscribe_after_duration] [-h]\n"); + printf("lifespan " + "lifespan_duration " + "[%s history_depth] " + "[%s publish_count] " + "[%s subscribe_after] " + "[-h]\n", + OPTION_HISTORY, + OPTION_PUBLISH_COUNT, + OPTION_SUBSCRIBE_AFTER); printf("required arguments:\n"); - printf("lifespan duration: Duration (in ms) of the Lifespan QoS setting.\n"); - printf("options:\n"); + printf("lifespan duration: " + "Duration in positive integer milliseconds of the Lifespan QoS setting.\n"); + printf("optional arguments:\n"); printf("-h : Print this help message.\n"); - printf("%s history : The depth of the Publisher's history queue - " + printf("%s history : " + "The depth of the Publisher's history queue - " "the maximum number of messages it will store for late-joining subscriptions. " "Defaults to %zu\n", OPTION_HISTORY, DEFAULT_HISTORY); - printf("%s publish_n_messages : How many messages to publish before stopping. " + printf("%s publish_n_messages : " + "How many messages to publish before stopping. " "Defaults to %zu\n", OPTION_PUBLISH_COUNT, DEFAULT_PUBLISH_COUNT); - printf("%s subscribe_after_duration : The Subscriber will be created this long (in ms) after " + printf("%s subscribe_after_duration : " + "The Subscriber will be created this long in positive integer milliseconds after " "application startup. Defaults to %zu\n", OPTION_SUBSCRIBE_AFTER, DEFAULT_SUBSCRIBE_AFTER); } @@ -89,8 +100,9 @@ int main(int argc, char * argv[]) std::string topic("qos_lifespan_chatter"); rclcpp::QoS qos_profile(history); - qos_profile.durability(RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL); - qos_profile.lifespan(lifespan_duration); + qos_profile + .transient_local() + .lifespan(lifespan_duration); rclcpp::SubscriptionOptions sub_options; rclcpp::PublisherOptions pub_options; @@ -98,8 +110,6 @@ int main(int argc, char * argv[]) auto listener = std::make_shared(topic, qos_profile, sub_options, true); auto talker = std::make_shared(topic, qos_profile, pub_options, publish_count); - exec.add_node(talker); - exec.add_node(listener); auto timer = listener->create_wall_timer( subscribe_after_duration, [listener]() -> void { @@ -107,6 +117,8 @@ int main(int argc, char * argv[]) }); // Execution + exec.add_node(talker); + exec.add_node(listener); exec.spin(); // Cleanup diff --git a/quality_of_service_demo/src/liveliness.cpp b/quality_of_service_demo/src/liveliness.cpp index a4e0574be..53239a4c5 100644 --- a/quality_of_service_demo/src/liveliness.cpp +++ b/quality_of_service_demo/src/liveliness.cpp @@ -33,115 +33,55 @@ static const size_t DEFAULT_KILL_PUBLISHER_AFTER = 3000; void print_usage() { printf("Usage for liveliness:\n"); - printf("liveliness lease_duration [-p liveliness_policy] [-n node_assert_liveliness_period] " - "[-t topic_assert_liveliness_period] [-h]\n"); + printf("liveliness " + "lease_duration " + "[%s liveliness_policy] " + "[%s node_assert_liveliness_period] " + "[%s topic_assert_liveliness_period] " + "[-h]\n", + OPTION_POLICY, + OPTION_NODE_ASSERT_PERIOD, + OPTION_TOPIC_ASSERT_PERIOD); printf("required arguments:\n"); - printf("lease_duration: Duration (in ms) before an inactive Publisher is considered not-alive. " - "0 means infinity.\n"); - printf("options:\n"); + printf("lease_duration: " + "Duration in positive integer milliseconds after which an inactive Publisher is considered " + "not-alive. 0 means never.\n"); + printf("optional arguments:\n"); printf("-h : Print this help message.\n"); - printf("%s liveliness_policy : You may specify AUTOMATIC, MANUAL_BY_NODE, or MANUAL_BY_TOPIC. " - "Defaults to %s\n", OPTION_POLICY, DEFAULT_POLICY); - printf("%s node_assert_period : How often the Publisher will assert the liveliness of its " - "Node (in uint milliseconds). Defaults to 0 (never)\n", OPTION_NODE_ASSERT_PERIOD); - printf("%s topic_assert_period : How often the Publisher will assert the liveliness of its " - "Publisher (in uint milliseconds). Defaults to 0 (never)\n", OPTION_TOPIC_ASSERT_PERIOD); + printf("%s liveliness_policy : " + "You may specify AUTOMATIC, MANUAL_BY_NODE, or MANUAL_BY_TOPIC. " + "Defaults to %s\n", + OPTION_POLICY, DEFAULT_POLICY); + printf("%s node_assert_period : " + "How often the Publisher will assert the liveliness of its Node, in positive integer " + "milliseconds. " + "Defaults to 0 (never)\n", + OPTION_NODE_ASSERT_PERIOD); + printf("%s topic_assert_period : " + "How often the Publisher will assert the liveliness of its Publisher " + ", in positive integer milliseconds. " + "Defaults to 0 (never)\n", + OPTION_TOPIC_ASSERT_PERIOD); printf("%s kill_publisher_after : " "Kill the publisher after this amount of time (in uint milliseconds). " "In AUTOMATIC - destroy the whole node. " "In MANUAL_BY_NODE, stop node liveliness assertion. " "In MANUAL_BY_TOPIC, stop topic liveliness assertion. " - "Defaults to %lu\n", + "Defaults to %zu\n", OPTION_KILL_PUBLISHER_AFTER, DEFAULT_KILL_PUBLISHER_AFTER); } -class LivelinessDemo -{ -public: - LivelinessDemo( - const std::string & topic, - rmw_qos_liveliness_policy_t liveliness_policy_kind, - std::chrono::milliseconds liveliness_lease_duration, - std::chrono::milliseconds node_assert_period, - std::chrono::milliseconds topic_assert_period, - std::chrono::milliseconds kill_publisher_after) - : liveliness_policy_kind_(liveliness_policy_kind) - { - rclcpp::QoS qos_profile(10); - qos_profile.liveliness(liveliness_policy_kind); - qos_profile.liveliness_lease_duration(liveliness_lease_duration); - - rclcpp::SubscriptionOptions subscription_options; - subscription_options.event_callbacks.liveliness_callback = std::bind( - &LivelinessDemo::print_liveliness_changed_event, this, std::placeholders::_1); - - rclcpp::PublisherOptions publisher_options; - - listener_ = std::make_shared( - topic, qos_profile, subscription_options); - talker_ = std::make_shared( - topic, qos_profile, publisher_options, 0, node_assert_period, topic_assert_period); - - executor.add_node(listener_); - executor.add_node(talker_); - - kill_publisher_timer_ = listener_->create_wall_timer( - kill_publisher_after, std::bind(&LivelinessDemo::kill_publisher, this)); - } - - void - spin() - { - executor.spin(); - } - -private: - void - print_liveliness_changed_event(rclcpp::QOSLivelinessChangedInfo & event) - { - printf("Liveliness changed event: \n"); - printf(" alive_count: %d\n", event.alive_count); - printf(" not_alive_count: %d\n", event.not_alive_count); - printf(" alive_count_change: %d\n", event.alive_count_change); - printf(" not_alive_count_change: %d\n", event.not_alive_count_change); - } - - void - kill_publisher() - { - if (!talker_) { - return; - } - switch (liveliness_policy_kind_) { - case RMW_QOS_POLICY_LIVELINESS_AUTOMATIC: - executor.remove_node(talker_); - talker_ = nullptr; - break; - case RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_NODE: - talker_->stop_asserting_liveliness(); - break; - case RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC: - talker_->stop_asserting_liveliness(); - break; - default: - break; - } - } - - rmw_qos_liveliness_policy_t liveliness_policy_kind_; - rclcpp::executors::SingleThreadedExecutor executor; - std::shared_ptr talker_ = nullptr; - std::shared_ptr listener_ = nullptr; - rclcpp::TimerBase::SharedPtr kill_publisher_timer_ = nullptr; -}; - int main(int argc, char * argv[]) { + // Force flush of the stdout buffer. + setvbuf(stdout, NULL, _IONBF, BUFSIZ); + // Argument count and usage if (argc < 2 || rcutils_cli_option_exist(argv, argv + argc, "-h")) { print_usage(); return 0; } + // Configuration variables std::chrono::milliseconds liveliness_lease_duration(std::stoul(argv[1])); std::chrono::milliseconds node_assert_period(0); @@ -151,7 +91,7 @@ int main(int argc, char * argv[]) rmw_qos_liveliness_policy_t liveliness_policy_kind = RMW_QOS_POLICY_LIVELINESS_AUTOMATIC; std::string topic("qos_liveliness_chatter"); - // Argument parsing + // Optional argument parsing if (rcutils_cli_option_exist(argv, argv + argc, OPTION_NODE_ASSERT_PERIOD)) { node_assert_period = std::chrono::milliseconds( std::stoul(rcutils_cli_get_option(argv, argv + argc, OPTION_NODE_ASSERT_PERIOD))); @@ -180,16 +120,57 @@ int main(int argc, char * argv[]) return 1; } - // Demo execution + // Initialization and configuration rclcpp::init(argc, argv); - LivelinessDemo demo( - topic, - liveliness_policy_kind, - liveliness_lease_duration, - node_assert_period, - topic_assert_period, - kill_publisher_after); - demo.spin(); + rclcpp::executors::SingleThreadedExecutor executor; + + rclcpp::QoS qos_profile(10); + qos_profile + .liveliness(liveliness_policy_kind) + .liveliness_lease_duration(liveliness_lease_duration); + + rclcpp::SubscriptionOptions subscription_options; + subscription_options.event_callbacks.liveliness_callback = + [](rclcpp::QOSLivelinessChangedInfo & event) + { + printf("Liveliness changed event: \n"); + printf(" alive_count: %d\n", event.alive_count); + printf(" not_alive_count: %d\n", event.not_alive_count); + printf(" alive_count_change: %d\n", event.alive_count_change); + printf(" not_alive_count_change: %d\n", event.not_alive_count_change); + }; + + rclcpp::PublisherOptions publisher_options; + + auto listener = std::make_shared(topic, qos_profile, subscription_options); + auto talker = std::make_shared( + topic, qos_profile, publisher_options, 0, node_assert_period, topic_assert_period); + + auto kill_talker_timer = listener->create_wall_timer( + kill_publisher_after, [&talker, &executor, liveliness_policy_kind]() { + if (!talker) { + return; + } + switch (liveliness_policy_kind) { + case RMW_QOS_POLICY_LIVELINESS_AUTOMATIC: + executor.remove_node(talker); + talker.reset(); + break; + case RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_NODE: + case RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC: + talker->stop(); + break; + default: + break; + } + }); + + // Execution + executor.add_node(listener); + executor.add_node(talker); + executor.spin(); + + // Cleanup rclcpp::shutdown(); return 0; }