Skip to content

Commit

Permalink
Fix a bug on invalid pointer address when using "MESSAGE" compressio… (
Browse files Browse the repository at this point in the history
…#866)

* Fix a bug on invalid pointer address when using "MESSAGE" compression mode to write data
* Deprecate one old write interface and add new interface instead

Signed-off-by: Barry Xu <barry.xu@sony.com>
  • Loading branch information
Barry-Xu-2018 authored Oct 18, 2021
1 parent 4615fba commit 004ec83
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 35 deletions.
30 changes: 27 additions & 3 deletions rosbag2_cpp/include/rosbag2_cpp/writer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,14 +144,38 @@ class ROSBAG2_CPP_PUBLIC Writer final
* \param topic_name the string of the topic this messages belongs to
* \param type_name the string of the type associated with this message
* \param time The time stamp of the message
* \throws runtime_error if the Writer is not open.
* \throws runtime_error if the Writer is not open or duplicating message is failed.
*/
[[deprecated(
"Use write(std::shared_ptr<rclcpp::SerializedMessage> message," \
" const std::string & topic_name," \
" const std::string & type_name," \
" const rclcpp::Time & time) instead."
)]]
void write(
const rclcpp::SerializedMessage & message,
const std::string & topic_name,
const std::string & type_name,
const rclcpp::Time & time);

/**
* Write a serialized message to a bagfile.
* The topic will be created if it has not been created already.
*
* \warning after calling this function, the serialized data will no longer be managed by message.
*
* \param message rclcpp::SerializedMessage The serialized message to be written to the bagfile
* \param topic_name the string of the topic this messages belongs to
* \param type_name the string of the type associated with this message
* \param time The time stamp of the message
* \throws runtime_error if the Writer is not open.
*/
void write(
std::shared_ptr<rclcpp::SerializedMessage> message,
const std::string & topic_name,
const std::string & type_name,
const rclcpp::Time & time);

/**
* Write a non-serialized message to a bagfile.
* The topic will be created if it has not been created already.
Expand All @@ -168,10 +192,10 @@ class ROSBAG2_CPP_PUBLIC Writer final
const std::string & topic_name,
const rclcpp::Time & time)
{
rclcpp::SerializedMessage serialized_msg;
auto serialized_msg = std::make_shared<rclcpp::SerializedMessage>();

rclcpp::Serialization<MessageT> serialization;
serialization.serialize_message(&message, &serialized_msg);
serialization.serialize_message(&message, serialized_msg.get());
return write(serialized_msg, topic_name, rosidl_generator_traits::name<MessageT>(), time);
}

Expand Down
67 changes: 63 additions & 4 deletions rosbag2_cpp/src/rosbag2_cpp/writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@

#include <algorithm>
#include <chrono>
#include <cstring>
#include <memory>
#include <stdexcept>
#include <string>
#include <utility>

#include "rclcpp/logging.hpp"
#include "rclcpp/serialized_message.hpp"
#include "rclcpp/time.hpp"

Expand Down Expand Up @@ -117,14 +119,71 @@ void Writer::write(
serialized_bag_message->topic_name = topic_name;
serialized_bag_message->time_stamp = time.nanoseconds();

// temporary store the payload in a shared_ptr.
// add custom no-op deleter to avoid deep copying data.
serialized_bag_message->serialized_data = std::shared_ptr<rcutils_uint8_array_t>(
const_cast<rcutils_uint8_array_t *>(&message.get_rcl_serialized_message()),
[](rcutils_uint8_array_t * /* data */) {});
new rcutils_uint8_array_t,
[](rcutils_uint8_array_t * msg) {
auto fini_return = rcutils_uint8_array_fini(msg);
delete msg;
if (fini_return != RCUTILS_RET_OK) {
RCLCPP_ERROR_STREAM(
rclcpp::get_logger("rosbag2_cpp"),
"Failed to destroy serialized message: " << rcutils_get_error_string().str);
}
});

// While using compression mode and cache size isn't 0, another thread deals with this serialized
// message asynchronously.
// In order to keep serialized message valid, have to duplicate message.

rcutils_allocator_t allocator = rcutils_get_default_allocator();

rcutils_ret_t ret = rcutils_uint8_array_init(
serialized_bag_message->serialized_data.get(),
message.get_rcl_serialized_message().buffer_capacity,
&allocator);
if (ret != RCUTILS_RET_OK) {
auto err = std::string("Failed to call rcutils_uint8_array_init(): return ");
err += ret;
throw std::runtime_error(err);
}

