Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add play-next-n functionality #963

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rosbag2_interfaces/srv/PlayNext.srv
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
uint64 num_messages 1 # Defaults to one for backwards compatibility
---
bool success # can only play-next while playback is paused
5 changes: 4 additions & 1 deletion rosbag2_transport/include/rosbag2_transport/player.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,13 +121,16 @@ class Player : public rclcpp::Node
virtual bool set_rate(double);

/// \brief Playing next message from queue when in pause.
/// \param num_messages The number of messages to play from the queue. It is
/// nullopt by default which makes it just take one. When zero, it'll just
/// make the method return true.
/// \details This is blocking call and it will wait until next available message will be
/// published or rclcpp context shut down.
/// \note If internal player queue is starving and storage has not been completely loaded,
/// this method will wait until new element will be pushed to the queue.
/// \return true if player in pause mode and successfully played next message, otherwise false.
ROSBAG2_TRANSPORT_PUBLIC
virtual bool play_next();
virtual bool play_next(const std::optional<uint64_t> num_messages = std::nullopt);
MichaelOrlov marked this conversation as resolved.
Show resolved Hide resolved

/// \brief Advance player to the message with closest timestamp >= time_point.
/// \details This is blocking call and it will wait until current message will be published
Expand Down
44 changes: 31 additions & 13 deletions rosbag2_transport/src/rosbag2_transport/player.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -315,8 +315,13 @@ rosbag2_storage::SerializedBagMessageSharedPtr * Player::peek_next_message_from_
return message_ptr;
}

