Skip to content

Commit

Permalink
Make sure published messages are acknowledged for play mode (#951)
Browse files Browse the repository at this point in the history
* Make sure published messages are acknowledged for play mode

Signed-off-by: Barry Xu <barry.xu@sony.com>

* Address review comments

Signed-off-by: Barry Xu <barry.xu@sony.com>

* Move check_not_negative() to __init__.py

Signed-off-by: Barry Xu <barry.xu@sony.com>

* Address review comments

Signed-off-by: Barry Xu <barry.xu@sony.com>
  • Loading branch information
Barry-Xu-2018 authored Feb 1, 2022
1 parent f31445b commit 9447f4e
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 0 deletions.
11 changes: 11 additions & 0 deletions ros2bag/ros2bag/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,3 +107,14 @@ def check_path_exists(value: Any) -> str:
raise ArgumentTypeError("Bag file '{}' does not exist!".format(value))
except ValueError:
raise ArgumentTypeError('{} is not the valid type (string)'.format(value))


def check_not_negative_int(arg: str) -> int:
"""Argparse validator to verify that a value is a int and not negative."""
try:
value = int(arg)
if value < 0:
raise ArgumentTypeError(f'Value {value} is less than zero.')
return value
except ValueError:
raise ArgumentTypeError('{} is not the valid type (int)'.format(value))
13 changes: 13 additions & 0 deletions ros2bag/ros2bag/verb/play.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from argparse import FileType

from rclpy.qos import InvalidQoSProfileException
from ros2bag.api import check_not_negative_int
from ros2bag.api import check_path_exists
from ros2bag.api import check_positive_float
from ros2bag.api import convert_yaml_to_qos_profile
Expand Down Expand Up @@ -93,6 +94,17 @@ def add_arguments(self, parser, cli_name): # noqa: D102
parser.add_argument(
'--start-offset', type=check_positive_float, default=0.0,
help='Start the playback player this many seconds into the bag file.')
parser.add_argument(
'--wait-for-all-acked', type=check_not_negative_int, default=-1,
help='Wait until all published messages are acknowledged by all subscribers or until '
'the timeout elapses in millisecond before play is terminated. '
'Especially for the case of sending message with big size in a short time. '
'Negative timeout is invalid. '
'0 means wait forever until all published messages are acknowledged by all '
'subscribers. '
"Note that this option is valid only if the publisher\'s QOS profile is "
'RELIABLE.',
metavar='TIMEOUT')

def main(self, *, args): # noqa: D102
qos_profile_overrides = {} # Specify a valid default
Expand Down Expand Up @@ -131,6 +143,7 @@ def main(self, *, args): # noqa: D102
play_options.disable_keyboard_controls = args.disable_keyboard_controls
play_options.start_paused = args.start_paused
play_options.start_offset = args.start_offset
play_options.wait_acked_timeout = args.wait_for_all_acked

player = Player()
player.play(storage_options, play_options)
1 change: 1 addition & 0 deletions rosbag2_py/src/rosbag2_py/_transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ PYBIND11_MODULE(_transport, m) {
"start_offset",
&PlayOptions::getStartOffset,
&PlayOptions::setStartOffset)
.def_readwrite("wait_acked_timeout", &PlayOptions::wait_acked_timeout)
;

py::class_<RecordOptions>(m, "RecordOptions")
Expand Down
4 changes: 4 additions & 0 deletions rosbag2_transport/include/rosbag2_transport/play_options.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ struct PlayOptions
KeyboardHandler::KeyCode play_next_key = KeyboardHandler::KeyCode::CURSOR_RIGHT;
KeyboardHandler::KeyCode increase_rate_key = KeyboardHandler::KeyCode::CURSOR_UP;
KeyboardHandler::KeyCode decrease_rate_key = KeyboardHandler::KeyCode::CURSOR_DOWN;

// Timeout for waiting for all published messages to be acknowledged.
// Negative value means that published messages do not need to be acknowledged.
int64_t wait_acked_timeout = -1;
};

} // namespace rosbag2_transport
Expand Down
43 changes: 43 additions & 0 deletions rosbag2_transport/src/rosbag2_transport/player.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,31 @@ void Player::play()
std::lock_guard<std::mutex> lk(ready_to_play_from_queue_mutex_);
is_ready_to_play_from_queue_ = false;
ready_to_play_from_queue_cv_.notify_all();

// Wait for all published messages to be acknowledged.
if (play_options_.wait_acked_timeout >= 0) {
std::chrono::milliseconds timeout(play_options_.wait_acked_timeout);
if (timeout == std::chrono::milliseconds(0)) {
timeout = std::chrono::milliseconds(-1);
}
for (auto pub : publishers_) {
try {
if (!pub.second->wait_for_all_acked(timeout)) {
RCLCPP_ERROR(
get_logger(),
"Timed out while waiting for all published messages to be acknowledged for topic %s",
pub.first.c_str());
}
} catch (std::exception & e) {
RCLCPP_ERROR(
get_logger(),
"Exception occurred while waiting for all published messages to be acknowledged for "
"topic %s : %s",
pub.first.c_str(),
e.what());
}
}
}
}

void Player::pause()
Expand Down Expand Up @@ -463,6 +488,7 @@ void Player::prepare_publishers()

// Create topic publishers
auto topics = reader_->get_all_topics_and_types();
std::string topic_without_support_acked;
for (const auto & topic : topics) {
if (publishers_.find(topic.name) != publishers_.end()) {
continue;
Expand All @@ -483,6 +509,11 @@ void Player::prepare_publishers()
publishers_.insert(
std::make_pair(
topic.name, create_generic_publisher(topic.name, topic.type, topic_qos)));
if (play_options_.wait_acked_timeout >= 0 &&
topic_qos.reliability() == rclcpp::ReliabilityPolicy::BestEffort)
{
topic_without_support_acked += topic.name + ", ";
}
} catch (const std::runtime_error & e) {
// using a warning log seems better than adding a new option
// to ignore some unknown message type library
Expand All @@ -491,6 +522,18 @@ void Player::prepare_publishers()
"Ignoring a topic '%s', reason: %s.", topic.name.c_str(), e.what());
}
}

if (!topic_without_support_acked.empty()) {
// remove the last ", "
topic_without_support_acked.erase(
topic_without_support_acked.end() - 2,
topic_without_support_acked.end());

RCLCPP_WARN(
get_logger(),
"--wait-for-all-acked is invalid for the below topics since reliability of QOS is "
"BestEffort.\n%s", topic_without_support_acked.c_str());
}
}

bool Player::publish_message(rosbag2_storage::SerializedBagMessageSharedPtr message)
Expand Down

0 comments on commit 9447f4e

Please sign in to comment.