From f71ea9cea1c8d05189306e562a3c2ed5f80cbe4f Mon Sep 17 00:00:00 2001 From: Agustin Alba Chicar Date: Thu, 17 Mar 2022 04:50:57 -0300 Subject: [PATCH] Second round of comments from #960 upstream (#15) * Removes duration parameter. A leftover after switching to playback_duration. * Fixes comment. * Solves format in rosbag2_py -> _transport.cpp * Applies style suggestions. * Changes play() to return a boolean indicating whether the request could be fulfilled. * Removes extra unnecessary code. --- ros2bag/ros2bag/verb/play.py | 10 +-- rosbag2_interfaces/srv/Play.srv | 5 +- rosbag2_interfaces/srv/PlayFor.srv | 3 - rosbag2_py/src/rosbag2_py/_transport.cpp | 8 +- .../subscription_manager.hpp | 29 ------- .../include/rosbag2_transport/player.hpp | 4 +- .../src/rosbag2_transport/player.cpp | 48 ++++------- .../test/rosbag2_transport/test_play_for.cpp | 82 +++++++++++++++++-- .../test/rosbag2_transport/test_play_loop.cpp | 2 +- 9 files changed, 101 insertions(+), 90 deletions(-) delete mode 100644 rosbag2_interfaces/srv/PlayFor.srv diff --git a/ros2bag/ros2bag/verb/play.py b/ros2bag/ros2bag/verb/play.py index 71ffd23a05..213471ce74 100644 --- a/ros2bag/ros2bag/verb/play.py +++ b/ros2bag/ros2bag/verb/play.py @@ -109,10 +109,6 @@ def add_arguments(self, parser, cli_name): # noqa: D102 "Note that this option is valid only if the publisher\'s QOS profile is " 'RELIABLE.', metavar='TIMEOUT') - parser.add_argument( - '-f', '--duration', type=float, default=None, - help='Play for SEC seconds. Default is None, meaning that playback will continue ' - 'until the end of the bag. Valid range > 0.0') def main(self, *, args): # noqa: D102 qos_profile_overrides = {} # Specify a valid default @@ -154,9 +150,5 @@ def main(self, *, args): # noqa: D102 play_options.start_offset = args.start_offset play_options.wait_acked_timeout = args.wait_for_all_acked - # Gets the duration in nanoseconds when a value is provided for player - # consumption. - duration = int(args.duration * 1e9) if args.duration else args.duration - player = Player() - player.play(storage_options, play_options, duration) + player.play(storage_options, play_options) diff --git a/rosbag2_interfaces/srv/Play.srv b/rosbag2_interfaces/srv/Play.srv index c9e7af80f2..54edf7dd45 100644 --- a/rosbag2_interfaces/srv/Play.srv +++ b/rosbag2_interfaces/srv/Play.srv @@ -1,6 +1,7 @@ -# See rosbag2_transport::PlayOptions::start_time +# See rosbag2_transport::PlayOptions::start_offset builtin_interfaces/Time start_offset # See rosbag2_transport::PlayOptions::playback_duration builtin_interfaces/Duration playback_duration --- -bool success # return true +# returns false when another playback execution is running, otherwise true +bool success diff --git a/rosbag2_interfaces/srv/PlayFor.srv b/rosbag2_interfaces/srv/PlayFor.srv deleted file mode 100644 index 8f60cb07b4..0000000000 --- a/rosbag2_interfaces/srv/PlayFor.srv +++ /dev/null @@ -1,3 +0,0 @@ -builtin_interfaces/Duration duration ---- -bool success # return true diff --git a/rosbag2_py/src/rosbag2_py/_transport.cpp b/rosbag2_py/src/rosbag2_py/_transport.cpp index da4393cb9d..44a8817691 100644 --- a/rosbag2_py/src/rosbag2_py/_transport.cpp +++ b/rosbag2_py/src/rosbag2_py/_transport.cpp @@ -131,8 +131,7 @@ class Player void play( const rosbag2_storage::StorageOptions & storage_options, - PlayOptions & play_options, - const std::optional & duration) + PlayOptions & play_options) { auto reader = rosbag2_transport::ReaderWriterFactory::make_reader(storage_options); auto player = std::make_shared( @@ -144,7 +143,7 @@ class Player [&exec]() { exec.spin(); }); - player->play(duration); + player->play(); exec.cancel(); spin_thread.join(); @@ -291,8 +290,7 @@ PYBIND11_MODULE(_transport, m) { py::class_(m, "Player") .def(py::init()) - .def( - "play", &rosbag2_py::Player::play, py::arg("storage_options"), py::arg("play_options")) + .def("play", &rosbag2_py::Player::play, py::arg("storage_options"), py::arg("play_options")) ; py::class_(m, "Recorder") diff --git a/rosbag2_test_common/include/rosbag2_test_common/subscription_manager.hpp b/rosbag2_test_common/include/rosbag2_test_common/subscription_manager.hpp index 15ea840112..f37fa75477 100644 --- a/rosbag2_test_common/include/rosbag2_test_common/subscription_manager.hpp +++ b/rosbag2_test_common/include/rosbag2_test_common/subscription_manager.hpp @@ -129,18 +129,6 @@ class SubscriptionManager }); } - std::future spin_subscriptions_for(rcutils_duration_value_t duration) - { - return async( - std::launch::async, [this, duration]() { - rclcpp::executors::SingleThreadedExecutor exec; - auto end = std::chrono::high_resolution_clock::now() + std::chrono::nanoseconds(duration); - while (rclcpp::ok() && continue_spinning_until(expected_topics_with_size_, end)) { - exec.spin_node_some(subscriber_node_); - } - }); - } - private: bool continue_spinning( const std::unordered_map & expected_topics_with_sizes, @@ -159,23 +147,6 @@ class SubscriptionManager return false; } - bool continue_spinning_until( - const std::unordered_map & expected_topics_with_sizes, - std::chrono::time_point end_time) - { - auto current = std::chrono::high_resolution_clock::now(); - if (current > end_time) { - return false; - } - - for (const auto & topic_expected : expected_topics_with_sizes) { - if (subscribed_messages_[topic_expected.first].size() < topic_expected.second) { - return true; - } - } - return false; - } - std::vector subscriptions_; std::unordered_map>> subscribed_messages_; diff --git a/rosbag2_transport/include/rosbag2_transport/player.hpp b/rosbag2_transport/include/rosbag2_transport/player.hpp index 17477cf47c..b593ba7cc7 100644 --- a/rosbag2_transport/include/rosbag2_transport/player.hpp +++ b/rosbag2_transport/include/rosbag2_transport/player.hpp @@ -94,7 +94,7 @@ class Player : public rclcpp::Node virtual ~Player(); ROSBAG2_TRANSPORT_PUBLIC - void play(const std::optional & duration = std::nullopt); + bool play(); // Playback control interface /// Pause the flow of time for playback. @@ -184,6 +184,7 @@ class Player : public rclcpp::Node std::mutex skip_message_in_main_play_loop_mutex_; bool skip_message_in_main_play_loop_ RCPPUTILS_TSA_GUARDED_BY( skip_message_in_main_play_loop_mutex_) = false; + std::atomic_bool is_in_playback_{false}; rcutils_time_point_value_t starting_time_; @@ -196,7 +197,6 @@ class Player : public rclcpp::Node rclcpp::Service::SharedPtr srv_set_rate_; rclcpp::Service::SharedPtr srv_play_; rclcpp::Service::SharedPtr srv_play_next_; - rclcpp::Service::SharedPtr srv_play_for_; rclcpp::Service::SharedPtr srv_seek_; // defaults diff --git a/rosbag2_transport/src/rosbag2_transport/player.cpp b/rosbag2_transport/src/rosbag2_transport/player.cpp index d7f25a3f2a..6e21907fe4 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.cpp +++ b/rosbag2_transport/src/rosbag2_transport/player.cpp @@ -191,8 +191,14 @@ bool Player::is_storage_completely_loaded() const return !storage_loading_future_.valid(); } -void Player::play(const std::optional & duration) +bool Player::play() { + if (is_in_playback_) { + RCLCPP_WARN_STREAM(get_logger(), "Trying to play() while in playback, dismissing request."); + return false; + } + is_in_playback_ = true; + rclcpp::Duration delay(0, 0); if (play_options_.delay >= rclcpp::Duration(0, 0)) { delay = play_options_.delay; @@ -202,15 +208,9 @@ void Player::play(const std::optional & duration) "Invalid delay value: " << play_options_.delay.nanoseconds() << ". Delay is disabled."); } - rcutils_time_point_value_t play_until_time = starting_time_; + rcutils_time_point_value_t play_until_time = -1; if (play_options_.playback_duration >= rclcpp::Duration(0, 0)) { - play_until_time += play_options_.playback_duration.nanoseconds(); - } else { - play_until_time = -1; - RCLCPP_INFO_STREAM( - get_logger(), - "Invalid playback duration value: " << play_options_.playback_duration.nanoseconds() << - ". Playback duration is disabled."); + play_until_time = starting_time_ + play_options_.playback_duration.nanoseconds(); } RCLCPP_INFO_STREAM(get_logger(), "Playback duration value: " << play_until_time); @@ -266,6 +266,9 @@ void Player::play(const std::optional & duration) } } } + + is_in_playback_ = false; + return true; } void Player::pause() @@ -454,9 +457,6 @@ void Player::play_messages_from_queue(const rcutils_duration_value_t & play_unti } // Do not move on until sleep_until returns true // It will always sleep, so this is not a tight busy loop on pause - if (play_until_time.has_value() && message->time_stamp > *play_until_time) { - break; - } while (rclcpp::ok() && !clock_->sleep_until(message->time_stamp)) { if (std::atomic_exchange(&cancel_wait_for_next_message_, false)) { break; @@ -674,13 +674,9 @@ void Player::create_control_services() rosbag2_interfaces::srv::Play::Request::ConstSharedPtr request, rosbag2_interfaces::srv::Play::Response::SharedPtr response) { - play_options_.start_offset = - static_cast(RCUTILS_S_TO_NS(request->start_offset.sec)) + - static_cast(request->start_offset.nanosec); - play_options_.playback_duration = rclcpp::Duration( - request->playback_duration.sec, request->playback_duration.nanosec); - play(); - response->success = true; + play_options_.start_offset = rclcpp::Time(request->start_offset).nanoseconds(); + play_options_.playback_duration = rclcpp::Duration(request->playback_duration); + response->success = play(); }); srv_play_next_ = create_service( "~/play_next", @@ -690,20 +686,6 @@ void Player::create_control_services() { response->success = play_next(); }); - srv_play_for_ = create_service( - "~/play_for", - [this]( - const std::shared_ptr/* request_header */, - const std::shared_ptr request, - const std::shared_ptr response) - { - const rcutils_duration_value_t duration = - static_cast(request->duration.sec) * - static_cast(1000000000) + - static_cast(request->duration.nanosec); - play({duration}); - response->success = true; - }); srv_seek_ = create_service( "~/seek", [this]( diff --git a/rosbag2_transport/test/rosbag2_transport/test_play_for.cpp b/rosbag2_transport/test/rosbag2_transport/test_play_for.cpp index 653f3c4b54..cd6d49480e 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_play_for.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_play_for.cpp @@ -126,18 +126,15 @@ class RosBag2PlayForTestFixture : public RosBag2PlayTestFixture sub_->add_subscription(kTopic1, 3); sub_->add_subscription(kTopic2, 3); - play_options_.playback_duration = - rclcpp::Duration(std::chrono::nanoseconds(std::chrono::milliseconds(milliseconds))); - player_ = std::make_shared( - std::move( - reader), storage_options_, play_options_); + play_options_.playback_duration = rclcpp::Duration(std::chrono::milliseconds(milliseconds)); + player_ = std::make_shared(std::move(reader), storage_options_, play_options_); // Wait for discovery to match publishers with subscribers ASSERT_TRUE( sub_->spin_and_wait_for_matched(player_->get_list_of_publishers(), std::chrono::seconds(5))); auto await_received_messages = sub_->spin_subscriptions(); - player_->play(); + ASSERT_TRUE(player_->play()); await_received_messages.get(); } @@ -252,3 +249,76 @@ TEST_F( EVAL_REPLAYED_BOOL_ARRAY_PRIMITIVES(replayed_test_arrays); EVAL_REPLAYED_FLOAT_ARRAY_PRIMITIVES(replayed_test_arrays); } + +class RosBag2PlayForInterruptedTestFixture : public RosBag2PlayTestFixture +{ +public: + static constexpr const char * kTopic1Name{"topic1"}; + static constexpr const char * kTopic1{"/topic1"}; + + std::vector get_topic_types() + { + return {{kTopic1Name, "test_msgs/BasicTypes", "", ""}}; + } + + std::vector> + get_serialized_messages() + { + auto primitive_message = get_messages_basic_types()[0]; + primitive_message->int32_value = kIntValue; + + // @{ Ordering matters. The mock reader implementation moves messages + // around without any knowledge about message chronology. It just picks + // the next one Make sure to keep the list in order or sort it! + std::vector> messages = + {serialize_test_message(kTopic1Name, 500, primitive_message), + serialize_test_message(kTopic1Name, 700, primitive_message), + serialize_test_message(kTopic1Name, 900, primitive_message)}; + // @} + return messages; + } +}; + +TEST_F( + RosBag2PlayForInterruptedTestFixture, + play_should_return_false_when_interrupted) +{ + + auto topic_types = get_topic_types(); + auto messages = get_serialized_messages(); + + auto prepared_mock_reader = std::make_unique(); + prepared_mock_reader->prepare(messages, topic_types); + auto reader = std::make_unique(std::move(prepared_mock_reader)); + + sub_->add_subscription(kTopic1, 3); + + play_options_.playback_duration = rclcpp::Duration(std::chrono::milliseconds(1000)); + std::shared_ptr player_ = std::make_shared( + std::move( + reader), storage_options_, play_options_); + // Wait for discovery to match publishers with subscribers + ASSERT_TRUE( + sub_->spin_and_wait_for_matched(player_->get_list_of_publishers(), std::chrono::seconds(5))); + + auto await_received_messages = sub_->spin_subscriptions(); + + auto play_execution_1 = std::async(std::launch::async, [player_]() {return player_->play();}); + auto play_execution_2 = std::async(std::launch::async, [player_]() {return player_->play();}); + + await_received_messages.get(); + + const auto play_execution_1_wait_result = play_execution_1.wait_for( + std::chrono::milliseconds( + 1500)); + const auto play_execution_2_wait_result = play_execution_2.wait_for( + std::chrono::milliseconds( + 1500)); + + ASSERT_EQ(std::future_status::ready, play_execution_1_wait_result); + ASSERT_EQ(std::future_status::ready, play_execution_2_wait_result); + + ASSERT_TRUE( + (play_execution_1.get() && !play_execution_2.get()) || + (!play_execution_1.get() && play_execution_2.get())); +} diff --git a/rosbag2_transport/test/rosbag2_transport/test_play_loop.cpp b/rosbag2_transport/test/rosbag2_transport/test_play_loop.cpp index f4c1fed059..65f7795dac 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_play_loop.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_play_loop.cpp @@ -131,7 +131,7 @@ TEST_F(RosBag2PlayTestFixture, messages_played_in_loop) { auto player = std::make_shared( std::move( reader), storage_options_, play_options); - std::thread loop_thread(&rosbag2_transport::Player::play, player, std::nullopt); + std::thread loop_thread(&rosbag2_transport::Player::play, player); await_received_messages.get(); rclcpp::shutdown();