diff --git a/rosbag2_interfaces/CMakeLists.txt b/rosbag2_interfaces/CMakeLists.txt index 62150cf04c..9b82745b2d 100644 --- a/rosbag2_interfaces/CMakeLists.txt +++ b/rosbag2_interfaces/CMakeLists.txt @@ -13,6 +13,7 @@ find_package(builtin_interfaces REQUIRED) find_package(rosidl_default_generators REQUIRED) rosidl_generate_interfaces(${PROJECT_NAME} + "srv/Burst.srv" "srv/GetRate.srv" "srv/IsPaused.srv" "srv/Pause.srv" diff --git a/rosbag2_interfaces/srv/Burst.srv b/rosbag2_interfaces/srv/Burst.srv new file mode 100644 index 0000000000..e21bfcb359 --- /dev/null +++ b/rosbag2_interfaces/srv/Burst.srv @@ -0,0 +1,3 @@ +uint64 num_messages # Number of messages to burst +--- +uint64 actually_burst # Number of messages actually burst diff --git a/rosbag2_transport/CMakeLists.txt b/rosbag2_transport/CMakeLists.txt index 28bc569f47..7800141804 100644 --- a/rosbag2_transport/CMakeLists.txt +++ b/rosbag2_transport/CMakeLists.txt @@ -144,6 +144,12 @@ function(create_tests_for_rmw_implementation) LINK_LIBS rosbag2_transport AMENT_DEPS test_msgs rosbag2_test_common) + rosbag2_transport_add_gmock(test_burst + test/rosbag2_transport/test_burst.cpp + INCLUDE_DIRS $ + LINK_LIBS rosbag2_transport + AMENT_DEPS test_msgs rosbag2_test_common) + rosbag2_transport_add_gmock(test_qos test/rosbag2_transport/test_qos.cpp INCLUDE_DIRS $ diff --git a/rosbag2_transport/include/rosbag2_transport/player.hpp b/rosbag2_transport/include/rosbag2_transport/player.hpp index bfb3aa5483..de9f6bf7a5 100644 --- a/rosbag2_transport/include/rosbag2_transport/player.hpp +++ b/rosbag2_transport/include/rosbag2_transport/player.hpp @@ -36,6 +36,7 @@ #include "rosbag2_interfaces/srv/is_paused.hpp" #include "rosbag2_interfaces/srv/pause.hpp" #include "rosbag2_interfaces/srv/play_next.hpp" +#include "rosbag2_interfaces/srv/burst.hpp" #include "rosbag2_interfaces/srv/resume.hpp" #include "rosbag2_interfaces/srv/set_rate.hpp" #include "rosbag2_interfaces/srv/seek.hpp" @@ -129,6 +130,16 @@ class Player : public rclcpp::Node ROSBAG2_TRANSPORT_PUBLIC virtual bool play_next(); + /// \brief Burst the next \p num_messages messages from the queue when paused. + /// \param num_messages The number of messages to burst from the queue. + /// \details This call will play the next \p num_messages from the queue in burst mode. The + /// timing of the messages is ignored. + /// \note If internal player queue is starving and storage has not been completely loaded, + /// this method will wait until new element will be pushed to the queue. + /// \return The number of messages that was played. + ROSBAG2_TRANSPORT_PUBLIC + virtual size_t burst(const size_t num_messages); + /// \brief Advance player to the message with closest timestamp >= time_point. /// \details This is blocking call and it will wait until current message will be published /// and message queue will be refilled. @@ -193,6 +204,7 @@ class Player : public rclcpp::Node rclcpp::Service::SharedPtr srv_get_rate_; rclcpp::Service::SharedPtr srv_set_rate_; rclcpp::Service::SharedPtr srv_play_next_; + rclcpp::Service::SharedPtr srv_burst_; rclcpp::Service::SharedPtr srv_seek_; // defaults diff --git a/rosbag2_transport/src/rosbag2_transport/player.cpp b/rosbag2_transport/src/rosbag2_transport/player.cpp index 446bb8303f..c6308b996a 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.cpp +++ b/rosbag2_transport/src/rosbag2_transport/player.cpp @@ -349,6 +349,21 @@ bool Player::play_next() return next_message_published; } +size_t Player::burst(const size_t num_messages) +{ + uint64_t messages_played = 0; + + for (auto ii = 0u; ii < num_messages; ++ii) { + if (play_next()) { + ++messages_played; + } else { + break; + } + } + + return messages_played; +} + void Player::seek(rcutils_time_point_value_t time_point) { // Temporary stop playback in play_messages_from_queue() and block play_next() @@ -658,6 +673,14 @@ void Player::create_control_services() { response->success = play_next(); }); + srv_burst_ = create_service( + "~/burst", + [this]( + rosbag2_interfaces::srv::Burst::Request::ConstSharedPtr request, + rosbag2_interfaces::srv::Burst::Response::SharedPtr response) + { + response->actually_burst = burst(request->num_messages); + }); srv_seek_ = create_service( "~/seek", [this]( diff --git a/rosbag2_transport/test/rosbag2_transport/test_burst.cpp b/rosbag2_transport/test/rosbag2_transport/test_burst.cpp new file mode 100644 index 0000000000..582af8b99a --- /dev/null +++ b/rosbag2_transport/test/rosbag2_transport/test_burst.cpp @@ -0,0 +1,367 @@ +// Copyright 2021, Apex.AI +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include +#include +#include +#include + +#include "mock_player.hpp" +#include "rosbag2_play_test_fixture.hpp" +#include "test_msgs/message_fixtures.hpp" +#include "test_msgs/msg/arrays.hpp" +#include "test_msgs/msg/basic_types.hpp" + +using namespace ::testing; // NOLINT +using namespace rosbag2_transport; // NOLINT +using namespace rosbag2_test_common; // NOLINT + +TEST_F(RosBag2PlayTestFixture, burst_with_false_preconditions) { + auto primitive_message = get_messages_basic_types()[0]; + primitive_message->int32_value = 42; + + auto topic_types = std::vector{ + {"topic1", "test_msgs/BasicTypes", "", ""}}; + + std::vector> messages = + {serialize_test_message("topic1", 2100, primitive_message)}; + + auto prepared_mock_reader = std::make_unique(); + prepared_mock_reader->prepare(messages, topic_types); + auto reader = std::make_unique(std::move(prepared_mock_reader)); + auto player = std::make_shared(std::move(reader), storage_options_, play_options_); + + ASSERT_FALSE(player->is_paused()); + ASSERT_EQ(player->burst(1), 0u); + player->pause(); + ASSERT_TRUE(player->is_paused()); +} + +TEST_F(RosBag2PlayTestFixture, burst_bursts_requested_messages_without_delays) { + auto primitive_message = get_messages_basic_types()[0]; + primitive_message->int32_value = 42; + + auto topic_types = std::vector{ + {"topic1", "test_msgs/BasicTypes", "", ""}}; + + std::vector> messages = + { + serialize_test_message("topic1", 2100, primitive_message), + serialize_test_message("topic1", 3300, primitive_message), + serialize_test_message("topic1", 4600, primitive_message), + serialize_test_message("topic1", 5900, primitive_message) + }; + + auto prepared_mock_reader = std::make_unique(); + prepared_mock_reader->prepare(messages, topic_types); + auto reader = std::make_unique(std::move(prepared_mock_reader)); + auto player = std::make_shared(std::move(reader), storage_options_, play_options_); + + sub_ = std::make_shared(); + sub_->add_subscription("/topic1", messages.size()); + + // Wait for discovery to match publishers with subscribers + ASSERT_TRUE( + sub_->spin_and_wait_for_matched(player->get_list_of_publishers(), std::chrono::seconds(30))); + + auto await_received_messages = sub_->spin_subscriptions(); + + player->pause(); + ASSERT_TRUE(player->is_paused()); + + auto player_future = std::async(std::launch::async, [&player]() -> void {player->play();}); + player->wait_for_playback_to_start(); + + const size_t NUM_MESSAGES_TO_BURST = 4; + + ASSERT_TRUE(player->is_paused()); + auto start = std::chrono::steady_clock::now(); + size_t played_messages = player->burst(NUM_MESSAGES_TO_BURST); + auto replay_time = std::chrono::steady_clock::now() - start; + ASSERT_EQ(played_messages, NUM_MESSAGES_TO_BURST); + ASSERT_THAT(replay_time, Lt(std::chrono::seconds(2))); + + ASSERT_TRUE(player->is_paused()); + player->resume(); + player_future.get(); + await_received_messages.get(); + + auto replayed_topic1 = sub_->get_received_messages("/topic1"); + EXPECT_THAT(replayed_topic1, SizeIs(NUM_MESSAGES_TO_BURST)); +} + +TEST_F(RosBag2PlayTestFixture, burst_stops_at_end_of_file) { + auto primitive_message = get_messages_basic_types()[0]; + primitive_message->int32_value = 42; + + auto topic_types = std::vector{ + {"topic1", "test_msgs/BasicTypes", "", ""}}; + + std::vector> messages = + { + serialize_test_message("topic1", 2100, primitive_message), + serialize_test_message("topic1", 3300, primitive_message), + serialize_test_message("topic1", 4600, primitive_message), + serialize_test_message("topic1", 5900, primitive_message) + }; + + auto prepared_mock_reader = std::make_unique(); + prepared_mock_reader->prepare(messages, topic_types); + auto reader = std::make_unique(std::move(prepared_mock_reader)); + auto player = std::make_shared(std::move(reader), storage_options_, play_options_); + + sub_ = std::make_shared(); + sub_->add_subscription("/topic1", messages.size()); + + // Wait for discovery to match publishers with subscribers + ASSERT_TRUE( + sub_->spin_and_wait_for_matched(player->get_list_of_publishers(), std::chrono::seconds(30))); + + auto await_received_messages = sub_->spin_subscriptions(); + + player->pause(); + ASSERT_TRUE(player->is_paused()); + + auto player_future = std::async(std::launch::async, [&player]() -> void {player->play();}); + player->wait_for_playback_to_start(); + + ASSERT_TRUE(player->is_paused()); + size_t played_messages = player->burst(messages.size() + 1); // Request one more than available + ASSERT_EQ(played_messages, messages.size()); + ASSERT_TRUE(player->is_paused()); + player->resume(); + player_future.get(); + await_received_messages.get(); + + auto replayed_topic1 = sub_->get_received_messages("/topic1"); + EXPECT_THAT(replayed_topic1, SizeIs(messages.size())); +} + +TEST_F(RosBag2PlayTestFixture, burst_bursting_one_by_one_messages_with_the_same_timestamp) { + auto primitive_message = get_messages_basic_types()[0]; + primitive_message->int32_value = 42; + + auto topic_types = std::vector{ + {"topic1", "test_msgs/BasicTypes", "", ""}}; + + std::vector> messages = + { + serialize_test_message("topic1", 1000, primitive_message), + serialize_test_message("topic1", 1000, primitive_message), + serialize_test_message("topic1", 1000, primitive_message), + serialize_test_message("topic1", 1000, primitive_message) + }; + + auto prepared_mock_reader = std::make_unique(); + prepared_mock_reader->prepare(messages, topic_types); + auto reader = std::make_unique(std::move(prepared_mock_reader)); + auto player = std::make_shared(std::move(reader), storage_options_, play_options_); + + sub_ = std::make_shared(); + sub_->add_subscription("/topic1", messages.size()); + + // Wait for discovery to match publishers with subscribers + ASSERT_TRUE( + sub_->spin_and_wait_for_matched(player->get_list_of_publishers(), std::chrono::seconds(30))); + + auto await_received_messages = sub_->spin_subscriptions(); + + player->pause(); + ASSERT_TRUE(player->is_paused()); + + auto player_future = std::async(std::launch::async, [&player]() -> void {player->play();}); + + ASSERT_TRUE(player->is_paused()); + ASSERT_EQ(player->burst(1), 1u); + size_t played_messages = 1; + while (player->burst(1) == 1) { + // Yield CPU resources for player-play() running in separate thread to make sure that it + // will not play extra messages. + std::this_thread::sleep_for(std::chrono::milliseconds(30)); + played_messages++; + } + ASSERT_EQ(played_messages, messages.size()); + ASSERT_TRUE(player->is_paused()); + player->resume(); + player_future.get(); + await_received_messages.get(); + + auto replayed_topic1 = sub_->get_received_messages("/topic1"); + EXPECT_THAT(replayed_topic1, SizeIs(messages.size())); +} + +TEST_F(RosBag2PlayTestFixture, play_respect_messages_timing_after_burst) { + auto primitive_message = get_messages_basic_types()[0]; + primitive_message->int32_value = 42; + + auto topic_types = std::vector{ + {"topic1", "test_msgs/BasicTypes", "", ""}}; + + std::vector> messages = + { + serialize_test_message("topic1", 1500, primitive_message), + serialize_test_message("topic1", 2500, primitive_message), + serialize_test_message("topic1", 2700, primitive_message), + serialize_test_message("topic1", 2800, primitive_message) + }; + + auto prepared_mock_reader = std::make_unique(); + prepared_mock_reader->prepare(messages, topic_types); + auto reader = std::make_unique(std::move(prepared_mock_reader)); + auto player = std::make_shared(std::move(reader), storage_options_, play_options_); + + sub_ = std::make_shared(); + sub_->add_subscription("/topic1", messages.size()); + + // Wait for discovery to match publishers with subscribers + ASSERT_TRUE( + sub_->spin_and_wait_for_matched(player->get_list_of_publishers(), std::chrono::seconds(30))); + + auto await_received_messages = sub_->spin_subscriptions(); + + player->pause(); + ASSERT_TRUE(player->is_paused()); + + auto player_future = std::async(std::launch::async, [&player]() -> void {player->play();}); + + const size_t EXPECTED_BURST_COUNT = 2; + ASSERT_TRUE(player->is_paused()); + ASSERT_EQ(player->burst(EXPECTED_BURST_COUNT), EXPECTED_BURST_COUNT); + ASSERT_TRUE(player->is_paused()); + player->resume(); + auto start = std::chrono::steady_clock::now(); + player_future.get(); + auto replay_time = std::chrono::steady_clock::now() - start; + + auto expected_replay_time = + std::chrono::nanoseconds(messages.back()->time_stamp - messages[1]->time_stamp); + // Check for lower bound with some tolerance + ASSERT_THAT(replay_time, Gt(expected_replay_time - std::chrono::milliseconds(50))); + // Check for upper bound with some tolerance + ASSERT_THAT(replay_time, Lt(expected_replay_time + std::chrono::milliseconds(100))); + + await_received_messages.get(); + auto replayed_topic1 = sub_->get_received_messages("/topic1"); + EXPECT_THAT(replayed_topic1, SizeIs(messages.size())); +} + +TEST_F(RosBag2PlayTestFixture, player_can_resume_after_burst) { + auto primitive_message = get_messages_basic_types()[0]; + primitive_message->int32_value = 42; + + auto topic_types = std::vector{ + {"topic1", "test_msgs/BasicTypes", "", ""}}; + + std::vector> messages = + { + serialize_test_message("topic1", 300, primitive_message), + serialize_test_message("topic1", 500, primitive_message), + serialize_test_message("topic1", 700, primitive_message) + }; + + auto prepared_mock_reader = std::make_unique(); + prepared_mock_reader->prepare(messages, topic_types); + auto reader = std::make_unique(std::move(prepared_mock_reader)); + auto player = std::make_shared(std::move(reader), storage_options_, play_options_); + + sub_ = std::make_shared(); + sub_->add_subscription("/topic1", messages.size()); + + // Wait for discovery to match publishers with subscribers + ASSERT_TRUE( + sub_->spin_and_wait_for_matched(player->get_list_of_publishers(), std::chrono::seconds(30))); + + auto await_received_messages = sub_->spin_subscriptions(); + + player->pause(); + ASSERT_TRUE(player->is_paused()); + + auto player_future = std::async(std::launch::async, [&player]() -> void {player->play();}); + + ASSERT_TRUE(player->is_paused()); + ASSERT_EQ(player->burst(1), 1u); + ASSERT_TRUE(player->is_paused()); + player->resume(); + player_future.get(); + await_received_messages.get(); + + auto replayed_topic1 = sub_->get_received_messages("/topic1"); + EXPECT_THAT(replayed_topic1, SizeIs(messages.size())); +} + +TEST_F(RosBag2PlayTestFixture, burst_bursting_only_filtered_topics) { + auto primitive_message1 = get_messages_basic_types()[0]; + primitive_message1->int32_value = 42; + + auto complex_message1 = get_messages_arrays()[0]; + complex_message1->float32_values = {{40.0f, 2.0f, 0.0f}}; + complex_message1->bool_values = {{true, false, true}}; + + auto topic_types = std::vector{ + {"topic1", "test_msgs/BasicTypes", "", ""}, + {"topic2", "test_msgs/Arrays", "", ""}, + }; + + std::vector> messages = + { + serialize_test_message("topic1", 500, primitive_message1), + serialize_test_message("topic1", 700, primitive_message1), + serialize_test_message("topic1", 900, primitive_message1), + serialize_test_message("topic2", 550, complex_message1), + serialize_test_message("topic2", 750, complex_message1), + serialize_test_message("topic2", 950, complex_message1) + }; + + // Filter allows /topic2, blocks /topic1 + play_options_.topics_to_filter = {"topic2"}; + auto prepared_mock_reader = std::make_unique(); + prepared_mock_reader->prepare(messages, topic_types); + auto reader = std::make_unique(std::move(prepared_mock_reader)); + auto player = std::make_shared(std::move(reader), storage_options_, play_options_); + + sub_ = std::make_shared(); + sub_->add_subscription("/topic1", 0); + sub_->add_subscription("/topic2", 3); + + // Wait for discovery to match publishers with subscribers + ASSERT_TRUE( + sub_->spin_and_wait_for_matched(player->get_list_of_publishers(), std::chrono::seconds(30))); + + auto await_received_messages = sub_->spin_subscriptions(); + + player->pause(); + ASSERT_TRUE(player->is_paused()); + + auto player_future = std::async(std::launch::async, [&player]() -> void {player->play();}); + ASSERT_TRUE(player->is_paused()); + + const size_t EXPECTED_BURST_COUNT = 3; + ASSERT_EQ(player->burst(EXPECTED_BURST_COUNT), EXPECTED_BURST_COUNT); + + ASSERT_TRUE(player->is_paused()); + player->resume(); + player_future.get(); + await_received_messages.get(); + + auto replayed_topic1 = sub_->get_received_messages("/topic1"); + // No messages are allowed to have arrived + EXPECT_THAT(replayed_topic1, SizeIs(0u)); + + auto replayed_topic2 = sub_->get_received_messages("/topic2"); + // All we care is that any messages arrived + EXPECT_THAT(replayed_topic2, SizeIs(Eq(EXPECTED_BURST_COUNT))); +}