Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add play-next-n functionality #963

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion rosbag2_interfaces/srv/PlayNext.srv
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
uint64 num_messages 1 # Defaults to one for backwards compatibility.
---
bool success # can only play-next while playback is paused
uint64 played_messages # The number of messages that were actually played.
7 changes: 4 additions & 3 deletions rosbag2_transport/include/rosbag2_transport/player.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,14 +120,15 @@ class Player : public rclcpp::Node
ROSBAG2_TRANSPORT_PUBLIC
virtual bool set_rate(double);

/// \brief Playing next message from queue when in pause.
/// \brief Playing next \p num_messages messages from queue when in pause.
/// \param num_messages The number of messages to play from the queue.
/// \details This is blocking call and it will wait until next available message will be
/// published or rclcpp context shut down.
/// \note If internal player queue is starving and storage has not been completely loaded,
/// this method will wait until new element will be pushed to the queue.
/// \return true if player in pause mode and successfully played next message, otherwise false.
/// \return The number of actually sent messages from the queue.
ROSBAG2_TRANSPORT_PUBLIC
virtual bool play_next();
virtual uint64_t play_next(const uint64_t num_messages);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
virtual uint64_t play_next(const uint64_t num_messages);
virtual uint64_t play_next(const uint64_t num_messages = 1u);


/// \brief Advance player to the message with closest timestamp >= time_point.
/// \details This is blocking call and it will wait until current message will be published
Expand Down
65 changes: 43 additions & 22 deletions rosbag2_transport/src/rosbag2_transport/player.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -315,38 +315,59 @@ rosbag2_storage::SerializedBagMessageSharedPtr * Player::peek_next_message_from_
return message_ptr;
}

