Skip to content

Commit

Permalink
Bugfix for broken bag split when using cache (#936)
Browse files Browse the repository at this point in the history
* Bugfix for broken bag split when using cache

Add `start()` method to the `CacheConsumer` class and call it from
`sequential_writer` after split.

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

* Adjust expectation for number of splits in SequentialCompressionWriterTest
writer_creates_correct_metadata_relative_filepaths since logic for split
has been changed.

Now it will be fair. If specified max_baf_file_size = 1 it will be
reasonable to expect only one message per file.

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>

* Fix type cast warning on Windows build

Signed-off-by: Michael Orlov <michael.orlov@apex.ai>
  • Loading branch information
MichaelOrlov authored Dec 16, 2021
1 parent a076560 commit d74555d
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -241,14 +241,14 @@ TEST_F(SequentialCompressionWriterTest, writer_creates_correct_metadata_relative
message->topic_name = test_topic_name;

writer_->write(message);
// bag size == max_bafile_size, no split yet
// bag size == max_bagfile_size, split
writer_->write(message);
// bag size > max_bagfile_size, split
// bag size == max_bagfile_size, split
writer_->write(message);
writer_.reset();

EXPECT_EQ(
intercepted_metadata_.relative_file_paths.size(), 2u);
intercepted_metadata_.relative_file_paths.size(), 3u);

const auto base_path = tmp_dir_storage_options_.uri;
int counter = 0;
Expand Down
3 changes: 3 additions & 0 deletions rosbag2_cpp/include/rosbag2_cpp/cache/cache_consumer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ class ROSBAG2_CPP_PUBLIC CacheConsumer

~CacheConsumer();

/// \brief start inner consumer thread if it hasn't been started yet
void start();

/// shut down the consumer thread
void close();

Expand Down
8 changes: 8 additions & 0 deletions rosbag2_cpp/src/rosbag2_cpp/cache/cache_consumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,14 @@ void CacheConsumer::close()
message_cache_->done_flushing();
}

void CacheConsumer::start()
{
is_stop_issued_ = false;
if (!consumer_thread_.joinable()) {
consumer_thread_ = std::thread(&CacheConsumer::exec_consuming, this);
}
}

void CacheConsumer::exec_consuming()
{
bool exit_flag = false;
Expand Down
8 changes: 6 additions & 2 deletions rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,11 @@ void SequentialWriter::switch_to_next_storage()
for (const auto & topic : topics_names_to_info_) {
storage_->create_topic(topic.second.topic_metadata);
}

if (use_cache_) {
// restart consumer thread for cache
cache_consumer_->start();
}
}

void SequentialWriter::split_bagfile()
Expand Down Expand Up @@ -347,8 +352,7 @@ bool SequentialWriter::should_split_bagfile() const
if (storage_options_.max_bagfile_size !=
rosbag2_storage::storage_interfaces::MAX_BAGFILE_SIZE_NO_SPLIT)
{
should_split = should_split ||
(storage_->get_bagfile_size() > storage_options_.max_bagfile_size);
should_split = (storage_->get_bagfile_size() >= storage_options_.max_bagfile_size);
}

// Splitting by time
Expand Down
121 changes: 118 additions & 3 deletions rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,24 @@ class SequentialWriterTest : public Test
std::unique_ptr<MockMetadataIo> metadata_io_;
std::unique_ptr<rosbag2_cpp::Writer> writer_;
rosbag2_storage::StorageOptions storage_options_;
uint64_t fake_storage_size_;
std::atomic<uint32_t> fake_storage_size_{0}; // Need to be atomic for cache update since it
// uses in callback from cache_consumer thread
rosbag2_storage::BagMetadata fake_metadata_;
std::string fake_storage_uri_;
};

std::shared_ptr<rosbag2_storage::SerializedBagMessage> make_test_msg()
{
static uint32_t counter = 0;
std::string msg_content = "Hello" + std::to_string(counter++);
auto msg_length = msg_content.length();
auto message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
message->topic_name = "test_topic";
message->serialized_data = rosbag2_storage::make_serialized_message(
msg_content.c_str(), msg_length);
return message;
}

