From 7f8c9d33dba6fac60a95b23c28b97060087ec810 Mon Sep 17 00:00:00 2001 From: Simon Hoinkis Date: Thu, 4 Jan 2024 12:50:26 +0100 Subject: [PATCH] Implement 'rmw_subscription_set_on_new_message_callback' (#88) --- rmw_iceoryx_cpp/src/rmw_subscription.cpp | 47 +++++++++++++++++-- .../src/types/iceoryx_subscription.hpp | 31 ++++++++++-- 2 files changed, 69 insertions(+), 9 deletions(-) diff --git a/rmw_iceoryx_cpp/src/rmw_subscription.cpp b/rmw_iceoryx_cpp/src/rmw_subscription.cpp index 6c47018..cbd4adb 100644 --- a/rmw_iceoryx_cpp/src/rmw_subscription.cpp +++ b/rmw_iceoryx_cpp/src/rmw_subscription.cpp @@ -16,6 +16,7 @@ #include #include "iceoryx_posh/capro/service_description.hpp" +#include "iceoryx_posh/popo/listener.hpp" #include "rcutils/error_handling.h" @@ -188,11 +189,49 @@ rmw_ret_t rmw_subscription_set_on_new_message_callback( rmw_event_callback_t callback, const void * user_data) { - RCUTILS_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT); - RCUTILS_CHECK_ARGUMENT_FOR_NULL(callback, RMW_RET_INVALID_ARGUMENT); - RCUTILS_CHECK_ARGUMENT_FOR_NULL(user_data, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT); - return RMW_RET_UNSUPPORTED; + auto iceoryx_subscription = static_cast(subscription->data); + if (!iceoryx_subscription) { + RMW_SET_ERROR_MSG("subscription data is null"); + return RMW_RET_ERROR; + } + + auto iceoryx_receiver = iceoryx_subscription->iceoryx_receiver_; + if (!iceoryx_receiver) { + RMW_SET_ERROR_MSG("iceoryx_receiver is null"); + return RMW_RET_ERROR; + } + const std::lock_guard lock(iceoryx_subscription->mutex_); + rmw_ret_t ret = RMW_RET_ERROR; + + if (callback == nullptr) { + iceoryx_subscription->listener_.detachEvent( + *(iceoryx_subscription->iceoryx_receiver_), + iox::popo::SubscriberEvent::DATA_RECEIVED); + ret = RMW_RET_OK; + return ret; + } + + iceoryx_subscription->user_callback_ = callback; + iceoryx_subscription->user_data_ = user_data; + iceoryx_subscription->listener_ + .attachEvent( + *(iceoryx_subscription->iceoryx_receiver_), + iox::popo::SubscriberEvent::DATA_RECEIVED, + iox::popo::createNotificationCallback( + IceoryxSubscription::onSampleReceivedCallback, + *iceoryx_subscription)) + .or_else( + [&](auto) { + RMW_SET_ERROR_MSG( + "rmw_subscription_get_content_filter: Unable to attach subscriber to listener"); + ret = RMW_RET_ERROR; + }); + + ret = RMW_RET_OK; + + return ret; } rmw_ret_t diff --git a/rmw_iceoryx_cpp/src/types/iceoryx_subscription.hpp b/rmw_iceoryx_cpp/src/types/iceoryx_subscription.hpp index 9ff2add..d28d68a 100644 --- a/rmw_iceoryx_cpp/src/types/iceoryx_subscription.hpp +++ b/rmw_iceoryx_cpp/src/types/iceoryx_subscription.hpp @@ -15,8 +15,12 @@ #ifndef TYPES__ICEORYX_SUBSCRIPTION_HPP_ #define TYPES__ICEORYX_SUBSCRIPTION_HPP_ +#include + #include "iceoryx_posh/popo/untyped_subscriber.hpp" +#include "iceoryx_posh/popo/listener.hpp" +#include "rmw/error_handling.h" #include "rmw/rmw.h" #include "rmw/types.h" @@ -31,12 +35,29 @@ struct IceoryxSubscription iceoryx_receiver_(iceoryx_receiver), is_fixed_size_(rmw_iceoryx_cpp::iceoryx_is_fixed_size(type_supports)), message_size_(rmw_iceoryx_cpp::iceoryx_get_message_size(type_supports)) - {} + { + } rosidl_message_type_support_t type_supports_; - iox::popo::UntypedSubscriber * const iceoryx_receiver_; - bool is_fixed_size_; - size_t message_size_; -}; + iox::popo::UntypedSubscriber * const iceoryx_receiver_{nullptr}; + bool is_fixed_size_{false}; + size_t message_size_{0}; + std::mutex mutex_; + /// TODO Why not having one listener for all subscriptions? + iox::popo::Listener listener_; + rmw_event_callback_t user_callback_{nullptr}; + const void * user_data_{nullptr}; + static void onSampleReceivedCallback(iox::popo::UntypedSubscriber *, IceoryxSubscription * self) + { + /// TODO This lock isn't needed, the listener calls are sequential, right? + const std::lock_guard lock(self->mutex_); + if (self == nullptr) { + RMW_SET_ERROR_MSG("onSampleReceivedCallback: Invalid arguments"); + return; + } + + self->user_callback_(self->user_data_, 1); + } +}; #endif // TYPES__ICEORYX_SUBSCRIPTION_HPP_