Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix for broken bag split when using cache #936

Merged
merged 3 commits into from
Dec 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -241,14 +241,14 @@ TEST_F(SequentialCompressionWriterTest, writer_creates_correct_metadata_relative
message->topic_name = test_topic_name;

writer_->write(message);
// bag size == max_bafile_size, no split yet
// bag size == max_bagfile_size, split
writer_->write(message);
// bag size > max_bagfile_size, split
// bag size == max_bagfile_size, split
writer_->write(message);
writer_.reset();

EXPECT_EQ(
intercepted_metadata_.relative_file_paths.size(), 2u);
intercepted_metadata_.relative_file_paths.size(), 3u);

const auto base_path = tmp_dir_storage_options_.uri;
int counter = 0;
Expand Down
3 changes: 3 additions & 0 deletions rosbag2_cpp/include/rosbag2_cpp/cache/cache_consumer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ class ROSBAG2_CPP_PUBLIC CacheConsumer

~CacheConsumer();

/// \brief start inner consumer thread if it hasn't been started yet
void start();

/// shut down the consumer thread
void close();

Expand Down
8 changes: 8 additions & 0 deletions rosbag2_cpp/src/rosbag2_cpp/cache/cache_consumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,14 @@ void CacheConsumer::close()
message_cache_->done_flushing();
}

void CacheConsumer::start()
{
is_stop_issued_ = false;
if (!consumer_thread_.joinable()) {
consumer_thread_ = std::thread(&CacheConsumer::exec_consuming, this);
}
}

void CacheConsumer::exec_consuming()
{
bool exit_flag = false;
Expand Down
8 changes: 6 additions & 2 deletions rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,11 @@ void SequentialWriter::switch_to_next_storage()
for (const auto & topic : topics_names_to_info_) {
storage_->create_topic(topic.second.topic_metadata);
}

if (use_cache_) {
MichaelOrlov marked this conversation as resolved.
Show resolved Hide resolved
// restart consumer thread for cache
cache_consumer_->start();
}
}

void SequentialWriter::split_bagfile()
Expand Down Expand Up @@ -347,8 +352,7 @@ bool SequentialWriter::should_split_bagfile() const
if (storage_options_.max_bagfile_size !=
rosbag2_storage::storage_interfaces::MAX_BAGFILE_SIZE_NO_SPLIT)
{
should_split = should_split ||
(storage_->get_bagfile_size() > storage_options_.max_bagfile_size);
should_split = (storage_->get_bagfile_size() >= storage_options_.max_bagfile_size);
}

// Splitting by time
Expand Down
121 changes: 118 additions & 3 deletions rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,24 @@ class SequentialWriterTest : public Test
std::unique_ptr<MockMetadataIo> metadata_io_;
std::unique_ptr<rosbag2_cpp::Writer> writer_;
rosbag2_storage::StorageOptions storage_options_;
uint64_t fake_storage_size_;
std::atomic<uint32_t> fake_storage_size_{0}; // Need to be atomic for cache update since it
// uses in callback from cache_consumer thread
rosbag2_storage::BagMetadata fake_metadata_;
std::string fake_storage_uri_;
};

std::shared_ptr<rosbag2_storage::SerializedBagMessage> make_test_msg()
{
static uint32_t counter = 0;
std::string msg_content = "Hello" + std::to_string(counter++);
auto msg_length = msg_content.length();
auto message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
message->topic_name = "test_topic";
message->serialized_data = rosbag2_storage::make_serialized_message(
msg_content.c_str(), msg_length);
return message;
}

