Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce transform listener nodes #442

Merged
merged 3 commits into from
Sep 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 33 additions & 24 deletions tf2_ros/include/tf2_ros/transform_listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ class TransformListener
: buffer_(buffer)
{
init(node, spin_thread, qos, static_qos, options, static_options);
node_logging_interface_ = node->get_node_logging_interface();
}

TF2_ROS_PUBLIC
Expand All @@ -117,6 +116,8 @@ class TransformListener
const rclcpp::SubscriptionOptionsWithAllocator<AllocatorT> & options,
const rclcpp::SubscriptionOptionsWithAllocator<AllocatorT> & static_options)
{
spin_thread_ = spin_thread;
node_base_interface_ = node->get_node_base_interface();
node_logging_interface_ = node->get_node_logging_interface();

using callback_t = std::function<void (tf2_msgs::msg::TFMessage::SharedPtr)>;
Expand All @@ -125,43 +126,51 @@ class TransformListener
callback_t static_cb = std::bind(
&TransformListener::subscription_callback, this, std::placeholders::_1, true);

message_subscription_tf_ = rclcpp::create_subscription<tf2_msgs::msg::TFMessage>(
node,
"/tf",
qos,
std::move(cb),
options);
message_subscription_tf_static_ = rclcpp::create_subscription<tf2_msgs::msg::TFMessage>(
node,
"/tf_static",
static_qos,
std::move(static_cb),
static_options);

if (spin_thread) {
initThread(node->get_node_base_interface());
if (spin_thread_) {
// Create new callback group for message_subscription of tf and tf_static
callback_group_ = node_base_interface_->create_callback_group(
rclcpp::CallbackGroupType::MutuallyExclusive, false);
// Duplicate to modify option of subscription
rclcpp::SubscriptionOptionsWithAllocator<AllocatorT> tf_options = options;
rclcpp::SubscriptionOptionsWithAllocator<AllocatorT> tf_static_options = static_options;
tf_options.callback_group = callback_group_;
tf_static_options.callback_group = callback_group_;

message_subscription_tf_ = rclcpp::create_subscription<tf2_msgs::msg::TFMessage>(
node, "/tf", qos, std::move(cb), tf_options);
message_subscription_tf_static_ = rclcpp::create_subscription<tf2_msgs::msg::TFMessage>(
node, "/tf_static", static_qos, std::move(static_cb), tf_static_options);

// Create executor with dedicated thread to spin.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A space here might be nice for a logical break between creating the subscribers and then setting up the executor

executor_ = std::make_shared<rclcpp::executors::SingleThreadedExecutor>();
executor_->add_callback_group(callback_group_, node_base_interface_);
Copy link
Member

@jacobperron jacobperron Nov 15, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this broke an existing use-case where we expect the node to have other callbacks processed, not just the the two subscriptions added here. E.g. previously I could write something like this:

#include <rclcpp/rclcpp.hpp>
#include <tf2_ros/buffer.h>
#include <tf2_ros/transform_listener.h>
#include <std_msgs/msg/string.hpp>

#include <iostream>
#include <memory>
#include <chrono>
#include <thread>

int main(int argc, char ** argv)
{
  rclcpp::init(argc, argv);

  auto node = std::make_shared<rclcpp::Node>("foo_node");

  auto callback = [](const std_msgs::msg::String & msg) {
    std::cerr << "Message received: " << msg.data << std::endl;
  };  
  auto sub = node->create_subscription<std_msgs::msg::String>("foo", 1, callback);

  tf2_ros::Buffer buffer(node->get_clock());
  tf2_ros::TransformListener listener(buffer, node, true);

  while (rclcpp::ok()) {
    // rclcpp::spin_some(node);
    std::this_thread::sleep_for(std::chrono::seconds(1));
  }
  rclcpp::shutdown();

  return 0;
}

Maybe assuming callbacks will be handled by the transform listener is a bad assumption, but the signature where we provide our own Node (and we set spin_thread=true) seems ill-defined.

Copy link
Contributor

@SteveMacenski SteveMacenski Nov 15, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

my_node is never spun in this example -- how could the subscription receive messages? The spin_thread is w.r.t. the TF callbacks to isolate them from the rest of the system, its not there to process your own callbacks in your own application. That was the intent of this PR to make TF2's system decoupled from the application code. You should spin_some() in your while loop to resolve this issue (which you should have been doing before this PR anyway?). This PR is creating an executor / callback group for the TF internal needs to be processed, so that's not connected to your application subscriber using the default executor that you haven't called in the application example. 😄

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I might be wrong about what I think is happening; I will report back with a complete example.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've updated the code in my comment to be a compilable example. It appears that I can get the example to work if I uncomment the rclcpp::spin_some() call in the while-loop. IIRC, this would result in a runtime error previously. Should it be the new recommended approach?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes -- before I think it was a design error to have it spin for the user application too, that seems like a breakage in separation of concerns to have TF2 processing your execution model for your application. I would have assumed that it did what it is now doing and just spin a thread for TF2 to process its information internally. This was made because we want TF2 to process its own info and not be blocked or blocking other application callbacks & reducing the number of Node objects in complex stacks like Nav2.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that it makes sense to not have users rely on the TransformListener executor, however the current change silently breaks anyone who was relying on this behavior (it took me a while to figure out why my callbacks were not being triggered). It would be nice if we could detect this scenario and warn users. At the very least, I think we should add a release note for ROS Humble.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

100% agree

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does not seem straight forward to add a warning, but I've added a release note in ros2/ros2_documentation#2127

dedicated_listener_thread_ = std::make_unique<std::thread>([&]() {executor_->spin();});
// Tell the buffer we have a dedicated thread to enable timeouts
buffer_.setUsingDedicatedThread(true);
} else {
message_subscription_tf_ = rclcpp::create_subscription<tf2_msgs::msg::TFMessage>(
node, "/tf", qos, std::move(cb), options);
message_subscription_tf_static_ = rclcpp::create_subscription<tf2_msgs::msg::TFMessage>(
node, "/tf_static", static_qos, std::move(static_cb), static_options);
}
}

TF2_ROS_PUBLIC
void initThread(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_base_interface);

/// Callback function for ros message subscriptoin
TF2_ROS_PUBLIC
void subscription_callback(tf2_msgs::msg::TFMessage::SharedPtr msg, bool is_static);

// ros::CallbackQueue tf_message_callback_queue_;
using thread_ptr =
std::unique_ptr<std::thread, std::function<void (std::thread *)>>;
thread_ptr dedicated_listener_thread_;
bool spin_thread_{false};
std::unique_ptr<std::thread> dedicated_listener_thread_;
rclcpp::CallbackGroup::SharedPtr callback_group_{nullptr};
rclcpp::executors::SingleThreadedExecutor::SharedPtr executor_;

rclcpp::Node::SharedPtr optional_default_node_ = nullptr;
rclcpp::Subscription<tf2_msgs::msg::TFMessage>::SharedPtr message_subscription_tf_;
rclcpp::Subscription<tf2_msgs::msg::TFMessage>::SharedPtr message_subscription_tf_static_;
tf2::BufferCore & buffer_;
tf2::TimePoint last_update_;
rclcpp::node_interfaces::NodeLoggingInterface::SharedPtr node_logging_interface_;
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_base_interface_;
};
} // namespace tf2_ros

