Skip to content

Commit

Permalink
Split Rosbag2Transport into Player and Recorder classes - first pass …
Browse files Browse the repository at this point in the history
…to enable further progress (ros2#721)

Signed-off-by: Emerson Knapp <eknapp@amazon.com>
  • Loading branch information
emersonknapp authored Apr 8, 2021
1 parent 07c05da commit fb7fc1c
Show file tree
Hide file tree
Showing 14 changed files with 141 additions and 139 deletions.
30 changes: 18 additions & 12 deletions rosbag2_py/src/rosbag2_py/_transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,15 +89,19 @@ namespace rosbag2_py
class Player
{
public:
Player() = default;
virtual ~Player() = default;
Player()
{
rclcpp::init(0, nullptr);
}
virtual ~Player()
{
rclcpp::shutdown();
}

void play(
const rosbag2_storage::StorageOptions & storage_options,
PlayOptions & play_options)
{
auto writer = std::make_shared<rosbag2_cpp::Writer>(
std::make_unique<rosbag2_cpp::writers::SequentialWriter>());
std::shared_ptr<rosbag2_cpp::Reader> reader = nullptr;
// Determine whether to build compression or regular reader
{
Expand All @@ -116,18 +120,22 @@ class Player
}
}

rosbag2_transport::Rosbag2Transport impl(reader, writer);
impl.init();
rosbag2_transport::Player impl(reader);
impl.play(storage_options, play_options);
impl.shutdown();
}
};

class Recorder
{
public:
Recorder() = default;
virtual ~Recorder() = default;
Recorder()
{
rclcpp::init(0, nullptr);
}
virtual ~Recorder()
{
rclcpp::shutdown();
}

void record(
const rosbag2_storage::StorageOptions & storage_options,
Expand Down Expand Up @@ -160,10 +168,8 @@ class Recorder
std::make_unique<rosbag2_cpp::writers::SequentialWriter>());
}

rosbag2_transport::Rosbag2Transport impl(reader, writer);
impl.init();
rosbag2_transport::Recorder impl(writer);
impl.record(storage_options, record_options);
impl.shutdown();
}
};

Expand Down
60 changes: 30 additions & 30 deletions rosbag2_transport/include/rosbag2_transport/rosbag2_transport.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,26 +35,44 @@ class Writer;
namespace rosbag2_transport
{

class Rosbag2Node;

class Rosbag2Transport
class Player
{
public:
/// Default constructor
ROSBAG2_TRANSPORT_PUBLIC
Rosbag2Transport();
Player();

/// Constructor for testing, allows to set the reader and writer to use
ROSBAG2_TRANSPORT_PUBLIC
Rosbag2Transport(
std::shared_ptr<rosbag2_cpp::Reader> reader,
std::shared_ptr<rosbag2_cpp::Writer> writer);
explicit Player(std::shared_ptr<rosbag2_cpp::Reader> reader);

ROSBAG2_TRANSPORT_PUBLIC
void init();
virtual ~Player();

/**
* Replay a bagfile.
*
* \param storage_options Option regarding the storage (e.g. bag file name)
* \param play_options Options regarding the playback (e.g. queue size)
*/
ROSBAG2_TRANSPORT_PUBLIC
void shutdown();
void play(
const rosbag2_storage::StorageOptions & storage_options,
const PlayOptions & play_options);

protected:
std::shared_ptr<rosbag2_cpp::Reader> reader_;
};

class Recorder
{
public:
ROSBAG2_TRANSPORT_PUBLIC
Recorder();

ROSBAG2_TRANSPORT_PUBLIC
explicit Recorder(std::shared_ptr<rosbag2_cpp::Writer> writer);

ROSBAG2_TRANSPORT_PUBLIC
virtual ~Recorder();

/**
* Records topics to a bagfile. Subscription happens at startup time, hence the topics must
Expand All @@ -68,26 +86,8 @@ class Rosbag2Transport
const rosbag2_storage::StorageOptions & storage_options,
const RecordOptions & record_options);

/**
* Replay all topics in a bagfile.
*
* \param storage_options Option regarding the storage (e.g. bag file name)
* \param play_options Options regarding the playback (e.g. queue size)
*/
ROSBAG2_TRANSPORT_PUBLIC
void play(
const rosbag2_storage::StorageOptions & storage_options,
const PlayOptions & play_options);

private:
std::shared_ptr<rclcpp::Node> setup_node(
std::string node_prefix = "",
const std::vector<std::string> & topic_remapping_options = {});

std::shared_ptr<rosbag2_cpp::Reader> reader_;
protected:
std::shared_ptr<rosbag2_cpp::Writer> writer_;

std::shared_ptr<rclcpp::Node> transport_node_;
};

} // namespace rosbag2_transport
Expand Down
3 changes: 3 additions & 0 deletions rosbag2_transport/src/rosbag2_transport/player.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ rclcpp::QoS publisher_qos_for_topic(

namespace rosbag2_transport
{
namespace impl
{

const std::chrono::milliseconds
Player::queue_read_wait_period_ = std::chrono::milliseconds(100);
Expand Down Expand Up @@ -240,4 +242,5 @@ void Player::prepare_clock(const PlayOptions & options, rcutils_time_point_value
}
}

} // namespace impl
} // namespace rosbag2_transport
6 changes: 3 additions & 3 deletions rosbag2_transport/src/rosbag2_transport/player.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,8 @@ class Reader;

