diff --git a/rosbag2_interfaces/srv/PlayNext.srv b/rosbag2_interfaces/srv/PlayNext.srv index 59e31f9ba9..c192a382da 100644 --- a/rosbag2_interfaces/srv/PlayNext.srv +++ b/rosbag2_interfaces/srv/PlayNext.srv @@ -1,2 +1,3 @@ +uint64 num_messages 1 # Defaults to one for backwards compatibility. --- -bool success # can only play-next while playback is paused +uint64 played_messages # The number of messages that were actually played. diff --git a/rosbag2_transport/include/rosbag2_transport/player.hpp b/rosbag2_transport/include/rosbag2_transport/player.hpp index bfb3aa5483..4f8f3660bd 100644 --- a/rosbag2_transport/include/rosbag2_transport/player.hpp +++ b/rosbag2_transport/include/rosbag2_transport/player.hpp @@ -120,14 +120,15 @@ class Player : public rclcpp::Node ROSBAG2_TRANSPORT_PUBLIC virtual bool set_rate(double); - /// \brief Playing next message from queue when in pause. + /// \brief Playing next \p num_messages messages from queue when in pause. + /// \param num_messages The number of messages to play from the queue. /// \details This is blocking call and it will wait until next available message will be /// published or rclcpp context shut down. /// \note If internal player queue is starving and storage has not been completely loaded, /// this method will wait until new element will be pushed to the queue. - /// \return true if player in pause mode and successfully played next message, otherwise false. + /// \return The number of actually sent messages from the queue. ROSBAG2_TRANSPORT_PUBLIC - virtual bool play_next(); + virtual uint64_t play_next(const uint64_t num_messages); /// \brief Advance player to the message with closest timestamp >= time_point. /// \details This is blocking call and it will wait until current message will be published diff --git a/rosbag2_transport/src/rosbag2_transport/player.cpp b/rosbag2_transport/src/rosbag2_transport/player.cpp index 446bb8303f..12eb095229 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.cpp +++ b/rosbag2_transport/src/rosbag2_transport/player.cpp @@ -315,38 +315,59 @@ rosbag2_storage::SerializedBagMessageSharedPtr * Player::peek_next_message_from_ return message_ptr; } -bool Player::play_next() +uint64_t Player::play_next(const uint64_t num_messages) { if (!clock_->is_paused()) { RCLCPP_WARN_STREAM(get_logger(), "Called play next, but not in paused state."); - return false; + return 0u; } - RCLCPP_INFO_STREAM(get_logger(), "Playing next message."); - // Temporary take over playback from play_messages_from_queue() std::lock_guard main_play_loop_lk(skip_message_in_main_play_loop_mutex_); skip_message_in_main_play_loop_ = true; - // Wait for player to be ready for playback messages from queue i.e. wait for Player:play() to - // be called if not yet and queue to be filled with messages. - { - std::unique_lock lk(ready_to_play_from_queue_mutex_); - ready_to_play_from_queue_cv_.wait(lk, [this] {return is_ready_to_play_from_queue_;}); - } - rosbag2_storage::SerializedBagMessageSharedPtr * message_ptr = peek_next_message_from_queue(); + uint64_t message_counter = 0; - bool next_message_published = false; - while (message_ptr != nullptr && !next_message_published) { + while (message_counter < num_messages) { + RCLCPP_INFO_STREAM(get_logger(), "Playing next message."); + // Wait for player to be ready for playback messages from queue i.e. wait for Player:play() to + // be called if not yet and queue to be filled with messages. { - rosbag2_storage::SerializedBagMessageSharedPtr message = *message_ptr; - next_message_published = publish_message(message); - clock_->jump(message->time_stamp); + std::unique_lock lk(ready_to_play_from_queue_mutex_); + ready_to_play_from_queue_cv_.wait(lk, [this] {return is_ready_to_play_from_queue_;}); } - message_queue_.pop(); - message_ptr = peek_next_message_from_queue(); + + rosbag2_storage::SerializedBagMessageSharedPtr * message_ptr = peek_next_message_from_queue(); + + bool next_message_published = false; + while (message_ptr != nullptr && !next_message_published) { + { + rosbag2_storage::SerializedBagMessageSharedPtr message = *message_ptr; + + // Do not move on until sleep_until returns true + // It will always sleep, so this is not a tight busy loop on pause + clock_->resume(); + while (rclcpp::ok() && !clock_->sleep_until(message->time_stamp)) { + if (std::atomic_exchange(&cancel_wait_for_next_message_, false)) { + break; + } + } + clock_->pause(); + + next_message_published = publish_message(message); + } + message_queue_.pop(); + message_ptr = peek_next_message_from_queue(); + } + if (!next_message_published) { + break; + } + + message_counter++; } - return next_message_published; + + + return message_counter; } void Player::seek(rcutils_time_point_value_t time_point) @@ -585,7 +606,7 @@ void Player::add_keyboard_callbacks() ); add_key_callback( play_options_.play_next_key, - [this]() {play_next();}, + [this]() {play_next(1u);}, "Play Next Message" ); add_key_callback( @@ -653,10 +674,10 @@ void Player::create_control_services() srv_play_next_ = create_service( "~/play_next", [this]( - rosbag2_interfaces::srv::PlayNext::Request::ConstSharedPtr, + rosbag2_interfaces::srv::PlayNext::Request::ConstSharedPtr request, rosbag2_interfaces::srv::PlayNext::Response::SharedPtr response) { - response->success = play_next(); + response->played_messages = play_next(request->num_messages); }); srv_seek_ = create_service( "~/seek", diff --git a/rosbag2_transport/test/rosbag2_transport/test_keyboard_controls.cpp b/rosbag2_transport/test/rosbag2_transport/test_keyboard_controls.cpp index a08e96ea49..1516c23513 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_keyboard_controls.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_keyboard_controls.cpp @@ -65,13 +65,11 @@ class CountPlayer : public Player num_resumed++; } - bool play_next() override + uint64_t play_next(const uint64_t num_messages) override { - bool ret = Player::play_next(); - if (ret) { - num_played_next++; - } - return ret; + const uint64_t played_messages = Player::play_next(num_messages); + num_played_next += played_messages; + return played_messages; } bool set_rate(double rate) override diff --git a/rosbag2_transport/test/rosbag2_transport/test_play_next.cpp b/rosbag2_transport/test/rosbag2_transport/test_play_next.cpp index c746500946..2eeb1f0b0f 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_play_next.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_play_next.cpp @@ -46,7 +46,7 @@ TEST_F(RosBag2PlayTestFixture, play_next_with_false_preconditions) { auto player = std::make_shared(std::move(reader), storage_options_, play_options_); ASSERT_FALSE(player->is_paused()); - ASSERT_FALSE(player->play_next()); + ASSERT_EQ(0u, player->play_next(1u)); player->pause(); ASSERT_TRUE(player->is_paused()); } @@ -60,10 +60,10 @@ TEST_F(RosBag2PlayTestFixture, play_next_playing_all_messages_without_delays) { std::vector> messages = { - serialize_test_message("topic1", 2100, primitive_message), - serialize_test_message("topic1", 3300, primitive_message), - serialize_test_message("topic1", 4600, primitive_message), - serialize_test_message("topic1", 5900, primitive_message) + serialize_test_message("topic1", 210, primitive_message), + serialize_test_message("topic1", 330, primitive_message), + serialize_test_message("topic1", 460, primitive_message), + serialize_test_message("topic1", 590, primitive_message) }; auto prepared_mock_reader = std::make_unique(); @@ -88,14 +88,14 @@ TEST_F(RosBag2PlayTestFixture, play_next_playing_all_messages_without_delays) { ASSERT_TRUE(player->is_paused()); auto start = std::chrono::steady_clock::now(); - ASSERT_TRUE(player->play_next()); + ASSERT_EQ(1u, player->play_next(1u)); size_t played_messages = 1; - while (player->play_next()) { + while (player->play_next(1u) == 1u) { played_messages++; } auto replay_time = std::chrono::steady_clock::now() - start; ASSERT_EQ(played_messages, messages.size()); - ASSERT_THAT(replay_time, Lt(std::chrono::seconds(2))); + ASSERT_THAT(replay_time, Lt(std::chrono::milliseconds(600))); ASSERT_TRUE(player->is_paused()); player->resume(); player_future.get(); @@ -114,10 +114,10 @@ TEST_F(RosBag2PlayTestFixture, play_next_playing_one_by_one_messages_with_the_sa std::vector> messages = { - serialize_test_message("topic1", 1000, primitive_message), - serialize_test_message("topic1", 1000, primitive_message), - serialize_test_message("topic1", 1000, primitive_message), - serialize_test_message("topic1", 1000, primitive_message) + serialize_test_message("topic1", 100, primitive_message), + serialize_test_message("topic1", 100, primitive_message), + serialize_test_message("topic1", 100, primitive_message), + serialize_test_message("topic1", 100, primitive_message) }; auto prepared_mock_reader = std::make_unique(); @@ -140,9 +140,9 @@ TEST_F(RosBag2PlayTestFixture, play_next_playing_one_by_one_messages_with_the_sa auto player_future = std::async(std::launch::async, [&player]() -> void {player->play();}); ASSERT_TRUE(player->is_paused()); - ASSERT_TRUE(player->play_next()); + ASSERT_EQ(1u, player->play_next(1u)); size_t played_messages = 1; - while (player->play_next()) { + while (player->play_next(1u) == 1u) { // Yield CPU resources for player-play() running in separate thread to make sure that it // will not play extra messages. std::this_thread::sleep_for(std::chrono::milliseconds(30)); @@ -158,6 +158,74 @@ TEST_F(RosBag2PlayTestFixture, play_next_playing_one_by_one_messages_with_the_sa EXPECT_THAT(replayed_topic1, SizeIs(messages.size())); } +TEST_F(RosBag2PlayTestFixture, play_next_n_messages_with_the_same_timestamp) { + auto primitive_message = get_messages_basic_types()[0]; + primitive_message->int32_value = 42; + + auto topic_types = std::vector{ + {"topic1", "test_msgs/BasicTypes", "", ""}}; + + std::vector> messages = + { + serialize_test_message("topic1", 100, primitive_message), + serialize_test_message("topic1", 100, primitive_message), + serialize_test_message("topic1", 100, primitive_message), + serialize_test_message("topic1", 100, primitive_message) + }; + + auto prepared_mock_reader = std::make_unique(); + prepared_mock_reader->prepare(messages, topic_types); + auto reader = std::make_unique(std::move(prepared_mock_reader)); + auto player = std::make_shared(std::move(reader), storage_options_, play_options_); + + sub_ = std::make_shared(); + sub_->add_subscription("/topic1", messages.size()); + + // Wait for discovery to match publishers with subscribers + ASSERT_TRUE( + sub_->spin_and_wait_for_matched(player->get_list_of_publishers(), std::chrono::seconds(30))); + + auto await_received_messages = sub_->spin_subscriptions(); + + player->pause(); + ASSERT_TRUE(player->is_paused()); + + auto player_future = std::async(std::launch::async, [&player]() -> void {player->play();}); + + ASSERT_TRUE(player->is_paused()); + ASSERT_EQ(0u, player->play_next(0u)); + // Yield CPU resources so messages are actually sent through. + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + EXPECT_THAT(sub_->get_received_messages("/topic1"), SizeIs(0u)); + + ASSERT_TRUE(player->is_paused()); + ASSERT_EQ(1u, player->play_next(1u)); + // Yield CPU resources so messages are actually sent through. + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + EXPECT_THAT(sub_->get_received_messages("/topic1"), SizeIs(1u)); + + ASSERT_TRUE(player->is_paused()); + ASSERT_EQ(2u, player->play_next(2u)); + // Yield CPU resources so messages are actually sent through. + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + EXPECT_THAT(sub_->get_received_messages("/topic1"), SizeIs(3u)); + + ASSERT_TRUE(player->is_paused()); + ASSERT_EQ(1u, player->play_next(1u)); + // Yield CPU resources so messages are actually sent through. + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + EXPECT_THAT(sub_->get_received_messages("/topic1"), SizeIs(4u)); + + ASSERT_TRUE(player->is_paused()); + ASSERT_EQ(0u, player->play_next(1u)); + EXPECT_THAT(sub_->get_received_messages("/topic1"), SizeIs(4u)); + + ASSERT_TRUE(player->is_paused()); + player->resume(); + player_future.get(); + await_received_messages.get(); +} + TEST_F(RosBag2PlayTestFixture, play_respect_messages_timing_after_play_next) { auto primitive_message = get_messages_basic_types()[0]; primitive_message->int32_value = 42; @@ -193,8 +261,8 @@ TEST_F(RosBag2PlayTestFixture, play_respect_messages_timing_after_play_next) { auto player_future = std::async(std::launch::async, [&player]() -> void {player->play();}); ASSERT_TRUE(player->is_paused()); - ASSERT_TRUE(player->play_next()); - ASSERT_TRUE(player->play_next()); + ASSERT_EQ(1u, player->play_next(1u)); + ASSERT_EQ(1u, player->play_next(1u)); ASSERT_TRUE(player->is_paused()); player->resume(); auto start = std::chrono::steady_clock::now(); @@ -247,7 +315,7 @@ TEST_F(RosBag2PlayTestFixture, player_can_resume_after_play_next) { auto player_future = std::async(std::launch::async, [&player]() -> void {player->play();}); ASSERT_TRUE(player->is_paused()); - ASSERT_TRUE(player->play_next()); + ASSERT_EQ(1u, player->play_next(1u)); ASSERT_TRUE(player->is_paused()); player->resume(); player_future.get(); @@ -303,13 +371,13 @@ TEST_F(RosBag2PlayTestFixture, play_next_playing_only_filtered_topics) { auto player_future = std::async(std::launch::async, [&player]() -> void {player->play();}); ASSERT_TRUE(player->is_paused()); - ASSERT_TRUE(player->play_next()); + ASSERT_EQ(1u, player->play_next(1u)); size_t played_messages = 1; - while (player->play_next()) { + while (player->play_next(1u) == 1u) { played_messages++; } - ASSERT_EQ(played_messages, 3U); + ASSERT_EQ(played_messages, 3u); ASSERT_TRUE(player->is_paused()); player->resume(); diff --git a/rosbag2_transport/test/rosbag2_transport/test_play_seek.cpp b/rosbag2_transport/test/rosbag2_transport/test_play_seek.cpp index c14f9fe44d..0b0bdb6e81 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_play_seek.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_play_seek.cpp @@ -115,17 +115,14 @@ TEST_F(RosBag2PlaySeekTestFixture, seek_back_in_time) { auto player_future = std::async(std::launch::async, [&player]() -> void {player->play();}); EXPECT_TRUE(player->is_paused()); - EXPECT_TRUE(player->play_next()); - EXPECT_TRUE(player->play_next()); + EXPECT_EQ(2u, player->play_next(2u)); // Jump in timestamp equal to the timestamp in first message - 1 nanosecond player->seek(start_time_ms_ * 1000000 - 1); - EXPECT_TRUE(player->play_next()); - EXPECT_TRUE(player->play_next()); + EXPECT_EQ(2u, player->play_next(2u)); // Jump in timestamp equal to the timestamp in first message player->seek(start_time_ms_ * 1000000); - EXPECT_TRUE(player->play_next()); - EXPECT_TRUE(player->play_next()); + EXPECT_EQ(2u, player->play_next(2u)); player->resume(); player_future.get(); await_received_messages.get(); @@ -162,13 +159,13 @@ TEST_F(RosBag2PlaySeekTestFixture, seek_with_timestamp_later_than_in_last_messag auto player_future = std::async(std::launch::async, [&player]() -> void {player->play();}); EXPECT_TRUE(player->is_paused()); - EXPECT_TRUE(player->play_next()); + EXPECT_EQ(1u, player->play_next(1u)); // Jump in timestamp equal to the timestamp in last message + 1 nanosecond player->seek((start_time_ms_ + message_spacing_ms_ * (num_msgs_in_bag_ - 1)) * 1000000 + 1); // shouldn't be able to keep playing since we're at end of bag - EXPECT_FALSE(player->play_next()); + EXPECT_EQ(0u, player->play_next(1u)); player->resume(); player_future.get(); await_received_messages.get(); @@ -196,11 +193,11 @@ TEST_F(RosBag2PlaySeekTestFixture, seek_forward) { auto player_future = std::async(std::launch::async, [&player]() -> void {player->play();}); EXPECT_TRUE(player->is_paused()); - EXPECT_TRUE(player->play_next()); + EXPECT_EQ(1u, player->play_next(1u)); // Jump on third message (1200 ms) player->seek((start_time_ms_ + message_spacing_ms_ * 2) * 1000000); - EXPECT_TRUE(player->play_next()); + EXPECT_EQ(1u, player->play_next(1u)); player->resume(); player_future.get(); await_received_messages.get(); @@ -235,14 +232,12 @@ TEST_F(RosBag2PlaySeekTestFixture, seek_back_in_time_from_the_end_of_the_bag) { EXPECT_TRUE(player->is_paused()); // Play all messages from bag - for (size_t i = 0; i < num_msgs_in_bag_; i++) { - EXPECT_TRUE(player->play_next()); - } - EXPECT_FALSE(player->play_next()); // Make sure there are no messages to play + EXPECT_EQ(num_msgs_in_bag_, player->play_next(num_msgs_in_bag_)); + EXPECT_EQ(0u, player->play_next(1u)); // Make sure there are no messages to play // Jump on third message (1200 ms) player->seek((start_time_ms_ + message_spacing_ms_ * 2) * 1000000); - EXPECT_TRUE(player->play_next()); + EXPECT_EQ(1u, player->play_next(1u)); player->resume(); player_future.get(); await_received_messages.get(); @@ -281,14 +276,12 @@ TEST_F(RosBag2PlaySeekTestFixture, seek_forward_from_the_end_of_the_bag) { EXPECT_TRUE(player->is_paused()); // Play all messages from bag - for (size_t i = 0; i < num_msgs_in_bag_; i++) { - EXPECT_TRUE(player->play_next()); - } - EXPECT_FALSE(player->play_next()); // Make sure there are no messages to play + EXPECT_EQ(num_msgs_in_bag_, player->play_next(num_msgs_in_bag_)); + EXPECT_EQ(0u, player->play_next(1u)); // Make sure there are no messages to play // Jump in timestamp equal to the timestamp in last message + 1 nanosecond player->seek((start_time_ms_ + message_spacing_ms_ * (num_msgs_in_bag_ - 1)) * 1000000 + 1); - EXPECT_FALSE(player->play_next()); + EXPECT_EQ(0u, player->play_next(1u)); player->resume(); player_future.get(); await_received_messages.get(); diff --git a/rosbag2_transport/test/rosbag2_transport/test_play_services.cpp b/rosbag2_transport/test/rosbag2_transport/test_play_services.cpp index 6b2e2fb982..e20271f7cf 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_play_services.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_play_services.cpp @@ -327,7 +327,7 @@ TEST_F(PlaySrvsTest, play_next) { message_counter_ = 0; } play_next_response = successful_call(cli_play_next_); - ASSERT_TRUE(play_next_response->success); + ASSERT_EQ(1u, play_next_response->played_messages); expect_messages(true, false); } @@ -337,13 +337,13 @@ TEST_F(PlaySrvsTest, play_next) { message_counter_ = 0; } play_next_response = successful_call(cli_play_next_); - ASSERT_FALSE(play_next_response->success); + ASSERT_EQ(0u, play_next_response->played_messages); expect_messages(false, false); // Check that play_next will return false when player not in pause mode. start_playback(); ASSERT_FALSE(player_->is_paused()); play_next_response = successful_call(cli_play_next_); - ASSERT_FALSE(play_next_response->success); + ASSERT_EQ(0u, play_next_response->played_messages); expect_messages(true); }