Skip to content

Commit

Permalink
Second round of comments from ros2#960 upstream (#15)
Browse files Browse the repository at this point in the history
* Removes duration parameter. A leftover after switching to playback_duration.

* Fixes comment.

* Solves format in rosbag2_py -> _transport.cpp

* Applies style suggestions.

* Changes play() to return a boolean indicating whether the request could be fulfilled.

* Removes extra unnecessary code.
  • Loading branch information
agalbachicar authored Mar 17, 2022
1 parent f83fdc1 commit f71ea9c
Show file tree
Hide file tree
Showing 9 changed files with 101 additions and 90 deletions.
10 changes: 1 addition & 9 deletions ros2bag/ros2bag/verb/play.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,6 @@ def add_arguments(self, parser, cli_name): # noqa: D102
"Note that this option is valid only if the publisher\'s QOS profile is "
'RELIABLE.',
metavar='TIMEOUT')
parser.add_argument(
'-f', '--duration', type=float, default=None,
help='Play for SEC seconds. Default is None, meaning that playback will continue '
'until the end of the bag. Valid range > 0.0')

def main(self, *, args): # noqa: D102
qos_profile_overrides = {} # Specify a valid default
Expand Down Expand Up @@ -154,9 +150,5 @@ def main(self, *, args): # noqa: D102
play_options.start_offset = args.start_offset
play_options.wait_acked_timeout = args.wait_for_all_acked

# Gets the duration in nanoseconds when a value is provided for player
# consumption.
duration = int(args.duration * 1e9) if args.duration else args.duration

player = Player()
player.play(storage_options, play_options, duration)
player.play(storage_options, play_options)
5 changes: 3 additions & 2 deletions rosbag2_interfaces/srv/Play.srv
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# See rosbag2_transport::PlayOptions::start_time
# See rosbag2_transport::PlayOptions::start_offset
builtin_interfaces/Time start_offset
# See rosbag2_transport::PlayOptions::playback_duration
builtin_interfaces/Duration playback_duration
---
bool success # return true
# returns false when another playback execution is running, otherwise true
bool success
3 changes: 0 additions & 3 deletions rosbag2_interfaces/srv/PlayFor.srv

This file was deleted.

8 changes: 3 additions & 5 deletions rosbag2_py/src/rosbag2_py/_transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,7 @@ class Player

void play(
const rosbag2_storage::StorageOptions & storage_options,
PlayOptions & play_options,
const std::optional<rcutils_duration_value_t> & duration)
PlayOptions & play_options)
{
auto reader = rosbag2_transport::ReaderWriterFactory::make_reader(storage_options);
auto player = std::make_shared<rosbag2_transport::Player>(
Expand All @@ -144,7 +143,7 @@ class Player
[&exec]() {
exec.spin();
});
player->play(duration);
player->play();

exec.cancel();
spin_thread.join();
Expand Down Expand Up @@ -291,8 +290,7 @@ PYBIND11_MODULE(_transport, m) {

py::class_<rosbag2_py::Player>(m, "Player")
.def(py::init())
.def(
"play", &rosbag2_py::Player::play, py::arg("storage_options"), py::arg("play_options"))
.def("play", &rosbag2_py::Player::play, py::arg("storage_options"), py::arg("play_options"))
;

py::class_<rosbag2_py::Recorder>(m, "Recorder")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,18 +129,6 @@ class SubscriptionManager
});
}

std::future<void> spin_subscriptions_for(rcutils_duration_value_t duration)
{
return async(
std::launch::async, [this, duration]() {
rclcpp::executors::SingleThreadedExecutor exec;
auto end = std::chrono::high_resolution_clock::now() + std::chrono::nanoseconds(duration);
while (rclcpp::ok() && continue_spinning_until(expected_topics_with_size_, end)) {
exec.spin_node_some(subscriber_node_);
}
});
}

private:
bool continue_spinning(
const std::unordered_map<std::string, size_t> & expected_topics_with_sizes,
Expand All @@ -159,23 +147,6 @@ class SubscriptionManager
return false;
}

bool continue_spinning_until(
const std::unordered_map<std::string, size_t> & expected_topics_with_sizes,
std::chrono::time_point<std::chrono::high_resolution_clock> end_time)
{
auto current = std::chrono::high_resolution_clock::now();
if (current > end_time) {
return false;
}

for (const auto & topic_expected : expected_topics_with_sizes) {
if (subscribed_messages_[topic_expected.first].size() < topic_expected.second) {
return true;
}
}
return false;
}

std::vector<rclcpp::SubscriptionBase::SharedPtr> subscriptions_;
std::unordered_map<std::string,
std::vector<std::shared_ptr<rclcpp::SerializedMessage>>> subscribed_messages_;
Expand Down
4 changes: 2 additions & 2 deletions rosbag2_transport/include/rosbag2_transport/player.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class Player : public rclcpp::Node
virtual ~Player();

ROSBAG2_TRANSPORT_PUBLIC
void play(const std::optional<rcutils_duration_value_t> & duration = std::nullopt);
bool play();