namespace rosbag2_transport
{

class GenericPublisher;
class Rosbag2Node;
namespace impl
{

class Player
{
Expand Down Expand Up @@ -76,6 +75,7 @@ class Player
std::shared_ptr<rclcpp::TimerBase> clock_publish_timer_;
};

} // namespace impl
} // namespace rosbag2_transport

#endif // ROSBAG2_TRANSPORT__PLAYER_HPP_
16 changes: 9 additions & 7 deletions rosbag2_transport/src/rosbag2_transport/recorder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@

namespace rosbag2_transport
{
namespace impl
{
Recorder::Recorder(
std::shared_ptr<rosbag2_cpp::Writer> writer,
std::shared_ptr<rclcpp::Node> transport_node)
Expand Down Expand Up @@ -162,12 +164,12 @@ void Recorder::subscribe_topics(
{
for (const auto & topic_with_type : topics_and_types) {
subscribe_topic(
{
topic_with_type.first,
topic_with_type.second,
serialization_format_,
serialized_offered_qos_profiles_for_topic(topic_with_type.first)
});
{
topic_with_type.first,
topic_with_type.second,
serialization_format_,
serialized_offered_qos_profiles_for_topic(topic_with_type.first)
});
}
}

Expand Down Expand Up @@ -298,5 +300,5 @@ void Recorder::warn_if_new_qos_for_subscribed_topic(const std::string & topic_na
}
}
}

} // namespace impl
} // namespace rosbag2_transport
6 changes: 3 additions & 3 deletions rosbag2_transport/src/rosbag2_transport/recorder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,8 @@ class Writer;

namespace rosbag2_transport
{

class GenericSubscription;
class Rosbag2Node;
namespace impl
{

class Recorder
{
Expand Down Expand Up @@ -106,6 +105,7 @@ class Recorder
std::unordered_map<std::string, rclcpp::QoS> topic_qos_profile_overrides_;
};

} // namespace impl
} // namespace rosbag2_transport

#endif // ROSBAG2_TRANSPORT__RECORDER_HPP_
68 changes: 35 additions & 33 deletions rosbag2_transport/src/rosbag2_transport/rosbag2_transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,32 +35,45 @@
#include "player.hpp"
#include "recorder.hpp"

namespace
{
std::shared_ptr<rclcpp::Node> setup_node(
std::string node_prefix = "",
const std::vector<std::string> & topic_remapping_options = {})
{
auto node_options = rclcpp::NodeOptions().arguments(topic_remapping_options);
return std::make_shared<rclcpp::Node>(node_prefix + "_rosbag2", node_options);
}
} // namespace