std::memcpy(
serialized_bag_message->serialized_data->buffer,
message.get_rcl_serialized_message().buffer,
message.get_rcl_serialized_message().buffer_length);

serialized_bag_message->serialized_data->buffer_length =
message.get_rcl_serialized_message().buffer_length;

return write(
serialized_bag_message, topic_name, type_name, rmw_get_serialization_format());
}

void Writer::write(
std::shared_ptr<rclcpp::SerializedMessage> message,
const std::string & topic_name,
const std::string & type_name,
const rclcpp::Time & time)
{
auto serialized_bag_message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
serialized_bag_message->topic_name = topic_name;
serialized_bag_message->time_stamp = time.nanoseconds();

serialized_bag_message->serialized_data = std::shared_ptr<rcutils_uint8_array_t>(
new rcutils_uint8_array_t,
[](rcutils_uint8_array_t * msg) {
auto fini_return = rcutils_uint8_array_fini(msg);
delete msg;
if (fini_return != RCUTILS_RET_OK) {
RCLCPP_ERROR_STREAM(
rclcpp::get_logger("rosbag2_cpp"),
"Failed to destroy serialized message: " << rcutils_get_error_string().str);
}
});

*serialized_bag_message->serialized_data = message->release_rcl_serialized_message();

return write(serialized_bag_message, topic_name, type_name, rmw_get_serialization_format());
}

} // namespace rosbag2_cpp
6 changes: 5 additions & 1 deletion rosbag2_tests/test/rosbag2_tests/test_rosbag2_cpp_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,12 @@ TEST(TestRosbag2CPPAPI, minimal_writer_example)
writer.write(bag_message, "/my/other/topic", "test_msgs/msg/BasicTypes");

// yet another alternative, writing the rclcpp::SerializedMessage directly
std::shared_ptr<rclcpp::SerializedMessage> serialized_msg2 =
std::make_shared<rclcpp::SerializedMessage>();
serialization.serialize_message(&test_msg, serialized_msg2.get());

writer.write(
serialized_msg, "/yet/another/topic", "test_msgs/msg/BasicTypes",
serialized_msg2, "/yet/another/topic", "test_msgs/msg/BasicTypes",
rclcpp::Clock().now());

// writing a non-serialized message
Expand Down
30 changes: 3 additions & 27 deletions rosbag2_transport/src/rosbag2_transport/recorder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <vector>

#include "rclcpp/logging.hpp"
#include "rclcpp/clock.hpp"

#include "rosbag2_cpp/writer.hpp"

Expand Down Expand Up @@ -234,33 +235,8 @@ Recorder::create_subscription(
topic_name,
topic_type,
qos,
[this, topic_name](std::shared_ptr<rclcpp::SerializedMessage> message) {
auto bag_message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
// the serialized bag message takes ownership of the incoming rclcpp serialized message
// we therefore have to make sure to cleanup that memory in a custom deleter.
bag_message->serialized_data = std::shared_ptr<rcutils_uint8_array_t>(
new rcutils_uint8_array_t,
[](rcutils_uint8_array_t * msg) {
auto fini_return = rcutils_uint8_array_fini(msg);
delete msg;
if (fini_return != RCUTILS_RET_OK) {
RCLCPP_ERROR_STREAM(
rclcpp::get_logger("rosbag2_transport"),
"Failed to destroy serialized message: " << rcutils_get_error_string().str);
}
});
*bag_message->serialized_data = message->release_rcl_serialized_message();
bag_message->topic_name = topic_name;
rcutils_time_point_value_t time_stamp;
int error = rcutils_system_time_now(&time_stamp);
if (error != RCUTILS_RET_OK) {
RCLCPP_ERROR_STREAM(
this->get_logger(),
"Error getting current time. Error:" << rcutils_get_error_string().str);
}
bag_message->time_stamp = time_stamp;

writer_->write(bag_message);
[this, topic_name, topic_type](std::shared_ptr<rclcpp::SerializedMessage> message) {
writer_->write(message, topic_name, topic_type, rclcpp::Clock(RCL_SYSTEM_TIME).now());
});
return subscription;
}
Expand Down

0 comments on commit 004ec83

Please sign in to comment.