Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid allocations #211

Merged
merged 4 commits into from
Jun 21, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ MessageTypeSupport<MembersType>::MessageTypeSupport(const MembersType * members)
// Encapsulation size
this->m_typeSize = 4;
if (this->members_->member_count_ != 0) {
this->m_typeSize += static_cast<uint32_t>(this->calculateMaxSerializedSize(members, 4));
this->m_typeSize += static_cast<uint32_t>(this->calculateMaxSerializedSize(members, 0));
} else {
this->m_typeSize++;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ RequestTypeSupport<ServiceMembersType, MessageMembersType>::RequestTypeSupport(
// Encapsulation size
this->m_typeSize = 4;
if (this->members_->member_count_ != 0) {
this->m_typeSize += static_cast<uint32_t>(this->calculateMaxSerializedSize(this->members_, 4));
this->m_typeSize += static_cast<uint32_t>(this->calculateMaxSerializedSize(this->members_, 0));
} else {
this->m_typeSize++;
}
Expand All @@ -69,7 +69,7 @@ ResponseTypeSupport<ServiceMembersType, MessageMembersType>::ResponseTypeSupport
// Encapsulation size
this->m_typeSize = 4;
if (this->members_->member_count_ != 0) {
this->m_typeSize += static_cast<uint32_t>(this->calculateMaxSerializedSize(this->members_, 4));
this->m_typeSize += static_cast<uint32_t>(this->calculateMaxSerializedSize(this->members_, 0));
} else {
this->m_typeSize++;
}
Expand Down
7 changes: 7 additions & 0 deletions rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/TypeSupport.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@
namespace rmw_fastrtps_cpp
{

// Publishers write method will receive a pointer to this struct
struct SerializedData
{
bool is_cdr_buffer; // Whether next field is a pointer to a Cdr or to a plain ros message
void * data;
};

// Helper class that uses template specialization to read/write string types to/from a
// eprosima::fastcdr::Cdr
template<typename MembersType>
Expand Down
74 changes: 56 additions & 18 deletions rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/TypeSupport_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -420,8 +420,11 @@ size_t next_field_align(
std::vector<T> & data = *reinterpret_cast<std::vector<T> *>(field);
current_alignment += eprosima::fastcdr::Cdr::alignment(current_alignment, padding);
current_alignment += padding;
current_alignment += eprosima::fastcdr::Cdr::alignment(current_alignment, item_size);
current_alignment += item_size * data.size();
auto num_elems = data.size();
if (num_elems > 0) {
current_alignment += eprosima::fastcdr::Cdr::alignment(current_alignment, item_size);
current_alignment += item_size * data.size();
}
}
return current_alignment;
}
Expand All @@ -437,7 +440,7 @@ size_t next_field_align<std::string>(
if (!member->is_array_) {
current_alignment += eprosima::fastcdr::Cdr::alignment(current_alignment, padding);
current_alignment += padding;
auto str = *static_cast<std::string *>(field);
auto & str = *static_cast<std::string *>(field);
current_alignment += str.size() + 1;
} else if (member->array_size_ && !member->is_upper_bound_) {
auto str_arr = static_cast<std::string *>(field);
Expand Down Expand Up @@ -935,7 +938,7 @@ size_t TypeSupport<MembersType>::getEstimatedSerializedSize(
size_t ret_val = 4;

if (members_->member_count_ != 0) {
ret_val += TypeSupport::getEstimatedSerializedSize(members_, ros_message, ret_val);
ret_val += TypeSupport::getEstimatedSerializedSize(members_, ros_message, 0);
} else {
ret_val += 1;
}
Expand Down Expand Up @@ -994,13 +997,28 @@ bool TypeSupport<MembersType>::serialize(
assert(data);
assert(payload);

auto ser = static_cast<eprosima::fastcdr::Cdr *>(data);
if (payload->max_size >= ser->getSerializedDataLength()) {
payload->length = static_cast<uint32_t>(ser->getSerializedDataLength());
payload->encapsulation = ser->endianness() ==
eprosima::fastcdr::Cdr::BIG_ENDIANNESS ? CDR_BE : CDR_LE;
memcpy(payload->data, ser->getBufferPointer(), ser->getSerializedDataLength());
return true;
auto ser_data = static_cast<SerializedData *>(data);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function could be strongly typed then, right? So this cast could go.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not possible, as it is the implementation of Fast-RTPS TopicDataType interface, declared in this line

if (ser_data->is_cdr_buffer) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am a little bit confused. Why would you call serialize on already binary data?

Copy link
Collaborator Author

@MiguelCompany MiguelCompany Jun 20, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the case when the call comes from rmw_publish_serialized_message. The serialize method is called internally by code inside Fast-RTPS publisher.

Copy link
Contributor

@Karsten1987 Karsten1987 Jun 20, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so right now, there is no call to serialize. The raw data will simply be wrapped in Cdr and directly send.
I feel like, this decision of whether it's actually necessary to serialize or not should be taken outside of the serialize function.
https://github.com/ros2/rmw_fastrtps/blob/master/rmw_fastrtps_cpp/src/rmw_publish.cpp#L86-L89

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When you say 'right now', do you mean on the code currently on master, or on this PR.

Whenever data is sent, the serialize function of TypeSupport class is called. In the code currently on master it just copies an externally serialized buffer into the SerializedPayload (that points) to the internal history record buffer.

The code on this PR does that on some cases (namely rmw_publish_serialized) and performs serialization directly on others (rmw_publish). On the latter case we save the allocation of a temporary buffer.

auto ser = static_cast<eprosima::fastcdr::Cdr *>(ser_data->data);
if (payload->max_size >= ser->getSerializedDataLength()) {
payload->length = static_cast<uint32_t>(ser->getSerializedDataLength());
payload->encapsulation = ser->endianness() ==
eprosima::fastcdr::Cdr::BIG_ENDIANNESS ? CDR_BE : CDR_LE;
memcpy(payload->data, ser->getBufferPointer(), ser->getSerializedDataLength());
return true;
}
} else {
eprosima::fastcdr::FastBuffer fastbuffer(
reinterpret_cast<char *>(payload->data),
payload->max_size); // Object that manages the raw buffer.
eprosima::fastcdr::Cdr ser(fastbuffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN,
eprosima::fastcdr::Cdr::DDS_CDR); // Object that serializes the data.
if (this->serializeROSmessage(ser_data->data, ser)) {
payload->encapsulation = ser.endianness() ==
eprosima::fastcdr::Cdr::BIG_ENDIANNESS ? CDR_BE : CDR_LE;
payload->length = (uint32_t)ser.getSerializedDataLength();
return true;
}
}

return false;
Expand All @@ -1014,21 +1032,41 @@ bool TypeSupport<MembersType>::deserialize(
assert(data);
assert(payload);

auto buffer = static_cast<eprosima::fastcdr::FastBuffer *>(data);
if (!buffer->reserve(payload->length)) {
return false;
auto ser_data = static_cast<SerializedData *>(data);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think this function also could be strongly typed.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not possible, as it is the implementation of Fast-RTPS TopicDataType interface, declared in this line

if (ser_data->is_cdr_buffer) {
auto buffer = static_cast<eprosima::fastcdr::FastBuffer *>(ser_data->data);
if (!buffer->reserve(payload->length)) {
return false;
}
memcpy(buffer->getBuffer(), payload->data, payload->length);
return true;
}
memcpy(buffer->getBuffer(), payload->data, payload->length);
return true;

eprosima::fastcdr::FastBuffer fastbuffer(
reinterpret_cast<char *>(payload->data),
payload->length);
eprosima::fastcdr::Cdr deser(
fastbuffer,
eprosima::fastcdr::Cdr::DEFAULT_ENDIAN,
eprosima::fastcdr::Cdr::DDS_CDR);
return deserializeROSmessage(deser, ser_data->data);
}

template<typename MembersType>
std::function<uint32_t()> TypeSupport<MembersType>::getSerializedSizeProvider(void * data)
{
assert(data);

auto ser = static_cast<eprosima::fastcdr::Cdr *>(data);
return [ser]() -> uint32_t {return static_cast<uint32_t>(ser->getSerializedDataLength());};
auto ser_data = static_cast<SerializedData *>(data);
auto ser_size = [this, ser_data]() -> uint32_t
{
if (ser_data->is_cdr_buffer) {
auto ser = static_cast<eprosima::fastcdr::Cdr *>(ser_data->data);
return static_cast<uint32_t>(ser->getSerializedDataLength());
}
return static_cast<uint32_t>(this->getEstimatedSerializedSize(ser_data->data));
};
return ser_size;
}

} // namespace rmw_fastrtps_cpp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "fastrtps/subscriber/SubscriberListener.h"
#include "fastrtps/participant/Participant.h"
#include "fastrtps/publisher/Publisher.h"
#include "rmw_fastrtps_cpp/TypeSupport.hpp"

class ClientListener;

Expand Down Expand Up @@ -66,7 +67,10 @@ class ClientListener : public eprosima::fastrtps::SubscriberListener
response.buffer_.reset(new eprosima::fastcdr::FastBuffer());
eprosima::fastrtps::SampleInfo_t sinfo;

if (sub->takeNextData(response.buffer_.get(), &sinfo)) {
rmw_fastrtps_cpp::SerializedData data;
data.is_cdr_buffer = true;
data.data = response.buffer_.get();
if (sub->takeNextData(&data, &sinfo)) {
if (eprosima::fastrtps::rtps::ALIVE == sinfo.sampleKind) {
response.sample_identity_ = sinfo.related_sample_identity;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "fastrtps/subscriber/Subscriber.h"
#include "fastrtps/subscriber/SubscriberListener.h"
#include "fastrtps/subscriber/SampleInfo.h"
#include "rmw_fastrtps_cpp/TypeSupport.hpp"

class ServiceListener;

Expand Down Expand Up @@ -69,7 +70,10 @@ class ServiceListener : public eprosima::fastrtps::SubscriberListener
request.buffer_ = new eprosima::fastcdr::FastBuffer();
eprosima::fastrtps::SampleInfo_t sinfo;

if (sub->takeNextData(request.buffer_, &sinfo)) {
rmw_fastrtps_cpp::SerializedData data;
data.is_cdr_buffer = true;
data.data = request.buffer_;
if (sub->takeNextData(&data, &sinfo)) {
if (eprosima::fastrtps::rtps::ALIVE == sinfo.sampleKind) {
request.sample_identity_ = sinfo.sample_identity;

Expand Down
21 changes: 9 additions & 12 deletions rmw_fastrtps_cpp/src/rmw_publish.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "rmw_fastrtps_cpp/custom_publisher_info.hpp"
#include "rmw_fastrtps_cpp/identifier.hpp"
#include "rmw_fastrtps_cpp/macros.hpp"
#include "rmw_fastrtps_cpp/TypeSupport.hpp"

#include "./ros_message_serialization.hpp"

Expand All @@ -45,17 +46,10 @@ rmw_publish(const rmw_publisher_t * publisher, const void * ros_message)
RCUTILS_CHECK_FOR_NULL_WITH_MSG(
info, "publisher info pointer is null", return RMW_RET_ERROR, error_allocator);

eprosima::fastcdr::FastBuffer buffer;
eprosima::fastcdr::Cdr ser(buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN,
eprosima::fastcdr::Cdr::DDS_CDR);

if (!_serialize_ros_message(ros_message, buffer, ser, info->type_support_,
info->typesupport_identifier_))
{
RMW_SET_ERROR_MSG("cannot serialize data");
return RMW_RET_ERROR;
}
if (!info->publisher_->write(&ser)) {
rmw_fastrtps_cpp::SerializedData data;
data.is_cdr_buffer = false;
data.data = const_cast<void *>(ros_message);
if (!info->publisher_->write(&data)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't this be data.data ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As stated in this comment, it is not. The write method is provided with a pointer to a SerializedData struct, that will in turn be passed on to the serialize method on TypeSupport_impl.

RMW_SET_ERROR_MSG("cannot publish data");
return RMW_RET_ERROR;
}
Expand Down Expand Up @@ -92,7 +86,10 @@ rmw_publish_serialized_message(
return RMW_RET_ERROR;
}

if (!info->publisher_->write(&ser)) {
rmw_fastrtps_cpp::SerializedData data;
data.is_cdr_buffer = true;
data.data = &ser;
if (!info->publisher_->write(&data)) {
RMW_SET_ERROR_MSG("cannot publish data");
return RMW_RET_ERROR;
}
Expand Down
27 changes: 10 additions & 17 deletions rmw_fastrtps_cpp/src/rmw_request.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "rmw_fastrtps_cpp/custom_client_info.hpp"
#include "rmw_fastrtps_cpp/custom_service_info.hpp"
#include "rmw_fastrtps_cpp/identifier.hpp"
#include "rmw_fastrtps_cpp/TypeSupport.hpp"
#include "ros_message_serialization.hpp"

extern "C"
Expand All @@ -50,24 +51,16 @@ rmw_send_request(
auto info = static_cast<CustomClientInfo *>(client->data);
assert(info);

eprosima::fastcdr::FastBuffer buffer;
eprosima::fastcdr::Cdr ser(buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN,
eprosima::fastcdr::Cdr::DDS_CDR);

if (_serialize_ros_message(ros_request, buffer, ser, info->request_type_support_,
info->typesupport_identifier_))
{
eprosima::fastrtps::rtps::WriteParams wparams;

if (info->request_publisher_->write(&ser, wparams)) {
returnedValue = RMW_RET_OK;
*sequence_id = ((int64_t)wparams.sample_identity().sequence_number().high) << 32 |
wparams.sample_identity().sequence_number().low;
} else {
RMW_SET_ERROR_MSG("cannot publish data");
}
eprosima::fastrtps::rtps::WriteParams wparams;
rmw_fastrtps_cpp::SerializedData data;
data.is_cdr_buffer = false;
data.data = const_cast<void *>(ros_request);
if (info->request_publisher_->write(&data, wparams)) {
returnedValue = RMW_RET_OK;
*sequence_id = ((int64_t)wparams.sample_identity().sequence_number().high) << 32 |
wparams.sample_identity().sequence_number().low;
} else {
RMW_SET_ERROR_MSG("cannot serialize data");
RMW_SET_ERROR_MSG("cannot publish data");
}

return returnedValue;
Expand Down
13 changes: 6 additions & 7 deletions rmw_fastrtps_cpp/src/rmw_response.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "rmw_fastrtps_cpp/custom_client_info.hpp"
#include "rmw_fastrtps_cpp/custom_service_info.hpp"
#include "rmw_fastrtps_cpp/identifier.hpp"
#include "rmw_fastrtps_cpp/TypeSupport.hpp"
#include "ros_message_serialization.hpp"

extern "C"
Expand Down Expand Up @@ -89,12 +90,6 @@ rmw_send_response(
auto info = static_cast<CustomServiceInfo *>(service->data);
assert(info);

eprosima::fastcdr::FastBuffer buffer;
eprosima::fastcdr::Cdr ser(buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN,
eprosima::fastcdr::Cdr::DDS_CDR);

_serialize_ros_message(ros_response, buffer, ser, info->response_type_support_,
info->typesupport_identifier_);
eprosima::fastrtps::rtps::WriteParams wparams;
memcpy(&wparams.related_sample_identity().writer_guid(), request_header->writer_guid,
sizeof(eprosima::fastrtps::rtps::GUID_t));
Expand All @@ -103,7 +98,11 @@ rmw_send_response(
wparams.related_sample_identity().sequence_number().low =
(int32_t)(request_header->sequence_number & 0xFFFFFFFF);

if (info->response_publisher_->write(&ser, wparams)) {
rmw_fastrtps_cpp::SerializedData data;
data.is_cdr_buffer = false;
data.data = const_cast<void *>(ros_response);

if (info->response_publisher_->write(&data, wparams)) {
returnedValue = RMW_RET_OK;
} else {
RMW_SET_ERROR_MSG("cannot publish data");
Expand Down
14 changes: 7 additions & 7 deletions rmw_fastrtps_cpp/src/rmw_serialize.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,19 +41,19 @@ rmw_serialize(
}

auto tss = _create_message_type_support(ts->data, ts->typesupport_identifier);
eprosima::fastcdr::FastBuffer buffer;
eprosima::fastcdr::Cdr ser(
buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, eprosima::fastcdr::Cdr::DDS_CDR);

auto ret = _serialize_ros_message(ros_message, buffer, ser, tss, ts->typesupport_identifier);
auto data_length = static_cast<size_t>(ser.getSerializedDataLength());
auto data_length = _get_serialized_size(ros_message, tss, ts->typesupport_identifier);
if (serialized_message->buffer_capacity < data_length) {
if (rmw_serialized_message_resize(serialized_message, data_length) != RMW_RET_OK) {
RMW_SET_ERROR_MSG("unable to dynamically resize serialized message");
return RMW_RET_ERROR;
}
}
memcpy(serialized_message->buffer, ser.getBufferPointer(), data_length);

eprosima::fastcdr::FastBuffer buffer(serialized_message->buffer, data_length);
eprosima::fastcdr::Cdr ser(
buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, eprosima::fastcdr::Cdr::DDS_CDR);

auto ret = _serialize_ros_message(ros_message, buffer, ser, tss, ts->typesupport_identifier);
serialized_message->buffer_length = data_length;
serialized_message->buffer_capacity = data_length;
_delete_typesupport(tss, ts->typesupport_identifier);
Expand Down
16 changes: 9 additions & 7 deletions rmw_fastrtps_cpp/src/rmw_take.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "rmw_fastrtps_cpp/custom_subscriber_info.hpp"
#include "rmw_fastrtps_cpp/identifier.hpp"
#include "rmw_fastrtps_cpp/macros.hpp"
#include "rmw_fastrtps_cpp/TypeSupport.hpp"

#include "./ros_message_serialization.hpp"

Expand Down Expand Up @@ -63,17 +64,15 @@ _take(
RCUTILS_CHECK_FOR_NULL_WITH_MSG(
info, "custom subscriber info is null", return RMW_RET_ERROR, error_msg_allocator);

eprosima::fastcdr::FastBuffer buffer;
eprosima::fastrtps::SampleInfo_t sinfo;

if (info->subscriber_->takeNextData(&buffer, &sinfo)) {
rmw_fastrtps_cpp::SerializedData data;
data.is_cdr_buffer = false;
data.data = ros_message;
if (info->subscriber_->takeNextData(&data, &sinfo)) {
info->listener_->data_taken();

if (eprosima::fastrtps::rtps::ALIVE == sinfo.sampleKind) {
eprosima::fastcdr::Cdr deser(
buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, eprosima::fastcdr::Cdr::DDS_CDR);
_deserialize_ros_message(deser, ros_message, info->type_support_,
info->typesupport_identifier_);
if (message_info) {
_assign_message_info(message_info, &sinfo);
}
Expand Down Expand Up @@ -140,7 +139,10 @@ _take_serialized_message(
eprosima::fastcdr::FastBuffer buffer;
eprosima::fastrtps::SampleInfo_t sinfo;

if (info->subscriber_->takeNextData(&buffer, &sinfo)) {
rmw_fastrtps_cpp::SerializedData data;
data.is_cdr_buffer = true;
data.data = &buffer;
if (info->subscriber_->takeNextData(&data, &sinfo)) {
info->listener_->data_taken();

if (eprosima::fastrtps::rtps::ALIVE == sinfo.sampleKind) {
Expand Down
Loading