namespace rosbag2_transport
{

Rosbag2Transport::Rosbag2Transport()
: reader_(std::make_shared<rosbag2_cpp::Reader>(
std::make_unique<rosbag2_cpp::readers::SequentialReader>())),
writer_(std::make_shared<rosbag2_cpp::Writer>(
std::make_unique<rosbag2_cpp::writers::SequentialWriter>()))
Player::Player(std::shared_ptr<rosbag2_cpp::Reader> reader)
: reader_(std::move(reader))
{}

Rosbag2Transport::Rosbag2Transport(
std::shared_ptr<rosbag2_cpp::Reader> reader,
std::shared_ptr<rosbag2_cpp::Writer> writer)
: reader_(std::move(reader)), writer_(std::move(writer)) {}
Player::Player()
: Player(std::make_shared<rosbag2_cpp::Reader>(
std::make_unique<rosbag2_cpp::readers::SequentialReader>()))
{}

void Rosbag2Transport::init()
{
rclcpp::init(0, nullptr);
}
Player::~Player()
{}

void Rosbag2Transport::shutdown()
{
rclcpp::shutdown();
}
Recorder::Recorder(std::shared_ptr<rosbag2_cpp::Writer> writer)
: writer_(std::move(writer))
{}

Recorder::Recorder()
: Recorder(std::make_shared<rosbag2_cpp::Writer>(
std::make_unique<rosbag2_cpp::writers::SequentialWriter>()))
{}

Recorder::~Recorder()
{}

void Rosbag2Transport::record(
void Recorder::record(
const rosbag2_storage::StorageOptions & storage_options, const RecordOptions & record_options)
{
try {
Expand All @@ -69,25 +82,14 @@ void Rosbag2Transport::record(

auto transport_node = setup_node(record_options.node_prefix);

Recorder recorder(writer_, transport_node);
impl::Recorder recorder(writer_, transport_node);
recorder.record(record_options);
} catch (std::runtime_error & e) {
RCLCPP_ERROR(rclcpp::get_logger("rosbag2_transport"), "Failed to record: %s", e.what());
}
}

std::shared_ptr<rclcpp::Node> Rosbag2Transport::setup_node(
std::string node_prefix,
const std::vector<std::string> & topic_remapping_options)
{
if (!transport_node_) {
auto node_options = rclcpp::NodeOptions().arguments(topic_remapping_options);
transport_node_ = std::make_shared<rclcpp::Node>(node_prefix + "_rosbag2", node_options);
}
return transport_node_;
}

void Rosbag2Transport::play(
void Player::play(
const rosbag2_storage::StorageOptions & storage_options, const PlayOptions & play_options)
{
auto transport_node =
Expand All @@ -104,13 +106,13 @@ void Rosbag2Transport::play(
spin_thread.join();
});
try {
Player player(reader_, transport_node);
impl::Player player(reader_, transport_node);
do {
reader_->open(storage_options, {"", rmw_get_serialization_format()});
player.play(play_options);
} while (rclcpp::ok() && play_options.loop);
} catch (std::runtime_error & e) {
RCLCPP_ERROR(transport_node_->get_logger(), "Failed to play: %s", e.what());
RCLCPP_ERROR(transport_node->get_logger(), "Failed to play: %s", e.what());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ class RecordIntegrationTestFixture : public Rosbag2TransportTestFixture
// the future object returned from std::async needs to be stored not to block the execution
future_ = std::async(
std::launch::async, [this, options]() {
rosbag2_transport::Rosbag2Transport rosbag2_transport(reader_, writer_);
rosbag2_transport.record(storage_options_, options);
rosbag2_transport::Recorder recorder(writer_);
recorder.record(storage_options_, options);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,6 @@
using namespace ::testing; // NOLINT
using namespace rosbag2_test_common; // NOLINT

inline char separator()
{
#ifdef _WIN32
return '\\';
#else
return '/';
#endif
}

class Rosbag2TransportTestFixture : public Test
{
public:
Expand Down
Loading

0 comments on commit fb7fc1c

Please sign in to comment.