bool Player::play_next()
bool Player::play_next(const std::optional<uint64_t> num_messages)
{
// Don't do anything if no messages were requested
if (num_messages.has_value() && *num_messages == 0) {
return true;
}

if (!clock_->is_paused()) {
RCLCPP_WARN_STREAM(get_logger(), "Called play next, but not in paused state.");
return false;
Expand All @@ -326,25 +331,38 @@ bool Player::play_next()

// Temporary take over playback from play_messages_from_queue()
std::lock_guard<std::mutex> main_play_loop_lk(skip_message_in_main_play_loop_mutex_);
skip_message_in_main_play_loop_ = true;

MichaelOrlov marked this conversation as resolved.
Show resolved Hide resolved
// Wait for player to be ready for playback messages from queue i.e. wait for Player:play() to
// be called if not yet and queue to be filled with messages.
{
std::unique_lock<std::mutex> lk(ready_to_play_from_queue_mutex_);
ready_to_play_from_queue_cv_.wait(lk, [this] {return is_ready_to_play_from_queue_;});
}

rosbag2_storage::SerializedBagMessageSharedPtr * message_ptr = peek_next_message_from_queue();

bool next_message_published = false;
while (message_ptr != nullptr && !next_message_published) {
{
rosbag2_storage::SerializedBagMessageSharedPtr message = *message_ptr;
next_message_published = publish_message(message);
clock_->jump(message->time_stamp);
uint64_t messages_remaining = num_messages.has_value() ? *num_messages : 1u;

while (messages_remaining > 0) {
// Reset state for the next iteration.
next_message_published = false;
skip_message_in_main_play_loop_ = true;

rosbag2_storage::SerializedBagMessageSharedPtr * message_ptr = peek_next_message_from_queue();

while (message_ptr != nullptr && !next_message_published) {
{
rosbag2_storage::SerializedBagMessageSharedPtr message = *message_ptr;
next_message_published = publish_message(message);
clock_->jump(message->time_stamp);
}
message_queue_.pop();
message_ptr = peek_next_message_from_queue();
}
message_queue_.pop();
message_ptr = peek_next_message_from_queue();
// There are no more messages of the built publishers to play.
if (message_ptr == nullptr) {
break;
}
messages_remaining--;
}
return next_message_published;
}
Expand Down Expand Up @@ -653,10 +671,10 @@ void Player::create_control_services()
srv_play_next_ = create_service<rosbag2_interfaces::srv::PlayNext>(
"~/play_next",
[this](
rosbag2_interfaces::srv::PlayNext::Request::ConstSharedPtr,
rosbag2_interfaces::srv::PlayNext::Request::ConstSharedPtr request,
rosbag2_interfaces::srv::PlayNext::Response::SharedPtr response)
{
response->success = play_next();
response->success = play_next({request->num_messages});
});
srv_seek_ = create_service<rosbag2_interfaces::srv::Seek>(
"~/seek",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class CountPlayer : public Player
num_resumed++;
}

bool play_next() override
bool play_next(const std::optional<uint64_t> /* num_messages */ = std::nullopt) override
{
bool ret = Player::play_next();
if (ret) {
Expand Down
107 changes: 107 additions & 0 deletions rosbag2_transport/test/rosbag2_transport/test_play_next.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,69 @@ TEST_F(RosBag2PlayTestFixture, play_next_playing_one_by_one_messages_with_the_sa
EXPECT_THAT(replayed_topic1, SizeIs(messages.size()));
}

TEST_F(RosBag2PlayTestFixture, play_next_n_messages_with_the_same_timestamp) {
auto primitive_message = get_messages_basic_types()[0];
primitive_message->int32_value = 42;

auto topic_types = std::vector<rosbag2_storage::TopicMetadata>{
{"topic1", "test_msgs/BasicTypes", "", ""}};

std::vector<std::shared_ptr<rosbag2_storage::SerializedBagMessage>> messages =
{
serialize_test_message("topic1", 1000, primitive_message),
serialize_test_message("topic1", 1000, primitive_message),
serialize_test_message("topic1", 1000, primitive_message),
serialize_test_message("topic1", 1000, primitive_message)
};

auto prepared_mock_reader = std::make_unique<MockSequentialReader>();
prepared_mock_reader->prepare(messages, topic_types);
auto reader = std::make_unique<rosbag2_cpp::Reader>(std::move(prepared_mock_reader));
auto player = std::make_shared<MockPlayer>(std::move(reader), storage_options_, play_options_);

sub_ = std::make_shared<SubscriptionManager>();
sub_->add_subscription<test_msgs::msg::BasicTypes>("/topic1", messages.size());

// Wait for discovery to match publishers with subscribers
ASSERT_TRUE(
sub_->spin_and_wait_for_matched(player->get_list_of_publishers(), std::chrono::seconds(30)));

auto await_received_messages = sub_->spin_subscriptions();

player->pause();
ASSERT_TRUE(player->is_paused());

auto player_future = std::async(std::launch::async, [&player]() -> void {player->play();});
player->wait_for_playback_to_start();
MichaelOrlov marked this conversation as resolved.
Show resolved Hide resolved

MichaelOrlov marked this conversation as resolved.
Show resolved Hide resolved
ASSERT_TRUE(player->is_paused());
ASSERT_TRUE(player->play_next({1u}));
// Yield CPU resources so messages are actually sent through.
std::this_thread::sleep_for(std::chrono::milliseconds(50));
EXPECT_THAT(sub_->get_received_messages<test_msgs::msg::BasicTypes>("/topic1"), SizeIs(1u));

ASSERT_TRUE(player->is_paused());
ASSERT_TRUE(player->play_next({2u}));
// Yield CPU resources so messages are actually sent through.
std::this_thread::sleep_for(std::chrono::milliseconds(50));
EXPECT_THAT(sub_->get_received_messages<test_msgs::msg::BasicTypes>("/topic1"), SizeIs(3u));

ASSERT_TRUE(player->is_paused());
ASSERT_TRUE(player->play_next({1u}));
// Yield CPU resources so messages are actually sent through.
std::this_thread::sleep_for(std::chrono::milliseconds(50));
EXPECT_THAT(sub_->get_received_messages<test_msgs::msg::BasicTypes>("/topic1"), SizeIs(4u));

ASSERT_TRUE(player->is_paused());
ASSERT_FALSE(player->play_next({1u}));
EXPECT_THAT(sub_->get_received_messages<test_msgs::msg::BasicTypes>("/topic1"), SizeIs(4u));

ASSERT_TRUE(player->is_paused());
player->resume();
player_future.get();
await_received_messages.get();
}

TEST_F(RosBag2PlayTestFixture, play_respect_messages_timing_after_play_next) {
auto primitive_message = get_messages_basic_types()[0];
primitive_message->int32_value = 42;
Expand Down Expand Up @@ -324,3 +387,47 @@ TEST_F(RosBag2PlayTestFixture, play_next_playing_only_filtered_topics) {
// All we care is that any messages arrived
EXPECT_THAT(replayed_topic2, SizeIs(Eq(3u)));
}

TEST_F(RosBag2PlayTestFixture, play_next_no_message_must_succeed) {
auto primitive_message = get_messages_basic_types()[0];
primitive_message->int32_value = 42;

auto topic_types = std::vector<rosbag2_storage::TopicMetadata>{
{"topic1", "test_msgs/BasicTypes", "", ""}};

std::vector<std::shared_ptr<rosbag2_storage::SerializedBagMessage>> messages =
{
serialize_test_message("topic1", 1000, primitive_message)
};

auto prepared_mock_reader = std::make_unique<MockSequentialReader>();
prepared_mock_reader->prepare(messages, topic_types);
auto reader = std::make_unique<rosbag2_cpp::Reader>(std::move(prepared_mock_reader));
auto player = std::make_shared<MockPlayer>(std::move(reader), storage_options_, play_options_);

sub_ = std::make_shared<SubscriptionManager>();
sub_->add_subscription<test_msgs::msg::BasicTypes>("/topic1", messages.size());

// Wait for discovery to match publishers with subscribers
ASSERT_TRUE(
sub_->spin_and_wait_for_matched(player->get_list_of_publishers(), std::chrono::seconds(30)));

auto await_received_messages = sub_->spin_subscriptions();

player->pause();
ASSERT_TRUE(player->is_paused());

auto player_future = std::async(std::launch::async, [&player]() -> void {player->play();});
player->wait_for_playback_to_start();

ASSERT_TRUE(player->is_paused());
ASSERT_TRUE(player->play_next({0u}));
// Yield CPU resources so messages are actually sent through.
std::this_thread::sleep_for(std::chrono::milliseconds(50));
EXPECT_THAT(sub_->get_received_messages<test_msgs::msg::BasicTypes>("/topic1"), SizeIs(0u));

ASSERT_TRUE(player->is_paused());
player->resume();
player_future.get();
await_received_messages.get();
}