From 43fe65e8f3d166dc890d9c4e59e543f797d56581 Mon Sep 17 00:00:00 2001 From: Barry Xu Date: Wed, 26 Jan 2022 17:54:02 +0800 Subject: [PATCH 1/4] Make sure published messages are acknowledged for play mode Signed-off-by: Barry Xu --- ros2bag/ros2bag/verb/play.py | 20 ++++++++++++ rosbag2_py/src/rosbag2_py/_transport.cpp | 1 + .../rosbag2_transport/play_options.hpp | 4 +++ .../src/rosbag2_transport/player.cpp | 32 +++++++++++++++++++ 4 files changed, 57 insertions(+) diff --git a/ros2bag/ros2bag/verb/play.py b/ros2bag/ros2bag/verb/play.py index 23b603ecda..ba42f82a9f 100644 --- a/ros2bag/ros2bag/verb/play.py +++ b/ros2bag/ros2bag/verb/play.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from argparse import ArgumentTypeError from argparse import FileType from rclpy.qos import InvalidQoSProfileException @@ -35,6 +36,13 @@ def positive_float(arg: str) -> float: return value +def not_negative_int(arg: str) -> int: + value = int(arg) + if value < 0: + raise ArgumentTypeError(f'Value {value} is less than zero.') + return value + + class PlayVerb(VerbExtension): """Play back ROS data from a bag.""" @@ -93,6 +101,17 @@ def add_arguments(self, parser, cli_name): # noqa: D102 parser.add_argument( '--start-offset', type=check_positive_float, default=0.0, help='Start the playback player this many seconds into the bag file.') + parser.add_argument( + '--wait-for-all-acked', type=not_negative_int, default=0, + help='Wait until all published messages are acknowledged by all subscribers or until ' + 'the timeout elapses in millisecond before play is terminated. ' + 'Especially for the case of sending message with big size in a short time. ' + 'Negative timeout is invalid. ' + '0 means wait forever until all published messages are acknowledged by all ' + 'subscribers. ' + "Note that this option is valid only if the publisher\'s QOS profile is " + 'RELIABLE.', + metavar='TIMEOUT') def main(self, *, args): # noqa: D102 qos_profile_overrides = {} # Specify a valid default @@ -131,6 +150,7 @@ def main(self, *, args): # noqa: D102 play_options.disable_keyboard_controls = args.disable_keyboard_controls play_options.start_paused = args.start_paused play_options.start_offset = args.start_offset + play_options.wait_acked_timeout = args.wait_for_all_acked player = Player() player.play(storage_options, play_options) diff --git a/rosbag2_py/src/rosbag2_py/_transport.cpp b/rosbag2_py/src/rosbag2_py/_transport.cpp index 9ea5ba3a67..c11ca23929 100644 --- a/rosbag2_py/src/rosbag2_py/_transport.cpp +++ b/rosbag2_py/src/rosbag2_py/_transport.cpp @@ -247,6 +247,7 @@ PYBIND11_MODULE(_transport, m) { "start_offset", &PlayOptions::getStartOffset, &PlayOptions::setStartOffset) + .def_readwrite("wait_acked_timeout", &PlayOptions::wait_acked_timeout) ; py::class_(m, "RecordOptions") diff --git a/rosbag2_transport/include/rosbag2_transport/play_options.hpp b/rosbag2_transport/include/rosbag2_transport/play_options.hpp index 623233e4a8..0bfeec0d06 100644 --- a/rosbag2_transport/include/rosbag2_transport/play_options.hpp +++ b/rosbag2_transport/include/rosbag2_transport/play_options.hpp @@ -62,6 +62,10 @@ struct PlayOptions KeyboardHandler::KeyCode play_next_key = KeyboardHandler::KeyCode::CURSOR_RIGHT; KeyboardHandler::KeyCode increase_rate_key = KeyboardHandler::KeyCode::CURSOR_UP; KeyboardHandler::KeyCode decrease_rate_key = KeyboardHandler::KeyCode::CURSOR_DOWN; + + // Timeout for waiting all published messages acknowledged + // negative means published messages need not to be acknowledged + int64_t wait_acked_timeout = -1; }; } // namespace rosbag2_transport diff --git a/rosbag2_transport/src/rosbag2_transport/player.cpp b/rosbag2_transport/src/rosbag2_transport/player.cpp index 53d4075fd5..82b1ce6e62 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.cpp +++ b/rosbag2_transport/src/rosbag2_transport/player.cpp @@ -229,6 +229,30 @@ void Player::play() std::lock_guard lk(ready_to_play_from_queue_mutex_); is_ready_to_play_from_queue_ = false; ready_to_play_from_queue_cv_.notify_all(); + + // Wait for all published messages are acknowledged + if (play_options_.wait_acked_timeout >= 0) { + std::chrono::milliseconds timeout(play_options_.wait_acked_timeout); + if (timeout == std::chrono::milliseconds(0)) { + timeout = std::chrono::milliseconds(-1); + } + for (auto pub : publishers_) { + try { + if (!pub.second->wait_for_all_acked(timeout)) { + RCLCPP_ERROR( + get_logger(), + "Failed to wait all published messages acknowledged for topic %s", + pub.first.c_str()); + } + } catch (std::exception & e) { + RCLCPP_ERROR( + get_logger(), + "Failed to wait all published messages acknowledged for topic %s : %s", + pub.first.c_str(), + e.what()); + } + } + } } void Player::pause() @@ -483,6 +507,14 @@ void Player::prepare_publishers() publishers_.insert( std::make_pair( topic.name, create_generic_publisher(topic.name, topic.type, topic_qos))); + if (play_options_.wait_acked_timeout >= 0 && + topic_qos.reliability() == rclcpp::ReliabilityPolicy::BestEffort) + { + RCLCPP_WARN( + get_logger(), + "--wait-for-all-acked is invaild for topic '%s' since reliability of QOS is BestEffort.", + topic.name.c_str()); + } } catch (const std::runtime_error & e) { // using a warning log seems better than adding a new option // to ignore some unknown message type library From 96ae9005f7f28106aa8c6bbb305e45fb0c4721a3 Mon Sep 17 00:00:00 2001 From: Barry Xu Date: Fri, 28 Jan 2022 14:07:34 +0800 Subject: [PATCH 2/4] Address review comments Signed-off-by: Barry Xu --- ros2bag/ros2bag/verb/play.py | 2 +- .../rosbag2_transport/play_options.hpp | 4 ++-- .../src/rosbag2_transport/player.cpp | 22 ++++++++++++++----- 3 files changed, 19 insertions(+), 9 deletions(-) diff --git a/ros2bag/ros2bag/verb/play.py b/ros2bag/ros2bag/verb/play.py index ba42f82a9f..1679cc6ca7 100644 --- a/ros2bag/ros2bag/verb/play.py +++ b/ros2bag/ros2bag/verb/play.py @@ -102,7 +102,7 @@ def add_arguments(self, parser, cli_name): # noqa: D102 '--start-offset', type=check_positive_float, default=0.0, help='Start the playback player this many seconds into the bag file.') parser.add_argument( - '--wait-for-all-acked', type=not_negative_int, default=0, + '--wait-for-all-acked', type=not_negative_int, default=-1, help='Wait until all published messages are acknowledged by all subscribers or until ' 'the timeout elapses in millisecond before play is terminated. ' 'Especially for the case of sending message with big size in a short time. ' diff --git a/rosbag2_transport/include/rosbag2_transport/play_options.hpp b/rosbag2_transport/include/rosbag2_transport/play_options.hpp index 0bfeec0d06..27e33b9561 100644 --- a/rosbag2_transport/include/rosbag2_transport/play_options.hpp +++ b/rosbag2_transport/include/rosbag2_transport/play_options.hpp @@ -63,8 +63,8 @@ struct PlayOptions KeyboardHandler::KeyCode increase_rate_key = KeyboardHandler::KeyCode::CURSOR_UP; KeyboardHandler::KeyCode decrease_rate_key = KeyboardHandler::KeyCode::CURSOR_DOWN; - // Timeout for waiting all published messages acknowledged - // negative means published messages need not to be acknowledged + // Timeout for waiting for all published messages to be acknowledged. + // Negative value means that published messages do not need to be acknowledged. int64_t wait_acked_timeout = -1; }; diff --git a/rosbag2_transport/src/rosbag2_transport/player.cpp b/rosbag2_transport/src/rosbag2_transport/player.cpp index 82b1ce6e62..f02ae21138 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.cpp +++ b/rosbag2_transport/src/rosbag2_transport/player.cpp @@ -230,7 +230,7 @@ void Player::play() is_ready_to_play_from_queue_ = false; ready_to_play_from_queue_cv_.notify_all(); - // Wait for all published messages are acknowledged + // Wait for all published messages to be acknowledged. if (play_options_.wait_acked_timeout >= 0) { std::chrono::milliseconds timeout(play_options_.wait_acked_timeout); if (timeout == std::chrono::milliseconds(0)) { @@ -241,7 +241,7 @@ void Player::play() if (!pub.second->wait_for_all_acked(timeout)) { RCLCPP_ERROR( get_logger(), - "Failed to wait all published messages acknowledged for topic %s", + "Timeout to wait all published messages acknowledged for topic %s", pub.first.c_str()); } } catch (std::exception & e) { @@ -487,6 +487,7 @@ void Player::prepare_publishers() // Create topic publishers auto topics = reader_->get_all_topics_and_types(); + std::string topic_without_support_acked; for (const auto & topic : topics) { if (publishers_.find(topic.name) != publishers_.end()) { continue; @@ -510,10 +511,7 @@ void Player::prepare_publishers() if (play_options_.wait_acked_timeout >= 0 && topic_qos.reliability() == rclcpp::ReliabilityPolicy::BestEffort) { - RCLCPP_WARN( - get_logger(), - "--wait-for-all-acked is invaild for topic '%s' since reliability of QOS is BestEffort.", - topic.name.c_str()); + topic_without_support_acked += topic.name + ", "; } } catch (const std::runtime_error & e) { // using a warning log seems better than adding a new option @@ -523,6 +521,18 @@ void Player::prepare_publishers() "Ignoring a topic '%s', reason: %s.", topic.name.c_str(), e.what()); } } + + if (!topic_without_support_acked.empty()) { + // remove the last ", " + topic_without_support_acked.erase( + topic_without_support_acked.end() - 2, + topic_without_support_acked.end()); + + RCLCPP_WARN( + get_logger(), + "--wait-for-all-acked is invaild for below topics since reliability of QOS is BestEffort.\n" + "%s", topic_without_support_acked.c_str()); + } } bool Player::publish_message(rosbag2_storage::SerializedBagMessageSharedPtr message) From e45d1edb4b0331fa19eaf662e3b9a3f1177a9486 Mon Sep 17 00:00:00 2001 From: Barry Xu Date: Fri, 28 Jan 2022 14:50:31 +0800 Subject: [PATCH 3/4] Move check_not_negative() to __init__.py Signed-off-by: Barry Xu --- ros2bag/ros2bag/api/__init__.py | 11 +++++++++++ ros2bag/ros2bag/verb/play.py | 11 ++--------- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/ros2bag/ros2bag/api/__init__.py b/ros2bag/ros2bag/api/__init__.py index b96e81f8b8..f054fe5aaa 100644 --- a/ros2bag/ros2bag/api/__init__.py +++ b/ros2bag/ros2bag/api/__init__.py @@ -107,3 +107,14 @@ def check_path_exists(value: Any) -> str: raise ArgumentTypeError("Bag file '{}' does not exist!".format(value)) except ValueError: raise ArgumentTypeError('{} is not the valid type (string)'.format(value)) + + +def check_not_negative_int(arg: str) -> int: + """Argparse validator to verify that a value is a int and not negative.""" + try: + value = int(arg) + if value < 0: + raise ArgumentTypeError(f'Value {value} is less than zero.') + return value + except ValueError: + raise ArgumentTypeError('{} is not the valid type (int)'.format(value)) diff --git a/ros2bag/ros2bag/verb/play.py b/ros2bag/ros2bag/verb/play.py index 1679cc6ca7..d82f1a2a7b 100644 --- a/ros2bag/ros2bag/verb/play.py +++ b/ros2bag/ros2bag/verb/play.py @@ -12,10 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -from argparse import ArgumentTypeError from argparse import FileType from rclpy.qos import InvalidQoSProfileException +from ros2bag.api import check_not_negative_int from ros2bag.api import check_path_exists from ros2bag.api import check_positive_float from ros2bag.api import convert_yaml_to_qos_profile @@ -36,13 +36,6 @@ def positive_float(arg: str) -> float: return value -def not_negative_int(arg: str) -> int: - value = int(arg) - if value < 0: - raise ArgumentTypeError(f'Value {value} is less than zero.') - return value - - class PlayVerb(VerbExtension): """Play back ROS data from a bag.""" @@ -102,7 +95,7 @@ def add_arguments(self, parser, cli_name): # noqa: D102 '--start-offset', type=check_positive_float, default=0.0, help='Start the playback player this many seconds into the bag file.') parser.add_argument( - '--wait-for-all-acked', type=not_negative_int, default=-1, + '--wait-for-all-acked', type=check_not_negative_int, default=-1, help='Wait until all published messages are acknowledged by all subscribers or until ' 'the timeout elapses in millisecond before play is terminated. ' 'Especially for the case of sending message with big size in a short time. ' From 7cc4fcbcc8723f1f6f29e20af691a7c0b4b3efa3 Mon Sep 17 00:00:00 2001 From: Barry Xu Date: Tue, 1 Feb 2022 17:26:32 +0800 Subject: [PATCH 4/4] Address review comments Signed-off-by: Barry Xu --- rosbag2_transport/src/rosbag2_transport/player.cpp | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/rosbag2_transport/src/rosbag2_transport/player.cpp b/rosbag2_transport/src/rosbag2_transport/player.cpp index f02ae21138..446bb8303f 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.cpp +++ b/rosbag2_transport/src/rosbag2_transport/player.cpp @@ -241,13 +241,14 @@ void Player::play() if (!pub.second->wait_for_all_acked(timeout)) { RCLCPP_ERROR( get_logger(), - "Timeout to wait all published messages acknowledged for topic %s", + "Timed out while waiting for all published messages to be acknowledged for topic %s", pub.first.c_str()); } } catch (std::exception & e) { RCLCPP_ERROR( get_logger(), - "Failed to wait all published messages acknowledged for topic %s : %s", + "Exception occurred while waiting for all published messages to be acknowledged for " + "topic %s : %s", pub.first.c_str(), e.what()); } @@ -530,8 +531,8 @@ void Player::prepare_publishers() RCLCPP_WARN( get_logger(), - "--wait-for-all-acked is invaild for below topics since reliability of QOS is BestEffort.\n" - "%s", topic_without_support_acked.c_str()); + "--wait-for-all-acked is invalid for the below topics since reliability of QOS is " + "BestEffort.\n%s", topic_without_support_acked.c_str()); } }