Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

QoS - Liveliness, Deadline, and Lifespan C++ Demos #320

Merged
merged 6 commits into from
May 17, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions quality_of_service_demo/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
cmake_minimum_required(VERSION 3.5)

project(quality_of_service_demo)

# Default to C++14
if(NOT CMAKE_CXX_STANDARD)
set(CMAKE_CXX_STANDARD 14)
endif()

if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
add_compile_options(-Wall -Wextra -Wpedantic)
endif()

find_package(ament_cmake REQUIRED)
find_package(rclcpp REQUIRED)
find_package(rcutils)
find_package(rmw REQUIRED)
find_package(std_msgs REQUIRED)

include_directories(include)
ament_export_include_directories(include)

function(qos_demo_executable target)
add_executable(${target} src/${target}.cpp src/common_nodes.cpp)
ament_target_dependencies(${target}
"rclcpp"
"rcutils"
"std_msgs"
)
install(TARGETS ${target} DESTINATION lib/${PROJECT_NAME})
endfunction()

qos_demo_executable(liveliness)
qos_demo_executable(lifespan)
qos_demo_executable(deadline)

if(BUILD_TESTING)
find_package(ament_lint_auto REQUIRED)
ament_lint_auto_find_test_dependencies()
endif()

ament_package()
72 changes: 72 additions & 0 deletions quality_of_service_demo/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# Quality of Service Demos

The demo applications in this package show the various Quality of Service settings available in ROS2.
The intent is to provide a dedicated application for each QoS type, in order to highlight that single feature in isolation.

## History
emersonknapp marked this conversation as resolved.
Show resolved Hide resolved

No History QoS demo app exists yet in this package.

## Reliability

No Reliability QoS demo app exists yet in this package.

## Durability

No Durability QoS demo app exists yet in this package.

## Deadline

`quality_of_service_demo/deadline` demonstrates messages not meeting their required deadline.

The application requires an argument a `deadline_duration` - this is the maximum acceptable time (in positive integer milliseconds) between messages published on the topic.
A `deadline_duration` of `0` means that there is no deadline, so the application will act as a standard Talker/Listener pair.

The Publisher in this demo will pause publishing periodically, which will print deadline expirations from the Subscriber.

Run `quality_of_service_demo/deadline -h` for more usage information.

Examples:
* `ros2 run quality_of_service_demo deadline 600 --publish-for 5000 --pause-for 2000`
* The publisher will publish for 5 seconds, then pause for 2 - deadline events will start firing on both participants, until the publisher comes back.
* `ros2 run quality_of_service_demo deadline 600 --publish-for 5000 --pause-for 0`
* The publisher doesn't actually pause, no deadline misses should occur.

## Lifespan

`quality_of_service_demo/lifespan` demonstrates messages being deleted from a Publisher's outgoing queue once their configured lifespan expires.

The application requires an argument `lifespan_duration` - this is the maximum time (in positive integer milliseconds) that a message will sit in the Publisher's outgoing queue.

The Publisher in this demo will publish a preset number of messages, then stop publishing.
A Subscriber will start subscribing after a preset amount of time and may receive _fewer_ backfilled messages than were originally published, because some message can outlast their lifespan and be removed.

Run `lifespan -h` for more usage information.

Examples:
* `ros2 run quality_of_service_demo lifespan 1000 --publish-count 10 --subscribe-after 3000`
* After a few seconds, you should see (approximately) messages 4-9 printed from the Subscriber.
* The first few messages, with 1 second lifespan, were gone by the time the Subscriber joined after 3 seconds.
* `ros2 run quality_of_service_demo lifespan 4000 --publish-count 10 --subscribe-after 3000`
* After a few seconds, you should see all of the messages (0-9) printed from the Subscriber.
* All messages, with their 4 second lifespan, survived until the subscriber joined.

## Liveliness

`quality_of_service_demo/liveliness` demonstrates notification of liveliness changes, based on different Liveliness configurations.

The application requires an argument `lease_duration` that specifies how often (in milliseconds) an entity must "check in" to let the system know it's alive.

The Publisher in this demo will assert its liveliness based on passed in options, and be stopped after some amount of time.
When using `AUTOMATIC` liveliness policy, the Publisher is deleted at this time, in order to stop all automatic liveliness assertions in the rmw implementation - therefore only the Subscription will receive a Liveliness event for `AUTOMATIC`.
For `MANUAL` liveliness policies, the Publisher's assertions are stopped, as well as its message publishing.
Publishing a messages implicitly counts as asserting liveliness, so publishing is stopped in order to allow the Liveliness lease to lapse.
Therefore in the `MANUAL` policy types, you will see Liveliness events from both Publisher and Subscription (in rmw implementations that implement this feature).

