Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add CLI verb for burst mode of playback #980

Merged
merged 5 commits into from
Jun 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 114 additions & 0 deletions ros2bag/ros2bag/verb/burst.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
# Copyright 2022 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from argparse import FileType

from rclpy.qos import InvalidQoSProfileException
from ros2bag.api import check_not_negative_int
from ros2bag.api import check_path_exists
from ros2bag.api import check_positive_float
from ros2bag.api import convert_yaml_to_qos_profile
from ros2bag.api import print_error
from ros2bag.verb import VerbExtension
from ros2cli.node import NODE_NAME_PREFIX
from rosbag2_py import get_registered_readers
from rosbag2_py import Player
from rosbag2_py import PlayOptions
from rosbag2_py import StorageOptions
import yaml


class BurstVerb(VerbExtension):
"""Burst data from a bag."""

def add_arguments(self, parser, cli_name): # noqa: D102
reader_choices = get_registered_readers()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Non-blocking - mildly wondering if this could be refactored out into common code, as a lot of it looks like copy-paste from play.py

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that it could, but I think I'd prefer to do that in a separate PR.


parser.add_argument(
'bag_file', type=check_path_exists, help='bag file to replay')
parser.add_argument(
'-s', '--storage', default='', choices=reader_choices,
help='Storage implementation of bag. By default tries to determine from metadata.')
parser.add_argument(
'--read-ahead-queue-size', type=int, default=1000,
help='size of message queue rosbag tries to hold in memory to help deterministic '
'playback. Larger size will result in larger memory needs but might prevent '
'delay of message playback.')
parser.add_argument(
'--topics', type=str, default=[], nargs='+',
help='topics to replay, separated by space. If none specified, all topics will be '
'replayed.')
parser.add_argument(
'--qos-profile-overrides-path', type=FileType('r'),
help='Path to a yaml file defining overrides of the QoS profile for specific topics.')
parser.add_argument(
'--remap', '-m', default='', nargs='+',
help='list of topics to be remapped: in the form '
'"old_topic1:=new_topic1 old_topic2:=new_topic2 etc." ')
parser.add_argument(
'--storage-config-file', type=FileType('r'),
help='Path to a yaml file defining storage specific configurations. '
'For the default storage plugin settings are specified through syntax:'
'read:'
' pragmas: [\"<setting_name>\" = <setting_value>]'
'Note that applicable settings are limited to read-only for ros2 bag play.'
'For a list of sqlite3 settings, refer to sqlite3 documentation')
parser.add_argument(
'--start-offset', type=check_positive_float, default=0.0,
help='Start the playback player this many seconds into the bag file.')
parser.add_argument(
'-n', '--num-messages', type=check_not_negative_int, default=0,
help='Burst the specified number of messages, then pause.')

def main(self, *, args): # noqa: D102
qos_profile_overrides = {} # Specify a valid default
if args.qos_profile_overrides_path:
qos_profile_dict = yaml.safe_load(args.qos_profile_overrides_path)
try:
qos_profile_overrides = convert_yaml_to_qos_profile(
qos_profile_dict)
except (InvalidQoSProfileException, ValueError) as e:
return print_error(str(e))

storage_config_file = ''
if args.storage_config_file:
storage_config_file = args.storage_config_file.name

topic_remapping = ['--ros-args']
for remap_rule in args.remap:
topic_remapping.append('--remap')
topic_remapping.append(remap_rule)

storage_options = StorageOptions(
uri=args.bag_file,
storage_id=args.storage,
storage_config_uri=storage_config_file,
)
play_options = PlayOptions()
play_options.read_ahead_queue_size = args.read_ahead_queue_size
play_options.node_prefix = NODE_NAME_PREFIX
play_options.rate = 1.0
play_options.topics_to_filter = args.topics
play_options.topic_qos_profile_overrides = qos_profile_overrides
play_options.loop = False
play_options.topic_remapping_options = topic_remapping
play_options.clock_publish_frequency = 0
play_options.delay = 0.0
play_options.disable_keyboard_controls = False # Give the user control
play_options.start_paused = True # Important for allowing the burst
play_options.start_offset = args.start_offset
play_options.wait_acked_timeout = -1

