Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose raw CDR stream for publish and subscribe #186

Merged
merged 15 commits into from
Jun 16, 2018
1 change: 1 addition & 0 deletions rmw_fastrtps_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ add_library(rmw_fastrtps_cpp
src/rmw_publisher.cpp
src/rmw_request.cpp
src/rmw_response.cpp
src/rmw_serialize.cpp
src/rmw_service.cpp
src/rmw_service_names_and_types.cpp
src/rmw_service_server_is_available.cpp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ class TypeSupport : public eprosima::fastrtps::TopicDataType
public:
bool serializeROSmessage(const void * ros_message, eprosima::fastcdr::Cdr & ser);

bool deserializeROSmessage(eprosima::fastcdr::FastBuffer * data, void * ros_message);
bool deserializeROSmessage(eprosima::fastcdr::Cdr & deser, void * ros_message);

bool serialize(void * data, eprosima::fastrtps::rtps::SerializedPayload_t * payload);

Expand Down
15 changes: 9 additions & 6 deletions rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/TypeSupport_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -734,14 +734,10 @@ bool TypeSupport<MembersType>::serializeROSmessage(

template<typename MembersType>
bool TypeSupport<MembersType>::deserializeROSmessage(
eprosima::fastcdr::FastBuffer * buffer, void * ros_message)
eprosima::fastcdr::Cdr & deser, void * ros_message)
{
assert(buffer);
assert(ros_message);

eprosima::fastcdr::Cdr deser(*buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN,
eprosima::fastcdr::Cdr::DDS_CDR);

// Deserialize encapsulation.
deser.read_encapsulation();

Expand Down Expand Up @@ -790,7 +786,14 @@ bool TypeSupport<MembersType>::deserialize(
assert(payload);

auto buffer = static_cast<eprosima::fastcdr::FastBuffer *>(data);
buffer->resize(payload->length);
// TODO(karsten1987): This is a bug IMO
// see issue here: https://github.com/eProsima/Fast-CDR/issues/15
// an empty buffer will always be initialized to a hard set value - atm 200
// it will call malloc internally, so we can use malloc here to preinitialize our
// own buffer.
if (!buffer->reserve(payload->length)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I get a compiler error here

In file included from /workspace/ros2-pr-review/src/ros2/rmw_fastrtps/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/TypeSupport
.hpp:146:0,
                 from /workspace/ros2-pr-review/src/ros2/rmw_fastrtps/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/MessageType
Support.hpp:24,
                 from /workspace/ros2-pr-review/src/ros2/rmw_fastrtps/rmw_fastrtps_cpp/src/type_support_common.hpp:25,
                 from /workspace/ros2-pr-review/src/ros2/rmw_fastrtps/rmw_fastrtps_cpp/src/client_service_common.cpp:17:
/workspace/ros2-pr-review/src/ros2/rmw_fastrtps/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/TypeSupport_impl.hpp: In member f
unction ‘bool rmw_fastrtps_cpp::TypeSupport<MembersType>::deserialize(eprosima::fastrtps::rtps::SerializedPayload_t*, void*
)’:
/workspace/ros2-pr-review/src/ros2/rmw_fastrtps/rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/TypeSupport_impl.hpp:768:16: erro
r: ‘class eprosima::fastcdr::FastBuffer’ has no member named ‘reserve’
   if (!buffer->reserve(payload->length)) {

Copy link
Contributor

@sloretz sloretz May 14, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it supposed to be buffer->resize()? Edit: oops, I didn't read the comment.

return false;
}
memcpy(buffer->getBuffer(), payload->data, payload->length);
return true;
}
Expand Down
64 changes: 51 additions & 13 deletions rmw_fastrtps_cpp/src/rmw_publish.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,41 +21,79 @@

#include "rmw_fastrtps_cpp/custom_publisher_info.hpp"
#include "rmw_fastrtps_cpp/identifier.hpp"
#include "ros_message_serialization.hpp"
#include "rmw_fastrtps_cpp/macros.hpp"

#include "./ros_message_serialization.hpp"

extern "C"
{
rmw_ret_t
rmw_publish(const rmw_publisher_t * publisher, const void * ros_message)
{
assert(publisher);
assert(ros_message);
rmw_ret_t returnedValue = RMW_RET_ERROR;
auto error_allocator = rcutils_get_default_allocator();
RCUTILS_CHECK_FOR_NULL_WITH_MSG(
publisher, "publisher pointer is null", return RMW_RET_ERROR, error_allocator);
RCUTILS_CHECK_FOR_NULL_WITH_MSG(
ros_message, "ros_message pointer is null", return RMW_RET_ERROR, error_allocator);

if (publisher->implementation_identifier != eprosima_fastrtps_identifier) {
RMW_SET_ERROR_MSG("publisher handle not from this implementation");
return RMW_RET_ERROR;
}

auto info = static_cast<CustomPublisherInfo *>(publisher->data);
assert(info);
RCUTILS_CHECK_FOR_NULL_WITH_MSG(
info, "publisher info pointer is null", return RMW_RET_ERROR, error_allocator);

eprosima::fastcdr::FastBuffer buffer;
eprosima::fastcdr::Cdr ser(buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN,
eprosima::fastcdr::Cdr::DDS_CDR);

if (_serialize_ros_message(ros_message, ser, info->type_support_,
if (!_serialize_ros_message(ros_message, ser, info->type_support_,
info->typesupport_identifier_))
{
if (info->publisher_->write(&ser)) {
returnedValue = RMW_RET_OK;
} else {
RMW_SET_ERROR_MSG("cannot publish data");
}
} else {
RMW_SET_ERROR_MSG("cannot serialize data");
return RMW_RET_ERROR;
}
if (!info->publisher_->write(&ser)) {
RMW_SET_ERROR_MSG("cannot publish data");
return RMW_RET_ERROR;
}

return RMW_RET_OK;
}

rmw_ret_t
rmw_publish_raw(const rmw_publisher_t * publisher, const rmw_message_raw_t * raw_message)
{
auto error_allocator = rcutils_get_default_allocator();
RCUTILS_CHECK_FOR_NULL_WITH_MSG(
publisher, "publisher pointer is null", return RMW_RET_ERROR, error_allocator);
RCUTILS_CHECK_FOR_NULL_WITH_MSG(
raw_message, "raw_message pointer is null", return RMW_RET_ERROR, error_allocator);

if (publisher->implementation_identifier != eprosima_fastrtps_identifier) {
RMW_SET_ERROR_MSG("publisher handle not from this implementation");
return RMW_RET_ERROR;
}

auto info = static_cast<CustomPublisherInfo *>(publisher->data);
RCUTILS_CHECK_FOR_NULL_WITH_MSG(
info, "publisher info pointer is null", return RMW_RET_ERROR, error_allocator);

eprosima::fastcdr::FastBuffer buffer(raw_message->buffer, raw_message->buffer_length);
eprosima::fastcdr::Cdr ser(
buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, eprosima::fastcdr::Cdr::DDS_CDR);
if (!ser.jump(raw_message->buffer_length)) {
RMW_SET_ERROR_MSG("cannot correctly set raw buffer");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't you return from here? Or is ok to continue the function when this error happens? I don't know what this function does off-hand.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

return RMW_RET_ERROR;
}

if (!info->publisher_->write(&ser)) {
RMW_SET_ERROR_MSG("cannot publish data");
return RMW_RET_ERROR;
}

return returnedValue;
return RMW_RET_OK;
}
} // extern "C"
4 changes: 3 additions & 1 deletion rmw_fastrtps_cpp/src/rmw_request.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,11 @@ rmw_take_request(
assert(info);

CustomServiceRequest request = info->listener_->getRequest();
eprosima::fastcdr::Cdr deser(*request.buffer_, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN,
eprosima::fastcdr::Cdr::DDS_CDR);

if (request.buffer_ != nullptr) {
_deserialize_ros_message(request.buffer_, ros_request, info->request_type_support_,
_deserialize_ros_message(deser, ros_request, info->request_type_support_,
info->typesupport_identifier_);

// Get header
Expand Down
8 changes: 6 additions & 2 deletions rmw_fastrtps_cpp/src/rmw_response.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,12 @@ rmw_take_response(
CustomClientResponse response;

if (info->listener_->getResponse(response)) {
_deserialize_ros_message(response.buffer_.get(), ros_response, info->response_type_support_,
info->typesupport_identifier_);
eprosima::fastcdr::Cdr deser(
*response.buffer_,
eprosima::fastcdr::Cdr::DEFAULT_ENDIAN,
eprosima::fastcdr::Cdr::DDS_CDR);
_deserialize_ros_message(
deser, ros_response, info->response_type_support_, info->typesupport_identifier_);

request_header->sequence_number = ((int64_t)response.sample_identity_.sequence_number().high) <<
32 | response.sample_identity_.sequence_number().low;
Expand Down
86 changes: 86 additions & 0 deletions rmw_fastrtps_cpp/src/rmw_serialize.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "fastcdr/FastBuffer.h"

#include "rmw/error_handling.h"
#include "rmw/raw_message.h"
#include "rmw/rmw.h"

#include "./type_support_common.hpp"
#include "./ros_message_serialization.hpp"

extern "C"
{
rmw_ret_t
rmw_serialize(
const void * ros_message,
const rosidl_message_type_support_t * type_support,
rmw_message_raw_t * raw_message)
{
const rosidl_message_type_support_t * ts = get_message_typesupport_handle(
type_support, rosidl_typesupport_introspection_c__identifier);
if (!ts) {
ts = get_message_typesupport_handle(
type_support, rosidl_typesupport_introspection_cpp::typesupport_identifier);
if (!ts) {
RMW_SET_ERROR_MSG("type support not from this implementation");
return RMW_RET_ERROR;
}
}

auto tss = _create_message_type_support(ts->data, ts->typesupport_identifier);
eprosima::fastcdr::FastBuffer buffer;
eprosima::fastcdr::Cdr ser(
buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, eprosima::fastcdr::Cdr::DDS_CDR);

auto ret = _serialize_ros_message(ros_message, ser, tss, ts->typesupport_identifier);
auto data_length = static_cast<unsigned int>(ser.getSerializedDataLength());
if (raw_message->buffer_capacity < data_length) {
rmw_raw_message_resize(raw_message, data_length);
}
memcpy(raw_message->buffer, ser.getBufferPointer(), data_length);
raw_message->buffer_length = data_length;
raw_message->buffer_capacity = data_length;
_delete_typesupport(tss, ts->typesupport_identifier);
return ret == true ? RMW_RET_OK : RMW_RET_ERROR;
}

rmw_ret_t
rmw_deserialize(
const rmw_message_raw_t * raw_message,
const rosidl_message_type_support_t * type_support,
void * ros_message)
{
const rosidl_message_type_support_t * ts = get_message_typesupport_handle(
type_support, rosidl_typesupport_introspection_c__identifier);
if (!ts) {
ts = get_message_typesupport_handle(
type_support, rosidl_typesupport_introspection_cpp::typesupport_identifier);
if (!ts) {
RMW_SET_ERROR_MSG("type support not from this implementation");
return RMW_RET_ERROR;
}
}

auto tss = _create_message_type_support(ts->data, ts->typesupport_identifier);
eprosima::fastcdr::FastBuffer buffer(raw_message->buffer, raw_message->buffer_length);
eprosima::fastcdr::Cdr deser(buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN,
eprosima::fastcdr::Cdr::DDS_CDR);

auto ret = _deserialize_ros_message(deser, ros_message, tss, ts->typesupport_identifier);
_delete_typesupport(tss, ts->typesupport_identifier);
return ret == true ? RMW_RET_OK : RMW_RET_ERROR;
}
} // extern "C"
Loading