// Playback control interface
/// Pause the flow of time for playback.
Expand Down Expand Up @@ -184,6 +184,7 @@ class Player : public rclcpp::Node
std::mutex skip_message_in_main_play_loop_mutex_;
bool skip_message_in_main_play_loop_ RCPPUTILS_TSA_GUARDED_BY(
skip_message_in_main_play_loop_mutex_) = false;
std::atomic_bool is_in_playback_{false};

rcutils_time_point_value_t starting_time_;

Expand All @@ -196,7 +197,6 @@ class Player : public rclcpp::Node
rclcpp::Service<rosbag2_interfaces::srv::SetRate>::SharedPtr srv_set_rate_;
rclcpp::Service<rosbag2_interfaces::srv::Play>::SharedPtr srv_play_;
rclcpp::Service<rosbag2_interfaces::srv::PlayNext>::SharedPtr srv_play_next_;
rclcpp::Service<rosbag2_interfaces::srv::PlayFor>::SharedPtr srv_play_for_;
rclcpp::Service<rosbag2_interfaces::srv::Seek>::SharedPtr srv_seek_;

// defaults
Expand Down
48 changes: 15 additions & 33 deletions rosbag2_transport/src/rosbag2_transport/player.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,8 +191,14 @@ bool Player::is_storage_completely_loaded() const
return !storage_loading_future_.valid();
}

void Player::play(const std::optional<rcutils_duration_value_t> & duration)
bool Player::play()
{
if (is_in_playback_) {
RCLCPP_WARN_STREAM(get_logger(), "Trying to play() while in playback, dismissing request.");
return false;
}
is_in_playback_ = true;

rclcpp::Duration delay(0, 0);
if (play_options_.delay >= rclcpp::Duration(0, 0)) {
delay = play_options_.delay;
Expand All @@ -202,15 +208,9 @@ void Player::play(const std::optional<rcutils_duration_value_t> & duration)
"Invalid delay value: " << play_options_.delay.nanoseconds() << ". Delay is disabled.");
}

rcutils_time_point_value_t play_until_time = starting_time_;
rcutils_time_point_value_t play_until_time = -1;
if (play_options_.playback_duration >= rclcpp::Duration(0, 0)) {
play_until_time += play_options_.playback_duration.nanoseconds();
} else {
play_until_time = -1;
RCLCPP_INFO_STREAM(
get_logger(),
"Invalid playback duration value: " << play_options_.playback_duration.nanoseconds() <<
". Playback duration is disabled.");
play_until_time = starting_time_ + play_options_.playback_duration.nanoseconds();
}
RCLCPP_INFO_STREAM(get_logger(), "Playback duration value: " << play_until_time);

Expand Down Expand Up @@ -266,6 +266,9 @@ void Player::play(const std::optional<rcutils_duration_value_t> & duration)
}
}
}

is_in_playback_ = false;
return true;
}

