diff --git a/rosbag2_cpp/include/rosbag2_cpp/cache/cache_consumer.hpp b/rosbag2_cpp/include/rosbag2_cpp/cache/cache_consumer.hpp index 9a0ebb6c82..51713f82a0 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/cache/cache_consumer.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/cache/cache_consumer.hpp @@ -74,9 +74,6 @@ class ROSBAG2_CPP_PUBLIC CacheConsumer /// shut down the consumer thread void close(); - /// Set new consume callback, restart thread if necessary - void change_consume_callback(consume_callback_function_t callback); - private: std::shared_ptr message_cache_; consume_callback_function_t consume_callback_; diff --git a/rosbag2_cpp/include/rosbag2_cpp/cache/circular_message_cache.hpp b/rosbag2_cpp/include/rosbag2_cpp/cache/circular_message_cache.hpp index 97e3a03175..eb61d2664c 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/cache/circular_message_cache.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/cache/circular_message_cache.hpp @@ -39,6 +39,11 @@ namespace rosbag2_cpp namespace cache { +/// Provides a "deferred-consumption" implementation of the MessageCacheInterface. +/// When a consumer asks for a buffer, it will not receive a new buffer until some control +/// source calls `swap_buffers` manually. +/// This is useful for a snapshot mode, where no data is written to disk until asked for, +/// then the full circular buffer is dumped all at once, giving historical context. class ROSBAG2_CPP_PUBLIC CircularMessageCache : public MessageCacheInterface { @@ -48,22 +53,24 @@ class ROSBAG2_CPP_PUBLIC CircularMessageCache /// Puts msg into circular buffer, replacing the oldest msg when buffer is full void push(std::shared_ptr msg) override; - /// get current buffer to consume + /// Get current buffer to consume. + /// Locks consumer_buffer and swap_buffers until release_consumer_buffer is called. + /// This may be repeatedly empty if `swap_buffers` has not been called. std::shared_ptr consumer_buffer() override; + /// Unlock access to the consumer buffer. + void release_consumer_buffer() override; + /// Swap the primary and secondary buffer before consumption. - /// NOTE: consumer_buffer() should be called sequentially after - /// swap_buffer() to ensure expected behavior. Calling swap_buffer() - /// while accessing consumer_buffer() will be undefined behavior. + /// NOTE: If swap_buffers is called again before consuming via consumer_buffer, + /// that data will be cleared for use by the producer. void swap_buffers() override; private: - /// Double buffers - std::shared_ptr primary_buffer_; - std::shared_ptr secondary_buffer_; - - /// Double buffers sync - std::mutex cache_mutex_; + std::shared_ptr producer_buffer_; + std::mutex producer_buffer_mutex_; + std::shared_ptr consumer_buffer_; + std::mutex consumer_buffer_mutex_; }; } // namespace cache diff --git a/rosbag2_cpp/include/rosbag2_cpp/cache/message_cache.hpp b/rosbag2_cpp/include/rosbag2_cpp/cache/message_cache.hpp index 92f8562057..9834868131 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/cache/message_cache.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/cache/message_cache.hpp @@ -49,6 +49,10 @@ namespace cache * Double buffering is a part of producer-consumer pattern and optimizes for * the consumer performance (which can be a bottleneck, e.g. disk writes). * +* This is a "greedy consumer" implementation - every time the consumer asks +* for a buffer to consume, the buffers are swapped so that the latest data +* goes to the consumer right away. +* * Two instances of MessageCacheBuffer are used, one for producer and one for * the consumer. Buffers are switched through swap_buffers function, which * involves synchronization and a simple pointer switch. @@ -72,14 +76,12 @@ class ROSBAG2_CPP_PUBLIC MessageCache /// Puts msg into primary buffer. With full cache, msg is ignored and counted as lost void push(std::shared_ptr msg) override; - /// Summarize dropped/remaining messages - void log_dropped() override; - - /// Set the cache to consume-only mode for final buffer flush before closing - void finalize() override; + /// Gets a consumer buffer. + /// In this greedy implementation, swap buffers before providing the buffer. + std::shared_ptr consumer_buffer() override; - /// Notify that flushing is complete - void notify_flushing_done() override; + /// Notify that consumer_buffer has been fully used. Unlock. + void release_consumer_buffer() override; /** * Consumer API: wait until primary buffer is ready and swap it with consumer buffer. @@ -90,27 +92,33 @@ class ROSBAG2_CPP_PUBLIC MessageCache **/ void swap_buffers() override; - /// Consumer API: get current buffer to consume - std::shared_ptr consumer_buffer() override; + /// Set the cache to consume-only mode for final buffer flush before closing + void begin_flushing() override; - /// Exposes counts of messages dropped per topic - std::unordered_map messages_dropped() const; + /// Notify that flushing is complete + void done_flushing() override; + + /// Summarize dropped/remaining messages + void log_dropped() override; + +protected: + /// Dropped messages per topic. Used for printing in alphabetic order + std::unordered_map messages_dropped_per_topic_; private: /// Producer API: notify consumer to wake-up (primary buffer has data) void notify_buffer_consumer(); /// Double buffers - std::shared_ptr primary_buffer_; - std::shared_ptr secondary_buffer_; + std::shared_ptr producer_buffer_; + std::mutex producer_buffer_mutex_; + std::shared_ptr consumer_buffer_; + std::recursive_mutex consumer_buffer_mutex_; - /// Dropped messages per topic. Used for printing in alphabetic order - std::unordered_map messages_dropped_per_topic_; /// Double buffers sync (following cpp core guidelines for condition variables) bool primary_buffer_can_be_swapped_ {false}; std::condition_variable cache_condition_var_; - std::mutex cache_mutex_; /// Cache is no longer accepting messages and is in the process of flushing std::atomic_bool flushing_ {false}; diff --git a/rosbag2_cpp/include/rosbag2_cpp/cache/message_cache_interface.hpp b/rosbag2_cpp/include/rosbag2_cpp/cache/message_cache_interface.hpp index 1609e108c9..12d24b2463 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/cache/message_cache_interface.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/cache/message_cache_interface.hpp @@ -40,41 +40,32 @@ class ROSBAG2_CPP_PUBLIC MessageCacheInterface public: virtual ~MessageCacheInterface() {} - /** - * Push a pointer to a bag message into the primary buffer. - */ + /// Push a bag message into the producer buffer. virtual void push(std::shared_ptr msg) = 0; - /** - * Get a pointer to the buffer that can be used for consuming the - * cached messages. In a double buffer implementation, this should - * always be called after swap_buffers(). - * - * \return a pointer to the consumer buffer interface. - */ + /// Get a pointer to the buffer that can be used for consuming the cached messages. + /// This call locks access to the buffer, `swap_buffers` and `consumer_buffer` will block until + /// `release_consumer_buffer` is called to unlock access to the buffer. + /// Consumer should call `release_consumer_buffer` when they are done consuming the messages. + /// \return a pointer to the consumer buffer interface. virtual std::shared_ptr consumer_buffer() = 0; - /** - * Swap primary and secondary buffers when using a double buffer - * configuration. - */ + /// Signals that tne consumer is done consuming, unlocking the buffer so it may be swapped. + virtual void release_consumer_buffer() = 0; + + /// Swap producer and consumer buffers. + /// Note: this will block if `consumer_buffer` has been called but `release_consumer_buffer` + /// has not been called yet to signal end of consuming. virtual void swap_buffers() = 0; - /** - * Mark message cache as having finished flushing - * remaining messages to the bagfile. - */ - virtual void notify_flushing_done() {} + /// Go into a read-only state to drain the final consumer buffer without letting in new data. + virtual void begin_flushing() {} - /** - * Print a log message with details of any dropped messages. - */ - virtual void log_dropped() {} + /// Call after `begin_flushing`, once the final consumer buffer is emptied. + virtual void done_flushing() {} - /** - * Perform any cleanup or final tasks before exitting. - */ - virtual void finalize() {} + /// Print a log message with details of any dropped messages. + virtual void log_dropped() {} }; } // namespace cache diff --git a/rosbag2_cpp/include/rosbag2_cpp/writer.hpp b/rosbag2_cpp/include/rosbag2_cpp/writer.hpp index d8b0d63a31..5f2d0796ed 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/writer.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/writer.hpp @@ -96,6 +96,12 @@ class ROSBAG2_CPP_PUBLIC Writer final */ void create_topic(const rosbag2_storage::TopicMetadata & topic_with_type); + /** + * Trigger a snapshot when snapshot mode is enabled. + * \returns true if snapshot is successful, false if snapshot fails or is not supported + */ + bool take_snapshot(); + /** * Remove a new topic in the underlying storage. * If creation of subscription fails remove the topic diff --git a/rosbag2_cpp/include/rosbag2_cpp/writer_interfaces/base_writer_interface.hpp b/rosbag2_cpp/include/rosbag2_cpp/writer_interfaces/base_writer_interface.hpp index 1f52d0f19e..396b6a4ae2 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/writer_interfaces/base_writer_interface.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/writer_interfaces/base_writer_interface.hpp @@ -45,6 +45,12 @@ class ROSBAG2_CPP_PUBLIC BaseWriterInterface virtual void remove_topic(const rosbag2_storage::TopicMetadata & topic_with_type) = 0; virtual void write(std::shared_ptr message) = 0; + + /** + * Triggers a snapshot for writers that support it. + * \returns true if snapshot is successful, false if snapshot fails or is not supported + */ + virtual bool take_snapshot() = 0; }; } // namespace writer_interfaces diff --git a/rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp b/rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp index 14b5602028..4bc14b47bb 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/writers/sequential_writer.hpp @@ -22,6 +22,7 @@ #include #include "rosbag2_cpp/cache/cache_consumer.hpp" +#include "rosbag2_cpp/cache/circular_message_cache.hpp" #include "rosbag2_cpp/cache/message_cache.hpp" #include "rosbag2_cpp/cache/message_cache_interface.hpp" #include "rosbag2_cpp/converter.hpp" @@ -107,6 +108,12 @@ class ROSBAG2_CPP_PUBLIC SequentialWriter */ void write(std::shared_ptr message) override; + /** + * Take a snapshot by triggering a circular buffer flip, writing data to disk. + * *\returns true if snapshot is successful + */ + bool take_snapshot() override; + protected: std::string base_folder_; std::unique_ptr storage_factory_; @@ -154,6 +161,11 @@ class ROSBAG2_CPP_PUBLIC SequentialWriter virtual std::shared_ptr get_writeable_message( std::shared_ptr message); + +private: + /// Helper method to write messages while also updating tracked metadata. + void write_messages( + const std::vector> & messages); }; } // namespace writers diff --git a/rosbag2_cpp/src/rosbag2_cpp/cache/cache_consumer.cpp b/rosbag2_cpp/src/rosbag2_cpp/cache/cache_consumer.cpp index 1632560b21..56f086aca1 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/cache/cache_consumer.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/cache/cache_consumer.cpp @@ -38,7 +38,7 @@ CacheConsumer::~CacheConsumer() void CacheConsumer::close() { - message_cache_->finalize(); + message_cache_->begin_flushing(); is_stop_issued_ = true; ROSBAG2_CPP_LOG_INFO_STREAM( @@ -47,17 +47,7 @@ void CacheConsumer::close() if (consumer_thread_.joinable()) { consumer_thread_.join(); } - message_cache_->notify_flushing_done(); -} - -void CacheConsumer::change_consume_callback( - CacheConsumer::consume_callback_function_t consume_callback) -{ - consume_callback_ = consume_callback; - if (!consumer_thread_.joinable()) { - is_stop_issued_ = false; - consumer_thread_ = std::thread(&CacheConsumer::exec_consuming, this); - } + message_cache_->done_flushing(); } void CacheConsumer::exec_consuming() @@ -65,18 +55,16 @@ void CacheConsumer::exec_consuming() bool exit_flag = false; bool flushing = false; while (!exit_flag) { - // Invariant at loop start: consumer buffer is empty - - // swap producer buffer with consumer buffer - message_cache_->swap_buffers(); - // make sure to use consistent callback for each iteration auto callback_for_this_loop = consume_callback_; - // consume all the data from consumer buffer + // Get the current consumer buffer. + // Depending on the cache implementation, this may swap buffers now, or could + // provide an empty buffer if swapping is handled via other conditions. auto consumer_buffer = message_cache_->consumer_buffer(); callback_for_this_loop(consumer_buffer->data()); consumer_buffer->clear(); + message_cache_->release_consumer_buffer(); if (flushing) {exit_flag = true;} // this was the final run if (is_stop_issued_) {flushing = true;} // run one final time to flush diff --git a/rosbag2_cpp/src/rosbag2_cpp/cache/circular_message_cache.cpp b/rosbag2_cpp/src/rosbag2_cpp/cache/circular_message_cache.cpp index 725729aec5..539f7891ca 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/cache/circular_message_cache.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/cache/circular_message_cache.cpp @@ -29,26 +29,33 @@ namespace cache CircularMessageCache::CircularMessageCache(size_t max_buffer_size) { - primary_buffer_ = std::make_shared(max_buffer_size); - secondary_buffer_ = std::make_shared(max_buffer_size); + producer_buffer_ = std::make_shared(max_buffer_size); + consumer_buffer_ = std::make_shared(max_buffer_size); } void CircularMessageCache::push(std::shared_ptr msg) { - std::lock_guard cache_lock(cache_mutex_); - primary_buffer_->push(msg); + std::lock_guard cache_lock(producer_buffer_mutex_); + producer_buffer_->push(msg); } std::shared_ptr CircularMessageCache::consumer_buffer() { - return secondary_buffer_; + consumer_buffer_mutex_.lock(); + return consumer_buffer_; +} + +void CircularMessageCache::release_consumer_buffer() +{ + consumer_buffer_mutex_.unlock(); } void CircularMessageCache::swap_buffers() { - std::lock_guard cache_lock(cache_mutex_); - secondary_buffer_->clear(); - std::swap(primary_buffer_, secondary_buffer_); + std::lock_guard producer_lock(producer_buffer_mutex_); + std::lock_guard consumer_lock(consumer_buffer_mutex_); + consumer_buffer_->clear(); + std::swap(producer_buffer_, consumer_buffer_); } } // namespace cache diff --git a/rosbag2_cpp/src/rosbag2_cpp/cache/message_cache.cpp b/rosbag2_cpp/src/rosbag2_cpp/cache/message_cache.cpp index 388f87c8df..11f084de09 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/cache/message_cache.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/cache/message_cache.cpp @@ -30,8 +30,13 @@ namespace cache MessageCache::MessageCache(size_t max_buffer_size) { - primary_buffer_ = std::make_shared(max_buffer_size); - secondary_buffer_ = std::make_shared(max_buffer_size); + producer_buffer_ = std::make_shared(max_buffer_size); + consumer_buffer_ = std::make_shared(max_buffer_size); +} + +MessageCache::~MessageCache() +{ + log_dropped(); } void MessageCache::push(std::shared_ptr msg) @@ -39,9 +44,9 @@ void MessageCache::push(std::shared_ptr cache_lock(cache_mutex_); + std::lock_guard lock(producer_buffer_mutex_); if (!flushing_) { - pushed = primary_buffer_->push(msg); + pushed = producer_buffer_->push(msg); } } if (!pushed) { @@ -51,51 +56,45 @@ void MessageCache::push(std::shared_ptr cache_lock(cache_mutex_); - flushing_ = true; - } - cache_condition_var_.notify_one(); -} - -void MessageCache::notify_flushing_done() +std::shared_ptr MessageCache::consumer_buffer() { - flushing_ = false; + consumer_buffer_mutex_.lock(); + swap_buffers(); + return consumer_buffer_; } -void MessageCache::notify_buffer_consumer() +void MessageCache::release_consumer_buffer() { - { - std::lock_guard cache_lock(cache_mutex_); - primary_buffer_can_be_swapped_ = true; - } - cache_condition_var_.notify_one(); + consumer_buffer_mutex_.unlock(); } void MessageCache::swap_buffers() { - std::unique_lock lock(cache_mutex_); + std::unique_lock producer_lock(producer_buffer_mutex_); + std::lock_guard consumer_lock(consumer_buffer_mutex_); if (!flushing_) { // Required condition check to protect against spurious wakeups cache_condition_var_.wait( - lock, [this] { + producer_lock, [this] { return primary_buffer_can_be_swapped_ || flushing_; }); primary_buffer_can_be_swapped_ = false; } - std::swap(primary_buffer_, secondary_buffer_); + std::swap(producer_buffer_, consumer_buffer_); } -std::shared_ptr MessageCache::consumer_buffer() +void MessageCache::begin_flushing() { - return secondary_buffer_; + { + std::lock_guard lock(producer_buffer_mutex_); + flushing_ = true; + } + cache_condition_var_.notify_one(); } -std::unordered_map MessageCache::messages_dropped() const +void MessageCache::done_flushing() { - return messages_dropped_per_topic_; + flushing_ = false; } void MessageCache::log_dropped() @@ -123,7 +122,7 @@ void MessageCache::log_dropped() ROSBAG2_CPP_LOG_WARN_STREAM(log_text); } - size_t remaining = primary_buffer_->size() + secondary_buffer_->size(); + size_t remaining = producer_buffer_->size() + consumer_buffer_->size(); if (remaining > 0) { ROSBAG2_CPP_LOG_WARN_STREAM( "Cache buffers were unflushed with " << remaining << " remaining messages" @@ -131,9 +130,13 @@ void MessageCache::log_dropped() } } -MessageCache::~MessageCache() +void MessageCache::notify_buffer_consumer() { - log_dropped(); + { + std::lock_guard lock(producer_buffer_mutex_); + primary_buffer_can_be_swapped_ = true; + } + cache_condition_var_.notify_one(); } } // namespace cache diff --git a/rosbag2_cpp/src/rosbag2_cpp/writer.cpp b/rosbag2_cpp/src/rosbag2_cpp/writer.cpp index 579a01506a..77632a3977 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/writer.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/writer.cpp @@ -76,6 +76,11 @@ void Writer::remove_topic(const rosbag2_storage::TopicMetadata & topic_with_type writer_impl_->remove_topic(topic_with_type); } +bool Writer::take_snapshot() +{ + return writer_impl_->take_snapshot(); +} + void Writer::write(std::shared_ptr message) { std::lock_guard writer_lock(writer_mutex_); diff --git a/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp b/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp index 3a522e7d66..2273a29e25 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp @@ -27,6 +27,7 @@ #include "rcpputils/filesystem_helper.hpp" #include "rosbag2_cpp/info.hpp" +#include "rosbag2_cpp/logging.hpp" #include "rosbag2_storage/storage_options.hpp" @@ -41,24 +42,6 @@ std::string strip_parent_path(const std::string & relative_path) { return rcpputils::fs::path(relative_path).filename().string(); } - -rosbag2_cpp::cache::CacheConsumer::consume_callback_function_t make_callback( - std::shared_ptr storage_interface, - std::unordered_map & topics_info_map, - std::mutex & topics_mutex) -{ - return [callback_interface = storage_interface, &topics_info_map, &topics_mutex]( - const std::vector> & msgs) { - callback_interface->write(msgs); - for (const auto & msg : msgs) { - // count messages as successfully written - std::lock_guard lock(topics_mutex); - if (topics_info_map.find(msg->topic_name) != topics_info_map.end()) { - topics_info_map[msg->topic_name].message_count++; - } - } - }; -} } // namespace SequentialWriter::SequentialWriter( @@ -133,16 +116,24 @@ void SequentialWriter::open( } use_cache_ = storage_options.max_cache_size > 0u; + if (storage_options.snapshot_mode && !use_cache_) { + throw std::runtime_error( + "Max cache size must be greater than 0 when snapshot mode is enabled"); + } + if (use_cache_) { - message_cache_ = std::make_shared( - storage_options.max_cache_size); + if (storage_options.snapshot_mode) { + message_cache_ = std::make_shared( + storage_options.max_cache_size); + } else { + message_cache_ = std::make_shared( + storage_options.max_cache_size); + } cache_consumer_ = std::make_unique( message_cache_, - make_callback( - storage_, - topics_names_to_info_, - topics_info_mutex_)); + std::bind(&SequentialWriter::write_messages, this, std::placeholders::_1)); } + init_metadata(); } @@ -260,15 +251,6 @@ void SequentialWriter::switch_to_next_storage() for (const auto & topic : topics_names_to_info_) { storage_->create_topic(topic.second.topic_metadata); } - - // Set new storage in buffer layer and restart consumer thread - if (use_cache_) { - cache_consumer_->change_consume_callback( - make_callback( - storage_, - topics_names_to_info_, - topics_info_mutex_)); - } } void SequentialWriter::split_bagfile() @@ -321,6 +303,16 @@ void SequentialWriter::write(std::shared_ptrswap_buffers(); + return true; +} + std::shared_ptr SequentialWriter::get_writeable_message( std::shared_ptr message) @@ -376,5 +368,20 @@ void SequentialWriter::finalize_metadata() } } +void SequentialWriter::write_messages( + const std::vector> & messages) +{ + if (messages.empty()) { + return; + } + storage_->write(messages); + for (const auto & msg : messages) { + std::lock_guard lock(topics_info_mutex_); + if (topics_names_to_info_.find(msg->topic_name) != topics_names_to_info_.end()) { + topics_names_to_info_[msg->topic_name].message_count++; + } + } +} + } // namespace writers } // namespace rosbag2_cpp diff --git a/rosbag2_cpp/test/rosbag2_cpp/mock_message_cache.hpp b/rosbag2_cpp/test/rosbag2_cpp/mock_message_cache.hpp index e32c708e6d..ebf9cc7145 100644 --- a/rosbag2_cpp/test/rosbag2_cpp/mock_message_cache.hpp +++ b/rosbag2_cpp/test/rosbag2_cpp/mock_message_cache.hpp @@ -30,6 +30,11 @@ class MockMessageCache : public rosbag2_cpp::cache::MessageCache explicit MockMessageCache(uint64_t max_buffer_size) : rosbag2_cpp::cache::MessageCache(max_buffer_size) {} + std::unordered_map messages_dropped() const + { + return messages_dropped_per_topic_; + } + MOCK_METHOD0(log_dropped, void()); }; diff --git a/rosbag2_cpp/test/rosbag2_cpp/test_circular_message_cache.cpp b/rosbag2_cpp/test/rosbag2_cpp/test_circular_message_cache.cpp index aa3f9b732c..cc53807000 100644 --- a/rosbag2_cpp/test/rosbag2_cpp/test_circular_message_cache.cpp +++ b/rosbag2_cpp/test/rosbag2_cpp/test_circular_message_cache.cpp @@ -85,6 +85,7 @@ TEST_F(CircularMessageCacheTest, circular_message_cache_overwrites_old) { auto consumer_buffer = circular_message_cache->consumer_buffer(); auto message_vector = consumer_buffer->data(); std::string first_message = deserialize_message(message_vector.front()->serialized_data); + circular_message_cache->release_consumer_buffer(); // Old messages should be dropped EXPECT_THAT(first_message, StrNe("Hello0")); @@ -115,6 +116,7 @@ TEST_F(CircularMessageCacheTest, circular_message_cache_ensure_empty) { // Swap filled cache to secondary circular_message_cache->swap_buffers(); EXPECT_THAT(circular_message_cache->consumer_buffer()->size(), Ne(0u)); + circular_message_cache->release_consumer_buffer(); // Swap back to primary (expected to empty buffer) circular_message_cache->swap_buffers(); @@ -123,4 +125,5 @@ TEST_F(CircularMessageCacheTest, circular_message_cache_ensure_empty) { circular_message_cache->swap_buffers(); // Cache should have been emptied EXPECT_THAT(circular_message_cache->consumer_buffer()->size(), Eq(0u)); + circular_message_cache->release_consumer_buffer(); } diff --git a/rosbag2_cpp/test/rosbag2_cpp/test_message_cache.cpp b/rosbag2_cpp/test/rosbag2_cpp/test_message_cache.cpp index 6a93901b65..058e3fbd4b 100644 --- a/rosbag2_cpp/test/rosbag2_cpp/test_message_cache.cpp +++ b/rosbag2_cpp/test/rosbag2_cpp/test_message_cache.cpp @@ -105,49 +105,3 @@ TEST_F(MessageCacheTest, message_cache_writes_full_producer_buffer) { mock_cache_consumer->close(); EXPECT_EQ(consumed_message_count, message_count - should_be_dropped_count); } - -TEST_F(MessageCacheTest, message_cache_changing_callback) { - const uint32_t message_count = 50; - - size_t callback1_counter = 0; - size_t callback2_counter = 0; - - auto mock_message_cache = std::make_shared>( - cache_size_); - - auto mock_cache_consumer = std::make_unique>( - mock_message_cache, - [&callback1_counter]( - const std::vector> & msgs) { - callback1_counter += msgs.size(); - }); - - rosbag2_cpp::cache::CacheConsumer::consume_callback_function_t cb2 = - [&callback2_counter]( - const std::vector> & msgs) { - callback2_counter += msgs.size(); - }; - - uint32_t counter = 0; - for (uint32_t i = 0; i < message_count; ++i) { - if (counter >= message_count / 2) { - mock_cache_consumer->close(); - mock_cache_consumer->change_consume_callback(cb2); - } - auto msg = make_test_msg(); - mock_message_cache->push(msg); - counter++; - } - - auto total_actually_dropped = sum_up(mock_message_cache->messages_dropped()); - EXPECT_EQ(total_actually_dropped, 0u); - - mock_cache_consumer->close(); - - using namespace std::chrono_literals; - std::this_thread::sleep_for(20ms); - - size_t sum_consumed = callback1_counter + callback2_counter; - - EXPECT_EQ(sum_consumed, message_count); -} diff --git a/rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp b/rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp index 0ff5bfbd72..fa7ace6bb2 100644 --- a/rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp +++ b/rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp @@ -15,6 +15,7 @@ #include #include +#include #include #include #include @@ -301,3 +302,87 @@ TEST_F(SequentialWriterTest, do_not_use_cache_if_cache_size_is_zero) { writer_->write(message); } } + +TEST_F(SequentialWriterTest, snapshot_mode_write_on_trigger) +{ + storage_options_.max_bagfile_size = 0; + storage_options_.max_cache_size = 200; + storage_options_.snapshot_mode = true; + + // Expect a single write call when the snapshot is triggered + EXPECT_CALL( + *storage_, write( + An + > &>()) + ).Times(1); + + auto sequential_writer = std::make_unique( + std::move(storage_factory_), converter_factory_, std::move(metadata_io_)); + writer_ = std::make_unique(std::move(sequential_writer)); + + std::string rmw_format = "rmw_format"; + + std::string msg_content = "Hello"; + auto msg_length = msg_content.length(); + auto message = std::make_shared(); + message->topic_name = "test_topic"; + message->serialized_data = rosbag2_storage::make_serialized_message( + msg_content.c_str(), msg_length); + + writer_->open(storage_options_, {rmw_format, rmw_format}); + writer_->create_topic({"test_topic", "test_msgs/BasicTypes", "", ""}); + + for (auto i = 0u; i < 100; ++i) { + writer_->write(message); + } + writer_->take_snapshot(); +} + +TEST_F(SequentialWriterTest, snapshot_mode_not_triggered_no_storage_write) +{ + storage_options_.max_bagfile_size = 0; + storage_options_.max_cache_size = 200; + storage_options_.snapshot_mode = true; + + // Storage should never be written to when snapshot mode is enabled + // but a snapshot is never triggered + EXPECT_CALL( + *storage_, write( + An + > &>()) + ).Times(0); + + auto sequential_writer = std::make_unique( + std::move(storage_factory_), converter_factory_, std::move(metadata_io_)); + writer_ = std::make_unique(std::move(sequential_writer)); + + std::string rmw_format = "rmw_format"; + + std::string msg_content = "Hello"; + auto msg_length = msg_content.length(); + auto message = std::make_shared(); + message->topic_name = "test_topic"; + message->serialized_data = rosbag2_storage::make_serialized_message( + msg_content.c_str(), msg_length); + + writer_->open(storage_options_, {rmw_format, rmw_format}); + writer_->create_topic({"test_topic", "test_msgs/BasicTypes", "", ""}); + + for (auto i = 0u; i < 100; ++i) { + writer_->write(message); + } +} + +TEST_F(SequentialWriterTest, snapshot_mode_zero_cache_size_throws_exception) +{ + storage_options_.max_bagfile_size = 0; + storage_options_.max_cache_size = 0; + storage_options_.snapshot_mode = true; + + auto sequential_writer = std::make_unique( + std::move(storage_factory_), converter_factory_, std::move(metadata_io_)); + writer_ = std::make_unique(std::move(sequential_writer)); + + std::string rmw_format = "rmw_format"; + EXPECT_THROW(writer_->open(storage_options_, {rmw_format, rmw_format}), std::runtime_error); +} diff --git a/rosbag2_interfaces/CMakeLists.txt b/rosbag2_interfaces/CMakeLists.txt index 1d016684a2..62150cf04c 100644 --- a/rosbag2_interfaces/CMakeLists.txt +++ b/rosbag2_interfaces/CMakeLists.txt @@ -20,6 +20,7 @@ rosidl_generate_interfaces(${PROJECT_NAME} "srv/Resume.srv" "srv/Seek.srv" "srv/SetRate.srv" + "srv/Snapshot.srv" "srv/TogglePaused.srv" DEPENDENCIES builtin_interfaces ADD_LINTER_TESTS diff --git a/rosbag2_interfaces/srv/Snapshot.srv b/rosbag2_interfaces/srv/Snapshot.srv new file mode 100644 index 0000000000..410e0f9d9d --- /dev/null +++ b/rosbag2_interfaces/srv/Snapshot.srv @@ -0,0 +1,2 @@ +--- +bool success diff --git a/rosbag2_storage/include/rosbag2_storage/storage_options.hpp b/rosbag2_storage/include/rosbag2_storage/storage_options.hpp index 0ff267de5a..68642f1ebc 100644 --- a/rosbag2_storage/include/rosbag2_storage/storage_options.hpp +++ b/rosbag2_storage/include/rosbag2_storage/storage_options.hpp @@ -46,6 +46,10 @@ struct StorageOptions // Storage specific configuration file. // Defaults to empty string. std::string storage_config_uri = ""; + + // Enable snapshot mode. + // Defaults to disabled. + bool snapshot_mode = false; }; } // namespace rosbag2_storage diff --git a/rosbag2_transport/CMakeLists.txt b/rosbag2_transport/CMakeLists.txt index 91c2e46387..818daf1048 100644 --- a/rosbag2_transport/CMakeLists.txt +++ b/rosbag2_transport/CMakeLists.txt @@ -167,6 +167,11 @@ function(create_tests_for_rmw_implementation) AMENT_DEPS test_msgs rosbag2_test_common ${SKIP_TEST}) + rosbag2_transport_add_gmock(test_record_services + test/rosbag2_transport/test_record_services.cpp + LINK_LIBS rosbag2_transport + AMENT_DEPS test_msgs rosbag2_test_common) + rosbag2_transport_add_gmock(test_topic_filter test/rosbag2_transport/test_topic_filter.cpp INCLUDE_DIRS diff --git a/rosbag2_transport/include/rosbag2_transport/recorder.hpp b/rosbag2_transport/include/rosbag2_transport/recorder.hpp index 3b09f5eeb5..2c5e937745 100644 --- a/rosbag2_transport/include/rosbag2_transport/recorder.hpp +++ b/rosbag2_transport/include/rosbag2_transport/recorder.hpp @@ -28,6 +28,8 @@ #include "rosbag2_cpp/writer.hpp" +#include "rosbag2_interfaces/srv/snapshot.hpp" + #include "rosbag2_storage/topic_metadata.hpp" #include "rosbag2_transport/record_options.hpp" @@ -121,6 +123,7 @@ class Recorder : public rclcpp::Node std::string serialization_format_; std::unordered_map topic_qos_profile_overrides_; std::unordered_set topic_unknown_types_; + rclcpp::Service::SharedPtr srv_snapshot_; }; } // namespace rosbag2_transport diff --git a/rosbag2_transport/src/rosbag2_transport/recorder.cpp b/rosbag2_transport/src/rosbag2_transport/recorder.cpp index 1acf95a043..847c3fab2a 100644 --- a/rosbag2_transport/src/rosbag2_transport/recorder.cpp +++ b/rosbag2_transport/src/rosbag2_transport/recorder.cpp @@ -28,6 +28,8 @@ #include "rosbag2_cpp/writer.hpp" +#include "rosbag2_interfaces/srv/snapshot.hpp" + #include "qos.hpp" #include "topic_filter.hpp" @@ -93,6 +95,19 @@ void Recorder::record() storage_options_, {rmw_get_serialization_format(), record_options_.rmw_serialization_format}); + // Only expose snapshot service when mode is enabled + if (storage_options_.snapshot_mode) { + srv_snapshot_ = create_service( + "~/snapshot", + [this]( + const std::shared_ptr/* request_header */, + const std::shared_ptr/* request */, + const std::shared_ptr response) + { + response->success = writer_->take_snapshot(); + }); + } + serialization_format_ = record_options_.rmw_serialization_format; RCLCPP_INFO(this->get_logger(), "Listening for topics..."); subscribe_topics(get_requested_or_available_topics()); diff --git a/rosbag2_transport/test/rosbag2_transport/mock_sequential_writer.hpp b/rosbag2_transport/test/rosbag2_transport/mock_sequential_writer.hpp index cfdb7008d8..0845e5d3f8 100644 --- a/rosbag2_transport/test/rosbag2_transport/mock_sequential_writer.hpp +++ b/rosbag2_transport/test/rosbag2_transport/mock_sequential_writer.hpp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include "rosbag2_cpp/writer_interfaces/base_writer_interface.hpp" @@ -29,6 +30,7 @@ class MockSequentialWriter : public rosbag2_cpp::writer_interfaces::BaseWriterIn const rosbag2_storage::StorageOptions & storage_options, const rosbag2_cpp::ConverterOptions & converter_options) override { + snapshot_mode_ = storage_options.snapshot_mode; (void) storage_options; (void) converter_options; } @@ -47,15 +49,31 @@ class MockSequentialWriter : public rosbag2_cpp::writer_interfaces::BaseWriterIn void write(std::shared_ptr message) override { - messages_.push_back(message); + if (!snapshot_mode_) { + messages_.push_back(message); + } else { + snapshot_buffer_.push_back(message); + } messages_per_topic_[message->topic_name] += 1; } + bool take_snapshot() override + { + std::swap(snapshot_buffer_, messages_); + snapshot_buffer_.clear(); + return true; + } + const std::vector> & get_messages() { return messages_; } + const std::vector> & get_snapshot_buffer() + { + return snapshot_buffer_; + } + const std::unordered_map & messages_per_topic() { return messages_per_topic_; @@ -69,7 +87,9 @@ class MockSequentialWriter : public rosbag2_cpp::writer_interfaces::BaseWriterIn private: std::unordered_map topics_; std::vector> messages_; + std::vector> snapshot_buffer_; std::unordered_map messages_per_topic_; + bool snapshot_mode_ = false; }; #endif // ROSBAG2_TRANSPORT__MOCK_SEQUENTIAL_WRITER_HPP_ diff --git a/rosbag2_transport/test/rosbag2_transport/test_record_services.cpp b/rosbag2_transport/test/rosbag2_transport/test_record_services.cpp new file mode 100644 index 0000000000..7baa6ee4dc --- /dev/null +++ b/rosbag2_transport/test/rosbag2_transport/test_record_services.cpp @@ -0,0 +1,154 @@ +// Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include +#include +#include + +#include "rclcpp/rclcpp.hpp" + +#include "rosbag2_interfaces/srv/snapshot.hpp" +#include "rosbag2_transport/recorder.hpp" + +#include "rosbag2_test_common/publication_manager.hpp" +#include "rosbag2_test_common/wait_for.hpp" + +#include "test_msgs/msg/arrays.hpp" +#include "test_msgs/msg/basic_types.hpp" +#include "test_msgs/msg/strings.hpp" +#include "test_msgs/message_fixtures.hpp" + +#include "record_integration_fixture.hpp" + +using namespace ::testing; // NOLINT + +class RecordSrvsTest : public RecordIntegrationTestFixture +{ +public: + using Snapshot = rosbag2_interfaces::srv::Snapshot; + + RecordSrvsTest() + : RecordIntegrationTestFixture() + {} + + ~RecordSrvsTest() override + { + exec_->cancel(); + rclcpp::shutdown(); + spin_thread_.join(); + } + + void TearDown() override + { + } + + void subscription_callback(const test_msgs::msg::Strings::SharedPtr) + { + } + + /// Use SetUp instead of ctor because we want to ASSERT some preconditions for the tests + void SetUp() override + { + RecordIntegrationTestFixture::SetUp(); + client_node_ = std::make_shared("test_record_client"); + + rosbag2_transport::RecordOptions record_options = + {false, false, {test_topic_}, "rmw_format", 100ms}; + storage_options_.snapshot_mode = true; + storage_options_.max_cache_size = 200; + recorder_ = std::make_shared( + std::move(writer_), storage_options_, record_options, recorder_name_); + recorder_->record(); + + auto string_message = get_messages_strings()[1]; + rosbag2_test_common::PublicationManager pub_manager; + pub_manager.setup_publisher(test_topic_, string_message, 50); + + const std::string ns = "/" + recorder_name_; + cli_snapshot_ = client_node_->create_client(ns + "/snapshot"); + + exec_ = std::make_shared(); + + exec_->add_node(recorder_); + exec_->add_node(client_node_); + spin_thread_ = std::thread( + [this]() { + exec_->spin(); + }); + + ASSERT_TRUE(pub_manager.wait_for_matched(test_topic_.c_str())); + pub_manager.run_publishers(); + + // Make sure expected service is present before starting test + ASSERT_TRUE(cli_snapshot_->wait_for_service(service_wait_timeout_)); + } + + /// Send a service request, and expect it to successfully return within a reasonable timeout + template + typename Srv::Response::SharedPtr successful_service_request( + typename rclcpp::Client::SharedPtr cli, + typename Srv::Request::SharedPtr request) + { + auto future = cli->async_send_request(request); + EXPECT_EQ(future.wait_for(service_call_timeout_), std::future_status::ready); + EXPECT_TRUE(future.valid()); + auto result = std::make_shared(); + EXPECT_NO_THROW({result = future.get();}); + EXPECT_TRUE(result); + return result; + } + + template + typename Srv::Response::SharedPtr successful_service_request( + typename rclcpp::Client::SharedPtr cli) + { + auto request = std::make_shared(); + return successful_service_request(cli, request); + } + +public: + // Basic configuration + const std::string recorder_name_ = "rosbag2_recorder_for_test_srvs"; + const std::chrono::seconds service_wait_timeout_ {2}; + const std::chrono::seconds service_call_timeout_ {1}; + const std::string test_topic_ = "/recorder_srvs_test_topic"; + + // Orchestration + std::thread spin_thread_; + std::shared_ptr exec_; + std::shared_ptr recorder_; + + // Service clients + rclcpp::Node::SharedPtr client_node_; + rclcpp::Client::SharedPtr cli_snapshot_; +}; + +TEST_F(RecordSrvsTest, trigger_snapshot) +{ + auto & writer = recorder_->get_writer_handle(); + MockSequentialWriter & mock_writer = + static_cast(writer.get_implementation_handle()); + EXPECT_THAT(mock_writer.get_messages().size(), Eq(0u)); + + // Sleep for 2 seconds to allow messages to accumulate in snapshot buffer + std::chrono::duration duration(2.0); + std::this_thread::sleep_for(duration); + EXPECT_THAT(mock_writer.get_snapshot_buffer().size(), Gt(0u)); + + successful_service_request(cli_snapshot_); + EXPECT_THAT(mock_writer.get_messages().size(), Ne(0u)); +}