Skip to content

Commit

Permalink
Add implementation of matching publisher/subscriber counts (ros2#310)
Browse files Browse the repository at this point in the history
* Add unimplemented versions of pub/sub count.

* Static implementation of pub/sub count.

* Finish connext implementation.

* Apply suggestions from code review

Co-Authored-By: mjcarroll <michael@openrobotics.org>
Signed-off-by: Devin Bonnie <dbbonnie@amazon.com>
  • Loading branch information
mjcarroll authored and dabonnie committed Apr 3, 2019
1 parent b8fb369 commit 3b0ef8b
Show file tree
Hide file tree
Showing 4 changed files with 191 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,44 @@
#ifndef RMW_CONNEXT_CPP__CONNEXT_STATIC_PUBLISHER_INFO_HPP_
#define RMW_CONNEXT_CPP__CONNEXT_STATIC_PUBLISHER_INFO_HPP_

#include <atomic>

#include "rmw_connext_shared_cpp/types.hpp"

#include "rosidl_typesupport_connext_cpp/message_type_support.h"

class ConnextPublisherListener;

extern "C"
{
struct ConnextStaticPublisherInfo
{
DDSPublisher * dds_publisher_;
ConnextPublisherListener * listener_;
DDSDataWriter * topic_writer_;
const message_type_support_callbacks_t * callbacks_;
rmw_gid_t publisher_gid;
};
} // extern "C"

class ConnextPublisherListener : public DDSPublisherListener
{
public:
virtual void on_publication_matched(
DDSDataWriter * writer,
const DDS_PublicationMatchedStatus & status)
{
(void) writer;
current_count_ = status.current_count;
}

std::size_t current_count() const
{
return current_count_;
}

private:
std::atomic<std::size_t> current_count_;
};

#endif // RMW_CONNEXT_CPP__CONNEXT_STATIC_PUBLISHER_INFO_HPP_
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,45 @@
#ifndef RMW_CONNEXT_CPP__CONNEXT_STATIC_SUBSCRIBER_INFO_HPP_
#define RMW_CONNEXT_CPP__CONNEXT_STATIC_SUBSCRIBER_INFO_HPP_

#include <atomic>

#include "rmw_connext_shared_cpp/ndds_include.hpp"

#include "rosidl_typesupport_connext_cpp/message_type_support.h"

class ConnextSubscriberListener;

extern "C"
{
struct ConnextStaticSubscriberInfo
{
DDSSubscriber * dds_subscriber_;
ConnextSubscriberListener * listener_;
DDSDataReader * topic_reader_;
DDSReadCondition * read_condition_;
bool ignore_local_publications;
const message_type_support_callbacks_t * callbacks_;
};
} // extern "C"

class ConnextSubscriberListener : public DDSSubscriberListener
{
public:
virtual void on_subscription_matched(
DDSDataReader * reader,
const DDS_SubscriptionMatchedStatus & status)
{
(void) reader;
current_count_ = status.current_count;
}

std::size_t current_count() const
{
return current_count_;
}

private:
std::atomic<std::size_t> current_count_;
};

#endif // RMW_CONNEXT_CPP__CONNEXT_STATIC_SUBSCRIBER_INFO_HPP_
85 changes: 76 additions & 9 deletions rmw_connext_cpp/src/rmw_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,9 @@ rmw_create_publisher(
DDSDataWriter * topic_writer = nullptr;
DDSTopic * topic = nullptr;
DDSTopicDescription * topic_description = nullptr;
void * buf = nullptr;
void * info_buf = nullptr;
void * listener_buf = nullptr;
ConnextPublisherListener * publisher_listener = nullptr;
ConnextStaticPublisherInfo * publisher_info = nullptr;
rmw_publisher_t * publisher = nullptr;
std::string mangled_name = "";
Expand Down Expand Up @@ -141,8 +143,18 @@ rmw_create_publisher(
goto fail;
}

// Allocate memory for the PublisherListener object.
listener_buf = rmw_allocate(sizeof(ConnextPublisherListener));
if (!listener_buf) {
RMW_SET_ERROR_MSG("failed to allocate memory for publisher listener");
goto fail;
}
// Use a placement new to construct the PublisherListener in the preallocated buffer.
RMW_TRY_PLACEMENT_NEW(publisher_listener, listener_buf, goto fail, ConnextPublisherListener, )
listener_buf = nullptr; // Only free the buffer pointer.

dds_publisher = participant->create_publisher(
publisher_qos, NULL, DDS_STATUS_MASK_NONE);
publisher_qos, publisher_listener, DDS_PUBLICATION_MATCHED_STATUS);
if (!dds_publisher) {
RMW_SET_ERROR_MSG("failed to create publisher");
goto fail;
Expand Down Expand Up @@ -188,18 +200,20 @@ rmw_create_publisher(
}

// Allocate memory for the ConnextStaticPublisherInfo object.
buf = rmw_allocate(sizeof(ConnextStaticPublisherInfo));
if (!buf) {
RMW_SET_ERROR_MSG("failed to allocate memory");
info_buf = rmw_allocate(sizeof(ConnextStaticPublisherInfo));
if (!info_buf) {
RMW_SET_ERROR_MSG("failed to allocate memory for publisher info");
goto fail;
}
// Use a placement new to construct the ConnextStaticPublisherInfo in the preallocated buffer.
RMW_TRY_PLACEMENT_NEW(publisher_info, buf, goto fail, ConnextStaticPublisherInfo, )
buf = nullptr; // Only free the publisher_info pointer; don't need the buf pointer anymore.
RMW_TRY_PLACEMENT_NEW(publisher_info, info_buf, goto fail, ConnextStaticPublisherInfo, )
info_buf = nullptr; // Only free the publisher_info pointer; don't need the buf pointer anymore.
publisher_info->dds_publisher_ = dds_publisher;
publisher_info->topic_writer_ = topic_writer;
publisher_info->callbacks_ = callbacks;
publisher_info->publisher_gid.implementation_identifier = rti_connext_identifier;
publisher_info->listener_ = publisher_listener;
publisher_listener = nullptr;
static_assert(
sizeof(ConnextPublisherGID) <= RMW_GID_STORAGE_SIZE,
"RMW_GID_STORAGE_SIZE insufficient to store the rmw_connext_cpp GID implemenation."
Expand Down Expand Up @@ -266,18 +280,61 @@ rmw_create_publisher(
(std::cerr << ss.str()).flush();
}
}
if (publisher_listener) {
RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE(
publisher_listener->~ConnextPublisherListener(), ConnextPublisherListener)
rmw_free(publisher_listener);
}
if (publisher_info) {
if (publisher_info->listener_) {
RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE(
publisher_info->listener_->~ConnextPublisherListener(), ConnextPublisherListener)
rmw_free(publisher_info->listener_);
}
RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE(
publisher_info->~ConnextStaticPublisherInfo(), ConnextStaticPublisherInfo)
rmw_free(publisher_info);
}
if (buf) {
rmw_free(buf);
if (info_buf) {
rmw_free(info_buf);
}
if (listener_buf) {
rmw_free(listener_buf);
}

return NULL;
}

rmw_ret_t
rmw_publisher_count_matched_subscriptions(
const rmw_publisher_t * publisher,
size_t * subscription_count)
{
if (!publisher) {
RMW_SET_ERROR_MSG("publisher handle is null");
return RMW_RET_INVALID_ARGUMENT;
}

if (!subscription_count) {
RMW_SET_ERROR_MSG("subscription_count is null");
return RMW_RET_INVALID_ARGUMENT;
}

auto info = static_cast<ConnextStaticPublisherInfo *>(publisher->data);
if (!info) {
RMW_SET_ERROR_MSG("publisher internal data is invalid");
return RMW_RET_ERROR;
}
if (!info->listener_) {
RMW_SET_ERROR_MSG("publisher internal listener is invalid");
return RMW_RET_ERROR;
}

*subscription_count = info->listener_->current_count();

return RMW_RET_OK;
}

rmw_ret_t
rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t * publisher)
{
Expand Down Expand Up @@ -317,6 +374,7 @@ rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t * publisher)
publisher_info->dds_publisher_->get_instance_handle(), EntityType::Publisher);
node_info->publisher_listener->trigger_graph_guard_condition();
DDSPublisher * dds_publisher = publisher_info->dds_publisher_;

if (dds_publisher) {
if (publisher_info->topic_writer_) {
if (dds_publisher->delete_datawriter(publisher_info->topic_writer_) != DDS_RETCODE_OK) {
Expand All @@ -334,6 +392,15 @@ rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t * publisher)
RMW_SET_ERROR_MSG("cannot delete datawriter because the publisher is null");
return RMW_RET_ERROR;
}

ConnextPublisherListener * pub_listener = publisher_info->listener_;
if (pub_listener) {
RMW_TRY_DESTRUCTOR(
pub_listener->~ConnextPublisherListener(),
ConnextPublisherListener, return RMW_RET_ERROR)
rmw_free(pub_listener);
}

RMW_TRY_DESTRUCTOR(
publisher_info->~ConnextStaticPublisherInfo(),
ConnextStaticPublisherInfo, return RMW_RET_ERROR)
Expand Down
73 changes: 65 additions & 8 deletions rmw_connext_cpp/src/rmw_subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,9 @@ rmw_create_subscription(
DDSTopicDescription * topic_description = nullptr;
DDSDataReader * topic_reader = nullptr;
DDSReadCondition * read_condition = nullptr;
void * buf = nullptr;
void * info_buf = nullptr;
void * listener_buf = nullptr;
ConnextSubscriberListener * subscriber_listener = nullptr;
ConnextStaticSubscriberInfo * subscriber_info = nullptr;
rmw_subscription_t * subscription = nullptr;
std::string mangled_name;
Expand Down Expand Up @@ -137,8 +139,18 @@ rmw_create_subscription(
goto fail;
}

// Allocate memory for the SubscriberListener object.
listener_buf = rmw_allocate(sizeof(ConnextSubscriberListener));
if (!listener_buf) {
RMW_SET_ERROR_MSG("failed to allocate memory for subscriber listener");
goto fail;
}
// Use a placement new to construct the ConnextSubscriberListener in the preallocated buffer.
RMW_TRY_PLACEMENT_NEW(subscriber_listener, listener_buf, goto fail, ConnextSubscriberListener, )
listener_buf = nullptr; // Only free the buffer pointer.

dds_subscriber = participant->create_subscriber(
subscriber_qos, NULL, DDS_STATUS_MASK_NONE);
subscriber_qos, subscriber_listener, DDS_SUBSCRIPTION_MATCHED_STATUS);
if (!dds_subscriber) {
RMW_SET_ERROR_MSG("failed to create subscriber");
goto fail;
Expand Down Expand Up @@ -192,19 +204,21 @@ rmw_create_subscription(
}

// Allocate memory for the ConnextStaticSubscriberInfo object.
buf = rmw_allocate(sizeof(ConnextStaticSubscriberInfo));
if (!buf) {
info_buf = rmw_allocate(sizeof(ConnextStaticSubscriberInfo));
if (!info_buf) {
RMW_SET_ERROR_MSG("failed to allocate memory");
goto fail;
}
// Use a placement new to construct the ConnextStaticSubscriberInfo in the preallocated buffer.
RMW_TRY_PLACEMENT_NEW(subscriber_info, buf, goto fail, ConnextStaticSubscriberInfo, )
buf = nullptr; // Only free the subscriber_info pointer; don't need the buf pointer anymore.
RMW_TRY_PLACEMENT_NEW(subscriber_info, info_buf, goto fail, ConnextStaticSubscriberInfo, )
info_buf = nullptr; // Only free the subscriber_info pointer; don't need the buf pointer anymore.
subscriber_info->dds_subscriber_ = dds_subscriber;
subscriber_info->topic_reader_ = topic_reader;
subscriber_info->read_condition_ = read_condition;
subscriber_info->callbacks_ = callbacks;
subscriber_info->ignore_local_publications = ignore_local_publications;
subscriber_info->listener_ = subscriber_listener;
subscriber_listener = nullptr;

subscription->implementation_identifier = rti_connext_identifier;
subscription->data = subscriber_info;
Expand Down Expand Up @@ -272,18 +286,61 @@ rmw_create_subscription(
(std::cerr << ss.str()).flush();
}
}
if (subscriber_listener) {
RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE(
subscriber_listener->~ConnextSubscriberListener(), ConnextSubscriberListener)
rmw_free(subscriber_listener);
}
if (subscriber_info) {
if (subscriber_info->listener_) {
RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE(
subscriber_info->listener_->~ConnextSubscriberListener(), ConnextSubscriberListener)
rmw_free(subscriber_info->listener_);
}
RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE(
subscriber_info->~ConnextStaticSubscriberInfo(), ConnextStaticSubscriberInfo)
rmw_free(subscriber_info);
}
if (buf) {
rmw_free(buf);
if (info_buf) {
rmw_free(info_buf);
}
if (listener_buf) {
rmw_free(listener_buf);
}

return NULL;
}

rmw_ret_t
rmw_subscription_count_matched_publishers(
const rmw_subscription_t * subscription,
size_t * publisher_count)
{
if (!subscription) {
RMW_SET_ERROR_MSG("subscription handle is null");
return RMW_RET_INVALID_ARGUMENT;
}

if (!publisher_count) {
RMW_SET_ERROR_MSG("publisher_count is null");
return RMW_RET_INVALID_ARGUMENT;
}

auto info = static_cast<ConnextStaticSubscriberInfo *>(subscription->data);
if (!info) {
RMW_SET_ERROR_MSG("subscriber internal data is invalid");
return RMW_RET_ERROR;
}
if (!info->listener_) {
RMW_SET_ERROR_MSG("subscriber internal listener is invalid");
return RMW_RET_ERROR;
}

*publisher_count = info->listener_->current_count();

return RMW_RET_OK;
}

rmw_ret_t
rmw_destroy_subscription(rmw_node_t * node, rmw_subscription_t * subscription)
{
Expand Down

0 comments on commit 3b0ef8b

Please sign in to comment.