Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rclpy QoS Demos (Liveliness, Lifespan, Deadline) #338

Merged
merged 9 commits into from
May 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions quality_of_service_demo/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ The Publisher in this demo will pause publishing periodically, which will print
Run `quality_of_service_demo/deadline -h` for more usage information.

Examples:
* `ros2 run quality_of_service_demo deadline 600 --publish-for 5000 --pause-for 2000`
* `ros2 run quality_of_service_demo_cpp deadline 600 --publish-for 5000 --pause-for 2000` _or_
* `ros2 run quality_of_service_demo_py deadline 600 --publish-for 5000 --pause-for 2000`
* The publisher will publish for 5 seconds, then pause for 2 - deadline events will start firing on both participants, until the publisher comes back.
* `ros2 run quality_of_service_demo deadline 600 --publish-for 5000 --pause-for 0`
* `ros2 run quality_of_service_demo_cpp deadline 600 --publish-for 5000 --pause-for 0` _or_
* `ros2 run quality_of_service_demo_py deadline 600 --publish-for 5000 --pause-for 0`
* The publisher doesn't actually pause, no deadline misses should occur.

## Lifespan
Expand All @@ -44,10 +46,12 @@ A Subscriber will start subscribing after a preset amount of time and may receiv
Run `lifespan -h` for more usage information.

Examples:
* `ros2 run quality_of_service_demo lifespan 1000 --publish-count 10 --subscribe-after 3000`
* `ros2 run quality_of_service_demo_cpp lifespan 1000 --publish-count 10 --subscribe-after 3000` _or_
* `ros2 run quality_of_service_demo_py lifespan 1000 --publish-count 10 --subscribe-after 3000`
* After a few seconds, you should see (approximately) messages 4-9 printed from the Subscriber.
* The first few messages, with 1 second lifespan, were gone by the time the Subscriber joined after 3 seconds.
* `ros2 run quality_of_service_demo lifespan 4000 --publish-count 10 --subscribe-after 3000`
* `ros2 run quality_of_service_demo_cpp lifespan 4000 --publish-count 10 --subscribe-after 3000` _or_
* `ros2 run quality_of_service_demo_py lifespan 4000 --publish-count 10 --subscribe-after 3000`
* After a few seconds, you should see all of the messages (0-9) printed from the Subscriber.
* All messages, with their 4 second lifespan, survived until the subscriber joined.

Expand All @@ -66,7 +70,9 @@ Therefore in the `MANUAL` policy types, you will see Liveliness events from both
Run `quality_of_service_demo/liveliness -h` for more usage information.

Examples:
* `ros2 run quality_of_service_demo liveliness 1000 --kill-publisher-after 2000`
* `ros2 run quality_of_service_demo_cpp liveliness 1000 --kill-publisher-after 2000` _or_
* `ros2 run quality_of_service_demo_py liveliness 1000 --kill-publisher-after 2000`
* After 2 seconds, the publisher will be killed, and the subscriber will receive a callback 1 second after that notifying it that the liveliness has changed.
* `ros2 run quality_of_service_demo liveliness 250 --node-assert-period 0 --policy MANUAL_BY_NODE`
* `ros2 run quality_of_service_demo_cpp liveliness 250 --node-assert-period 0 --policy MANUAL_BY_NODE` _or_
* `ros2 run quality_of_service_demo_py liveliness 250 --node-assert-period 0 --policy MANUAL_BY_NODE`
* With this configuration, the node never explicitly asserts its liveliness, but it publishes messages (implicitly asserting liveliness) less often (500ms) than the liveliness lease, so you will see alternating alive/not-alive messages as the publisher misses its lease and "becomes alive again" when it publishes, until the talker is stopped.
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
cmake_minimum_required(VERSION 3.5)

project(quality_of_service_demo)
project(quality_of_service_demo_cpp)

# Default to C++14
if(NOT CMAKE_CXX_STANDARD)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>quality_of_service_demo</name>
<name>quality_of_service_demo_cpp</name>
<version>0.1.0</version>
<description>Demo applications for Quality of Service features</description>
<description>C++ Demo applications for Quality of Service features</description>

