diff --git a/rosbag2_cpp/include/rosbag2_cpp/bag_events.hpp b/rosbag2_cpp/include/rosbag2_cpp/bag_events.hpp new file mode 100644 index 0000000000..5650d8c22f --- /dev/null +++ b/rosbag2_cpp/include/rosbag2_cpp/bag_events.hpp @@ -0,0 +1,193 @@ +// Copyright 2021 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef ROSBAG2_CPP__BAG_EVENTS_HPP_ +#define ROSBAG2_CPP__BAG_EVENTS_HPP_ + +#include +#include +#include +#include + +#include "rclcpp/function_traits.hpp" +#include "rosbag2_cpp/visibility_control.hpp" + +namespace rosbag2_cpp +{ + +namespace bag_events +{ + +/** + * \brief The types of bag events available for registering callbacks. + */ +enum class BagEvent +{ + /// The output bag file has been split, starting a new file. + WRITE_SPLIT, + /// Reading of the input bag file has gone over a split, opening the next file. + READ_SPLIT, +}; + +/** + * \brief The information structure passed to callbacks for the WRITE_SPLIT and READ_SPLIT events. + */ +struct BagSplitInfo +{ + /// The URI of the file that was closed. + std::string closed_file; + /// The URI of the file that was opened. + std::string opened_file; +}; + +using BagSplitCallbackType = std::function; + +/** + * \brief Use this structure to register callbacks with Writers. + */ +struct WriterEventCallbacks +{ + /// The callback to call for the WRITE_SPLIT event. + BagSplitCallbackType write_split_callback; +}; + +/** + * \brief Use this structure to register callbacks with Readers. + */ +struct ReaderEventCallbacks +{ + /// The callback to call for the READ_SPLIT event. + BagSplitCallbackType read_split_callback; +}; + +/** + * \brief Base class for event callbacks. + * + * This class should not be used directly. + */ +class BagEventCallbackBase +{ +public: + using SharedPtr = std::shared_ptr; + using InfoPtr = std::shared_ptr; + + virtual ~BagEventCallbackBase() + {} + + virtual void execute(InfoPtr & info) = 0; + + virtual bool is_type(BagEvent event) const = 0; +}; + +/** + * \brief Templated class for storing an event callback. + * + * This class should not be used directly by users. + */ +template +class BagEventCallback : public BagEventCallbackBase +{ +public: + BagEventCallback(const EventCallbackT & callback, BagEvent event) + : callback_(callback), + event_(event) + {} + + virtual ~BagEventCallback() + {} + + void execute(InfoPtr & info) override + { + callback_(*std::static_pointer_cast(info)); + } + + bool is_type(BagEvent event) const override + { + return event == event_; + } + +private: + using EventCallbackInfoT = typename std::remove_reference::template argument_type<0>>::type; + + EventCallbackT callback_; + BagEvent event_; +}; + +/** + * \brief The class used to manage event callbacks registered with a Writer or Reader. + * + * Each implementation of the Writer and Reader interfaces should store one instance of this type. + * When new callbacks are registered, they should be passed to that instance using \ref + * add_event_callback. When an event occurs, the callbacks registered for it can be called by + * calling the \ref execute_callbacks method, passing in the event information. + */ +class EventCallbackManager +{ +public: + /** + * \brief Add an event callback. + * + * \param callback The callback that should be called for the event. + * \param event The event, one of the values of the \ref BagEvent enumeration. + */ + template + void add_event_callback(const EventCallbackT & callback, const BagEvent event) + { + auto cb = std::make_shared>(callback, event); + callbacks_.push_back(cb); + } + + /** + * \brief Check if a callback is registered for the given event. + * + * \param event The event, one of the values of the \ref BagEvent enumeration. + * \return True if a callback is registered for the event, false otherwise. + */ + bool has_callback_for_event(const BagEvent event) const + { + for (auto & cb : callbacks_) { + if (cb->is_type(event)) { + return true; + } + } + return false; + } + + /** + * \brief Execute all callbacks registered for the given event. + * + * The provided information value is passed to each callback by copy. + * + * \param event The event, one of the values of the \ref BagEvent enumeration. + * \param info The information relevant to the event that has occurred. The type varies by event. + */ + void execute_callbacks(const BagEvent event, BagEventCallbackBase::InfoPtr info) + { + for (auto & cb : callbacks_) { + if (cb->is_type(event)) { + cb->execute(info); + } + } + } + +private: + std::vector callbacks_; +}; + +} // namespace bag_events + +} // namespace rosbag2_cpp + +#endif // ROSBAG2_CPP__BAG_EVENTS_HPP_ diff --git a/rosbag2_cpp/include/rosbag2_cpp/reader.hpp b/rosbag2_cpp/include/rosbag2_cpp/reader.hpp index 13373ac9b0..aaf9416124 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/reader.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/reader.hpp @@ -23,6 +23,7 @@ #include "rclcpp/serialization.hpp" #include "rclcpp/serialized_message.hpp" +#include "rosbag2_cpp/bag_events.hpp" #include "rosbag2_cpp/converter_options.hpp" #include "rosbag2_cpp/readers/sequential_reader.hpp" #include "rosbag2_cpp/visibility_control.hpp" @@ -179,6 +180,12 @@ class ROSBAG2_CPP_PUBLIC Reader final return *reader_impl_; } + /** + * \brief Add callbacks for events that may occur during bag reading. + * \param callbacks the structure containing the callback to add for each event. + */ + void add_event_callbacks(bag_events::ReaderEventCallbacks & callbacks); + private: std::unique_ptr reader_impl_; }; diff --git a/rosbag2_cpp/include/rosbag2_cpp/reader_interfaces/base_reader_interface.hpp b/rosbag2_cpp/include/rosbag2_cpp/reader_interfaces/base_reader_interface.hpp index d330cd3dd9..5497edd274 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/reader_interfaces/base_reader_interface.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/reader_interfaces/base_reader_interface.hpp @@ -19,6 +19,7 @@ #include #include "rcutils/types.h" +#include "rosbag2_cpp/bag_events.hpp" #include "rosbag2_cpp/converter_options.hpp" #include "rosbag2_cpp/visibility_control.hpp" @@ -57,6 +58,8 @@ class ROSBAG2_CPP_PUBLIC BaseReaderInterface virtual void reset_filter() = 0; virtual void seek(const rcutils_time_point_value_t & timestamp) = 0; + + virtual void add_event_callbacks(const bag_events::ReaderEventCallbacks & callbacks) = 0; }; } // namespace reader_interfaces diff --git a/rosbag2_cpp/include/rosbag2_cpp/readers/sequential_reader.hpp b/rosbag2_cpp/include/rosbag2_cpp/readers/sequential_reader.hpp index 875a968121..f056c79021 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/readers/sequential_reader.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/readers/sequential_reader.hpp @@ -20,6 +20,7 @@ #include #include +#include "rosbag2_cpp/bag_events.hpp" #include "rosbag2_cpp/converter.hpp" #include "rosbag2_cpp/reader_interfaces/base_reader_interface.hpp" #include "rosbag2_cpp/serialization_format_converter_factory.hpp" @@ -101,6 +102,18 @@ class ROSBAG2_CPP_PUBLIC SequentialReader */ virtual std::string get_current_uri() const; + /** + * \brief Add callbacks for events that may occur during bag reading. + * \param callbacks the structure containing the callback to add for each event. + */ + void add_event_callbacks(const bag_events::ReaderEventCallbacks & callbacks) override; + + /** + * \brief Check if a callback is registered for the given event. + * \return True if there is any callback registered for the event, false otherwise. + */ + bool has_callback_for_event(const bag_events::BagEvent event) const; + protected: /** * Opens the current file and sets up the filters in the new storage. @@ -172,6 +185,8 @@ class ROSBAG2_CPP_PUBLIC SequentialReader private: rosbag2_storage::StorageOptions storage_options_; std::shared_ptr converter_factory_{}; + + bag_events::EventCallbackManager callback_manager_; }; } // namespace readers diff --git a/rosbag2_cpp/include/rosbag2_cpp/writer.hpp b/rosbag2_cpp/include/rosbag2_cpp/writer.hpp index 25d2e4bb5d..ba4ea2d48d 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/writer.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/writer.hpp @@ -25,6 +25,7 @@ #include "rclcpp/serialized_message.hpp" #include "rclcpp/time.hpp" +#include "rosbag2_cpp/bag_events.hpp" #include "rosbag2_cpp/converter_options.hpp" #include "rosbag2_cpp/visibility_control.hpp" #include "rosbag2_cpp/writers/sequential_writer.hpp" @@ -204,6 +205,12 @@ class ROSBAG2_CPP_PUBLIC Writer final return *writer_impl_; } + /* + * \brief Add callbacks for events that may occur during bag writing. + * \param callbacks the structure containing the callback to add for each event. + */ + void add_event_callbacks(bag_events::WriterEventCallbacks & callbacks); + private: std::mutex writer_mutex_; std::unique_ptr writer_impl_; diff --git a/rosbag2_cpp/include/rosbag2_cpp/writer_interfaces/base_writer_interface.hpp b/rosbag2_cpp/include/rosbag2_cpp/writer_interfaces/base_writer_interface.hpp index 396b6a4ae2..39d070db1e 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/writer_interfaces/base_writer_interface.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/writer_interfaces/base_writer_interface.hpp @@ -17,6 +17,7 @@ #include +#include "rosbag2_cpp/bag_events.hpp" #include "rosbag2_cpp/converter_options.hpp" #include "rosbag2_cpp/visibility_control.hpp" @@ -51,6 +52,8 @@ class ROSBAG2_CPP_PUBLIC BaseWriterInterface * \returns true if snapshot is successful, false if snapshot fails or is not supported */ virtual bool take_snapshot() = 0; + + virtual void add_event_callbacks(const bag_events::WriterEventCallbacks & callbacks) = 0; }; } // namespace writer_interfaces diff --git a/rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp b/rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp index 3ce74c74b2..a5156c8ec3 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp @@ -21,6 +21,7 @@ #include #include +#include "rosbag2_cpp/bag_events.hpp" #include "rosbag2_cpp/cache/cache_consumer.hpp" #include "rosbag2_cpp/cache/circular_message_cache.hpp" #include "rosbag2_cpp/cache/message_cache.hpp" @@ -114,6 +115,12 @@ class ROSBAG2_CPP_PUBLIC SequentialWriter */ bool take_snapshot() override; + /** + * \brief Add callbacks for events that may occur during bag writing. + * \param callbacks the structure containing the callback to add for each event. + */ + void add_event_callbacks(const bag_events::WriterEventCallbacks & callbacks) override; + protected: std::string base_folder_; std::unique_ptr storage_factory_; @@ -163,6 +170,9 @@ class ROSBAG2_CPP_PUBLIC SequentialWriter /// Helper method to write messages while also updating tracked metadata. void write_messages( const std::vector> & messages); + bool is_first_message_ {true}; + + bag_events::EventCallbackManager callback_manager_; }; } // namespace writers diff --git a/rosbag2_cpp/src/rosbag2_cpp/reader.cpp b/rosbag2_cpp/src/rosbag2_cpp/reader.cpp index 5acb51440c..64d30a9176 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/reader.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/reader.cpp @@ -92,4 +92,9 @@ void Reader::seek(const rcutils_time_point_value_t & timestamp) reader_impl_->seek(timestamp); } +void Reader::add_event_callbacks(bag_events::ReaderEventCallbacks & callbacks) +{ + reader_impl_->add_event_callbacks(callbacks); +} + } // namespace rosbag2_cpp diff --git a/rosbag2_cpp/src/rosbag2_cpp/readers/sequential_reader.cpp b/rosbag2_cpp/src/rosbag2_cpp/readers/sequential_reader.cpp index 8230ba8f15..20a4d862fa 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/readers/sequential_reader.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/readers/sequential_reader.cpp @@ -235,8 +235,12 @@ void SequentialReader::load_current_file() void SequentialReader::load_next_file() { assert(current_file_iterator_ != file_paths_.end()); + auto info = std::make_shared(); + info->closed_file = get_current_file(); current_file_iterator_++; + info->opened_file = get_current_file(); load_current_file(); + callback_manager_.execute_callbacks(bag_events::BagEvent::READ_SPLIT, info); } std::string SequentialReader::get_current_file() const @@ -292,5 +296,19 @@ void SequentialReader::fill_topics_metadata() } } +void SequentialReader::add_event_callbacks(const bag_events::ReaderEventCallbacks & callbacks) +{ + if (callbacks.read_split_callback) { + callback_manager_.add_event_callback( + callbacks.read_split_callback, + bag_events::BagEvent::READ_SPLIT); + } +} + +bool SequentialReader::has_callback_for_event(const bag_events::BagEvent event) const +{ + return callback_manager_.has_callback_for_event(event); +} + } // namespace readers } // namespace rosbag2_cpp diff --git a/rosbag2_cpp/src/rosbag2_cpp/writer.cpp b/rosbag2_cpp/src/rosbag2_cpp/writer.cpp index 75457b32e4..63c7aec861 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/writer.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/writer.cpp @@ -186,4 +186,9 @@ void Writer::write( return write(serialized_bag_message, topic_name, type_name, rmw_get_serialization_format()); } +void Writer::add_event_callbacks(bag_events::WriterEventCallbacks & callbacks) +{ + writer_impl_->add_event_callbacks(callbacks); +} + } // namespace rosbag2_cpp diff --git a/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp b/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp index 5d19088522..86716ba4ca 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp @@ -266,13 +266,18 @@ void SequentialWriter::switch_to_next_storage() void SequentialWriter::split_bagfile() { + auto info = std::make_shared(); + info->closed_file = storage_->get_relative_file_path(); switch_to_next_storage(); + info->opened_file = storage_->get_relative_file_path(); metadata_.relative_file_paths.push_back(strip_parent_path(storage_->get_relative_file_path())); rosbag2_storage::FileInformation file_info{}; file_info.path = strip_parent_path(storage_->get_relative_file_path()); metadata_.files.push_back(file_info); + + callback_manager_.execute_callbacks(bag_events::BagEvent::WRITE_SPLIT, info); } void SequentialWriter::write(std::shared_ptr message) @@ -406,5 +411,14 @@ void SequentialWriter::write_messages( } } +void SequentialWriter::add_event_callbacks(const bag_events::WriterEventCallbacks & callbacks) +{ + if (callbacks.write_split_callback) { + callback_manager_.add_event_callback( + callbacks.write_split_callback, + bag_events::BagEvent::WRITE_SPLIT); + } +} + } // namespace writers } // namespace rosbag2_cpp diff --git a/rosbag2_cpp/test/rosbag2_cpp/mock_storage.hpp b/rosbag2_cpp/test/rosbag2_cpp/mock_storage.hpp index ce4f0ff4aa..3005cef366 100644 --- a/rosbag2_cpp/test/rosbag2_cpp/mock_storage.hpp +++ b/rosbag2_cpp/test/rosbag2_cpp/mock_storage.hpp @@ -37,6 +37,7 @@ class MockStorage : public rosbag2_storage::storage_interfaces::ReadWriteInterfa MOCK_METHOD1(create_topic, void(const rosbag2_storage::TopicMetadata &)); MOCK_METHOD1(remove_topic, void(const rosbag2_storage::TopicMetadata &)); MOCK_METHOD0(has_next, bool()); + MOCK_METHOD0(has_next_file, bool()); MOCK_METHOD0(read_next, std::shared_ptr()); MOCK_METHOD1(write, void(std::shared_ptr)); MOCK_METHOD1( diff --git a/rosbag2_cpp/test/rosbag2_cpp/test_sequential_reader.cpp b/rosbag2_cpp/test/rosbag2_cpp/test_sequential_reader.cpp index e45c930a4f..2fa301da06 100644 --- a/rosbag2_cpp/test/rosbag2_cpp/test_sequential_reader.cpp +++ b/rosbag2_cpp/test/rosbag2_cpp/test_sequential_reader.cpp @@ -55,30 +55,47 @@ class SequentialReaderTest : public Test auto message = std::make_shared(); message->topic_name = topic_with_type.name; - auto relative_file_path = + relative_file_path_ = (rcpputils::fs::path(storage_uri_) / "some/folder").string(); auto storage_factory = std::make_unique>(); auto metadata_io = std::make_unique>(); - rosbag2_storage::BagMetadata metadata; - metadata.relative_file_paths = {relative_file_path}; - metadata.version = 4; - metadata.topics_with_message_count.push_back({{topic_with_type}, 1}); - metadata.storage_identifier = "mock_storage"; - - EXPECT_CALL(*metadata_io, read_metadata(_)).WillRepeatedly(Return(metadata)); + bag_file_1_path_ = relative_file_path_ / "bag_file1"; + bag_file_2_path_ = relative_file_path_ / "bag_file2"; + metadata_.relative_file_paths = {bag_file_1_path_.string(), bag_file_2_path_.string()}; + metadata_.version = 4; + metadata_.topics_with_message_count.push_back({{topic_with_type}, 6}); + metadata_.storage_identifier = "mock_storage"; + + EXPECT_CALL(*metadata_io, read_metadata(_)).WillRepeatedly(Return(metadata_)); EXPECT_CALL(*metadata_io, metadata_file_exists(_)).WillRepeatedly(Return(true)); EXPECT_CALL(*storage_, get_all_topics_and_types()) .Times(AtMost(1)).WillRepeatedly(Return(topics_and_types)); - EXPECT_CALL(*storage_, has_next()).WillRepeatedly(Return(true)); + // 5 messages in the first bag file, then infinite in the second + EXPECT_CALL(*storage_, has_next()).Times(AnyNumber()); + ON_CALL(*storage_, has_next).WillByDefault( + [this]() { + num_next_++; + if (num_next_ % 5 == 0 && num_next_ > 0) { + return false; + } else { + return true; + } + }); + EXPECT_CALL(*storage_, has_next_file()).WillRepeatedly(Return(true)); EXPECT_CALL(*storage_, read_next()).WillRepeatedly(Return(message)); - EXPECT_CALL(*storage_factory, open_read_only(_)); + EXPECT_CALL(*storage_factory, open_read_only(_)).Times(AnyNumber()); ON_CALL(*storage_factory, open_read_only).WillByDefault( - [this, relative_file_path](const rosbag2_storage::StorageOptions & storage_options) { + [this](const rosbag2_storage::StorageOptions & storage_options) { + EXPECT_TRUE( + std::find( + metadata_.relative_file_paths.begin(), + metadata_.relative_file_paths.end(), + storage_options.uri) != + metadata_.relative_file_paths.end()); // Storage_id has to be set to something for open to succeed EXPECT_EQ(storage_options.storage_id, "mock_storage"); - EXPECT_STREQ(relative_file_path.c_str(), storage_options.uri.c_str()); return storage_; }); @@ -92,7 +109,12 @@ class SequentialReaderTest : public Test std::unique_ptr reader_; std::string storage_serialization_format_; std::string storage_uri_; + rosbag2_storage::BagMetadata metadata_; + rcpputils::fs::path relative_file_path_; + rcpputils::fs::path bag_file_1_path_; + rcpputils::fs::path bag_file_2_path_; rosbag2_storage::StorageOptions default_storage_options_; + size_t num_next_ = 0; }; TEST_F(SequentialReaderTest, read_next_uses_converters_to_convert_serialization_format) { @@ -162,3 +184,29 @@ TEST_F(SequentialReaderTest, open_determines_unspecified_storage_id_from_metadat // This call fails if the SequentialReader doesn't pull storage impl from metadata reader_->open(storage_options, {"", storage_serialization_format_}); } + +TEST_F(SequentialReaderTest, next_file_calls_callback) { + bool callback_called = false; + std::string closed_file, opened_file; + rosbag2_cpp::bag_events::ReaderEventCallbacks callbacks; + callbacks.read_split_callback = + [&callback_called, &closed_file, &opened_file](rosbag2_cpp::bag_events::BagSplitInfo & info) { + closed_file = info.closed_file; + opened_file = info.opened_file; + callback_called = true; + }; + reader_->add_event_callbacks(callbacks); + + reader_->open(default_storage_options_, {"", storage_serialization_format_}); + // Calling read_next() 6 times should trigger the read-split event callback + reader_->read_next(); + reader_->read_next(); + reader_->read_next(); + reader_->read_next(); + reader_->read_next(); + reader_->read_next(); + + ASSERT_TRUE(callback_called); + EXPECT_EQ(closed_file, bag_file_1_path_.string()); + EXPECT_EQ(opened_file, bag_file_2_path_.string()); +} diff --git a/rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp b/rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp index c2e0b1693e..aba3c80bf7 100644 --- a/rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp +++ b/rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp @@ -501,3 +501,63 @@ TEST_F(SequentialWriterTest, snapshot_mode_zero_cache_size_throws_exception) std::string rmw_format = "rmw_format"; EXPECT_THROW(writer_->open(storage_options_, {rmw_format, rmw_format}), std::runtime_error); } + +TEST_F(SequentialWriterTest, split_event_calls_callback) +{ + const int message_count = 7; + const int max_bagfile_size = 5; + + ON_CALL( + *storage_, + write(An>())).WillByDefault( + [this](std::shared_ptr) { + fake_storage_size_ += 1; + }); + + ON_CALL(*storage_, get_bagfile_size).WillByDefault( + [this]() { + return fake_storage_size_.load(); + }); + + ON_CALL(*metadata_io_, write_metadata).WillByDefault( + [this](const std::string &, const rosbag2_storage::BagMetadata & metadata) { + fake_metadata_ = metadata; + }); + + ON_CALL(*storage_, get_relative_file_path).WillByDefault( + [this]() { + return fake_storage_uri_; + }); + + auto sequential_writer = std::make_unique( + std::move(storage_factory_), converter_factory_, std::move(metadata_io_)); + writer_ = std::make_unique(std::move(sequential_writer)); + + auto message = std::make_shared(); + message->topic_name = "test_topic"; + + storage_options_.max_bagfile_size = max_bagfile_size; + + bool callback_called = false; + std::string closed_file, opened_file; + rosbag2_cpp::bag_events::WriterEventCallbacks callbacks; + callbacks.write_split_callback = + [&callback_called, &closed_file, &opened_file](rosbag2_cpp::bag_events::BagSplitInfo & info) { + closed_file = info.closed_file; + opened_file = info.opened_file; + callback_called = true; + }; + writer_->add_event_callbacks(callbacks); + + writer_->open(storage_options_, {"rmw_format", "rmw_format"}); + writer_->create_topic({"test_topic", "test_msgs/BasicTypes", "", ""}); + + for (auto i = 0; i < message_count; ++i) { + writer_->write(message); + } + + ASSERT_TRUE(callback_called); + auto expected_closed = rcpputils::fs::path(storage_options_.uri) / (storage_options_.uri + "_0"); + EXPECT_EQ(closed_file, expected_closed.string()); + EXPECT_EQ(opened_file, fake_storage_uri_); +} diff --git a/rosbag2_interfaces/CMakeLists.txt b/rosbag2_interfaces/CMakeLists.txt index 9b82745b2d..04654874d5 100644 --- a/rosbag2_interfaces/CMakeLists.txt +++ b/rosbag2_interfaces/CMakeLists.txt @@ -13,6 +13,8 @@ find_package(builtin_interfaces REQUIRED) find_package(rosidl_default_generators REQUIRED) rosidl_generate_interfaces(${PROJECT_NAME} + "msg/ReadSplitEvent.msg" + "msg/WriteSplitEvent.msg" "srv/Burst.srv" "srv/GetRate.srv" "srv/IsPaused.srv" diff --git a/rosbag2_interfaces/msg/ReadSplitEvent.msg b/rosbag2_interfaces/msg/ReadSplitEvent.msg new file mode 100644 index 0000000000..ebebd51b6d --- /dev/null +++ b/rosbag2_interfaces/msg/ReadSplitEvent.msg @@ -0,0 +1,4 @@ +# The full path of the file that was finished and closed +string closed_file +# The full path of the new file that was opened to continue playback +string opened_file diff --git a/rosbag2_interfaces/msg/WriteSplitEvent.msg b/rosbag2_interfaces/msg/WriteSplitEvent.msg new file mode 100644 index 0000000000..62d4f6d60b --- /dev/null +++ b/rosbag2_interfaces/msg/WriteSplitEvent.msg @@ -0,0 +1,4 @@ +# The full path of the file that was finished and closed +string closed_file +# The full path of the new file that was created to continue recording +string opened_file diff --git a/rosbag2_transport/include/rosbag2_transport/player.hpp b/rosbag2_transport/include/rosbag2_transport/player.hpp index 02b0741f8d..47673e5168 100644 --- a/rosbag2_transport/include/rosbag2_transport/player.hpp +++ b/rosbag2_transport/include/rosbag2_transport/player.hpp @@ -34,6 +34,7 @@ #include "rclcpp/qos.hpp" #include "rosbag2_cpp/clocks/player_clock.hpp" +#include "rosbag2_interfaces/msg/read_split_event.hpp" #include "rosbag2_interfaces/srv/get_rate.hpp" #include "rosbag2_interfaces/srv/is_paused.hpp" #include "rosbag2_interfaces/srv/pause.hpp" @@ -240,6 +241,8 @@ class Player : public rclcpp::Node rclcpp::Service::SharedPtr srv_burst_; rclcpp::Service::SharedPtr srv_seek_; + rclcpp::Publisher::SharedPtr split_event_pub_; + // defaults std::shared_ptr keyboard_handler_; std::vector keyboard_callbacks_; diff --git a/rosbag2_transport/include/rosbag2_transport/recorder.hpp b/rosbag2_transport/include/rosbag2_transport/recorder.hpp index d880c57353..49e99e0abc 100644 --- a/rosbag2_transport/include/rosbag2_transport/recorder.hpp +++ b/rosbag2_transport/include/rosbag2_transport/recorder.hpp @@ -32,6 +32,8 @@ #include "rosbag2_interfaces/srv/snapshot.hpp" +#include "rosbag2_interfaces/msg/write_split_event.hpp" + #include "rosbag2_storage/topic_metadata.hpp" #include "rosbag2_transport/record_options.hpp" @@ -160,6 +162,18 @@ class Recorder : public rclcpp::Node // Toogle paused key callback handle KeyboardHandler::callback_handle_t toggle_paused_key_callback_handle_ = KeyboardHandler::invalid_handle; + + // Variables for event publishing + rclcpp::Publisher::SharedPtr split_event_pub_; + bool event_publisher_thread_should_exit_ = false; + bool write_split_has_occurred_ = false; + rosbag2_cpp::bag_events::BagSplitInfo bag_split_info_; + std::mutex event_publisher_thread_mutex_; + std::condition_variable event_publisher_thread_wake_cv_; + std::thread event_publisher_thread_; + + void event_publisher_thread_main(); + bool event_publisher_thread_should_wake(); }; } // namespace rosbag2_transport diff --git a/rosbag2_transport/src/rosbag2_transport/player.cpp b/rosbag2_transport/src/rosbag2_transport/player.cpp index d774ddf929..ff0bb7ae8d 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.cpp +++ b/rosbag2_transport/src/rosbag2_transport/player.cpp @@ -557,6 +557,20 @@ void Player::prepare_publishers() "--wait-for-all-acked is invalid for the below topics since reliability of QOS is " "BestEffort.\n%s", topic_without_support_acked.c_str()); } + + // Create a publisher and callback for when encountering a split in the input + split_event_pub_ = create_publisher( + "events/read_split", + 1); + rosbag2_cpp::bag_events::ReaderEventCallbacks callbacks; + callbacks.read_split_callback = + [this](rosbag2_cpp::bag_events::BagSplitInfo & info) { + auto message = rosbag2_interfaces::msg::ReadSplitEvent(); + message.closed_file = info.closed_file; + message.opened_file = info.opened_file; + split_event_pub_->publish(message); + }; + reader_->add_event_callbacks(callbacks); } bool Player::publish_message(rosbag2_storage::SerializedBagMessageSharedPtr message) diff --git a/rosbag2_transport/src/rosbag2_transport/recorder.cpp b/rosbag2_transport/src/rosbag2_transport/recorder.cpp index ff7bb18803..572021818d 100644 --- a/rosbag2_transport/src/rosbag2_transport/recorder.cpp +++ b/rosbag2_transport/src/rosbag2_transport/recorder.cpp @@ -27,6 +27,7 @@ #include "rclcpp/logging.hpp" #include "rclcpp/clock.hpp" +#include "rosbag2_cpp/bag_events.hpp" #include "rosbag2_cpp/writer.hpp" #include "rosbag2_interfaces/srv/snapshot.hpp" @@ -107,6 +108,15 @@ Recorder::~Recorder() } subscriptions_.clear(); + + { + std::lock_guard lock(event_publisher_thread_mutex_); + event_publisher_thread_should_exit_ = true; + } + event_publisher_thread_wake_cv_.notify_all(); + if (event_publisher_thread_.joinable()) { + event_publisher_thread_.join(); + } } void Recorder::record() @@ -133,6 +143,24 @@ void Recorder::record() }); } + // Start the thread that will publish events + event_publisher_thread_ = std::thread(&Recorder::event_publisher_thread_main, this); + + split_event_pub_ = create_publisher( + "events/write_split", + 1); + rosbag2_cpp::bag_events::WriterEventCallbacks callbacks; + callbacks.write_split_callback = + [this](rosbag2_cpp::bag_events::BagSplitInfo & info) { + { + std::lock_guard lock(event_publisher_thread_mutex_); + bag_split_info_ = info; + write_split_has_occurred_ = true; + } + event_publisher_thread_wake_cv_.notify_all(); + }; + writer_->add_event_callbacks(callbacks); + serialization_format_ = record_options_.rmw_serialization_format; RCLCPP_INFO(this->get_logger(), "Listening for topics..."); subscribe_topics(get_requested_or_available_topics()); @@ -143,6 +171,38 @@ void Recorder::record() } } +void Recorder::event_publisher_thread_main() +{ + RCLCPP_INFO(get_logger(), "Event publisher thread: Starting"); + + bool should_exit = false; + + while (!should_exit) { + std::unique_lock lock(event_publisher_thread_mutex_); + event_publisher_thread_wake_cv_.wait( + lock, + [this] {return event_publisher_thread_should_wake();}); + + if (write_split_has_occurred_) { + write_split_has_occurred_ = false; + + auto message = rosbag2_interfaces::msg::WriteSplitEvent(); + message.closed_file = bag_split_info_.closed_file; + message.opened_file = bag_split_info_.opened_file; + split_event_pub_->publish(message); + } + + should_exit = event_publisher_thread_should_exit_; + } + + RCLCPP_INFO(get_logger(), "Event publisher thread: Exiting"); +} + +bool Recorder::event_publisher_thread_should_wake() +{ + return write_split_has_occurred_ || event_publisher_thread_should_exit_; +} + const rosbag2_cpp::Writer & Recorder::get_writer_handle() { return *writer_; diff --git a/rosbag2_transport/test/rosbag2_transport/mock_sequential_reader.hpp b/rosbag2_transport/test/rosbag2_transport/mock_sequential_reader.hpp index 43c849a75f..84fe1d3569 100644 --- a/rosbag2_transport/test/rosbag2_transport/mock_sequential_reader.hpp +++ b/rosbag2_transport/test/rosbag2_transport/mock_sequential_reader.hpp @@ -55,6 +55,14 @@ class MockSequentialReader : public rosbag2_cpp::reader_interfaces::BaseReaderIn std::shared_ptr read_next() override { + // "Split" the bag every few messages + if (num_read_ > 0 && num_read_ % max_messages_per_file_ == 0) { + auto info = std::make_shared(); + info->closed_file = "BagFile" + std::to_string(file_number_); + file_number_++; + info->opened_file = "BagFile" + std::to_string(file_number_); + callback_manager_.execute_callbacks(rosbag2_cpp::bag_events::BagEvent::READ_SPLIT, info); + } // filter_ was considered when incrementing num_read_ in has_next() return messages_[num_read_++]; } @@ -85,6 +93,16 @@ class MockSequentialReader : public rosbag2_cpp::reader_interfaces::BaseReaderIn num_read_ = 0; } + void + add_event_callbacks(const rosbag2_cpp::bag_events::ReaderEventCallbacks & callbacks) override + { + if (callbacks.read_split_callback) { + callback_manager_.add_event_callback( + callbacks.read_split_callback, + rosbag2_cpp::bag_events::BagEvent::READ_SPLIT); + } + } + void prepare( std::vector> messages, std::vector topics) @@ -93,6 +111,11 @@ class MockSequentialReader : public rosbag2_cpp::reader_interfaces::BaseReaderIn topics_ = std::move(topics); } + size_t max_messages_per_file() const + { + return max_messages_per_file_; + } + private: std::vector> messages_; rosbag2_storage::BagMetadata metadata_; @@ -100,6 +123,9 @@ class MockSequentialReader : public rosbag2_cpp::reader_interfaces::BaseReaderIn size_t num_read_; rcutils_time_point_value_t seek_time_ = 0; rosbag2_storage::StorageFilter filter_; + rosbag2_cpp::bag_events::EventCallbackManager callback_manager_; + size_t file_number_ = 0; + const size_t max_messages_per_file_ = 5; }; #endif // ROSBAG2_TRANSPORT__MOCK_SEQUENTIAL_READER_HPP_ diff --git a/rosbag2_transport/test/rosbag2_transport/mock_sequential_writer.hpp b/rosbag2_transport/test/rosbag2_transport/mock_sequential_writer.hpp index 0845e5d3f8..d5bfb30a86 100644 --- a/rosbag2_transport/test/rosbag2_transport/mock_sequential_writer.hpp +++ b/rosbag2_transport/test/rosbag2_transport/mock_sequential_writer.hpp @@ -55,6 +55,15 @@ class MockSequentialWriter : public rosbag2_cpp::writer_interfaces::BaseWriterIn snapshot_buffer_.push_back(message); } messages_per_topic_[message->topic_name] += 1; + messages_per_file_ += 1; + if (messages_per_file_ == max_messages_per_file_) { // "Split" the bag every few messages + auto info = std::make_shared(); + info->closed_file = "BagFile" + std::to_string(file_number_); + file_number_ += 1; + info->opened_file = "BagFile" + std::to_string(file_number_); + callback_manager_.execute_callbacks(rosbag2_cpp::bag_events::BagEvent::WRITE_SPLIT, info); + messages_per_file_ = 0; + } } bool take_snapshot() override @@ -64,6 +73,16 @@ class MockSequentialWriter : public rosbag2_cpp::writer_interfaces::BaseWriterIn return true; } + void + add_event_callbacks(const rosbag2_cpp::bag_events::WriterEventCallbacks & callbacks) override + { + if (callbacks.write_split_callback) { + callback_manager_.add_event_callback( + callbacks.write_split_callback, + rosbag2_cpp::bag_events::BagEvent::WRITE_SPLIT); + } + } + const std::vector> & get_messages() { return messages_; @@ -84,12 +103,21 @@ class MockSequentialWriter : public rosbag2_cpp::writer_interfaces::BaseWriterIn return topics_; } + size_t max_messages_per_file() const + { + return max_messages_per_file_; + } + private: std::unordered_map topics_; std::vector> messages_; std::vector> snapshot_buffer_; std::unordered_map messages_per_topic_; + size_t messages_per_file_ = 0; bool snapshot_mode_ = false; + rosbag2_cpp::bag_events::EventCallbackManager callback_manager_; + size_t file_number_ = 0; + const size_t max_messages_per_file_ = 5; }; #endif // ROSBAG2_TRANSPORT__MOCK_SEQUENTIAL_WRITER_HPP_ diff --git a/rosbag2_transport/test/rosbag2_transport/test_play.cpp b/rosbag2_transport/test/rosbag2_transport/test_play.cpp index 1a47e45061..0b090752d6 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_play.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_play.cpp @@ -632,3 +632,65 @@ TEST_F(RosBag2PlayQosOverrideTestFixture, override_has_precedence_over_recorded) // Fails if times out play_and_wait(timeout); } + +TEST_F(RosBag2PlayTestFixture, read_split_callback_is_called) +{ + auto topic_types = std::vector{ + {"topic1", "test_msgs/BasicTypes", "", ""}, + }; + + auto prepared_mock_reader = std::make_unique(); + + const size_t num_msgs_in_bag = prepared_mock_reader->max_messages_per_file() + 1; + const int64_t start_time_ms = 100; + const int64_t message_spacing_ms = 50; + + std::vector> messages; + messages.reserve(num_msgs_in_bag); + + auto primitive_message = get_messages_basic_types()[0]; + for (size_t i = 0; i < num_msgs_in_bag; i++) { + primitive_message->int32_value = static_cast(i + 1); + const int64_t timestamp = start_time_ms + message_spacing_ms * static_cast(i); + messages.push_back(serialize_test_message("topic1", timestamp, primitive_message)); + } + + prepared_mock_reader->prepare(messages, topic_types); + auto reader = std::make_unique(std::move(prepared_mock_reader)); + + bool callback_called = false; + std::string closed_file, opened_file; + rosbag2_cpp::bag_events::ReaderEventCallbacks callbacks; + callbacks.read_split_callback = + [&callback_called, &closed_file, &opened_file](rosbag2_cpp::bag_events::BagSplitInfo & info) { + closed_file = info.closed_file; + opened_file = info.opened_file; + callback_called = true; + }; + // This tests adding to the underlying prepared_mock_reader via the Reader instance + reader->add_event_callbacks(callbacks); + + auto player = std::make_shared(std::move(reader), storage_options_, play_options_); + + sub_ = std::make_shared(); + sub_->add_subscription("/topic1", messages.size()); + + // Wait for discovery to match publishers with subscribers + ASSERT_TRUE( + sub_->spin_and_wait_for_matched(player->get_list_of_publishers(), std::chrono::seconds(30))); + + auto await_received_messages = sub_->spin_subscriptions(); + + player->play(); + + await_received_messages.get(); + + auto replayed_test_primitives = sub_->get_received_messages( + "/topic1"); + EXPECT_THAT(replayed_test_primitives, SizeIs(messages.size())); + + // Confirm that the callback was called and the file names have been sent with the event + ASSERT_TRUE(callback_called); + EXPECT_EQ(closed_file, "BagFile0"); + EXPECT_EQ(opened_file, "BagFile1"); +} diff --git a/rosbag2_transport/test/rosbag2_transport/test_record.cpp b/rosbag2_transport/test/rosbag2_transport/test_record.cpp index d6e4ccbb8a..ece991a127 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_record.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_record.cpp @@ -307,3 +307,53 @@ TEST_F(RecordIntegrationTestFixture, duration_and_noncompatibility_policies_mixe }); ASSERT_TRUE(succeeded); } + +TEST_F(RecordIntegrationTestFixture, write_split_callback_is_called) +{ + auto string_message = get_messages_strings()[1]; + std::string string_topic = "/string_topic"; + + bool callback_called = false; + std::string closed_file, opened_file; + rosbag2_cpp::bag_events::WriterEventCallbacks callbacks; + callbacks.write_split_callback = + [&callback_called, &closed_file, &opened_file](rosbag2_cpp::bag_events::BagSplitInfo & info) { + closed_file = info.closed_file; + opened_file = info.opened_file; + callback_called = true; + }; + writer_->add_event_callbacks(callbacks); + + rosbag2_transport::RecordOptions record_options = + {false, false, {string_topic}, "rmw_format", 100ms}; + auto recorder = std::make_shared( + std::move(writer_), storage_options_, record_options); + recorder->record(); + + start_async_spin(recorder); + + auto & writer = recorder->get_writer_handle(); + MockSequentialWriter & mock_writer = + static_cast(writer.get_implementation_handle()); + + const size_t expected_messages = mock_writer.max_messages_per_file() + 1; + + rosbag2_test_common::PublicationManager pub_manager; + pub_manager.setup_publisher(string_topic, string_message, expected_messages); + ASSERT_TRUE(pub_manager.wait_for_matched(string_topic.c_str())); + pub_manager.run_publishers(); + + auto ret = rosbag2_test_common::wait_until_shutdown( + std::chrono::seconds(5), + [&mock_writer, &expected_messages]() { + return mock_writer.get_messages().size() >= expected_messages; + }); + auto recorded_messages = mock_writer.get_messages(); + EXPECT_TRUE(ret) << "failed to capture expected messages in time"; + EXPECT_THAT(recorded_messages, SizeIs(expected_messages)); + + // Confirm that the callback was called and the file names have been sent with the event + ASSERT_TRUE(callback_called); + EXPECT_EQ(closed_file, "BagFile0"); + EXPECT_EQ(opened_file, "BagFile1"); +}