Run `quality_of_service_demo/liveliness -h` for more usage information.

Examples:
* `ros2 run quality_of_service_demo liveliness 1000 --kill-publisher-after 2000`
* After 2 seconds, the publisher will be killed, and the subscriber will receive a callback 1 second after that notifying it that the liveliness has changed.
* `ros2 run quality_of_service_demo liveliness 250 --node-assert-period 0 --policy MANUAL_BY_NODE`
* With this configuration, the node never explicitly asserts its liveliness, but it publishes messages (implicitly asserting liveliness) less often (500ms) than the liveliness lease, so you will see alternating alive/not-alive messages as the publisher misses its lease and "becomes alive again" when it publishes, until the talker is stopped.
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef QUALITY_OF_SERVICE_DEMO__COMMON_NODES_HPP_
#define QUALITY_OF_SERVICE_DEMO__COMMON_NODES_HPP_

#include <chrono>
#include <memory>
#include <string>

#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/string.hpp"

static const uint32_t MILLION = 1000L * 1000L;

class Talker : public rclcpp::Node
{
public:
/**
* \param[in] topic_name Topic to publish to.
* \param[in] qos_profile QoS profile for Publisher.
* \param[in] publisher_options Additional options for Publisher.
* \param[in] publish_count (Optional) Number of messages to publish before stopping.
* 0 (default) means publish forever.
* \param[in] assert_node_period (Optional) How often to manually assert Node liveliness.
* 0 (default) means never.
* \param[in] assert_topic_period (Optional) How often to manually assert Publisher liveliness.
* 0 (default) means never.
**/
Talker(
const std::string & topic_name,
const rclcpp::QoS & qos_profile,
const rclcpp::PublisherOptions & publisher_options,
size_t publish_count = 0,
std::chrono::milliseconds assert_node_period = std::chrono::milliseconds(0),
std::chrono::milliseconds assert_topic_period = std::chrono::milliseconds(0));

/// Stop publishing for a while.
/**
* Stops the publisher for the specified amount of time.
* A message will be published immediately on the expiration of pause_duration.
* The regular publishing interval will resume at that point.
* If publishing is already paused, this call will be ignored.
* The remaining pause duration will not be affected.
* \param[in] pause_duration Amount of time to pause for.
**/
void pause_for(std::chrono::milliseconds pause_duration);

/// Publish a single message.
/**
* Counts towards the total message count that will be published.
*/
void publish();

/// Cancel publishing, and any manual liveliness assertions this node was configured to do.
void stop();

private:
rclcpp::TimerBase::SharedPtr assert_node_timer_ = nullptr;
rclcpp::TimerBase::SharedPtr assert_topic_timer_ = nullptr;
rclcpp::TimerBase::SharedPtr publish_timer_ = nullptr;
rclcpp::TimerBase::SharedPtr pause_timer_ = nullptr;
rclcpp::Publisher<std_msgs::msg::String>::SharedPtr publisher_ = nullptr;
size_t publish_count_ = 0;
const size_t stop_at_count_ = 0;
};

class Listener : public rclcpp::Node
{
public:
/// Standard Constructor.
/**
* \param[in] topic_name Topic to subscribe to.
* \param[in] qos_profile QoS profile for Subscription.
* \param[in] sub_options Additional options for Subscription.
* \param[in] defer_subscribe (Optional) don't create Subscription until user calls
* start_listening().
*/
Listener(
const std::string & topic_name,
const rclcpp::QoS & qos_profile,
const rclcpp::SubscriptionOptions & subscription_options,
bool defer_subscribe = false);

/// Instantiates Subscription.
/**
* Does nothing if it has already been called.
*/
void start_listening();

private:
rclcpp::Subscription<std_msgs::msg::String>::SharedPtr subscription_ = nullptr;
rclcpp::QoS qos_profile_;
rclcpp::SubscriptionOptions subscription_options_;
const std::string topic_name_;
};

#endif // QUALITY_OF_SERVICE_DEMO__COMMON_NODES_HPP_
38 changes: 38 additions & 0 deletions quality_of_service_demo/package.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>quality_of_service_demo</name>
<version>0.1.0</version>
<description>Demo applications for Quality of Service features</description>

