From 07c05da93761290a2b9e5a03ab733e094b63ebdd Mon Sep 17 00:00:00 2001 From: Emerson Knapp <537409+emersonknapp@users.noreply.github.com> Date: Wed, 7 Apr 2021 17:27:45 -0700 Subject: [PATCH] /clock publisher in Player (#695) * Add /clock publishing to Player Signed-off-by: Emerson Knapp --- ros2bag/ros2bag/verb/play.py | 12 ++ rosbag2_py/src/rosbag2_py/_transport.cpp | 1 + rosbag2_transport/CMakeLists.txt | 6 + .../rosbag2_transport/play_options.hpp | 4 + .../src/rosbag2_transport/player.cpp | 16 +++ .../src/rosbag2_transport/player.hpp | 4 + .../rosbag2_transport/rosbag2_transport.cpp | 16 ++- .../rosbag2_play_test_fixture.hpp | 1 + .../test_play_publish_clock.cpp | 115 ++++++++++++++++++ 9 files changed, 173 insertions(+), 2 deletions(-) create mode 100644 rosbag2_transport/test/rosbag2_transport/test_play_publish_clock.cpp diff --git a/ros2bag/ros2bag/verb/play.py b/ros2bag/ros2bag/verb/play.py index 2198987e1e..1d71974824 100644 --- a/ros2bag/ros2bag/verb/play.py +++ b/ros2bag/ros2bag/verb/play.py @@ -28,6 +28,13 @@ import yaml +def positive_float(arg: str) -> float: + value = float(arg) + if value <= 0: + raise ValueError(f'Value {value} is less than or equal to zero.') + return value + + class PlayVerb(VerbExtension): """Play back ROS data from a bag.""" @@ -71,6 +78,10 @@ def add_arguments(self, parser, cli_name): # noqa: D102 ' pragmas: [\"\" = ]' 'Note that applicable settings are limited to read-only for ros2 bag play.' 'For a list of sqlite3 settings, refer to sqlite3 documentation') + parser.add_argument( + '--clock', type=positive_float, nargs='?', const=40, default=0, + help='Publish to /clock at a specific frequency in Hz, to act as a ROS Time Source. ' + 'Value must be positive. Defaults to not publishing.') def main(self, *, args): # noqa: D102 qos_profile_overrides = {} # Specify a valid default @@ -104,6 +115,7 @@ def main(self, *, args): # noqa: D102 play_options.topic_qos_profile_overrides = qos_profile_overrides play_options.loop = args.loop play_options.topic_remapping_options = topic_remapping + play_options.clock_publish_frequency = args.clock player = Player() player.play(storage_options, play_options) diff --git a/rosbag2_py/src/rosbag2_py/_transport.cpp b/rosbag2_py/src/rosbag2_py/_transport.cpp index 264a1157dd..4c6bb0a202 100644 --- a/rosbag2_py/src/rosbag2_py/_transport.cpp +++ b/rosbag2_py/src/rosbag2_py/_transport.cpp @@ -190,6 +190,7 @@ PYBIND11_MODULE(_transport, m) { &PlayOptions::setTopicQoSProfileOverrides) .def_readwrite("loop", &PlayOptions::loop) .def_readwrite("topic_remapping_options", &PlayOptions::topic_remapping_options) + .def_readwrite("clock_publish_frequency", &PlayOptions::clock_publish_frequency) ; py::class_(m, "RecordOptions") diff --git a/rosbag2_transport/CMakeLists.txt b/rosbag2_transport/CMakeLists.txt index d8980cbc27..c2536bb69c 100644 --- a/rosbag2_transport/CMakeLists.txt +++ b/rosbag2_transport/CMakeLists.txt @@ -147,6 +147,12 @@ function(create_tests_for_rmw_implementation) ${SKIP_TEST} LINK_LIBS rosbag2_transport AMENT_DEPS rosbag2_cpp rosbag2_storage test_msgs rosbag2_test_common) + + rosbag2_transport_add_gmock(test_play_publish_clock + test/rosbag2_transport/test_play_publish_clock.cpp + ${SKIP_TEST} + LINK_LIBS rosbag2_transport + AMENT_DEPS rosbag2_cpp rosbag2_storage test_msgs rosbag2_test_common) endfunction() if(BUILD_TESTING) diff --git a/rosbag2_transport/include/rosbag2_transport/play_options.hpp b/rosbag2_transport/include/rosbag2_transport/play_options.hpp index b616a76591..1f551621d9 100644 --- a/rosbag2_transport/include/rosbag2_transport/play_options.hpp +++ b/rosbag2_transport/include/rosbag2_transport/play_options.hpp @@ -40,6 +40,10 @@ struct PlayOptions std::unordered_map topic_qos_profile_overrides = {}; bool loop = false; std::vector topic_remapping_options = {}; + + // Rate in Hz at which to publish to /clock. + // 0 (or negative) means that no publisher will be created + double clock_publish_frequency = 0.0; }; } // namespace rosbag2_transport diff --git a/rosbag2_transport/src/rosbag2_transport/player.cpp b/rosbag2_transport/src/rosbag2_transport/player.cpp index e14b4cda21..e163c9257f 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.cpp +++ b/rosbag2_transport/src/rosbag2_transport/player.cpp @@ -222,6 +222,22 @@ void Player::prepare_clock(const PlayOptions & options, rcutils_time_point_value { double rate = options.rate > 0.0 ? options.rate : 1.0; clock_ = std::make_unique(starting_time, rate); + + // Create /clock publisher + if (options.clock_publish_frequency > 0.f) { + const auto publish_period = std::chrono::nanoseconds( + static_cast(RCUTILS_S_TO_NS(1) / options.clock_publish_frequency)); + // NOTE: PlayerClock does not own this publisher because rosbag2_cpp + // should not own transport-based functionality + clock_publisher_ = transport_node_->create_publisher( + "/clock", rclcpp::ClockQoS()); + clock_publish_timer_ = transport_node_->create_wall_timer( + publish_period, [this]() { + auto msg = rosgraph_msgs::msg::Clock(); + msg.clock = rclcpp::Time(clock_->now()); + clock_publisher_->publish(msg); + }); + } } } // namespace rosbag2_transport diff --git a/rosbag2_transport/src/rosbag2_transport/player.hpp b/rosbag2_transport/src/rosbag2_transport/player.hpp index 8920b8dfa3..180576522a 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.hpp +++ b/rosbag2_transport/src/rosbag2_transport/player.hpp @@ -25,11 +25,13 @@ #include "moodycamel/readerwriterqueue.h" #include "rclcpp/node.hpp" +#include "rclcpp/publisher.hpp" #include "rclcpp/qos.hpp" #include "rosbag2_cpp/clocks/player_clock.hpp" #include "rosbag2_storage/serialized_bag_message.hpp" #include "rosbag2_transport/play_options.hpp" +#include "rosgraph_msgs/msg/clock.hpp" namespace rosbag2_cpp { @@ -70,6 +72,8 @@ class Player std::unordered_map> publishers_; std::unordered_map topic_qos_profile_overrides_; std::unique_ptr clock_; + rclcpp::Publisher::SharedPtr clock_publisher_; + std::shared_ptr clock_publish_timer_; }; } // namespace rosbag2_transport diff --git a/rosbag2_transport/src/rosbag2_transport/rosbag2_transport.cpp b/rosbag2_transport/src/rosbag2_transport/rosbag2_transport.cpp index b48ec92c53..928565cfac 100644 --- a/rosbag2_transport/src/rosbag2_transport/rosbag2_transport.cpp +++ b/rosbag2_transport/src/rosbag2_transport/rosbag2_transport.cpp @@ -23,6 +23,7 @@ #include "rclcpp/rclcpp.hpp" +#include "rcpputils/scope_exit.hpp" #include "rcutils/time.h" #include "rosbag2_cpp/reader.hpp" @@ -89,9 +90,20 @@ std::shared_ptr Rosbag2Transport::setup_node( void Rosbag2Transport::play( const rosbag2_storage::StorageOptions & storage_options, const PlayOptions & play_options) { + auto transport_node = + setup_node(play_options.node_prefix, play_options.topic_remapping_options); + rclcpp::executors::SingleThreadedExecutor exec; + exec.add_node(transport_node); + auto spin_thread = std::thread( + [&exec]() { + exec.spin(); + }); + auto exit = rcpputils::scope_exit( + [&exec, &spin_thread]() { + exec.cancel(); + spin_thread.join(); + }); try { - auto transport_node = - setup_node(play_options.node_prefix, play_options.topic_remapping_options); Player player(reader_, transport_node); do { reader_->open(storage_options, {"", rmw_get_serialization_format()}); diff --git a/rosbag2_transport/test/rosbag2_transport/rosbag2_play_test_fixture.hpp b/rosbag2_transport/test/rosbag2_transport/rosbag2_play_test_fixture.hpp index b33d3f2267..c68ab7b01f 100644 --- a/rosbag2_transport/test/rosbag2_transport/rosbag2_play_test_fixture.hpp +++ b/rosbag2_transport/test/rosbag2_transport/rosbag2_play_test_fixture.hpp @@ -17,6 +17,7 @@ #include +#include "rosbag2_test_common/subscription_manager.hpp" #include "rosbag2_transport_test_fixture.hpp" class RosBag2PlayTestFixture : public Rosbag2TransportTestFixture diff --git a/rosbag2_transport/test/rosbag2_transport/test_play_publish_clock.cpp b/rosbag2_transport/test/rosbag2_transport/test_play_publish_clock.cpp new file mode 100644 index 0000000000..9b6d0eb1dd --- /dev/null +++ b/rosbag2_transport/test/rosbag2_transport/test_play_publish_clock.cpp @@ -0,0 +1,115 @@ +// Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include +#include + +#include "rosbag2_transport/rosbag2_transport.hpp" +#include "rosgraph_msgs/msg/clock.hpp" +#include "test_msgs/message_fixtures.hpp" + +#include "rosbag2_play_test_fixture.hpp" + +using namespace ::testing; // NOLINT + +namespace +{ +rcutils_duration_value_t period_for_frequency(double frequency) +{ + return static_cast(RCUTILS_S_TO_NS(1) / frequency); +} +} // namespace + +class ClockPublishFixture : public RosBag2PlayTestFixture +{ +public: + ClockPublishFixture() + : RosBag2PlayTestFixture() + { + // Fake bag setup + auto topic_types = std::vector{ + {"topic1", "test_msgs/BasicTypes", "", ""}, + }; + + std::vector> messages; + for (size_t i = 0; i < messages_to_play_; i++) { + auto message = get_messages_basic_types()[0]; + message->int32_value = static_cast(i); + messages.push_back( + serialize_test_message("topic1", milliseconds_between_messages_ * i, message)); + } + + // Player setup + auto prepared_mock_reader = std::make_unique(); + prepared_mock_reader->prepare(messages, topic_types); + reader_ = std::make_unique(std::move(prepared_mock_reader)); + + sub_->add_subscription( + "/clock", expected_clock_messages_, rclcpp::ClockQoS()); + } + + void run_test() + { + auto await_received_messages = sub_->spin_subscriptions(); + rosbag2_transport::Rosbag2Transport rosbag2_transport(reader_, writer_); + rosbag2_transport.play(storage_options_, play_options_); + await_received_messages.get(); + + // Check that we got enough messages + auto received_clock = sub_->get_received_messages("/clock"); + EXPECT_THAT(received_clock, SizeIs(Ge(expected_clock_messages_))); + + // Check time deltas between messages + const auto expect_clock_delta = + period_for_frequency(play_options_.clock_publish_frequency) * play_options_.rate; + const auto allowed_error = static_cast( + expect_clock_delta * error_tolerance_); + // On Windows, publishing seems to take time to "warm up", ignore the first couple messages + const auto start_message = 2; + for (size_t i = start_message; i < expected_clock_messages_ - 1; i++) { + auto current = rclcpp::Time(received_clock[i]->clock).nanoseconds(); + auto next = rclcpp::Time(received_clock[i + 1]->clock).nanoseconds(); + auto delta = next - current; + auto error = std::abs(delta - expect_clock_delta); + EXPECT_LE(error, allowed_error) << "Message was too far from next: " << i; + } + } + + // Number of bag messages to publish + size_t messages_to_play_ = 10; + const int64_t milliseconds_between_messages_ = 50; + + // Amount of error allowed between expected and actual clock deltas (expected to be much smaller) + const double error_tolerance_ = 0.3; + + // Wait for just a few clock messages, we're checking the time between them, not the total count + size_t expected_clock_messages_ = 6; +}; + +TEST_F(ClockPublishFixture, clock_is_published_at_chosen_frequency) +{ + play_options_.clock_publish_frequency = 20; + run_test(); +} + +TEST_F(ClockPublishFixture, clock_respects_playback_rate) +{ + play_options_.clock_publish_frequency = 20; + play_options_.rate = 0.5; + messages_to_play_ = 5; + run_test(); +}