Skip to content

Commit

Permalink
Fix leak if client reponse is never taken (#201)
Browse files Browse the repository at this point in the history
* Fix leak if buffer is never taken
  • Loading branch information
sloretz authored Jun 1, 2018
1 parent 3700d75 commit bc1ff87
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 29 deletions.
47 changes: 23 additions & 24 deletions rmw_fastrtps_cpp/include/rmw_fastrtps_cpp/custom_client_info.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#include <atomic>
#include <list>
#include <memory>
#include <utility>

#include "fastcdr/FastBuffer.h"

Expand All @@ -43,10 +45,7 @@ typedef struct CustomClientInfo
typedef struct CustomClientResponse
{
eprosima::fastrtps::rtps::SampleIdentity sample_identity_;
eprosima::fastcdr::FastBuffer * buffer_;

CustomClientResponse()
: buffer_(nullptr) {}
std::unique_ptr<eprosima::fastcdr::FastBuffer> buffer_;
} CustomClientResponse;

class ClientListener : public eprosima::fastrtps::SubscriberListener
Expand All @@ -63,10 +62,11 @@ class ClientListener : public eprosima::fastrtps::SubscriberListener
assert(sub);

CustomClientResponse response;
response.buffer_ = new eprosima::fastcdr::FastBuffer();
// Todo(sloretz) eliminate heap allocation pending eprosima/Fast-CDR#19
response.buffer_.reset(new eprosima::fastcdr::FastBuffer());
eprosima::fastrtps::SampleInfo_t sinfo;

if (sub->takeNextData(response.buffer_, &sinfo)) {
if (sub->takeNextData(response.buffer_.get(), &sinfo)) {
if (eprosima::fastrtps::rtps::ALIVE == sinfo.sampleKind) {
response.sample_identity_ = sinfo.related_sample_identity;

Expand All @@ -75,44 +75,43 @@ class ClientListener : public eprosima::fastrtps::SubscriberListener

if (conditionMutex_ != nullptr) {
std::unique_lock<std::mutex> clock(*conditionMutex_);
list.push_back(response);
list.emplace_back(std::move(response));
// the change to list_has_data_ needs to be mutually exclusive with
// rmw_wait() which checks hasData() and decides if wait() needs to
// be called
list_has_data_.store(true);
clock.unlock();
conditionVariable_->notify_one();
} else {
list.push_back(response);
list.emplace_back(std::move(response));
list_has_data_.store(true);
}
}
}
}
}

CustomClientResponse
getResponse()
bool
getResponse(CustomClientResponse & response)
{
std::lock_guard<std::mutex> lock(internalMutex_);
CustomClientResponse response;

auto pop_response = [this](CustomClientResponse & response) -> bool
{
if (!list.empty()) {
response = std::move(list.front());
list.pop_front();
list_has_data_.store(!list.empty());
return true;
}
return false;
};

if (conditionMutex_ != nullptr) {
std::unique_lock<std::mutex> clock(*conditionMutex_);
if (!list.empty()) {
response = list.front();
list.pop_front();
list_has_data_.store(!list.empty());
}
} else {
if (!list.empty()) {
response = list.front();
list.pop_front();
list_has_data_.store(!list.empty());
}
return pop_response(response);
}

return response;
return pop_response(response);
}

void
Expand Down
8 changes: 3 additions & 5 deletions rmw_fastrtps_cpp/src/rmw_response.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,17 +50,15 @@ rmw_take_response(
auto info = static_cast<CustomClientInfo *>(client->data);
assert(info);

CustomClientResponse response = info->listener_->getResponse();
CustomClientResponse response;

if (response.buffer_ != nullptr) {
_deserialize_ros_message(response.buffer_, ros_response, info->response_type_support_,
if (info->listener_->getResponse(response)) {
_deserialize_ros_message(response.buffer_.get(), ros_response, info->response_type_support_,
info->typesupport_identifier_);

request_header->sequence_number = ((int64_t)response.sample_identity_.sequence_number().high) <<
32 | response.sample_identity_.sequence_number().low;

delete response.buffer_;

*taken = true;
}

Expand Down

0 comments on commit bc1ff87

Please sign in to comment.