Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make sure published messages are acknowledged for play mode #951

Merged
merged 4 commits into from
Feb 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions ros2bag/ros2bag/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,3 +107,14 @@ def check_path_exists(value: Any) -> str:
raise ArgumentTypeError("Bag file '{}' does not exist!".format(value))
except ValueError:
raise ArgumentTypeError('{} is not the valid type (string)'.format(value))


def check_not_negative_int(arg: str) -> int:
"""Argparse validator to verify that a value is a int and not negative."""
try:
value = int(arg)
if value < 0:
raise ArgumentTypeError(f'Value {value} is less than zero.')
return value
except ValueError:
raise ArgumentTypeError('{} is not the valid type (int)'.format(value))
13 changes: 13 additions & 0 deletions ros2bag/ros2bag/verb/play.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from argparse import FileType

from rclpy.qos import InvalidQoSProfileException
from ros2bag.api import check_not_negative_int
from ros2bag.api import check_path_exists
from ros2bag.api import check_positive_float
from ros2bag.api import convert_yaml_to_qos_profile
Expand Down Expand Up @@ -93,6 +94,17 @@ def add_arguments(self, parser, cli_name): # noqa: D102
parser.add_argument(
'--start-offset', type=check_positive_float, default=0.0,
help='Start the playback player this many seconds into the bag file.')
parser.add_argument(
'--wait-for-all-acked', type=check_not_negative_int, default=-1,
help='Wait until all published messages are acknowledged by all subscribers or until '
'the timeout elapses in millisecond before play is terminated. '
'Especially for the case of sending message with big size in a short time. '
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like this flows a little better, though I tend to be a bit wordy.

Suggested change
'Especially for the case of sending message with big size in a short time. '
'Especially useful when sending many messages with large sizes in a short timeframe.'

'Negative timeout is invalid. '
'0 means wait forever until all published messages are acknowledged by all '
'subscribers. '
"Note that this option is valid only if the publisher\'s QOS profile is "
'RELIABLE.',
metavar='TIMEOUT')

def main(self, *, args): # noqa: D102
qos_profile_overrides = {} # Specify a valid default
Expand Down Expand Up @@ -131,6 +143,7 @@ def main(self, *, args): # noqa: D102
play_options.disable_keyboard_controls = args.disable_keyboard_controls
play_options.start_paused = args.start_paused
play_options.start_offset = args.start_offset
play_options.wait_acked_timeout = args.wait_for_all_acked

player = Player()
player.play(storage_options, play_options)
1 change: 1 addition & 0 deletions rosbag2_py/src/rosbag2_py/_transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ PYBIND11_MODULE(_transport, m) {
"start_offset",
&PlayOptions::getStartOffset,
&PlayOptions::setStartOffset)
.def_readwrite("wait_acked_timeout", &PlayOptions::wait_acked_timeout)
;

py::class_<RecordOptions>(m, "RecordOptions")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ struct PlayOptions
KeyboardHandler::KeyCode play_next_key = KeyboardHandler::KeyCode::CURSOR_RIGHT;
KeyboardHandler::KeyCode increase_rate_key = KeyboardHandler::KeyCode::CURSOR_UP;
KeyboardHandler::KeyCode decrease_rate_key = KeyboardHandler::KeyCode::CURSOR_DOWN;

// Timeout for waiting for all published messages to be acknowledged.
// Negative value means that published messages do not need to be acknowledged.
int64_t wait_acked_timeout = -1;
};

} // namespace rosbag2_transport
Expand Down
43 changes: 43 additions & 0 deletions rosbag2_transport/src/rosbag2_transport/player.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,31 @@ void Player::play()
std::lock_guard<std::mutex> lk(ready_to_play_from_queue_mutex_);
is_ready_to_play_from_queue_ = false;
ready_to_play_from_queue_cv_.notify_all();

// Wait for all published messages to be acknowledged.
if (play_options_.wait_acked_timeout >= 0) {
std::chrono::milliseconds timeout(play_options_.wait_acked_timeout);
if (timeout == std::chrono::milliseconds(0)) {
timeout = std::chrono::milliseconds(-1);
}
for (auto pub : publishers_) {
try {
if (!pub.second->wait_for_all_acked(timeout)) {
RCLCPP_ERROR(
get_logger(),
"Timed out while waiting for all published messages to be acknowledged for topic %s",
pub.first.c_str());
}
} catch (std::exception & e) {
RCLCPP_ERROR(
get_logger(),
"Exception occurred while waiting for all published messages to be acknowledged for "
"topic %s : %s",
pub.first.c_str(),
e.what());
}
}
}
}

void Player::pause()
Expand Down Expand Up @@ -463,6 +488,7 @@ void Player::prepare_publishers()

// Create topic publishers
auto topics = reader_->get_all_topics_and_types();
std::string topic_without_support_acked;
for (const auto & topic : topics) {
if (publishers_.find(topic.name) != publishers_.end()) {
continue;
Expand All @@ -483,6 +509,11 @@ void Player::prepare_publishers()
publishers_.insert(
std::make_pair(
topic.name, create_generic_publisher(topic.name, topic.type, topic_qos)));
if (play_options_.wait_acked_timeout >= 0 &&
topic_qos.reliability() == rclcpp::ReliabilityPolicy::BestEffort)
{
topic_without_support_acked += topic.name + ", ";
}
} catch (const std::runtime_error & e) {
// using a warning log seems better than adding a new option
// to ignore some unknown message type library
Expand All @@ -491,6 +522,18 @@ void Player::prepare_publishers()
"Ignoring a topic '%s', reason: %s.", topic.name.c_str(), e.what());
}
}

if (!topic_without_support_acked.empty()) {
// remove the last ", "
topic_without_support_acked.erase(
topic_without_support_acked.end() - 2,
topic_without_support_acked.end());

RCLCPP_WARN(
get_logger(),
"--wait-for-all-acked is invalid for the below topics since reliability of QOS is "
"BestEffort.\n%s", topic_without_support_acked.c_str());
}
}

bool Player::publish_message(rosbag2_storage::SerializedBagMessageSharedPtr message)
Expand Down