diff --git a/ros2bag/ros2bag/api/__init__.py b/ros2bag/ros2bag/api/__init__.py index b96e81f8b8..f054fe5aaa 100644 --- a/ros2bag/ros2bag/api/__init__.py +++ b/ros2bag/ros2bag/api/__init__.py @@ -107,3 +107,14 @@ def check_path_exists(value: Any) -> str: raise ArgumentTypeError("Bag file '{}' does not exist!".format(value)) except ValueError: raise ArgumentTypeError('{} is not the valid type (string)'.format(value)) + + +def check_not_negative_int(arg: str) -> int: + """Argparse validator to verify that a value is a int and not negative.""" + try: + value = int(arg) + if value < 0: + raise ArgumentTypeError(f'Value {value} is less than zero.') + return value + except ValueError: + raise ArgumentTypeError('{} is not the valid type (int)'.format(value)) diff --git a/ros2bag/ros2bag/verb/play.py b/ros2bag/ros2bag/verb/play.py index 23b603ecda..d82f1a2a7b 100644 --- a/ros2bag/ros2bag/verb/play.py +++ b/ros2bag/ros2bag/verb/play.py @@ -15,6 +15,7 @@ from argparse import FileType from rclpy.qos import InvalidQoSProfileException +from ros2bag.api import check_not_negative_int from ros2bag.api import check_path_exists from ros2bag.api import check_positive_float from ros2bag.api import convert_yaml_to_qos_profile @@ -93,6 +94,17 @@ def add_arguments(self, parser, cli_name): # noqa: D102 parser.add_argument( '--start-offset', type=check_positive_float, default=0.0, help='Start the playback player this many seconds into the bag file.') + parser.add_argument( + '--wait-for-all-acked', type=check_not_negative_int, default=-1, + help='Wait until all published messages are acknowledged by all subscribers or until ' + 'the timeout elapses in millisecond before play is terminated. ' + 'Especially for the case of sending message with big size in a short time. ' + 'Negative timeout is invalid. ' + '0 means wait forever until all published messages are acknowledged by all ' + 'subscribers. ' + "Note that this option is valid only if the publisher\'s QOS profile is " + 'RELIABLE.', + metavar='TIMEOUT') def main(self, *, args): # noqa: D102 qos_profile_overrides = {} # Specify a valid default @@ -131,6 +143,7 @@ def main(self, *, args): # noqa: D102 play_options.disable_keyboard_controls = args.disable_keyboard_controls play_options.start_paused = args.start_paused play_options.start_offset = args.start_offset + play_options.wait_acked_timeout = args.wait_for_all_acked player = Player() player.play(storage_options, play_options) diff --git a/rosbag2_py/src/rosbag2_py/_transport.cpp b/rosbag2_py/src/rosbag2_py/_transport.cpp index 9ea5ba3a67..c11ca23929 100644 --- a/rosbag2_py/src/rosbag2_py/_transport.cpp +++ b/rosbag2_py/src/rosbag2_py/_transport.cpp @@ -247,6 +247,7 @@ PYBIND11_MODULE(_transport, m) { "start_offset", &PlayOptions::getStartOffset, &PlayOptions::setStartOffset) + .def_readwrite("wait_acked_timeout", &PlayOptions::wait_acked_timeout) ; py::class_(m, "RecordOptions") diff --git a/rosbag2_transport/include/rosbag2_transport/play_options.hpp b/rosbag2_transport/include/rosbag2_transport/play_options.hpp index 623233e4a8..27e33b9561 100644 --- a/rosbag2_transport/include/rosbag2_transport/play_options.hpp +++ b/rosbag2_transport/include/rosbag2_transport/play_options.hpp @@ -62,6 +62,10 @@ struct PlayOptions KeyboardHandler::KeyCode play_next_key = KeyboardHandler::KeyCode::CURSOR_RIGHT; KeyboardHandler::KeyCode increase_rate_key = KeyboardHandler::KeyCode::CURSOR_UP; KeyboardHandler::KeyCode decrease_rate_key = KeyboardHandler::KeyCode::CURSOR_DOWN; + + // Timeout for waiting for all published messages to be acknowledged. + // Negative value means that published messages do not need to be acknowledged. + int64_t wait_acked_timeout = -1; }; } // namespace rosbag2_transport diff --git a/rosbag2_transport/src/rosbag2_transport/player.cpp b/rosbag2_transport/src/rosbag2_transport/player.cpp index 53d4075fd5..446bb8303f 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.cpp +++ b/rosbag2_transport/src/rosbag2_transport/player.cpp @@ -229,6 +229,31 @@ void Player::play() std::lock_guard lk(ready_to_play_from_queue_mutex_); is_ready_to_play_from_queue_ = false; ready_to_play_from_queue_cv_.notify_all(); + + // Wait for all published messages to be acknowledged. + if (play_options_.wait_acked_timeout >= 0) { + std::chrono::milliseconds timeout(play_options_.wait_acked_timeout); + if (timeout == std::chrono::milliseconds(0)) { + timeout = std::chrono::milliseconds(-1); + } + for (auto pub : publishers_) { + try { + if (!pub.second->wait_for_all_acked(timeout)) { + RCLCPP_ERROR( + get_logger(), + "Timed out while waiting for all published messages to be acknowledged for topic %s", + pub.first.c_str()); + } + } catch (std::exception & e) { + RCLCPP_ERROR( + get_logger(), + "Exception occurred while waiting for all published messages to be acknowledged for " + "topic %s : %s", + pub.first.c_str(), + e.what()); + } + } + } } void Player::pause() @@ -463,6 +488,7 @@ void Player::prepare_publishers() // Create topic publishers auto topics = reader_->get_all_topics_and_types(); + std::string topic_without_support_acked; for (const auto & topic : topics) { if (publishers_.find(topic.name) != publishers_.end()) { continue; @@ -483,6 +509,11 @@ void Player::prepare_publishers() publishers_.insert( std::make_pair( topic.name, create_generic_publisher(topic.name, topic.type, topic_qos))); + if (play_options_.wait_acked_timeout >= 0 && + topic_qos.reliability() == rclcpp::ReliabilityPolicy::BestEffort) + { + topic_without_support_acked += topic.name + ", "; + } } catch (const std::runtime_error & e) { // using a warning log seems better than adding a new option // to ignore some unknown message type library @@ -491,6 +522,18 @@ void Player::prepare_publishers() "Ignoring a topic '%s', reason: %s.", topic.name.c_str(), e.what()); } } + + if (!topic_without_support_acked.empty()) { + // remove the last ", " + topic_without_support_acked.erase( + topic_without_support_acked.end() - 2, + topic_without_support_acked.end()); + + RCLCPP_WARN( + get_logger(), + "--wait-for-all-acked is invalid for the below topics since reliability of QOS is " + "BestEffort.\n%s", topic_without_support_acked.c_str()); + } } bool Player::publish_message(rosbag2_storage::SerializedBagMessageSharedPtr message)