void Player::pause()
Expand Down Expand Up @@ -454,9 +457,6 @@ void Player::play_messages_from_queue(const rcutils_duration_value_t & play_unti
}
// Do not move on until sleep_until returns true
// It will always sleep, so this is not a tight busy loop on pause
if (play_until_time.has_value() && message->time_stamp > *play_until_time) {
break;
}
while (rclcpp::ok() && !clock_->sleep_until(message->time_stamp)) {
if (std::atomic_exchange(&cancel_wait_for_next_message_, false)) {
break;
Expand Down Expand Up @@ -674,13 +674,9 @@ void Player::create_control_services()
rosbag2_interfaces::srv::Play::Request::ConstSharedPtr request,
rosbag2_interfaces::srv::Play::Response::SharedPtr response)
{
play_options_.start_offset =
static_cast<rcutils_time_point_value_t>(RCUTILS_S_TO_NS(request->start_offset.sec)) +
static_cast<rcutils_time_point_value_t>(request->start_offset.nanosec);
play_options_.playback_duration = rclcpp::Duration(
request->playback_duration.sec, request->playback_duration.nanosec);
play();
response->success = true;
play_options_.start_offset = rclcpp::Time(request->start_offset).nanoseconds();
play_options_.playback_duration = rclcpp::Duration(request->playback_duration);
response->success = play();
});
srv_play_next_ = create_service<rosbag2_interfaces::srv::PlayNext>(
"~/play_next",
Expand All @@ -690,20 +686,6 @@ void Player::create_control_services()
{
response->success = play_next();
});
srv_play_for_ = create_service<rosbag2_interfaces::srv::PlayFor>(
"~/play_for",
[this](
const std::shared_ptr<rmw_request_id_t>/* request_header */,
const std::shared_ptr<rosbag2_interfaces::srv::PlayFor::Request> request,
const std::shared_ptr<rosbag2_interfaces::srv::PlayFor::Response> response)
{
const rcutils_duration_value_t duration =
static_cast<rcutils_duration_value_t>(request->duration.sec) *
static_cast<rcutils_duration_value_t>(1000000000) +
static_cast<rcutils_duration_value_t>(request->duration.nanosec);
play({duration});
response->success = true;
});
srv_seek_ = create_service<rosbag2_interfaces::srv::Seek>(
"~/seek",
[this](
Expand Down
82 changes: 76 additions & 6 deletions rosbag2_transport/test/rosbag2_transport/test_play_for.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,18 +126,15 @@ class RosBag2PlayForTestFixture : public RosBag2PlayTestFixture
sub_->add_subscription<test_msgs::msg::BasicTypes>(kTopic1, 3);
sub_->add_subscription<test_msgs::msg::Arrays>(kTopic2, 3);

play_options_.playback_duration =
rclcpp::Duration(std::chrono::nanoseconds(std::chrono::milliseconds(milliseconds)));
player_ = std::make_shared<MockPlayer>(
std::move(
reader), storage_options_, play_options_);
play_options_.playback_duration = rclcpp::Duration(std::chrono::milliseconds(milliseconds));
player_ = std::make_shared<MockPlayer>(std::move(reader), storage_options_, play_options_);
// Wait for discovery to match publishers with subscribers
ASSERT_TRUE(
sub_->spin_and_wait_for_matched(player_->get_list_of_publishers(), std::chrono::seconds(5)));

auto await_received_messages = sub_->spin_subscriptions();

player_->play();
ASSERT_TRUE(player_->play());

await_received_messages.get();
}
Expand Down Expand Up @@ -252,3 +249,76 @@ TEST_F(
EVAL_REPLAYED_BOOL_ARRAY_PRIMITIVES(replayed_test_arrays);
EVAL_REPLAYED_FLOAT_ARRAY_PRIMITIVES(replayed_test_arrays);
}

class RosBag2PlayForInterruptedTestFixture : public RosBag2PlayTestFixture
{
public:
static constexpr const char * kTopic1Name{"topic1"};
static constexpr const char * kTopic1{"/topic1"};

std::vector<rosbag2_storage::TopicMetadata> get_topic_types()
{
return {{kTopic1Name, "test_msgs/BasicTypes", "", ""}};
}

std::vector<std::shared_ptr<rosbag2_storage::SerializedBagMessage>>
get_serialized_messages()
{
auto primitive_message = get_messages_basic_types()[0];
primitive_message->int32_value = kIntValue;

// @{ Ordering matters. The mock reader implementation moves messages
// around without any knowledge about message chronology. It just picks
// the next one Make sure to keep the list in order or sort it!
std::vector<std::shared_ptr<rosbag2_storage::SerializedBagMessage>> messages =
{serialize_test_message(kTopic1Name, 500, primitive_message),
serialize_test_message(kTopic1Name, 700, primitive_message),
serialize_test_message(kTopic1Name, 900, primitive_message)};
// @}
return messages;
}
};

TEST_F(
RosBag2PlayForInterruptedTestFixture,
play_should_return_false_when_interrupted)
{

auto topic_types = get_topic_types();
auto messages = get_serialized_messages();

auto prepared_mock_reader = std::make_unique<MockSequentialReader>();
prepared_mock_reader->prepare(messages, topic_types);
auto reader = std::make_unique<rosbag2_cpp::Reader>(std::move(prepared_mock_reader));

sub_->add_subscription<test_msgs::msg::BasicTypes>(kTopic1, 3);

play_options_.playback_duration = rclcpp::Duration(std::chrono::milliseconds(1000));
std::shared_ptr<MockPlayer> player_ = std::make_shared<MockPlayer>(
std::move(
reader), storage_options_, play_options_);
// Wait for discovery to match publishers with subscribers
ASSERT_TRUE(
sub_->spin_and_wait_for_matched(player_->get_list_of_publishers(), std::chrono::seconds(5)));

auto await_received_messages = sub_->spin_subscriptions();

auto play_execution_1 = std::async(std::launch::async, [player_]() {return player_->play();});
auto play_execution_2 = std::async(std::launch::async, [player_]() {return player_->play();});

await_received_messages.get();

const auto play_execution_1_wait_result = play_execution_1.wait_for(
std::chrono::milliseconds(
1500));
const auto play_execution_2_wait_result = play_execution_2.wait_for(
std::chrono::milliseconds(
1500));

ASSERT_EQ(std::future_status::ready, play_execution_1_wait_result);
ASSERT_EQ(std::future_status::ready, play_execution_2_wait_result);

ASSERT_TRUE(
(play_execution_1.get() && !play_execution_2.get()) ||
(!play_execution_1.get() && play_execution_2.get()));
}
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ TEST_F(RosBag2PlayTestFixture, messages_played_in_loop) {
auto player = std::make_shared<rosbag2_transport::Player>(
std::move(
reader), storage_options_, play_options);
std::thread loop_thread(&rosbag2_transport::Player::play, player, std::nullopt);
std::thread loop_thread(&rosbag2_transport::Player::play, player);

await_received_messages.get();
rclcpp::shutdown();
Expand Down

0 comments on commit f71ea9c

Please sign in to comment.