<maintainer email="ros-contributions@amazon.com">Amazon ROS Contributions</maintainer>

Expand Down
1 change: 1 addition & 0 deletions quality_of_service_demo/rclpy/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
__pycache__/
25 changes: 25 additions & 0 deletions quality_of_service_demo/rclpy/package.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>quality_of_service_demo_py</name>
<version>0.1.0</version>
<description>Python Demo applications for Quality of Service features</description>

<maintainer email="ros-contributions@amazon.com">Amazon ROS Contributions</maintainer>

<license>Apache License 2.0</license>

<author>Emerson Knapp</author>

<exec_depend>rclpy</exec_depend>
<exec_depend>std_msgs</exec_depend>

<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>

<export>
<build_type>ament_python</build_type>
</export>
</package>
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
# Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from rclpy.node import Node
from std_msgs.msg import String


class Talker(Node):
def __init__(
self, topic_name, qos_profile, event_callbacks,
publish_count=0, assert_node_period=None, assert_topic_period=None
):
"""
Constructor.

@param topic_name: Topic to publish to.
@param qos_profile QoS profile for Publisher.
@param event_callbacks QoS Event callbacks for Publisher.
@param publish_count Number of messages to publish before stopping.
@param assert_node_period How often to manually assert Node liveliness.
@param asert_topic_period How often to manually assert Publisher liveliness.
"""
super().__init__('qos_talker')
self.get_logger().info('Talker starting up')
self.publisher = self.create_publisher(
String, topic_name, qos_profile,
event_callbacks=event_callbacks)
self.publish_timer = self.create_timer(0.5, self.publish)
if assert_node_period:
self.assert_node_timer = self.create_timer(
assert_node_period, self.assert_liveliness)
else:
self.assert_node_timer = None
if assert_topic_period:
self.assert_topic_timer = self.create_timer(
assert_topic_period, self.publisher.assert_liveliness)
else:
self.assert_topic_timer = None
self.pause_timer = None
self.publish_count = 0
self.stop_at_count = publish_count

def pause_for(self, seconds):
"""
Stop publishing for a while.

Stops the Publisher for the specified amount of time.
A message will be published immediately on the expiration of pause_duration.
The regular publishing interval will resume at that point.
If publishing is already paused, this call will be ignored.
The remaining pause duration will not be affected.
@param seconds Amount of time to pause for.
"""
if self.pause_timer:
return
self.publish_timer.cancel()
self.pause_timer = self.create_timer(seconds, self._pause_expired)

def _pause_expired(self):
self.publish()
self.publish_timer.reset()
self.destroy_timer(self.pause_timer)
self.pause_timer = None

def publish(self):
"""
Publish a single message.

Counts toward total message count that will be published.
"""
message = String()
message.data = 'Talker says {}'.format(self.publish_count)
self.get_logger().info('Publishing: {}'.format(message.data))
self.publish_count += 1
if self.stop_at_count > 0 and self.publish_count >= self.stop_at_count:
self.publish_timer.cancel()
self.publisher.publish(message)

def stop(self):
"""Cancel publishing and any manual liveliness assertions."""
if self.assert_node_timer:
self.assert_node_timer.cancel()
self.assert_node_timer = None
if self.assert_topic_timer:
self.assert_topic_timer.cancel()
self.publish_timer.cancel()
self.assert_topic_timer = None


class Listener(Node):

def __init__(self, topic_name, qos_profile, event_callbacks, defer_subscribe=False):
"""
Constructor.

@param topic_name Topic to subscribe to.
@param qos_profile QoS profile for Subscription.
@param event_callbacks QoS event callbacks for Subscription.
@param defer_subscribe Don't create Subscription until user calls start_listening()
"""
super().__init__('qos_listener')
self.subscription = None
self.topic_name = topic_name
self.qos_profile = qos_profile
self.event_callbacks = event_callbacks
if not defer_subscribe:
self.start_listening()