Expand Down
31 changes: 4 additions & 27 deletions tf2_ros/src/transform_listener.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,35 +60,12 @@ TransformListener::TransformListener(tf2::BufferCore & buffer, bool spin_thread)

TransformListener::~TransformListener()
{
if (spin_thread_) {
executor_->cancel();
dedicated_listener_thread_->join();
}
}

void TransformListener::initThread(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_base_interface)
{
auto executor = std::make_shared<rclcpp::executors::SingleThreadedExecutor>();

// This lambda is required because `std::thread` cannot infer the correct
// rclcpp::spin, since there are more than one versions of it (overloaded).
// see: http://stackoverflow.com/a/27389714/671658
// I (wjwwood) chose to use the lamda rather than the static cast solution.
auto run_func =
[executor](rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_base_interface) {
executor->add_node(node_base_interface);
executor->spin();
executor->remove_node(node_base_interface);
};
dedicated_listener_thread_ = thread_ptr(
new std::thread(run_func, node_base_interface),
[executor](std::thread * t) {
executor->cancel();
t->join();
delete t;
// TODO(tfoote) reenable callback queue processing
// tf_message_callback_queue_.callAvailable(ros::WallDuration(0.01));
});
// Tell the buffer we have a dedicated thread to enable timeouts
buffer_.setUsingDedicatedThread(true);
}

void TransformListener::subscription_callback(
const tf2_msgs::msg::TFMessage::SharedPtr msg,
Expand Down