Skip to content

Commit

Permalink
Expose raw CDR stream for publish and subscribe (#186)
Browse files Browse the repository at this point in the history
* publish raw function

* take raw

* linters

* add rmw_serialize functions

* rmw_fastrtps_cpp/src/rmw_serialize memory leak when creating typesupport

* linters

* comparison size_t with unsigned int

* remove asserts

* refactor take code

* use rcutls macro

* changes for rebase

* warn unused

* fix typo

* raw->serialized

* use size_t (#206)
  • Loading branch information
Karsten1987 authored Jun 16, 2018
1 parent a33f9f8 commit 636721d
Show file tree
Hide file tree
Showing 11 changed files with 290 additions and 55 deletions.
1 change: 1 addition & 0 deletions rmw_fastrtps_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ add_library(rmw_fastrtps_cpp
src/rmw_publisher.cpp
src/rmw_request.cpp
src/rmw_response.cpp
src/rmw_serialize.cpp
src/rmw_service.cpp
src/rmw_service_names_and_types.cpp
src/rmw_service_server_is_available.cpp
Expand Down
2 changes: 1 addition & 1 deletion rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/TypeSupport.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ class TypeSupport : public eprosima::fastrtps::TopicDataType
public:
bool serializeROSmessage(const void * ros_message, eprosima::fastcdr::Cdr & ser);

bool deserializeROSmessage(eprosima::fastcdr::FastBuffer * data, void * ros_message);
bool deserializeROSmessage(eprosima::fastcdr::Cdr & deser, void * ros_message);

bool serialize(void * data, eprosima::fastrtps::rtps::SerializedPayload_t * payload);

Expand Down
10 changes: 4 additions & 6 deletions rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/TypeSupport_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -734,14 +734,10 @@ bool TypeSupport<MembersType>::serializeROSmessage(

template<typename MembersType>
bool TypeSupport<MembersType>::deserializeROSmessage(
eprosima::fastcdr::FastBuffer * buffer, void * ros_message)
eprosima::fastcdr::Cdr & deser, void * ros_message)
{
assert(buffer);
assert(ros_message);

eprosima::fastcdr::Cdr deser(*buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN,
eprosima::fastcdr::Cdr::DDS_CDR);

// Deserialize encapsulation.
deser.read_encapsulation();

Expand Down Expand Up @@ -790,7 +786,9 @@ bool TypeSupport<MembersType>::deserialize(
assert(payload);

auto buffer = static_cast<eprosima::fastcdr::FastBuffer *>(data);
buffer->resize(payload->length);
if (!buffer->reserve(payload->length)) {
return false;
}
memcpy(buffer->getBuffer(), payload->data, payload->length);
return true;
}
Expand Down
67 changes: 54 additions & 13 deletions rmw_fastrtps_cpp/src/rmw_publish.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,41 +21,82 @@

#include "rmw_fastrtps_cpp/custom_publisher_info.hpp"
#include "rmw_fastrtps_cpp/identifier.hpp"
#include "ros_message_serialization.hpp"
#include "rmw_fastrtps_cpp/macros.hpp"

#include "./ros_message_serialization.hpp"

extern "C"
{
rmw_ret_t
rmw_publish(const rmw_publisher_t * publisher, const void * ros_message)
{
assert(publisher);
assert(ros_message);
rmw_ret_t returnedValue = RMW_RET_ERROR;
auto error_allocator = rcutils_get_default_allocator();
RCUTILS_CHECK_FOR_NULL_WITH_MSG(
publisher, "publisher pointer is null", return RMW_RET_ERROR, error_allocator);
RCUTILS_CHECK_FOR_NULL_WITH_MSG(
ros_message, "ros_message pointer is null", return RMW_RET_ERROR, error_allocator);

if (publisher->implementation_identifier != eprosima_fastrtps_identifier) {
RMW_SET_ERROR_MSG("publisher handle not from this implementation");
return RMW_RET_ERROR;
}

auto info = static_cast<CustomPublisherInfo *>(publisher->data);
assert(info);
RCUTILS_CHECK_FOR_NULL_WITH_MSG(
info, "publisher info pointer is null", return RMW_RET_ERROR, error_allocator);

eprosima::fastcdr::FastBuffer buffer;
eprosima::fastcdr::Cdr ser(buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN,
eprosima::fastcdr::Cdr::DDS_CDR);

if (_serialize_ros_message(ros_message, ser, info->type_support_,
if (!_serialize_ros_message(ros_message, ser, info->type_support_,
info->typesupport_identifier_))
{
if (info->publisher_->write(&ser)) {
returnedValue = RMW_RET_OK;
} else {
RMW_SET_ERROR_MSG("cannot publish data");
}
} else {
RMW_SET_ERROR_MSG("cannot serialize data");
return RMW_RET_ERROR;
}
if (!info->publisher_->write(&ser)) {
RMW_SET_ERROR_MSG("cannot publish data");
return RMW_RET_ERROR;
}

return RMW_RET_OK;
}

rmw_ret_t
rmw_publish_serialized_message(
const rmw_publisher_t * publisher, const rmw_serialized_message_t * serialized_message)
{
auto error_allocator = rcutils_get_default_allocator();
RCUTILS_CHECK_FOR_NULL_WITH_MSG(
publisher, "publisher pointer is null", return RMW_RET_ERROR, error_allocator);
RCUTILS_CHECK_FOR_NULL_WITH_MSG(
serialized_message, "serialized_message pointer is null",
return RMW_RET_ERROR, error_allocator);

if (publisher->implementation_identifier != eprosima_fastrtps_identifier) {
RMW_SET_ERROR_MSG("publisher handle not from this implementation");
return RMW_RET_ERROR;
}

auto info = static_cast<CustomPublisherInfo *>(publisher->data);
RCUTILS_CHECK_FOR_NULL_WITH_MSG(
info, "publisher info pointer is null", return RMW_RET_ERROR, error_allocator);

eprosima::fastcdr::FastBuffer buffer(
serialized_message->buffer, serialized_message->buffer_length);
eprosima::fastcdr::Cdr ser(
buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, eprosima::fastcdr::Cdr::DDS_CDR);
if (!ser.jump(serialized_message->buffer_length)) {
RMW_SET_ERROR_MSG("cannot correctly set serialized buffer");
return RMW_RET_ERROR;
}

if (!info->publisher_->write(&ser)) {
RMW_SET_ERROR_MSG("cannot publish data");
return RMW_RET_ERROR;
}

return returnedValue;
return RMW_RET_OK;
}
} // extern "C"
4 changes: 3 additions & 1 deletion rmw_fastrtps_cpp/src/rmw_request.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,11 @@ rmw_take_request(
assert(info);

CustomServiceRequest request = info->listener_->getRequest();
eprosima::fastcdr::Cdr deser(*request.buffer_, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN,
eprosima::fastcdr::Cdr::DDS_CDR);

if (request.buffer_ != nullptr) {
_deserialize_ros_message(request.buffer_, ros_request, info->request_type_support_,
_deserialize_ros_message(deser, ros_request, info->request_type_support_,
info->typesupport_identifier_);

// Get header
Expand Down
8 changes: 6 additions & 2 deletions rmw_fastrtps_cpp/src/rmw_response.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,12 @@ rmw_take_response(
CustomClientResponse response;

if (info->listener_->getResponse(response)) {
_deserialize_ros_message(response.buffer_.get(), ros_response, info->response_type_support_,
info->typesupport_identifier_);
eprosima::fastcdr::Cdr deser(
*response.buffer_,
eprosima::fastcdr::Cdr::DEFAULT_ENDIAN,
eprosima::fastcdr::Cdr::DDS_CDR);
_deserialize_ros_message(
deser, ros_response, info->response_type_support_, info->typesupport_identifier_);

request_header->sequence_number = ((int64_t)response.sample_identity_.sequence_number().high) <<
32 | response.sample_identity_.sequence_number().low;
Expand Down
90 changes: 90 additions & 0 deletions rmw_fastrtps_cpp/src/rmw_serialize.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "fastcdr/FastBuffer.h"

#include "rmw/error_handling.h"
#include "rmw/serialized_message.h"
#include "rmw/rmw.h"

#include "./type_support_common.hpp"
#include "./ros_message_serialization.hpp"

extern "C"
{
rmw_ret_t
rmw_serialize(
const void * ros_message,
const rosidl_message_type_support_t * type_support,
rmw_serialized_message_t * serialized_message)
{
const rosidl_message_type_support_t * ts = get_message_typesupport_handle(
type_support, rosidl_typesupport_introspection_c__identifier);
if (!ts) {
ts = get_message_typesupport_handle(
type_support, rosidl_typesupport_introspection_cpp::typesupport_identifier);
if (!ts) {
RMW_SET_ERROR_MSG("type support not from this implementation");
return RMW_RET_ERROR;
}
}

auto tss = _create_message_type_support(ts->data, ts->typesupport_identifier);
eprosima::fastcdr::FastBuffer buffer;
eprosima::fastcdr::Cdr ser(
buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, eprosima::fastcdr::Cdr::DDS_CDR);

auto ret = _serialize_ros_message(ros_message, ser, tss, ts->typesupport_identifier);
auto data_length = static_cast<size_t>(ser.getSerializedDataLength());
if (serialized_message->buffer_capacity < data_length) {
if (rmw_serialized_message_resize(serialized_message, data_length) != RMW_RET_OK) {
RMW_SET_ERROR_MSG("unable to dynamically resize serialized message");
return RMW_RET_ERROR;
}
}
memcpy(serialized_message->buffer, ser.getBufferPointer(), data_length);
serialized_message->buffer_length = data_length;
serialized_message->buffer_capacity = data_length;
_delete_typesupport(tss, ts->typesupport_identifier);
return ret == true ? RMW_RET_OK : RMW_RET_ERROR;
}

rmw_ret_t
rmw_deserialize(
const rmw_serialized_message_t * serialized_message,
const rosidl_message_type_support_t * type_support,
void * ros_message)
{
const rosidl_message_type_support_t * ts = get_message_typesupport_handle(
type_support, rosidl_typesupport_introspection_c__identifier);
if (!ts) {
ts = get_message_typesupport_handle(
type_support, rosidl_typesupport_introspection_cpp::typesupport_identifier);
if (!ts) {
RMW_SET_ERROR_MSG("type support not from this implementation");
return RMW_RET_ERROR;
}
}

auto tss = _create_message_type_support(ts->data, ts->typesupport_identifier);
eprosima::fastcdr::FastBuffer buffer(
serialized_message->buffer, serialized_message->buffer_length);
eprosima::fastcdr::Cdr deser(buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN,
eprosima::fastcdr::Cdr::DDS_CDR);

auto ret = _deserialize_ros_message(deser, ros_message, tss, ts->typesupport_identifier);
_delete_typesupport(tss, ts->typesupport_identifier);
return ret == true ? RMW_RET_OK : RMW_RET_ERROR;
}
} // extern "C"
5 changes: 3 additions & 2 deletions rmw_fastrtps_cpp/src/rmw_subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@
#include "fastrtps/participant/Participant.h"
#include "fastrtps/subscriber/Subscriber.h"

#include "rmw_fastrtps_cpp/custom_participant_info.hpp"
#include "rmw_fastrtps_cpp/custom_subscriber_info.hpp"
#include "rmw_fastrtps_cpp/identifier.hpp"

#include "namespace_prefix.hpp"
#include "qos.hpp"
#include "rmw_fastrtps_cpp/custom_participant_info.hpp"
#include "rmw_fastrtps_cpp/custom_subscriber_info.hpp"
#include "type_support_common.hpp"

using Domain = eprosima::fastrtps::Domain;
Expand Down
Loading

0 comments on commit 636721d

Please sign in to comment.