player = Player()
player.burst(storage_options, play_options, args.num_messages)
1 change: 1 addition & 0 deletions ros2bag/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
'ros2bag.verb = ros2bag.verb:VerbExtension',
],
'ros2bag.verb': [
'burst = ros2bag.verb.burst:BurstVerb',
'convert = ros2bag.verb.convert:ConvertVerb',
'info = ros2bag.verb.info:InfoVerb',
'list = ros2bag.verb.list:ListVerb',
Expand Down
70 changes: 70 additions & 0 deletions ros2bag/test/test_burst.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
# Copyright 2022 Open Source Robotics Foundation, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import contextlib
from pathlib import Path
import re
import unittest

from launch import LaunchDescription
from launch.actions import ExecuteProcess
import launch_testing
import launch_testing.actions
import launch_testing.asserts
import launch_testing.markers
import launch_testing.tools

import pytest


RESOURCES_PATH = Path(__file__).parent / 'resources'
TEST_NODE = 'ros2bag_burst_test_node'
TEST_NAMESPACE = 'ros2bag_record_qos_profile'
ERROR_STRING = r'\[ERROR] \[ros2bag]:'


@pytest.mark.rostest
@launch_testing.markers.keep_alive
def generate_test_description():
return LaunchDescription([launch_testing.actions.ReadyToTest()])


class TestRos2BagBurst(unittest.TestCase):

@classmethod
def setUpClass(cls, launch_service, proc_info, proc_output):
@contextlib.contextmanager
def launch_bag_command(self, arguments, **kwargs):
pkg_command_action = ExecuteProcess(
cmd=['ros2', 'bag', *arguments],
additional_env={'PYTHONUNBUFFERED': '1'},
name='ros2bag-cli',
output='screen',
**kwargs
)
with launch_testing.tools.launch_process(
launch_service, pkg_command_action, proc_info, proc_output
) as pkg_command:
yield pkg_command
cls.launch_bag_command = launch_bag_command

def test_burst(self):
"""Test the burst mode of playback."""
bag_path = RESOURCES_PATH / 'empty_bag'
arguments = ['burst', bag_path.as_posix()]
with self.launch_bag_command(arguments=arguments) as bag_command:
bag_command.wait_for_shutdown(timeout=5)
expected_string_regex = re.compile(ERROR_STRING)
matches = expected_string_regex.search(bag_command.output)
assert not matches, 'ros2bag CLI did not produce the expected output'
2 changes: 1 addition & 1 deletion rosbag2_interfaces/srv/Burst.srv
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
uint64 num_messages # Number of messages to burst
uint64 num_messages # Number of messages to burst; zero to burst the whole bag
---
uint64 actually_burst # Number of messages actually burst
27 changes: 27 additions & 0 deletions rosbag2_py/src/rosbag2_py/_transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,32 @@ class Player
exec.cancel();
spin_thread.join();
}

void burst(
const rosbag2_storage::StorageOptions & storage_options,
PlayOptions & play_options,
size_t num_messages)
{
auto reader = rosbag2_transport::ReaderWriterFactory::make_reader(storage_options);
auto player = std::make_shared<rosbag2_transport::Player>(
std::move(reader), storage_options, play_options);

rclcpp::executors::SingleThreadedExecutor exec;
exec.add_node(player);
auto spin_thread = std::thread(
[&exec]() {
exec.spin();
});
auto play_thread = std::thread(
[&player]() {
player->play();
});
player->burst(num_messages);

exec.cancel();
spin_thread.join();
play_thread.join();
}
};

class Recorder
Expand Down Expand Up @@ -294,6 +320,7 @@ PYBIND11_MODULE(_transport, m) {
py::class_<rosbag2_py::Player>(m, "Player")
.def(py::init())
.def("play", &rosbag2_py::Player::play, py::arg("storage_options"), py::arg("play_options"))
.def("burst", &rosbag2_py::Player::burst)
;

py::class_<rosbag2_py::Recorder>(m, "Recorder")
Expand Down
3 changes: 2 additions & 1 deletion rosbag2_transport/include/rosbag2_transport/player.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,8 @@ class Player : public rclcpp::Node
virtual bool play_next();

/// \brief Burst the next \p num_messages messages from the queue when paused.
/// \param num_messages The number of messages to burst from the queue.
/// \param num_messages The number of messages to burst from the queue. Specifying zero means no
/// limit (i.e. burst the entire bag).
/// \details This call will play the next \p num_messages from the queue in burst mode. The
/// timing of the messages is ignored.
/// \note If internal player queue is starving and storage has not been completely loaded,
Expand Down
7 changes: 6 additions & 1 deletion rosbag2_transport/src/rosbag2_transport/player.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -382,9 +382,14 @@ bool Player::play_next()

size_t Player::burst(const size_t num_messages)
{
if (!clock_->is_paused()) {
RCLCPP_WARN_STREAM(get_logger(), "Burst can only be used when in the paused state.");
return 0;
}

uint64_t messages_played = 0;

for (auto ii = 0u; ii < num_messages; ++ii) {
for (auto ii = 0u; ii < num_messages || num_messages == 0; ++ii) {
if (play_next()) {
++messages_played;
} else {
Expand Down