From fb7fc1cb3eb7d64300a3a4fb8c86a598f8df7b1e Mon Sep 17 00:00:00 2001 From: Emerson Knapp <537409+emersonknapp@users.noreply.github.com> Date: Thu, 8 Apr 2021 14:35:52 -0700 Subject: [PATCH] Split Rosbag2Transport into Player and Recorder classes - first pass to enable further progress (#721) Signed-off-by: Emerson Knapp --- rosbag2_py/src/rosbag2_py/_transport.cpp | 30 ++++---- .../rosbag2_transport/rosbag2_transport.hpp | 60 ++++++++-------- .../src/rosbag2_transport/player.cpp | 3 + .../src/rosbag2_transport/player.hpp | 6 +- .../src/rosbag2_transport/recorder.cpp | 16 +++-- .../src/rosbag2_transport/recorder.hpp | 6 +- .../rosbag2_transport/rosbag2_transport.cpp | 68 ++++++++++--------- .../record_integration_fixture.hpp | 4 +- .../rosbag2_transport_test_fixture.hpp | 9 --- .../test/rosbag2_transport/test_play.cpp | 40 +++++------ .../test/rosbag2_transport/test_play_loop.cpp | 6 +- .../test_play_publish_clock.cpp | 4 +- .../rosbag2_transport/test_play_timing.cpp | 24 +++---- .../test_play_topic_remap.cpp | 4 +- 14 files changed, 141 insertions(+), 139 deletions(-) diff --git a/rosbag2_py/src/rosbag2_py/_transport.cpp b/rosbag2_py/src/rosbag2_py/_transport.cpp index 4c6bb0a202..1730702a9f 100644 --- a/rosbag2_py/src/rosbag2_py/_transport.cpp +++ b/rosbag2_py/src/rosbag2_py/_transport.cpp @@ -89,15 +89,19 @@ namespace rosbag2_py class Player { public: - Player() = default; - virtual ~Player() = default; + Player() + { + rclcpp::init(0, nullptr); + } + virtual ~Player() + { + rclcpp::shutdown(); + } void play( const rosbag2_storage::StorageOptions & storage_options, PlayOptions & play_options) { - auto writer = std::make_shared( - std::make_unique()); std::shared_ptr reader = nullptr; // Determine whether to build compression or regular reader { @@ -116,18 +120,22 @@ class Player } } - rosbag2_transport::Rosbag2Transport impl(reader, writer); - impl.init(); + rosbag2_transport::Player impl(reader); impl.play(storage_options, play_options); - impl.shutdown(); } }; class Recorder { public: - Recorder() = default; - virtual ~Recorder() = default; + Recorder() + { + rclcpp::init(0, nullptr); + } + virtual ~Recorder() + { + rclcpp::shutdown(); + } void record( const rosbag2_storage::StorageOptions & storage_options, @@ -160,10 +168,8 @@ class Recorder std::make_unique()); } - rosbag2_transport::Rosbag2Transport impl(reader, writer); - impl.init(); + rosbag2_transport::Recorder impl(writer); impl.record(storage_options, record_options); - impl.shutdown(); } }; diff --git a/rosbag2_transport/include/rosbag2_transport/rosbag2_transport.hpp b/rosbag2_transport/include/rosbag2_transport/rosbag2_transport.hpp index 8dae99ace4..8bba6a9685 100644 --- a/rosbag2_transport/include/rosbag2_transport/rosbag2_transport.hpp +++ b/rosbag2_transport/include/rosbag2_transport/rosbag2_transport.hpp @@ -35,26 +35,44 @@ class Writer; namespace rosbag2_transport { -class Rosbag2Node; - -class Rosbag2Transport +class Player { public: - /// Default constructor ROSBAG2_TRANSPORT_PUBLIC - Rosbag2Transport(); + Player(); - /// Constructor for testing, allows to set the reader and writer to use ROSBAG2_TRANSPORT_PUBLIC - Rosbag2Transport( - std::shared_ptr reader, - std::shared_ptr writer); + explicit Player(std::shared_ptr reader); ROSBAG2_TRANSPORT_PUBLIC - void init(); + virtual ~Player(); + /** + * Replay a bagfile. + * + * \param storage_options Option regarding the storage (e.g. bag file name) + * \param play_options Options regarding the playback (e.g. queue size) + */ ROSBAG2_TRANSPORT_PUBLIC - void shutdown(); + void play( + const rosbag2_storage::StorageOptions & storage_options, + const PlayOptions & play_options); + +protected: + std::shared_ptr reader_; +}; + +class Recorder +{ +public: + ROSBAG2_TRANSPORT_PUBLIC + Recorder(); + + ROSBAG2_TRANSPORT_PUBLIC + explicit Recorder(std::shared_ptr writer); + + ROSBAG2_TRANSPORT_PUBLIC + virtual ~Recorder(); /** * Records topics to a bagfile. Subscription happens at startup time, hence the topics must @@ -68,26 +86,8 @@ class Rosbag2Transport const rosbag2_storage::StorageOptions & storage_options, const RecordOptions & record_options); - /** - * Replay all topics in a bagfile. - * - * \param storage_options Option regarding the storage (e.g. bag file name) - * \param play_options Options regarding the playback (e.g. queue size) - */ - ROSBAG2_TRANSPORT_PUBLIC - void play( - const rosbag2_storage::StorageOptions & storage_options, - const PlayOptions & play_options); - -private: - std::shared_ptr setup_node( - std::string node_prefix = "", - const std::vector & topic_remapping_options = {}); - - std::shared_ptr reader_; +protected: std::shared_ptr writer_; - - std::shared_ptr transport_node_; }; } // namespace rosbag2_transport diff --git a/rosbag2_transport/src/rosbag2_transport/player.cpp b/rosbag2_transport/src/rosbag2_transport/player.cpp index e163c9257f..8d758deaf7 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.cpp +++ b/rosbag2_transport/src/rosbag2_transport/player.cpp @@ -73,6 +73,8 @@ rclcpp::QoS publisher_qos_for_topic( namespace rosbag2_transport { +namespace impl +{ const std::chrono::milliseconds Player::queue_read_wait_period_ = std::chrono::milliseconds(100); @@ -240,4 +242,5 @@ void Player::prepare_clock(const PlayOptions & options, rcutils_time_point_value } } +} // namespace impl } // namespace rosbag2_transport diff --git a/rosbag2_transport/src/rosbag2_transport/player.hpp b/rosbag2_transport/src/rosbag2_transport/player.hpp index 180576522a..c41fefb578 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.hpp +++ b/rosbag2_transport/src/rosbag2_transport/player.hpp @@ -40,9 +40,8 @@ class Reader; namespace rosbag2_transport { - -class GenericPublisher; -class Rosbag2Node; +namespace impl +{ class Player { @@ -76,6 +75,7 @@ class Player std::shared_ptr clock_publish_timer_; }; +} // namespace impl } // namespace rosbag2_transport #endif // ROSBAG2_TRANSPORT__PLAYER_HPP_ diff --git a/rosbag2_transport/src/rosbag2_transport/recorder.cpp b/rosbag2_transport/src/rosbag2_transport/recorder.cpp index 3526d49b98..41122d3264 100644 --- a/rosbag2_transport/src/rosbag2_transport/recorder.cpp +++ b/rosbag2_transport/src/rosbag2_transport/recorder.cpp @@ -46,6 +46,8 @@ namespace rosbag2_transport { +namespace impl +{ Recorder::Recorder( std::shared_ptr writer, std::shared_ptr transport_node) @@ -162,12 +164,12 @@ void Recorder::subscribe_topics( { for (const auto & topic_with_type : topics_and_types) { subscribe_topic( - { - topic_with_type.first, - topic_with_type.second, - serialization_format_, - serialized_offered_qos_profiles_for_topic(topic_with_type.first) - }); + { + topic_with_type.first, + topic_with_type.second, + serialization_format_, + serialized_offered_qos_profiles_for_topic(topic_with_type.first) + }); } } @@ -298,5 +300,5 @@ void Recorder::warn_if_new_qos_for_subscribed_topic(const std::string & topic_na } } } - +} // namespace impl } // namespace rosbag2_transport diff --git a/rosbag2_transport/src/rosbag2_transport/recorder.hpp b/rosbag2_transport/src/rosbag2_transport/recorder.hpp index a4c28c7d40..d1babf6a02 100644 --- a/rosbag2_transport/src/rosbag2_transport/recorder.hpp +++ b/rosbag2_transport/src/rosbag2_transport/recorder.hpp @@ -38,9 +38,8 @@ class Writer; namespace rosbag2_transport { - -class GenericSubscription; -class Rosbag2Node; +namespace impl +{ class Recorder { @@ -106,6 +105,7 @@ class Recorder std::unordered_map topic_qos_profile_overrides_; }; +} // namespace impl } // namespace rosbag2_transport #endif // ROSBAG2_TRANSPORT__RECORDER_HPP_ diff --git a/rosbag2_transport/src/rosbag2_transport/rosbag2_transport.cpp b/rosbag2_transport/src/rosbag2_transport/rosbag2_transport.cpp index 928565cfac..2b56d4c816 100644 --- a/rosbag2_transport/src/rosbag2_transport/rosbag2_transport.cpp +++ b/rosbag2_transport/src/rosbag2_transport/rosbag2_transport.cpp @@ -35,32 +35,45 @@ #include "player.hpp" #include "recorder.hpp" +namespace +{ +std::shared_ptr setup_node( + std::string node_prefix = "", + const std::vector & topic_remapping_options = {}) +{ + auto node_options = rclcpp::NodeOptions().arguments(topic_remapping_options); + return std::make_shared(node_prefix + "_rosbag2", node_options); +} +} // namespace + namespace rosbag2_transport { -Rosbag2Transport::Rosbag2Transport() -: reader_(std::make_shared( - std::make_unique())), - writer_(std::make_shared( - std::make_unique())) +Player::Player(std::shared_ptr reader) +: reader_(std::move(reader)) {} -Rosbag2Transport::Rosbag2Transport( - std::shared_ptr reader, - std::shared_ptr writer) -: reader_(std::move(reader)), writer_(std::move(writer)) {} +Player::Player() +: Player(std::make_shared( + std::make_unique())) +{} -void Rosbag2Transport::init() -{ - rclcpp::init(0, nullptr); -} +Player::~Player() +{} -void Rosbag2Transport::shutdown() -{ - rclcpp::shutdown(); -} +Recorder::Recorder(std::shared_ptr writer) +: writer_(std::move(writer)) +{} + +Recorder::Recorder() +: Recorder(std::make_shared( + std::make_unique())) +{} + +Recorder::~Recorder() +{} -void Rosbag2Transport::record( +void Recorder::record( const rosbag2_storage::StorageOptions & storage_options, const RecordOptions & record_options) { try { @@ -69,25 +82,14 @@ void Rosbag2Transport::record( auto transport_node = setup_node(record_options.node_prefix); - Recorder recorder(writer_, transport_node); + impl::Recorder recorder(writer_, transport_node); recorder.record(record_options); } catch (std::runtime_error & e) { RCLCPP_ERROR(rclcpp::get_logger("rosbag2_transport"), "Failed to record: %s", e.what()); } } -std::shared_ptr Rosbag2Transport::setup_node( - std::string node_prefix, - const std::vector & topic_remapping_options) -{ - if (!transport_node_) { - auto node_options = rclcpp::NodeOptions().arguments(topic_remapping_options); - transport_node_ = std::make_shared(node_prefix + "_rosbag2", node_options); - } - return transport_node_; -} - -void Rosbag2Transport::play( +void Player::play( const rosbag2_storage::StorageOptions & storage_options, const PlayOptions & play_options) { auto transport_node = @@ -104,13 +106,13 @@ void Rosbag2Transport::play( spin_thread.join(); }); try { - Player player(reader_, transport_node); + impl::Player player(reader_, transport_node); do { reader_->open(storage_options, {"", rmw_get_serialization_format()}); player.play(play_options); } while (rclcpp::ok() && play_options.loop); } catch (std::runtime_error & e) { - RCLCPP_ERROR(transport_node_->get_logger(), "Failed to play: %s", e.what()); + RCLCPP_ERROR(transport_node->get_logger(), "Failed to play: %s", e.what()); } } diff --git a/rosbag2_transport/test/rosbag2_transport/record_integration_fixture.hpp b/rosbag2_transport/test/rosbag2_transport/record_integration_fixture.hpp index d6b7d78687..b421999db6 100644 --- a/rosbag2_transport/test/rosbag2_transport/record_integration_fixture.hpp +++ b/rosbag2_transport/test/rosbag2_transport/record_integration_fixture.hpp @@ -51,8 +51,8 @@ class RecordIntegrationTestFixture : public Rosbag2TransportTestFixture // the future object returned from std::async needs to be stored not to block the execution future_ = std::async( std::launch::async, [this, options]() { - rosbag2_transport::Rosbag2Transport rosbag2_transport(reader_, writer_); - rosbag2_transport.record(storage_options_, options); + rosbag2_transport::Recorder recorder(writer_); + recorder.record(storage_options_, options); }); } diff --git a/rosbag2_transport/test/rosbag2_transport/rosbag2_transport_test_fixture.hpp b/rosbag2_transport/test/rosbag2_transport/rosbag2_transport_test_fixture.hpp index 226a46838e..57158b3e61 100644 --- a/rosbag2_transport/test/rosbag2_transport/rosbag2_transport_test_fixture.hpp +++ b/rosbag2_transport/test/rosbag2_transport/rosbag2_transport_test_fixture.hpp @@ -45,15 +45,6 @@ using namespace ::testing; // NOLINT using namespace rosbag2_test_common; // NOLINT -inline char separator() -{ -#ifdef _WIN32 - return '\\'; -#else - return '/'; -#endif -} - class Rosbag2TransportTestFixture : public Test { public: diff --git a/rosbag2_transport/test/rosbag2_transport/test_play.cpp b/rosbag2_transport/test/rosbag2_transport/test_play.cpp index 9ec5e51837..650df9b928 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_play.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_play.cpp @@ -75,8 +75,8 @@ TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_all_topics) auto await_received_messages = sub_->spin_subscriptions(); - Rosbag2Transport rosbag2_transport(reader_, writer_); - rosbag2_transport.play(storage_options_, play_options_); + Player player(reader_); + player.play(storage_options_, play_options_); await_received_messages.get(); @@ -144,8 +144,8 @@ TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_all_topics_with_ auto await_received_messages = sub_->spin_subscriptions(); - Rosbag2Transport rosbag2_transport(reader_, writer_); - rosbag2_transport.play(storage_options_, play_options_); + Player player(reader_); + player.play(storage_options_, play_options_); await_received_messages.get(); @@ -214,8 +214,8 @@ TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_filtered_topics) auto await_received_messages = sub_->spin_subscriptions(); - Rosbag2Transport rosbag2_transport(reader_, writer_); - rosbag2_transport.play(storage_options_, play_options_); + Player player(reader_); + player.play(storage_options_, play_options_); await_received_messages.get(); @@ -245,8 +245,8 @@ TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_filtered_topics) auto await_received_messages = sub_->spin_subscriptions(); - Rosbag2Transport rosbag2_transport(reader_, writer_); - rosbag2_transport.play(storage_options_, play_options_); + Player player(reader_); + player.play(storage_options_, play_options_); await_received_messages.get(); @@ -276,8 +276,8 @@ TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_filtered_topics) auto await_received_messages = sub_->spin_subscriptions(); - auto rosbag2_transport = Rosbag2Transport(reader_, writer_); - rosbag2_transport.play(storage_options_, play_options_); + auto player = Player(reader_); + player.play(storage_options_, play_options_); await_received_messages.get(); @@ -329,9 +329,9 @@ TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_filtered_topics_ auto await_received_messages = sub_->spin_subscriptions(); - Rosbag2Transport rosbag2_transport(reader_, writer_); + Player player(reader_); play_options_.topics_to_filter = {"topic2"}; - rosbag2_transport.play(storage_options_, play_options_); + player.play(storage_options_, play_options_); await_received_messages.get(); @@ -355,9 +355,9 @@ TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_filtered_topics_ await_received_messages = sub_->spin_subscriptions(); - rosbag2_transport = Rosbag2Transport(reader_, writer_); + player = Player(reader_); play_options_.topics_to_filter = {"topic1"}; - rosbag2_transport.play(storage_options_, play_options_); + player.play(storage_options_, play_options_); await_received_messages.get(); @@ -382,9 +382,9 @@ TEST_F(RosBag2PlayTestFixture, recorded_messages_are_played_for_filtered_topics_ await_received_messages = sub_->spin_subscriptions(); - rosbag2_transport = Rosbag2Transport(reader_, writer_); + player = Player(reader_); play_options_.topics_to_filter = {"topic1", "topic2"}; - rosbag2_transport.play(storage_options_, play_options_); + player.play(storage_options_, play_options_); await_received_messages.get(); @@ -437,8 +437,8 @@ class RosBag2PlayQosOverrideTestFixture : public RosBag2PlayTestFixture void play_and_wait(Duration timeout, bool expect_timeout = false) { auto await_received_messages = sub_->spin_subscriptions(); - Rosbag2Transport transport{reader_, writer_}; - transport.play(storage_options_, play_options_); + Player player{reader_}; + player.play(storage_options_, play_options_); const auto result = await_received_messages.wait_for(timeout); // Must EXPECT, can't ASSERT because transport needs to be shutdown if timed out if (expect_timeout) { @@ -446,8 +446,8 @@ class RosBag2PlayQosOverrideTestFixture : public RosBag2PlayTestFixture } else { EXPECT_NE(result, std::future_status::timeout); } - // Have to rclcpp::shutdown here to make the spin_subscriptions async thread exit - transport.shutdown(); + // Have to shutdown here to make the spin_subscriptions async thread exit + rclcpp::shutdown(); } const std::string topic_name_{"/test_topic"}; diff --git a/rosbag2_transport/test/rosbag2_transport/test_play_loop.cpp b/rosbag2_transport/test/rosbag2_transport/test_play_loop.cpp index 5999a0d4f0..facf20b9c4 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_play_loop.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_play_loop.cpp @@ -64,10 +64,8 @@ TEST_F(RosBag2PlayTestFixture, messages_played_in_loop) { auto await_received_messages = sub_->spin_subscriptions(); - auto rosbag2_transport_ptr = std::make_shared( - reader_, - writer_); - std::thread loop_thread(&rosbag2_transport::Rosbag2Transport::play, rosbag2_transport_ptr, + auto player = std::make_shared(reader_); + std::thread loop_thread(&rosbag2_transport::Player::play, player, storage_options_, rosbag2_transport::PlayOptions{read_ahead_queue_size, "", rate, {}, {}, loop_playback, {}}); diff --git a/rosbag2_transport/test/rosbag2_transport/test_play_publish_clock.cpp b/rosbag2_transport/test/rosbag2_transport/test_play_publish_clock.cpp index 9b6d0eb1dd..a77de319a0 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_play_publish_clock.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_play_publish_clock.cpp @@ -65,8 +65,8 @@ class ClockPublishFixture : public RosBag2PlayTestFixture void run_test() { auto await_received_messages = sub_->spin_subscriptions(); - rosbag2_transport::Rosbag2Transport rosbag2_transport(reader_, writer_); - rosbag2_transport.play(storage_options_, play_options_); + rosbag2_transport::Player player(reader_); + player.play(storage_options_, play_options_); await_received_messages.get(); // Check that we got enough messages diff --git a/rosbag2_transport/test/rosbag2_transport/test_play_timing.cpp b/rosbag2_transport/test/rosbag2_transport/test_play_timing.cpp index 1eefe444ea..f55e043618 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_play_timing.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_play_timing.cpp @@ -60,8 +60,8 @@ TEST_F(Rosbag2TransportTestFixture, playing_respects_relative_timing_of_stored_m // we check that time elapsed during playing is at least the time difference between the two // messages auto start = std::chrono::steady_clock::now(); - Rosbag2Transport rosbag2_transport(reader_, writer_); - rosbag2_transport.play(storage_options_, play_options_); + Player player(reader_); + player.play(storage_options_, play_options_); auto replay_time = std::chrono::steady_clock::now() - start; ASSERT_THAT(replay_time, Gt(message_time_difference)); @@ -95,8 +95,8 @@ TEST_F(Rosbag2TransportTestFixture, playing_respects_rate) play_options_.rate = 2.0; auto start = std::chrono::steady_clock::now(); - Rosbag2Transport rosbag2_transport(reader_, writer_); - rosbag2_transport.play(storage_options_, play_options_); + Player player(reader_); + player.play(storage_options_, play_options_); auto replay_time = std::chrono::steady_clock::now() - start; ASSERT_THAT(replay_time, Gt(0.5 * message_time_difference)); @@ -109,8 +109,8 @@ TEST_F(Rosbag2TransportTestFixture, playing_respects_rate) play_options_.rate = 1.0; start = std::chrono::steady_clock::now(); - rosbag2_transport = Rosbag2Transport(reader_, writer_); - rosbag2_transport.play(storage_options_, play_options_); + player = Player(reader_); + player.play(storage_options_, play_options_); replay_time = std::chrono::steady_clock::now() - start; ASSERT_THAT(replay_time, Gt(message_time_difference)); @@ -122,8 +122,8 @@ TEST_F(Rosbag2TransportTestFixture, playing_respects_rate) play_options_.rate = 0.5; start = std::chrono::steady_clock::now(); - rosbag2_transport = Rosbag2Transport(reader_, writer_); - rosbag2_transport.play(storage_options_, play_options_); + player = Player(reader_); + player.play(storage_options_, play_options_); replay_time = std::chrono::steady_clock::now() - start; ASSERT_THAT(replay_time, Gt(2 * message_time_difference)); @@ -135,8 +135,8 @@ TEST_F(Rosbag2TransportTestFixture, playing_respects_rate) play_options_.rate = 0.0; start = std::chrono::steady_clock::now(); - rosbag2_transport = Rosbag2Transport(reader_, writer_); - rosbag2_transport.play(storage_options_, play_options_); + player = Player(reader_); + player.play(storage_options_, play_options_); replay_time = std::chrono::steady_clock::now() - start; ASSERT_THAT(replay_time, Gt(message_time_difference)); @@ -148,8 +148,8 @@ TEST_F(Rosbag2TransportTestFixture, playing_respects_rate) play_options_.rate = -1.23f; start = std::chrono::steady_clock::now(); - rosbag2_transport = Rosbag2Transport(reader_, writer_); - rosbag2_transport.play(storage_options_, play_options_); + player = Player(reader_); + player.play(storage_options_, play_options_); replay_time = std::chrono::steady_clock::now() - start; ASSERT_THAT(replay_time, Gt(message_time_difference)); diff --git a/rosbag2_transport/test/rosbag2_transport/test_play_topic_remap.cpp b/rosbag2_transport/test/rosbag2_transport/test_play_topic_remap.cpp index 79d050bd29..0f14d723f4 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_play_topic_remap.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_play_topic_remap.cpp @@ -63,9 +63,9 @@ TEST_F(RosBag2PlayTestFixture, recorded_message_is_played_on_remapped_topic) { remapped_topic, 1u); auto await_received_messages = sub_->spin_subscriptions(); - rosbag2_transport::Rosbag2Transport rosbag2_transport(reader_, writer_); + rosbag2_transport::Player player(reader_); - rosbag2_transport.play(storage_options_, play_options_); + player.play(storage_options_, play_options_); await_received_messages.get();