TEST_F(
SequentialWriterTest,
write_uses_converters_to_convert_serialization_format_if_input_and_output_format_are_different) {
Expand Down Expand Up @@ -207,12 +220,12 @@ TEST_F(SequentialWriterTest, writer_splits_when_storage_bagfile_size_gt_max_bagf
*storage_,
write(An<std::shared_ptr<const rosbag2_storage::SerializedBagMessage>>())).WillByDefault(
[this](std::shared_ptr<const rosbag2_storage::SerializedBagMessage>) {
fake_storage_size_ += 1;
fake_storage_size_++;
});

ON_CALL(*storage_, get_bagfile_size).WillByDefault(
[this]() {
return fake_storage_size_;
return fake_storage_size_.load();
});

ON_CALL(*storage_, get_relative_file_path).WillByDefault(
Expand Down Expand Up @@ -266,6 +279,108 @@ TEST_F(SequentialWriterTest, writer_splits_when_storage_bagfile_size_gt_max_bagf
}
}

TEST_F(
SequentialWriterTest,
writer_with_cache_splits_when_storage_bagfile_size_gt_max_bagfile_size) {
const size_t message_count = 15;
const size_t expected_total_written_messages = message_count - 1;
const size_t max_bagfile_size = 5;
const auto expected_splits = message_count / max_bagfile_size;
fake_storage_size_ = 0;
size_t written_messages = 0;

ON_CALL(
*storage_,
write(An<const std::vector<std::shared_ptr<const rosbag2_storage::SerializedBagMessage>> &>())).
WillByDefault(
[this, &written_messages]
(const std::vector<std::shared_ptr<const rosbag2_storage::SerializedBagMessage>> & msgs)
{
written_messages += msgs.size();
fake_storage_size_.fetch_add(static_cast<uint32_t>(msgs.size()));
});

ON_CALL(*storage_, get_bagfile_size).WillByDefault(
[this]() {
return fake_storage_size_.load();
});

ON_CALL(*storage_, get_relative_file_path).WillByDefault(
[this]() {
return fake_storage_uri_;
});

EXPECT_CALL(*metadata_io_, write_metadata).Times(1);

EXPECT_CALL(*storage_factory_, open_read_write(_)).Times(3);

// intercept the metadata write so we can analyze it.
ON_CALL(*metadata_io_, write_metadata).WillByDefault(
[this](const std::string &, const rosbag2_storage::BagMetadata & metadata) {
fake_metadata_ = metadata;
});

auto sequential_writer = std::make_unique<rosbag2_cpp::writers::SequentialWriter>(
std::move(storage_factory_), converter_factory_, std::move(metadata_io_));
writer_ = std::make_unique<rosbag2_cpp::Writer>(std::move(sequential_writer));

std::string rmw_format = "rmw_format";

storage_options_.max_bagfile_size = max_bagfile_size;
storage_options_.max_cache_size = 4000u;
storage_options_.snapshot_mode = false;

writer_->open(storage_options_, {rmw_format, rmw_format});
writer_->create_topic({"test_topic", "test_msgs/BasicTypes", "", ""});

auto timeout = std::chrono::seconds(2);
for (auto i = 1u; i < message_count; ++i) {
writer_->write(make_test_msg());
// Wait for written_messages == i for each 5th message with timeout in 2 sec
// Need yield resources and make sure that cache_consumer had a chance to dump buffer to the
// storage before split is gonna occur. i.e. each 5th message.
if ((i % max_bagfile_size) == 0) {
auto start_time = std::chrono::steady_clock::now();
while ((i != written_messages) &&
(std::chrono::steady_clock::now() - start_time < timeout))
{
std::this_thread::sleep_for(std::chrono::milliseconds(5));
}
EXPECT_EQ(i, written_messages);
}
if ((i % max_bagfile_size) == 1) { // Check on the 6th and 11 message that split happened.
// i.e. fake_storage_size_ zeroed on split and then incremented in cache_consumer callback.
auto start_time = std::chrono::steady_clock::now();
while ((fake_storage_size_ != 1u) &&
((std::chrono::steady_clock::now() - start_time) < timeout))
{
std::this_thread::sleep_for(std::chrono::milliseconds(5));
}
EXPECT_EQ(fake_storage_size_, 1u) << "current message number = " << i;
}
}

writer_.reset();
EXPECT_EQ(written_messages, expected_total_written_messages);

// metadata should be written now that the Writer was released.
EXPECT_EQ(
fake_metadata_.relative_file_paths.size(),
static_cast<unsigned int>(expected_splits)) <<
"Storage should have split bagfile " << (expected_splits - 1);

const auto base_path = storage_options_.uri;
int counter = 0;
for (const auto & path : fake_metadata_.relative_file_paths) {
std::stringstream ss;
ss << base_path << "_" << counter;

const auto expected_path = ss.str();
counter++;
EXPECT_EQ(expected_path, path);
}
}

TEST_F(SequentialWriterTest, do_not_use_cache_if_cache_size_is_zero) {
const size_t counter = 1000;
const uint64_t max_cache_size = 0;
Expand Down