From 5575a836ba4dbc8220c5f28512a460ad2f0272e3 Mon Sep 17 00:00:00 2001 From: Geoffrey Biggs Date: Tue, 29 Mar 2022 09:40:20 +0900 Subject: [PATCH 1/4] Add CLI verb for burst mode Signed-off-by: Geoffrey Biggs --- ros2bag/ros2bag/verb/burst.py | 113 ++++++++++++++++++ ros2bag/setup.py | 1 + ros2bag/test/test_burst.py | 69 +++++++++++ rosbag2_interfaces/srv/Burst.srv | 2 +- rosbag2_py/src/rosbag2_py/_transport.cpp | 22 ++++ .../include/rosbag2_transport/player.hpp | 3 +- .../src/rosbag2_transport/player.cpp | 7 +- 7 files changed, 214 insertions(+), 3 deletions(-) create mode 100644 ros2bag/ros2bag/verb/burst.py create mode 100644 ros2bag/test/test_burst.py diff --git a/ros2bag/ros2bag/verb/burst.py b/ros2bag/ros2bag/verb/burst.py new file mode 100644 index 0000000000..9a27605e86 --- /dev/null +++ b/ros2bag/ros2bag/verb/burst.py @@ -0,0 +1,113 @@ +# Copyright 2022 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from argparse import FileType + +from rclpy.qos import InvalidQoSProfileException +from ros2bag.api import check_not_negative_int +from ros2bag.api import check_path_exists +from ros2bag.api import check_positive_float +from ros2bag.api import convert_yaml_to_qos_profile +from ros2bag.api import print_error +from ros2bag.verb import VerbExtension +from ros2cli.node import NODE_NAME_PREFIX +from rosbag2_py import get_registered_readers +from rosbag2_py import Player +from rosbag2_py import PlayOptions +from rosbag2_py import StorageOptions +import yaml + +class BurstVerb(VerbExtension): + """Burst data from a bag.""" + + def add_arguments(self, parser, cli_name): # noqa: D102 + reader_choices = get_registered_readers() + + parser.add_argument( + 'bag_file', type=check_path_exists, help='bag file to replay') + parser.add_argument( + '-s', '--storage', default='', choices=reader_choices, + help='Storage implementation of bag. By default tries to determine from metadata.') + parser.add_argument( + '--read-ahead-queue-size', type=int, default=1000, + help='size of message queue rosbag tries to hold in memory to help deterministic ' + 'playback. Larger size will result in larger memory needs but might prevent ' + 'delay of message playback.') + parser.add_argument( + '--topics', type=str, default=[], nargs='+', + help='topics to replay, separated by space. If none specified, all topics will be ' + 'replayed.') + parser.add_argument( + '--qos-profile-overrides-path', type=FileType('r'), + help='Path to a yaml file defining overrides of the QoS profile for specific topics.') + parser.add_argument( + '--remap', '-m', default='', nargs='+', + help='list of topics to be remapped: in the form ' + '"old_topic1:=new_topic1 old_topic2:=new_topic2 etc." ') + parser.add_argument( + '--storage-config-file', type=FileType('r'), + help='Path to a yaml file defining storage specific configurations. ' + 'For the default storage plugin settings are specified through syntax:' + 'read:' + ' pragmas: [\"\" = ]' + 'Note that applicable settings are limited to read-only for ros2 bag play.' + 'For a list of sqlite3 settings, refer to sqlite3 documentation') + parser.add_argument( + '--start-offset', type=check_positive_float, default=0.0, + help='Start the playback player this many seconds into the bag file.') + parser.add_argument( + '-n', '--num-messages', type=check_not_negative_int, default=0, + help='Burst the specified number of messages, then pause.') + + def main(self, *, args): # noqa: D102 + qos_profile_overrides = {} # Specify a valid default + if args.qos_profile_overrides_path: + qos_profile_dict = yaml.safe_load(args.qos_profile_overrides_path) + try: + qos_profile_overrides = convert_yaml_to_qos_profile( + qos_profile_dict) + except (InvalidQoSProfileException, ValueError) as e: + return print_error(str(e)) + + storage_config_file = '' + if args.storage_config_file: + storage_config_file = args.storage_config_file.name + + topic_remapping = ['--ros-args'] + for remap_rule in args.remap: + topic_remapping.append('--remap') + topic_remapping.append(remap_rule) + + storage_options = StorageOptions( + uri=args.bag_file, + storage_id=args.storage, + storage_config_uri=storage_config_file, + ) + play_options = PlayOptions() + play_options.read_ahead_queue_size = args.read_ahead_queue_size + play_options.node_prefix = NODE_NAME_PREFIX + play_options.rate = 1.0 + play_options.topics_to_filter = args.topics + play_options.topic_qos_profile_overrides = qos_profile_overrides + play_options.loop = False + play_options.topic_remapping_options = topic_remapping + play_options.clock_publish_frequency = 0 + play_options.delay = 0.0 + play_options.disable_keyboard_controls = False # Give the user control + play_options.start_paused = True # Important for allowing the burst + play_options.start_offset = args.start_offset + play_options.wait_acked_timeout = -1 + + player = Player() + player.burst(storage_options, play_options, args.num_messages) diff --git a/ros2bag/setup.py b/ros2bag/setup.py index c13c97b2dc..e7e80be3e1 100644 --- a/ros2bag/setup.py +++ b/ros2bag/setup.py @@ -42,6 +42,7 @@ 'info = ros2bag.verb.info:InfoVerb', 'list = ros2bag.verb.list:ListVerb', 'play = ros2bag.verb.play:PlayVerb', + 'burst = ros2bag.verb.burst:BurstVerb', 'record = ros2bag.verb.record:RecordVerb', 'reindex = ros2bag.verb.reindex:ReindexVerb' ], diff --git a/ros2bag/test/test_burst.py b/ros2bag/test/test_burst.py new file mode 100644 index 0000000000..6d7a791cef --- /dev/null +++ b/ros2bag/test/test_burst.py @@ -0,0 +1,69 @@ +# Copyright 2022 Open Source Robotics Foundation, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import contextlib +from pathlib import Path +import re +import unittest + +from launch import LaunchDescription +from launch.actions import ExecuteProcess +import launch_testing +import launch_testing.actions +import launch_testing.asserts +import launch_testing.markers +import launch_testing.tools + +import pytest + + +RESOURCES_PATH = Path(__file__).parent / 'resources' +TEST_NODE = 'ros2bag_burst_test_node' +TEST_NAMESPACE = 'ros2bag_record_qos_profile' +ERROR_STRING = r'\[ERROR] \[ros2bag]:' + + +@pytest.mark.rostest +@launch_testing.markers.keep_alive +def generate_test_description(): + return LaunchDescription([launch_testing.actions.ReadyToTest()]) + + +class TestRos2BagBurst(unittest.TestCase): + @classmethod + def setUpClass(cls, launch_service, proc_info, proc_output): + @contextlib.contextmanager + def launch_bag_command(self, arguments, **kwargs): + pkg_command_action = ExecuteProcess( + cmd=['ros2', 'bag', *arguments], + additional_env={'PYTHONUNBUFFERED': '1'}, + name='ros2bag-cli', + output='screen', + **kwargs + ) + with launch_testing.tools.launch_process( + launch_service, pkg_command_action, proc_info, proc_output + ) as pkg_command: + yield pkg_command + cls.launch_bag_command = launch_bag_command + + def test_burst(self): + """Test the burst mode of playback.""" + bag_path = RESOURCES_PATH / 'empty_bag' + arguments = ['burst', bag_path.as_posix()] + with self.launch_bag_command(arguments=arguments) as bag_command: + bag_command.wait_for_shutdown(timeout=5) + expected_string_regex = re.compile(ERROR_STRING) + matches = expected_string_regex.search(bag_command.output) + assert not matches, 'ros2bag CLI did not produce the expected output' diff --git a/rosbag2_interfaces/srv/Burst.srv b/rosbag2_interfaces/srv/Burst.srv index e21bfcb359..d3a24f5091 100644 --- a/rosbag2_interfaces/srv/Burst.srv +++ b/rosbag2_interfaces/srv/Burst.srv @@ -1,3 +1,3 @@ -uint64 num_messages # Number of messages to burst +uint64 num_messages # Number of messages to burst; zero to burst the whole bag --- uint64 actually_burst # Number of messages actually burst diff --git a/rosbag2_py/src/rosbag2_py/_transport.cpp b/rosbag2_py/src/rosbag2_py/_transport.cpp index c11ca23929..e61a98eb60 100644 --- a/rosbag2_py/src/rosbag2_py/_transport.cpp +++ b/rosbag2_py/src/rosbag2_py/_transport.cpp @@ -137,6 +137,27 @@ class Player exec.cancel(); spin_thread.join(); } + + void burst( + const rosbag2_storage::StorageOptions & storage_options, + PlayOptions & play_options, + size_t num_messages) + { + auto reader = rosbag2_transport::ReaderWriterFactory::make_reader(storage_options); + auto player = std::make_shared( + std::move(reader), storage_options, play_options); + + rclcpp::executors::SingleThreadedExecutor exec; + exec.add_node(player); + auto spin_thread = std::thread( + [&exec]() { + exec.spin(); + }); + player->burst(num_messages); + + exec.cancel(); + spin_thread.join(); + } }; class Recorder @@ -276,6 +297,7 @@ PYBIND11_MODULE(_transport, m) { py::class_(m, "Player") .def(py::init()) .def("play", &rosbag2_py::Player::play) + .def("burst", &rosbag2_py::Player::burst) ; py::class_(m, "Recorder") diff --git a/rosbag2_transport/include/rosbag2_transport/player.hpp b/rosbag2_transport/include/rosbag2_transport/player.hpp index de9f6bf7a5..a4ee000609 100644 --- a/rosbag2_transport/include/rosbag2_transport/player.hpp +++ b/rosbag2_transport/include/rosbag2_transport/player.hpp @@ -131,7 +131,8 @@ class Player : public rclcpp::Node virtual bool play_next(); /// \brief Burst the next \p num_messages messages from the queue when paused. - /// \param num_messages The number of messages to burst from the queue. + /// \param num_messages The number of messages to burst from the queue. Specifying zero means no + /// limit (i.e. burst the entire bag). /// \details This call will play the next \p num_messages from the queue in burst mode. The /// timing of the messages is ignored. /// \note If internal player queue is starving and storage has not been completely loaded, diff --git a/rosbag2_transport/src/rosbag2_transport/player.cpp b/rosbag2_transport/src/rosbag2_transport/player.cpp index c6308b996a..97ed3973dc 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.cpp +++ b/rosbag2_transport/src/rosbag2_transport/player.cpp @@ -351,9 +351,14 @@ bool Player::play_next() size_t Player::burst(const size_t num_messages) { + if (!clock_->is_paused()) { + RCLCPP_WARN_STREAM(get_logger(), "Burst can only be used when in the paused state."); + return 0; + } + uint64_t messages_played = 0; - for (auto ii = 0u; ii < num_messages; ++ii) { + for (auto ii = 0u; ii < num_messages || num_messages == 0; ++ii) { if (play_next()) { ++messages_played; } else { From 18bdb23275778117afb525ef3bcb0c84982b2e40 Mon Sep 17 00:00:00 2001 From: Geoffrey Biggs Date: Tue, 29 Mar 2022 11:23:47 +0900 Subject: [PATCH 2/4] Play in a thread to get the buffers filled, etc. Signed-off-by: Geoffrey Biggs --- rosbag2_py/src/rosbag2_py/_transport.cpp | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/rosbag2_py/src/rosbag2_py/_transport.cpp b/rosbag2_py/src/rosbag2_py/_transport.cpp index e61a98eb60..a78c763752 100644 --- a/rosbag2_py/src/rosbag2_py/_transport.cpp +++ b/rosbag2_py/src/rosbag2_py/_transport.cpp @@ -153,10 +153,15 @@ class Player [&exec]() { exec.spin(); }); + auto play_thread = std::thread( + [&player]() { + player->play(); + }); player->burst(num_messages); exec.cancel(); spin_thread.join(); + play_thread.join(); } }; From c1f02a67074858647b95f4205c445ecec53847cf Mon Sep 17 00:00:00 2001 From: Geoffrey Biggs Date: Wed, 25 May 2022 09:35:42 +0900 Subject: [PATCH 3/4] Alphabetise verb list Signed-off-by: Geoffrey Biggs --- ros2bag/setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ros2bag/setup.py b/ros2bag/setup.py index e7e80be3e1..f8731c2ff7 100644 --- a/ros2bag/setup.py +++ b/ros2bag/setup.py @@ -38,11 +38,11 @@ 'ros2bag.verb = ros2bag.verb:VerbExtension', ], 'ros2bag.verb': [ + 'burst = ros2bag.verb.burst:BurstVerb', 'convert = ros2bag.verb.convert:ConvertVerb', 'info = ros2bag.verb.info:InfoVerb', 'list = ros2bag.verb.list:ListVerb', 'play = ros2bag.verb.play:PlayVerb', - 'burst = ros2bag.verb.burst:BurstVerb', 'record = ros2bag.verb.record:RecordVerb', 'reindex = ros2bag.verb.reindex:ReindexVerb' ], From 9ae21f157012f2d4bc9131db2c12669f1e9619a3 Mon Sep 17 00:00:00 2001 From: Geoffrey Biggs Date: Thu, 2 Jun 2022 09:32:20 +0900 Subject: [PATCH 4/4] Fix linter errors Signed-off-by: Geoffrey Biggs --- ros2bag/ros2bag/verb/burst.py | 1 + ros2bag/test/test_burst.py | 1 + 2 files changed, 2 insertions(+) diff --git a/ros2bag/ros2bag/verb/burst.py b/ros2bag/ros2bag/verb/burst.py index 9a27605e86..3e675b0545 100644 --- a/ros2bag/ros2bag/verb/burst.py +++ b/ros2bag/ros2bag/verb/burst.py @@ -28,6 +28,7 @@ from rosbag2_py import StorageOptions import yaml + class BurstVerb(VerbExtension): """Burst data from a bag.""" diff --git a/ros2bag/test/test_burst.py b/ros2bag/test/test_burst.py index 6d7a791cef..be04df4ad4 100644 --- a/ros2bag/test/test_burst.py +++ b/ros2bag/test/test_burst.py @@ -41,6 +41,7 @@ def generate_test_description(): class TestRos2BagBurst(unittest.TestCase): + @classmethod def setUpClass(cls, launch_service, proc_info, proc_output): @contextlib.contextmanager