def start_listening(self):
"""
Instantiate Subscription.

Does nothing if it has already been called.
"""
if not self.subscription:
self.subscription = self.create_subscription(
String, self.topic_name, self._message_callback,
qos_or_depth=self.qos_profile,
event_callbacks=self.event_callbacks)
self.get_logger().info('Subscription created')

def _message_callback(self, message):
self.get_logger().info('I heard: {}'.format(message.data))
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse

from quality_of_service_demo_py.common_nodes import Listener
from quality_of_service_demo_py.common_nodes import Talker

import rclpy
from rclpy.duration import Duration
from rclpy.executors import SingleThreadedExecutor
from rclpy.logging import get_logger
from rclpy.qos import QoSProfile
from rclpy.qos_event import PublisherEventCallbacks
from rclpy.qos_event import SubscriptionEventCallbacks


def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument(
'deadline', type=int,
help='Duration in positive integer milliseconds of the Deadline QoS setting.')
parser.add_argument(
'--publish-for', type=int, default=5000,
help='Duration in positive integer milliseconds to publish until pausing the talker.')
parser.add_argument(
'--pause-for', type=int, default=1000,
help='Duration in positive integer milliseconds to pause the talker before beginning '
'to publish again.')
return parser.parse_args()


def main(args=None):
parsed_args = parse_args()
rclpy.init(args=args)

topic = 'qos_deadline_chatter'
deadline = Duration(seconds=parsed_args.deadline / 1000.0)

qos_profile = QoSProfile(
depth=10,
deadline=deadline)

subscription_callbacks = SubscriptionEventCallbacks(
deadline=lambda event: get_logger('Listener').info(str(event)))
listener = Listener(topic, qos_profile, event_callbacks=subscription_callbacks)

publisher_callbacks = PublisherEventCallbacks(
deadline=lambda event: get_logger('Talker').info(str(event)))
talker = Talker(topic, qos_profile, event_callbacks=publisher_callbacks)

publish_for_seconds = parsed_args.publish_for / 1000.0
pause_for_seconds = parsed_args.pause_for / 1000.0
pause_timer = talker.create_timer( # noqa: F841
publish_for_seconds,
lambda: talker.pause_for(pause_for_seconds))

executor = SingleThreadedExecutor()
executor.add_node(listener)
executor.add_node(talker)
executor.spin()

rclpy.shutdown()


if __name__ == '__main__':
main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
# Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse

from quality_of_service_demo_py.common_nodes import Listener
from quality_of_service_demo_py.common_nodes import Talker

import rclpy
from rclpy.duration import Duration
from rclpy.executors import SingleThreadedExecutor
from rclpy.qos import QoSDurabilityPolicy
from rclpy.qos import QoSProfile


def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument(
'lifespan', type=int,
help='Duration in positive integer milliseconds of the Lifespan QoS setting.')
parser.add_argument(
'--history', type=int, default=10,
help="The depth of the Publisher's history queue - "
'the maximum number of messages it will store for late-joining subscriptions.')
parser.add_argument(
'--publish-count', type=int, default=10,
help='How many messages to publish before stopping.')
parser.add_argument(
'--subscribe-after', type=int, default=2500,
help='The Subscriber will be created this long (in positive integer milliseconds) '
'after application startup.')
return parser.parse_args()


def main(args=None):
parsed_args = parse_args()
rclpy.init(args=args)

topic = 'qos_lifespan_chatter'
lifespan = Duration(seconds=parsed_args.lifespan / 1000.0)

qos_profile = QoSProfile(
depth=parsed_args.history,
durability=QoSDurabilityPolicy.RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL,
lifespan=lifespan)

listener = Listener(
topic, qos_profile, event_callbacks=None, defer_subscribe=True)
talker = Talker(
topic, qos_profile, event_callbacks=None, publish_count=parsed_args.publish_count)
subscribe_timer = listener.create_timer( # noqa: F841
parsed_args.subscribe_after / 1000.0,
lambda: listener.start_listening())

executor = SingleThreadedExecutor()
executor.add_node(listener)
executor.add_node(talker)
executor.spin()

rclpy.shutdown()


if __name__ == '__main__':
main()
Loading