-
Notifications
You must be signed in to change notification settings - Fork 248
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add play-next-n functionality #963
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,2 +1,3 @@ | ||
uint64 num_messages 1 # Defaults to one for backwards compatibility. | ||
--- | ||
bool success # can only play-next while playback is paused | ||
uint64 played_messages # The number of messages that were actually played. |
Original file line number | Diff line number | Diff line change | ||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -315,38 +315,59 @@ rosbag2_storage::SerializedBagMessageSharedPtr * Player::peek_next_message_from_ | |||||||||||
return message_ptr; | ||||||||||||
} | ||||||||||||
|
||||||||||||
bool Player::play_next() | ||||||||||||
uint64_t Player::play_next(const uint64_t num_messages) | ||||||||||||
{ | ||||||||||||
if (!clock_->is_paused()) { | ||||||||||||
RCLCPP_WARN_STREAM(get_logger(), "Called play next, but not in paused state."); | ||||||||||||
return false; | ||||||||||||
return 0u; | ||||||||||||
} | ||||||||||||
|
||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||
RCLCPP_INFO_STREAM(get_logger(), "Playing next message."); | ||||||||||||
|
||||||||||||
// Temporary take over playback from play_messages_from_queue() | ||||||||||||
std::lock_guard<std::mutex> main_play_loop_lk(skip_message_in_main_play_loop_mutex_); | ||||||||||||
skip_message_in_main_play_loop_ = true; | ||||||||||||
// Wait for player to be ready for playback messages from queue i.e. wait for Player:play() to | ||||||||||||
// be called if not yet and queue to be filled with messages. | ||||||||||||
{ | ||||||||||||
std::unique_lock<std::mutex> lk(ready_to_play_from_queue_mutex_); | ||||||||||||
ready_to_play_from_queue_cv_.wait(lk, [this] {return is_ready_to_play_from_queue_;}); | ||||||||||||
} | ||||||||||||
Comment on lines
-330
to
-335
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. IMO. It doesn't make sense moving wait for playback start inside loop. |
||||||||||||
|
||||||||||||
MichaelOrlov marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||
rosbag2_storage::SerializedBagMessageSharedPtr * message_ptr = peek_next_message_from_queue(); | ||||||||||||
uint64_t message_counter = 0; | ||||||||||||
|
||||||||||||
bool next_message_published = false; | ||||||||||||
while (message_ptr != nullptr && !next_message_published) { | ||||||||||||
while (message_counter < num_messages) { | ||||||||||||
RCLCPP_INFO_STREAM(get_logger(), "Playing next message."); | ||||||||||||
// Wait for player to be ready for playback messages from queue i.e. wait for Player:play() to | ||||||||||||
// be called if not yet and queue to be filled with messages. | ||||||||||||
{ | ||||||||||||
rosbag2_storage::SerializedBagMessageSharedPtr message = *message_ptr; | ||||||||||||
next_message_published = publish_message(message); | ||||||||||||
clock_->jump(message->time_stamp); | ||||||||||||
std::unique_lock<std::mutex> lk(ready_to_play_from_queue_mutex_); | ||||||||||||
ready_to_play_from_queue_cv_.wait(lk, [this] {return is_ready_to_play_from_queue_;}); | ||||||||||||
} | ||||||||||||
message_queue_.pop(); | ||||||||||||
message_ptr = peek_next_message_from_queue(); | ||||||||||||
|
||||||||||||
rosbag2_storage::SerializedBagMessageSharedPtr * message_ptr = peek_next_message_from_queue(); | ||||||||||||
|
||||||||||||
bool next_message_published = false; | ||||||||||||
while (message_ptr != nullptr && !next_message_published) { | ||||||||||||
{ | ||||||||||||
rosbag2_storage::SerializedBagMessageSharedPtr message = *message_ptr; | ||||||||||||
|
||||||||||||
// Do not move on until sleep_until returns true | ||||||||||||
// It will always sleep, so this is not a tight busy loop on pause | ||||||||||||
clock_->resume(); | ||||||||||||
while (rclcpp::ok() && !clock_->sleep_until(message->time_stamp)) { | ||||||||||||
if (std::atomic_exchange(&cancel_wait_for_next_message_, false)) { | ||||||||||||
break; | ||||||||||||
} | ||||||||||||
} | ||||||||||||
clock_->pause(); | ||||||||||||
|
||||||||||||
next_message_published = publish_message(message); | ||||||||||||
} | ||||||||||||
message_queue_.pop(); | ||||||||||||
message_ptr = peek_next_message_from_queue(); | ||||||||||||
} | ||||||||||||
if (!next_message_published) { | ||||||||||||
break; | ||||||||||||
} | ||||||||||||
|
||||||||||||
message_counter++; | ||||||||||||
} | ||||||||||||
return next_message_published; | ||||||||||||
|
||||||||||||
|
||||||||||||
return message_counter; | ||||||||||||
} | ||||||||||||
|
||||||||||||
void Player::seek(rcutils_time_point_value_t time_point) | ||||||||||||
|
@@ -585,7 +606,7 @@ void Player::add_keyboard_callbacks() | |||||||||||
); | ||||||||||||
add_key_callback( | ||||||||||||
play_options_.play_next_key, | ||||||||||||
[this]() {play_next();}, | ||||||||||||
[this]() {play_next(1u);}, | ||||||||||||
"Play Next Message" | ||||||||||||
); | ||||||||||||
add_key_callback( | ||||||||||||
|
@@ -653,10 +674,10 @@ void Player::create_control_services() | |||||||||||
srv_play_next_ = create_service<rosbag2_interfaces::srv::PlayNext>( | ||||||||||||
"~/play_next", | ||||||||||||
[this]( | ||||||||||||
rosbag2_interfaces::srv::PlayNext::Request::ConstSharedPtr, | ||||||||||||
rosbag2_interfaces::srv::PlayNext::Request::ConstSharedPtr request, | ||||||||||||
rosbag2_interfaces::srv::PlayNext::Response::SharedPtr response) | ||||||||||||
{ | ||||||||||||
response->success = play_next(); | ||||||||||||
response->played_messages = play_next(request->num_messages); | ||||||||||||
}); | ||||||||||||
srv_seek_ = create_service<rosbag2_interfaces::srv::Seek>( | ||||||||||||
"~/seek", | ||||||||||||
|
Original file line number | Diff line number | Diff line change | ||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -46,7 +46,7 @@ TEST_F(RosBag2PlayTestFixture, play_next_with_false_preconditions) { | |||||||||||||||||
auto player = std::make_shared<MockPlayer>(std::move(reader), storage_options_, play_options_); | ||||||||||||||||||
|
||||||||||||||||||
ASSERT_FALSE(player->is_paused()); | ||||||||||||||||||
ASSERT_FALSE(player->play_next()); | ||||||||||||||||||
ASSERT_EQ(0u, player->play_next(1u)); | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||
player->pause(); | ||||||||||||||||||
ASSERT_TRUE(player->is_paused()); | ||||||||||||||||||
} | ||||||||||||||||||
|
@@ -60,10 +60,10 @@ TEST_F(RosBag2PlayTestFixture, play_next_playing_all_messages_without_delays) { | |||||||||||||||||
|
||||||||||||||||||
std::vector<std::shared_ptr<rosbag2_storage::SerializedBagMessage>> messages = | ||||||||||||||||||
{ | ||||||||||||||||||
serialize_test_message("topic1", 2100, primitive_message), | ||||||||||||||||||
serialize_test_message("topic1", 3300, primitive_message), | ||||||||||||||||||
serialize_test_message("topic1", 4600, primitive_message), | ||||||||||||||||||
serialize_test_message("topic1", 5900, primitive_message) | ||||||||||||||||||
serialize_test_message("topic1", 210, primitive_message), | ||||||||||||||||||
serialize_test_message("topic1", 330, primitive_message), | ||||||||||||||||||
serialize_test_message("topic1", 460, primitive_message), | ||||||||||||||||||
serialize_test_message("topic1", 590, primitive_message) | ||||||||||||||||||
Comment on lines
+63
to
+66
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Following timings was made thousands on purpose. It doesn't have impact on the test execution time.
Suggested change
|
||||||||||||||||||
}; | ||||||||||||||||||
|
||||||||||||||||||
auto prepared_mock_reader = std::make_unique<MockSequentialReader>(); | ||||||||||||||||||
|
@@ -88,14 +88,14 @@ TEST_F(RosBag2PlayTestFixture, play_next_playing_all_messages_without_delays) { | |||||||||||||||||
|
||||||||||||||||||
ASSERT_TRUE(player->is_paused()); | ||||||||||||||||||
auto start = std::chrono::steady_clock::now(); | ||||||||||||||||||
ASSERT_TRUE(player->play_next()); | ||||||||||||||||||
ASSERT_EQ(1u, player->play_next(1u)); | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||
size_t played_messages = 1; | ||||||||||||||||||
while (player->play_next()) { | ||||||||||||||||||
while (player->play_next(1u) == 1u) { | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||
played_messages++; | ||||||||||||||||||
} | ||||||||||||||||||
auto replay_time = std::chrono::steady_clock::now() - start; | ||||||||||||||||||
ASSERT_EQ(played_messages, messages.size()); | ||||||||||||||||||
ASSERT_THAT(replay_time, Lt(std::chrono::seconds(2))); | ||||||||||||||||||
ASSERT_THAT(replay_time, Lt(std::chrono::milliseconds(600))); | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please revert changes. It does affect test logic.
Suggested change
|
||||||||||||||||||
ASSERT_TRUE(player->is_paused()); | ||||||||||||||||||
player->resume(); | ||||||||||||||||||
player_future.get(); | ||||||||||||||||||
|
@@ -114,10 +114,10 @@ TEST_F(RosBag2PlayTestFixture, play_next_playing_one_by_one_messages_with_the_sa | |||||||||||||||||
|
||||||||||||||||||
std::vector<std::shared_ptr<rosbag2_storage::SerializedBagMessage>> messages = | ||||||||||||||||||
{ | ||||||||||||||||||
serialize_test_message("topic1", 1000, primitive_message), | ||||||||||||||||||
serialize_test_message("topic1", 1000, primitive_message), | ||||||||||||||||||
serialize_test_message("topic1", 1000, primitive_message), | ||||||||||||||||||
serialize_test_message("topic1", 1000, primitive_message) | ||||||||||||||||||
serialize_test_message("topic1", 100, primitive_message), | ||||||||||||||||||
serialize_test_message("topic1", 100, primitive_message), | ||||||||||||||||||
serialize_test_message("topic1", 100, primitive_message), | ||||||||||||||||||
serialize_test_message("topic1", 100, primitive_message) | ||||||||||||||||||
Comment on lines
+117
to
+120
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Following timings was made thousands on purpose. It doesn't have impact on the test execution time.
Suggested change
|
||||||||||||||||||
}; | ||||||||||||||||||
|
||||||||||||||||||
auto prepared_mock_reader = std::make_unique<MockSequentialReader>(); | ||||||||||||||||||
|
@@ -140,9 +140,9 @@ TEST_F(RosBag2PlayTestFixture, play_next_playing_one_by_one_messages_with_the_sa | |||||||||||||||||
auto player_future = std::async(std::launch::async, [&player]() -> void {player->play();}); | ||||||||||||||||||
|
||||||||||||||||||
ASSERT_TRUE(player->is_paused()); | ||||||||||||||||||
ASSERT_TRUE(player->play_next()); | ||||||||||||||||||
ASSERT_EQ(1u, player->play_next(1u)); | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||
size_t played_messages = 1; | ||||||||||||||||||
while (player->play_next()) { | ||||||||||||||||||
while (player->play_next(1u) == 1u) { | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||
// Yield CPU resources for player-play() running in separate thread to make sure that it | ||||||||||||||||||
// will not play extra messages. | ||||||||||||||||||
std::this_thread::sleep_for(std::chrono::milliseconds(30)); | ||||||||||||||||||
|
@@ -158,6 +158,74 @@ TEST_F(RosBag2PlayTestFixture, play_next_playing_one_by_one_messages_with_the_sa | |||||||||||||||||
EXPECT_THAT(replayed_topic1, SizeIs(messages.size())); | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
TEST_F(RosBag2PlayTestFixture, play_next_n_messages_with_the_same_timestamp) { | ||||||||||||||||||
auto primitive_message = get_messages_basic_types()[0]; | ||||||||||||||||||
primitive_message->int32_value = 42; | ||||||||||||||||||
|
||||||||||||||||||
auto topic_types = std::vector<rosbag2_storage::TopicMetadata>{ | ||||||||||||||||||
{"topic1", "test_msgs/BasicTypes", "", ""}}; | ||||||||||||||||||
|
||||||||||||||||||
std::vector<std::shared_ptr<rosbag2_storage::SerializedBagMessage>> messages = | ||||||||||||||||||
{ | ||||||||||||||||||
serialize_test_message("topic1", 100, primitive_message), | ||||||||||||||||||
serialize_test_message("topic1", 100, primitive_message), | ||||||||||||||||||
serialize_test_message("topic1", 100, primitive_message), | ||||||||||||||||||
serialize_test_message("topic1", 100, primitive_message) | ||||||||||||||||||
}; | ||||||||||||||||||
|
||||||||||||||||||
auto prepared_mock_reader = std::make_unique<MockSequentialReader>(); | ||||||||||||||||||
prepared_mock_reader->prepare(messages, topic_types); | ||||||||||||||||||
auto reader = std::make_unique<rosbag2_cpp::Reader>(std::move(prepared_mock_reader)); | ||||||||||||||||||
auto player = std::make_shared<MockPlayer>(std::move(reader), storage_options_, play_options_); | ||||||||||||||||||
|
||||||||||||||||||
sub_ = std::make_shared<SubscriptionManager>(); | ||||||||||||||||||
sub_->add_subscription<test_msgs::msg::BasicTypes>("/topic1", messages.size()); | ||||||||||||||||||
|
||||||||||||||||||
// Wait for discovery to match publishers with subscribers | ||||||||||||||||||
ASSERT_TRUE( | ||||||||||||||||||
sub_->spin_and_wait_for_matched(player->get_list_of_publishers(), std::chrono::seconds(30))); | ||||||||||||||||||
|
||||||||||||||||||
auto await_received_messages = sub_->spin_subscriptions(); | ||||||||||||||||||
|
||||||||||||||||||
player->pause(); | ||||||||||||||||||
ASSERT_TRUE(player->is_paused()); | ||||||||||||||||||
|
||||||||||||||||||
auto player_future = std::async(std::launch::async, [&player]() -> void {player->play();}); | ||||||||||||||||||
|
||||||||||||||||||
MichaelOrlov marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||
ASSERT_TRUE(player->is_paused()); | ||||||||||||||||||
ASSERT_EQ(0u, player->play_next(0u)); | ||||||||||||||||||
// Yield CPU resources so messages are actually sent through. | ||||||||||||||||||
std::this_thread::sleep_for(std::chrono::milliseconds(50)); | ||||||||||||||||||
EXPECT_THAT(sub_->get_received_messages<test_msgs::msg::BasicTypes>("/topic1"), SizeIs(0u)); | ||||||||||||||||||
|
||||||||||||||||||
ASSERT_TRUE(player->is_paused()); | ||||||||||||||||||
ASSERT_EQ(1u, player->play_next(1u)); | ||||||||||||||||||
// Yield CPU resources so messages are actually sent through. | ||||||||||||||||||
std::this_thread::sleep_for(std::chrono::milliseconds(50)); | ||||||||||||||||||
EXPECT_THAT(sub_->get_received_messages<test_msgs::msg::BasicTypes>("/topic1"), SizeIs(1u)); | ||||||||||||||||||
|
||||||||||||||||||
ASSERT_TRUE(player->is_paused()); | ||||||||||||||||||
ASSERT_EQ(2u, player->play_next(2u)); | ||||||||||||||||||
// Yield CPU resources so messages are actually sent through. | ||||||||||||||||||
std::this_thread::sleep_for(std::chrono::milliseconds(50)); | ||||||||||||||||||
EXPECT_THAT(sub_->get_received_messages<test_msgs::msg::BasicTypes>("/topic1"), SizeIs(3u)); | ||||||||||||||||||
|
||||||||||||||||||
ASSERT_TRUE(player->is_paused()); | ||||||||||||||||||
ASSERT_EQ(1u, player->play_next(1u)); | ||||||||||||||||||
// Yield CPU resources so messages are actually sent through. | ||||||||||||||||||
std::this_thread::sleep_for(std::chrono::milliseconds(50)); | ||||||||||||||||||
EXPECT_THAT(sub_->get_received_messages<test_msgs::msg::BasicTypes>("/topic1"), SizeIs(4u)); | ||||||||||||||||||
|
||||||||||||||||||
ASSERT_TRUE(player->is_paused()); | ||||||||||||||||||
ASSERT_EQ(0u, player->play_next(1u)); | ||||||||||||||||||
EXPECT_THAT(sub_->get_received_messages<test_msgs::msg::BasicTypes>("/topic1"), SizeIs(4u)); | ||||||||||||||||||
|
||||||||||||||||||
ASSERT_TRUE(player->is_paused()); | ||||||||||||||||||
player->resume(); | ||||||||||||||||||
player_future.get(); | ||||||||||||||||||
await_received_messages.get(); | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
TEST_F(RosBag2PlayTestFixture, play_respect_messages_timing_after_play_next) { | ||||||||||||||||||
auto primitive_message = get_messages_basic_types()[0]; | ||||||||||||||||||
primitive_message->int32_value = 42; | ||||||||||||||||||
|
@@ -193,8 +261,8 @@ TEST_F(RosBag2PlayTestFixture, play_respect_messages_timing_after_play_next) { | |||||||||||||||||
auto player_future = std::async(std::launch::async, [&player]() -> void {player->play();}); | ||||||||||||||||||
|
||||||||||||||||||
ASSERT_TRUE(player->is_paused()); | ||||||||||||||||||
ASSERT_TRUE(player->play_next()); | ||||||||||||||||||
ASSERT_TRUE(player->play_next()); | ||||||||||||||||||
ASSERT_EQ(1u, player->play_next(1u)); | ||||||||||||||||||
ASSERT_EQ(1u, player->play_next(1u)); | ||||||||||||||||||
Comment on lines
+264
to
+265
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||
ASSERT_TRUE(player->is_paused()); | ||||||||||||||||||
player->resume(); | ||||||||||||||||||
auto start = std::chrono::steady_clock::now(); | ||||||||||||||||||
|
@@ -247,7 +315,7 @@ TEST_F(RosBag2PlayTestFixture, player_can_resume_after_play_next) { | |||||||||||||||||
auto player_future = std::async(std::launch::async, [&player]() -> void {player->play();}); | ||||||||||||||||||
|
||||||||||||||||||
ASSERT_TRUE(player->is_paused()); | ||||||||||||||||||
ASSERT_TRUE(player->play_next()); | ||||||||||||||||||
ASSERT_EQ(1u, player->play_next(1u)); | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||
ASSERT_TRUE(player->is_paused()); | ||||||||||||||||||
player->resume(); | ||||||||||||||||||
player_future.get(); | ||||||||||||||||||
|
@@ -303,13 +371,13 @@ TEST_F(RosBag2PlayTestFixture, play_next_playing_only_filtered_topics) { | |||||||||||||||||
auto player_future = std::async(std::launch::async, [&player]() -> void {player->play();}); | ||||||||||||||||||
|
||||||||||||||||||
ASSERT_TRUE(player->is_paused()); | ||||||||||||||||||
ASSERT_TRUE(player->play_next()); | ||||||||||||||||||
ASSERT_EQ(1u, player->play_next(1u)); | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||
|
||||||||||||||||||
size_t played_messages = 1; | ||||||||||||||||||
while (player->play_next()) { | ||||||||||||||||||
while (player->play_next(1u) == 1u) { | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||||||
played_messages++; | ||||||||||||||||||
} | ||||||||||||||||||
ASSERT_EQ(played_messages, 3U); | ||||||||||||||||||
ASSERT_EQ(played_messages, 3u); | ||||||||||||||||||
|
||||||||||||||||||
ASSERT_TRUE(player->is_paused()); | ||||||||||||||||||
player->resume(); | ||||||||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.