TEST_F(
SequentialWriterTest,
write_uses_converters_to_convert_serialization_format_if_input_and_output_format_are_different) {
Expand Down Expand Up @@ -207,12 +220,12 @@ TEST_F(SequentialWriterTest, writer_splits_when_storage_bagfile_size_gt_max_bagf
*storage_,
write(An<std::shared_ptr<const rosbag2_storage::SerializedBagMessage>>())).WillByDefault(
[this](std::shared_ptr<const rosbag2_storage::SerializedBagMessage>) {
fake_storage_size_ += 1;
fake_storage_size_++;
});

ON_CALL(*storage_, get_bagfile_size).WillByDefault(
[this]() {
return fake_storage_size_;
return fake_storage_size_.load();
});

ON_CALL(*storage_, get_relative_file_path).WillByDefault(
Expand Down Expand Up @@ -266,6 +279,108 @@ TEST_F(SequentialWriterTest, writer_splits_when_storage_bagfile_size_gt_max_bagf
}
}

TEST_F(
SequentialWriterTest,
writer_with_cache_splits_when_storage_bagfile_size_gt_max_bagfile_size) {
const size_t message_count = 15;
const size_t expected_total_written_messages = message_count - 1;
const size_t max_bagfile_size = 5;
const auto expected_splits = message_count / max_bagfile_size;
fake_storage_size_ = 0;
size_t written_messages = 0;

ON_CALL(
*storage_,
write(An<const std::vector<std::shared_ptr<const rosbag2_storage::SerializedBagMessage>> &>())).
WillByDefault(
[this, &written_messages]
(const std::vector<std::shared_ptr<const rosbag2_storage::SerializedBagMessage>> & msgs)
{
written_messages += msgs.size();
fake_storage_size_.fetch_add(static_cast<uint32_t>(msgs.size()));
});

ON_CALL(*storage_, get_bagfile_size).WillByDefault(
[this]() {
return fake_storage_size_.load();
});

ON_CALL(*storage_, get_relative_file_path).WillByDefault(
[this]() {
return fake_storage_uri_;
});

EXPECT_CALL(*metadata_io_, write_metadata).Times(1);

EXPECT_CALL(*storage_factory_, open_read_write(_)).Times(3);

// intercept the metadata write so we can analyze it.
ON_CALL(*metadata_io_, write_metadata).WillByDefault(
[this](const std::string &, const rosbag2_storage::BagMetadata & metadata) {
fake_metadata_ = metadata;
});

auto sequential_writer = std::make_unique<rosbag2_cpp::writers::SequentialWriter>(
std::move(storage_factory_), converter_factory_, std::move(metadata_io_));
writer_ = std::make_unique<rosbag2_cpp::Writer>(std::move(sequential_writer));

std::string rmw_format = "rmw_format";

storage_options_.max_bagfile_size = max_bagfile_size;
storage_options_.max_cache_size = 4000u;
storage_options_.snapshot_mode = false;

writer_->open(storage_options_, {rmw_format, rmw_format});
writer_->create_topic({"test_topic", "test_msgs/BasicTypes", "", ""});

auto timeout = std::chrono::seconds(2);
for (auto i = 1u; i < message_count; ++i) {
writer_->write(make_test_msg());
// Wait for written_messages == i for each 5th message with timeout in 2 sec
// Need yield resources and make sure that cache_consumer had a chance to dump buffer to the
// storage before split is gonna occur. i.e. each 5th message.
if ((i % max_bagfile_size) == 0) {
auto start_time = std::chrono::steady_clock::now();
while ((i != written_messages) &&
(std::chrono::steady_clock::now() - start_time < timeout))
{
std::this_thread::sleep_for(std::chrono::milliseconds(5));
}
EXPECT_EQ(i, written_messages);
}
if ((i % max_bagfile_size) == 1) { // Check on the 6th and 11 message that split happened.
// i.e. fake_storage_size_ zeroed on split and then incremented in cache_consumer callback.
auto start_time = std::chrono::steady_clock::now();
while ((fake_storage_size_ != 1u) &&
((std::chrono::steady_clock::now() - start_time) < timeout))
{
std::this_thread::sleep_for(std::chrono::milliseconds(5));
}
EXPECT_EQ(fake_storage_size_, 1u) << "current message number = " << i;
}
}

writer_.reset();
EXPECT_EQ(written_messages, expected_total_written_messages);

// metadata should be written now that the Writer was released.
EXPECT_EQ(
fake_metadata_.relative_file_paths.size(),
static_cast<unsigned int>(expected_splits)) <<
"Storage should have split bagfile " << (expected_splits - 1);

const auto base_path = storage_options_.uri;
int counter = 0;
for (const auto & path : fake_metadata_.relative_file_paths) {
std::stringstream ss;
ss << base_path << "_" << counter;

const auto expected_path = ss.str();
counter++;
EXPECT_EQ(expected_path, path);
}
}

TEST_F(SequentialWriterTest, do_not_use_cache_if_cache_size_is_zero) {
const size_t counter = 1000;
const uint64_t max_cache_size = 0;
Expand Down

0 comments on commit d74555d

Please sign in to comment.