Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add compatibility with new IPC implementation #140

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,20 +39,10 @@ class SubscriptionManager
template<typename MessageT>
void add_subscription(const std::string & topic_name, size_t expected_number_of_messages)
{
auto qos = rclcpp::QoS(rclcpp::KeepAll());
qos.get_rmw_qos_profile().depth = 4;
qos.reliability(RMW_QOS_POLICY_RELIABILITY_SYSTEM_DEFAULT);
qos.durability(RMW_QOS_POLICY_DURABILITY_SYSTEM_DEFAULT);
qos.avoid_ros_namespace_conventions(false);
expected_topics_with_size_[topic_name] = expected_number_of_messages;
auto subscription = create_subscription<MessageT>(topic_name);

subscriptions_.push_back(
subscriber_node_->create_subscription<MessageT>(
topic_name,
qos,
[this, topic_name](std::shared_ptr<rmw_serialized_message_t> msg) {
subscribed_messages_[topic_name].push_back(msg);
}));
expected_topics_with_size_[topic_name] = expected_number_of_messages;
subscriptions_.push_back(subscription);
}

template<typename MessageT>
Expand All @@ -77,6 +67,55 @@ class SubscriptionManager
}

private:
template<typename MessageT>
rclcpp::SubscriptionBase::SharedPtr
create_subscription(const std::string & topic_name)
{
using CallbackMessageT = rmw_serialized_message_t;
using AllocT = std::allocator<void>;

auto qos = rclcpp::QoS(rclcpp::KeepAll());
qos.get_rmw_qos_profile().depth = 4;
qos.reliability(RMW_QOS_POLICY_RELIABILITY_SYSTEM_DEFAULT);
qos.durability(RMW_QOS_POLICY_DURABILITY_SYSTEM_DEFAULT);
qos.avoid_ros_namespace_conventions(false);
auto options = rclcpp::SubscriptionOptionsWithAllocator<AllocT>();

auto callback = [this, topic_name](std::shared_ptr<CallbackMessageT> msg) {
subscribed_messages_[topic_name].push_back(msg);
};

auto allocator = std::make_shared<AllocT>();
using rclcpp::AnySubscriptionCallback;
AnySubscriptionCallback<CallbackMessageT, AllocT>
any_subscription_callback(allocator);
any_subscription_callback.set(callback);

using rclcpp::message_memory_strategy::MessageMemoryStrategy;
auto msg_mem_strat =
MessageMemoryStrategy<CallbackMessageT, AllocT>::create_default();

auto subscription =
rclcpp::Subscription<CallbackMessageT, AllocT>::make_shared(
subscriber_node_->get_node_base_interface()->get_shared_rcl_node_handle(),
*rosidl_typesupport_cpp::get_message_type_support_handle<MessageT>(),
topic_name,
options.template to_rcl_subscription_options<MessageT>(qos),
any_subscription_callback,
options.event_callbacks,
msg_mem_strat);

// Intra-process message handling is not supported.
bool use_intra_process = false;
subscriber_node_->get_node_topics_interface()->add_subscription(
subscription,
use_intra_process,
nullptr);

auto sub_base_ptr = std::dynamic_pointer_cast<rclcpp::SubscriptionBase>(subscription);
return sub_base_ptr;
}

bool continue_spinning(std::unordered_map<std::string, size_t> expected_topics_with_sizes)
{
for (const auto & topic_expected : expected_topics_with_sizes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,21 +70,6 @@ void GenericSubscription::return_serialized_message(
message.reset();
}

void GenericSubscription::handle_intra_process_message(
rcl_interfaces::msg::IntraProcessMessage & ipm,
const rmw_message_info_t & message_info)
{
(void) ipm;
(void) message_info;
throw std::runtime_error("Intra process is not supported");
}

const std::shared_ptr<rcl_subscription_t>
GenericSubscription::get_intra_process_subscription_handle() const
{
return nullptr;
}

std::shared_ptr<rmw_serialized_message_t>
GenericSubscription::borrow_serialized_message(size_t capacity)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,15 +64,6 @@ class GenericSubscription : public rclcpp::SubscriptionBase

void return_serialized_message(std::shared_ptr<rmw_serialized_message_t> & message) override;

// Intra-process message handling is not supported by this publisher
void handle_intra_process_message(
rcl_interfaces::msg::IntraProcessMessage & ipm,
const rmw_message_info_t & message_info) override;

// Intra-process message handling is not supported by this publisher (returns nullptr always)
const std::shared_ptr<rcl_subscription_t>
get_intra_process_subscription_handle() const override;

private:
RCLCPP_DISABLE_COPY(GenericSubscription)

Expand Down
4 changes: 3 additions & 1 deletion rosbag2_transport/src/rosbag2_transport/rosbag2_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@ std::shared_ptr<GenericSubscription> Rosbag2Node::create_generic_subscription(
topic,
callback);

get_node_topics_interface()->add_subscription(subscription, nullptr);
// Intra-process message handling is not supported.
bool use_intra_process = false;
get_node_topics_interface()->add_subscription(subscription, use_intra_process, nullptr);
} catch (const std::runtime_error & ex) {
ROSBAG2_TRANSPORT_LOG_ERROR_STREAM(
"Error subscribing to topic '" << topic << "'. Error: " << ex.what());
Expand Down