Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose raw CDR stream for publish and subscribe #186

Merged
merged 15 commits into from
Jun 16, 2018
1 change: 1 addition & 0 deletions rmw_fastrtps_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ add_library(rmw_fastrtps_cpp
src/rmw_publisher.cpp
src/rmw_request.cpp
src/rmw_response.cpp
src/rmw_serialize.cpp
src/rmw_service.cpp
src/rmw_service_names_and_types.cpp
src/rmw_service_server_is_available.cpp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ class TypeSupport : public eprosima::fastrtps::TopicDataType
public:
bool serializeROSmessage(const void * ros_message, eprosima::fastcdr::Cdr & ser);

bool deserializeROSmessage(eprosima::fastcdr::FastBuffer * data, void * ros_message);
bool deserializeROSmessage(eprosima::fastcdr::Cdr & deser, void * ros_message);

bool serialize(void * data, eprosima::fastrtps::rtps::SerializedPayload_t * payload);

Expand Down
10 changes: 4 additions & 6 deletions rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/TypeSupport_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -734,14 +734,10 @@ bool TypeSupport<MembersType>::serializeROSmessage(

template<typename MembersType>
bool TypeSupport<MembersType>::deserializeROSmessage(
eprosima::fastcdr::FastBuffer * buffer, void * ros_message)
eprosima::fastcdr::Cdr & deser, void * ros_message)
{
assert(buffer);
assert(ros_message);

eprosima::fastcdr::Cdr deser(*buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN,
eprosima::fastcdr::Cdr::DDS_CDR);

// Deserialize encapsulation.
deser.read_encapsulation();

Expand Down Expand Up @@ -790,7 +786,9 @@ bool TypeSupport<MembersType>::deserialize(
assert(payload);

auto buffer = static_cast<eprosima::fastcdr::FastBuffer *>(data);
buffer->resize(payload->length);
if (!buffer->reserve(payload->length)) {
return false;
}
memcpy(buffer->getBuffer(), payload->data, payload->length);
return true;
}
Expand Down
67 changes: 54 additions & 13 deletions rmw_fastrtps_cpp/src/rmw_publish.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,41 +21,82 @@

#include "rmw_fastrtps_cpp/custom_publisher_info.hpp"
#include "rmw_fastrtps_cpp/identifier.hpp"
#include "ros_message_serialization.hpp"
#include "rmw_fastrtps_cpp/macros.hpp"

#include "./ros_message_serialization.hpp"

extern "C"
{
rmw_ret_t
rmw_publish(const rmw_publisher_t * publisher, const void * ros_message)
{
assert(publisher);
assert(ros_message);
rmw_ret_t returnedValue = RMW_RET_ERROR;
auto error_allocator = rcutils_get_default_allocator();
RCUTILS_CHECK_FOR_NULL_WITH_MSG(
publisher, "publisher pointer is null", return RMW_RET_ERROR, error_allocator);
RCUTILS_CHECK_FOR_NULL_WITH_MSG(
ros_message, "ros_message pointer is null", return RMW_RET_ERROR, error_allocator);

if (publisher->implementation_identifier != eprosima_fastrtps_identifier) {
RMW_SET_ERROR_MSG("publisher handle not from this implementation");
return RMW_RET_ERROR;
}

auto info = static_cast<CustomPublisherInfo *>(publisher->data);
assert(info);
RCUTILS_CHECK_FOR_NULL_WITH_MSG(
info, "publisher info pointer is null", return RMW_RET_ERROR, error_allocator);

eprosima::fastcdr::FastBuffer buffer;
eprosima::fastcdr::Cdr ser(buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN,
eprosima::fastcdr::Cdr::DDS_CDR);

if (_serialize_ros_message(ros_message, ser, info->type_support_,
if (!_serialize_ros_message(ros_message, ser, info->type_support_,
info->typesupport_identifier_))
{
if (info->publisher_->write(&ser)) {
returnedValue = RMW_RET_OK;
} else {
RMW_SET_ERROR_MSG("cannot publish data");
}
} else {
RMW_SET_ERROR_MSG("cannot serialize data");
return RMW_RET_ERROR;
}
if (!info->publisher_->write(&ser)) {
RMW_SET_ERROR_MSG("cannot publish data");
return RMW_RET_ERROR;
}

return RMW_RET_OK;
}

rmw_ret_t
rmw_publish_serialized_message(
const rmw_publisher_t * publisher, const rmw_serialized_message_t * serialized_message)
{
auto error_allocator = rcutils_get_default_allocator();
RCUTILS_CHECK_FOR_NULL_WITH_MSG(
publisher, "publisher pointer is null", return RMW_RET_ERROR, error_allocator);
RCUTILS_CHECK_FOR_NULL_WITH_MSG(
serialized_message, "serialized_message pointer is null",
return RMW_RET_ERROR, error_allocator);

if (publisher->implementation_identifier != eprosima_fastrtps_identifier) {
RMW_SET_ERROR_MSG("publisher handle not from this implementation");
return RMW_RET_ERROR;
}

auto info = static_cast<CustomPublisherInfo *>(publisher->data);
RCUTILS_CHECK_FOR_NULL_WITH_MSG(
info, "publisher info pointer is null", return RMW_RET_ERROR, error_allocator);

eprosima::fastcdr::FastBuffer buffer(
serialized_message->buffer, serialized_message->buffer_length);
eprosima::fastcdr::Cdr ser(
buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, eprosima::fastcdr::Cdr::DDS_CDR);
if (!ser.jump(serialized_message->buffer_length)) {
RMW_SET_ERROR_MSG("cannot correctly set serialized buffer");
return RMW_RET_ERROR;
}

if (!info->publisher_->write(&ser)) {
RMW_SET_ERROR_MSG("cannot publish data");
return RMW_RET_ERROR;
}

return returnedValue;
return RMW_RET_OK;
}
} // extern "C"
4 changes: 3 additions & 1 deletion rmw_fastrtps_cpp/src/rmw_request.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,11 @@ rmw_take_request(
assert(info);

CustomServiceRequest request = info->listener_->getRequest();
eprosima::fastcdr::Cdr deser(*request.buffer_, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN,
eprosima::fastcdr::Cdr::DDS_CDR);

if (request.buffer_ != nullptr) {
_deserialize_ros_message(request.buffer_, ros_request, info->request_type_support_,
_deserialize_ros_message(deser, ros_request, info->request_type_support_,
info->typesupport_identifier_);

// Get header
Expand Down
8 changes: 6 additions & 2 deletions rmw_fastrtps_cpp/src/rmw_response.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,12 @@ rmw_take_response(
CustomClientResponse response;

if (info->listener_->getResponse(response)) {
_deserialize_ros_message(response.buffer_.get(), ros_response, info->response_type_support_,
info->typesupport_identifier_);
eprosima::fastcdr::Cdr deser(
*response.buffer_,
eprosima::fastcdr::Cdr::DEFAULT_ENDIAN,
eprosima::fastcdr::Cdr::DDS_CDR);
_deserialize_ros_message(
deser, ros_response, info->response_type_support_, info->typesupport_identifier_);

request_header->sequence_number = ((int64_t)response.sample_identity_.sequence_number().high) <<
32 | response.sample_identity_.sequence_number().low;
Expand Down
90 changes: 90 additions & 0 deletions rmw_fastrtps_cpp/src/rmw_serialize.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "fastcdr/FastBuffer.h"

#include "rmw/error_handling.h"
#include "rmw/serialized_message.h"
#include "rmw/rmw.h"

#include "./type_support_common.hpp"
#include "./ros_message_serialization.hpp"

extern "C"
{
rmw_ret_t
rmw_serialize(
const void * ros_message,
const rosidl_message_type_support_t * type_support,
rmw_serialized_message_t * serialized_message)
{
const rosidl_message_type_support_t * ts = get_message_typesupport_handle(
type_support, rosidl_typesupport_introspection_c__identifier);
if (!ts) {
ts = get_message_typesupport_handle(
type_support, rosidl_typesupport_introspection_cpp::typesupport_identifier);
if (!ts) {
RMW_SET_ERROR_MSG("type support not from this implementation");
return RMW_RET_ERROR;
}
}

auto tss = _create_message_type_support(ts->data, ts->typesupport_identifier);
eprosima::fastcdr::FastBuffer buffer;
eprosima::fastcdr::Cdr ser(
buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN, eprosima::fastcdr::Cdr::DDS_CDR);

auto ret = _serialize_ros_message(ros_message, ser, tss, ts->typesupport_identifier);
auto data_length = static_cast<size_t>(ser.getSerializedDataLength());
if (serialized_message->buffer_capacity < data_length) {
if (rmw_serialized_message_resize(serialized_message, data_length) != RMW_RET_OK) {
RMW_SET_ERROR_MSG("unable to dynamically resize serialized message");
return RMW_RET_ERROR;
}
}
memcpy(serialized_message->buffer, ser.getBufferPointer(), data_length);
serialized_message->buffer_length = data_length;
serialized_message->buffer_capacity = data_length;
_delete_typesupport(tss, ts->typesupport_identifier);
return ret == true ? RMW_RET_OK : RMW_RET_ERROR;
}

rmw_ret_t
rmw_deserialize(
const rmw_serialized_message_t * serialized_message,
const rosidl_message_type_support_t * type_support,
void * ros_message)
{
const rosidl_message_type_support_t * ts = get_message_typesupport_handle(
type_support, rosidl_typesupport_introspection_c__identifier);
if (!ts) {
ts = get_message_typesupport_handle(
type_support, rosidl_typesupport_introspection_cpp::typesupport_identifier);
if (!ts) {
RMW_SET_ERROR_MSG("type support not from this implementation");
return RMW_RET_ERROR;
}
}

auto tss = _create_message_type_support(ts->data, ts->typesupport_identifier);
eprosima::fastcdr::FastBuffer buffer(
serialized_message->buffer, serialized_message->buffer_length);
eprosima::fastcdr::Cdr deser(buffer, eprosima::fastcdr::Cdr::DEFAULT_ENDIAN,
eprosima::fastcdr::Cdr::DDS_CDR);

auto ret = _deserialize_ros_message(deser, ros_message, tss, ts->typesupport_identifier);
_delete_typesupport(tss, ts->typesupport_identifier);
return ret == true ? RMW_RET_OK : RMW_RET_ERROR;
}
} // extern "C"
5 changes: 3 additions & 2 deletions rmw_fastrtps_cpp/src/rmw_subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@
#include "fastrtps/participant/Participant.h"
#include "fastrtps/subscriber/Subscriber.h"

#include "rmw_fastrtps_cpp/custom_participant_info.hpp"
#include "rmw_fastrtps_cpp/custom_subscriber_info.hpp"
#include "rmw_fastrtps_cpp/identifier.hpp"

#include "namespace_prefix.hpp"
#include "qos.hpp"
#include "rmw_fastrtps_cpp/custom_participant_info.hpp"
#include "rmw_fastrtps_cpp/custom_subscriber_info.hpp"
#include "type_support_common.hpp"

using Domain = eprosima::fastrtps::Domain;
Expand Down
Loading