diff --git a/ros2bag/ros2bag/verb/play.py b/ros2bag/ros2bag/verb/play.py index 9f8a4f1791..71ffd23a05 100644 --- a/ros2bag/ros2bag/verb/play.py +++ b/ros2bag/ros2bag/verb/play.py @@ -85,6 +85,10 @@ def add_arguments(self, parser, cli_name): # noqa: D102 parser.add_argument( '-d', '--delay', type=positive_float, default=0.0, help='Sleep duration before play (each loop), in seconds. Negative durations invalid.') + parser.add_argument( + '--playback_duration', type=float, default=-1.0, + help='Playback duration, in seconds. Negative durations mark an infinite playback. ' + 'Default is -1.0.') parser.add_argument( '--disable-keyboard-controls', action='store_true', help='disables keyboard controls for playback') @@ -144,6 +148,7 @@ def main(self, *, args): # noqa: D102 play_options.topic_remapping_options = topic_remapping play_options.clock_publish_frequency = args.clock play_options.delay = args.delay + play_options.playback_duration = args.playback_duration play_options.disable_keyboard_controls = args.disable_keyboard_controls play_options.start_paused = args.start_paused play_options.start_offset = args.start_offset diff --git a/rosbag2_interfaces/CMakeLists.txt b/rosbag2_interfaces/CMakeLists.txt index f1e7ba34e2..5ce9a1d277 100644 --- a/rosbag2_interfaces/CMakeLists.txt +++ b/rosbag2_interfaces/CMakeLists.txt @@ -16,7 +16,7 @@ rosidl_generate_interfaces(${PROJECT_NAME} "srv/GetRate.srv" "srv/IsPaused.srv" "srv/Pause.srv" - "srv/PlayFor.srv" + "srv/Play.srv" "srv/PlayNext.srv" "srv/Resume.srv" "srv/Seek.srv" diff --git a/rosbag2_interfaces/srv/Play.srv b/rosbag2_interfaces/srv/Play.srv new file mode 100644 index 0000000000..c9e7af80f2 --- /dev/null +++ b/rosbag2_interfaces/srv/Play.srv @@ -0,0 +1,6 @@ +# See rosbag2_transport::PlayOptions::start_time +builtin_interfaces/Time start_offset +# See rosbag2_transport::PlayOptions::playback_duration +builtin_interfaces/Duration playback_duration +--- +bool success # return true diff --git a/rosbag2_py/src/rosbag2_py/_transport.cpp b/rosbag2_py/src/rosbag2_py/_transport.cpp index a6e45e3af8..da4393cb9d 100644 --- a/rosbag2_py/src/rosbag2_py/_transport.cpp +++ b/rosbag2_py/src/rosbag2_py/_transport.cpp @@ -69,6 +69,17 @@ struct OptionsWrapper : public T static_cast(RCUTILS_S_TO_NS(delay))); } + double getPlaybackDuration() const + { + return RCUTILS_NS_TO_S(static_cast(this->playback_duration.nanoseconds())); + } + + void setPlaybackDuration(double playback_duration) + { + this->playback_duration = rclcpp::Duration::from_nanoseconds( + static_cast(RCUTILS_S_TO_NS(playback_duration))); + } + double getDelay() const { return RCUTILS_NS_TO_S(static_cast(this->delay.nanoseconds())); @@ -242,6 +253,10 @@ PYBIND11_MODULE(_transport, m) { "delay", &PlayOptions::getDelay, &PlayOptions::setDelay) + .def_property( + "playback_duration", + &PlayOptions::getPlaybackDuration, + &PlayOptions::setPlaybackDuration) .def_readwrite("disable_keyboard_controls", &PlayOptions::disable_keyboard_controls) .def_readwrite("start_paused", &PlayOptions::start_paused) .def_property( @@ -277,8 +292,7 @@ PYBIND11_MODULE(_transport, m) { py::class_(m, "Player") .def(py::init()) .def( - "play", &rosbag2_py::Player::play, py::arg("storage_options"), py::arg( - "play_options"), py::arg("duration") = std::nullopt) + "play", &rosbag2_py::Player::play, py::arg("storage_options"), py::arg("play_options")) ; py::class_(m, "Recorder") diff --git a/rosbag2_transport/CMakeLists.txt b/rosbag2_transport/CMakeLists.txt index 9818259bba..3c06d3d16c 100644 --- a/rosbag2_transport/CMakeLists.txt +++ b/rosbag2_transport/CMakeLists.txt @@ -110,8 +110,7 @@ function(create_tests_for_rmw_implementation) test/rosbag2_transport/test_play_for.cpp INCLUDE_DIRS $ LINK_LIBS rosbag2_transport - AMENT_DEPS test_msgs rosbag2_test_common - ${SKIP_TEST}) + AMENT_DEPS test_msgs rosbag2_test_common) rosbag2_transport_add_gmock(test_play_loop test/rosbag2_transport/test_play_loop.cpp diff --git a/rosbag2_transport/include/rosbag2_transport/play_options.hpp b/rosbag2_transport/include/rosbag2_transport/play_options.hpp index 27e33b9561..f345363706 100644 --- a/rosbag2_transport/include/rosbag2_transport/play_options.hpp +++ b/rosbag2_transport/include/rosbag2_transport/play_options.hpp @@ -50,6 +50,10 @@ struct PlayOptions // Sleep before play. Negative durations invalid. Will delay at the beginning of each loop. rclcpp::Duration delay = rclcpp::Duration(0, 0); + // Determines the maximum duration of the playback. Negative durations will make the playback to + // not stop. Default configuration makes the player to not stop execution. + rclcpp::Duration playback_duration = rclcpp::Duration(-1, 0); + // Start paused. bool start_paused = false; diff --git a/rosbag2_transport/include/rosbag2_transport/player.hpp b/rosbag2_transport/include/rosbag2_transport/player.hpp index cd7c3af163..17477cf47c 100644 --- a/rosbag2_transport/include/rosbag2_transport/player.hpp +++ b/rosbag2_transport/include/rosbag2_transport/player.hpp @@ -36,7 +36,7 @@ #include "rosbag2_interfaces/srv/get_rate.hpp" #include "rosbag2_interfaces/srv/is_paused.hpp" #include "rosbag2_interfaces/srv/pause.hpp" -#include "rosbag2_interfaces/srv/play_for.hpp" +#include "rosbag2_interfaces/srv/play.hpp" #include "rosbag2_interfaces/srv/play_next.hpp" #include "rosbag2_interfaces/srv/resume.hpp" #include "rosbag2_interfaces/srv/set_rate.hpp" @@ -156,7 +156,7 @@ class Player : public rclcpp::Node bool is_storage_completely_loaded() const; void enqueue_up_to_boundary(size_t boundary) RCPPUTILS_TSA_REQUIRES(reader_mutex_); void wait_for_filled_queue() const; - void play_messages_from_queue(const std::optional & play_until_time); + void play_messages_from_queue(const rcutils_duration_value_t & play_until_time); void prepare_publishers(); bool publish_message(rosbag2_storage::SerializedBagMessageSharedPtr message); static constexpr double read_ahead_lower_bound_percentage_ = 0.9; @@ -194,6 +194,7 @@ class Player : public rclcpp::Node rclcpp::Service::SharedPtr srv_is_paused_; rclcpp::Service::SharedPtr srv_get_rate_; rclcpp::Service::SharedPtr srv_set_rate_; + rclcpp::Service::SharedPtr srv_play_; rclcpp::Service::SharedPtr srv_play_next_; rclcpp::Service::SharedPtr srv_play_for_; rclcpp::Service::SharedPtr srv_seek_; diff --git a/rosbag2_transport/src/rosbag2_transport/player.cpp b/rosbag2_transport/src/rosbag2_transport/player.cpp index 62d01eca67..d7f25a3f2a 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.cpp +++ b/rosbag2_transport/src/rosbag2_transport/player.cpp @@ -202,6 +202,18 @@ void Player::play(const std::optional & duration) "Invalid delay value: " << play_options_.delay.nanoseconds() << ". Delay is disabled."); } + rcutils_time_point_value_t play_until_time = starting_time_; + if (play_options_.playback_duration >= rclcpp::Duration(0, 0)) { + play_until_time += play_options_.playback_duration.nanoseconds(); + } else { + play_until_time = -1; + RCLCPP_INFO_STREAM( + get_logger(), + "Invalid playback duration value: " << play_options_.playback_duration.nanoseconds() << + ". Playback duration is disabled."); + } + RCLCPP_INFO_STREAM(get_logger(), "Playback duration value: " << play_until_time); + try { do { if (delay > rclcpp::Duration(0, 0)) { @@ -209,10 +221,6 @@ void Player::play(const std::optional & duration) std::chrono::nanoseconds delay_duration(delay.nanoseconds()); std::this_thread::sleep_for(delay_duration); } - std::optional play_until_time{}; - if (duration.has_value() && *duration > 0) { - play_until_time = {starting_time_ + *duration}; - } { std::lock_guard lk(reader_mutex_); reader_->seek(starting_time_); @@ -220,7 +228,7 @@ void Player::play(const std::optional & duration) } storage_loading_future_ = std::async(std::launch::async, [this]() {load_storage_content();}); wait_for_filled_queue(); - play_messages_from_queue({play_until_time}); + play_messages_from_queue(play_until_time); { std::lock_guard lk(ready_to_play_from_queue_mutex_); is_ready_to_play_from_queue_ = false; @@ -426,8 +434,7 @@ void Player::enqueue_up_to_boundary(size_t boundary) } } -void Player::play_messages_from_queue( - const std::optional & play_until_time) +void Player::play_messages_from_queue(const rcutils_duration_value_t & play_until_time) { // Note: We need to use message_queue_.peek() instead of message_queue_.try_dequeue(message) // to support play_next() API logic. @@ -442,6 +449,9 @@ void Player::play_messages_from_queue( } while (message_ptr != nullptr && rclcpp::ok()) { rosbag2_storage::SerializedBagMessageSharedPtr message = *message_ptr; + if (play_until_time >= starting_time_ && message->time_stamp > play_until_time) { + break; + } // Do not move on until sleep_until returns true // It will always sleep, so this is not a tight busy loop on pause if (play_until_time.has_value() && message->time_stamp > *play_until_time) { @@ -658,6 +668,20 @@ void Player::create_control_services() { response->success = set_rate(request->rate); }); + srv_play_ = create_service( + "~/play", + [this]( + rosbag2_interfaces::srv::Play::Request::ConstSharedPtr request, + rosbag2_interfaces::srv::Play::Response::SharedPtr response) + { + play_options_.start_offset = + static_cast(RCUTILS_S_TO_NS(request->start_offset.sec)) + + static_cast(request->start_offset.nanosec); + play_options_.playback_duration = rclcpp::Duration( + request->playback_duration.sec, request->playback_duration.nanosec); + play(); + response->success = true; + }); srv_play_next_ = create_service( "~/play_next", [this]( diff --git a/rosbag2_transport/test/rosbag2_transport/test_play_for.cpp b/rosbag2_transport/test/rosbag2_transport/test_play_for.cpp index cd18b0f93b..653f3c4b54 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_play_for.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_play_for.cpp @@ -36,12 +36,13 @@ #include "rosbag2_play_test_fixture.hpp" #include "rosbag2_transport_test_fixture.hpp" +#include "mock_player.hpp" + using namespace ::testing; // NOLINT using namespace rosbag2_transport; // NOLINT using namespace std::chrono_literals; // NOLINT using namespace rosbag2_test_common; // NOLINT - constexpr int kIntValue{32}; constexpr float kFloat1Value{40.}; @@ -113,7 +114,7 @@ class RosBag2PlayForTestFixture : public RosBag2PlayTestFixture return messages; } - void SetUp() override + void InitPlayerWithPlaybackDurationAndPlay(int64_t milliseconds) { auto topic_types = get_topic_types(); auto messages = get_serialized_messages(); @@ -122,50 +123,48 @@ class RosBag2PlayForTestFixture : public RosBag2PlayTestFixture prepared_mock_reader->prepare(messages, topic_types); auto reader = std::make_unique(std::move(prepared_mock_reader)); - // Due to a problem related to the subscriber, we play many (3) messages but make the subscriber - // node spin only until 2 have arrived. Hence the 2 as `launch_subscriber()` argument. - sub_->add_subscription(kTopic1, 2); - sub_->add_subscription(kTopic2, 2); + sub_->add_subscription(kTopic1, 3); + sub_->add_subscription(kTopic2, 3); - player_ = std::make_shared( + play_options_.playback_duration = + rclcpp::Duration(std::chrono::nanoseconds(std::chrono::milliseconds(milliseconds))); + player_ = std::make_shared( std::move( reader), storage_options_, play_options_); + // Wait for discovery to match publishers with subscribers + ASSERT_TRUE( + sub_->spin_and_wait_for_matched(player_->get_list_of_publishers(), std::chrono::seconds(5))); + + auto await_received_messages = sub_->spin_subscriptions(); + + player_->play(); + + await_received_messages.get(); } - std::shared_ptr player_; - std::future await_received_messages_; + std::shared_ptr player_; }; + TEST_F(RosBag2PlayForTestFixture, play_for_all_are_played_due_to_duration) { - auto await_received_messages = sub_->spin_subscriptions(); - - player_->play(std::chrono::nanoseconds(std::chrono::milliseconds(1000)).count()); - - await_received_messages.get(); + InitPlayerWithPlaybackDurationAndPlay(1000); auto replayed_test_primitives = sub_->get_received_messages( kTopic1); - EXPECT_THAT(replayed_test_primitives, SizeIs(Ge(2u))); + EXPECT_THAT(replayed_test_primitives, SizeIs(Eq(3u))); EVAL_REPLAYED_PRIMITIVES(replayed_test_primitives); auto replayed_test_arrays = sub_->get_received_messages( kTopic2); - EXPECT_THAT(replayed_test_arrays, SizeIs(Ge(2u))); + EXPECT_THAT(replayed_test_arrays, SizeIs(Eq(3u))); EVAL_REPLAYED_BOOL_ARRAY_PRIMITIVES(replayed_test_arrays); EVAL_REPLAYED_FLOAT_ARRAY_PRIMITIVES(replayed_test_arrays); } TEST_F(RosBag2PlayForTestFixture, play_for_none_are_played_due_to_duration) { - const rcutils_duration_value_t duration = - std::chrono::nanoseconds(std::chrono::milliseconds(300)).count(); - - auto await_received_messages = sub_->spin_subscriptions_for(duration); - - player_->play(duration); - - await_received_messages.get(); + InitPlayerWithPlaybackDurationAndPlay(300); auto replayed_test_primitives = sub_->get_received_messages( kTopic1); @@ -178,14 +177,7 @@ TEST_F(RosBag2PlayForTestFixture, play_for_none_are_played_due_to_duration) TEST_F(RosBag2PlayForTestFixture, play_for_less_than_the_total_duration) { - const rcutils_duration_value_t duration = - std::chrono::nanoseconds(std::chrono::milliseconds(800)).count(); - - auto await_received_messages = sub_->spin_subscriptions_for(duration); - - player_->play(duration); - - await_received_messages.get(); + InitPlayerWithPlaybackDurationAndPlay(800); auto replayed_test_primitives = sub_->get_received_messages( kTopic1); @@ -206,7 +198,6 @@ class RosBag2PlayForFilteredTopicTestFixture : public RosBag2PlayForTestFixture { // Filter allows /topic2, blocks /topic1 play_options_.topics_to_filter = {"topic2"}; - RosBag2PlayForTestFixture::SetUp(); } }; @@ -214,16 +205,12 @@ TEST_F( RosBag2PlayForFilteredTopicTestFixture, play_for_full_duration_recorded_messages_with_filtered_topics) { - auto await_received_messages = sub_->spin_subscriptions(); - - player_->play(std::chrono::nanoseconds(std::chrono::milliseconds(1000)).count()); - - await_received_messages.get(); + InitPlayerWithPlaybackDurationAndPlay(1000); auto replayed_test_primitives = sub_->get_received_messages("/topic1"); // No messages are allowed to have arrived - EXPECT_THAT(replayed_test_primitives, SizeIs(0u)); + EXPECT_THAT(replayed_test_primitives, SizeIs(Eq(0u))); auto replayed_test_arrays = sub_->get_received_messages("/topic2"); // All messages should have arrived. @@ -236,36 +223,28 @@ TEST_F( RosBag2PlayForFilteredTopicTestFixture, play_for_short_duration_recorded_messages_with_filtered_topics) { - auto await_received_messages = sub_->spin_subscriptions(); - - player_->play(std::chrono::nanoseconds(std::chrono::milliseconds(300)).count()); - - await_received_messages.get(); + InitPlayerWithPlaybackDurationAndPlay(300); auto replayed_test_primitives = sub_->get_received_messages("/topic1"); // No messages are allowed to have arrived - EXPECT_THAT(replayed_test_primitives, SizeIs(0u)); + EXPECT_THAT(replayed_test_primitives, SizeIs(Eq(0u))); auto replayed_test_arrays = sub_->get_received_messages("/topic2"); // All messages should have arrived. - EXPECT_THAT(replayed_test_arrays, SizeIs(0u)); + EXPECT_THAT(replayed_test_arrays, SizeIs(Eq(0u))); } TEST_F( RosBag2PlayForFilteredTopicTestFixture, play_for_intermediate_duration_recorded_messages_with_filtered_topics) { - auto await_received_messages = sub_->spin_subscriptions(); - - player_->play(std::chrono::nanoseconds(std::chrono::milliseconds(800)).count()); - - await_received_messages.get(); + InitPlayerWithPlaybackDurationAndPlay(800); auto replayed_test_primitives = sub_->get_received_messages("/topic1"); // No messages are allowed to have arrived - EXPECT_THAT(replayed_test_primitives, SizeIs(0u)); + EXPECT_THAT(replayed_test_primitives, SizeIs(Eq(0u))); auto replayed_test_arrays = sub_->get_received_messages("/topic2"); // Some messages should have arrived.