bool Player::play_next()
uint64_t Player::play_next(const uint64_t num_messages)
{
if (!clock_->is_paused()) {
RCLCPP_WARN_STREAM(get_logger(), "Called play next, but not in paused state.");
return false;
return 0u;
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (num_messages == 0) {
RCLCPP_WARN_STREAM(get_logger(), "Called play next for zero number of messages");
return 0u;
}

RCLCPP_INFO_STREAM(get_logger(), "Playing next message.");

// Temporary take over playback from play_messages_from_queue()
std::lock_guard<std::mutex> main_play_loop_lk(skip_message_in_main_play_loop_mutex_);
skip_message_in_main_play_loop_ = true;
// Wait for player to be ready for playback messages from queue i.e. wait for Player:play() to
// be called if not yet and queue to be filled with messages.
{
std::unique_lock<std::mutex> lk(ready_to_play_from_queue_mutex_);
ready_to_play_from_queue_cv_.wait(lk, [this] {return is_ready_to_play_from_queue_;});
}
Comment on lines -330 to -335
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO. It doesn't make sense moving wait for playback start inside loop.


MichaelOrlov marked this conversation as resolved.
Show resolved Hide resolved
rosbag2_storage::SerializedBagMessageSharedPtr * message_ptr = peek_next_message_from_queue();
uint64_t message_counter = 0;

bool next_message_published = false;
while (message_ptr != nullptr && !next_message_published) {
while (message_counter < num_messages) {
RCLCPP_INFO_STREAM(get_logger(), "Playing next message.");
// Wait for player to be ready for playback messages from queue i.e. wait for Player:play() to
// be called if not yet and queue to be filled with messages.
{
rosbag2_storage::SerializedBagMessageSharedPtr message = *message_ptr;
next_message_published = publish_message(message);
clock_->jump(message->time_stamp);
std::unique_lock<std::mutex> lk(ready_to_play_from_queue_mutex_);
ready_to_play_from_queue_cv_.wait(lk, [this] {return is_ready_to_play_from_queue_;});
}
message_queue_.pop();
message_ptr = peek_next_message_from_queue();

rosbag2_storage::SerializedBagMessageSharedPtr * message_ptr = peek_next_message_from_queue();

bool next_message_published = false;
while (message_ptr != nullptr && !next_message_published) {
{
rosbag2_storage::SerializedBagMessageSharedPtr message = *message_ptr;

// Do not move on until sleep_until returns true
// It will always sleep, so this is not a tight busy loop on pause
clock_->resume();
while (rclcpp::ok() && !clock_->sleep_until(message->time_stamp)) {
if (std::atomic_exchange(&cancel_wait_for_next_message_, false)) {
break;
}
}
clock_->pause();

next_message_published = publish_message(message);
}
message_queue_.pop();
message_ptr = peek_next_message_from_queue();
}
if (!next_message_published) {
break;
}

message_counter++;
}
return next_message_published;


return message_counter;
}

void Player::seek(rcutils_time_point_value_t time_point)
Expand Down Expand Up @@ -585,7 +606,7 @@ void Player::add_keyboard_callbacks()
);
add_key_callback(
play_options_.play_next_key,
[this]() {play_next();},
[this]() {play_next(1u);},
"Play Next Message"
);
add_key_callback(
Expand Down Expand Up @@ -653,10 +674,10 @@ void Player::create_control_services()
srv_play_next_ = create_service<rosbag2_interfaces::srv::PlayNext>(
"~/play_next",
[this](
rosbag2_interfaces::srv::PlayNext::Request::ConstSharedPtr,
rosbag2_interfaces::srv::PlayNext::Request::ConstSharedPtr request,
rosbag2_interfaces::srv::PlayNext::Response::SharedPtr response)
{
response->success = play_next();
response->played_messages = play_next(request->num_messages);
});
srv_seek_ = create_service<rosbag2_interfaces::srv::Seek>(
"~/seek",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,11 @@ class CountPlayer : public Player
num_resumed++;
}

bool play_next() override
uint64_t play_next(const uint64_t num_messages) override
{
bool ret = Player::play_next();
if (ret) {
num_played_next++;
}
return ret;
const uint64_t played_messages = Player::play_next(num_messages);
num_played_next += played_messages;
return played_messages;
}

bool set_rate(double rate) override
Expand Down
108 changes: 88 additions & 20 deletions rosbag2_transport/test/rosbag2_transport/test_play_next.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ TEST_F(RosBag2PlayTestFixture, play_next_with_false_preconditions) {
auto player = std::make_shared<MockPlayer>(std::move(reader), storage_options_, play_options_);

ASSERT_FALSE(player->is_paused());
ASSERT_FALSE(player->play_next());
ASSERT_EQ(0u, player->play_next(1u));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
ASSERT_EQ(0u, player->play_next(1u));
ASSERT_EQ(player->play_next(), 0u);

player->pause();
ASSERT_TRUE(player->is_paused());
}
Expand All @@ -60,10 +60,10 @@ TEST_F(RosBag2PlayTestFixture, play_next_playing_all_messages_without_delays) {

std::vector<std::shared_ptr<rosbag2_storage::SerializedBagMessage>> messages =
{
serialize_test_message("topic1", 2100, primitive_message),
serialize_test_message("topic1", 3300, primitive_message),
serialize_test_message("topic1", 4600, primitive_message),
serialize_test_message("topic1", 5900, primitive_message)
serialize_test_message("topic1", 210, primitive_message),
serialize_test_message("topic1", 330, primitive_message),
serialize_test_message("topic1", 460, primitive_message),
serialize_test_message("topic1", 590, primitive_message)
Comment on lines +63 to +66
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Following timings was made thousands on purpose. It doesn't have impact on the test execution time.
Please revert changes. This is affect test logic.

Suggested change
serialize_test_message("topic1", 210, primitive_message),
serialize_test_message("topic1", 330, primitive_message),
serialize_test_message("topic1", 460, primitive_message),
serialize_test_message("topic1", 590, primitive_message)
serialize_test_message("topic1", 2100, primitive_message),
serialize_test_message("topic1", 3300, primitive_message),
serialize_test_message("topic1", 4600, primitive_message),
serialize_test_message("topic1", 5900, primitive_message)

};

auto prepared_mock_reader = std::make_unique<MockSequentialReader>();
Expand All @@ -88,14 +88,14 @@ TEST_F(RosBag2PlayTestFixture, play_next_playing_all_messages_without_delays) {

ASSERT_TRUE(player->is_paused());
auto start = std::chrono::steady_clock::now();
ASSERT_TRUE(player->play_next());
ASSERT_EQ(1u, player->play_next(1u));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
ASSERT_EQ(1u, player->play_next(1u));
ASSERT_EQ(player->play_next(), 1u);

size_t played_messages = 1;
while (player->play_next()) {
while (player->play_next(1u) == 1u) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
while (player->play_next(1u) == 1u) {
while (player->play_next() == 1u) {

played_messages++;
}
auto replay_time = std::chrono::steady_clock::now() - start;
ASSERT_EQ(played_messages, messages.size());
ASSERT_THAT(replay_time, Lt(std::chrono::seconds(2)));
ASSERT_THAT(replay_time, Lt(std::chrono::milliseconds(600)));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please revert changes. It does affect test logic.

Suggested change
ASSERT_THAT(replay_time, Lt(std::chrono::milliseconds(600)));
ASSERT_THAT(replay_time, Lt(std::chrono::seconds(2)));

ASSERT_TRUE(player->is_paused());
player->resume();
player_future.get();
Expand All @@ -114,10 +114,10 @@ TEST_F(RosBag2PlayTestFixture, play_next_playing_one_by_one_messages_with_the_sa

std::vector<std::shared_ptr<rosbag2_storage::SerializedBagMessage>> messages =
{
serialize_test_message("topic1", 1000, primitive_message),
serialize_test_message("topic1", 1000, primitive_message),
serialize_test_message("topic1", 1000, primitive_message),
serialize_test_message("topic1", 1000, primitive_message)
serialize_test_message("topic1", 100, primitive_message),
serialize_test_message("topic1", 100, primitive_message),
serialize_test_message("topic1", 100, primitive_message),
serialize_test_message("topic1", 100, primitive_message)
Comment on lines +117 to +120
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Following timings was made thousands on purpose. It doesn't have impact on the test execution time.
Please revert changes. This is affect test logic.

Suggested change
serialize_test_message("topic1", 100, primitive_message),
serialize_test_message("topic1", 100, primitive_message),
serialize_test_message("topic1", 100, primitive_message),
serialize_test_message("topic1", 100, primitive_message)
serialize_test_message("topic1", 1000, primitive_message),
serialize_test_message("topic1", 1000, primitive_message),
serialize_test_message("topic1", 1000, primitive_message),
serialize_test_message("topic1", 1000, primitive_message)

};

auto prepared_mock_reader = std::make_unique<MockSequentialReader>();
Expand All @@ -140,9 +140,9 @@ TEST_F(RosBag2PlayTestFixture, play_next_playing_one_by_one_messages_with_the_sa
auto player_future = std::async(std::launch::async, [&player]() -> void {player->play();});

ASSERT_TRUE(player->is_paused());
ASSERT_TRUE(player->play_next());
ASSERT_EQ(1u, player->play_next(1u));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
ASSERT_EQ(1u, player->play_next(1u));
ASSERT_EQ(player->play_next(), 1u);

size_t played_messages = 1;
while (player->play_next()) {
while (player->play_next(1u) == 1u) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
while (player->play_next(1u) == 1u) {
while (player->play_next() == 1u) {

// Yield CPU resources for player-play() running in separate thread to make sure that it
// will not play extra messages.
std::this_thread::sleep_for(std::chrono::milliseconds(30));
Expand All @@ -158,6 +158,74 @@ TEST_F(RosBag2PlayTestFixture, play_next_playing_one_by_one_messages_with_the_sa
EXPECT_THAT(replayed_topic1, SizeIs(messages.size()));
}

TEST_F(RosBag2PlayTestFixture, play_next_n_messages_with_the_same_timestamp) {
auto primitive_message = get_messages_basic_types()[0];
primitive_message->int32_value = 42;

auto topic_types = std::vector<rosbag2_storage::TopicMetadata>{
{"topic1", "test_msgs/BasicTypes", "", ""}};

std::vector<std::shared_ptr<rosbag2_storage::SerializedBagMessage>> messages =
{
serialize_test_message("topic1", 100, primitive_message),
serialize_test_message("topic1", 100, primitive_message),
serialize_test_message("topic1", 100, primitive_message),
serialize_test_message("topic1", 100, primitive_message)
};

auto prepared_mock_reader = std::make_unique<MockSequentialReader>();
prepared_mock_reader->prepare(messages, topic_types);
auto reader = std::make_unique<rosbag2_cpp::Reader>(std::move(prepared_mock_reader));
auto player = std::make_shared<MockPlayer>(std::move(reader), storage_options_, play_options_);

sub_ = std::make_shared<SubscriptionManager>();
sub_->add_subscription<test_msgs::msg::BasicTypes>("/topic1", messages.size());

// Wait for discovery to match publishers with subscribers
ASSERT_TRUE(
sub_->spin_and_wait_for_matched(player->get_list_of_publishers(), std::chrono::seconds(30)));

auto await_received_messages = sub_->spin_subscriptions();

player->pause();
ASSERT_TRUE(player->is_paused());

auto player_future = std::async(std::launch::async, [&player]() -> void {player->play();});

MichaelOrlov marked this conversation as resolved.
Show resolved Hide resolved
ASSERT_TRUE(player->is_paused());
ASSERT_EQ(0u, player->play_next(0u));
// Yield CPU resources so messages are actually sent through.
std::this_thread::sleep_for(std::chrono::milliseconds(50));
EXPECT_THAT(sub_->get_received_messages<test_msgs::msg::BasicTypes>("/topic1"), SizeIs(0u));

ASSERT_TRUE(player->is_paused());
ASSERT_EQ(1u, player->play_next(1u));
// Yield CPU resources so messages are actually sent through.
std::this_thread::sleep_for(std::chrono::milliseconds(50));
EXPECT_THAT(sub_->get_received_messages<test_msgs::msg::BasicTypes>("/topic1"), SizeIs(1u));

ASSERT_TRUE(player->is_paused());
ASSERT_EQ(2u, player->play_next(2u));
// Yield CPU resources so messages are actually sent through.
std::this_thread::sleep_for(std::chrono::milliseconds(50));
EXPECT_THAT(sub_->get_received_messages<test_msgs::msg::BasicTypes>("/topic1"), SizeIs(3u));

ASSERT_TRUE(player->is_paused());
ASSERT_EQ(1u, player->play_next(1u));
// Yield CPU resources so messages are actually sent through.
std::this_thread::sleep_for(std::chrono::milliseconds(50));
EXPECT_THAT(sub_->get_received_messages<test_msgs::msg::BasicTypes>("/topic1"), SizeIs(4u));

ASSERT_TRUE(player->is_paused());
ASSERT_EQ(0u, player->play_next(1u));
EXPECT_THAT(sub_->get_received_messages<test_msgs::msg::BasicTypes>("/topic1"), SizeIs(4u));

ASSERT_TRUE(player->is_paused());
player->resume();
player_future.get();
await_received_messages.get();
}

TEST_F(RosBag2PlayTestFixture, play_respect_messages_timing_after_play_next) {
auto primitive_message = get_messages_basic_types()[0];
primitive_message->int32_value = 42;
Expand Down Expand Up @@ -193,8 +261,8 @@ TEST_F(RosBag2PlayTestFixture, play_respect_messages_timing_after_play_next) {
auto player_future = std::async(std::launch::async, [&player]() -> void {player->play();});

ASSERT_TRUE(player->is_paused());
ASSERT_TRUE(player->play_next());
ASSERT_TRUE(player->play_next());
ASSERT_EQ(1u, player->play_next(1u));
ASSERT_EQ(1u, player->play_next(1u));
Comment on lines +264 to +265
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
ASSERT_EQ(1u, player->play_next(1u));
ASSERT_EQ(1u, player->play_next(1u));
ASSERT_EQ(player->play_next(), 1u);
ASSERT_EQ(player->play_next(), 1u);

ASSERT_TRUE(player->is_paused());
player->resume();
auto start = std::chrono::steady_clock::now();
Expand Down Expand Up @@ -247,7 +315,7 @@ TEST_F(RosBag2PlayTestFixture, player_can_resume_after_play_next) {
auto player_future = std::async(std::launch::async, [&player]() -> void {player->play();});

ASSERT_TRUE(player->is_paused());
ASSERT_TRUE(player->play_next());
ASSERT_EQ(1u, player->play_next(1u));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
ASSERT_EQ(1u, player->play_next(1u));
ASSERT_EQ(player->play_next(), 1u);

ASSERT_TRUE(player->is_paused());
player->resume();
player_future.get();
Expand Down Expand Up @@ -303,13 +371,13 @@ TEST_F(RosBag2PlayTestFixture, play_next_playing_only_filtered_topics) {
auto player_future = std::async(std::launch::async, [&player]() -> void {player->play();});

ASSERT_TRUE(player->is_paused());
ASSERT_TRUE(player->play_next());
ASSERT_EQ(1u, player->play_next(1u));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
ASSERT_EQ(1u, player->play_next(1u));
ASSERT_EQ(player->play_next(), 1u);


size_t played_messages = 1;
while (player->play_next()) {
while (player->play_next(1u) == 1u) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
while (player->play_next(1u) == 1u) {
while (player->play_next() == 1u) {

played_messages++;
}
ASSERT_EQ(played_messages, 3U);
ASSERT_EQ(played_messages, 3u);

ASSERT_TRUE(player->is_paused());
player->resume();
Expand Down
Loading