<maintainer email="ros-contributions@amazon.com">Amazon ROS Contributions</maintainer>

<license>Apache License 2.0</license>

<author>Emerson Knapp</author>

<buildtool_depend>ament_cmake</buildtool_depend>

<build_depend>example_interfaces</build_depend>
<build_depend>rclcpp</build_depend>
<build_depend>rcutils</build_depend>
<build_depend>rmw</build_depend>
<build_depend>rmw_implementation_cmake</build_depend>
<build_depend>std_msgs</build_depend>

<exec_depend>example_interfaces</exec_depend>
<exec_depend>launch_ros</exec_depend>
<exec_depend>rclcpp</exec_depend>
<exec_depend>rcutils</exec_depend>
<exec_depend>rmw</exec_depend>
<exec_depend>std_msgs</exec_depend>

<test_depend>ament_lint_auto</test_depend>
<test_depend>ament_lint_common</test_depend>
<test_depend>launch</test_depend>
<test_depend>launch_testing</test_depend>

<export>
<build_type>ament_cmake</build_type>
</export>
</package>
127 changes: 127 additions & 0 deletions quality_of_service_demo/src/common_nodes.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <chrono>
#include <string>

#include "quality_of_service_demo/common_nodes.hpp"

using namespace std::chrono_literals;

Talker::Talker(
const std::string & topic_name,
const rclcpp::QoS & qos_profile,
const rclcpp::PublisherOptions & publisher_options,
size_t publish_count,
std::chrono::milliseconds assert_node_period,
std::chrono::milliseconds assert_topic_period)
: Node("talker"),
stop_at_count_(publish_count)
{
RCLCPP_INFO(get_logger(), "Talker starting up");
publisher_ = create_publisher<std_msgs::msg::String>(
topic_name, qos_profile, publisher_options);
publish_timer_ = create_wall_timer(500ms, std::bind(&Talker::publish, this));
// If enabled, create timer to assert liveliness at the node level
if (assert_node_period != 0ms) {
assert_node_timer_ = create_wall_timer(
assert_node_period,
[this]() -> bool {
return this->assert_liveliness();
});
}
// If enabled, create timer to assert liveliness on the topic
if (assert_topic_period != 0ms) {
assert_topic_timer_ = create_wall_timer(
assert_topic_period,
[this]() -> bool {
return publisher_->assert_liveliness();
});
}
}

void
Talker::pause_for(std::chrono::milliseconds pause_length)
{
if (pause_timer_) {
// Already paused - ignoring.
return;
}
publish_timer_->cancel();
pause_timer_ = create_wall_timer(
pause_length,
[this]() {
// Publishing immediately on pause expiration and resuming regular interval.
publish();
publish_timer_->reset();
emersonknapp marked this conversation as resolved.
Show resolved Hide resolved
pause_timer_ = nullptr;
});
}

void
Talker::publish()
{
std_msgs::msg::String msg;
msg.data = "Talker says " + std::to_string(publish_count_);
publish_count_ += 1;
publisher_->publish(msg);
if (stop_at_count_ > 0 && publish_count_ >= stop_at_count_) {
publish_timer_->cancel();
}
}

void
Talker::stop()
{
if (assert_node_timer_) {
assert_node_timer_->cancel();
assert_node_timer_.reset();
}
if (assert_topic_timer_) {
assert_topic_timer_->cancel();
assert_topic_timer_.reset();
}
publish_timer_->cancel();
}

emersonknapp marked this conversation as resolved.
Show resolved Hide resolved
Listener::Listener(
const std::string & topic_name,
const rclcpp::QoS & qos_profile,
const rclcpp::SubscriptionOptions & subscription_options,
bool defer_subscribe)
: Node("listener"),
qos_profile_(qos_profile),
subscription_options_(subscription_options),
topic_name_(topic_name)
{
RCLCPP_INFO(get_logger(), "Listener starting up");
if (!defer_subscribe) {
start_listening();
}
}

void
Listener::start_listening()
{
if (!subscription_) {
subscription_ = create_subscription<std_msgs::msg::String>(
topic_name_,
qos_profile_,
[this](const typename std_msgs::msg::String::SharedPtr msg) -> void
{
RCLCPP_INFO(get_logger(), "Listener heard: [%s]", msg->data.c_str());
},
subscription_options_);
}
}
Loading