From d74555d5e54fae6874c0261015fccb1802ea7939 Mon Sep 17 00:00:00 2001 From: Michael Orlov Date: Thu, 16 Dec 2021 00:03:28 -0800 Subject: [PATCH] Bugfix for broken bag split when using cache (#936) * Bugfix for broken bag split when using cache Add `start()` method to the `CacheConsumer` class and call it from `sequential_writer` after split. Signed-off-by: Michael Orlov * Adjust expectation for number of splits in SequentialCompressionWriterTest writer_creates_correct_metadata_relative_filepaths since logic for split has been changed. Now it will be fair. If specified max_baf_file_size = 1 it will be reasonable to expect only one message per file. Signed-off-by: Michael Orlov * Fix type cast warning on Windows build Signed-off-by: Michael Orlov --- .../test_sequential_compression_writer.cpp | 6 +- .../rosbag2_cpp/cache/cache_consumer.hpp | 3 + .../src/rosbag2_cpp/cache/cache_consumer.cpp | 8 ++ .../rosbag2_cpp/writers/sequential_writer.cpp | 8 +- .../rosbag2_cpp/test_sequential_writer.cpp | 121 +++++++++++++++++- 5 files changed, 138 insertions(+), 8 deletions(-) diff --git a/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp b/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp index 1ea06602c8..0db18df82a 100644 --- a/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp +++ b/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_writer.cpp @@ -241,14 +241,14 @@ TEST_F(SequentialCompressionWriterTest, writer_creates_correct_metadata_relative message->topic_name = test_topic_name; writer_->write(message); - // bag size == max_bafile_size, no split yet + // bag size == max_bagfile_size, split writer_->write(message); - // bag size > max_bagfile_size, split + // bag size == max_bagfile_size, split writer_->write(message); writer_.reset(); EXPECT_EQ( - intercepted_metadata_.relative_file_paths.size(), 2u); + intercepted_metadata_.relative_file_paths.size(), 3u); const auto base_path = tmp_dir_storage_options_.uri; int counter = 0; diff --git a/rosbag2_cpp/include/rosbag2_cpp/cache/cache_consumer.hpp b/rosbag2_cpp/include/rosbag2_cpp/cache/cache_consumer.hpp index 51713f82a0..9c96a34803 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/cache/cache_consumer.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/cache/cache_consumer.hpp @@ -71,6 +71,9 @@ class ROSBAG2_CPP_PUBLIC CacheConsumer ~CacheConsumer(); + /// \brief start inner consumer thread if it hasn't been started yet + void start(); + /// shut down the consumer thread void close(); diff --git a/rosbag2_cpp/src/rosbag2_cpp/cache/cache_consumer.cpp b/rosbag2_cpp/src/rosbag2_cpp/cache/cache_consumer.cpp index 56f086aca1..6ebf9fc002 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/cache/cache_consumer.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/cache/cache_consumer.cpp @@ -50,6 +50,14 @@ void CacheConsumer::close() message_cache_->done_flushing(); } +void CacheConsumer::start() +{ + is_stop_issued_ = false; + if (!consumer_thread_.joinable()) { + consumer_thread_ = std::thread(&CacheConsumer::exec_consuming, this); + } +} + void CacheConsumer::exec_consuming() { bool exit_flag = false; diff --git a/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp b/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp index 43851abf09..6f752717a5 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp @@ -257,6 +257,11 @@ void SequentialWriter::switch_to_next_storage() for (const auto & topic : topics_names_to_info_) { storage_->create_topic(topic.second.topic_metadata); } + + if (use_cache_) { + // restart consumer thread for cache + cache_consumer_->start(); + } } void SequentialWriter::split_bagfile() @@ -347,8 +352,7 @@ bool SequentialWriter::should_split_bagfile() const if (storage_options_.max_bagfile_size != rosbag2_storage::storage_interfaces::MAX_BAGFILE_SIZE_NO_SPLIT) { - should_split = should_split || - (storage_->get_bagfile_size() > storage_options_.max_bagfile_size); + should_split = (storage_->get_bagfile_size() >= storage_options_.max_bagfile_size); } // Splitting by time diff --git a/rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp b/rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp index fa7ace6bb2..c2e0b1693e 100644 --- a/rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp +++ b/rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp @@ -76,11 +76,24 @@ class SequentialWriterTest : public Test std::unique_ptr metadata_io_; std::unique_ptr writer_; rosbag2_storage::StorageOptions storage_options_; - uint64_t fake_storage_size_; + std::atomic fake_storage_size_{0}; // Need to be atomic for cache update since it + // uses in callback from cache_consumer thread rosbag2_storage::BagMetadata fake_metadata_; std::string fake_storage_uri_; }; +std::shared_ptr make_test_msg() +{ + static uint32_t counter = 0; + std::string msg_content = "Hello" + std::to_string(counter++); + auto msg_length = msg_content.length(); + auto message = std::make_shared(); + message->topic_name = "test_topic"; + message->serialized_data = rosbag2_storage::make_serialized_message( + msg_content.c_str(), msg_length); + return message; +} + TEST_F( SequentialWriterTest, write_uses_converters_to_convert_serialization_format_if_input_and_output_format_are_different) { @@ -207,12 +220,12 @@ TEST_F(SequentialWriterTest, writer_splits_when_storage_bagfile_size_gt_max_bagf *storage_, write(An>())).WillByDefault( [this](std::shared_ptr) { - fake_storage_size_ += 1; + fake_storage_size_++; }); ON_CALL(*storage_, get_bagfile_size).WillByDefault( [this]() { - return fake_storage_size_; + return fake_storage_size_.load(); }); ON_CALL(*storage_, get_relative_file_path).WillByDefault( @@ -266,6 +279,108 @@ TEST_F(SequentialWriterTest, writer_splits_when_storage_bagfile_size_gt_max_bagf } } +TEST_F( + SequentialWriterTest, + writer_with_cache_splits_when_storage_bagfile_size_gt_max_bagfile_size) { + const size_t message_count = 15; + const size_t expected_total_written_messages = message_count - 1; + const size_t max_bagfile_size = 5; + const auto expected_splits = message_count / max_bagfile_size; + fake_storage_size_ = 0; + size_t written_messages = 0; + + ON_CALL( + *storage_, + write(An> &>())). + WillByDefault( + [this, &written_messages] + (const std::vector> & msgs) + { + written_messages += msgs.size(); + fake_storage_size_.fetch_add(static_cast(msgs.size())); + }); + + ON_CALL(*storage_, get_bagfile_size).WillByDefault( + [this]() { + return fake_storage_size_.load(); + }); + + ON_CALL(*storage_, get_relative_file_path).WillByDefault( + [this]() { + return fake_storage_uri_; + }); + + EXPECT_CALL(*metadata_io_, write_metadata).Times(1); + + EXPECT_CALL(*storage_factory_, open_read_write(_)).Times(3); + + // intercept the metadata write so we can analyze it. + ON_CALL(*metadata_io_, write_metadata).WillByDefault( + [this](const std::string &, const rosbag2_storage::BagMetadata & metadata) { + fake_metadata_ = metadata; + }); + + auto sequential_writer = std::make_unique( + std::move(storage_factory_), converter_factory_, std::move(metadata_io_)); + writer_ = std::make_unique(std::move(sequential_writer)); + + std::string rmw_format = "rmw_format"; + + storage_options_.max_bagfile_size = max_bagfile_size; + storage_options_.max_cache_size = 4000u; + storage_options_.snapshot_mode = false; + + writer_->open(storage_options_, {rmw_format, rmw_format}); + writer_->create_topic({"test_topic", "test_msgs/BasicTypes", "", ""}); + + auto timeout = std::chrono::seconds(2); + for (auto i = 1u; i < message_count; ++i) { + writer_->write(make_test_msg()); + // Wait for written_messages == i for each 5th message with timeout in 2 sec + // Need yield resources and make sure that cache_consumer had a chance to dump buffer to the + // storage before split is gonna occur. i.e. each 5th message. + if ((i % max_bagfile_size) == 0) { + auto start_time = std::chrono::steady_clock::now(); + while ((i != written_messages) && + (std::chrono::steady_clock::now() - start_time < timeout)) + { + std::this_thread::sleep_for(std::chrono::milliseconds(5)); + } + EXPECT_EQ(i, written_messages); + } + if ((i % max_bagfile_size) == 1) { // Check on the 6th and 11 message that split happened. + // i.e. fake_storage_size_ zeroed on split and then incremented in cache_consumer callback. + auto start_time = std::chrono::steady_clock::now(); + while ((fake_storage_size_ != 1u) && + ((std::chrono::steady_clock::now() - start_time) < timeout)) + { + std::this_thread::sleep_for(std::chrono::milliseconds(5)); + } + EXPECT_EQ(fake_storage_size_, 1u) << "current message number = " << i; + } + } + + writer_.reset(); + EXPECT_EQ(written_messages, expected_total_written_messages); + +// metadata should be written now that the Writer was released. + EXPECT_EQ( + fake_metadata_.relative_file_paths.size(), + static_cast(expected_splits)) << + "Storage should have split bagfile " << (expected_splits - 1); + + const auto base_path = storage_options_.uri; + int counter = 0; + for (const auto & path : fake_metadata_.relative_file_paths) { + std::stringstream ss; + ss << base_path << "_" << counter; + + const auto expected_path = ss.str(); + counter++; + EXPECT_EQ(expected_path, path); + } +} + TEST_F(SequentialWriterTest, do_not_use_cache_if_cache_size_is_zero) { const size_t counter = 1000; const uint64_t max_cache_size = 0;